# Authors: Olivier Grisel <olivier.grisel@ensta.org>
#          Mathieu Blondel <mathieu@mblondel.org>
#          Denis Engemann <denis-alexander.engemann@inria.fr>
#
# License: BSD 3 clause
import numpy as np
import pytest
from scipy import linalg, sparse
from scipy.linalg import eigh
from scipy.sparse.linalg import eigsh
from scipy.special import expit

from sklearn.datasets import make_low_rank_matrix, make_sparse_spd_matrix
from sklearn.utils import gen_batches
from sklearn.utils._arpack import _init_arpack_v0
from sklearn.utils._testing import (
    assert_allclose,
    assert_allclose_dense_sparse,
    assert_almost_equal,
    assert_array_almost_equal,
    assert_array_equal,
    skip_if_32bit,
)
from sklearn.utils.extmath import (
    _deterministic_vector_sign_flip,
    _incremental_mean_and_var,
    _randomized_eigsh,
    _safe_accumulator_op,
    cartesian,
    density,
    log_logistic,
    randomized_svd,
    row_norms,
    safe_sparse_dot,
    softmax,
    stable_cumsum,
    svd_flip,
    weighted_mode,
)
from sklearn.utils.fixes import _mode


def test_density():
    rng = np.random.RandomState(0)
    X = rng.randint(10, size=(10, 5))
    X[1, 2] = 0
    X[5, 3] = 0
    X_csr = sparse.csr_matrix(X)
    X_csc = sparse.csc_matrix(X)
    X_coo = sparse.coo_matrix(X)
    X_lil = sparse.lil_matrix(X)

    for X_ in (X_csr, X_csc, X_coo, X_lil):
        assert density(X_) == density(X)
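

# Illustration (not part of the test suite): `density` is simply the fraction
# of nonzero entries, so a 2x2 matrix with a single zero has density 0.75:
#
#   density(np.array([[1, 0], [2, 3]]))  # -> 0.75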


# TODO(1.4): Remove test
def test_density_deprecated_kwargs():
    """Check that a FutureWarning is raised when keyword arguments are passed."""
    test_array = np.array([[1, 2, 3], [4, 5, 6]])
    with pytest.warns(
        FutureWarning,
        match=(
            "Additional keyword arguments are deprecated in version 1.2 and will be"
            " removed in version 1.4."
        ),
    ):
        density(test_array, a=1)


def test_uniform_weights():
    # with uniform weights, results should be identical to stats.mode
    rng = np.random.RandomState(0)
    x = rng.randint(10, size=(10, 5))
    weights = np.ones(x.shape)

    for axis in (None, 0, 1):
        mode, score = _mode(x, axis)
        mode2, score2 = weighted_mode(x, weights, axis=axis)
        assert_array_equal(mode, mode2)
        assert_array_equal(score, score2)


def test_random_weights():
    # set this up so that each row should have a weighted mode of 6,
    # with a score that is easily reproduced
    mode_result = 6

    rng = np.random.RandomState(0)
    x = rng.randint(mode_result, size=(100, 10))
    w = rng.random_sample(x.shape)

    x[:, :5] = mode_result
    w[:, :5] += 1

    mode, score = weighted_mode(x, w, axis=1)

    assert_array_equal(mode, mode_result)
    assert_array_almost_equal(score.ravel(), w[:, :5].sum(1))
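

# Worked example (adapted from the `weighted_mode` docstring): with
# x = [4, 1, 4, 2, 4, 2] and uniform weights, the mode is 4 with score 3
# (three occurrences). Down-weighting the 4's with
# weights = [1, 3, 0.5, 1.5, 1, 2] makes 2 the winner, since its total weight
# 1.5 + 2 = 3.5 beats the 4's total of 1 + 0.5 + 1 = 2.5.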


def check_randomized_svd_low_rank(dtype):
    # Check that extmath.randomized_svd is consistent with linalg.svd
    n_samples = 100
    n_features = 500
    rank = 5
    k = 10
    decimal = 5 if dtype == np.float32 else 7
    dtype = np.dtype(dtype)

    # generate a matrix X of approximate effective rank `rank` and no noise
    # component (very structured signal):
    X = make_low_rank_matrix(
        n_samples=n_samples,
        n_features=n_features,
        effective_rank=rank,
        tail_strength=0.0,
        random_state=0,
    ).astype(dtype, copy=False)
    assert X.shape == (n_samples, n_features)

    # compute the singular values of X using the slow exact method
    U, s, Vt = linalg.svd(X, full_matrices=False)

    # Convert the singular values to the specific dtype
    U = U.astype(dtype, copy=False)
    s = s.astype(dtype, copy=False)
    Vt = Vt.astype(dtype, copy=False)

    for normalizer in ["auto", "LU", "QR"]:  # 'none' would not be stable
        # compute the singular values of X using the fast approximate method
        Ua, sa, Va = randomized_svd(
            X, k, power_iteration_normalizer=normalizer, random_state=0
        )

        # If the input dtype is float, then the output dtype is float of the
        # same bit size (f32 is not upcast to f64)
        # But if the input dtype is int, the output dtype is float64
        if dtype.kind == "f":
            assert Ua.dtype == dtype
            assert sa.dtype == dtype
            assert Va.dtype == dtype
        else:
            assert Ua.dtype == np.float64
            assert sa.dtype == np.float64
            assert Va.dtype == np.float64

        assert Ua.shape == (n_samples, k)
        assert sa.shape == (k,)
        assert Va.shape == (k, n_features)

        # ensure that the singular values of both methods are equal up to the
        # real rank of the matrix
        assert_almost_equal(s[:k], sa, decimal=decimal)

        # check the singular vectors too (while not checking the sign)
        assert_almost_equal(
            np.dot(U[:, :k], Vt[:k, :]), np.dot(Ua, Va), decimal=decimal
        )

        # check the sparse matrix representation
        X = sparse.csr_matrix(X)

        # compute the singular values of X using the fast approximate method
        Ua, sa, Va = randomized_svd(
            X, k, power_iteration_normalizer=normalizer, random_state=0
        )
        if dtype.kind == "f":
            assert Ua.dtype == dtype
            assert sa.dtype == dtype
            assert Va.dtype == dtype
        else:
            assert Ua.dtype.kind == "f"
            assert sa.dtype.kind == "f"
            assert Va.dtype.kind == "f"

        assert_almost_equal(s[:rank], sa[:rank], decimal=decimal)


@pytest.mark.parametrize("dtype", (np.int32, np.int64, np.float32, np.float64))
def test_randomized_svd_low_rank_all_dtypes(dtype):
    check_randomized_svd_low_rank(dtype)
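

# A minimal sketch of the algorithm under test (Halko et al., 2011), kept here
# for context only; `randomized_svd` itself additionally handles dtypes, sign
# flipping, transposition heuristics and the LU/QR/no-normalization choice.
def _randomized_svd_sketch(X, k, n_oversamples=10, n_iter=4, seed=0):
    rng = np.random.RandomState(seed)
    # project onto a random (k + oversampling)-dimensional range of X
    Q = X @ rng.normal(size=(X.shape[1], k + n_oversamples))
    for _ in range(n_iter):
        # power iterations sharpen the spectral decay; the QR step is the
        # normalization that keeps them from over/underflowing
        Q, _ = np.linalg.qr(X @ (X.T @ Q))
    Q, _ = np.linalg.qr(Q)
    # exact SVD of the small projected matrix, lifted back to the big space
    B = Q.T @ X
    Uhat, s, Vt = np.linalg.svd(B, full_matrices=False)
    return (Q @ Uhat)[:, :k], s[:k], Vt[:k]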
- @pytest.mark.parametrize("dtype", (np.int32, np.int64, np.float32, np.float64))
- def test_randomized_eigsh(dtype):
- """Test that `_randomized_eigsh` returns the appropriate components"""
- rng = np.random.RandomState(42)
- X = np.diag(np.array([1.0, -2.0, 0.0, 3.0], dtype=dtype))
- # random rotation that preserves the eigenvalues of X
- rand_rot = np.linalg.qr(rng.normal(size=X.shape))[0]
- X = rand_rot @ X @ rand_rot.T
- # with 'module' selection method, the negative eigenvalue shows up
- eigvals, eigvecs = _randomized_eigsh(X, n_components=2, selection="module")
- # eigenvalues
- assert eigvals.shape == (2,)
- assert_array_almost_equal(eigvals, [3.0, -2.0]) # negative eigenvalue here
- # eigenvectors
- assert eigvecs.shape == (4, 2)
- # with 'value' selection method, the negative eigenvalue does not show up
- with pytest.raises(NotImplementedError):
- _randomized_eigsh(X, n_components=2, selection="value")
- @pytest.mark.parametrize("k", (10, 50, 100, 199, 200))
- def test_randomized_eigsh_compared_to_others(k):
- """Check that `_randomized_eigsh` is similar to other `eigsh`
- Tests that for a random PSD matrix, `_randomized_eigsh` provides results
- comparable to LAPACK (scipy.linalg.eigh) and ARPACK
- (scipy.sparse.linalg.eigsh).
- Note: some versions of ARPACK do not support k=n_features.
- """
- # make a random PSD matrix
- n_features = 200
- X = make_sparse_spd_matrix(n_features, random_state=0)
- # compare two versions of randomized
- # rough and fast
- eigvals, eigvecs = _randomized_eigsh(
- X, n_components=k, selection="module", n_iter=25, random_state=0
- )
- # more accurate but slow (TODO find realistic settings here)
- eigvals_qr, eigvecs_qr = _randomized_eigsh(
- X,
- n_components=k,
- n_iter=25,
- n_oversamples=20,
- random_state=0,
- power_iteration_normalizer="QR",
- selection="module",
- )
- # with LAPACK
- eigvals_lapack, eigvecs_lapack = eigh(
- X, subset_by_index=(n_features - k, n_features - 1)
- )
- indices = eigvals_lapack.argsort()[::-1]
- eigvals_lapack = eigvals_lapack[indices]
- eigvecs_lapack = eigvecs_lapack[:, indices]
- # -- eigenvalues comparison
- assert eigvals_lapack.shape == (k,)
- # comparison precision
- assert_array_almost_equal(eigvals, eigvals_lapack, decimal=6)
- assert_array_almost_equal(eigvals_qr, eigvals_lapack, decimal=6)
- # -- eigenvectors comparison
- assert eigvecs_lapack.shape == (n_features, k)
- # flip eigenvectors' sign to enforce deterministic output
- dummy_vecs = np.zeros_like(eigvecs).T
- eigvecs, _ = svd_flip(eigvecs, dummy_vecs)
- eigvecs_qr, _ = svd_flip(eigvecs_qr, dummy_vecs)
- eigvecs_lapack, _ = svd_flip(eigvecs_lapack, dummy_vecs)
- assert_array_almost_equal(eigvecs, eigvecs_lapack, decimal=4)
- assert_array_almost_equal(eigvecs_qr, eigvecs_lapack, decimal=6)
- # comparison ARPACK ~ LAPACK (some ARPACK implems do not support k=n)
- if k < n_features:
- v0 = _init_arpack_v0(n_features, random_state=0)
- # "LA" largest algebraic <=> selection="value" in randomized_eigsh
- eigvals_arpack, eigvecs_arpack = eigsh(
- X, k, which="LA", tol=0, maxiter=None, v0=v0
- )
- indices = eigvals_arpack.argsort()[::-1]
- # eigenvalues
- eigvals_arpack = eigvals_arpack[indices]
- assert_array_almost_equal(eigvals_lapack, eigvals_arpack, decimal=10)
- # eigenvectors
- eigvecs_arpack = eigvecs_arpack[:, indices]
- eigvecs_arpack, _ = svd_flip(eigvecs_arpack, dummy_vecs)
- assert_array_almost_equal(eigvecs_arpack, eigvecs_lapack, decimal=8)


@pytest.mark.parametrize(
    "n,rank",
    [
        (10, 7),
        (100, 10),
        (100, 80),
        (500, 10),
        (500, 250),
        (500, 400),
    ],
)
def test_randomized_eigsh_reconst_low_rank(n, rank):
    """Check that randomized_eigsh is able to reconstruct a low-rank PSD matrix

    Tests that the decomposition provided by `_randomized_eigsh` leads to
    orthonormal eigenvectors, and that a low-rank PSD matrix can be effectively
    reconstructed with good accuracy using it.
    """
    assert rank < n

    # create a low rank PSD matrix
    rng = np.random.RandomState(69)
    X = rng.randn(n, rank)
    A = X @ X.T

    # approximate A with the "right" number of components
    S, V = _randomized_eigsh(A, n_components=rank, random_state=rng)

    # orthonormality checks
    assert_array_almost_equal(np.linalg.norm(V, axis=0), np.ones(S.shape))
    assert_array_almost_equal(V.T @ V, np.diag(np.ones(S.shape)))

    # reconstruction
    A_reconstruct = V @ np.diag(S) @ V.T

    # test that the approximation is good
    assert_array_almost_equal(A_reconstruct, A, decimal=6)
- @pytest.mark.parametrize("dtype", (np.float32, np.float64))
- def test_row_norms(dtype):
- X = np.random.RandomState(42).randn(100, 100)
- if dtype is np.float32:
- precision = 4
- else:
- precision = 5
- X = X.astype(dtype, copy=False)
- sq_norm = (X**2).sum(axis=1)
- assert_array_almost_equal(sq_norm, row_norms(X, squared=True), precision)
- assert_array_almost_equal(np.sqrt(sq_norm), row_norms(X), precision)
- for csr_index_dtype in [np.int32, np.int64]:
- Xcsr = sparse.csr_matrix(X, dtype=dtype)
- # csr_matrix will use int32 indices by default,
- # up-casting those to int64 when necessary
- if csr_index_dtype is np.int64:
- Xcsr.indptr = Xcsr.indptr.astype(csr_index_dtype, copy=False)
- Xcsr.indices = Xcsr.indices.astype(csr_index_dtype, copy=False)
- assert Xcsr.indices.dtype == csr_index_dtype
- assert Xcsr.indptr.dtype == csr_index_dtype
- assert_array_almost_equal(sq_norm, row_norms(Xcsr, squared=True), precision)
- assert_array_almost_equal(np.sqrt(sq_norm), row_norms(Xcsr), precision)
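

# Implementation note (an assumption stated for context, not verified here):
# for dense inputs the squared row norms can be computed without materializing
# the temporary X**2 array, e.g. via np.einsum("ij,ij->i", X, X), which is why
# `row_norms` is preferred over the naive reference used above.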


def test_randomized_svd_low_rank_with_noise():
    # Check that extmath.randomized_svd can handle noisy matrices
    n_samples = 100
    n_features = 500
    rank = 5
    k = 10

    # generate a matrix X with structured approximate rank `rank` and an
    # important noisy component
    X = make_low_rank_matrix(
        n_samples=n_samples,
        n_features=n_features,
        effective_rank=rank,
        tail_strength=0.1,
        random_state=0,
    )
    assert X.shape == (n_samples, n_features)

    # compute the singular values of X using the slow exact method
    _, s, _ = linalg.svd(X, full_matrices=False)

    for normalizer in ["auto", "none", "LU", "QR"]:
        # compute the singular values of X using the fast approximate
        # method without the iterated power method
        _, sa, _ = randomized_svd(
            X, k, n_iter=0, power_iteration_normalizer=normalizer, random_state=0
        )

        # the approximation does not tolerate the noise:
        assert np.abs(s[:k] - sa).max() > 0.01

        # compute the singular values of X using the fast approximate
        # method with the iterated power method
        _, sap, _ = randomized_svd(
            X, k, power_iteration_normalizer=normalizer, random_state=0
        )

        # the iterated power method helps get rid of the noise:
        assert_almost_equal(s[:k], sap, decimal=3)


def test_randomized_svd_infinite_rank():
    # Check that extmath.randomized_svd can handle matrices with slowly
    # decaying spectra
    n_samples = 100
    n_features = 500
    rank = 5
    k = 10

    # this time with no low-rank component: the singular values decrease
    # slowly but steadily, so the data matrix has no small effective rank
    X = make_low_rank_matrix(
        n_samples=n_samples,
        n_features=n_features,
        effective_rank=rank,
        tail_strength=1.0,
        random_state=0,
    )
    assert X.shape == (n_samples, n_features)

    # compute the singular values of X using the slow exact method
    _, s, _ = linalg.svd(X, full_matrices=False)

    for normalizer in ["auto", "none", "LU", "QR"]:
        # compute the singular values of X using the fast approximate method
        # without the iterated power method
        _, sa, _ = randomized_svd(
            X, k, n_iter=0, power_iteration_normalizer=normalizer, random_state=0
        )

        # the approximation does not tolerate the noise:
        assert np.abs(s[:k] - sa).max() > 0.1

        # compute the singular values of X using the fast approximate method
        # with the iterated power method
        _, sap, _ = randomized_svd(
            X, k, n_iter=5, power_iteration_normalizer=normalizer, random_state=0
        )

        # the iterated power method still manages to recover most of the
        # structure at the requested rank
        assert_almost_equal(s[:k], sap, decimal=3)


def test_randomized_svd_transpose_consistency():
    # Check that transposing the design matrix has limited impact
    n_samples = 100
    n_features = 500
    rank = 4
    k = 10

    X = make_low_rank_matrix(
        n_samples=n_samples,
        n_features=n_features,
        effective_rank=rank,
        tail_strength=0.5,
        random_state=0,
    )
    assert X.shape == (n_samples, n_features)

    U1, s1, V1 = randomized_svd(X, k, n_iter=3, transpose=False, random_state=0)
    U2, s2, V2 = randomized_svd(X, k, n_iter=3, transpose=True, random_state=0)
    U3, s3, V3 = randomized_svd(X, k, n_iter=3, transpose="auto", random_state=0)
    U4, s4, V4 = linalg.svd(X, full_matrices=False)

    assert_almost_equal(s1, s4[:k], decimal=3)
    assert_almost_equal(s2, s4[:k], decimal=3)
    assert_almost_equal(s3, s4[:k], decimal=3)

    assert_almost_equal(np.dot(U1, V1), np.dot(U4[:, :k], V4[:k, :]), decimal=2)
    assert_almost_equal(np.dot(U2, V2), np.dot(U4[:, :k], V4[:k, :]), decimal=2)

    # in this case 'auto' is equivalent to transpose=True
    assert_almost_equal(s2, s3)


def test_randomized_svd_power_iteration_normalizer():
    # randomized_svd with power_iteration_normalizer='none' diverges for
    # large numbers of power iterations on this dataset
    rng = np.random.RandomState(42)
    X = make_low_rank_matrix(100, 500, effective_rank=50, random_state=rng)
    X += 3 * rng.randint(0, 2, size=X.shape)
    n_components = 50

    # Check that it diverges with many (non-normalized) power iterations
    U, s, Vt = randomized_svd(
        X, n_components, n_iter=2, power_iteration_normalizer="none", random_state=0
    )
    A = X - U.dot(np.diag(s).dot(Vt))
    error_2 = linalg.norm(A, ord="fro")
    U, s, Vt = randomized_svd(
        X, n_components, n_iter=20, power_iteration_normalizer="none", random_state=0
    )
    A = X - U.dot(np.diag(s).dot(Vt))
    error_20 = linalg.norm(A, ord="fro")
    assert np.abs(error_2 - error_20) > 100

    for normalizer in ["LU", "QR", "auto"]:
        U, s, Vt = randomized_svd(
            X,
            n_components,
            n_iter=2,
            power_iteration_normalizer=normalizer,
            random_state=0,
        )
        A = X - U.dot(np.diag(s).dot(Vt))
        error_2 = linalg.norm(A, ord="fro")

        for i in [5, 10, 50]:
            U, s, Vt = randomized_svd(
                X,
                n_components,
                n_iter=i,
                power_iteration_normalizer=normalizer,
                random_state=0,
            )
            A = X - U.dot(np.diag(s).dot(Vt))
            error = linalg.norm(A, ord="fro")
            assert 15 > np.abs(error_2 - error)
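

# The idea behind the normalizers tested above (a simplified sketch, not the
# exact `randomized_svd` code path): re-orthonormalizing the projection basis
# between successive multiplications, e.g.
#
#   for _ in range(n_iter):
#       Q, _ = np.linalg.qr(X @ (X.T @ Q))   # "QR"; "LU" uses a pivoted LU
#
# keeps the power iteration well conditioned, whereas the plain product
# X @ X.T @ ... @ Q ("none") loses everything but the dominant directions to
# floating point over/underflow as n_iter grows.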


def test_randomized_svd_sparse_warnings():
    # randomized_svd throws a warning for lil and dok matrices
    rng = np.random.RandomState(42)
    X = make_low_rank_matrix(50, 20, effective_rank=10, random_state=rng)
    n_components = 5
    for cls in (sparse.lil_matrix, sparse.dok_matrix):
        X = cls(X)
        warn_msg = (
            "Calculating SVD of a {} is expensive. "
            "csr_matrix is more efficient.".format(cls.__name__)
        )
        with pytest.warns(sparse.SparseEfficiencyWarning, match=warn_msg):
            randomized_svd(X, n_components, n_iter=1, power_iteration_normalizer="none")


def test_svd_flip():
    # Check that svd_flip works in both situations, and reconstructs input.
    rs = np.random.RandomState(1999)
    n_samples = 20
    n_features = 10
    X = rs.randn(n_samples, n_features)

    # Check matrix reconstruction
    U, S, Vt = linalg.svd(X, full_matrices=False)
    U1, V1 = svd_flip(U, Vt, u_based_decision=False)
    assert_almost_equal(np.dot(U1 * S, V1), X, decimal=6)

    # Check transposed matrix reconstruction
    XT = X.T
    U, S, Vt = linalg.svd(XT, full_matrices=False)
    U2, V2 = svd_flip(U, Vt, u_based_decision=True)
    assert_almost_equal(np.dot(U2 * S, V2), XT, decimal=6)

    # Check that different flip methods are equivalent under reconstruction
    U_flip1, V_flip1 = svd_flip(U, Vt, u_based_decision=True)
    assert_almost_equal(np.dot(U_flip1 * S, V_flip1), XT, decimal=6)
    U_flip2, V_flip2 = svd_flip(U, Vt, u_based_decision=False)
    assert_almost_equal(np.dot(U_flip2 * S, V_flip2), XT, decimal=6)
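

# Note on the `svd_flip` semantics exercised above: each column of U and the
# corresponding row of Vt are multiplied by the same +/-1 factor, chosen so
# that the entry of largest magnitude (searched in U's columns when
# u_based_decision=True, in Vt's rows otherwise) becomes positive. The sign
# cancels in U @ diag(S) @ Vt, so the reconstruction is unchanged.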


def test_randomized_svd_sign_flip():
    a = np.array([[2.0, 0.0], [0.0, 1.0]])
    u1, s1, v1 = randomized_svd(a, 2, flip_sign=True, random_state=41)
    for seed in range(10):
        u2, s2, v2 = randomized_svd(a, 2, flip_sign=True, random_state=seed)
        assert_almost_equal(u1, u2)
        assert_almost_equal(v1, v2)
        assert_almost_equal(np.dot(u2 * s2, v2), a)
        assert_almost_equal(np.dot(u2.T, u2), np.eye(2))
        assert_almost_equal(np.dot(v2.T, v2), np.eye(2))


def test_randomized_svd_sign_flip_with_transpose():
    # Check if the randomized_svd sign flipping is always done based on u
    # irrespective of transpose.
    # See https://github.com/scikit-learn/scikit-learn/issues/5608
    # for more details.
    def max_loading_is_positive(u, v):
        """
        returns a bool tuple indicating whether the entry with the largest
        absolute value is positive in every column of u and in every row of v
        """
        u_based = (np.abs(u).max(axis=0) == u.max(axis=0)).all()
        v_based = (np.abs(v).max(axis=1) == v.max(axis=1)).all()
        return u_based, v_based

    mat = np.arange(10 * 8).reshape(10, -1)

    # Without transpose
    u_flipped, _, v_flipped = randomized_svd(mat, 3, flip_sign=True, random_state=0)
    u_based, v_based = max_loading_is_positive(u_flipped, v_flipped)
    assert u_based
    assert not v_based

    # With transpose
    u_flipped_with_transpose, _, v_flipped_with_transpose = randomized_svd(
        mat, 3, flip_sign=True, transpose=True, random_state=0
    )
    u_based, v_based = max_loading_is_positive(
        u_flipped_with_transpose, v_flipped_with_transpose
    )
    assert u_based
    assert not v_based
- @pytest.mark.parametrize("n", [50, 100, 300])
- @pytest.mark.parametrize("m", [50, 100, 300])
- @pytest.mark.parametrize("k", [10, 20, 50])
- @pytest.mark.parametrize("seed", range(5))
- def test_randomized_svd_lapack_driver(n, m, k, seed):
- # Check that different SVD drivers provide consistent results
- # Matrix being compressed
- rng = np.random.RandomState(seed)
- X = rng.rand(n, m)
- # Number of components
- u1, s1, vt1 = randomized_svd(X, k, svd_lapack_driver="gesdd", random_state=0)
- u2, s2, vt2 = randomized_svd(X, k, svd_lapack_driver="gesvd", random_state=0)
- # Check shape and contents
- assert u1.shape == u2.shape
- assert_allclose(u1, u2, atol=0, rtol=1e-3)
- assert s1.shape == s2.shape
- assert_allclose(s1, s2, atol=0, rtol=1e-3)
- assert vt1.shape == vt2.shape
- assert_allclose(vt1, vt2, atol=0, rtol=1e-3)


def test_cartesian():
    # Check if cartesian product delivers the right results
    axes = (np.array([1, 2, 3]), np.array([4, 5]), np.array([6, 7]))

    true_out = np.array(
        [
            [1, 4, 6],
            [1, 4, 7],
            [1, 5, 6],
            [1, 5, 7],
            [2, 4, 6],
            [2, 4, 7],
            [2, 5, 6],
            [2, 5, 7],
            [3, 4, 6],
            [3, 4, 7],
            [3, 5, 6],
            [3, 5, 7],
        ]
    )

    out = cartesian(axes)
    assert_array_equal(true_out, out)

    # check single axis
    x = np.arange(3)
    assert_array_equal(x[:, np.newaxis], cartesian((x,)))


@pytest.mark.parametrize(
    "arrays, output_dtype",
    [
        (
            [np.array([1, 2, 3], dtype=np.int32), np.array([4, 5], dtype=np.int64)],
            np.dtype(np.int64),
        ),
        (
            [np.array([1, 2, 3], dtype=np.int32), np.array([4, 5], dtype=np.float64)],
            np.dtype(np.float64),
        ),
        (
            [np.array([1, 2, 3], dtype=np.int32), np.array(["x", "y"], dtype=object)],
            np.dtype(object),
        ),
    ],
)
def test_cartesian_mix_types(arrays, output_dtype):
    """Check that the cartesian product works with mixed types."""
    output = cartesian(arrays)

    assert output.dtype == output_dtype


def test_logistic_sigmoid():
    # Check correctness and robustness of logistic sigmoid implementation
    def naive_log_logistic(x):
        return np.log(expit(x))

    x = np.linspace(-2, 2, 50)
    assert_array_almost_equal(log_logistic(x), naive_log_logistic(x))

    extreme_x = np.array([-100.0, 100.0])
    assert_array_almost_equal(log_logistic(extreme_x), [-100, 0])
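

# Why a dedicated `log_logistic` is needed (the standard stability argument):
# expit(x) underflows to exactly 0 for sufficiently negative x (around
# x < -745 in float64), so the naive np.log(expit(x)) returns -inf there.
# The usual stable rewriting is the piecewise identity:
#
#   log(expit(x)) = -log1p(exp(-x))        for x > 0
#                 =  x - log1p(exp(x))     for x <= 0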


@pytest.fixture()
def rng():
    return np.random.RandomState(42)


@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_incremental_weighted_mean_and_variance_simple(rng, dtype):
    mult = 10
    X = rng.rand(1000, 20).astype(dtype) * mult
    sample_weight = rng.rand(X.shape[0]) * mult
    mean, var, _ = _incremental_mean_and_var(X, 0, 0, 0, sample_weight=sample_weight)

    expected_mean = np.average(X, weights=sample_weight, axis=0)
    expected_var = (
        np.average(X**2, weights=sample_weight, axis=0) - expected_mean**2
    )
    assert_almost_equal(mean, expected_mean)
    assert_almost_equal(var, expected_var)
- @pytest.mark.parametrize("mean", [0, 1e7, -1e7])
- @pytest.mark.parametrize("var", [1, 1e-8, 1e5])
- @pytest.mark.parametrize(
- "weight_loc, weight_scale", [(0, 1), (0, 1e-8), (1, 1e-8), (10, 1), (1e7, 1)]
- )
- def test_incremental_weighted_mean_and_variance(
- mean, var, weight_loc, weight_scale, rng
- ):
- # Testing of correctness and numerical stability
- def _assert(X, sample_weight, expected_mean, expected_var):
- n = X.shape[0]
- for chunk_size in [1, n // 10 + 1, n // 4 + 1, n // 2 + 1, n]:
- last_mean, last_weight_sum, last_var = 0, 0, 0
- for batch in gen_batches(n, chunk_size):
- last_mean, last_var, last_weight_sum = _incremental_mean_and_var(
- X[batch],
- last_mean,
- last_var,
- last_weight_sum,
- sample_weight=sample_weight[batch],
- )
- assert_allclose(last_mean, expected_mean)
- assert_allclose(last_var, expected_var, atol=1e-6)
- size = (100, 20)
- weight = rng.normal(loc=weight_loc, scale=weight_scale, size=size[0])
- # Compare to weighted average: np.average
- X = rng.normal(loc=mean, scale=var, size=size)
- expected_mean = _safe_accumulator_op(np.average, X, weights=weight, axis=0)
- expected_var = _safe_accumulator_op(
- np.average, (X - expected_mean) ** 2, weights=weight, axis=0
- )
- _assert(X, weight, expected_mean, expected_var)
- # Compare to unweighted mean: np.mean
- X = rng.normal(loc=mean, scale=var, size=size)
- ones_weight = np.ones(size[0])
- expected_mean = _safe_accumulator_op(np.mean, X, axis=0)
- expected_var = _safe_accumulator_op(np.var, X, axis=0)
- _assert(X, ones_weight, expected_mean, expected_var)
- @pytest.mark.parametrize("dtype", [np.float32, np.float64])
- def test_incremental_weighted_mean_and_variance_ignore_nan(dtype):
- old_means = np.array([535.0, 535.0, 535.0, 535.0])
- old_variances = np.array([4225.0, 4225.0, 4225.0, 4225.0])
- old_weight_sum = np.array([2, 2, 2, 2], dtype=np.int32)
- sample_weights_X = np.ones(3)
- sample_weights_X_nan = np.ones(4)
- X = np.array(
- [[170, 170, 170, 170], [430, 430, 430, 430], [300, 300, 300, 300]]
- ).astype(dtype)
- X_nan = np.array(
- [
- [170, np.nan, 170, 170],
- [np.nan, 170, 430, 430],
- [430, 430, np.nan, 300],
- [300, 300, 300, np.nan],
- ]
- ).astype(dtype)
- X_means, X_variances, X_count = _incremental_mean_and_var(
- X, old_means, old_variances, old_weight_sum, sample_weight=sample_weights_X
- )
- X_nan_means, X_nan_variances, X_nan_count = _incremental_mean_and_var(
- X_nan,
- old_means,
- old_variances,
- old_weight_sum,
- sample_weight=sample_weights_X_nan,
- )
- assert_allclose(X_nan_means, X_means)
- assert_allclose(X_nan_variances, X_variances)
- assert_allclose(X_nan_count, X_count)


def test_incremental_variance_update_formulas():
    # Test Youngs and Cramer incremental variance formulas.
    # Doggie data from https://www.mathsisfun.com/data/standard-deviation.html
    A = np.array(
        [
            [600, 470, 170, 430, 300],
            [600, 470, 170, 430, 300],
            [600, 470, 170, 430, 300],
            [600, 470, 170, 430, 300],
        ]
    ).T
    idx = 2
    X1 = A[:idx, :]
    X2 = A[idx:, :]

    old_means = X1.mean(axis=0)
    old_variances = X1.var(axis=0)
    old_sample_count = np.full(X1.shape[1], X1.shape[0], dtype=np.int32)
    final_means, final_variances, final_count = _incremental_mean_and_var(
        X2, old_means, old_variances, old_sample_count
    )
    assert_almost_equal(final_means, A.mean(axis=0), 6)
    assert_almost_equal(final_variances, A.var(axis=0), 6)
    assert_almost_equal(final_count, A.shape[0])
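

# For reference, the batch update verified above: combining a new batch B
# (size m, mean mu_B, variance var_B) with the running per-feature statistics
# (mu, var, n) gives, for unit sample weights,
#
#   n_new   = n + m
#   mu_new  = (n * mu + m * mu_B) / n_new
#   var_new = (n * var + m * var_B + (n * m / n_new) * (mu - mu_B) ** 2) / n_new
#
# (`_incremental_mean_and_var` additionally supports sample weights and NaN
# handling, which are tested separately above.)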


def test_incremental_mean_and_variance_ignore_nan():
    old_means = np.array([535.0, 535.0, 535.0, 535.0])
    old_variances = np.array([4225.0, 4225.0, 4225.0, 4225.0])
    old_sample_count = np.array([2, 2, 2, 2], dtype=np.int32)
    X = np.array([[170, 170, 170, 170], [430, 430, 430, 430], [300, 300, 300, 300]])

    X_nan = np.array(
        [
            [170, np.nan, 170, 170],
            [np.nan, 170, 430, 430],
            [430, 430, np.nan, 300],
            [300, 300, 300, np.nan],
        ]
    )

    X_means, X_variances, X_count = _incremental_mean_and_var(
        X, old_means, old_variances, old_sample_count
    )
    X_nan_means, X_nan_variances, X_nan_count = _incremental_mean_and_var(
        X_nan, old_means, old_variances, old_sample_count
    )

    assert_allclose(X_nan_means, X_means)
    assert_allclose(X_nan_variances, X_variances)
    assert_allclose(X_nan_count, X_count)


@skip_if_32bit
def test_incremental_variance_numerical_stability():
    # Test Youngs and Cramer incremental variance formulas.

    def np_var(A):
        return A.var(axis=0)

    # Naive one pass variance computation - not numerically stable
    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    def one_pass_var(X):
        n = X.shape[0]
        exp_x2 = (X**2).sum(axis=0) / n
        expx_2 = (X.sum(axis=0) / n) ** 2
        return exp_x2 - expx_2

    # Two-pass algorithm, stable.
    # We use it as a benchmark. It is not an online algorithm
    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm
    def two_pass_var(X):
        mean = X.mean(axis=0)
        Y = X.copy()
        return np.mean((Y - mean) ** 2, axis=0)

    # Naive online implementation
    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm
    # This works only for chunks of size 1
    def naive_mean_variance_update(x, last_mean, last_variance, last_sample_count):
        updated_sample_count = last_sample_count + 1
        samples_ratio = last_sample_count / float(updated_sample_count)
        updated_mean = x / updated_sample_count + last_mean * samples_ratio
        updated_variance = (
            last_variance * samples_ratio
            + (x - last_mean) * (x - updated_mean) / updated_sample_count
        )
        return updated_mean, updated_variance, updated_sample_count

    # We want to show a case when one_pass_var has error > 1e-3 while
    # _incremental_mean_and_var has less.
    tol = 200
    n_features = 2
    n_samples = 10000
    x1 = np.array(1e8, dtype=np.float64)
    x2 = np.log(1e-5, dtype=np.float64)
    A0 = np.full((n_samples // 2, n_features), x1, dtype=np.float64)
    A1 = np.full((n_samples // 2, n_features), x2, dtype=np.float64)
    A = np.vstack((A0, A1))

    # Naive one pass var: >tol (=1063)
    assert np.abs(np_var(A) - one_pass_var(A)).max() > tol

    # Starting point for online algorithms: after A0

    # Naive implementation: >tol (436)
    mean, var, n = A0[0, :], np.zeros(n_features), n_samples // 2
    for i in range(A1.shape[0]):
        mean, var, n = naive_mean_variance_update(A1[i, :], mean, var, n)
    assert n == A.shape[0]
    # the mean is also slightly unstable
    assert np.abs(A.mean(axis=0) - mean).max() > 1e-6
    assert np.abs(np_var(A) - var).max() > tol

    # Robust implementation: <tol (177)
    mean, var = A0[0, :], np.zeros(n_features)
    n = np.full(n_features, n_samples // 2, dtype=np.int32)
    for i in range(A1.shape[0]):
        mean, var, n = _incremental_mean_and_var(
            A1[i, :].reshape((1, A1.shape[1])), mean, var, n
        )
    assert_array_equal(n, A.shape[0])
    assert_array_almost_equal(A.mean(axis=0), mean)
    assert tol > np.abs(np_var(A) - var).max()


def test_incremental_variance_ddof():
    # Test that degrees of freedom parameter for calculations are correct.
    rng = np.random.RandomState(1999)
    X = rng.randn(50, 10)
    n_samples, n_features = X.shape
    for batch_size in [11, 20, 37]:
        steps = np.arange(0, X.shape[0], batch_size)
        if steps[-1] != X.shape[0]:
            steps = np.hstack([steps, n_samples])

        for i, j in zip(steps[:-1], steps[1:]):
            batch = X[i:j, :]
            if i == 0:
                incremental_means = batch.mean(axis=0)
                incremental_variances = batch.var(axis=0)
                # Assign this twice so that the test logic is consistent
                incremental_count = batch.shape[0]
                sample_count = np.full(batch.shape[1], batch.shape[0], dtype=np.int32)
            else:
                result = _incremental_mean_and_var(
                    batch, incremental_means, incremental_variances, sample_count
                )
                (incremental_means, incremental_variances, incremental_count) = result
                sample_count += batch.shape[0]

            calculated_means = np.mean(X[:j], axis=0)
            calculated_variances = np.var(X[:j], axis=0)
            assert_almost_equal(incremental_means, calculated_means, 6)
            assert_almost_equal(incremental_variances, calculated_variances, 6)
            assert_array_equal(incremental_count, sample_count)


def test_vector_sign_flip():
    # Test that sign flipping works and that the entry with the largest
    # absolute value ends up with a positive sign
    data = np.random.RandomState(36).randn(5, 5)
    max_abs_rows = np.argmax(np.abs(data), axis=1)
    data_flipped = _deterministic_vector_sign_flip(data)
    max_rows = np.argmax(data_flipped, axis=1)
    assert_array_equal(max_abs_rows, max_rows)
    signs = np.sign(data[range(data.shape[0]), max_abs_rows])
    assert_array_equal(data, data_flipped * signs[:, np.newaxis])


def test_softmax():
    rng = np.random.RandomState(0)
    X = rng.randn(3, 5)
    exp_X = np.exp(X)
    sum_exp_X = np.sum(exp_X, axis=1).reshape((-1, 1))
    assert_array_almost_equal(softmax(X), exp_X / sum_exp_X)
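

# The reference computation above exponentiates X directly, which is fine for
# these small values. A numerically robust softmax first subtracts the
# row-wise max, which leaves the result unchanged because
# softmax(x + c) == softmax(x) for any constant c:
#
#   softmax(x)_i = exp(x_i - max(x)) / sum_j exp(x_j - max(x))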


def test_stable_cumsum():
    assert_array_equal(stable_cumsum([1, 2, 3]), np.cumsum([1, 2, 3]))
    r = np.random.RandomState(0).rand(100000)
    with pytest.warns(RuntimeWarning):
        stable_cumsum(r, rtol=0, atol=0)

    # test axis parameter
    A = np.random.RandomState(36).randint(1000, size=(5, 5, 5))
    assert_array_equal(stable_cumsum(A, axis=0), np.cumsum(A, axis=0))
    assert_array_equal(stable_cumsum(A, axis=1), np.cumsum(A, axis=1))
    assert_array_equal(stable_cumsum(A, axis=2), np.cumsum(A, axis=2))
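

# The RuntimeWarning asserted above is expected by construction: with rtol=0
# and atol=0, even a tiny floating point discrepancy between the running
# cumulative sum and an independently computed total over the 100000 random
# terms trips the stability check in `stable_cumsum`.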


@pytest.mark.parametrize(
    "A_array_constr", [np.array, sparse.csr_matrix], ids=["dense", "sparse"]
)
@pytest.mark.parametrize(
    "B_array_constr", [np.array, sparse.csr_matrix], ids=["dense", "sparse"]
)
def test_safe_sparse_dot_2d(A_array_constr, B_array_constr):
    rng = np.random.RandomState(0)

    A = rng.random_sample((30, 10))
    B = rng.random_sample((10, 20))
    expected = np.dot(A, B)

    A = A_array_constr(A)
    B = B_array_constr(B)
    actual = safe_sparse_dot(A, B, dense_output=True)

    assert_allclose(actual, expected)


def test_safe_sparse_dot_nd():
    rng = np.random.RandomState(0)

    # dense ND / sparse
    A = rng.random_sample((2, 3, 4, 5, 6))
    B = rng.random_sample((6, 7))
    expected = np.dot(A, B)
    B = sparse.csr_matrix(B)
    actual = safe_sparse_dot(A, B)
    assert_allclose(actual, expected)

    # sparse / dense ND
    A = rng.random_sample((2, 3))
    B = rng.random_sample((4, 5, 3, 6))
    expected = np.dot(A, B)
    A = sparse.csr_matrix(A)
    actual = safe_sparse_dot(A, B)
    assert_allclose(actual, expected)


@pytest.mark.parametrize(
    "A_array_constr", [np.array, sparse.csr_matrix], ids=["dense", "sparse"]
)
def test_safe_sparse_dot_2d_1d(A_array_constr):
    rng = np.random.RandomState(0)
    B = rng.random_sample((10))

    # 2D @ 1D
    A = rng.random_sample((30, 10))
    expected = np.dot(A, B)
    A = A_array_constr(A)
    actual = safe_sparse_dot(A, B)
    assert_allclose(actual, expected)

    # 1D @ 2D
    A = rng.random_sample((10, 30))
    expected = np.dot(B, A)
    A = A_array_constr(A)
    actual = safe_sparse_dot(B, A)
    assert_allclose(actual, expected)


@pytest.mark.parametrize("dense_output", [True, False])
def test_safe_sparse_dot_dense_output(dense_output):
    rng = np.random.RandomState(0)

    A = sparse.random(30, 10, density=0.1, random_state=rng)
    B = sparse.random(10, 20, density=0.1, random_state=rng)

    expected = A.dot(B)
    actual = safe_sparse_dot(A, B, dense_output=dense_output)

    assert sparse.issparse(actual) == (not dense_output)

    if dense_output:
        expected = expected.toarray()
    assert_allclose_dense_sparse(actual, expected)