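"""Tests for the multioutput meta-estimators.

Covers MultiOutputRegressor, MultiOutputClassifier, ClassifierChain and
RegressorChain: fit/partial_fit, sample weights, sparse input, fit_params
propagation, and error handling.
"""
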
import re

import numpy as np
import pytest
import scipy.sparse as sp
from joblib import cpu_count

from sklearn import datasets
from sklearn.base import ClassifierMixin, clone
from sklearn.datasets import (
    load_linnerud,
    make_classification,
    make_multilabel_classification,
    make_regression,
)
from sklearn.dummy import DummyClassifier, DummyRegressor
from sklearn.ensemble import (
    GradientBoostingRegressor,
    RandomForestClassifier,
    StackingRegressor,
)
from sklearn.exceptions import NotFittedError
from sklearn.impute import SimpleImputer
from sklearn.linear_model import (
    Lasso,
    LinearRegression,
    LogisticRegression,
    OrthogonalMatchingPursuit,
    PassiveAggressiveClassifier,
    Ridge,
    SGDClassifier,
    SGDRegressor,
)
from sklearn.metrics import jaccard_score, mean_squared_error
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.multiclass import OneVsRestClassifier
from sklearn.multioutput import (
    ClassifierChain,
    MultiOutputClassifier,
    MultiOutputRegressor,
    RegressorChain,
)
from sklearn.pipeline import make_pipeline
from sklearn.svm import LinearSVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.utils import shuffle
from sklearn.utils._testing import (
    assert_almost_equal,
    assert_array_almost_equal,
    assert_array_equal,
)


def test_multi_target_regression():
    X, y = datasets.make_regression(n_targets=3, random_state=0)
    X_train, y_train = X[:50], y[:50]
    X_test, y_test = X[50:], y[50:]

    references = np.zeros_like(y_test)
    for n in range(3):
        rgr = GradientBoostingRegressor(random_state=0)
        rgr.fit(X_train, y_train[:, n])
        references[:, n] = rgr.predict(X_test)

    rgr = MultiOutputRegressor(GradientBoostingRegressor(random_state=0))
    rgr.fit(X_train, y_train)
    y_pred = rgr.predict(X_test)

    assert_almost_equal(references, y_pred)


def test_multi_target_regression_partial_fit():
    X, y = datasets.make_regression(n_targets=3, random_state=0)
    X_train, y_train = X[:50], y[:50]
    X_test, y_test = X[50:], y[50:]

    references = np.zeros_like(y_test)
    half_index = 25
    for n in range(3):
        sgr = SGDRegressor(random_state=0, max_iter=5)
        sgr.partial_fit(X_train[:half_index], y_train[:half_index, n])
        sgr.partial_fit(X_train[half_index:], y_train[half_index:, n])
        references[:, n] = sgr.predict(X_test)

    sgr = MultiOutputRegressor(SGDRegressor(random_state=0, max_iter=5))
    sgr.partial_fit(X_train[:half_index], y_train[:half_index])
    sgr.partial_fit(X_train[half_index:], y_train[half_index:])
    y_pred = sgr.predict(X_test)

    assert_almost_equal(references, y_pred)
    assert not hasattr(MultiOutputRegressor(Lasso), "partial_fit")
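
# Note: MultiOutputRegressor only exposes `partial_fit` when the underlying
# estimator implements it (delegation via `available_if`; see the last test in
# this module), which is what the `hasattr` check above exercises.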


def test_multi_target_regression_one_target():
    # Fitting a multi-output regressor on a 1-D target must raise a ValueError
    X, y = datasets.make_regression(n_targets=1, random_state=0)
    rgr = MultiOutputRegressor(GradientBoostingRegressor(random_state=0))
    msg = "at least two dimensions"
    with pytest.raises(ValueError, match=msg):
        rgr.fit(X, y)


def test_multi_target_sparse_regression():
    X, y = datasets.make_regression(n_targets=3, random_state=0)
    X_train, y_train = X[:50], y[:50]
    X_test = X[50:]

    for sparse in [
        sp.csr_matrix,
        sp.csc_matrix,
        sp.coo_matrix,
        sp.dok_matrix,
        sp.lil_matrix,
    ]:
        rgr = MultiOutputRegressor(Lasso(random_state=0))
        rgr_sparse = MultiOutputRegressor(Lasso(random_state=0))

        rgr.fit(X_train, y_train)
        rgr_sparse.fit(sparse(X_train), y_train)

        assert_almost_equal(rgr.predict(X_test), rgr_sparse.predict(sparse(X_test)))


def test_multi_target_sample_weights_api():
    X = [[1, 2, 3], [4, 5, 6]]
    y = [[3.141, 2.718], [2.718, 3.141]]
    w = [0.8, 0.6]

    rgr = MultiOutputRegressor(OrthogonalMatchingPursuit())
    msg = "does not support sample weights"
    with pytest.raises(ValueError, match=msg):
        rgr.fit(X, y, w)

    # no exception should be raised if the base estimator supports weights
    rgr = MultiOutputRegressor(GradientBoostingRegressor(random_state=0))
    rgr.fit(X, y, w)


def test_multi_target_sample_weight_partial_fit():
    # weighted regressor
    X = [[1, 2, 3], [4, 5, 6]]
    y = [[3.141, 2.718], [2.718, 3.141]]
    w = [2.0, 1.0]
    rgr_w = MultiOutputRegressor(SGDRegressor(random_state=0, max_iter=5))
    rgr_w.partial_fit(X, y, w)

    # weighted with different weights
    w = [2.0, 2.0]
    rgr = MultiOutputRegressor(SGDRegressor(random_state=0, max_iter=5))
    rgr.partial_fit(X, y, w)

    assert rgr.predict(X)[0][0] != rgr_w.predict(X)[0][0]


def test_multi_target_sample_weights():
    # weighted regressor
    Xw = [[1, 2, 3], [4, 5, 6]]
    yw = [[3.141, 2.718], [2.718, 3.141]]
    w = [2.0, 1.0]
    rgr_w = MultiOutputRegressor(GradientBoostingRegressor(random_state=0))
    rgr_w.fit(Xw, yw, w)

    # unweighted, but with repeated samples
    X = [[1, 2, 3], [1, 2, 3], [4, 5, 6]]
    y = [[3.141, 2.718], [3.141, 2.718], [2.718, 3.141]]
    rgr = MultiOutputRegressor(GradientBoostingRegressor(random_state=0))
    rgr.fit(X, y)

    X_test = [[1.5, 2.5, 3.5], [3.5, 4.5, 5.5]]
    assert_almost_equal(rgr.predict(X_test), rgr_w.predict(X_test))


# Import the data
iris = datasets.load_iris()
# create multiple targets by shuffling y with different seeds and stacking the
# shuffled copies as columns
X = iris.data
y1 = iris.target
y2 = shuffle(y1, random_state=1)
y3 = shuffle(y1, random_state=2)
y = np.column_stack((y1, y2, y3))
n_samples, n_features = X.shape
n_outputs = y.shape[1]
n_classes = len(np.unique(y1))
classes = list(map(np.unique, (y1, y2, y3)))
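
# These module-level fixtures (X, y, n_samples, n_outputs, n_classes and
# classes) are shared by the multi-output classification tests below.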


def test_multi_output_classification_partial_fit_parallelism():
    sgd_linear_clf = SGDClassifier(loss="log_loss", random_state=1, max_iter=5)
    mor = MultiOutputClassifier(sgd_linear_clf, n_jobs=4)
    mor.partial_fit(X, y, classes)
    est1 = mor.estimators_[0]
    mor.partial_fit(X, y)
    est2 = mor.estimators_[0]
    if cpu_count() > 1:
        # parallelism requires this to be the case for a sane implementation
        assert est1 is not est2


# check multioutput has predict_proba
def test_hasattr_multi_output_predict_proba():
    # default SGDClassifier has loss='hinge'
    # which does not expose a predict_proba method
    sgd_linear_clf = SGDClassifier(random_state=1, max_iter=5)
    multi_target_linear = MultiOutputClassifier(sgd_linear_clf)
    multi_target_linear.fit(X, y)
    assert not hasattr(multi_target_linear, "predict_proba")

    # case where predict_proba attribute exists
    sgd_linear_clf = SGDClassifier(loss="log_loss", random_state=1, max_iter=5)
    multi_target_linear = MultiOutputClassifier(sgd_linear_clf)
    multi_target_linear.fit(X, y)
    assert hasattr(multi_target_linear, "predict_proba")


# check predict_proba passes
def test_multi_output_predict_proba():
    sgd_linear_clf = SGDClassifier(random_state=1, max_iter=5)
    param = {"loss": ("hinge", "log_loss", "modified_huber")}

    # inner function for custom scoring
    def custom_scorer(estimator, X, y):
        if hasattr(estimator, "predict_proba"):
            return 1.0
        else:
            return 0.0

    grid_clf = GridSearchCV(
        sgd_linear_clf,
        param_grid=param,
        scoring=custom_scorer,
        cv=3,
        error_score="raise",
    )
    multi_target_linear = MultiOutputClassifier(grid_clf)
    multi_target_linear.fit(X, y)
    multi_target_linear.predict_proba(X)

    # SGDClassifier defaults to loss='hinge' which is not a probabilistic
    # loss function; therefore it does not expose a predict_proba method
    sgd_linear_clf = SGDClassifier(random_state=1, max_iter=5)
    multi_target_linear = MultiOutputClassifier(sgd_linear_clf)
    multi_target_linear.fit(X, y)
    err_msg = "probability estimates are not available for loss='hinge'"
    with pytest.raises(AttributeError, match=err_msg):
        multi_target_linear.predict_proba(X)


def test_multi_output_classification_partial_fit():
    # check that the meta-estimator initializes correctly with its base
    # estimator and that predictions work as expected after partial_fit
    sgd_linear_clf = SGDClassifier(loss="log_loss", random_state=1, max_iter=5)
    multi_target_linear = MultiOutputClassifier(sgd_linear_clf)

    # train the multi_target_linear and also get the predictions.
    half_index = X.shape[0] // 2
    multi_target_linear.partial_fit(X[:half_index], y[:half_index], classes=classes)

    first_predictions = multi_target_linear.predict(X)
    assert (n_samples, n_outputs) == first_predictions.shape

    multi_target_linear.partial_fit(X[half_index:], y[half_index:])
    second_predictions = multi_target_linear.predict(X)
    assert (n_samples, n_outputs) == second_predictions.shape

    # train the linear classification with each column and assert that
    # predictions are equal after first partial_fit and second partial_fit
    for i in range(3):
        # create a clone with the same state
        sgd_linear_clf = clone(sgd_linear_clf)
        sgd_linear_clf.partial_fit(
            X[:half_index], y[:half_index, i], classes=classes[i]
        )
        assert_array_equal(sgd_linear_clf.predict(X), first_predictions[:, i])
        sgd_linear_clf.partial_fit(X[half_index:], y[half_index:, i])
        assert_array_equal(sgd_linear_clf.predict(X), second_predictions[:, i])


def test_multi_output_classification_partial_fit_no_first_classes_exception():
    sgd_linear_clf = SGDClassifier(loss="log_loss", random_state=1, max_iter=5)
    multi_target_linear = MultiOutputClassifier(sgd_linear_clf)
    msg = "classes must be passed on the first call to partial_fit."
    with pytest.raises(ValueError, match=msg):
        multi_target_linear.partial_fit(X, y)


def test_multi_output_classification():
    # check that the meta-estimator initializes correctly with its base
    # estimator and that predict, predict_proba and score work as expected
    forest = RandomForestClassifier(n_estimators=10, random_state=1)
    multi_target_forest = MultiOutputClassifier(forest)

    # train the multi_target_forest and also get the predictions.
    multi_target_forest.fit(X, y)

    predictions = multi_target_forest.predict(X)
    assert (n_samples, n_outputs) == predictions.shape

    predict_proba = multi_target_forest.predict_proba(X)

    assert len(predict_proba) == n_outputs
    for class_probabilities in predict_proba:
        assert (n_samples, n_classes) == class_probabilities.shape
    assert_array_equal(np.argmax(np.dstack(predict_proba), axis=1), predictions)

    # train the forest with each column and assert that predictions are equal
    for i in range(3):
        forest_ = clone(forest)  # create a clone with the same state
        forest_.fit(X, y[:, i])
        assert list(forest_.predict(X)) == list(predictions[:, i])
        assert_array_equal(list(forest_.predict_proba(X)), list(predict_proba[i]))


def test_multiclass_multioutput_estimator():
    # test nesting of meta-estimators: a multi-output classifier wrapping a
    # one-vs-rest multiclass classifier
    svc = LinearSVC(dual="auto", random_state=0)
    multi_class_svc = OneVsRestClassifier(svc)
    multi_target_svc = MultiOutputClassifier(multi_class_svc)

    multi_target_svc.fit(X, y)

    predictions = multi_target_svc.predict(X)
    assert (n_samples, n_outputs) == predictions.shape

    # train the multiclass SVC with each column and assert that predictions
    # are equal
    for i in range(3):
        multi_class_svc_ = clone(multi_class_svc)  # create a clone
        multi_class_svc_.fit(X, y[:, i])
        assert list(multi_class_svc_.predict(X)) == list(predictions[:, i])


def test_multiclass_multioutput_estimator_predict_proba():
    seed = 542

    # make test deterministic
    rng = np.random.RandomState(seed)

    # random features
    X = rng.normal(size=(5, 5))

    # random labels
    y1 = np.array(["b", "a", "a", "b", "a"]).reshape(5, 1)  # 2 classes
    y2 = np.array(["d", "e", "f", "e", "d"]).reshape(5, 1)  # 3 classes

    Y = np.concatenate([y1, y2], axis=1)

    clf = MultiOutputClassifier(
        LogisticRegression(solver="liblinear", random_state=seed)
    )

    clf.fit(X, Y)

    y_result = clf.predict_proba(X)
    y_actual = [
        np.array(
            [
                [0.23481764, 0.76518236],
                [0.67196072, 0.32803928],
                [0.54681448, 0.45318552],
                [0.34883923, 0.65116077],
                [0.73687069, 0.26312931],
            ]
        ),
        np.array(
            [
                [0.5171785, 0.23878628, 0.24403522],
                [0.22141451, 0.64102704, 0.13755846],
                [0.16751315, 0.18256843, 0.64991843],
                [0.27357372, 0.55201592, 0.17441036],
                [0.65745193, 0.26062899, 0.08191907],
            ]
        ),
    ]

    for i in range(len(y_actual)):
        assert_almost_equal(y_result[i], y_actual[i])


def test_multi_output_classification_sample_weights():
    # weighted classifier
    Xw = [[1, 2, 3], [4, 5, 6]]
    yw = [[3, 2], [2, 3]]
    w = np.asarray([2.0, 1.0])
    forest = RandomForestClassifier(n_estimators=10, random_state=1)
    clf_w = MultiOutputClassifier(forest)
    clf_w.fit(Xw, yw, w)

    # unweighted, but with repeated samples
    X = [[1, 2, 3], [1, 2, 3], [4, 5, 6]]
    y = [[3, 2], [3, 2], [2, 3]]
    forest = RandomForestClassifier(n_estimators=10, random_state=1)
    clf = MultiOutputClassifier(forest)
    clf.fit(X, y)

    X_test = [[1.5, 2.5, 3.5], [3.5, 4.5, 5.5]]
    assert_almost_equal(clf.predict(X_test), clf_w.predict(X_test))


def test_multi_output_classification_partial_fit_sample_weights():
    # weighted classifier
    Xw = [[1, 2, 3], [4, 5, 6], [1.5, 2.5, 3.5]]
    yw = [[3, 2], [2, 3], [3, 2]]
    w = np.asarray([2.0, 1.0, 1.0])
    sgd_linear_clf = SGDClassifier(random_state=1, max_iter=20)
    clf_w = MultiOutputClassifier(sgd_linear_clf)
    clf_w.fit(Xw, yw, w)

    # unweighted, but with repeated samples
    X = [[1, 2, 3], [1, 2, 3], [4, 5, 6], [1.5, 2.5, 3.5]]
    y = [[3, 2], [3, 2], [2, 3], [3, 2]]
    sgd_linear_clf = SGDClassifier(random_state=1, max_iter=20)
    clf = MultiOutputClassifier(sgd_linear_clf)
    clf.fit(X, y)

    X_test = [[1.5, 2.5, 3.5]]
    assert_array_almost_equal(clf.predict(X_test), clf_w.predict(X_test))


def test_multi_output_exceptions():
    # NotFittedError when fit is not done but score, predict and
    # predict_proba are called
    moc = MultiOutputClassifier(LinearSVC(dual="auto", random_state=0))

    with pytest.raises(NotFittedError):
        moc.score(X, y)

    # ValueError when number of outputs is different
    # for fit and score
    y_new = np.column_stack((y1, y2))
    moc.fit(X, y)
    with pytest.raises(ValueError):
        moc.score(X, y_new)

    # ValueError when y is continuous
    msg = "Unknown label type"
    with pytest.raises(ValueError, match=msg):
        moc.fit(X, X[:, 1])
- @pytest.mark.parametrize("response_method", ["predict_proba", "predict"])
- def test_multi_output_not_fitted_error(response_method):
- """Check that we raise the proper error when the estimator is not fitted"""
- moc = MultiOutputClassifier(LogisticRegression())
- with pytest.raises(NotFittedError):
- getattr(moc, response_method)(X)


def test_multi_output_delegate_predict_proba():
    """Check the behavior for the delegation of predict_proba to the underlying
    estimator."""

    # A base estimator with `predict_proba` should expose the method even
    # before fit
    moc = MultiOutputClassifier(LogisticRegression())
    assert hasattr(moc, "predict_proba")
    moc.fit(X, y)
    assert hasattr(moc, "predict_proba")

    # A base estimator without `predict_proba` should raise an AttributeError
    moc = MultiOutputClassifier(LinearSVC(dual="auto"))
    assert not hasattr(moc, "predict_proba")
    msg = "'LinearSVC' object has no attribute 'predict_proba'"
    with pytest.raises(AttributeError, match=msg):
        moc.predict_proba(X)

    moc.fit(X, y)
    assert not hasattr(moc, "predict_proba")
    with pytest.raises(AttributeError, match=msg):
        moc.predict_proba(X)


def generate_multilabel_dataset_with_correlations():
    # Generate a multilabel data set from a multiclass dataset by representing
    # the integer value of each original class with a binary encoding, which
    # makes the output columns correlated.
    X, y = make_classification(
        n_samples=1000, n_features=100, n_classes=16, n_informative=10, random_state=0
    )

    Y_multi = np.array([[int(yyy) for yyy in format(yy, "#06b")[2:]] for yy in y])
    return X, Y_multi
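
# Illustration of the binary encoding used above (comment only, not executed
# by the test suite): format(c, "#06b") zero-pads the class integer to four
# binary digits, e.g.
#   format(5, "#06b")                        -> '0b0101'
#   [int(b) for b in format(5, "#06b")[2:]]  -> [0, 1, 0, 1]
# so each of the 16 classes maps to 4 correlated binary labels.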


def test_classifier_chain_fit_and_predict_with_linear_svc():
    # Fit classifier chain and verify predict performance using LinearSVC
    X, Y = generate_multilabel_dataset_with_correlations()
    classifier_chain = ClassifierChain(LinearSVC(dual="auto"))
    classifier_chain.fit(X, Y)

    Y_pred = classifier_chain.predict(X)
    assert Y_pred.shape == Y.shape

    Y_decision = classifier_chain.decision_function(X)

    Y_binary = Y_decision >= 0
    assert_array_equal(Y_binary, Y_pred)
    assert not hasattr(classifier_chain, "predict_proba")


def test_classifier_chain_fit_and_predict_with_sparse_data():
    # Fit classifier chain with sparse data
    X, Y = generate_multilabel_dataset_with_correlations()
    X_sparse = sp.csr_matrix(X)

    classifier_chain = ClassifierChain(LogisticRegression())
    classifier_chain.fit(X_sparse, Y)
    Y_pred_sparse = classifier_chain.predict(X_sparse)

    classifier_chain = ClassifierChain(LogisticRegression())
    classifier_chain.fit(X, Y)
    Y_pred_dense = classifier_chain.predict(X)

    assert_array_equal(Y_pred_sparse, Y_pred_dense)


def test_classifier_chain_vs_independent_models():
    # Verify that a classifier chain over N outputs achieves a higher Jaccard
    # similarity score than N independent (one-vs-rest) models
    X, Y = generate_multilabel_dataset_with_correlations()
    X_train = X[:600, :]
    X_test = X[600:, :]
    Y_train = Y[:600, :]
    Y_test = Y[600:, :]

    ovr = OneVsRestClassifier(LogisticRegression())
    ovr.fit(X_train, Y_train)
    Y_pred_ovr = ovr.predict(X_test)

    chain = ClassifierChain(LogisticRegression())
    chain.fit(X_train, Y_train)
    Y_pred_chain = chain.predict(X_test)

    assert jaccard_score(Y_test, Y_pred_chain, average="samples") > jaccard_score(
        Y_test, Y_pred_ovr, average="samples"
    )


def test_base_chain_fit_and_predict():
    # Fit base chain and verify predict performance
    X, Y = generate_multilabel_dataset_with_correlations()
    chains = [RegressorChain(Ridge()), ClassifierChain(LogisticRegression())]
    for chain in chains:
        chain.fit(X, Y)
        Y_pred = chain.predict(X)
        assert Y_pred.shape == Y.shape
        # each link in the chain is fitted on the original features plus the
        # outputs of all previous links, so coef_ grows by one per link
        assert [c.coef_.size for c in chain.estimators_] == list(
            range(X.shape[1], X.shape[1] + Y.shape[1])
        )

    Y_prob = chains[1].predict_proba(X)
    Y_binary = Y_prob >= 0.5
    assert_array_equal(Y_binary, Y_pred)

    assert isinstance(chains[1], ClassifierMixin)


def test_base_chain_fit_and_predict_with_sparse_data_and_cv():
    # Fit base chain with sparse data and cross_val_predict
    X, Y = generate_multilabel_dataset_with_correlations()
    X_sparse = sp.csr_matrix(X)
    base_chains = [
        ClassifierChain(LogisticRegression(), cv=3),
        RegressorChain(Ridge(), cv=3),
    ]
    for chain in base_chains:
        chain.fit(X_sparse, Y)
        Y_pred = chain.predict(X_sparse)
        assert Y_pred.shape == Y.shape


def test_base_chain_random_order():
    # Fit base chain with random order
    X, Y = generate_multilabel_dataset_with_correlations()
    for chain in [ClassifierChain(LogisticRegression()), RegressorChain(Ridge())]:
        chain_random = clone(chain).set_params(order="random", random_state=42)
        chain_random.fit(X, Y)
        chain_fixed = clone(chain).set_params(order=chain_random.order_)
        chain_fixed.fit(X, Y)
        assert_array_equal(chain_fixed.order_, chain_random.order_)
        assert list(chain_random.order_) != list(range(4))
        assert len(chain_random.order_) == 4
        assert len(set(chain_random.order_)) == 4
        # Randomly ordered chain should behave identically to a fixed order
        # chain with the same order.
        for est1, est2 in zip(chain_random.estimators_, chain_fixed.estimators_):
            assert_array_almost_equal(est1.coef_, est2.coef_)


def test_base_chain_crossval_fit_and_predict():
    # Fit chain with cross_val_predict and verify predict
    # performance
    X, Y = generate_multilabel_dataset_with_correlations()

    for chain in [ClassifierChain(LogisticRegression()), RegressorChain(Ridge())]:
        chain.fit(X, Y)
        chain_cv = clone(chain).set_params(cv=3)
        chain_cv.fit(X, Y)
        Y_pred_cv = chain_cv.predict(X)
        Y_pred = chain.predict(X)

        assert Y_pred_cv.shape == Y_pred.shape
        assert not np.all(Y_pred == Y_pred_cv)
        if isinstance(chain, ClassifierChain):
            assert jaccard_score(Y, Y_pred_cv, average="samples") > 0.4
        else:
            assert mean_squared_error(Y, Y_pred_cv) < 0.25


@pytest.mark.parametrize(
    "estimator",
    [
        RandomForestClassifier(n_estimators=2),
        MultiOutputClassifier(RandomForestClassifier(n_estimators=2)),
        ClassifierChain(RandomForestClassifier(n_estimators=2)),
    ],
)
def test_multi_output_classes_(estimator):
    # Tests classes_ attribute of multioutput classifiers
    # RandomForestClassifier supports multioutput out-of-the-box
    estimator.fit(X, y)
    assert isinstance(estimator.classes_, list)
    assert len(estimator.classes_) == n_outputs
    for estimator_classes, expected_classes in zip(classes, estimator.classes_):
        assert_array_equal(estimator_classes, expected_classes)


class DummyRegressorWithFitParams(DummyRegressor):
    def fit(self, X, y, sample_weight=None, **fit_params):
        self._fit_params = fit_params
        return super().fit(X, y, sample_weight)


class DummyClassifierWithFitParams(DummyClassifier):
    def fit(self, X, y, sample_weight=None, **fit_params):
        self._fit_params = fit_params
        return super().fit(X, y, sample_weight)
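
# The two helper classes above only record the extra **fit_params they
# receive; the parametrized test below uses that to assert the params are
# routed to every sub-estimator.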


@pytest.mark.filterwarnings("ignore:`n_features_in_` is deprecated")
@pytest.mark.parametrize(
    "estimator, dataset",
    [
        (
            MultiOutputClassifier(DummyClassifierWithFitParams(strategy="prior")),
            datasets.make_multilabel_classification(),
        ),
        (
            MultiOutputRegressor(DummyRegressorWithFitParams()),
            datasets.make_regression(n_targets=3, random_state=0),
        ),
    ],
)
def test_multioutput_estimator_with_fit_params(estimator, dataset):
    X, y = dataset
    some_param = np.zeros_like(X)
    estimator.fit(X, y, some_param=some_param)
    for dummy_estimator in estimator.estimators_:
        assert "some_param" in dummy_estimator._fit_params


def test_regressor_chain_w_fit_params():
    # Make sure fit_params are properly propagated to the sub-estimators
    rng = np.random.RandomState(0)
    X, y = datasets.make_regression(n_targets=3, random_state=0)
    weight = rng.rand(y.shape[0])

    class MySGD(SGDRegressor):
        def fit(self, X, y, **fit_params):
            self.sample_weight_ = fit_params["sample_weight"]
            super().fit(X, y, **fit_params)

    model = RegressorChain(MySGD())

    # Fitting with params
    fit_param = {"sample_weight": weight}
    model.fit(X, y, **fit_param)

    for est in model.estimators_:
        assert est.sample_weight_ is weight


@pytest.mark.parametrize(
    "MultiOutputEstimator, Estimator",
    [(MultiOutputClassifier, LogisticRegression), (MultiOutputRegressor, Ridge)],
)
# FIXME: we should move this test into `estimator_checks` once we are able
# to construct meta-estimator instances
def test_support_missing_values(MultiOutputEstimator, Estimator):
    # smoke test to check that the multioutput estimators delegate the
    # validation of missing values to the underlying pipeline, regressor
    # or classifier
    rng = np.random.RandomState(42)
    X, y = rng.randn(50, 2), rng.binomial(1, 0.5, (50, 3))
    mask = rng.choice([1, 0], X.shape, p=[0.01, 0.99]).astype(bool)
    X[mask] = np.nan

    pipe = make_pipeline(SimpleImputer(), Estimator())
    MultiOutputEstimator(pipe).fit(X, y).score(X, y)
- @pytest.mark.parametrize("order_type", [list, np.array, tuple])
- def test_classifier_chain_tuple_order(order_type):
- X = [[1, 2, 3], [4, 5, 6], [1.5, 2.5, 3.5]]
- y = [[3, 2], [2, 3], [3, 2]]
- order = order_type([1, 0])
- chain = ClassifierChain(RandomForestClassifier(), order=order)
- chain.fit(X, y)
- X_test = [[1.5, 2.5, 3.5]]
- y_test = [[3, 2]]
- assert_array_almost_equal(chain.predict(X_test), y_test)


def test_classifier_chain_tuple_invalid_order():
    X = [[1, 2, 3], [4, 5, 6], [1.5, 2.5, 3.5]]
    y = [[3, 2], [2, 3], [3, 2]]
    order = tuple([1, 2])

    chain = ClassifierChain(RandomForestClassifier(), order=order)

    with pytest.raises(ValueError, match="invalid order"):
        chain.fit(X, y)


def test_classifier_chain_verbose(capsys):
    X, y = make_multilabel_classification(
        n_samples=100, n_features=5, n_classes=3, n_labels=3, random_state=0
    )
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    pattern = (
        r"\[Chain\].*\(1 of 3\) Processing order 0, total=.*\n"
        r"\[Chain\].*\(2 of 3\) Processing order 1, total=.*\n"
        r"\[Chain\].*\(3 of 3\) Processing order 2, total=.*\n$"
    )

    classifier = ClassifierChain(
        DecisionTreeClassifier(),
        order=[0, 1, 2],
        random_state=0,
        verbose=True,
    )
    classifier.fit(X_train, y_train)
    assert re.match(pattern, capsys.readouterr()[0])


def test_regressor_chain_verbose(capsys):
    X, y = make_regression(n_samples=125, n_targets=3, random_state=0)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    pattern = (
        r"\[Chain\].*\(1 of 3\) Processing order 1, total=.*\n"
        r"\[Chain\].*\(2 of 3\) Processing order 0, total=.*\n"
        r"\[Chain\].*\(3 of 3\) Processing order 2, total=.*\n$"
    )

    regressor = RegressorChain(
        LinearRegression(),
        order=[1, 0, 2],
        random_state=0,
        verbose=True,
    )
    regressor.fit(X_train, y_train)
    assert re.match(pattern, capsys.readouterr()[0])


def test_multioutputregressor_ducktypes_fitted_estimator():
    """Test that MultiOutputRegressor checks the fitted estimator for
    predict. Non-regression test for #16549."""
    X, y = load_linnerud(return_X_y=True)
    stacker = StackingRegressor(
        estimators=[("sgd", SGDRegressor(random_state=1))],
        final_estimator=Ridge(),
        cv=2,
    )

    reg = MultiOutputRegressor(estimator=stacker).fit(X, y)

    # Does not raise
    reg.predict(X)


@pytest.mark.parametrize(
    "Cls, method", [(ClassifierChain, "fit"), (MultiOutputClassifier, "partial_fit")]
)
def test_fit_params_no_routing(Cls, method):
    """Check that we raise an error when passing metadata not requested by the
    underlying classifier.
    """
    X, y = make_classification(n_samples=50)
    clf = Cls(PassiveAggressiveClassifier())

    with pytest.raises(ValueError, match="is only supported if"):
        getattr(clf, method)(X, y, test=1)


def test_multioutput_regressor_has_partial_fit():
    # Test that an unfitted MultiOutputRegressor handles available_if for
    # partial_fit correctly
    est = MultiOutputRegressor(LinearRegression())
    msg = "This 'MultiOutputRegressor' has no attribute 'partial_fit'"
    with pytest.raises(AttributeError, match=msg):
        getattr(est, "partial_fit")
|