# Tests for sklearn.linear_model.QuantileRegressor (test_quantile.py).
# Authors: David Dale <dale.david@mail.ru>
#          Christian Lorentzen <lorentzen.ch@gmail.com>
# License: BSD 3 clause
  4. import numpy as np
  5. import pytest
  6. from pytest import approx
  7. from scipy import sparse
  8. from scipy.optimize import minimize
  9. from sklearn.datasets import make_regression
  10. from sklearn.exceptions import ConvergenceWarning
  11. from sklearn.linear_model import HuberRegressor, QuantileRegressor
  12. from sklearn.metrics import mean_pinball_loss
  13. from sklearn.utils._testing import assert_allclose, skip_if_32bit
  14. from sklearn.utils.fixes import parse_version, sp_version
  15. @pytest.fixture
  16. def X_y_data():
  17. X, y = make_regression(n_samples=10, n_features=1, random_state=0, noise=1)
  18. return X, y
  19. @pytest.fixture
  20. def default_solver():
  21. return "highs" if sp_version >= parse_version("1.6.0") else "interior-point"
  22. @pytest.mark.skipif(
  23. parse_version(sp_version.base_version) >= parse_version("1.11"),
  24. reason="interior-point solver is not available in SciPy 1.11",
  25. )
  26. @pytest.mark.parametrize("solver", ["interior-point", "revised simplex"])
  27. def test_incompatible_solver_for_sparse_input(X_y_data, solver):
  28. X, y = X_y_data
  29. X_sparse = sparse.csc_matrix(X)
  30. err_msg = (
  31. f"Solver {solver} does not support sparse X. Use solver 'highs' for example."
  32. )
  33. with pytest.raises(ValueError, match=err_msg):
  34. QuantileRegressor(solver=solver).fit(X_sparse, y)
  35. @pytest.mark.parametrize("solver", ("highs-ds", "highs-ipm", "highs"))
  36. @pytest.mark.skipif(
  37. sp_version >= parse_version("1.6.0"),
  38. reason="Solvers are available as of scipy 1.6.0",
  39. )
  40. def test_too_new_solver_methods_raise_error(X_y_data, solver):
  41. """Test that highs solver raises for scipy<1.6.0."""
  42. X, y = X_y_data
  43. with pytest.raises(ValueError, match="scipy>=1.6.0"):
  44. QuantileRegressor(solver=solver).fit(X, y)
  45. @pytest.mark.parametrize(
  46. "quantile, alpha, intercept, coef",
  47. [
  48. # for 50% quantile w/o regularization, any slope in [1, 10] is okay
  49. [0.5, 0, 1, None],
  50. # if positive error costs more, the slope is maximal
  51. [0.51, 0, 1, 10],
  52. # if negative error costs more, the slope is minimal
  53. [0.49, 0, 1, 1],
  54. # for a small lasso penalty, the slope is also minimal
  55. [0.5, 0.01, 1, 1],
  56. # for a large lasso penalty, the model predicts the constant median
  57. [0.5, 100, 2, 0],
  58. ],
  59. )
  60. def test_quantile_toy_example(quantile, alpha, intercept, coef, default_solver):
  61. # test how different parameters affect a small intuitive example
  62. X = [[0], [1], [1]]
  63. y = [1, 2, 11]
  64. model = QuantileRegressor(
  65. quantile=quantile, alpha=alpha, solver=default_solver
  66. ).fit(X, y)
  67. assert_allclose(model.intercept_, intercept, atol=1e-2)
  68. if coef is not None:
  69. assert_allclose(model.coef_[0], coef, atol=1e-2)
  70. if alpha < 100:
  71. assert model.coef_[0] >= 1
  72. assert model.coef_[0] <= 10
  73. @pytest.mark.parametrize("fit_intercept", [True, False])
  74. def test_quantile_equals_huber_for_low_epsilon(fit_intercept, default_solver):
  75. X, y = make_regression(n_samples=100, n_features=20, random_state=0, noise=1.0)
  76. alpha = 1e-4
  77. huber = HuberRegressor(
  78. epsilon=1 + 1e-4, alpha=alpha, fit_intercept=fit_intercept
  79. ).fit(X, y)
  80. quant = QuantileRegressor(
  81. alpha=alpha, fit_intercept=fit_intercept, solver=default_solver
  82. ).fit(X, y)
  83. assert_allclose(huber.coef_, quant.coef_, atol=1e-1)
  84. if fit_intercept:
  85. assert huber.intercept_ == approx(quant.intercept_, abs=1e-1)
  86. # check that we still predict fraction
  87. assert np.mean(y < quant.predict(X)) == approx(0.5, abs=1e-1)
  88. @pytest.mark.parametrize("q", [0.5, 0.9, 0.05])
  89. def test_quantile_estimates_calibration(q, default_solver):
  90. # Test that model estimates percentage of points below the prediction
  91. X, y = make_regression(n_samples=1000, n_features=20, random_state=0, noise=1.0)
  92. quant = QuantileRegressor(
  93. quantile=q,
  94. alpha=0,
  95. solver=default_solver,
  96. ).fit(X, y)
  97. assert np.mean(y < quant.predict(X)) == approx(q, abs=1e-2)
  98. def test_quantile_sample_weight(default_solver):
  99. # test that with unequal sample weights we still estimate weighted fraction
  100. n = 1000
  101. X, y = make_regression(n_samples=n, n_features=5, random_state=0, noise=10.0)
  102. weight = np.ones(n)
  103. # when we increase weight of upper observations,
  104. # estimate of quantile should go up
  105. weight[y > y.mean()] = 100
  106. quant = QuantileRegressor(quantile=0.5, alpha=1e-8, solver=default_solver)
  107. quant.fit(X, y, sample_weight=weight)
  108. fraction_below = np.mean(y < quant.predict(X))
  109. assert fraction_below > 0.5
  110. weighted_fraction_below = np.average(y < quant.predict(X), weights=weight)
  111. assert weighted_fraction_below == approx(0.5, abs=3e-2)
  112. @pytest.mark.skipif(
  113. sp_version < parse_version("1.6.0"),
  114. reason="The `highs` solver is available from the 1.6.0 scipy version",
  115. )
  116. @pytest.mark.parametrize("quantile", [0.2, 0.5, 0.8])
  117. def test_asymmetric_error(quantile, default_solver):
  118. """Test quantile regression for asymmetric distributed targets."""
  119. n_samples = 1000
  120. rng = np.random.RandomState(42)
  121. X = np.concatenate(
  122. (
  123. np.abs(rng.randn(n_samples)[:, None]),
  124. -rng.randint(2, size=(n_samples, 1)),
  125. ),
  126. axis=1,
  127. )
  128. intercept = 1.23
  129. coef = np.array([0.5, -2])
  130. # Take care that X @ coef + intercept > 0
  131. assert np.min(X @ coef + intercept) > 0
  132. # For an exponential distribution with rate lambda, e.g. exp(-lambda * x),
  133. # the quantile at level q is:
  134. # quantile(q) = - log(1 - q) / lambda
  135. # scale = 1/lambda = -quantile(q) / log(1 - q)
  136. y = rng.exponential(
  137. scale=-(X @ coef + intercept) / np.log(1 - quantile), size=n_samples
  138. )
  139. model = QuantileRegressor(
  140. quantile=quantile,
  141. alpha=0,
  142. solver=default_solver,
  143. ).fit(X, y)
  144. # This test can be made to pass with any solver but in the interest
  145. # of sparing continuous integration resources, the test is performed
  146. # with the fastest solver only.
  147. assert model.intercept_ == approx(intercept, rel=0.2)
  148. assert_allclose(model.coef_, coef, rtol=0.6)
  149. assert_allclose(np.mean(model.predict(X) > y), quantile, atol=1e-2)
  150. # Now compare to Nelder-Mead optimization with L1 penalty
  151. alpha = 0.01
  152. model.set_params(alpha=alpha).fit(X, y)
  153. model_coef = np.r_[model.intercept_, model.coef_]
  154. def func(coef):
  155. loss = mean_pinball_loss(y, X @ coef[1:] + coef[0], alpha=quantile)
  156. L1 = np.sum(np.abs(coef[1:]))
  157. return loss + alpha * L1
  158. res = minimize(
  159. fun=func,
  160. x0=[1, 0, -1],
  161. method="Nelder-Mead",
  162. tol=1e-12,
  163. options={"maxiter": 2000},
  164. )
  165. assert func(model_coef) == approx(func(res.x))
  166. assert_allclose(model.intercept_, res.x[0])
  167. assert_allclose(model.coef_, res.x[1:])
  168. assert_allclose(np.mean(model.predict(X) > y), quantile, atol=1e-2)
  169. @pytest.mark.parametrize("quantile", [0.2, 0.5, 0.8])
  170. def test_equivariance(quantile, default_solver):
  171. """Test equivariace of quantile regression.
  172. See Koenker (2005) Quantile Regression, Chapter 2.2.3.
  173. """
  174. rng = np.random.RandomState(42)
  175. n_samples, n_features = 100, 5
  176. X, y = make_regression(
  177. n_samples=n_samples,
  178. n_features=n_features,
  179. n_informative=n_features,
  180. noise=0,
  181. random_state=rng,
  182. shuffle=False,
  183. )
  184. # make y asymmetric
  185. y += rng.exponential(scale=100, size=y.shape)
  186. params = dict(alpha=0, solver=default_solver)
  187. model1 = QuantileRegressor(quantile=quantile, **params).fit(X, y)
  188. # coef(q; a*y, X) = a * coef(q; y, X)
  189. a = 2.5
  190. model2 = QuantileRegressor(quantile=quantile, **params).fit(X, a * y)
  191. assert model2.intercept_ == approx(a * model1.intercept_, rel=1e-5)
  192. assert_allclose(model2.coef_, a * model1.coef_, rtol=1e-5)
  193. # coef(1-q; -a*y, X) = -a * coef(q; y, X)
  194. model2 = QuantileRegressor(quantile=1 - quantile, **params).fit(X, -a * y)
  195. assert model2.intercept_ == approx(-a * model1.intercept_, rel=1e-5)
  196. assert_allclose(model2.coef_, -a * model1.coef_, rtol=1e-5)
  197. # coef(q; y + X @ g, X) = coef(q; y, X) + g
  198. g_intercept, g_coef = rng.randn(), rng.randn(n_features)
  199. model2 = QuantileRegressor(quantile=quantile, **params)
  200. model2.fit(X, y + X @ g_coef + g_intercept)
  201. assert model2.intercept_ == approx(model1.intercept_ + g_intercept)
  202. assert_allclose(model2.coef_, model1.coef_ + g_coef, rtol=1e-6)
  203. # coef(q; y, X @ A) = A^-1 @ coef(q; y, X)
  204. A = rng.randn(n_features, n_features)
  205. model2 = QuantileRegressor(quantile=quantile, **params)
  206. model2.fit(X @ A, y)
  207. assert model2.intercept_ == approx(model1.intercept_, rel=1e-5)
  208. assert_allclose(model2.coef_, np.linalg.solve(A, model1.coef_), rtol=1e-5)
  209. @pytest.mark.skipif(
  210. parse_version(sp_version.base_version) >= parse_version("1.11"),
  211. reason="interior-point solver is not available in SciPy 1.11",
  212. )
  213. @pytest.mark.filterwarnings("ignore:`method='interior-point'` is deprecated")
  214. def test_linprog_failure():
  215. """Test that linprog fails."""
  216. X = np.linspace(0, 10, num=10).reshape(-1, 1)
  217. y = np.linspace(0, 10, num=10)
  218. reg = QuantileRegressor(
  219. alpha=0, solver="interior-point", solver_options={"maxiter": 1}
  220. )
  221. msg = "Linear programming for QuantileRegressor did not succeed."
  222. with pytest.warns(ConvergenceWarning, match=msg):
  223. reg.fit(X, y)
  224. @skip_if_32bit
  225. @pytest.mark.skipif(
  226. sp_version <= parse_version("1.6.0"),
  227. reason="Solvers are available as of scipy 1.6.0",
  228. )
  229. @pytest.mark.parametrize(
  230. "sparse_format", [sparse.csc_matrix, sparse.csr_matrix, sparse.coo_matrix]
  231. )
  232. @pytest.mark.parametrize("solver", ["highs", "highs-ds", "highs-ipm"])
  233. @pytest.mark.parametrize("fit_intercept", [True, False])
  234. def test_sparse_input(sparse_format, solver, fit_intercept, default_solver):
  235. """Test that sparse and dense X give same results."""
  236. X, y = make_regression(n_samples=100, n_features=20, random_state=1, noise=1.0)
  237. X_sparse = sparse_format(X)
  238. alpha = 1e-4
  239. quant_dense = QuantileRegressor(
  240. alpha=alpha, fit_intercept=fit_intercept, solver=default_solver
  241. ).fit(X, y)
  242. quant_sparse = QuantileRegressor(
  243. alpha=alpha, fit_intercept=fit_intercept, solver=solver
  244. ).fit(X_sparse, y)
  245. assert_allclose(quant_sparse.coef_, quant_dense.coef_, rtol=1e-2)
  246. if fit_intercept:
  247. assert quant_sparse.intercept_ == approx(quant_dense.intercept_)
  248. # check that we still predict fraction
  249. assert 0.45 <= np.mean(y < quant_sparse.predict(X_sparse)) <= 0.57
  250. # TODO (1.4): remove this test in 1.4
  251. @pytest.mark.skipif(
  252. parse_version(sp_version.base_version) >= parse_version("1.11"),
  253. reason="interior-point solver is not available in SciPy 1.11",
  254. )
  255. def test_warning_new_default(X_y_data):
  256. """Check that we warn about the new default solver."""
  257. X, y = X_y_data
  258. model = QuantileRegressor()
  259. with pytest.warns(FutureWarning, match="The default solver will change"):
  260. model.fit(X, y)
  261. def test_error_interior_point_future(X_y_data, monkeypatch):
  262. """Check that we will raise a proper error when requesting
  263. `solver='interior-point'` in SciPy >= 1.11.
  264. """
  265. X, y = X_y_data
  266. import sklearn.linear_model._quantile
  267. with monkeypatch.context() as m:
  268. m.setattr(sklearn.linear_model._quantile, "sp_version", parse_version("1.11.0"))
  269. err_msg = "Solver interior-point is not anymore available in SciPy >= 1.11.0."
  270. with pytest.raises(ValueError, match=err_msg):
  271. QuantileRegressor(solver="interior-point").fit(X, y)