- # Copyright 2020 The HuggingFace Team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import random
- import warnings
- from collections.abc import Mapping
- from dataclasses import dataclass
- from random import randint
- from typing import Any, Callable, Dict, List, NewType, Optional, Tuple, Union
- import numpy as np
- from ..models.bert import BertTokenizer, BertTokenizerFast
- from ..tokenization_utils_base import PreTrainedTokenizerBase
- from ..utils import PaddingStrategy
- InputDataClass = NewType("InputDataClass", Any)
- """
- A DataCollator is a function that takes a list of samples from a Dataset and collates them into a batch, as a dictionary
- of PyTorch/TensorFlow tensors or NumPy arrays.
- """
- DataCollator = NewType("DataCollator", Callable[[List[InputDataClass]], Dict[str, Any]])
- class DataCollatorMixin:
- def __call__(self, features, return_tensors=None):
- if return_tensors is None:
- return_tensors = self.return_tensors
- if return_tensors == "tf":
- return self.tf_call(features)
- elif return_tensors == "pt":
- return self.torch_call(features)
- elif return_tensors == "np":
- return self.numpy_call(features)
- else:
- raise ValueError(f"Framework '{return_tensors}' not recognized!")
- def pad_without_fast_tokenizer_warning(tokenizer, *pad_args, **pad_kwargs):
- """
- Pads without triggering the warning about how using the pad function is sub-optimal when using a fast tokenizer.
- """
- # To avoid errors when using Feature extractors
- if not hasattr(tokenizer, "deprecation_warnings"):
- return tokenizer.pad(*pad_args, **pad_kwargs)
- # Save the state of the warning, then disable it
- warning_state = tokenizer.deprecation_warnings.get("Asking-to-pad-a-fast-tokenizer", False)
- tokenizer.deprecation_warnings["Asking-to-pad-a-fast-tokenizer"] = True
- try:
- padded = tokenizer.pad(*pad_args, **pad_kwargs)
- finally:
- # Restore the state of the warning.
- tokenizer.deprecation_warnings["Asking-to-pad-a-fast-tokenizer"] = warning_state
- return padded
- def default_data_collator(features: List[InputDataClass], return_tensors="pt") -> Dict[str, Any]:
- """
- Very simple data collator that simply collates batches of dict-like objects and performs special handling for
- potential keys named:
- - `label`: handles a single value (int or float) per object
- - `label_ids`: handles a list of values per object
- Does not do any additional preprocessing: property names of the input object will be used as corresponding inputs
- to the model. See glue and ner for an example of how it's useful.
- """
- # In this function we'll make the assumption that all `features` in the batch
- # have the same attributes.
- # So we will look at the first element as a proxy for what attributes exist
- # on the whole batch.
- if return_tensors == "pt":
- return torch_default_data_collator(features)
- elif return_tensors == "tf":
- return tf_default_data_collator(features)
- elif return_tensors == "np":
- return numpy_default_data_collator(features)
- @dataclass
- class DefaultDataCollator(DataCollatorMixin):
- """
- Very simple data collator that simply collates batches of dict-like objects and performs special handling for
- potential keys named:
- - `label`: handles a single value (int or float) per object
- - `label_ids`: handles a list of values per object
- Does not do any additional preprocessing: property names of the input object will be used as corresponding inputs
- to the model. See glue and ner for an example of how it's useful.
- This is an object (like other data collators) rather than a pure function like default_data_collator. This can be
- helpful if you need to set a return_tensors value at initialization.
- Args:
- return_tensors (`str`, *optional*, defaults to `"pt"`):
- The type of Tensor to return. Allowable values are "np", "pt" and "tf".
- """
- return_tensors: str = "pt"
- def __call__(self, features: List[Dict[str, Any]], return_tensors=None) -> Dict[str, Any]:
- if return_tensors is None:
- return_tensors = self.return_tensors
- return default_data_collator(features, return_tensors)
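- # Illustrative usage sketch (comment only, not library code): `default_data_collator` /
- # `DefaultDataCollator` on a small hand-built batch. The feature values are made-up toy data.
- #
- #   features = [
- #       {"input_ids": [101, 2023, 102], "attention_mask": [1, 1, 1], "label": 0},
- #       {"input_ids": [101, 2003, 102], "attention_mask": [1, 1, 1], "label": 1},
- #   ]
- #   batch = default_data_collator(features, return_tensors="pt")
- #   # batch["labels"] -> tensor([0, 1]); batch["input_ids"] -> LongTensor of shape (2, 3)
- #   collator = DefaultDataCollator(return_tensors="np")  # same behaviour, as an object
- #   np_batch = collator(features)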
- def torch_default_data_collator(features: List[InputDataClass]) -> Dict[str, Any]:
- import torch
- if not isinstance(features[0], Mapping):
- features = [vars(f) for f in features]
- first = features[0]
- batch = {}
- # Special handling for labels.
- # Ensure that tensor is created with the correct type
- # (it should be automatically the case, but let's make sure of it.)
- if "label" in first and first["label"] is not None:
- label = first["label"].item() if isinstance(first["label"], torch.Tensor) else first["label"]
- dtype = torch.long if isinstance(label, int) else torch.float
- batch["labels"] = torch.tensor([f["label"] for f in features], dtype=dtype)
- elif "label_ids" in first and first["label_ids"] is not None:
- if isinstance(first["label_ids"], torch.Tensor):
- batch["labels"] = torch.stack([f["label_ids"] for f in features])
- else:
- dtype = torch.long if isinstance(first["label_ids"][0], int) else torch.float
- batch["labels"] = torch.tensor([f["label_ids"] for f in features], dtype=dtype)
- # Handling of all other possible keys.
- # Again, we will use the first element to figure out which key/values are not None for this model.
- for k, v in first.items():
- if k not in ("label", "label_ids") and v is not None and not isinstance(v, str):
- if isinstance(v, torch.Tensor):
- batch[k] = torch.stack([f[k] for f in features])
- elif isinstance(v, np.ndarray):
- batch[k] = torch.from_numpy(np.stack([f[k] for f in features]))
- else:
- batch[k] = torch.tensor([f[k] for f in features])
- return batch
- def tf_default_data_collator(features: List[InputDataClass]) -> Dict[str, Any]:
- import tensorflow as tf
- if not isinstance(features[0], Mapping):
- features = [vars(f) for f in features]
- first = features[0]
- batch = {}
- # Special handling for labels.
- # Ensure that tensor is created with the correct type
- # (it should be automatically the case, but let's make sure of it.)
- if "label" in first and first["label"] is not None:
- label_col_name = "label"
- elif "label_ids" in first and first["label_ids"] is not None:
- label_col_name = "label_ids"
- elif "labels" in first and first["labels"] is not None:
- label_col_name = "labels"
- else:
- label_col_name = None
- if label_col_name is not None:
- if isinstance(first[label_col_name], tf.Tensor):
- dtype = tf.int64 if first[label_col_name].dtype.is_integer else tf.float32
- elif isinstance(first[label_col_name], np.ndarray) or isinstance(first[label_col_name], np.generic):
- dtype = tf.int64 if np.issubdtype(first[label_col_name].dtype, np.integer) else tf.float32
- elif isinstance(first[label_col_name], (tuple, list)):
- dtype = tf.int64 if isinstance(first[label_col_name][0], int) else tf.float32
- else:
- dtype = tf.int64 if isinstance(first[label_col_name], int) else tf.float32
- batch["labels"] = tf.convert_to_tensor([f[label_col_name] for f in features], dtype=dtype)
- # Handling of all other possible keys.
- # Again, we will use the first element to figure out which key/values are not None for this model.
- for k, v in first.items():
- if k not in ("label", "label_ids", "labels") and v is not None and not isinstance(v, str):
- if isinstance(v, (tf.Tensor, np.ndarray)):
- batch[k] = tf.stack([f[k] for f in features])
- else:
- batch[k] = tf.convert_to_tensor([f[k] for f in features])
- return batch
- def numpy_default_data_collator(features: List[InputDataClass]) -> Dict[str, Any]:
- if not isinstance(features[0], Mapping):
- features = [vars(f) for f in features]
- first = features[0]
- batch = {}
- # Special handling for labels.
- # Ensure that tensor is created with the correct type
- # (it should be automatically the case, but let's make sure of it.)
- if "label" in first and first["label"] is not None:
- label = first["label"].item() if isinstance(first["label"], np.ndarray) else first["label"]
- dtype = np.int64 if isinstance(label, int) else np.float32
- batch["labels"] = np.array([f["label"] for f in features], dtype=dtype)
- elif "label_ids" in first and first["label_ids"] is not None:
- if isinstance(first["label_ids"], np.ndarray):
- batch["labels"] = np.stack([f["label_ids"] for f in features])
- else:
- dtype = np.int64 if isinstance(first["label_ids"][0], int) else np.float32
- batch["labels"] = np.array([f["label_ids"] for f in features], dtype=dtype)
- # Handling of all other possible keys.
- # Again, we will use the first element to figure out which key/values are not None for this model.
- for k, v in first.items():
- if k not in ("label", "label_ids") and v is not None and not isinstance(v, str):
- if isinstance(v, np.ndarray):
- batch[k] = np.stack([f[k] for f in features])
- else:
- batch[k] = np.array([f[k] for f in features])
- return batch
- @dataclass
- class DataCollatorWithPadding:
- """
- Data collator that will dynamically pad the inputs received.
- Args:
- tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
- The tokenizer used for encoding the data.
- padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `True`):
- Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
- among:
- - `True` or `'longest'` (default): Pad to the longest sequence in the batch (or no padding if only a single
- sequence is provided).
- - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
- acceptable input length for the model if that argument is not provided.
- - `False` or `'do_not_pad'`: No padding (i.e., can output a batch with sequences of different lengths).
- max_length (`int`, *optional*):
- Maximum length of the returned list and optionally padding length (see above).
- pad_to_multiple_of (`int`, *optional*):
- If set, will pad the sequence to a multiple of the provided value.
- This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
- 7.5 (Volta).
- return_tensors (`str`, *optional*, defaults to `"pt"`):
- The type of Tensor to return. Allowable values are "np", "pt" and "tf".
- """
- tokenizer: PreTrainedTokenizerBase
- padding: Union[bool, str, PaddingStrategy] = True
- max_length: Optional[int] = None
- pad_to_multiple_of: Optional[int] = None
- return_tensors: str = "pt"
- def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, Any]:
- batch = pad_without_fast_tokenizer_warning(
- self.tokenizer,
- features,
- padding=self.padding,
- max_length=self.max_length,
- pad_to_multiple_of=self.pad_to_multiple_of,
- return_tensors=self.return_tensors,
- )
- if "label" in batch:
- batch["labels"] = batch["label"]
- del batch["label"]
- if "label_ids" in batch:
- batch["labels"] = batch["label_ids"]
- del batch["label_ids"]
- return batch
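- # Illustrative usage sketch (comment only, not library code): dynamic padding with a tokenizer.
- # The checkpoint name is only an example; any PreTrainedTokenizer(Fast) works.
- #
- #   from transformers import AutoTokenizer
- #   tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
- #   collator = DataCollatorWithPadding(tokenizer, pad_to_multiple_of=8)
- #   features = [tokenizer("a short sentence"), tokenizer("a somewhat longer example sentence")]
- #   batch = collator(features)  # every sequence padded to the same multiple-of-8 length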
- @dataclass
- class DataCollatorForTokenClassification(DataCollatorMixin):
- """
- Data collator that will dynamically pad the inputs received, as well as the labels.
- Args:
- tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
- The tokenizer used for encoding the data.
- padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `True`):
- Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
- among:
- - `True` or `'longest'` (default): Pad to the longest sequence in the batch (or no padding if only a single
- sequence is provided).
- - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
- acceptable input length for the model if that argument is not provided.
- - `False` or `'do_not_pad'`: No padding (i.e., can output a batch with sequences of different lengths).
- max_length (`int`, *optional*):
- Maximum length of the returned list and optionally padding length (see above).
- pad_to_multiple_of (`int`, *optional*):
- If set, will pad the sequence to a multiple of the provided value.
- This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
- 7.5 (Volta).
- label_pad_token_id (`int`, *optional*, defaults to -100):
- The id to use when padding the labels (-100 will be automatically ignored by PyTorch loss functions).
- return_tensors (`str`, *optional*, defaults to `"pt"`):
- The type of Tensor to return. Allowable values are "np", "pt" and "tf".
- """
- tokenizer: PreTrainedTokenizerBase
- padding: Union[bool, str, PaddingStrategy] = True
- max_length: Optional[int] = None
- pad_to_multiple_of: Optional[int] = None
- label_pad_token_id: int = -100
- return_tensors: str = "pt"
- def torch_call(self, features):
- import torch
- label_name = "label" if "label" in features[0].keys() else "labels"
- labels = [feature[label_name] for feature in features] if label_name in features[0].keys() else None
- no_labels_features = [{k: v for k, v in feature.items() if k != label_name} for feature in features]
- batch = pad_without_fast_tokenizer_warning(
- self.tokenizer,
- no_labels_features,
- padding=self.padding,
- max_length=self.max_length,
- pad_to_multiple_of=self.pad_to_multiple_of,
- return_tensors="pt",
- )
- if labels is None:
- return batch
- sequence_length = batch["input_ids"].shape[1]
- padding_side = self.tokenizer.padding_side
- def to_list(tensor_or_iterable):
- if isinstance(tensor_or_iterable, torch.Tensor):
- return tensor_or_iterable.tolist()
- return list(tensor_or_iterable)
- if padding_side == "right":
- batch[label_name] = [
- to_list(label) + [self.label_pad_token_id] * (sequence_length - len(label)) for label in labels
- ]
- else:
- batch[label_name] = [
- [self.label_pad_token_id] * (sequence_length - len(label)) + to_list(label) for label in labels
- ]
- batch[label_name] = torch.tensor(batch[label_name], dtype=torch.int64)
- return batch
- def tf_call(self, features):
- import tensorflow as tf
- label_name = "label" if "label" in features[0].keys() else "labels"
- labels = [feature[label_name] for feature in features] if label_name in features[0].keys() else None
- batch = pad_without_fast_tokenizer_warning(
- self.tokenizer,
- features,
- padding=self.padding,
- max_length=self.max_length,
- pad_to_multiple_of=self.pad_to_multiple_of,
- # Conversion to tensors will fail if we have labels as they are not of the same length yet.
- return_tensors="tf" if labels is None else None,
- )
- if labels is None:
- return batch
- sequence_length = tf.convert_to_tensor(batch["input_ids"]).shape[1]
- padding_side = self.tokenizer.padding_side
- if padding_side == "right":
- batch["labels"] = [
- list(label) + [self.label_pad_token_id] * (sequence_length - len(label)) for label in labels
- ]
- else:
- batch["labels"] = [
- [self.label_pad_token_id] * (sequence_length - len(label)) + list(label) for label in labels
- ]
- batch = {k: tf.convert_to_tensor(v, dtype=tf.int64) for k, v in batch.items()}
- return batch
- def numpy_call(self, features):
- label_name = "label" if "label" in features[0].keys() else "labels"
- labels = [feature[label_name] for feature in features] if label_name in features[0].keys() else None
- batch = pad_without_fast_tokenizer_warning(
- self.tokenizer,
- features,
- padding=self.padding,
- max_length=self.max_length,
- pad_to_multiple_of=self.pad_to_multiple_of,
- # Conversion to tensors will fail if we have labels as they are not of the same length yet.
- return_tensors="np" if labels is None else None,
- )
- if labels is None:
- return batch
- sequence_length = np.array(batch["input_ids"]).shape[1]
- padding_side = self.tokenizer.padding_side
- if padding_side == "right":
- batch["labels"] = [
- list(label) + [self.label_pad_token_id] * (sequence_length - len(label)) for label in labels
- ]
- else:
- batch["labels"] = [
- [self.label_pad_token_id] * (sequence_length - len(label)) + list(label) for label in labels
- ]
- batch = {k: np.array(v, dtype=np.int64) for k, v in batch.items()}
- return batch
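- # Illustrative usage sketch (comment only, not library code): token-classification features
- # carry a per-token "labels" list, which this collator pads with `label_pad_token_id` (-100).
- # The checkpoint name and label values below are placeholders.
- #
- #   from transformers import AutoTokenizer
- #   tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
- #   collator = DataCollatorForTokenClassification(tokenizer)
- #   features = [
- #       {"input_ids": [101, 7592, 102], "labels": [-100, 3, -100]},
- #       {"input_ids": [101, 7592, 2088, 102], "labels": [-100, 3, 5, -100]},
- #   ]
- #   batch = collator(features)  # "labels" padded on the right to length 4 with -100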
- def _torch_collate_batch(examples, tokenizer, pad_to_multiple_of: Optional[int] = None):
- """Collate `examples` into a batch, using the information in `tokenizer` for padding if necessary."""
- import torch
- # Tensorize if necessary.
- if isinstance(examples[0], (list, tuple, np.ndarray)):
- examples = [torch.tensor(e, dtype=torch.long) for e in examples]
- length_of_first = examples[0].size(0)
- # Check if padding is necessary.
- are_tensors_same_length = all(x.size(0) == length_of_first for x in examples)
- if are_tensors_same_length and (pad_to_multiple_of is None or length_of_first % pad_to_multiple_of == 0):
- if not isinstance(examples, torch.Tensor):
- return torch.stack(examples, dim=0)
- # If yes, check if we have a `pad_token`.
- if tokenizer._pad_token is None:
- raise ValueError(
- "You are attempting to pad samples but the tokenizer you are using"
- f" ({tokenizer.__class__.__name__}) does not have a pad token."
- )
- # Creating the full tensor and filling it with our data.
- max_length = max(x.size(0) for x in examples)
- if pad_to_multiple_of is not None and (max_length % pad_to_multiple_of != 0):
- max_length = ((max_length // pad_to_multiple_of) + 1) * pad_to_multiple_of
- result = examples[0].new_full([len(examples), max_length], tokenizer.pad_token_id)
- for i, example in enumerate(examples):
- if tokenizer.padding_side == "right":
- result[i, : example.shape[0]] = example
- else:
- result[i, -example.shape[0] :] = example
- return result
- def _tf_collate_batch(examples, tokenizer, pad_to_multiple_of: Optional[int] = None):
- """Collate `examples` into a batch, using the information in `tokenizer` for padding if necessary."""
- import tensorflow as tf
- # Tensorize if necessary.
- if isinstance(examples[0], (list, tuple)):
- examples = [tf.convert_to_tensor(e, dtype=tf.int64) for e in examples]
- # Check if padding is necessary.
- length_of_first = len(examples[0])
- are_tensors_same_length = all(len(x) == length_of_first for x in examples)
- if are_tensors_same_length and (pad_to_multiple_of is None or length_of_first % pad_to_multiple_of == 0):
- return tf.stack(examples, axis=0)
- # If yes, check if we have a `pad_token`.
- if tokenizer._pad_token is None:
- raise ValueError(
- "You are attempting to pad samples but the tokenizer you are using"
- f" ({tokenizer.__class__.__name__}) does not have a pad token."
- )
- # Creating the full tensor and filling it with our data.
- max_length = max(len(x) for x in examples)
- if pad_to_multiple_of is not None and (max_length % pad_to_multiple_of != 0):
- max_length = ((max_length // pad_to_multiple_of) + 1) * pad_to_multiple_of
- # result = examples[0].new_full([len(examples), max_length], tokenizer.pad_token_id)
- result = []
- rank = tf.rank(examples[0])
- paddings = np.zeros((rank, 2), dtype=np.int32)
- for example in examples:
- if tokenizer.padding_side == "right":
- paddings[0, 1] = max_length - len(example)
- else:
- paddings[0, 0] = max_length - len(example)
- result.append(tf.pad(example, paddings, constant_values=tokenizer.pad_token_id))
- return tf.stack(result, axis=0)
- def _numpy_collate_batch(examples, tokenizer, pad_to_multiple_of: Optional[int] = None):
- """Collate `examples` into a batch, using the information in `tokenizer` for padding if necessary."""
- # Tensorize if necessary.
- if isinstance(examples[0], (list, tuple)):
- examples = [np.array(e, dtype=np.int64) for e in examples]
- # Check if padding is necessary.
- length_of_first = len(examples[0])
- are_tensors_same_length = all(len(x) == length_of_first for x in examples)
- if are_tensors_same_length and (pad_to_multiple_of is None or length_of_first % pad_to_multiple_of == 0):
- return np.stack(examples, axis=0)
- # If yes, check if we have a `pad_token`.
- if tokenizer._pad_token is None:
- raise ValueError(
- "You are attempting to pad samples but the tokenizer you are using"
- f" ({tokenizer.__class__.__name__}) does not have a pad token."
- )
- # Creating the full tensor and filling it with our data.
- max_length = max(len(x) for x in examples)
- if pad_to_multiple_of is not None and (max_length % pad_to_multiple_of != 0):
- max_length = ((max_length // pad_to_multiple_of) + 1) * pad_to_multiple_of
- result = np.full(shape=(len(examples), max_length), fill_value=tokenizer.pad_token_id, dtype=examples[0].dtype)
- for i, example in enumerate(examples):
- if tokenizer.padding_side == "right":
- result[i, : example.shape[0]] = example
- else:
- result[i, -example.shape[0] :] = example
- return result
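- # Note on the `pad_to_multiple_of` arithmetic shared by the three _*_collate_batch helpers
- # above: the maximum length is rounded up to the next multiple, e.g. max_length = 13 with
- # pad_to_multiple_of = 8 gives ((13 // 8) + 1) * 8 = 16.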
- def tolist(x):
- if isinstance(x, list):
- return x
- elif hasattr(x, "numpy"): # Checks for TF tensors without needing the import
- x = x.numpy()
- return x.tolist()
- @dataclass
- class DataCollatorForSeq2Seq:
- """
- Data collator that will dynamically pad the inputs received, as well as the labels.
- Args:
- tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
- The tokenizer used for encoding the data.
- model ([`PreTrainedModel`], *optional*):
- The model that is being trained. If set and has the *prepare_decoder_input_ids_from_labels* method, use it to
- prepare the *decoder_input_ids*.
- This is useful when using *label_smoothing* to avoid calculating loss twice.
- padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `True`):
- Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
- among:
- - `True` or `'longest'` (default): Pad to the longest sequence in the batch (or no padding if only a single
- sequence is provided).
- - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
- acceptable input length for the model if that argument is not provided.
- - `False` or `'do_not_pad'`: No padding (i.e., can output a batch with sequences of different lengths).
- max_length (`int`, *optional*):
- Maximum length of the returned list and optionally padding length (see above).
- pad_to_multiple_of (`int`, *optional*):
- If set, will pad the sequence to a multiple of the provided value.
- This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
- 7.5 (Volta).
- label_pad_token_id (`int`, *optional*, defaults to -100):
- The id to use when padding the labels (-100 will be automatically ignored by PyTorch loss functions).
- return_tensors (`str`, *optional*, defaults to `"pt"`):
- The type of Tensor to return. Allowable values are "np", "pt" and "tf".
- """
- tokenizer: PreTrainedTokenizerBase
- model: Optional[Any] = None
- padding: Union[bool, str, PaddingStrategy] = True
- max_length: Optional[int] = None
- pad_to_multiple_of: Optional[int] = None
- label_pad_token_id: int = -100
- return_tensors: str = "pt"
- def __call__(self, features, return_tensors=None):
- if return_tensors is None:
- return_tensors = self.return_tensors
- label_name = "label" if "label" in features[0].keys() else "labels"
- labels = [feature[label_name] for feature in features] if label_name in features[0].keys() else None
- # reconvert list[None] to None if necessary
- # this might occur when we pass {..., "labels": None}
- if labels is not None and all(label is None for label in labels):
- labels = None
- non_labels_features = [{k: v for k, v in feature.items() if k != label_name} for feature in features]
- # run through tokenizer without labels to ensure no side effects
- batch = pad_without_fast_tokenizer_warning(
- self.tokenizer,
- non_labels_features,
- padding=self.padding,
- max_length=self.max_length,
- pad_to_multiple_of=self.pad_to_multiple_of,
- return_tensors=return_tensors,
- )
- # we have to pad the labels manually as we cannot rely on `tokenizer.pad` and we need them to be of the same length to return tensors
- no_padding = self.padding is False or self.padding == PaddingStrategy.DO_NOT_PAD
- if labels is not None:
- if no_padding:
- if isinstance(features[0][label_name], list):
- batch["labels"] = list(labels)
- else:
- batch["labels"] = [np.concatenate([label, []]) for label in labels]
- else:
- max_padding = self.padding == PaddingStrategy.MAX_LENGTH and self.max_length is not None
- max_label_length = max(len(l) for l in labels) if not max_padding else self.max_length
- if self.pad_to_multiple_of is not None:
- max_label_length = (
- (max_label_length + self.pad_to_multiple_of - 1)
- // self.pad_to_multiple_of
- * self.pad_to_multiple_of
- )
- padding_side = self.tokenizer.padding_side
- if isinstance(features[0][label_name], list):
- batch["labels"] = [
- label + [self.label_pad_token_id] * (max_label_length - len(label))
- if padding_side == "right"
- else [self.label_pad_token_id] * (max_label_length - len(label)) + label
- for label in labels
- ]
- else:
- batch["labels"] = [
- np.concatenate(
- [
- label,
- np.array([self.label_pad_token_id] * (max_label_length - len(label)), dtype=np.int64),
- ]
- )
- if padding_side == "right"
- else np.concatenate(
- [
- np.array([self.label_pad_token_id] * (max_label_length - len(label)), dtype=np.int64),
- label,
- ]
- )
- for label in labels
- ]
- # reintroduce side effects via tokenizer that return respective datatypes for the `return_tensors` argument
- if batch.get("labels", None) is not None:
- if return_tensors == "pt":
- import torch
- batch["labels"] = torch.tensor(batch["labels"], dtype=torch.int64)
- elif return_tensors == "tf":
- import tensorflow as tf
- batch["labels"] = tf.constant(batch["labels"], dtype=tf.int64)
- else:
- batch["labels"] = np.array(batch["labels"], dtype=np.int64)
- else:
- batch["labels"] = None
- # prepare decoder_input_ids
- if (
- labels is not None
- and self.model is not None
- and hasattr(self.model, "prepare_decoder_input_ids_from_labels")
- ):
- decoder_input_ids = self.model.prepare_decoder_input_ids_from_labels(labels=batch["labels"])
- batch["decoder_input_ids"] = decoder_input_ids
- return batch
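- # Illustrative usage sketch (comment only, not library code): seq2seq features whose "labels"
- # are target token ids of different lengths; the collator pads them with -100 and, when a model
- # with `prepare_decoder_input_ids_from_labels` is passed, also builds `decoder_input_ids`.
- # The checkpoint name and token ids are placeholders.
- #
- #   from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
- #   tokenizer = AutoTokenizer.from_pretrained("t5-small")
- #   model = AutoModelForSeq2SeqLM.from_pretrained("t5-small")
- #   collator = DataCollatorForSeq2Seq(tokenizer, model=model)
- #   features = [
- #       {"input_ids": [100, 19, 3, 1], "labels": [31, 7, 1]},
- #       {"input_ids": [100, 19, 1], "labels": [31, 1]},
- #   ]
- #   batch = collator(features)  # padded input_ids, labels (-100 padded) and decoder_input_ids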
- @dataclass
- class DataCollatorForLanguageModeling(DataCollatorMixin):
- """
- Data collator used for language modeling. Inputs are dynamically padded to the maximum length of a batch if they
- are not all of the same length.
- Args:
- tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
- The tokenizer used for encoding the data.
- mlm (`bool`, *optional*, defaults to `True`):
- Whether or not to use masked language modeling. If set to `False`, the labels are the same as the inputs
- with the padding tokens ignored (by setting them to -100). Otherwise, the labels are -100 for non-masked
- tokens and the value to predict for the masked token.
- mlm_probability (`float`, *optional*, defaults to 0.15):
- The probability with which to (randomly) mask tokens in the input, when `mlm` is set to `True`.
- pad_to_multiple_of (`int`, *optional*):
- If set, will pad the sequence to a multiple of the provided value.
- return_tensors (`str`):
- The type of Tensor to return. Allowable values are "np", "pt" and "tf".
- <Tip>
- For best performance, this data collator should be used with a dataset having items that are dictionaries or
- BatchEncoding, with the `"special_tokens_mask"` key, as returned by a [`PreTrainedTokenizer`] or a
- [`PreTrainedTokenizerFast`] with the argument `return_special_tokens_mask=True`.
- </Tip>"""
- tokenizer: PreTrainedTokenizerBase
- mlm: bool = True
- mlm_probability: float = 0.15
- pad_to_multiple_of: Optional[int] = None
- tf_experimental_compile: bool = False
- return_tensors: str = "pt"
- def __post_init__(self):
- if self.mlm and self.tokenizer.mask_token is None:
- raise ValueError(
- "This tokenizer does not have a mask token which is necessary for masked language modeling. "
- "You should pass `mlm=False` to train on causal language modeling instead."
- )
- if self.tf_experimental_compile:
- import tensorflow as tf
- self.tf_mask_tokens = tf.function(self.tf_mask_tokens, jit_compile=True)
- @staticmethod
- def tf_bernoulli(shape, probability):
- import tensorflow as tf
- prob_matrix = tf.fill(shape, probability)
- return tf.cast(prob_matrix - tf.random.uniform(shape, 0, 1) >= 0, tf.bool)
- def tf_mask_tokens(
- self, inputs: Any, vocab_size, mask_token_id, special_tokens_mask: Optional[Any] = None
- ) -> Tuple[Any, Any]:
- """
- Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original.
- """
- import tensorflow as tf
- mask_token_id = tf.cast(mask_token_id, inputs.dtype)
- input_shape = tf.shape(inputs)
- # 1 for a special token, 0 for a normal token in the special tokens mask
- # We sample a few tokens in each sequence for MLM training (with probability `self.mlm_probability`)
- masked_indices = self.tf_bernoulli(input_shape, self.mlm_probability) & ~special_tokens_mask
- # Replace unmasked indices with -100 in the labels since we only compute loss on masked tokens
- labels = tf.where(masked_indices, inputs, -100)
- # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
- indices_replaced = self.tf_bernoulli(input_shape, 0.8) & masked_indices
- inputs = tf.where(indices_replaced, mask_token_id, inputs)
- # 10% of the time, we replace masked input tokens with random word
- indices_random = self.tf_bernoulli(input_shape, 0.5) & masked_indices & ~indices_replaced
- random_words = tf.random.uniform(input_shape, maxval=vocab_size, dtype=inputs.dtype)
- inputs = tf.where(indices_random, random_words, inputs)
- # The rest of the time (10% of the time) we keep the masked input tokens unchanged
- return inputs, labels
- def tf_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
- import tensorflow as tf
- # Handle dict or lists with proper padding and conversion to tensor.
- if isinstance(examples[0], Mapping):
- batch = pad_without_fast_tokenizer_warning(
- self.tokenizer, examples, return_tensors="tf", pad_to_multiple_of=self.pad_to_multiple_of
- )
- else:
- batch = {
- "input_ids": _tf_collate_batch(examples, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
- }
- # If special token mask has been preprocessed, pop it from the dict.
- special_tokens_mask = batch.pop("special_tokens_mask", None)
- if self.mlm:
- if special_tokens_mask is None:
- special_tokens_mask = [
- self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True)
- for val in batch["input_ids"].numpy().tolist()
- ]
- # Cannot directly create as bool
- special_tokens_mask = tf.cast(tf.convert_to_tensor(special_tokens_mask, dtype=tf.int64), tf.bool)
- else:
- special_tokens_mask = tf.cast(special_tokens_mask, tf.bool)
- batch["input_ids"], batch["labels"] = self.tf_mask_tokens(
- tf.cast(batch["input_ids"], tf.int64),
- special_tokens_mask=special_tokens_mask,
- mask_token_id=self.tokenizer.mask_token_id,
- vocab_size=len(self.tokenizer),
- )
- else:
- labels = batch["input_ids"]
- if self.tokenizer.pad_token_id is not None:
- # Replace self.tokenizer.pad_token_id with -100
- labels = tf.where(labels == self.tokenizer.pad_token_id, -100, labels)
- else:
- labels = tf.identity(labels) # Makes a copy, just in case
- batch["labels"] = labels
- return batch
- def torch_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
- # Handle dict or lists with proper padding and conversion to tensor.
- if isinstance(examples[0], Mapping):
- batch = pad_without_fast_tokenizer_warning(
- self.tokenizer, examples, return_tensors="pt", pad_to_multiple_of=self.pad_to_multiple_of
- )
- else:
- batch = {
- "input_ids": _torch_collate_batch(examples, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
- }
- # If special token mask has been preprocessed, pop it from the dict.
- special_tokens_mask = batch.pop("special_tokens_mask", None)
- if self.mlm:
- batch["input_ids"], batch["labels"] = self.torch_mask_tokens(
- batch["input_ids"], special_tokens_mask=special_tokens_mask
- )
- else:
- labels = batch["input_ids"].clone()
- if self.tokenizer.pad_token_id is not None:
- labels[labels == self.tokenizer.pad_token_id] = -100
- batch["labels"] = labels
- return batch
- def torch_mask_tokens(self, inputs: Any, special_tokens_mask: Optional[Any] = None) -> Tuple[Any, Any]:
- """
- Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original.
- """
- import torch
- labels = inputs.clone()
- # We sample a few tokens in each sequence for MLM training (with probability `self.mlm_probability`)
- probability_matrix = torch.full(labels.shape, self.mlm_probability)
- if special_tokens_mask is None:
- special_tokens_mask = [
- self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()
- ]
- special_tokens_mask = torch.tensor(special_tokens_mask, dtype=torch.bool)
- else:
- special_tokens_mask = special_tokens_mask.bool()
- probability_matrix.masked_fill_(special_tokens_mask, value=0.0)
- masked_indices = torch.bernoulli(probability_matrix).bool()
- labels[~masked_indices] = -100 # We only compute loss on masked tokens
- # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
- indices_replaced = torch.bernoulli(torch.full(labels.shape, 0.8)).bool() & masked_indices
- inputs[indices_replaced] = self.tokenizer.convert_tokens_to_ids(self.tokenizer.mask_token)
- # 10% of the time, we replace masked input tokens with random word
- indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced
- random_words = torch.randint(len(self.tokenizer), labels.shape, dtype=torch.long)
- inputs[indices_random] = random_words[indices_random]
- # The rest of the time (10% of the time) we keep the masked input tokens unchanged
- return inputs, labels
- def numpy_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
- # Handle dict or lists with proper padding and conversion to tensor.
- if isinstance(examples[0], Mapping):
- batch = pad_without_fast_tokenizer_warning(
- self.tokenizer, examples, return_tensors="np", pad_to_multiple_of=self.pad_to_multiple_of
- )
- else:
- batch = {
- "input_ids": _numpy_collate_batch(examples, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
- }
- # If special token mask has been preprocessed, pop it from the dict.
- special_tokens_mask = batch.pop("special_tokens_mask", None)
- if self.mlm:
- batch["input_ids"], batch["labels"] = self.numpy_mask_tokens(
- batch["input_ids"], special_tokens_mask=special_tokens_mask
- )
- else:
- labels = np.copy(batch["input_ids"])
- if self.tokenizer.pad_token_id is not None:
- labels[labels == self.tokenizer.pad_token_id] = -100
- batch["labels"] = labels
- return batch
- def numpy_mask_tokens(self, inputs: Any, special_tokens_mask: Optional[Any] = None) -> Tuple[Any, Any]:
- """
- Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original.
- """
- labels = np.copy(inputs)
- # We sample a few tokens in each sequence for MLM training (with probability `self.mlm_probability`)
- probability_matrix = np.full(labels.shape, self.mlm_probability)
- if special_tokens_mask is None:
- special_tokens_mask = [
- self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()
- ]
- special_tokens_mask = np.array(special_tokens_mask, dtype=bool)
- else:
- special_tokens_mask = special_tokens_mask.astype(bool)
- probability_matrix[special_tokens_mask] = 0
- # Numpy doesn't have bernoulli, so we use a binomial with 1 trial
- masked_indices = np.random.binomial(1, probability_matrix, size=probability_matrix.shape).astype(bool)
- labels[~masked_indices] = -100 # We only compute loss on masked tokens
- # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
- indices_replaced = np.random.binomial(1, 0.8, size=labels.shape).astype(bool) & masked_indices
- inputs[indices_replaced] = self.tokenizer.mask_token_id
- # 10% of the time, we replace masked input tokens with random word
- # indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced
- indices_random = (
- np.random.binomial(1, 0.5, size=labels.shape).astype(bool) & masked_indices & ~indices_replaced
- )
- random_words = np.random.randint(
- low=0, high=len(self.tokenizer), size=np.count_nonzero(indices_random), dtype=np.int64
- )
- inputs[indices_random] = random_words
- # The rest of the time (10% of the time) we keep the masked input tokens unchanged
- return inputs, labels
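- # Illustrative usage sketch (comment only, not library code): MLM collation. With `mlm=True`,
- # roughly `mlm_probability` of the non-special tokens are selected; of those, 80% become the
- # mask token, 10% become random tokens and 10% are left unchanged, while labels are -100
- # everywhere else. The checkpoint name is a placeholder.
- #
- #   from transformers import AutoTokenizer
- #   tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
- #   collator = DataCollatorForLanguageModeling(tokenizer, mlm=True, mlm_probability=0.15)
- #   features = [tokenizer("the quick brown fox"), tokenizer("jumps over the lazy dog")]
- #   batch = collator(features)  # batch["labels"] is -100 except at the masked positions
- #   clm_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)  # causal LM labels instead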
- @dataclass
- class DataCollatorForWholeWordMask(DataCollatorForLanguageModeling):
- """
- Data collator used for language modeling that masks entire words.
- - collates batches of tensors, honoring their tokenizer's pad_token
- - preprocesses batches for masked language modeling
- <Tip>
- This collator relies on details of the implementation of subword tokenization by [`BertTokenizer`], specifically
- that subword tokens are prefixed with *##*. For tokenizers that do not adhere to this scheme, this collator will
- produce an output that is roughly equivalent to [`.DataCollatorForLanguageModeling`].
- </Tip>"""
- def torch_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
- if isinstance(examples[0], Mapping):
- input_ids = [e["input_ids"] for e in examples]
- else:
- input_ids = examples
- examples = [{"input_ids": e} for e in examples]
- batch_input = _torch_collate_batch(input_ids, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
- mask_labels = []
- for e in examples:
- ref_tokens = []
- for id in tolist(e["input_ids"]):
- token = self.tokenizer._convert_id_to_token(id)
- ref_tokens.append(token)
- # For Chinese tokens, we need extra info to mark sub-words, e.g. [喜,欢] -> [喜,##欢]
- if "chinese_ref" in e:
- ref_pos = tolist(e["chinese_ref"])
- len_seq = len(e["input_ids"])
- for i in range(len_seq):
- if i in ref_pos:
- ref_tokens[i] = "##" + ref_tokens[i]
- mask_labels.append(self._whole_word_mask(ref_tokens))
- batch_mask = _torch_collate_batch(mask_labels, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
- inputs, labels = self.torch_mask_tokens(batch_input, batch_mask)
- return {"input_ids": inputs, "labels": labels}
- def tf_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
- import tensorflow as tf
- if isinstance(examples[0], Mapping):
- input_ids = [e["input_ids"] for e in examples]
- else:
- input_ids = examples
- examples = [{"input_ids": e} for e in examples]
- batch_input = _tf_collate_batch(input_ids, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
- mask_labels = []
- for e in examples:
- ref_tokens = []
- for id in tolist(e["input_ids"]):
- token = self.tokenizer._convert_id_to_token(id)
- ref_tokens.append(token)
- # For Chinese tokens, we need extra info to mark sub-words, e.g. [喜,欢] -> [喜,##欢]
- if "chinese_ref" in e:
- ref_pos = tolist(e["chinese_ref"])
- len_seq = len(e["input_ids"])
- for i in range(len_seq):
- if i in ref_pos:
- ref_tokens[i] = "##" + ref_tokens[i]
- mask_labels.append(self._whole_word_mask(ref_tokens))
- batch_mask = _tf_collate_batch(mask_labels, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
- inputs, labels = self.tf_mask_tokens(tf.cast(batch_input, tf.int64), batch_mask)
- return {"input_ids": inputs, "labels": labels}
- def numpy_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
- if isinstance(examples[0], Mapping):
- input_ids = [e["input_ids"] for e in examples]
- else:
- input_ids = examples
- examples = [{"input_ids": e} for e in examples]
- batch_input = _numpy_collate_batch(input_ids, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
- mask_labels = []
- for e in examples:
- ref_tokens = []
- for id in tolist(e["input_ids"]):
- token = self.tokenizer._convert_id_to_token(id)
- ref_tokens.append(token)
- # For Chinese tokens, we need extra info to mark sub-words, e.g. [喜,欢] -> [喜,##欢]
- if "chinese_ref" in e:
- ref_pos = tolist(e["chinese_ref"])
- len_seq = len(e["input_ids"])
- for i in range(len_seq):
- if i in ref_pos:
- ref_tokens[i] = "##" + ref_tokens[i]
- mask_labels.append(self._whole_word_mask(ref_tokens))
- batch_mask = _numpy_collate_batch(mask_labels, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
- inputs, labels = self.numpy_mask_tokens(batch_input, batch_mask)
- return {"input_ids": inputs, "labels": labels}
- def _whole_word_mask(self, input_tokens: List[str], max_predictions=512):
- """
- Get 0/1 labels for masked tokens with whole word mask proxy
- """
- if not isinstance(self.tokenizer, (BertTokenizer, BertTokenizerFast)):
- warnings.warn(
- "DataCollatorForWholeWordMask is only suitable for BertTokenizer-like tokenizers. "
- "Please refer to the documentation for more information."
- )
- cand_indexes = []
- for i, token in enumerate(input_tokens):
- if token == "[CLS]" or token == "[SEP]":
- continue
- if len(cand_indexes) >= 1 and token.startswith("##"):
- cand_indexes[-1].append(i)
- else:
- cand_indexes.append([i])
- random.shuffle(cand_indexes)
- num_to_predict = min(max_predictions, max(1, int(round(len(input_tokens) * self.mlm_probability))))
- masked_lms = []
- covered_indexes = set()
- for index_set in cand_indexes:
- if len(masked_lms) >= num_to_predict:
- break
- # If adding a whole-word mask would exceed the maximum number of
- # predictions, then just skip this candidate.
- if len(masked_lms) + len(index_set) > num_to_predict:
- continue
- is_any_index_covered = False
- for index in index_set:
- if index in covered_indexes:
- is_any_index_covered = True
- break
- if is_any_index_covered:
- continue
- for index in index_set:
- covered_indexes.add(index)
- masked_lms.append(index)
- if len(covered_indexes) != len(masked_lms):
- raise ValueError("Length of covered_indexes is not equal to length of masked_lms.")
- mask_labels = [1 if i in covered_indexes else 0 for i in range(len(input_tokens))]
- return mask_labels
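- # Illustrative worked example (comment only, not library code): for input tokens
- #   ["[CLS]", "un", "##believ", "##able", "story", "[SEP]"]
- # the candidate word groups are [[1, 2, 3], [4]], since "##"-prefixed pieces join the previous
- # word. If the group [1, 2, 3] is selected (which requires num_to_predict >= 3, i.e. a long
- # enough sequence or a high enough mlm_probability), the returned mask labels are
- # [0, 1, 1, 1, 0, 0], so every sub-token of "unbelievable" is masked together.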
- def torch_mask_tokens(self, inputs: Any, mask_labels: Any) -> Tuple[Any, Any]:
- """
- Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original. Setting
- 'mask_labels' means we use whole word masking (wwm); we mask indices directly according to its reference.
- """
- import torch
- if self.tokenizer.mask_token is None:
- raise ValueError(
- "This tokenizer does not have a mask token which is necessary for masked language modeling. Remove the"
- " --mlm flag if you want to use this tokenizer."
- )
- labels = inputs.clone()
- # We sample a few tokens in each sequence for masked-LM training (with probability `mlm_probability`, 0.15 by default as in BERT/RoBERTa)
- probability_matrix = mask_labels
- special_tokens_mask = [
- self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()
- ]
- probability_matrix.masked_fill_(torch.tensor(special_tokens_mask, dtype=torch.bool), value=0.0)
- if self.tokenizer._pad_token is not None:
- padding_mask = labels.eq(self.tokenizer.pad_token_id)
- probability_matrix.masked_fill_(padding_mask, value=0.0)
- masked_indices = probability_matrix.bool()
- labels[~masked_indices] = -100 # We only compute loss on masked tokens
- # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
- indices_replaced = torch.bernoulli(torch.full(labels.shape, 0.8)).bool() & masked_indices
- inputs[indices_replaced] = self.tokenizer.convert_tokens_to_ids(self.tokenizer.mask_token)
- # 10% of the time, we replace masked input tokens with random word
- indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced
- random_words = torch.randint(len(self.tokenizer), labels.shape, dtype=torch.long)
- inputs[indices_random] = random_words[indices_random]
- # The rest of the time (10% of the time) we keep the masked input tokens unchanged
- return inputs, labels
- def tf_mask_tokens(self, inputs: Any, mask_labels: Any) -> Tuple[Any, Any]:
- """
- Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original. Setting
- 'mask_labels' means we use whole word masking (wwm); we mask indices directly according to its reference.
- """
- import tensorflow as tf
- input_shape = tf.shape(inputs)
- if self.tokenizer.mask_token is None:
- raise ValueError(
- "This tokenizer does not have a mask token which is necessary for masked language modeling. Remove the"
- " --mlm flag if you want to use this tokenizer."
- )
- labels = tf.identity(inputs)
- # We sample a few tokens in each sequence for masked-LM training (with probability `mlm_probability`, 0.15 by default as in BERT/RoBERTa)
- masked_indices = tf.cast(mask_labels, tf.bool)
- special_tokens_mask = [
- self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels
- ]
- masked_indices = masked_indices & ~tf.cast(special_tokens_mask, dtype=tf.bool)
- if self.tokenizer._pad_token is not None:
- padding_mask = inputs == self.tokenizer.pad_token_id
- masked_indices = masked_indices & ~padding_mask
- # Replace unmasked indices with -100 in the labels since we only compute loss on masked tokens
- labels = tf.where(masked_indices, inputs, -100)
- # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
- indices_replaced = self.tf_bernoulli(input_shape, 0.8) & masked_indices
- inputs = tf.where(indices_replaced, self.tokenizer.mask_token_id, inputs)
- # 10% of the time, we replace masked input tokens with random word
- indices_random = self.tf_bernoulli(input_shape, 0.5) & masked_indices & ~indices_replaced
- random_words = tf.random.uniform(input_shape, maxval=len(self.tokenizer), dtype=tf.int64)
- inputs = tf.where(indices_random, random_words, inputs)
- # The rest of the time (10% of the time) we keep the masked input tokens unchanged
- return inputs, labels
- def numpy_mask_tokens(self, inputs: Any, mask_labels: Any) -> Tuple[Any, Any]:
- """
- Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original. Setting
- 'mask_labels' means we use whole word masking (wwm); we mask indices directly according to its reference.
- """
- if self.tokenizer.mask_token is None:
- raise ValueError(
- "This tokenizer does not have a mask token which is necessary for masked language modeling. Remove the"
- " --mlm flag if you want to use this tokenizer."
- )
- labels = np.copy(inputs)
- # We sample a few tokens in each sequence for masked-LM training (with probability `mlm_probability`, 0.15 by default as in BERT/RoBERTa)
- masked_indices = mask_labels.astype(bool)
- special_tokens_mask = [
- self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()
- ]
- masked_indices[np.array(special_tokens_mask, dtype=bool)] = 0
- if self.tokenizer._pad_token is not None:
- padding_mask = labels == self.tokenizer.pad_token_id
- masked_indices[padding_mask] = 0
- labels[~masked_indices] = -100 # We only compute loss on masked tokens
- # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
- indices_replaced = np.random.binomial(1, 0.8, size=labels.shape).astype(bool) & masked_indices
- inputs[indices_replaced] = self.tokenizer.convert_tokens_to_ids(self.tokenizer.mask_token)
- # 10% of the time, we replace masked input tokens with random word
- # indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced
- indices_random = (
- np.random.binomial(1, 0.5, size=labels.shape).astype(bool) & masked_indices & ~indices_replaced
- )
- random_words = np.random.randint(low=0, high=len(self.tokenizer), size=labels.shape, dtype=np.int64)
- inputs[indices_random] = random_words[indices_random]
- # The rest of the time (10% of the time) we keep the masked input tokens unchanged
- return inputs, labels
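- # Illustrative usage sketch (comment only, not library code): whole-word masking with a
- # BERT-style tokenizer. The checkpoint name is a placeholder; note that the output contains
- # only input_ids and labels (no attention_mask).
- #
- #   from transformers import BertTokenizerFast
- #   tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")
- #   collator = DataCollatorForWholeWordMask(tokenizer, mlm_probability=0.15)
- #   encodings = [tokenizer("an unbelievable story"), tokenizer("a perfectly ordinary day")]
- #   batch = collator([{"input_ids": e["input_ids"]} for e in encodings])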
- @dataclass
- class DataCollatorForSOP(DataCollatorForLanguageModeling):
- """
- Data collator used for sentence order prediction task.
- - collates batches of tensors, honoring their tokenizer's pad_token
- - preprocesses batches for both masked language modeling and sentence order prediction
- """
- def __init__(self, *args, **kwargs):
- warnings.warn(
- "DataCollatorForSOP is deprecated and will be removed in a future version, you can now use "
- "DataCollatorForLanguageModeling instead.",
- FutureWarning,
- )
- def __call__(self, examples: List[Dict[str, Any]]) -> Dict[str, Any]:
- import torch
- from torch.nn.utils.rnn import pad_sequence
- input_ids = [example["input_ids"] for example in examples]
- input_ids = _torch_collate_batch(input_ids, self.tokenizer)
- input_ids, labels, attention_mask = self.mask_tokens(input_ids)
- token_type_ids = [example["token_type_ids"] for example in examples]
- # the size of token_type_ids varies because of randomness; pad them at the end, as in the original implementation
- token_type_ids = pad_sequence(token_type_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id)
- sop_label_list = [example["sentence_order_label"] for example in examples]
- sentence_order_label = torch.stack(sop_label_list)
- return {
- "input_ids": input_ids,
- "labels": labels,
- "attention_mask": attention_mask,
- "token_type_ids": token_type_ids,
- "sentence_order_label": sentence_order_label,
- }
- def mask_tokens(self, inputs: Any) -> Tuple[Any, Any, Any]:
- """
- Prepare masked tokens inputs/labels/attention_mask for masked language modeling: 80% MASK, 10% random, 10%
- original. N-gram not applied yet.
- """
- import torch
- if self.tokenizer.mask_token is None:
- raise ValueError(
- "This tokenizer does not have a mask token which is necessary for masked language modeling. Remove the"
- " --mlm flag if you want to use this tokenizer."
- )
- labels = inputs.clone()
- # We sample a few tokens in each sequence for masked-LM training (with probability `mlm_probability`, 0.15 by default as in BERT/RoBERTa)
- probability_matrix = torch.full(labels.shape, self.mlm_probability)
- special_tokens_mask = [
- self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()
- ]
- probability_matrix.masked_fill_(torch.tensor(special_tokens_mask, dtype=torch.bool), value=0.0)
- if self.tokenizer._pad_token is not None:
- padding_mask = labels.eq(self.tokenizer.pad_token_id)
- probability_matrix.masked_fill_(padding_mask, value=0.0)
- masked_indices = torch.bernoulli(probability_matrix).bool()
- # the probability is `1` for masked positions, but in the ALBERT model an attention-mask value of `0` means masked, so invert the value
- attention_mask = (~masked_indices).float()
- if self.tokenizer._pad_token is not None:
- attention_padding_mask = labels.eq(self.tokenizer.pad_token_id)
- attention_mask.masked_fill_(attention_padding_mask, value=1.0)
- labels[~masked_indices] = -100 # We only compute loss on masked tokens; -100 is the default ignore index for cross-entropy
- # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
- indices_replaced = torch.bernoulli(torch.full(labels.shape, 0.8)).bool() & masked_indices
- inputs[indices_replaced] = self.tokenizer.convert_tokens_to_ids(self.tokenizer.mask_token)
- # 10% of the time, we replace masked input tokens with random word
- indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced
- random_words = torch.randint(len(self.tokenizer), labels.shape, dtype=torch.long)
- inputs[indices_random] = random_words[indices_random]
- # The rest of the time (10% of the time) we keep the masked input tokens unchanged
- return inputs, labels, attention_mask
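- # Illustrative sketch (an assumption, not part of the original module) of the example dicts
- # DataCollatorForSOP.__call__ expects and of the batch it returns; `collator` stands for a
- # configured DataCollatorForSOP instance and the label convention (1 = segments swapped)
- # follows ALBERT's sentence order prediction task:
- #
- #   example = {
- #       "input_ids": torch.tensor([2, 10, 11, 3, 12, 13, 3]),
- #       "token_type_ids": torch.tensor([0, 0, 0, 0, 1, 1, 1]),
- #       "sentence_order_label": torch.tensor(1),  # assumed: 1 means the two segments were swapped
- #   }
- #   batch = collator([example, ...])
- #   # -> keys: "input_ids", "labels", "attention_mask", "token_type_ids", "sentence_order_label"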
- @dataclass
- class DataCollatorForPermutationLanguageModeling(DataCollatorMixin):
- """
- Data collator used for permutation language modeling.
- - collates batches of tensors, honoring their tokenizer's pad_token
- - preprocesses batches for permutation language modeling with procedures specific to XLNet
- """
- tokenizer: PreTrainedTokenizerBase
- plm_probability: float = 1 / 6
- max_span_length: int = 5 # maximum length of a span of masked tokens
- return_tensors: str = "pt"
- def torch_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
- if isinstance(examples[0], Mapping):
- examples = [e["input_ids"] for e in examples]
- batch = _torch_collate_batch(examples, self.tokenizer)
- inputs, perm_mask, target_mapping, labels = self.torch_mask_tokens(batch)
- return {"input_ids": inputs, "perm_mask": perm_mask, "target_mapping": target_mapping, "labels": labels}
- def tf_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
- if isinstance(examples[0], Mapping):
- examples = [e["input_ids"] for e in examples]
- batch = _tf_collate_batch(examples, self.tokenizer)
- inputs, perm_mask, target_mapping, labels = self.tf_mask_tokens(batch)
- return {"input_ids": inputs, "perm_mask": perm_mask, "target_mapping": target_mapping, "labels": labels}
- def numpy_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
- if isinstance(examples[0], Mapping):
- examples = [e["input_ids"] for e in examples]
- batch = _numpy_collate_batch(examples, self.tokenizer)
- inputs, perm_mask, target_mapping, labels = self.numpy_mask_tokens(batch)
- return {"input_ids": inputs, "perm_mask": perm_mask, "target_mapping": target_mapping, "labels": labels}
- def torch_mask_tokens(self, inputs: Any) -> Tuple[Any, Any, Any, Any]:
- """
- The masked tokens to be predicted for a particular sequence are determined by the following algorithm:
- 0. Start from the beginning of the sequence by setting `cur_len = 0` (number of tokens processed so far).
- 1. Sample a `span_length` from the interval `[1, max_span_length]` (length of span of tokens to be masked)
- 2. Reserve a context of length `context_length = span_length / plm_probability` to surround span to be
- masked
- 3. Sample a starting point `start_index` from the interval `[cur_len, cur_len + context_length -
- span_length]` and mask tokens `start_index:start_index + span_length`
- 4. Set `cur_len = cur_len + context_length`. If `cur_len < max_len` (i.e. there are tokens remaining in the
- sequence to be processed), repeat from Step 1.
- """
- import torch
- if self.tokenizer.mask_token is None:
- raise ValueError(
- "This tokenizer does not have a mask token which is necessary for permutation language modeling."
- " Please add a mask token if you want to use this tokenizer."
- )
- if inputs.size(1) % 2 != 0:
- raise ValueError(
- "This collator requires that sequence lengths be even to create a leakage-free perm_mask. Please see"
- " relevant comments in source code for details."
- )
- labels = inputs.clone()
- # Creating the mask and target_mapping tensors
- masked_indices = torch.full(labels.shape, 0, dtype=torch.bool)
- target_mapping = torch.zeros((labels.size(0), labels.size(1), labels.size(1)), dtype=torch.float32)
- for i in range(labels.size(0)):
- # Start from the beginning of the sequence by setting `cur_len = 0` (number of tokens processed so far).
- cur_len = 0
- max_len = labels.size(1)
- while cur_len < max_len:
- # Sample a `span_length` from the interval `[1, max_span_length]` (length of span of tokens to be masked)
- span_length = torch.randint(1, self.max_span_length + 1, (1,)).item()
- # Reserve a context of length `context_length = span_length / plm_probability` to surround the span to be masked
- context_length = int(span_length / self.plm_probability)
- # Sample a starting point `start_index` from the interval `[cur_len, cur_len + context_length - span_length]` and mask tokens `start_index:start_index + span_length`
- start_index = cur_len + torch.randint(context_length - span_length + 1, (1,)).item()
- masked_indices[i, start_index : start_index + span_length] = 1
- # Set `cur_len = cur_len + context_length`
- cur_len += context_length
- # Since we're replacing non-masked tokens with -100 in the labels tensor instead of skipping them altogether,
- # the i-th prediction corresponds to the i-th token.
- target_mapping[i] = torch.eye(labels.size(1))
- special_tokens_mask = torch.tensor(
- [self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()],
- dtype=torch.bool,
- )
- masked_indices.masked_fill_(special_tokens_mask, value=0.0)
- if self.tokenizer._pad_token is not None:
- padding_mask = labels.eq(self.tokenizer.pad_token_id)
- masked_indices.masked_fill_(padding_mask, value=0.0)
- # Mask indicating non-functional tokens, where functional tokens are [SEP], [CLS], padding, etc.
- non_func_mask = ~(padding_mask | special_tokens_mask)
- inputs[masked_indices] = self.tokenizer.mask_token_id
- labels[~masked_indices] = -100 # We only compute loss on masked tokens
- perm_mask = torch.zeros((labels.size(0), labels.size(1), labels.size(1)), dtype=torch.float32)
- for i in range(labels.size(0)):
- # Generate permutation indices i.e. sample a random factorisation order for the sequence. This will
- # determine which tokens a given token can attend to (encoded in `perm_mask`).
- # Note: Length of token sequence being permuted has to be less than or equal to reused sequence length
- # (see documentation for `mems`), otherwise information may leak through due to reuse. In this implementation,
- # we assume that reused length is half of sequence length and permutation length is equal to reused length.
- # This requires that the sequence length be even.
- # Create a linear factorisation order
- perm_index = torch.arange(labels.size(1))
- # Split this into two halves, assuming that half the sequence is reused each time
- perm_index = perm_index.reshape((-1, labels.size(1) // 2)).transpose(0, 1)
- # Permute the two halves such that they do not cross over
- perm_index = perm_index[torch.randperm(labels.size(1) // 2)]
- # Flatten this out into the desired permuted factorisation order
- perm_index = torch.flatten(perm_index.transpose(0, 1))
- # Set the permutation indices of non-masked (non-functional) tokens to the
- # smallest index (-1) so that:
- # (1) They can be seen by all other positions
- # (2) They cannot see masked positions, so there won't be information leak
- perm_index.masked_fill_(~masked_indices[i] & non_func_mask[i], -1)
- # The logic for whether the i-th token can attend to the j-th token, based on the factorisation order:
- # 0 (can attend): if perm_index[i] > perm_index[j], or j is not masked (functional tokens are never masked)
- # 1 (cannot attend): if perm_index[i] <= perm_index[j] and j is masked
- perm_mask[i] = (
- perm_index.reshape((labels.size(1), 1)) <= perm_index.reshape((1, labels.size(1)))
- ) & masked_indices[i]
- return inputs.long(), perm_mask, target_mapping, labels.long()
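- # Illustrative trace of the factorisation-order construction above for a length-6 sequence:
- #   torch.arange(6)                          -> [0, 1, 2, 3, 4, 5]
- #   reshape((-1, 3)).transpose(0, 1)         -> [[0, 3], [1, 4], [2, 5]]
- #   index with randperm(3), e.g. [2, 0, 1]   -> [[2, 5], [0, 3], [1, 4]]
- #   transpose(0, 1) and flatten              -> [2, 0, 1, 5, 3, 4]
- # Every rank in the first half stays below every rank in the second half, so the two halves
- # never cross over in the factorisation order, which the comments above require for
- # leakage-free reuse of mems.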
- def tf_mask_tokens(self, inputs: Any) -> Tuple[Any, Any, Any, Any]:
- """
- The masked tokens to be predicted for a particular sequence are determined by the following algorithm:
- 0. Start from the beginning of the sequence by setting `cur_len = 0` (number of tokens processed so far).
- 1. Sample a `span_length` from the interval `[1, max_span_length]` (length of span of tokens to be masked)
- 2. Reserve a context of length `context_length = span_length / plm_probability` to surround span to be
- masked
- 3. Sample a starting point `start_index` from the interval `[cur_len, cur_len + context_length -
- span_length]` and mask tokens `start_index:start_index + span_length`
- 4. Set `cur_len = cur_len + context_length`. If `cur_len < max_len` (i.e. there are tokens remaining in the
- sequence to be processed), repeat from Step 1.
- """
- import tensorflow as tf
- if self.tokenizer.mask_token is None:
- raise ValueError(
- "This tokenizer does not have a mask token which is necessary for permutation language modeling."
- " Please add a mask token if you want to use this tokenizer."
- )
- if tf.shape(inputs)[1] % 2 != 0:
- raise ValueError(
- "This collator requires that sequence lengths be even to create a leakage-free perm_mask. Please see"
- " relevant comments in source code for details."
- )
- labels = tf.identity(inputs)
- # Creating the mask and target_mapping tensors
- masked_indices = np.full(labels.shape.as_list(), 0, dtype=bool)
- labels_shape = tf.shape(labels)
- target_mapping = np.zeros((labels_shape[0], labels_shape[1], labels_shape[1]), dtype=np.float32)
- for i in range(len(labels)):
- # Start from the beginning of the sequence by setting `cur_len = 0` (number of tokens processed so far).
- cur_len = 0
- max_len = tf.shape(labels)[1]
- while cur_len < max_len:
- # Sample a `span_length` from the interval `[1, max_span_length]` (length of span of tokens to be masked)
- span_length = randint(1, self.max_span_length)  # random.randint includes both endpoints, so this matches [1, max_span_length]
- # Reserve a context of length `context_length = span_length / plm_probability` to surround the span to be masked
- context_length = int(span_length / self.plm_probability)
- # Sample a starting point `start_index` from the interval `[cur_len, cur_len + context_length - span_length]` and mask tokens `start_index:start_index + span_length`
- start_index = cur_len + randint(0, context_length - span_length)  # inclusive upper bound matches the documented interval
- masked_indices[i, start_index : start_index + span_length] = 1
- # Set `cur_len = cur_len + context_length`
- cur_len += context_length
- # Since we're replacing non-masked tokens with -100 in the labels tensor instead of skipping them altogether,
- # the i-th prediction corresponds to the i-th token.
- target_mapping[i] = np.eye(labels_shape[1])
- masked_indices = tf.cast(tf.convert_to_tensor(masked_indices), dtype=tf.bool)
- target_mapping = tf.convert_to_tensor(target_mapping)
- special_tokens_mask = tf.convert_to_tensor(
- [
- self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True)
- for val in labels.numpy().tolist()
- ],
- )
- special_tokens_mask = tf.cast(special_tokens_mask, dtype=tf.bool)
- masked_indices = masked_indices & ~special_tokens_mask
- if self.tokenizer._pad_token is not None:
- padding_mask = labels == self.tokenizer.pad_token_id
- masked_indices = masked_indices & ~padding_mask
- # Mask indicating non-functional tokens, where functional tokens are [SEP], [CLS], padding, etc.
- non_func_mask = ~(padding_mask | special_tokens_mask)
- inputs = tf.where(masked_indices, self.tokenizer.mask_token_id, inputs)
- labels = tf.where(masked_indices, labels, -100) # We only compute loss on masked tokens
- perm_mask = []
- for i in range(len(labels)):
- # Generate permutation indices i.e. sample a random factorisation order for the sequence. This will
- # determine which tokens a given token can attend to (encoded in `perm_mask`).
- # Note: Length of token sequence being permuted has to be less than or equal to reused sequence length
- # (see documentation for `mems`), otherwise information may leak through due to reuse. In this implementation,
- # we assume that reused length is half of sequence length and permutation length is equal to reused length.
- # This requires that the sequence length be even.
- # Create a linear factorisation order
- # tf.range is the equivalent of torch.arange
- perm_index = tf.range(labels_shape[1])
- # Split this into two halves, assuming that half the sequence is reused each time
- perm_index = tf.transpose(tf.reshape(perm_index, (-1, labels_shape[1] // 2)))
- # Permute the two halves such that they do not cross over
- perm_index = tf.random.shuffle(perm_index) # Shuffles along the first dimension
- # Flatten this out into the desired permuted factorisation order
- perm_index = tf.reshape(tf.transpose(perm_index), (-1,))
- # Set the permutation indices of non-masked (non-functional) tokens to the
- # smallest index (-1) so that:
- # (1) They can be seen by all other positions
- # (2) They cannot see masked positions, so there won't be information leak
- perm_index = tf.where(~masked_indices[i] & non_func_mask[i], -1, perm_index)
- # The logic for whether the i-th token can attend to the j-th token, based on the factorisation order:
- # 0 (can attend): if perm_index[i] > perm_index[j], or j is not masked (functional tokens are never masked)
- # 1 (cannot attend): if perm_index[i] <= perm_index[j] and j is masked
- perm_mask.append(
- (tf.reshape(perm_index, (labels_shape[1], 1)) <= tf.reshape(perm_index, (1, labels_shape[1])))
- & masked_indices[i]
- )
- perm_mask = tf.stack(perm_mask, axis=0)
- return tf.cast(inputs, tf.int64), tf.cast(perm_mask, tf.float32), target_mapping, tf.cast(labels, tf.int64)
- def numpy_mask_tokens(self, inputs: Any) -> Tuple[Any, Any, Any, Any]:
- """
- The masked tokens to be predicted for a particular sequence are determined by the following algorithm:
- 0. Start from the beginning of the sequence by setting `cur_len = 0` (number of tokens processed so far).
- 1. Sample a `span_length` from the interval `[1, max_span_length]` (length of span of tokens to be masked)
- 2. Reserve a context of length `context_length = span_length / plm_probability` to surround span to be
- masked
- 3. Sample a starting point `start_index` from the interval `[cur_len, cur_len + context_length -
- span_length]` and mask tokens `start_index:start_index + span_length`
- 4. Set `cur_len = cur_len + context_length`. If `cur_len < max_len` (i.e. there are tokens remaining in the
- sequence to be processed), repeat from Step 1.
- """
- if self.tokenizer.mask_token is None:
- raise ValueError(
- "This tokenizer does not have a mask token which is necessary for permutation language modeling."
- " Please add a mask token if you want to use this tokenizer."
- )
- if inputs.shape[1] % 2 != 0:
- raise ValueError(
- "This collator requires that sequence lengths be even to create a leakage-free perm_mask. Please see"
- " relevant comments in source code for details."
- )
- labels = np.copy(inputs)
- # Creating the mask and target_mapping tensors
- masked_indices = np.full(labels.shape, 0, dtype=bool)
- target_mapping = np.zeros((labels.shape[0], labels.shape[1], labels.shape[1]), dtype=np.float32)
- for i in range(labels.shape[0]):
- # Start from the beginning of the sequence by setting `cur_len = 0` (number of tokens processed so far).
- cur_len = 0
- max_len = labels.shape[1]
- while cur_len < max_len:
- # Sample a `span_length` from the interval `[1, max_span_length]` (length of span of tokens to be masked)
- span_length = randint(1, self.max_span_length)  # random.randint includes both endpoints, so this matches [1, max_span_length]
- # Reserve a context of length `context_length = span_length / plm_probability` to surround the span to be masked
- context_length = int(span_length / self.plm_probability)
- # Sample a starting point `start_index` from the interval `[cur_len, cur_len + context_length - span_length]` and mask tokens `start_index:start_index + span_length`
- start_index = cur_len + randint(0, context_length - span_length)  # inclusive upper bound matches the documented interval
- masked_indices[i, start_index : start_index + span_length] = 1
- # Set `cur_len = cur_len + context_length`
- cur_len += context_length
- # Since we're replacing non-masked tokens with -100 in the labels tensor instead of skipping them altogether,
- # the i-th prediction corresponds to the i-th token.
- target_mapping[i] = np.eye(labels.shape[1])
- special_tokens_mask = np.array(
- [self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()],
- dtype=bool,
- )
- masked_indices[special_tokens_mask] = 0
- if self.tokenizer._pad_token is not None:
- padding_mask = labels == self.tokenizer.pad_token_id
- masked_indices[padding_mask] = 0
- # Mask indicating non-functional tokens, where functional tokens are [SEP], [CLS], padding, etc.
- non_func_mask = ~(padding_mask | special_tokens_mask)
- inputs[masked_indices] = self.tokenizer.mask_token_id
- labels[~masked_indices] = -100 # We only compute loss on masked tokens
- perm_mask = np.zeros((labels.shape[0], labels.shape[1], labels.shape[1]), dtype=np.float32)
- for i in range(labels.shape[0]):
- # Generate permutation indices i.e. sample a random factorisation order for the sequence. This will
- # determine which tokens a given token can attend to (encoded in `perm_mask`).
- # Note: Length of token sequence being permuted has to be less than or equal to reused sequence length
- # (see documentation for `mems`), otherwise information may leak through due to reuse. In this implementation,
- # we assume that reused length is half of sequence length and permutation length is equal to reused length.
- # This requires that the sequence length be even.
- # Create a linear factorisation order
- perm_index = np.arange(labels.shape[1])
- # Split this into two halves, assuming that half the sequence is reused each time
- perm_index = perm_index.reshape((-1, labels.shape[1] // 2)).T
- # Permute the two halves such that they do not cross over
- np.random.shuffle(perm_index)
- # Flatten this out into the desired permuted factorisation order
- perm_index = perm_index.T.flatten()
- # Set the permutation indices of non-masked (non-functional) tokens to the
- # smallest index (-1) so that:
- # (1) They can be seen by all other positions
- # (2) They cannot see masked positions, so there won't be information leak
- perm_index[~masked_indices[i] & non_func_mask[i]] = -1
- # The logic for whether the i-th token can attend to the j-th token, based on the factorisation order:
- # 0 (can attend): if perm_index[i] > perm_index[j], or j is not masked (functional tokens are never masked)
- # 1 (cannot attend): if perm_index[i] <= perm_index[j] and j is masked
- perm_mask[i] = (
- perm_index.reshape((labels.shape[1], 1)) <= perm_index.reshape((1, labels.shape[1]))
- ) & masked_indices[i]
- return inputs.astype(np.int64), perm_mask, target_mapping, labels.astype(np.int64)
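- # Illustrative usage sketch (an assumption, not part of the original module), pairing this collator
- # with an XLNet checkpoint; `train_dataset` is a hypothetical tokenized dataset whose sequences all
- # have even length:
- #
- #   from transformers import XLNetTokenizerFast
- #   tokenizer = XLNetTokenizerFast.from_pretrained("xlnet-base-cased")
- #   collator = DataCollatorForPermutationLanguageModeling(
- #       tokenizer=tokenizer, plm_probability=1 / 6, max_span_length=5
- #   )
- #   batch = collator([{"input_ids": ids} for ids in train_dataset["input_ids"]])
- #   # batch contains "input_ids", "perm_mask", "target_mapping" and "labels",
- #   # the inputs expected by XLNetLMHeadModel.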
- @dataclass
- class DataCollatorWithFlattening(DefaultDataCollator):
- """
- Data collator used for the padding-free approach. Does the following:
- concatenates the entire mini batch into a single long sequence of shape [1, total_tokens]
- uses `separator_id` to separate sequences within the concatenated `labels`; the default value is -100
- adds no padding; returns `input_ids`, `labels` and `position_ids`
- """
- def __init__(self, *args, return_position_ids=True, separator_id=-100, **kwargs):
- super().__init__(*args, **kwargs)
- self.return_position_ids = return_position_ids
- self.separator_id = separator_id
- warnings.warn(
- "Using `DataCollatorWithFlattening` will flatten the entire mini batch into single long sequence."
- "Make sure your attention computation is able to handle it!"
- )
- def __call__(self, features, return_tensors=None, separator_id=None):
- if return_tensors is None:
- return_tensors = self.return_tensors
- if separator_id is None:
- separator_id = self.separator_id
- is_labels_provided = "labels" in features[0]
- ret = {"input_ids": [], "labels": []}
- if self.return_position_ids:
- ret.update({"position_ids": []})
- for idx in range(len(features)):
- ret["input_ids"] += features[idx]["input_ids"]
- if is_labels_provided:
- ret["labels"] += [separator_id] + features[idx]["labels"][1:]
- else:
- ret["labels"] += [separator_id] + features[idx]["input_ids"][1:]
- if self.return_position_ids:
- ret["position_ids"] += list(range(len(features[idx]["input_ids"])))
- return default_data_collator([ret], return_tensors)
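- # Illustrative sketch of the flattening behaviour (not part of the original module), using the
- # default return_tensors="pt" and separator_id=-100:
- #
- #   collator = DataCollatorWithFlattening()
- #   features = [{"input_ids": [1, 2, 3]}, {"input_ids": [4, 5]}]
- #   batch = collator(features)
- #   # batch["input_ids"]    -> tensor([[1, 2, 3, 4, 5]])
- #   # batch["labels"]       -> tensor([[-100, 2, 3, -100, 5]])  (separator_id marks each sequence start)
- #   # batch["position_ids"] -> tensor([[0, 1, 2, 0, 1]])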