- """Matrix factorization with Sparse PCA."""
- # Author: Vlad Niculae, Gael Varoquaux, Alexandre Gramfort
- # License: BSD 3 clause
- from numbers import Integral, Real
- import numpy as np
- from ..base import (
- BaseEstimator,
- ClassNamePrefixFeaturesOutMixin,
- TransformerMixin,
- _fit_context,
- )
- from ..linear_model import ridge_regression
- from ..utils import check_random_state
- from ..utils._param_validation import Hidden, Interval, StrOptions
- from ..utils.extmath import svd_flip
- from ..utils.validation import check_array, check_is_fitted
- from ._dict_learning import MiniBatchDictionaryLearning, dict_learning


class _BaseSparsePCA(ClassNamePrefixFeaturesOutMixin, TransformerMixin, BaseEstimator):
    """Base class for SparsePCA and MiniBatchSparsePCA."""

    _parameter_constraints: dict = {
        "n_components": [None, Interval(Integral, 1, None, closed="left")],
        "alpha": [Interval(Real, 0.0, None, closed="left")],
        "ridge_alpha": [Interval(Real, 0.0, None, closed="left")],
        "max_iter": [Interval(Integral, 0, None, closed="left")],
        "tol": [Interval(Real, 0.0, None, closed="left")],
        "method": [StrOptions({"lars", "cd"})],
        "n_jobs": [Integral, None],
        "verbose": ["verbose"],
        "random_state": ["random_state"],
    }

    def __init__(
        self,
        n_components=None,
        *,
        alpha=1,
        ridge_alpha=0.01,
        max_iter=1000,
        tol=1e-8,
        method="lars",
        n_jobs=None,
        verbose=False,
        random_state=None,
    ):
        self.n_components = n_components
        self.alpha = alpha
        self.ridge_alpha = ridge_alpha
        self.max_iter = max_iter
        self.tol = tol
        self.method = method
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.random_state = random_state

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X, y=None):
        """Fit the model from data in X.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training vector, where `n_samples` is the number of samples
            and `n_features` is the number of features.

        y : Ignored
            Not used, present here for API consistency by convention.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        random_state = check_random_state(self.random_state)
        X = self._validate_data(X)
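        # Center the data and store the per-feature mean so that `transform`
        # and `inverse_transform` can later apply (and undo) the same
        # centering.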
        self.mean_ = X.mean(axis=0)
        X = X - self.mean_

        if self.n_components is None:
            n_components = X.shape[1]
        else:
            n_components = self.n_components

        return self._fit(X, n_components, random_state)

    def transform(self, X):
        """Least Squares projection of the data onto the sparse components.

        To avoid instability issues in case the system is under-determined,
        regularization can be applied (Ridge regression) via the
        `ridge_alpha` parameter.

        Note that the sparse components are not enforced to be orthogonal,
        as they are in PCA; hence one cannot use a simple linear projection.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
            Test data to be transformed, must have the same number of
            features as the data used to train the model.

        Returns
        -------
        X_new : ndarray of shape (n_samples, n_components)
            Transformed data.
        """
        check_is_fitted(self)

        X = self._validate_data(X, reset=False)
        X = X - self.mean_
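        # With non-orthogonal components V, the codes U are obtained by ridge
        # regression of the centered data onto the components:
        #     min_U ||X - U @ V||_Fro^2 + ridge_alpha * ||U||_Fro^2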
        U = ridge_regression(
            self.components_.T, X.T, self.ridge_alpha, solver="cholesky"
        )

        return U

    def inverse_transform(self, X):
        """Transform data from the latent space to the original space.

        This inversion is an approximation due to the loss of information
        induced by the forward decomposition.

        .. versionadded:: 1.2

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_components)
            Data in the latent space.

        Returns
        -------
        X_original : ndarray of shape (n_samples, n_features)
            Reconstructed data in the original space.
        """
        check_is_fitted(self)
        X = check_array(X)
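        # Map the codes back through the components and undo the centering
        # applied in `fit`. The reconstruction is approximate because the
        # components are neither orthogonal nor, in general, a complete basis.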
        return (X @ self.components_) + self.mean_

    @property
    def _n_features_out(self):
        """Number of transformed output features."""
        return self.components_.shape[0]

    def _more_tags(self):
        return {
            "preserves_dtype": [np.float64, np.float32],
        }


class SparsePCA(_BaseSparsePCA):
    """Sparse Principal Components Analysis (SparsePCA).

    Finds the set of sparse components that can optimally reconstruct
    the data. The amount of sparseness is controllable by the coefficient
    of the L1 penalty, given by the parameter alpha.
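
    Informally, the optimization problem solved is a dictionary learning
    problem with an entrywise L1 penalty on the components (a sketch; see the
    User Guide for the precise formulation)::

        (U^*, V^*) = argmin 0.5 * ||X - U V||_Fro^2 + alpha * ||V||_1
                     (U, V)
                     with ||U_k||_2 <= 1 for all 0 <= k < n_components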

    Read more in the :ref:`User Guide <SparsePCA>`.

    Parameters
    ----------
    n_components : int, default=None
        Number of sparse atoms to extract. If None, then ``n_components``
        is set to ``n_features``.

    alpha : float, default=1
        Sparsity controlling parameter. Higher values lead to sparser
        components.

    ridge_alpha : float, default=0.01
        Amount of ridge shrinkage to apply in order to improve
        conditioning when calling the transform method.

    max_iter : int, default=1000
        Maximum number of iterations to perform.

    tol : float, default=1e-8
        Tolerance for the stopping condition.

    method : {'lars', 'cd'}, default='lars'
        Method to be used for optimization.
        lars: uses the least angle regression method to solve the lasso
        problem (linear_model.lars_path).
        cd: uses the coordinate descent method to compute the Lasso solution
        (linear_model.Lasso). Lars will be faster if the estimated components
        are sparse.

    n_jobs : int, default=None
        Number of parallel jobs to run.
        ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
        ``-1`` means using all processors. See :term:`Glossary <n_jobs>`
        for more details.

    U_init : ndarray of shape (n_samples, n_components), default=None
        Initial values for the loadings for warm restart scenarios. Only used
        if `U_init` and `V_init` are not None.

    V_init : ndarray of shape (n_components, n_features), default=None
        Initial values for the components for warm restart scenarios. Only
        used if `U_init` and `V_init` are not None.

    verbose : int or bool, default=False
        Controls the verbosity; the higher, the more messages.

    random_state : int, RandomState instance or None, default=None
        Used during dictionary learning. Pass an int for reproducible results
        across multiple function calls.
        See :term:`Glossary <random_state>`.

    Attributes
    ----------
    components_ : ndarray of shape (n_components, n_features)
        Sparse components extracted from the data.

    error_ : ndarray
        Vector of errors at each iteration.

    n_components_ : int
        Estimated number of components.

        .. versionadded:: 0.23

    n_iter_ : int
        Number of iterations run.

    mean_ : ndarray of shape (n_features,)
        Per-feature empirical mean, estimated from the training set.
        Equal to ``X.mean(axis=0)``.

    n_features_in_ : int
        Number of features seen during :term:`fit`.

        .. versionadded:: 0.24

    feature_names_in_ : ndarray of shape (`n_features_in_`,)
        Names of features seen during :term:`fit`. Defined only when `X`
        has feature names that are all strings.

        .. versionadded:: 1.0

    See Also
    --------
    PCA : Principal Component Analysis implementation.
    MiniBatchSparsePCA : Mini batch variant of `SparsePCA` that is faster but
        less accurate.
    DictionaryLearning : Generic dictionary learning problem using a sparse
        code.

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.datasets import make_friedman1
    >>> from sklearn.decomposition import SparsePCA
    >>> X, _ = make_friedman1(n_samples=200, n_features=30, random_state=0)
    >>> transformer = SparsePCA(n_components=5, random_state=0)
    >>> transformer.fit(X)
    SparsePCA(...)
    >>> X_transformed = transformer.transform(X)
    >>> X_transformed.shape
    (200, 5)
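    >>> # inverse_transform maps the codes back to the original feature
    >>> # space; the reconstruction is approximate, but shapes round-trip
    >>> transformer.inverse_transform(X_transformed).shape
    (200, 30)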
    >>> # most values in the components_ are zero (sparsity)
    >>> np.mean(transformer.components_ == 0)
    0.9666...
    """

    _parameter_constraints: dict = {
        **_BaseSparsePCA._parameter_constraints,
        "U_init": [None, np.ndarray],
        "V_init": [None, np.ndarray],
    }

    def __init__(
        self,
        n_components=None,
        *,
        alpha=1,
        ridge_alpha=0.01,
        max_iter=1000,
        tol=1e-8,
        method="lars",
        n_jobs=None,
        U_init=None,
        V_init=None,
        verbose=False,
        random_state=None,
    ):
        super().__init__(
            n_components=n_components,
            alpha=alpha,
            ridge_alpha=ridge_alpha,
            max_iter=max_iter,
            tol=tol,
            method=method,
            n_jobs=n_jobs,
            verbose=verbose,
            random_state=random_state,
        )
        self.U_init = U_init
        self.V_init = V_init

    def _fit(self, X, n_components, random_state):
        """Specialized `fit` for SparsePCA."""
        code_init = self.V_init.T if self.V_init is not None else None
        dict_init = self.U_init.T if self.U_init is not None else None
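        # Sparse PCA is solved as a dictionary learning problem on X.T: the
        # sparse "code" of X.T plays the role of the components V, and the
        # learned "dictionary" that of the (transposed) loadings U, which is
        # why the warm-start initializations above are transposed and swapped.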
        code, dictionary, errors, self.n_iter_ = dict_learning(
            X.T,
            n_components,
            alpha=self.alpha,
            tol=self.tol,
            max_iter=self.max_iter,
            method=self.method,
            n_jobs=self.n_jobs,
            verbose=self.verbose,
            random_state=random_state,
            code_init=code_init,
            dict_init=dict_init,
            return_n_iter=True,
        )
        # flip eigenvectors' sign to enforce deterministic output
        code, dictionary = svd_flip(code, dictionary, u_based_decision=False)
        self.components_ = code.T
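        # Normalize each component to unit L2 norm; zero-norm components are
        # left untouched to avoid dividing by zero.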
        components_norm = np.linalg.norm(self.components_, axis=1)[:, np.newaxis]
        components_norm[components_norm == 0] = 1
        self.components_ /= components_norm
        self.n_components_ = len(self.components_)

        self.error_ = errors
        return self


class MiniBatchSparsePCA(_BaseSparsePCA):
    """Mini-batch Sparse Principal Components Analysis.

    Finds the set of sparse components that can optimally reconstruct
    the data. The amount of sparseness is controllable by the coefficient
    of the L1 penalty, given by the parameter alpha.

    Read more in the :ref:`User Guide <SparsePCA>`.

    Parameters
    ----------
    n_components : int, default=None
        Number of sparse atoms to extract. If None, then ``n_components``
        is set to ``n_features``.

    alpha : float, default=1
        Sparsity controlling parameter. Higher values lead to sparser
        components.

    ridge_alpha : float, default=0.01
        Amount of ridge shrinkage to apply in order to improve
        conditioning when calling the transform method.

    n_iter : int, default=100
        Number of iterations to perform for each mini batch.

        .. deprecated:: 1.2
           `n_iter` is deprecated in 1.2 and will be removed in 1.4. Use
           `max_iter` instead.

    max_iter : int, default=None
        Maximum number of iterations over the complete dataset before
        stopping independently of any early stopping criterion heuristics.
        If `max_iter` is not `None`, `n_iter` is ignored.

        .. versionadded:: 1.2

    callback : callable, default=None
        Callable that gets invoked every five iterations.

    batch_size : int, default=3
        The number of features to take in each mini batch.

    verbose : int or bool, default=False
        Controls the verbosity; the higher, the more messages.

    shuffle : bool, default=True
        Whether to shuffle the data before splitting it in batches.

    n_jobs : int, default=None
        Number of parallel jobs to run.
        ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
        ``-1`` means using all processors. See :term:`Glossary <n_jobs>`
        for more details.

    method : {'lars', 'cd'}, default='lars'
        Method to be used for optimization.
        lars: uses the least angle regression method to solve the lasso
        problem (linear_model.lars_path).
        cd: uses the coordinate descent method to compute the Lasso solution
        (linear_model.Lasso). Lars will be faster if the estimated components
        are sparse.

    random_state : int, RandomState instance or None, default=None
        Used for random shuffling when ``shuffle`` is set to ``True``,
        during online dictionary learning. Pass an int for reproducible
        results across multiple function calls.
        See :term:`Glossary <random_state>`.

    tol : float, default=1e-3
        Control early stopping based on the norm of the differences in the
        dictionary between 2 steps. Used only if `max_iter` is not None.

        To disable early stopping based on changes in the dictionary, set
        `tol` to 0.0.

        .. versionadded:: 1.1

    max_no_improvement : int or None, default=10
        Control early stopping based on the number of consecutive mini
        batches that do not yield an improvement on the smoothed cost
        function. Used only if `max_iter` is not None.

        To disable convergence detection based on cost function, set
        `max_no_improvement` to `None`.

        .. versionadded:: 1.1

    Attributes
    ----------
    components_ : ndarray of shape (n_components, n_features)
        Sparse components extracted from the data.

    n_components_ : int
        Estimated number of components.

        .. versionadded:: 0.23

    n_iter_ : int
        Number of iterations run.

    mean_ : ndarray of shape (n_features,)
        Per-feature empirical mean, estimated from the training set.
        Equal to ``X.mean(axis=0)``.

    n_features_in_ : int
        Number of features seen during :term:`fit`.

        .. versionadded:: 0.24

    feature_names_in_ : ndarray of shape (`n_features_in_`,)
        Names of features seen during :term:`fit`. Defined only when `X`
        has feature names that are all strings.

        .. versionadded:: 1.0

    See Also
    --------
    DictionaryLearning : Find a dictionary that sparsely encodes data.
    IncrementalPCA : Incremental principal components analysis.
    PCA : Principal component analysis.
    SparsePCA : Sparse Principal Components Analysis.
    TruncatedSVD : Dimensionality reduction using truncated SVD.

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.datasets import make_friedman1
    >>> from sklearn.decomposition import MiniBatchSparsePCA
    >>> X, _ = make_friedman1(n_samples=200, n_features=30, random_state=0)
    >>> transformer = MiniBatchSparsePCA(n_components=5, batch_size=50,
    ...                                  max_iter=10, random_state=0)
    >>> transformer.fit(X)
    MiniBatchSparsePCA(...)
    >>> X_transformed = transformer.transform(X)
    >>> X_transformed.shape
    (200, 5)
    >>> # most values in the components_ are zero (sparsity)
    >>> np.mean(transformer.components_ == 0)
    0.9...
    """

    _parameter_constraints: dict = {
        **_BaseSparsePCA._parameter_constraints,
        "max_iter": [Interval(Integral, 0, None, closed="left"), None],
        "n_iter": [
            Interval(Integral, 0, None, closed="left"),
            Hidden(StrOptions({"deprecated"})),
        ],
        "callback": [None, callable],
        "batch_size": [Interval(Integral, 1, None, closed="left")],
        "shuffle": ["boolean"],
        "max_no_improvement": [Interval(Integral, 0, None, closed="left"), None],
    }

    def __init__(
        self,
        n_components=None,
        *,
        alpha=1,
        ridge_alpha=0.01,
        n_iter="deprecated",
        max_iter=None,
        callback=None,
        batch_size=3,
        verbose=False,
        shuffle=True,
        n_jobs=None,
        method="lars",
        random_state=None,
        tol=1e-3,
        max_no_improvement=10,
    ):
        super().__init__(
            n_components=n_components,
            alpha=alpha,
            ridge_alpha=ridge_alpha,
            max_iter=max_iter,
            tol=tol,
            method=method,
            n_jobs=n_jobs,
            verbose=verbose,
            random_state=random_state,
        )
        self.n_iter = n_iter
        self.callback = callback
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.max_no_improvement = max_no_improvement

    def _fit(self, X, n_components, random_state):
        """Specialized `fit` for MiniBatchSparsePCA."""
        transform_algorithm = "lasso_" + self.method
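        # As in SparsePCA, the problem is solved as (mini-batch) dictionary
        # learning on X.T, so the sparse codes of X.T correspond to the sparse
        # components of X. The same alpha is used both to fit the dictionary
        # and to sparse-code the data against it afterwards.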
        est = MiniBatchDictionaryLearning(
            n_components=n_components,
            alpha=self.alpha,
            n_iter=self.n_iter,
            max_iter=self.max_iter,
            dict_init=None,
            batch_size=self.batch_size,
            shuffle=self.shuffle,
            n_jobs=self.n_jobs,
            fit_algorithm=self.method,
            random_state=random_state,
            transform_algorithm=transform_algorithm,
            transform_alpha=self.alpha,
            verbose=self.verbose,
            callback=self.callback,
            tol=self.tol,
            max_no_improvement=self.max_no_improvement,
        ).fit(X.T)
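        # The codes of X.T have shape (n_features, n_components); transpose
        # them so that components_ has shape (n_components, n_features).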
        self.components_, self.n_iter_ = est.transform(X.T).T, est.n_iter_

        components_norm = np.linalg.norm(self.components_, axis=1)[:, np.newaxis]
        components_norm[components_norm == 0] = 1
        self.components_ /= components_norm
        self.n_components_ = len(self.components_)
        return self