| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689 |
- # This base template ("datapipe.pyi.in") is generated from mypy stubgen with minimal editing for code injection
- # The output file will be "datapipe.pyi". This is executed as part of torch/CMakeLists.txt
- # Note that, for mypy, .pyi file takes precedent over .py file, such that we must define the interface for other
- # classes/objects here, even though we are not injecting extra code into them at the moment.
- from typing import Any, Callable, Dict, Generic, Iterator, List, Literal, Optional, TypeVar, Union
- from torch.utils.data import Dataset, default_collate, IterableDataset
- from torch.utils.data.datapipes._hook_iterator import _SnapshotState
- from torch.utils.data.datapipes._typing import _DataPipeMeta, _IterDataPipeMeta
- T_co = TypeVar("T_co", covariant=True)
- T = TypeVar("T")
- UNTRACABLE_DATAFRAME_PIPES: Any
- class MapDataPipe(Dataset[T_co], metaclass=_DataPipeMeta):
- functions: Dict[str, Callable] = ...
- reduce_ex_hook: Optional[Callable] = ...
- getstate_hook: Optional[Callable] = ...
- str_hook: Optional[Callable] = ...
- repr_hook: Optional[Callable] = ...
- def __getattr__(self, attribute_name: Any): ...
- @classmethod
- def register_function(cls, function_name: Any, function: Any) -> None: ...
- @classmethod
- def register_datapipe_as_function(
- cls,
- function_name: Any,
- cls_to_register: Any,
- ): ...
- def __getstate__(self): ...
- def __reduce_ex__(self, *args: Any, **kwargs: Any): ...
- @classmethod
- def set_getstate_hook(cls, hook_fn: Any) -> None: ...
- @classmethod
- def set_reduce_ex_hook(cls, hook_fn: Any) -> None: ...
- # Functional form of 'BatcherMapDataPipe'
- def batch(self, batch_size: int, drop_last: bool = False, wrapper_class=DataChunk) -> MapDataPipe:
- r"""
- Create mini-batches of data (functional name: ``batch``).
-
- An outer dimension will be added as ``batch_size`` if ``drop_last`` is set to ``True``,
- or ``length % batch_size`` for the last batch if ``drop_last`` is set to ``False``.
-
- Args:
- datapipe: Iterable DataPipe being batched
- batch_size: The size of each batch
- drop_last: Option to drop the last batch if it's not full
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.map import SequenceWrapper
- >>> dp = SequenceWrapper(range(10))
- >>> batch_dp = dp.batch(batch_size=2)
- >>> list(batch_dp)
- [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
- """
-
- # Functional form of 'ConcaterMapDataPipe'
- def concat(self, *datapipes: MapDataPipe) -> MapDataPipe:
- r"""
- Concatenate multiple Map DataPipes (functional name: ``concat``).
-
- The new index of is the cumulative sum of source DataPipes.
- For example, if there are 2 source DataPipes both with length 5,
- index 0 to 4 of the resulting `ConcatMapDataPipe` would refer to
- elements of the first DataPipe, and 5 to 9 would refer to elements
- of the second DataPipe.
-
- Args:
- datapipes: Map DataPipes being concatenated
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.map import SequenceWrapper
- >>> dp1 = SequenceWrapper(range(3))
- >>> dp2 = SequenceWrapper(range(3))
- >>> concat_dp = dp1.concat(dp2)
- >>> list(concat_dp)
- [0, 1, 2, 0, 1, 2]
- """
-
- # Functional form of 'MapperMapDataPipe'
- def map(self, fn: Callable= ...) -> MapDataPipe:
- r"""
- Apply the input function over each item from the source DataPipe (functional name: ``map``).
-
- The function can be any regular Python function or partial object. Lambda
- function is not recommended as it is not supported by pickle.
-
- Args:
- datapipe: Source MapDataPipe
- fn: Function being applied to each item
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.map import SequenceWrapper, Mapper
- >>> def add_one(x):
- ... return x + 1
- >>> dp = SequenceWrapper(range(10))
- >>> map_dp_1 = dp.map(add_one)
- >>> list(map_dp_1)
- [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- >>> map_dp_2 = Mapper(dp, lambda x: x + 1)
- >>> list(map_dp_2)
- [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- """
-
- # Functional form of 'ShufflerIterDataPipe'
- def shuffle(self, *, indices: Optional[List] = None) -> IterDataPipe:
- r"""
- Shuffle the input MapDataPipe via its indices (functional name: ``shuffle``).
-
- When it is used with :class:`~torch.utils.data.DataLoader`, the methods to
- set up random seed are different based on :attr:`num_workers`.
-
- For single-process mode (:attr:`num_workers == 0`), the random seed is set before
- the :class:`~torch.utils.data.DataLoader` in the main process. For multi-process
- mode (:attr:`num_worker > 0`), ``worker_init_fn`` is used to set up a random seed
- for each worker process.
-
- Args:
- datapipe: MapDataPipe being shuffled
- indices: a list of indices of the MapDataPipe. If not provided, we assume it uses 0-based indexing
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.map import SequenceWrapper
- >>> dp = SequenceWrapper(range(10))
- >>> shuffle_dp = dp.shuffle().set_seed(0)
- >>> list(shuffle_dp)
- [7, 8, 1, 5, 3, 4, 2, 0, 9, 6]
- >>> list(shuffle_dp)
- [6, 1, 9, 5, 2, 4, 7, 3, 8, 0]
- >>> # Reset seed for Shuffler
- >>> shuffle_dp = shuffle_dp.set_seed(0)
- >>> list(shuffle_dp)
- [7, 8, 1, 5, 3, 4, 2, 0, 9, 6]
-
- Note:
- Even thought this ``shuffle`` operation takes a ``MapDataPipe`` as the input, it would return an
- ``IterDataPipe`` rather than a ``MapDataPipe``, because ``MapDataPipe`` should be non-sensitive to
- the order of data order for the sake of random reads, but ``IterDataPipe`` depends on the order
- of data during data-processing.
- """
-
- # Functional form of 'ZipperMapDataPipe'
- def zip(self, *datapipes: MapDataPipe[T_co]) -> MapDataPipe:
- r"""
- Aggregates elements into a tuple from each of the input DataPipes (functional name: ``zip``).
-
- This MataPipe is out of bound as soon as the shortest input DataPipe is exhausted.
-
- Args:
- *datapipes: Map DataPipes being aggregated
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.map import SequenceWrapper
- >>> dp1 = SequenceWrapper(range(3))
- >>> dp2 = SequenceWrapper(range(10, 13))
- >>> zip_dp = dp1.zip(dp2)
- >>> list(zip_dp)
- [(0, 10), (1, 11), (2, 12)]
- """
-
- class IterDataPipe(IterableDataset[T_co], metaclass=_IterDataPipeMeta):
- functions: Dict[str, Callable] = ...
- reduce_ex_hook: Optional[Callable] = ...
- getstate_hook: Optional[Callable] = ...
- str_hook: Optional[Callable] = ...
- repr_hook: Optional[Callable] = ...
- _number_of_samples_yielded: int = ...
- _snapshot_state: _SnapshotState = _SnapshotState.Iterating
- _fast_forward_iterator: Optional[Iterator] = ...
- def __getattr__(self, attribute_name: Any): ...
- @classmethod
- def register_function(cls, function_name: Any, function: Any) -> None: ...
- @classmethod
- def register_datapipe_as_function(
- cls,
- function_name: Any,
- cls_to_register: Any,
- enable_df_api_tracing: bool = ...,
- ): ...
- def __getstate__(self): ...
- def __reduce_ex__(self, *args: Any, **kwargs: Any): ...
- @classmethod
- def set_getstate_hook(cls, hook_fn: Any) -> None: ...
- @classmethod
- def set_reduce_ex_hook(cls, hook_fn: Any) -> None: ...
- # Functional form of 'BatcherIterDataPipe'
- def batch(self, batch_size: int, drop_last: bool = False, wrapper_class=DataChunk) -> IterDataPipe:
- r"""
- Creates mini-batches of data (functional name: ``batch``).
-
- An outer dimension will be added as ``batch_size`` if ``drop_last`` is set to ``True``, or ``length % batch_size`` for the
- last batch if ``drop_last`` is set to ``False``.
-
- Args:
- datapipe: Iterable DataPipe being batched
- batch_size: The size of each batch
- drop_last: Option to drop the last batch if it's not full
- wrapper_class: wrapper to apply onto each batch (type ``List``) before yielding,
- defaults to ``DataChunk``
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> dp = IterableWrapper(range(10))
- >>> dp = dp.batch(batch_size=3, drop_last=True)
- >>> list(dp)
- [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
- """
-
- # Functional form of 'CollatorIterDataPipe'
- def collate(self, conversion: Optional[Union[Callable[..., Any],Dict[Union[str, Any], Union[Callable, Any]],]] = default_collate, collate_fn: Optional[Callable] = None) -> IterDataPipe:
- r"""
- Collates samples from DataPipe to Tensor(s) by a custom collate function (functional name: ``collate``).
-
- By default, it uses :func:`torch.utils.data.default_collate`.
-
- .. note::
- While writing a custom collate function, you can import :func:`torch.utils.data.default_collate` for the
- default behavior and `functools.partial` to specify any additional arguments.
-
- Args:
- datapipe: Iterable DataPipe being collated
- collate_fn: Customized collate function to collect and combine data or a batch of data.
- Default function collates to Tensor(s) based on data type.
-
- Example:
- >>> # xdoctest: +SKIP
- >>> # Convert integer data to float Tensor
- >>> class MyIterDataPipe(torch.utils.data.IterDataPipe):
- ... def __init__(self, start, end):
- ... super(MyIterDataPipe).__init__()
- ... assert end > start, "this example code only works with end >= start"
- ... self.start = start
- ... self.end = end
- ...
- ... def __iter__(self):
- ... return iter(range(self.start, self.end))
- ...
- ... def __len__(self):
- ... return self.end - self.start
- ...
- >>> ds = MyIterDataPipe(start=3, end=7)
- >>> print(list(ds))
- [3, 4, 5, 6]
- >>> def collate_fn(batch):
- ... return torch.tensor(batch, dtype=torch.float)
- ...
- >>> collated_ds = CollateIterDataPipe(ds, collate_fn=collate_fn)
- >>> print(list(collated_ds))
- [tensor(3.), tensor(4.), tensor(5.), tensor(6.)]
- """
-
- # Functional form of 'ConcaterIterDataPipe'
- def concat(self, *datapipes: IterDataPipe) -> IterDataPipe:
- r"""
- Concatenates multiple Iterable DataPipes (functional name: ``concat``).
-
- The resulting DataPipe will yield all the elements from the first input DataPipe, before yielding from the subsequent ones.
-
- Args:
- datapipes: Iterable DataPipes being concatenated
-
- Example:
- >>> # xdoctest: +REQUIRES(module:torchdata)
- >>> import random
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> dp1 = IterableWrapper(range(3))
- >>> dp2 = IterableWrapper(range(5))
- >>> list(dp1.concat(dp2))
- [0, 1, 2, 0, 1, 2, 3, 4]
- """
-
- # Functional form of 'DemultiplexerIterDataPipe'
- def demux(self, num_instances: int, classifier_fn: Callable[[T_co], Optional[int]], drop_none: bool = False, buffer_size: int = 1000) -> List[IterDataPipe]:
- r"""
- Splits the input DataPipe into multiple child DataPipes, using the given classification function (functional name: ``demux``).
-
- A list of the child DataPipes is returned from this operation.
-
- Args:
- datapipe: Iterable DataPipe being filtered
- num_instances: number of instances of the DataPipe to create
- classifier_fn: a function that maps values to an integer within the range ``[0, num_instances - 1]`` or ``None``
- drop_none: defaults to ``False``, if ``True``, the function will skip over elements classified as ``None``
- buffer_size: this defines the maximum number of inputs that the buffer can hold across all child
- DataPipes while waiting for their values to be yielded.
- Defaults to ``1000``. Use ``-1`` for the unlimited buffer.
-
- Examples:
- >>> # xdoctest: +REQUIRES(module:torchdata)
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> def odd_or_even(n):
- ... return n % 2
- >>> source_dp = IterableWrapper(range(5))
- >>> dp1, dp2 = source_dp.demux(num_instances=2, classifier_fn=odd_or_even)
- >>> list(dp1)
- [0, 2, 4]
- >>> list(dp2)
- [1, 3]
- >>> # It can also filter out any element that gets `None` from the `classifier_fn`
- >>> def odd_or_even_no_zero(n):
- ... return n % 2 if n != 0 else None
- >>> dp1, dp2 = source_dp.demux(num_instances=2, classifier_fn=odd_or_even_no_zero, drop_none=True)
- >>> list(dp1)
- [2, 4]
- >>> list(dp2)
- [1, 3]
- """
-
- # Functional form of 'FilterIterDataPipe'
- def filter(self, filter_fn: Callable, input_col=None) -> IterDataPipe:
- r"""
- Filters out elements from the source datapipe according to input ``filter_fn`` (functional name: ``filter``).
-
- Args:
- datapipe: Iterable DataPipe being filtered
- filter_fn: Customized function mapping an element to a boolean.
- input_col: Index or indices of data which ``filter_fn`` is applied, such as:
-
- - ``None`` as default to apply ``filter_fn`` to the data directly.
- - Integer(s) is used for list/tuple.
- - Key(s) is used for dict.
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> def is_even(n):
- ... return n % 2 == 0
- >>> dp = IterableWrapper(range(5))
- >>> filter_dp = dp.filter(filter_fn=is_even)
- >>> list(filter_dp)
- [0, 2, 4]
- """
-
- # Functional form of 'ForkerIterDataPipe'
- def fork(self, num_instances: int, buffer_size: int = 1000, copy: Optional[Literal["shallow", "deep"]] = None) -> List[IterDataPipe]:
- r"""
- Creates multiple instances of the same Iterable DataPipe (functional name: ``fork``).
-
- Args:
- datapipe: Iterable DataPipe being copied
- num_instances: number of instances of the datapipe to create
- buffer_size: this restricts how far ahead the leading child DataPipe
- can read relative to the slowest child DataPipe.
- Defaults to ``1000``. Use ``-1`` for the unlimited buffer.
- copy: copy strategy to use for items yielded by each branch. Supported
- options are ``None`` for no copying, ``"shallow"`` for shallow object
- copies, and ``"deep"`` for deep object copies. Defaults to ``None``.
-
- Note:
- All branches of the forked pipeline return the identical object unless
- the copy parameter is supplied. If the object is mutable or contains
- mutable objects, changing them in one branch will affect all others.
-
- Example:
- >>> # xdoctest: +REQUIRES(module:torchdata)
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> source_dp = IterableWrapper(range(5))
- >>> dp1, dp2 = source_dp.fork(num_instances=2)
- >>> list(dp1)
- [0, 1, 2, 3, 4]
- >>> list(dp2)
- [0, 1, 2, 3, 4]
- """
-
- # Functional form of 'GrouperIterDataPipe'
- def groupby(self, group_key_fn: Callable[[T_co], Any], *, keep_key: bool = False, buffer_size: int = 10000, group_size: Optional[int] = None, guaranteed_group_size: Optional[int] = None, drop_remaining: bool = False) -> IterDataPipe:
- r"""
- Groups data from IterDataPipe by keys from ``group_key_fn``, yielding a ``DataChunk`` with batch size up to ``group_size``.
-
- (functional name: ``groupby``).
-
- The samples are read sequentially from the source ``datapipe``, and a batch of samples belonging to the same group
- will be yielded as soon as the size of the batch reaches ``group_size``. When the buffer is full,
- the DataPipe will yield the largest batch with the same key, provided that its size is larger
- than ``guaranteed_group_size``. If its size is smaller, it will be dropped if ``drop_remaining=True``.
-
- After iterating through the entirety of source ``datapipe``, everything not dropped due to the buffer capacity
- will be yielded from the buffer, even if the group sizes are smaller than ``guaranteed_group_size``.
-
- Args:
- datapipe: Iterable datapipe to be grouped
- group_key_fn: Function used to generate group key from the data of the source datapipe
- keep_key: Option to yield the matching key along with the items in a tuple,
- resulting in `(key, [items])` otherwise returning [items]
- buffer_size: The size of buffer for ungrouped data
- group_size: The max size of each group, a batch is yielded as soon as it reaches this size
- guaranteed_group_size: The guaranteed minimum group size to be yielded in case the buffer is full
- drop_remaining: Specifies if the group smaller than ``guaranteed_group_size`` will be dropped from buffer
- when the buffer is full
-
- Example:
- >>> import os
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> def group_fn(file):
- ... return os.path.basename(file).split(".")[0]
- >>> source_dp = IterableWrapper(["a.png", "b.png", "a.json", "b.json", "a.jpg", "c.json"])
- >>> dp0 = source_dp.groupby(group_key_fn=group_fn)
- >>> list(dp0)
- [['a.png', 'a.json', 'a.jpg'], ['b.png', 'b.json'], ['c.json']]
- >>> # A group is yielded as soon as its size equals to `group_size`
- >>> dp1 = source_dp.groupby(group_key_fn=group_fn, group_size=2)
- >>> list(dp1)
- [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
- >>> # Scenario where `buffer` is full, and group 'a' needs to be yielded since its size > `guaranteed_group_size`
- >>> dp2 = source_dp.groupby(group_key_fn=group_fn, buffer_size=3, group_size=3, guaranteed_group_size=2)
- >>> list(dp2)
- [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
- """
-
- # Functional form of 'FileListerIterDataPipe'
- def list_files(self, masks: Union[str, List[str]] = '', *, recursive: bool = False, abspath: bool = False, non_deterministic: bool = False, length: int = -1) -> IterDataPipe:
- r"""
- Given path(s) to the root directory, yields file pathname(s) (path + filename) of files within the root directory.
-
- Multiple root directories can be provided (functional name: ``list_files``).
-
- Args:
- root: Root directory or a sequence of root directories
- masks: Unix style filter string or string list for filtering file name(s)
- recursive: Whether to return pathname from nested directories or not
- abspath: Whether to return relative pathname or absolute pathname
- non_deterministic: Whether to return pathname in sorted order or not.
- If ``False``, the results yielded from each root directory will be sorted
- length: Nominal length of the datapipe
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.iter import FileLister
- >>> dp = FileLister(root=".", recursive=True)
- >>> list(dp)
- ['example.py', './data/data.tar']
- """
-
- # Functional form of 'MapperIterDataPipe'
- def map(self, fn: Callable, input_col=None, output_col=None) -> IterDataPipe:
- r"""
- Applies a function over each item from the source DataPipe (functional name: ``map``).
-
- The function can be any regular Python function or partial object. Lambda
- function is not recommended as it is not supported by pickle.
-
- Args:
- datapipe: Source Iterable DataPipe
- fn: Function being applied over each item
- input_col: Index or indices of data which ``fn`` is applied, such as:
-
- - ``None`` as default to apply ``fn`` to the data directly.
- - Integer(s) is used for list/tuple.
- - Key(s) is used for dict.
-
- output_col: Index of data where result of ``fn`` is placed. ``output_col`` can be specified
- only when ``input_col`` is not ``None``
-
- - ``None`` as default to replace the index that ``input_col`` specified; For ``input_col`` with
- multiple indices, the left-most one is used, and other indices will be removed.
- - Integer is used for list/tuple. ``-1`` represents to append result at the end.
- - Key is used for dict. New key is acceptable.
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
- >>> def add_one(x):
- ... return x + 1
- >>> dp = IterableWrapper(range(10))
- >>> map_dp_1 = dp.map(add_one) # Invocation via functional form is preferred
- >>> list(map_dp_1)
- [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- >>> # We discourage the usage of `lambda` functions as they are not serializable with `pickle`
- >>> # Use `functools.partial` or explicitly define the function instead
- >>> map_dp_2 = Mapper(dp, lambda x: x + 1)
- >>> list(map_dp_2)
- [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- """
-
- # Functional form of 'MultiplexerIterDataPipe'
- def mux(self, *datapipes) -> IterDataPipe:
- r"""
- Yields one element at a time from each of the input Iterable DataPipes (functional name: ``mux``).
-
- As in, one element from the 1st input DataPipe, then one element from the 2nd DataPipe in the next iteration,
- and so on. It ends when the shortest input DataPipe is exhausted.
-
- Args:
- datapipes: Iterable DataPipes that will take turn to yield their elements, until the shortest DataPipe is exhausted
-
- Example:
- >>> # xdoctest: +REQUIRES(module:torchdata)
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> dp1, dp2, dp3 = IterableWrapper(range(3)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25))
- >>> list(dp1.mux(dp2, dp3))
- [0, 10, 20, 1, 11, 21, 2, 12, 22]
- """
-
- # Functional form of 'FileOpenerIterDataPipe'
- def open_files(self, mode: str = 'r', encoding: Optional[str] = None, length: int = -1) -> IterDataPipe:
- r"""
- Given pathnames, opens files and yield pathname and file stream in a tuple (functional name: ``open_files``).
-
- Args:
- datapipe: Iterable datapipe that provides pathnames
- mode: An optional string that specifies the mode in which
- the file is opened by ``open()``. It defaults to ``r``, other options are
- ``b`` for reading in binary mode and ``t`` for text mode.
- encoding: An optional string that specifies the encoding of the
- underlying file. It defaults to ``None`` to match the default encoding of ``open``.
- length: Nominal length of the datapipe
-
- Note:
- The opened file handles will be closed by Python's GC periodically. Users can choose
- to close them explicitly.
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.iter import FileLister, FileOpener, StreamReader
- >>> dp = FileLister(root=".").filter(lambda fname: fname.endswith('.txt'))
- >>> dp = FileOpener(dp)
- >>> dp = StreamReader(dp)
- >>> list(dp)
- [('./abc.txt', 'abc')]
- """
-
- # Functional form of 'StreamReaderIterDataPipe'
- def read_from_stream(self, chunk=None) -> IterDataPipe:
- r"""
- Given IO streams and their label names, yield bytes with label name as tuple.
-
- (functional name: ``read_from_stream``).
-
- Args:
- datapipe: Iterable DataPipe provides label/URL and byte stream
- chunk: Number of bytes to be read from stream per iteration.
- If ``None``, all bytes will be read until the EOF.
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.iter import IterableWrapper, StreamReader
- >>> from io import StringIO
- >>> dp = IterableWrapper([("alphabet", StringIO("abcde"))])
- >>> list(StreamReader(dp, chunk=1))
- [('alphabet', 'a'), ('alphabet', 'b'), ('alphabet', 'c'), ('alphabet', 'd'), ('alphabet', 'e')]
- """
-
- # Functional form of 'RoutedDecoderIterDataPipe'
- def routed_decode(self, *handlers: Callable, key_fn: Callable= ...) -> IterDataPipe:
- r"""
- Decodes binary streams from input DataPipe, yields pathname and decoded data in a tuple.
-
- (functional name: ``routed_decode``)
-
- Args:
- datapipe: Iterable datapipe that provides pathname and binary stream in tuples
- handlers: Optional user defined decoder handlers. If ``None``, basic and image decoder
- handlers will be set as default. If multiple handles are provided, the priority
- order follows the order of handlers (the first handler has the top priority)
- key_fn: Function for decoder to extract key from pathname to dispatch handlers.
- Default is set to extract file extension from pathname
-
- Note:
- When ``key_fn`` is specified returning anything other than extension, the default
- handler will not work and users need to specify custom handler. Custom handler
- could use regex to determine the eligibility to handle data.
- """
-
- # Functional form of 'ShardingFilterIterDataPipe'
- def sharding_filter(self, sharding_group_filter=None) -> IterDataPipe:
- r"""
- Wrapper that allows DataPipe to be sharded (functional name: ``sharding_filter``).
-
- After ``apply_sharding`` is called, each instance of the DataPipe (on different workers) will have every `n`-th element of the
- original DataPipe, where `n` equals to the number of instances.
-
- Args:
- source_datapipe: Iterable DataPipe that will be sharded
- """
-
- # Functional form of 'ShufflerIterDataPipe'
- def shuffle(self, *, buffer_size: int = 10000, unbatch_level: int = 0) -> IterDataPipe:
- r"""
- Shuffle the input DataPipe with a buffer (functional name: ``shuffle``).
-
- The buffer with ``buffer_size`` is filled with elements from the datapipe first. Then,
- each item will be yielded from the buffer by reservoir sampling via iterator.
-
- ``buffer_size`` is required to be larger than ``0``. For ``buffer_size == 1``, the
- datapipe is not shuffled. In order to fully shuffle all elements from datapipe,
- ``buffer_size`` is required to be greater than or equal to the size of datapipe.
-
- When it is used with :class:`torch.utils.data.DataLoader`, the methods to
- set up random seed are different based on :attr:`num_workers`.
-
- For single-process mode (:attr:`num_workers == 0`), the random seed is set before
- the :class:`~torch.utils.data.DataLoader` in the main process. For multi-process
- mode (:attr:`num_worker > 0`), `worker_init_fn` is used to set up a random seed
- for each worker process.
-
- Args:
- datapipe: The IterDataPipe being shuffled
- buffer_size: The buffer size for shuffling (default to ``10000``)
- unbatch_level: Specifies if it is necessary to unbatch source data before
- applying the shuffle
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> dp = IterableWrapper(range(10))
- >>> shuffle_dp = dp.shuffle()
- >>> list(shuffle_dp)
- [0, 4, 1, 6, 3, 2, 9, 5, 7, 8]
- """
-
- # Functional form of 'UnBatcherIterDataPipe'
- def unbatch(self, unbatch_level: int = 1) -> IterDataPipe:
- r"""
- Undos batching of data (functional name: ``unbatch``).
-
- In other words, it flattens the data up to the specified level within a batched DataPipe.
-
- Args:
- datapipe: Iterable DataPipe being un-batched
- unbatch_level: Defaults to ``1`` (only flattening the top level). If set to ``2``,
- it will flatten the top two levels, and ``-1`` will flatten the entire DataPipe.
-
- Example:
- >>> # xdoctest: +SKIP
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> source_dp = IterableWrapper([[[0, 1], [2]], [[3, 4], [5]], [[6]]])
- >>> dp1 = source_dp.unbatch()
- >>> list(dp1)
- [[0, 1], [2], [3, 4], [5], [6]]
- >>> dp2 = source_dp.unbatch(unbatch_level=2)
- >>> list(dp2)
- [0, 1, 2, 3, 4, 5, 6]
- """
-
- # Functional form of 'ZipperIterDataPipe'
- def zip(self, *datapipes: IterDataPipe) -> IterDataPipe:
- r"""
- Aggregates elements into a tuple from each of the input DataPipes (functional name: ``zip``).
-
- The output is stopped as soon as the shortest input DataPipe is exhausted.
-
- Args:
- *datapipes: Iterable DataPipes being aggregated
-
- Example:
- >>> # xdoctest: +REQUIRES(module:torchdata)
- >>> from torchdata.datapipes.iter import IterableWrapper
- >>> dp1, dp2, dp3 = IterableWrapper(range(5)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25))
- >>> list(dp1.zip(dp2, dp3))
- [(0, 10, 20), (1, 11, 21), (2, 12, 22), (3, 13, 23), (4, 14, 24)]
- """
-
- class DFIterDataPipe(IterDataPipe):
- def _is_dfpipe(self): ...
- def __iter__(self): ...
- class _DataPipeSerializationWrapper:
- def __init__(self, datapipe): ...
- def __getstate__(self): ...
- def __setstate__(self, state): ...
- def __len__(self): ...
- class _IterDataPipeSerializationWrapper(_DataPipeSerializationWrapper, IterDataPipe):
- def __iter__(self): ...
- class _MapDataPipeSerializationWrapper(_DataPipeSerializationWrapper, MapDataPipe):
- def __getitem__(self, idx): ...
- class DataChunk(list, Generic[T]):
- def __init__(self, items):
- super().__init__(items)
- self.items = items
- def as_str(self, indent: str = "") -> str:
- res = indent + "[" + ", ".join(str(i) for i in iter(self)) + "]"
- return res
- def __iter__(self) -> Iterator[T]:
- yield from super().__iter__()
- def raw_iterator(self) -> T: # type: ignore[misc]
- yield from self.items
|