test_sparse_pca.py

# Author: Vlad Niculae
# License: BSD 3 clause

import sys

import numpy as np
import pytest
from numpy.testing import assert_array_equal

from sklearn.decomposition import PCA, MiniBatchSparsePCA, SparsePCA
from sklearn.utils import check_random_state
from sklearn.utils._testing import (
    assert_allclose,
    assert_array_almost_equal,
    if_safe_multiprocessing_with_blas,
)


def generate_toy_data(n_components, n_samples, image_size, random_state=None):
    n_features = image_size[0] * image_size[1]

    rng = check_random_state(random_state)
    U = rng.randn(n_samples, n_components)
    V = rng.randn(n_components, n_features)

    centers = [(3, 3), (6, 7), (8, 1)]
    sz = [1, 2, 1]
    for k in range(n_components):
        img = np.zeros(image_size)
        xmin, xmax = centers[k][0] - sz[k], centers[k][0] + sz[k]
        ymin, ymax = centers[k][1] - sz[k], centers[k][1] + sz[k]
        img[xmin:xmax][:, ymin:ymax] = 1.0
        V[k, :] = img.ravel()

    # Y is defined by: Y = UV + noise
    Y = np.dot(U, V)
    Y += 0.1 * rng.randn(Y.shape[0], Y.shape[1])  # Add noise
    return Y, U, V
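

# A minimal illustrative sanity-check sketch for the helper above: the shapes
# follow directly from its definition, where Y = U @ V + noise, U has shape
# (n_samples, n_components), and V has shape
# (n_components, image_size[0] * image_size[1]), each row of V being a sparse
# "spot" image.
def test_generate_toy_data_shapes():
    Y, U, V = generate_toy_data(3, 10, (8, 8), random_state=0)
    assert Y.shape == (10, 64)
    assert U.shape == (10, 3)
    assert V.shape == (3, 64)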


# SparsePCA can be a bit slow. To avoid having test times go up, we
# test different aspects of the code in the same test
def test_correct_shapes():
    rng = np.random.RandomState(0)
    X = rng.randn(12, 10)
    spca = SparsePCA(n_components=8, random_state=rng)
    U = spca.fit_transform(X)
    assert spca.components_.shape == (8, 10)
    assert U.shape == (12, 8)

    # test overcomplete decomposition
    spca = SparsePCA(n_components=13, random_state=rng)
    U = spca.fit_transform(X)
    assert spca.components_.shape == (13, 10)
    assert U.shape == (12, 13)


def test_fit_transform():
    alpha = 1
    rng = np.random.RandomState(0)
    Y, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng)  # wide array
    spca_lars = SparsePCA(n_components=3, method="lars", alpha=alpha, random_state=0)
    spca_lars.fit(Y)

    # Test that CD gives similar results
    spca_lasso = SparsePCA(n_components=3, method="cd", random_state=0, alpha=alpha)
    spca_lasso.fit(Y)
    assert_array_almost_equal(spca_lasso.components_, spca_lars.components_)


@if_safe_multiprocessing_with_blas
def test_fit_transform_parallel():
    alpha = 1
    rng = np.random.RandomState(0)
    Y, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng)  # wide array
    spca_lars = SparsePCA(n_components=3, method="lars", alpha=alpha, random_state=0)
    spca_lars.fit(Y)
    U1 = spca_lars.transform(Y)

    # Test multiple CPUs
    spca = SparsePCA(
        n_components=3, n_jobs=2, method="lars", alpha=alpha, random_state=0
    ).fit(Y)
    U2 = spca.transform(Y)
    assert not np.all(spca_lars.components_ == 0)
    assert_array_almost_equal(U1, U2)


def test_transform_nan():
    # Test that SparsePCA does not return NaN when a feature is zero across
    # all samples.
    rng = np.random.RandomState(0)
    Y, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng)  # wide array
    Y[:, 0] = 0
    estimator = SparsePCA(n_components=8)
    assert not np.any(np.isnan(estimator.fit_transform(Y)))


def test_fit_transform_tall():
    rng = np.random.RandomState(0)
    Y, _, _ = generate_toy_data(3, 65, (8, 8), random_state=rng)  # tall array
    spca_lars = SparsePCA(n_components=3, method="lars", random_state=rng)
    U1 = spca_lars.fit_transform(Y)
    spca_lasso = SparsePCA(n_components=3, method="cd", random_state=rng)
    U2 = spca_lasso.fit(Y).transform(Y)
    assert_array_almost_equal(U1, U2)
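

# Note on the test below: with `max_iter=0` no dictionary-learning update is
# run, so `components_` should simply be the rows of `V_init` rescaled to unit
# L2 norm, which is exactly what the assertion checks.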
def test_initialization():
    rng = np.random.RandomState(0)
    U_init = rng.randn(5, 3)
    V_init = rng.randn(3, 4)
    model = SparsePCA(
        n_components=3, U_init=U_init, V_init=V_init, max_iter=0, random_state=rng
    )
    model.fit(rng.randn(5, 4))
    assert_allclose(
        model.components_, V_init / np.linalg.norm(V_init, axis=1)[:, None]
    )


def test_mini_batch_correct_shapes():
    rng = np.random.RandomState(0)
    X = rng.randn(12, 10)
    pca = MiniBatchSparsePCA(n_components=8, max_iter=1, random_state=rng)
    U = pca.fit_transform(X)
    assert pca.components_.shape == (8, 10)
    assert U.shape == (12, 8)

    # test overcomplete decomposition
    pca = MiniBatchSparsePCA(n_components=13, max_iter=1, random_state=rng)
    U = pca.fit_transform(X)
    assert pca.components_.shape == (13, 10)
    assert U.shape == (12, 13)


# XXX: test always skipped
@pytest.mark.skipif(True, reason="skipping mini_batch_fit_transform.")
def test_mini_batch_fit_transform():
    alpha = 1
    rng = np.random.RandomState(0)
    Y, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng)  # wide array
    spca_lars = MiniBatchSparsePCA(n_components=3, random_state=0, alpha=alpha).fit(Y)
    U1 = spca_lars.transform(Y)

    # Test multiple CPUs
    if sys.platform == "win32":  # fake parallelism for win32
        import joblib

        _mp = joblib.parallel.multiprocessing
        joblib.parallel.multiprocessing = None
        try:
            spca = MiniBatchSparsePCA(
                n_components=3, n_jobs=2, alpha=alpha, random_state=0
            )
            U2 = spca.fit(Y).transform(Y)
        finally:
            joblib.parallel.multiprocessing = _mp
    else:  # we can efficiently use parallelism
        spca = MiniBatchSparsePCA(
            n_components=3, n_jobs=2, alpha=alpha, random_state=0
        )
        U2 = spca.fit(Y).transform(Y)
    assert not np.all(spca_lars.components_ == 0)
    assert_array_almost_equal(U1, U2)

    # Test that CD gives similar results
    spca_lasso = MiniBatchSparsePCA(
        n_components=3, method="cd", alpha=alpha, random_state=0
    ).fit(Y)
    assert_array_almost_equal(spca_lasso.components_, spca_lars.components_)


def test_scaling_fit_transform():
    alpha = 1
    rng = np.random.RandomState(0)
    Y, _, _ = generate_toy_data(3, 1000, (8, 8), random_state=rng)
    spca_lars = SparsePCA(n_components=3, method="lars", alpha=alpha, random_state=rng)
    results_train = spca_lars.fit_transform(Y)
    results_test = spca_lars.transform(Y[:10])
    assert_allclose(results_train[0], results_test[0])
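

# Note on the test below: with `alpha=0` (no sparsity penalty) and
# `ridge_alpha=0`, SparsePCA should recover the same 2-D principal subspace as
# PCA. The components may still differ by sign, hence the check that
# |W_spca @ W_pca.T| is close to the identity, and the sign alignment of the
# transformed test data before comparing.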
def test_pca_vs_spca():
    rng = np.random.RandomState(0)
    Y, _, _ = generate_toy_data(3, 1000, (8, 8), random_state=rng)
    Z, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng)
    spca = SparsePCA(alpha=0, ridge_alpha=0, n_components=2)
    pca = PCA(n_components=2)
    pca.fit(Y)
    spca.fit(Y)
    results_test_pca = pca.transform(Z)
    results_test_spca = spca.transform(Z)
    assert_allclose(
        np.abs(spca.components_.dot(pca.components_.T)), np.eye(2), atol=1e-5
    )
    results_test_pca *= np.sign(results_test_pca[0, :])
    results_test_spca *= np.sign(results_test_spca[0, :])
    assert_allclose(results_test_pca, results_test_spca)


@pytest.mark.parametrize("SPCA", [SparsePCA, MiniBatchSparsePCA])
@pytest.mark.parametrize("n_components", [None, 3])
def test_spca_n_components_(SPCA, n_components):
    rng = np.random.RandomState(0)
    n_samples, n_features = 12, 10
    X = rng.randn(n_samples, n_features)

    model = SPCA(n_components=n_components).fit(X)

    if n_components is not None:
        assert model.n_components_ == n_components
    else:
        assert model.n_components_ == n_features


@pytest.mark.parametrize("SPCA", (SparsePCA, MiniBatchSparsePCA))
@pytest.mark.parametrize("method", ("lars", "cd"))
@pytest.mark.parametrize(
    "data_type, expected_type",
    (
        (np.float32, np.float32),
        (np.float64, np.float64),
        (np.int32, np.float64),
        (np.int64, np.float64),
    ),
)
def test_sparse_pca_dtype_match(SPCA, method, data_type, expected_type):
    # Verify output matrix dtype
    n_samples, n_features, n_components = 12, 10, 3
    rng = np.random.RandomState(0)
    input_array = rng.randn(n_samples, n_features).astype(data_type)
    model = SPCA(n_components=n_components, method=method)
    transformed = model.fit_transform(input_array)

    assert transformed.dtype == expected_type
    assert model.components_.dtype == expected_type


@pytest.mark.parametrize("SPCA", (SparsePCA, MiniBatchSparsePCA))
@pytest.mark.parametrize("method", ("lars", "cd"))
def test_sparse_pca_numerical_consistency(SPCA, method):
    # Verify numerical consistency between np.float32 and np.float64
    rtol = 1e-3
    alpha = 2
    n_samples, n_features, n_components = 12, 10, 3
    rng = np.random.RandomState(0)
    input_array = rng.randn(n_samples, n_features)

    model_32 = SPCA(
        n_components=n_components, alpha=alpha, method=method, random_state=0
    )
    transformed_32 = model_32.fit_transform(input_array.astype(np.float32))

    model_64 = SPCA(
        n_components=n_components, alpha=alpha, method=method, random_state=0
    )
    transformed_64 = model_64.fit_transform(input_array.astype(np.float64))

    assert_allclose(transformed_64, transformed_32, rtol=rtol)
    assert_allclose(model_64.components_, model_32.components_, rtol=rtol)


@pytest.mark.parametrize("SPCA", [SparsePCA, MiniBatchSparsePCA])
def test_spca_feature_names_out(SPCA):
    """Check feature names out for *SparsePCA."""
    rng = np.random.RandomState(0)
    n_samples, n_features = 12, 10
    X = rng.randn(n_samples, n_features)

    model = SPCA(n_components=4).fit(X)
    names = model.get_feature_names_out()

    estimator_name = SPCA.__name__.lower()
    assert_array_equal([f"{estimator_name}{i}" for i in range(4)], names)


# TODO (1.4): remove this test
def test_spca_n_iter_deprecation():
    """Check that a FutureWarning is raised for the deprecation of `n_iter` and
    that `n_iter` is ignored when `max_iter` is specified.
    """
    rng = np.random.RandomState(0)
    n_samples, n_features = 12, 10
    X = rng.randn(n_samples, n_features)

    warn_msg = "'n_iter' is deprecated in version 1.1 and will be removed"
    with pytest.warns(FutureWarning, match=warn_msg):
        MiniBatchSparsePCA(n_iter=2).fit(X)

    n_iter, max_iter = 1, 100
    with pytest.warns(FutureWarning, match=warn_msg):
        model = MiniBatchSparsePCA(
            n_iter=n_iter, max_iter=max_iter, random_state=0
        ).fit(X)
    assert model.n_iter_ > 1
    assert model.n_iter_ <= max_iter


def test_pca_n_features_deprecation():
    X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])
    pca = PCA(n_components=2).fit(X)
    with pytest.warns(FutureWarning, match="`n_features_` was deprecated"):
        pca.n_features_
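

# Note on the test below: in MiniBatchSparsePCA, `tol` stops iterating when the
# dictionary barely changes between two steps, while `max_no_improvement` stops
# after that many consecutive mini-batches without improvement of the smoothed
# cost. A large `tol` or a small `max_no_improvement` should therefore lead to
# fewer iterations, which is what the assertions check.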
def test_spca_early_stopping(global_random_seed):
    """Check that `tol` and `max_no_improvement` act as early stopping."""
    rng = np.random.RandomState(global_random_seed)
    n_samples, n_features = 50, 10
    X = rng.randn(n_samples, n_features)

    # vary the tolerance to force the early stopping of one of the models
    model_early_stopped = MiniBatchSparsePCA(
        max_iter=100, tol=0.5, random_state=global_random_seed
    ).fit(X)
    model_not_early_stopped = MiniBatchSparsePCA(
        max_iter=100, tol=1e-3, random_state=global_random_seed
    ).fit(X)
    assert model_early_stopped.n_iter_ < model_not_early_stopped.n_iter_

    # vary `max_no_improvement` to force the early stopping of one of the
    # models: the small value stops as soon as the smoothed cost stalls
    model_early_stopped = MiniBatchSparsePCA(
        max_iter=100, tol=1e-6, max_no_improvement=2, random_state=global_random_seed
    ).fit(X)
    model_not_early_stopped = MiniBatchSparsePCA(
        max_iter=100, tol=1e-6, max_no_improvement=100, random_state=global_random_seed
    ).fit(X)
    assert model_early_stopped.n_iter_ < model_not_early_stopped.n_iter_


def test_equivalence_components_pca_spca(global_random_seed):
    """Check the equivalence of the components found by PCA and SparsePCA.

    Non-regression test for:
    https://github.com/scikit-learn/scikit-learn/issues/23932
    """
    rng = np.random.RandomState(global_random_seed)
    X = rng.randn(50, 4)

    n_components = 2
    pca = PCA(
        n_components=n_components,
        svd_solver="randomized",
        random_state=0,
    ).fit(X)
    spca = SparsePCA(
        n_components=n_components,
        method="lars",
        ridge_alpha=0,
        alpha=0,
        random_state=0,
    ).fit(X)

    assert_allclose(pca.components_, spca.components_)


def test_sparse_pca_inverse_transform():
    """Check that `inverse_transform` gives similar results for `SparsePCA` and `PCA`."""
    rng = np.random.RandomState(0)
    n_samples, n_features = 10, 5
    X = rng.randn(n_samples, n_features)
    n_components = 2

    spca = SparsePCA(
        n_components=n_components, alpha=1e-12, ridge_alpha=1e-12, random_state=0
    )
    pca = PCA(n_components=n_components, random_state=0)
    X_trans_spca = spca.fit_transform(X)
    X_trans_pca = pca.fit_transform(X)
    assert_allclose(
        spca.inverse_transform(X_trans_spca), pca.inverse_transform(X_trans_pca)
    )
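

# Note on the test below: with `n_components == n_features` and near-zero
# regularization, the learned components form a (numerically) full-rank basis,
# so `inverse_transform(transform(X))` is expected to reproduce X up to
# numerical tolerance.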
@pytest.mark.parametrize("SPCA", [SparsePCA, MiniBatchSparsePCA])
def test_transform_inverse_transform_round_trip(SPCA):
    """Check the `transform` and `inverse_transform` round trip with no loss of
    information.
    """
    rng = np.random.RandomState(0)
    n_samples, n_features = 10, 5
    X = rng.randn(n_samples, n_features)
    n_components = n_features

    spca = SPCA(
        n_components=n_components, alpha=1e-12, ridge_alpha=1e-12, random_state=0
    )
    X_trans_spca = spca.fit_transform(X)
    assert_allclose(spca.inverse_transform(X_trans_spca), X)