""" Gaussian Mixture Models. This implementation corresponds to frequentist (non-Bayesian) formulation of Gaussian Mixture Models. """ # Author: Ron Weiss # Fabian Pedregosa # Bertrand Thirion import warnings import numpy as np from scipy import linalg from time import time from ..base import BaseEstimator from ..utils import check_random_state, check_array from ..utils.extmath import logsumexp from ..utils.validation import check_is_fitted from .. import cluster from sklearn.externals.six.moves import zip EPS = np.finfo(float).eps def log_multivariate_normal_density(X, means, covars, covariance_type='diag'): """Compute the log probability under a multivariate Gaussian distribution. Parameters ---------- X : array_like, shape (n_samples, n_features) List of n_features-dimensional data points. Each row corresponds to a single data point. means : array_like, shape (n_components, n_features) List of n_features-dimensional mean vectors for n_components Gaussians. Each row corresponds to a single mean vector. covars : array_like List of n_components covariance parameters for each Gaussian. The shape depends on `covariance_type`: (n_components, n_features) if 'spherical', (n_features, n_features) if 'tied', (n_components, n_features) if 'diag', (n_components, n_features, n_features) if 'full' covariance_type : string Type of the covariance parameters. Must be one of 'spherical', 'tied', 'diag', 'full'. Defaults to 'diag'. Returns ------- lpr : array_like, shape (n_samples, n_components) Array containing the log probabilities of each data point in X under each of the n_components multivariate Gaussian distributions. """ log_multivariate_normal_density_dict = { 'spherical': _log_multivariate_normal_density_spherical, 'tied': _log_multivariate_normal_density_tied, 'diag': _log_multivariate_normal_density_diag, 'full': _log_multivariate_normal_density_full} return log_multivariate_normal_density_dict[covariance_type]( X, means, covars) def sample_gaussian(mean, covar, covariance_type='diag', n_samples=1, random_state=None): """Generate random samples from a Gaussian distribution. Parameters ---------- mean : array_like, shape (n_features,) Mean of the distribution. covar : array_like, optional Covariance of the distribution. The shape depends on `covariance_type`: scalar if 'spherical', (n_features) if 'diag', (n_features, n_features) if 'tied', or 'full' covariance_type : string, optional Type of the covariance parameters. Must be one of 'spherical', 'tied', 'diag', 'full'. Defaults to 'diag'. n_samples : int, optional Number of samples to generate. Defaults to 1. Returns ------- X : array, shape (n_features, n_samples) Randomly generated sample """ rng = check_random_state(random_state) n_dim = len(mean) rand = rng.randn(n_dim, n_samples) if n_samples == 1: rand.shape = (n_dim,) if covariance_type == 'spherical': rand *= np.sqrt(covar) elif covariance_type == 'diag': rand = np.dot(np.diag(np.sqrt(covar)), rand) else: s, U = linalg.eigh(covar) s.clip(0, out=s) # get rid of tiny negatives np.sqrt(s, out=s) U *= s rand = np.dot(U, rand) return (rand.T + mean).T class GMM(BaseEstimator): """Gaussian Mixture Model Representation of a Gaussian mixture model probability distribution. This class allows for easy evaluation of, sampling from, and maximum-likelihood estimation of the parameters of a GMM distribution. Initializes parameters such that every mixture component has zero mean and identity covariance. Read more in the :ref:`User Guide `. Parameters ---------- n_components : int, optional Number of mixture components. Defaults to 1. covariance_type : string, optional String describing the type of covariance parameters to use. Must be one of 'spherical', 'tied', 'diag', 'full'. Defaults to 'diag'. random_state: RandomState or an int seed (None by default) A random number generator instance min_covar : float, optional Floor on the diagonal of the covariance matrix to prevent overfitting. Defaults to 1e-3. tol : float, optional Convergence threshold. EM iterations will stop when average gain in log-likelihood is below this threshold. Defaults to 1e-3. n_iter : int, optional Number of EM iterations to perform. n_init : int, optional Number of initializations to perform. the best results is kept params : string, optional Controls which parameters are updated in the training process. Can contain any combination of 'w' for weights, 'm' for means, and 'c' for covars. Defaults to 'wmc'. init_params : string, optional Controls which parameters are updated in the initialization process. Can contain any combination of 'w' for weights, 'm' for means, and 'c' for covars. Defaults to 'wmc'. verbose : int, default: 0 Enable verbose output. If 1 then it always prints the current initialization and iteration step. If greater than 1 then it prints additionally the change and time needed for each step. Attributes ---------- weights_ : array, shape (`n_components`,) This attribute stores the mixing weights for each mixture component. means_ : array, shape (`n_components`, `n_features`) Mean parameters for each mixture component. covars_ : array Covariance parameters for each mixture component. The shape depends on `covariance_type`:: (n_components, n_features) if 'spherical', (n_features, n_features) if 'tied', (n_components, n_features) if 'diag', (n_components, n_features, n_features) if 'full' converged_ : bool True when convergence was reached in fit(), False otherwise. See Also -------- DPGMM : Infinite gaussian mixture model, using the dirichlet process, fit with a variational algorithm VBGMM : Finite gaussian mixture model fit with a variational algorithm, better for situations where there might be too little data to get a good estimate of the covariance matrix. Examples -------- >>> import numpy as np >>> from sklearn import mixture >>> np.random.seed(1) >>> g = mixture.GMM(n_components=2) >>> # Generate random observations with two modes centered on 0 >>> # and 10 to use for training. >>> obs = np.concatenate((np.random.randn(100, 1), ... 10 + np.random.randn(300, 1))) >>> g.fit(obs) # doctest: +NORMALIZE_WHITESPACE GMM(covariance_type='diag', init_params='wmc', min_covar=0.001, n_components=2, n_init=1, n_iter=100, params='wmc', random_state=None, thresh=None, tol=0.001, verbose=0) >>> np.round(g.weights_, 2) array([ 0.75, 0.25]) >>> np.round(g.means_, 2) array([[ 10.05], [ 0.06]]) >>> np.round(g.covars_, 2) #doctest: +SKIP array([[[ 1.02]], [[ 0.96]]]) >>> g.predict([[0], [2], [9], [10]]) #doctest: +ELLIPSIS array([1, 1, 0, 0]...) >>> np.round(g.score([[0], [2], [9], [10]]), 2) array([-2.19, -4.58, -1.75, -1.21]) >>> # Refit the model on new data (initial parameters remain the >>> # same), this time with an even split between the two modes. >>> g.fit(20 * [[0]] + 20 * [[10]]) # doctest: +NORMALIZE_WHITESPACE GMM(covariance_type='diag', init_params='wmc', min_covar=0.001, n_components=2, n_init=1, n_iter=100, params='wmc', random_state=None, thresh=None, tol=0.001, verbose=0) >>> np.round(g.weights_, 2) array([ 0.5, 0.5]) """ def __init__(self, n_components=1, covariance_type='diag', random_state=None, thresh=None, tol=1e-3, min_covar=1e-3, n_iter=100, n_init=1, params='wmc', init_params='wmc', verbose=0): if thresh is not None: warnings.warn("'thresh' has been replaced by 'tol' in 0.16 " " and will be removed in 0.18.", DeprecationWarning) self.n_components = n_components self.covariance_type = covariance_type self.thresh = thresh self.tol = tol self.min_covar = min_covar self.random_state = random_state self.n_iter = n_iter self.n_init = n_init self.params = params self.init_params = init_params self.verbose = verbose if covariance_type not in ['spherical', 'tied', 'diag', 'full']: raise ValueError('Invalid value for covariance_type: %s' % covariance_type) if n_init < 1: raise ValueError('GMM estimation requires at least one run') self.weights_ = np.ones(self.n_components) / self.n_components # flag to indicate exit status of fit() method: converged (True) or # n_iter reached (False) self.converged_ = False def _get_covars(self): """Covariance parameters for each mixture component. The shape depends on ``cvtype``:: (n_states, n_features) if 'spherical', (n_features, n_features) if 'tied', (n_states, n_features) if 'diag', (n_states, n_features, n_features) if 'full' """ if self.covariance_type == 'full': return self.covars_ elif self.covariance_type == 'diag': return [np.diag(cov) for cov in self.covars_] elif self.covariance_type == 'tied': return [self.covars_] * self.n_components elif self.covariance_type == 'spherical': return [np.diag(cov) for cov in self.covars_] def _set_covars(self, covars): """Provide values for covariance""" covars = np.asarray(covars) _validate_covars(covars, self.covariance_type, self.n_components) self.covars_ = covars def score_samples(self, X): """Return the per-sample likelihood of the data under the model. Compute the log probability of X under the model and return the posterior distribution (responsibilities) of each mixture component for each element of X. Parameters ---------- X: array_like, shape (n_samples, n_features) List of n_features-dimensional data points. Each row corresponds to a single data point. Returns ------- logprob : array_like, shape (n_samples,) Log probabilities of each data point in X. responsibilities : array_like, shape (n_samples, n_components) Posterior probabilities of each mixture component for each observation """ check_is_fitted(self, 'means_') X = check_array(X) if X.ndim == 1: X = X[:, np.newaxis] if X.size == 0: return np.array([]), np.empty((0, self.n_components)) if X.shape[1] != self.means_.shape[1]: raise ValueError('The shape of X is not compatible with self') lpr = (log_multivariate_normal_density(X, self.means_, self.covars_, self.covariance_type) + np.log(self.weights_)) logprob = logsumexp(lpr, axis=1) responsibilities = np.exp(lpr - logprob[:, np.newaxis]) return logprob, responsibilities def score(self, X, y=None): """Compute the log probability under the model. Parameters ---------- X : array_like, shape (n_samples, n_features) List of n_features-dimensional data points. Each row corresponds to a single data point. Returns ------- logprob : array_like, shape (n_samples,) Log probabilities of each data point in X """ logprob, _ = self.score_samples(X) return logprob def predict(self, X): """Predict label for data. Parameters ---------- X : array-like, shape = [n_samples, n_features] Returns ------- C : array, shape = (n_samples,) component memberships """ logprob, responsibilities = self.score_samples(X) return responsibilities.argmax(axis=1) def predict_proba(self, X): """Predict posterior probability of data under each Gaussian in the model. Parameters ---------- X : array-like, shape = [n_samples, n_features] Returns ------- responsibilities : array-like, shape = (n_samples, n_components) Returns the probability of the sample for each Gaussian (state) in the model. """ logprob, responsibilities = self.score_samples(X) return responsibilities def sample(self, n_samples=1, random_state=None): """Generate random samples from the model. Parameters ---------- n_samples : int, optional Number of samples to generate. Defaults to 1. Returns ------- X : array_like, shape (n_samples, n_features) List of samples """ check_is_fitted(self, 'means_') if random_state is None: random_state = self.random_state random_state = check_random_state(random_state) weight_cdf = np.cumsum(self.weights_) X = np.empty((n_samples, self.means_.shape[1])) rand = random_state.rand(n_samples) # decide which component to use for each sample comps = weight_cdf.searchsorted(rand) # for each component, generate all needed samples for comp in range(self.n_components): # occurrences of current component in X comp_in_X = (comp == comps) # number of those occurrences num_comp_in_X = comp_in_X.sum() if num_comp_in_X > 0: if self.covariance_type == 'tied': cv = self.covars_ elif self.covariance_type == 'spherical': cv = self.covars_[comp][0] else: cv = self.covars_[comp] X[comp_in_X] = sample_gaussian( self.means_[comp], cv, self.covariance_type, num_comp_in_X, random_state=random_state).T return X def fit_predict(self, X, y=None): """Fit and then predict labels for data. Warning: due to the final maximization step in the EM algorithm, with low iterations the prediction may not be 100% accurate .. versionadded:: 0.17 *fit_predict* method in Gaussian Mixture Model. Parameters ---------- X : array-like, shape = [n_samples, n_features] Returns ------- C : array, shape = (n_samples,) component memberships """ return self._fit(X, y).argmax(axis=1) def _fit(self, X, y=None, do_prediction=False): """Estimate model parameters with the EM algorithm. A initialization step is performed before entering the expectation-maximization (EM) algorithm. If you want to avoid this step, set the keyword argument init_params to the empty string '' when creating the GMM object. Likewise, if you would like just to do an initialization, set n_iter=0. Parameters ---------- X : array_like, shape (n, n_features) List of n_features-dimensional data points. Each row corresponds to a single data point. Returns ------- responsibilities : array, shape (n_samples, n_components) Posterior probabilities of each mixture component for each observation. """ # initialization step X = check_array(X, dtype=np.float64, ensure_min_samples=2, estimator=self) if X.shape[0] < self.n_components: raise ValueError( 'GMM estimation with %s components, but got only %s samples' % (self.n_components, X.shape[0])) max_log_prob = -np.infty if self.verbose > 0: print('Expectation-maximization algorithm started.') for init in range(self.n_init): if self.verbose > 0: print('Initialization ' + str(init + 1)) start_init_time = time() if 'm' in self.init_params or not hasattr(self, 'means_'): self.means_ = cluster.KMeans( n_clusters=self.n_components, random_state=self.random_state).fit(X).cluster_centers_ if self.verbose > 1: print('\tMeans have been initialized.') if 'w' in self.init_params or not hasattr(self, 'weights_'): self.weights_ = np.tile(1.0 / self.n_components, self.n_components) if self.verbose > 1: print('\tWeights have been initialized.') if 'c' in self.init_params or not hasattr(self, 'covars_'): cv = np.cov(X.T) + self.min_covar * np.eye(X.shape[1]) if not cv.shape: cv.shape = (1, 1) self.covars_ = \ distribute_covar_matrix_to_match_covariance_type( cv, self.covariance_type, self.n_components) if self.verbose > 1: print('\tCovariance matrices have been initialized.') # EM algorithms current_log_likelihood = None # reset self.converged_ to False self.converged_ = False # this line should be removed when 'thresh' is removed in v0.18 tol = (self.tol if self.thresh is None else self.thresh / float(X.shape[0])) for i in range(self.n_iter): if self.verbose > 0: print('\tEM iteration ' + str(i + 1)) start_iter_time = time() prev_log_likelihood = current_log_likelihood # Expectation step log_likelihoods, responsibilities = self.score_samples(X) current_log_likelihood = log_likelihoods.mean() # Check for convergence. # (should compare to self.tol when deprecated 'thresh' is # removed in v0.18) if prev_log_likelihood is not None: change = abs(current_log_likelihood - prev_log_likelihood) if self.verbose > 1: print('\t\tChange: ' + str(change)) if change < tol: self.converged_ = True if self.verbose > 0: print('\t\tEM algorithm converged.') break # Maximization step self._do_mstep(X, responsibilities, self.params, self.min_covar) if self.verbose > 1: print('\t\tEM iteration ' + str(i + 1) + ' took {0:.5f}s'.format( time() - start_iter_time)) # if the results are better, keep it if self.n_iter: if current_log_likelihood > max_log_prob: max_log_prob = current_log_likelihood best_params = {'weights': self.weights_, 'means': self.means_, 'covars': self.covars_} if self.verbose > 1: print('\tBetter parameters were found.') if self.verbose > 1: print('\tInitialization ' + str(init + 1) + ' took {0:.5f}s'.format( time() - start_init_time)) # check the existence of an init param that was not subject to # likelihood computation issue. if np.isneginf(max_log_prob) and self.n_iter: raise RuntimeError( "EM algorithm was never able to compute a valid likelihood " + "given initial parameters. Try different init parameters " + "(or increasing n_init) or check for degenerate data.") if self.n_iter: self.covars_ = best_params['covars'] self.means_ = best_params['means'] self.weights_ = best_params['weights'] else: # self.n_iter == 0 occurs when using GMM within HMM # Need to make sure that there are responsibilities to output # Output zeros because it was just a quick initialization responsibilities = np.zeros((X.shape[0], self.n_components)) return responsibilities def fit(self, X, y=None): """Estimate model parameters with the EM algorithm. A initialization step is performed before entering the expectation-maximization (EM) algorithm. If you want to avoid this step, set the keyword argument init_params to the empty string '' when creating the GMM object. Likewise, if you would like just to do an initialization, set n_iter=0. Parameters ---------- X : array_like, shape (n, n_features) List of n_features-dimensional data points. Each row corresponds to a single data point. Returns ------- self """ self._fit(X, y) return self def _do_mstep(self, X, responsibilities, params, min_covar=0): """ Perform the Mstep of the EM algorithm and return the class weights """ weights = responsibilities.sum(axis=0) weighted_X_sum = np.dot(responsibilities.T, X) inverse_weights = 1.0 / (weights[:, np.newaxis] + 10 * EPS) if 'w' in params: self.weights_ = (weights / (weights.sum() + 10 * EPS) + EPS) if 'm' in params: self.means_ = weighted_X_sum * inverse_weights if 'c' in params: covar_mstep_func = _covar_mstep_funcs[self.covariance_type] self.covars_ = covar_mstep_func( self, X, responsibilities, weighted_X_sum, inverse_weights, min_covar) return weights def _n_parameters(self): """Return the number of free parameters in the model.""" ndim = self.means_.shape[1] if self.covariance_type == 'full': cov_params = self.n_components * ndim * (ndim + 1) / 2. elif self.covariance_type == 'diag': cov_params = self.n_components * ndim elif self.covariance_type == 'tied': cov_params = ndim * (ndim + 1) / 2. elif self.covariance_type == 'spherical': cov_params = self.n_components mean_params = ndim * self.n_components return int(cov_params + mean_params + self.n_components - 1) def bic(self, X): """Bayesian information criterion for the current model fit and the proposed data Parameters ---------- X : array of shape(n_samples, n_dimensions) Returns ------- bic: float (the lower the better) """ return (-2 * self.score(X).sum() + self._n_parameters() * np.log(X.shape[0])) def aic(self, X): """Akaike information criterion for the current model fit and the proposed data Parameters ---------- X : array of shape(n_samples, n_dimensions) Returns ------- aic: float (the lower the better) """ return - 2 * self.score(X).sum() + 2 * self._n_parameters() ######################################################################### # some helper routines ######################################################################### def _log_multivariate_normal_density_diag(X, means, covars): """Compute Gaussian log-density at X for a diagonal model""" n_samples, n_dim = X.shape lpr = -0.5 * (n_dim * np.log(2 * np.pi) + np.sum(np.log(covars), 1) + np.sum((means ** 2) / covars, 1) - 2 * np.dot(X, (means / covars).T) + np.dot(X ** 2, (1.0 / covars).T)) return lpr def _log_multivariate_normal_density_spherical(X, means, covars): """Compute Gaussian log-density at X for a spherical model""" cv = covars.copy() if covars.ndim == 1: cv = cv[:, np.newaxis] if covars.shape[1] == 1: cv = np.tile(cv, (1, X.shape[-1])) return _log_multivariate_normal_density_diag(X, means, cv) def _log_multivariate_normal_density_tied(X, means, covars): """Compute Gaussian log-density at X for a tied model""" cv = np.tile(covars, (means.shape[0], 1, 1)) return _log_multivariate_normal_density_full(X, means, cv) def _log_multivariate_normal_density_full(X, means, covars, min_covar=1.e-7): """Log probability for full covariance matrices.""" n_samples, n_dim = X.shape nmix = len(means) log_prob = np.empty((n_samples, nmix)) for c, (mu, cv) in enumerate(zip(means, covars)): try: cv_chol = linalg.cholesky(cv, lower=True) except linalg.LinAlgError: # The model is most probably stuck in a component with too # few observations, we need to reinitialize this components try: cv_chol = linalg.cholesky(cv + min_covar * np.eye(n_dim), lower=True) except linalg.LinAlgError: raise ValueError("'covars' must be symmetric, " "positive-definite") cv_log_det = 2 * np.sum(np.log(np.diagonal(cv_chol))) cv_sol = linalg.solve_triangular(cv_chol, (X - mu).T, lower=True).T log_prob[:, c] = - .5 * (np.sum(cv_sol ** 2, axis=1) + n_dim * np.log(2 * np.pi) + cv_log_det) return log_prob def _validate_covars(covars, covariance_type, n_components): """Do basic checks on matrix covariance sizes and values """ from scipy import linalg if covariance_type == 'spherical': if len(covars) != n_components: raise ValueError("'spherical' covars have length n_components") elif np.any(covars <= 0): raise ValueError("'spherical' covars must be non-negative") elif covariance_type == 'tied': if covars.shape[0] != covars.shape[1]: raise ValueError("'tied' covars must have shape (n_dim, n_dim)") elif (not np.allclose(covars, covars.T) or np.any(linalg.eigvalsh(covars) <= 0)): raise ValueError("'tied' covars must be symmetric, " "positive-definite") elif covariance_type == 'diag': if len(covars.shape) != 2: raise ValueError("'diag' covars must have shape " "(n_components, n_dim)") elif np.any(covars <= 0): raise ValueError("'diag' covars must be non-negative") elif covariance_type == 'full': if len(covars.shape) != 3: raise ValueError("'full' covars must have shape " "(n_components, n_dim, n_dim)") elif covars.shape[1] != covars.shape[2]: raise ValueError("'full' covars must have shape " "(n_components, n_dim, n_dim)") for n, cv in enumerate(covars): if (not np.allclose(cv, cv.T) or np.any(linalg.eigvalsh(cv) <= 0)): raise ValueError("component %d of 'full' covars must be " "symmetric, positive-definite" % n) else: raise ValueError("covariance_type must be one of " + "'spherical', 'tied', 'diag', 'full'") def distribute_covar_matrix_to_match_covariance_type( tied_cv, covariance_type, n_components): """Create all the covariance matrices from a given template""" if covariance_type == 'spherical': cv = np.tile(tied_cv.mean() * np.ones(tied_cv.shape[1]), (n_components, 1)) elif covariance_type == 'tied': cv = tied_cv elif covariance_type == 'diag': cv = np.tile(np.diag(tied_cv), (n_components, 1)) elif covariance_type == 'full': cv = np.tile(tied_cv, (n_components, 1, 1)) else: raise ValueError("covariance_type must be one of " + "'spherical', 'tied', 'diag', 'full'") return cv def _covar_mstep_diag(gmm, X, responsibilities, weighted_X_sum, norm, min_covar): """Performing the covariance M step for diagonal cases""" avg_X2 = np.dot(responsibilities.T, X * X) * norm avg_means2 = gmm.means_ ** 2 avg_X_means = gmm.means_ * weighted_X_sum * norm return avg_X2 - 2 * avg_X_means + avg_means2 + min_covar def _covar_mstep_spherical(*args): """Performing the covariance M step for spherical cases""" cv = _covar_mstep_diag(*args) return np.tile(cv.mean(axis=1)[:, np.newaxis], (1, cv.shape[1])) def _covar_mstep_full(gmm, X, responsibilities, weighted_X_sum, norm, min_covar): """Performing the covariance M step for full cases""" # Eq. 12 from K. Murphy, "Fitting a Conditional Linear Gaussian # Distribution" n_features = X.shape[1] cv = np.empty((gmm.n_components, n_features, n_features)) for c in range(gmm.n_components): post = responsibilities[:, c] mu = gmm.means_[c] diff = X - mu with np.errstate(under='ignore'): # Underflow Errors in doing post * X.T are not important avg_cv = np.dot(post * diff.T, diff) / (post.sum() + 10 * EPS) cv[c] = avg_cv + min_covar * np.eye(n_features) return cv def _covar_mstep_tied(gmm, X, responsibilities, weighted_X_sum, norm, min_covar): # Eq. 15 from K. Murphy, "Fitting a Conditional Linear Gaussian # Distribution" avg_X2 = np.dot(X.T, X) avg_means2 = np.dot(gmm.means_.T, weighted_X_sum) out = avg_X2 - avg_means2 out *= 1. / X.shape[0] out.flat[::len(out) + 1] += min_covar return out _covar_mstep_funcs = {'spherical': _covar_mstep_spherical, 'diag': _covar_mstep_diag, 'tied': _covar_mstep_tied, 'full': _covar_mstep_full, }