| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367 |
- from collections import Counter
- from contextlib import suppress
- from typing import NamedTuple
- import numpy as np
- from . import is_scalar_nan
- def _unique(values, *, return_inverse=False, return_counts=False):
- """Helper function to find unique values with support for python objects.
- Uses pure python method for object dtype, and numpy method for
- all other dtypes.
- Parameters
- ----------
- values : ndarray
- Values to check for unknowns.
- return_inverse : bool, default=False
- If True, also return the indices of the unique values.
- return_counts : bool, default=False
- If True, also return the number of times each unique item appears in
- values.
- Returns
- -------
- unique : ndarray
- The sorted unique values.
- unique_inverse : ndarray
- The indices to reconstruct the original array from the unique array.
- Only provided if `return_inverse` is True.
- unique_counts : ndarray
- The number of times each of the unique values comes up in the original
- array. Only provided if `return_counts` is True.
- """
- if values.dtype == object:
- return _unique_python(
- values, return_inverse=return_inverse, return_counts=return_counts
- )
- # numerical
- return _unique_np(
- values, return_inverse=return_inverse, return_counts=return_counts
- )
- def _unique_np(values, return_inverse=False, return_counts=False):
- """Helper function to find unique values for numpy arrays that correctly
- accounts for nans. See `_unique` documentation for details."""
- uniques = np.unique(
- values, return_inverse=return_inverse, return_counts=return_counts
- )
- inverse, counts = None, None
- if return_counts:
- *uniques, counts = uniques
- if return_inverse:
- *uniques, inverse = uniques
- if return_counts or return_inverse:
- uniques = uniques[0]
- # np.unique will have duplicate missing values at the end of `uniques`
- # here we clip the nans and remove it from uniques
- if uniques.size and is_scalar_nan(uniques[-1]):
- nan_idx = np.searchsorted(uniques, np.nan)
- uniques = uniques[: nan_idx + 1]
- if return_inverse:
- inverse[inverse > nan_idx] = nan_idx
- if return_counts:
- counts[nan_idx] = np.sum(counts[nan_idx:])
- counts = counts[: nan_idx + 1]
- ret = (uniques,)
- if return_inverse:
- ret += (inverse,)
- if return_counts:
- ret += (counts,)
- return ret[0] if len(ret) == 1 else ret
- class MissingValues(NamedTuple):
- """Data class for missing data information"""
- nan: bool
- none: bool
- def to_list(self):
- """Convert tuple to a list where None is always first."""
- output = []
- if self.none:
- output.append(None)
- if self.nan:
- output.append(np.nan)
- return output
- def _extract_missing(values):
- """Extract missing values from `values`.
- Parameters
- ----------
- values: set
- Set of values to extract missing from.
- Returns
- -------
- output: set
- Set with missing values extracted.
- missing_values: MissingValues
- Object with missing value information.
- """
- missing_values_set = {
- value for value in values if value is None or is_scalar_nan(value)
- }
- if not missing_values_set:
- return values, MissingValues(nan=False, none=False)
- if None in missing_values_set:
- if len(missing_values_set) == 1:
- output_missing_values = MissingValues(nan=False, none=True)
- else:
- # If there is more than one missing value, then it has to be
- # float('nan') or np.nan
- output_missing_values = MissingValues(nan=True, none=True)
- else:
- output_missing_values = MissingValues(nan=True, none=False)
- # create set without the missing values
- output = values - missing_values_set
- return output, output_missing_values
- class _nandict(dict):
- """Dictionary with support for nans."""
- def __init__(self, mapping):
- super().__init__(mapping)
- for key, value in mapping.items():
- if is_scalar_nan(key):
- self.nan_value = value
- break
- def __missing__(self, key):
- if hasattr(self, "nan_value") and is_scalar_nan(key):
- return self.nan_value
- raise KeyError(key)
- def _map_to_integer(values, uniques):
- """Map values based on its position in uniques."""
- table = _nandict({val: i for i, val in enumerate(uniques)})
- return np.array([table[v] for v in values])
- def _unique_python(values, *, return_inverse, return_counts):
- # Only used in `_uniques`, see docstring there for details
- try:
- uniques_set = set(values)
- uniques_set, missing_values = _extract_missing(uniques_set)
- uniques = sorted(uniques_set)
- uniques.extend(missing_values.to_list())
- uniques = np.array(uniques, dtype=values.dtype)
- except TypeError:
- types = sorted(t.__qualname__ for t in set(type(v) for v in values))
- raise TypeError(
- "Encoders require their input argument must be uniformly "
- f"strings or numbers. Got {types}"
- )
- ret = (uniques,)
- if return_inverse:
- ret += (_map_to_integer(values, uniques),)
- if return_counts:
- ret += (_get_counts(values, uniques),)
- return ret[0] if len(ret) == 1 else ret
- def _encode(values, *, uniques, check_unknown=True):
- """Helper function to encode values into [0, n_uniques - 1].
- Uses pure python method for object dtype, and numpy method for
- all other dtypes.
- The numpy method has the limitation that the `uniques` need to
- be sorted. Importantly, this is not checked but assumed to already be
- the case. The calling method needs to ensure this for all non-object
- values.
- Parameters
- ----------
- values : ndarray
- Values to encode.
- uniques : ndarray
- The unique values in `values`. If the dtype is not object, then
- `uniques` needs to be sorted.
- check_unknown : bool, default=True
- If True, check for values in `values` that are not in `unique`
- and raise an error. This is ignored for object dtype, and treated as
- True in this case. This parameter is useful for
- _BaseEncoder._transform() to avoid calling _check_unknown()
- twice.
- Returns
- -------
- encoded : ndarray
- Encoded values
- """
- if values.dtype.kind in "OUS":
- try:
- return _map_to_integer(values, uniques)
- except KeyError as e:
- raise ValueError(f"y contains previously unseen labels: {str(e)}")
- else:
- if check_unknown:
- diff = _check_unknown(values, uniques)
- if diff:
- raise ValueError(f"y contains previously unseen labels: {str(diff)}")
- return np.searchsorted(uniques, values)
- def _check_unknown(values, known_values, return_mask=False):
- """
- Helper function to check for unknowns in values to be encoded.
- Uses pure python method for object dtype, and numpy method for
- all other dtypes.
- Parameters
- ----------
- values : array
- Values to check for unknowns.
- known_values : array
- Known values. Must be unique.
- return_mask : bool, default=False
- If True, return a mask of the same shape as `values` indicating
- the valid values.
- Returns
- -------
- diff : list
- The unique values present in `values` and not in `know_values`.
- valid_mask : boolean array
- Additionally returned if ``return_mask=True``.
- """
- valid_mask = None
- if values.dtype.kind in "OUS":
- values_set = set(values)
- values_set, missing_in_values = _extract_missing(values_set)
- uniques_set = set(known_values)
- uniques_set, missing_in_uniques = _extract_missing(uniques_set)
- diff = values_set - uniques_set
- nan_in_diff = missing_in_values.nan and not missing_in_uniques.nan
- none_in_diff = missing_in_values.none and not missing_in_uniques.none
- def is_valid(value):
- return (
- value in uniques_set
- or missing_in_uniques.none
- and value is None
- or missing_in_uniques.nan
- and is_scalar_nan(value)
- )
- if return_mask:
- if diff or nan_in_diff or none_in_diff:
- valid_mask = np.array([is_valid(value) for value in values])
- else:
- valid_mask = np.ones(len(values), dtype=bool)
- diff = list(diff)
- if none_in_diff:
- diff.append(None)
- if nan_in_diff:
- diff.append(np.nan)
- else:
- unique_values = np.unique(values)
- diff = np.setdiff1d(unique_values, known_values, assume_unique=True)
- if return_mask:
- if diff.size:
- valid_mask = np.isin(values, known_values)
- else:
- valid_mask = np.ones(len(values), dtype=bool)
- # check for nans in the known_values
- if np.isnan(known_values).any():
- diff_is_nan = np.isnan(diff)
- if diff_is_nan.any():
- # removes nan from valid_mask
- if diff.size and return_mask:
- is_nan = np.isnan(values)
- valid_mask[is_nan] = 1
- # remove nan from diff
- diff = diff[~diff_is_nan]
- diff = list(diff)
- if return_mask:
- return diff, valid_mask
- return diff
- class _NaNCounter(Counter):
- """Counter with support for nan values."""
- def __init__(self, items):
- super().__init__(self._generate_items(items))
- def _generate_items(self, items):
- """Generate items without nans. Stores the nan counts separately."""
- for item in items:
- if not is_scalar_nan(item):
- yield item
- continue
- if not hasattr(self, "nan_count"):
- self.nan_count = 0
- self.nan_count += 1
- def __missing__(self, key):
- if hasattr(self, "nan_count") and is_scalar_nan(key):
- return self.nan_count
- raise KeyError(key)
- def _get_counts(values, uniques):
- """Get the count of each of the `uniques` in `values`.
- The counts will use the order passed in by `uniques`. For non-object dtypes,
- `uniques` is assumed to be sorted and `np.nan` is at the end.
- """
- if values.dtype.kind in "OU":
- counter = _NaNCounter(values)
- output = np.zeros(len(uniques), dtype=np.int64)
- for i, item in enumerate(uniques):
- with suppress(KeyError):
- output[i] = counter[item]
- return output
- unique_values, counts = _unique_np(values, return_counts=True)
- # Recorder unique_values based on input: `uniques`
- uniques_in_values = np.isin(uniques, unique_values, assume_unique=True)
- if np.isnan(unique_values[-1]) and np.isnan(uniques[-1]):
- uniques_in_values[-1] = True
- unique_valid_indices = np.searchsorted(unique_values, uniques[uniques_in_values])
- output = np.zeros_like(uniques, dtype=np.int64)
- output[uniques_in_values] = counts[unique_valid_indices]
- return output
|