- """Base class for mixture models."""
- # Author: Wei Xue <xuewei4d@gmail.com>
- # Modified by Thierry Guillemot <thierry.guillemot.work@gmail.com>
- # License: BSD 3 clause
- import warnings
- from abc import ABCMeta, abstractmethod
- from numbers import Integral, Real
- from time import time
- import numpy as np
- from scipy.special import logsumexp
- from .. import cluster
- from ..base import BaseEstimator, DensityMixin, _fit_context
- from ..cluster import kmeans_plusplus
- from ..exceptions import ConvergenceWarning
- from ..utils import check_random_state
- from ..utils._param_validation import Interval, StrOptions
- from ..utils.validation import check_is_fitted


def _check_shape(param, param_shape, name):
    """Validate the shape of the input parameter 'param'.

    Parameters
    ----------
    param : array
        The parameter to validate.

    param_shape : tuple
        The expected shape of `param`.

    name : str
        The parameter name used in the error message.
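
    Examples
    --------
    A minimal sketch of the intended use; the ``"means"`` name below is only
    an illustration:

    >>> import numpy as np
    >>> _check_shape(np.zeros((2, 3)), (2, 3), "means")  # silent on success
    >>> _check_shape(np.zeros(2), (2, 3), "means")
    Traceback (most recent call last):
        ...
    ValueError: The parameter 'means' should have the shape of (2, 3), but got (2,)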
- """
    param = np.array(param)
    if param.shape != param_shape:
        raise ValueError(
            "The parameter '%s' should have the shape of %s, but got %s"
            % (name, param_shape, param.shape)
        )


class BaseMixture(DensityMixin, BaseEstimator, metaclass=ABCMeta):
    """Base class for mixture models.

    This abstract class specifies an interface for all mixture classes and
    provides basic common methods for mixture models.
    """

    _parameter_constraints: dict = {
        "n_components": [Interval(Integral, 1, None, closed="left")],
        "tol": [Interval(Real, 0.0, None, closed="left")],
        "reg_covar": [Interval(Real, 0.0, None, closed="left")],
        "max_iter": [Interval(Integral, 0, None, closed="left")],
        "n_init": [Interval(Integral, 1, None, closed="left")],
        "init_params": [
            StrOptions({"kmeans", "random", "random_from_data", "k-means++"})
        ],
        "random_state": ["random_state"],
        "warm_start": ["boolean"],
        "verbose": ["verbose"],
        "verbose_interval": [Interval(Integral, 1, None, closed="left")],
    }

    def __init__(
        self,
        n_components,
        tol,
        reg_covar,
        max_iter,
        n_init,
        init_params,
        random_state,
        warm_start,
        verbose,
        verbose_interval,
    ):
        self.n_components = n_components
        self.tol = tol
        self.reg_covar = reg_covar
        self.max_iter = max_iter
        self.n_init = n_init
        self.init_params = init_params
        self.random_state = random_state
        self.warm_start = warm_start
        self.verbose = verbose
        self.verbose_interval = verbose_interval

    @abstractmethod
    def _check_parameters(self, X):
        """Check initial parameters of the derived class.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        """
        pass

    def _initialize_parameters(self, X, random_state):
        """Initialize the model parameters.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        random_state : RandomState
            A random number generator instance that controls the random seed
            used for the method chosen to initialize the parameters.
        """
        n_samples, _ = X.shape
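        # "kmeans" assigns each sample a hard (one-hot) responsibility from a
        # k-means clustering; "random" draws soft responsibilities and
        # normalizes each row to sum to one; "random_from_data" and
        # "k-means++" mark one chosen sample per component with
        # responsibility 1 and leave the remaining rows at zero.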
        if self.init_params == "kmeans":
            resp = np.zeros((n_samples, self.n_components))
            label = (
                cluster.KMeans(
                    n_clusters=self.n_components, n_init=1, random_state=random_state
                )
                .fit(X)
                .labels_
            )
            resp[np.arange(n_samples), label] = 1
        elif self.init_params == "random":
            resp = random_state.uniform(size=(n_samples, self.n_components))
            resp /= resp.sum(axis=1)[:, np.newaxis]
        elif self.init_params == "random_from_data":
            resp = np.zeros((n_samples, self.n_components))
            indices = random_state.choice(
                n_samples, size=self.n_components, replace=False
            )
            resp[indices, np.arange(self.n_components)] = 1
        elif self.init_params == "k-means++":
            resp = np.zeros((n_samples, self.n_components))
            _, indices = kmeans_plusplus(
                X,
                self.n_components,
                random_state=random_state,
            )
            resp[indices, np.arange(self.n_components)] = 1

        self._initialize(X, resp)

    @abstractmethod
    def _initialize(self, X, resp):
        """Initialize the model parameters of the derived class.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        resp : array-like of shape (n_samples, n_components)
        """
        pass

    def fit(self, X, y=None):
        """Estimate model parameters with the EM algorithm.

        The method fits the model ``n_init`` times and sets the parameters with
        which the model has the largest likelihood or lower bound. Within each
        trial, the method iterates between E-step and M-step for ``max_iter``
        times until the change of likelihood or lower bound is less than
        ``tol``; otherwise, a ``ConvergenceWarning`` is raised.

        If ``warm_start`` is ``True``, then ``n_init`` is ignored and a single
        initialization is performed upon the first call. Upon consecutive
        calls, training starts where it left off.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            List of n_features-dimensional data points. Each row
            corresponds to a single data point.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            The fitted mixture.
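
        Examples
        --------
        A minimal illustration using the concrete
        :class:`~sklearn.mixture.GaussianMixture` subclass (``BaseMixture``
        itself is abstract):

        >>> import numpy as np
        >>> from sklearn.mixture import GaussianMixture
        >>> X = np.array([[0.0], [0.1], [10.0], [10.1]])
        >>> gm = GaussianMixture(n_components=2, random_state=0).fit(X)
        >>> gm.means_.shape
        (2, 1)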
- """
- # parameters are validated in fit_predict
- self.fit_predict(X, y)
- return self

    @_fit_context(prefer_skip_nested_validation=True)
    def fit_predict(self, X, y=None):
        """Estimate model parameters using X and predict the labels for X.

        The method fits the model n_init times and sets the parameters with
        which the model has the largest likelihood or lower bound. Within each
        trial, the method iterates between E-step and M-step for `max_iter`
        times until the change of likelihood or lower bound is less than
        `tol`; otherwise, a :class:`~sklearn.exceptions.ConvergenceWarning` is
        raised. After fitting, it predicts the most probable label for the
        input data points.

        .. versionadded:: 0.20

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            List of n_features-dimensional data points. Each row
            corresponds to a single data point.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        labels : array, shape (n_samples,)
            Component labels.
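
        Examples
        --------
        A minimal illustration using the concrete
        :class:`~sklearn.mixture.GaussianMixture` subclass:

        >>> import numpy as np
        >>> from sklearn.mixture import GaussianMixture
        >>> X = np.array([[0.0], [0.1], [10.0], [10.1]])
        >>> labels = GaussianMixture(n_components=2, random_state=0).fit_predict(X)
        >>> labels.shape
        (4,)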
- """
- X = self._validate_data(X, dtype=[np.float64, np.float32], ensure_min_samples=2)
- if X.shape[0] < self.n_components:
- raise ValueError(
- "Expected n_samples >= n_components "
- f"but got n_components = {self.n_components}, "
- f"n_samples = {X.shape[0]}"
- )
- self._check_parameters(X)
- # if we enable warm_start, we will have a unique initialisation
- do_init = not (self.warm_start and hasattr(self, "converged_"))
- n_init = self.n_init if do_init else 1
- max_lower_bound = -np.inf
- self.converged_ = False
- random_state = check_random_state(self.random_state)
- n_samples, _ = X.shape
- for init in range(n_init):
- self._print_verbose_msg_init_beg(init)
- if do_init:
- self._initialize_parameters(X, random_state)
- lower_bound = -np.inf if do_init else self.lower_bound_
- if self.max_iter == 0:
- best_params = self._get_parameters()
- best_n_iter = 0
- else:
- for n_iter in range(1, self.max_iter + 1):
- prev_lower_bound = lower_bound
- log_prob_norm, log_resp = self._e_step(X)
- self._m_step(X, log_resp)
- lower_bound = self._compute_lower_bound(log_resp, log_prob_norm)
- change = lower_bound - prev_lower_bound
- self._print_verbose_msg_iter_end(n_iter, change)
- if abs(change) < self.tol:
- self.converged_ = True
- break
- self._print_verbose_msg_init_end(lower_bound)
- if lower_bound > max_lower_bound or max_lower_bound == -np.inf:
- max_lower_bound = lower_bound
- best_params = self._get_parameters()
- best_n_iter = n_iter
- # Should only warn about convergence if max_iter > 0, otherwise
- # the user is assumed to have used 0-iters initialization
- # to get the initial means.
- if not self.converged_ and self.max_iter > 0:
- warnings.warn(
- "Initialization %d did not converge. "
- "Try different init parameters, "
- "or increase max_iter, tol "
- "or check for degenerate data." % (init + 1),
- ConvergenceWarning,
- )
- self._set_parameters(best_params)
- self.n_iter_ = best_n_iter
- self.lower_bound_ = max_lower_bound
- # Always do a final e-step to guarantee that the labels returned by
- # fit_predict(X) are always consistent with fit(X).predict(X)
- # for any value of max_iter and tol (and any random_state).
- _, log_resp = self._e_step(X)
- return log_resp.argmax(axis=1)

    def _e_step(self, X):
        """E step.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        log_prob_norm : float
            Mean of the logarithms of the probabilities of each sample in X.

        log_responsibility : array, shape (n_samples, n_components)
            Logarithm of the posterior probabilities (or responsibilities) of
            each sample in X.
        """
        log_prob_norm, log_resp = self._estimate_log_prob_resp(X)
        return np.mean(log_prob_norm), log_resp

    @abstractmethod
    def _m_step(self, X, log_resp):
        """M step.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        log_resp : array-like of shape (n_samples, n_components)
            Logarithm of the posterior probabilities (or responsibilities) of
            each sample in X.
        """
        pass

    @abstractmethod
    def _get_parameters(self):
        pass

    @abstractmethod
    def _set_parameters(self, params):
        pass

    def score_samples(self, X):
        """Compute the log-likelihood of each sample.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            List of n_features-dimensional data points. Each row
            corresponds to a single data point.

        Returns
        -------
        log_prob : array, shape (n_samples,)
            Log-likelihood of each sample in `X` under the current model.
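
        Examples
        --------
        A minimal illustration using the concrete
        :class:`~sklearn.mixture.GaussianMixture` subclass:

        >>> import numpy as np
        >>> from sklearn.mixture import GaussianMixture
        >>> X = np.array([[0.0], [0.1], [10.0], [10.1]])
        >>> gm = GaussianMixture(n_components=2, random_state=0).fit(X)
        >>> gm.score_samples(X).shape
        (4,)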
- """
- check_is_fitted(self)
- X = self._validate_data(X, reset=False)
- return logsumexp(self._estimate_weighted_log_prob(X), axis=1)

    def score(self, X, y=None):
        """Compute the per-sample average log-likelihood of the given data X.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            List of n_features-dimensional data points. Each row
            corresponds to a single data point.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        log_likelihood : float
            Log-likelihood of `X` under the Gaussian mixture model.
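
        Examples
        --------
        A minimal illustration using the concrete
        :class:`~sklearn.mixture.GaussianMixture` subclass:

        >>> import numpy as np
        >>> from sklearn.mixture import GaussianMixture
        >>> X = np.array([[0.0], [0.1], [10.0], [10.1]])
        >>> gm = GaussianMixture(n_components=2, random_state=0).fit(X)
        >>> avg_ll = gm.score(X)  # mean of gm.score_samples(X)
        >>> bool(np.isfinite(avg_ll))
        True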
- """
- return self.score_samples(X).mean()

    def predict(self, X):
        """Predict the labels for the data samples in X using trained model.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            List of n_features-dimensional data points. Each row
            corresponds to a single data point.

        Returns
        -------
        labels : array, shape (n_samples,)
            Component labels.
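
        Examples
        --------
        A minimal illustration using the concrete
        :class:`~sklearn.mixture.GaussianMixture` subclass:

        >>> import numpy as np
        >>> from sklearn.mixture import GaussianMixture
        >>> X = np.array([[0.0], [0.1], [10.0], [10.1]])
        >>> gm = GaussianMixture(n_components=2, random_state=0).fit(X)
        >>> gm.predict([[0.05], [9.9]]).shape
        (2,)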
- """
- check_is_fitted(self)
- X = self._validate_data(X, reset=False)
- return self._estimate_weighted_log_prob(X).argmax(axis=1)

    def predict_proba(self, X):
        """Evaluate the components' density for each sample.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            List of n_features-dimensional data points. Each row
            corresponds to a single data point.

        Returns
        -------
        resp : array, shape (n_samples, n_components)
            Posterior probability (responsibility) of each Gaussian
            component for each sample in X.
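
        Examples
        --------
        A minimal illustration using the concrete
        :class:`~sklearn.mixture.GaussianMixture` subclass; each row of the
        result sums to one:

        >>> import numpy as np
        >>> from sklearn.mixture import GaussianMixture
        >>> X = np.array([[0.0], [0.1], [10.0], [10.1]])
        >>> gm = GaussianMixture(n_components=2, random_state=0).fit(X)
        >>> proba = gm.predict_proba(X)
        >>> proba.shape
        (4, 2)
        >>> np.allclose(proba.sum(axis=1), 1.0)
        True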
- """
- check_is_fitted(self)
- X = self._validate_data(X, reset=False)
- _, log_resp = self._estimate_log_prob_resp(X)
- return np.exp(log_resp)

    def sample(self, n_samples=1):
        """Generate random samples from the fitted Gaussian distribution.

        Parameters
        ----------
        n_samples : int, default=1
            Number of samples to generate.

        Returns
        -------
        X : array, shape (n_samples, n_features)
            Randomly generated sample.

        y : array, shape (n_samples,)
            Component labels.
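
        Examples
        --------
        A minimal illustration using the concrete
        :class:`~sklearn.mixture.GaussianMixture` subclass:

        >>> import numpy as np
        >>> from sklearn.mixture import GaussianMixture
        >>> X = np.array([[0.0], [0.1], [10.0], [10.1]])
        >>> gm = GaussianMixture(n_components=2, random_state=0).fit(X)
        >>> X_new, y_new = gm.sample(5)
        >>> X_new.shape, y_new.shape
        ((5, 1), (5,))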
- """
- check_is_fitted(self)
- if n_samples < 1:
- raise ValueError(
- "Invalid value for 'n_samples': %d . The sampling requires at "
- "least one sample." % (self.n_components)
- )
- _, n_features = self.means_.shape
- rng = check_random_state(self.random_state)
- n_samples_comp = rng.multinomial(n_samples, self.weights_)
- if self.covariance_type == "full":
- X = np.vstack(
- [
- rng.multivariate_normal(mean, covariance, int(sample))
- for (mean, covariance, sample) in zip(
- self.means_, self.covariances_, n_samples_comp
- )
- ]
- )
- elif self.covariance_type == "tied":
- X = np.vstack(
- [
- rng.multivariate_normal(mean, self.covariances_, int(sample))
- for (mean, sample) in zip(self.means_, n_samples_comp)
- ]
- )
- else:
- X = np.vstack(
- [
- mean
- + rng.standard_normal(size=(sample, n_features))
- * np.sqrt(covariance)
- for (mean, covariance, sample) in zip(
- self.means_, self.covariances_, n_samples_comp
- )
- ]
- )
- y = np.concatenate(
- [np.full(sample, j, dtype=int) for j, sample in enumerate(n_samples_comp)]
- )
- return (X, y)

    def _estimate_weighted_log_prob(self, X):
        """Estimate the weighted log-probabilities, log P(X | Z) + log weights.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        weighted_log_prob : array, shape (n_samples, n_components)
        """
        return self._estimate_log_prob(X) + self._estimate_log_weights()

    @abstractmethod
    def _estimate_log_weights(self):
        """Estimate log-weights in EM algorithm, E[ log pi ] in VB algorithm.

        Returns
        -------
        log_weight : array, shape (n_components,)
        """
        pass

    @abstractmethod
    def _estimate_log_prob(self, X):
        """Estimate the log-probabilities log P(X | Z).

        Compute the log-probabilities per each component for each sample.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        log_prob : array, shape (n_samples, n_components)
        """
        pass

    def _estimate_log_prob_resp(self, X):
        """Estimate log probabilities and responsibilities for each sample.

        Compute the log probabilities, weighted log probabilities per
        component and responsibilities for each sample in X with respect to
        the current state of the model.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        log_prob_norm : array, shape (n_samples,)
            log p(X)

        log_responsibilities : array, shape (n_samples, n_components)
            logarithm of the responsibilities
        """
        weighted_log_prob = self._estimate_weighted_log_prob(X)
        log_prob_norm = logsumexp(weighted_log_prob, axis=1)
        with np.errstate(under="ignore"):
            # ignore underflow
            log_resp = weighted_log_prob - log_prob_norm[:, np.newaxis]
        return log_prob_norm, log_resp

    def _print_verbose_msg_init_beg(self, n_init):
        """Print verbose message on initialization."""
        if self.verbose == 1:
            print("Initialization %d" % n_init)
        elif self.verbose >= 2:
            print("Initialization %d" % n_init)
            self._init_prev_time = time()
            self._iter_prev_time = self._init_prev_time

    def _print_verbose_msg_iter_end(self, n_iter, diff_ll):
        """Print verbose message at the end of an iteration."""
        if n_iter % self.verbose_interval == 0:
            if self.verbose == 1:
                print("  Iteration %d" % n_iter)
            elif self.verbose >= 2:
                cur_time = time()
                print(
                    "  Iteration %d\t time lapse %.5fs\t ll change %.5f"
                    % (n_iter, cur_time - self._iter_prev_time, diff_ll)
                )
                self._iter_prev_time = cur_time

    def _print_verbose_msg_init_end(self, ll):
        """Print verbose message at the end of initialization."""
        if self.verbose == 1:
            print("Initialization converged: %s" % self.converged_)
        elif self.verbose >= 2:
            print(
                "Initialization converged: %s\t time lapse %.5fs\t ll %.5f"
                % (self.converged_, time() - self._init_prev_time, ll)
            )
|