| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111 |
- # coding=utf-8
- # Copyright 2023 Microsoft Research and The HuggingFace Inc. team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """PyTorch KOSMOS-2 model."""
- import math
- from dataclasses import dataclass
- from typing import Any, List, Optional, Tuple, Union
- import torch
- import torch.utils.checkpoint
- from torch import nn
- from torch.nn import CrossEntropyLoss
- from ...activations import ACT2FN
- from ...generation import GenerationMixin
- from ...modeling_outputs import (
- BaseModelOutput,
- BaseModelOutputWithPastAndCrossAttentions,
- BaseModelOutputWithPooling,
- CausalLMOutputWithCrossAttentions,
- )
- from ...modeling_utils import PreTrainedModel
- from ...utils import (
- ModelOutput,
- add_start_docstrings,
- add_start_docstrings_to_model_forward,
- logging,
- replace_return_docstrings,
- torch_int,
- )
- from .configuration_kosmos2 import Kosmos2Config, Kosmos2TextConfig, Kosmos2VisionConfig
- logger = logging.get_logger(__name__)
- _CONFIG_FOR_DOC = Kosmos2Config
- def _expand_mask(mask: torch.Tensor, dtype: torch.dtype, tgt_len: Optional[int] = None):
- """
- Expands attention_mask from `[bsz, seq_len]` to `[bsz, 1, tgt_seq_len, src_seq_len]`.
- """
- bsz, src_len = mask.size()
- tgt_len = tgt_len if tgt_len is not None else src_len
- expanded_mask = mask[:, None, None, :].expand(bsz, 1, tgt_len, src_len).to(dtype)
- inverted_mask = 1.0 - expanded_mask
- return inverted_mask.masked_fill(inverted_mask.to(torch.bool), torch.finfo(dtype).min)
- def _make_causal_mask(
- input_ids_shape: torch.Size, dtype: torch.dtype, device: torch.device, past_key_values_length: int = 0
- ):
- """
- Make causal mask used for bi-directional self-attention.
- """
- bsz, tgt_len = input_ids_shape
- mask = torch.full((tgt_len, tgt_len), torch.finfo(dtype).min, device=device)
- mask_cond = torch.arange(mask.size(-1), device=device)
- mask.masked_fill_(mask_cond < (mask_cond + 1).view(mask.size(-1), 1), 0)
- mask = mask.to(dtype)
- if past_key_values_length > 0:
- mask = torch.cat([torch.zeros(tgt_len, past_key_values_length, dtype=dtype, device=device), mask], dim=-1)
- return mask[None, None, :, :].expand(bsz, 1, tgt_len, tgt_len + past_key_values_length)
- # Copied from transformers.models.roberta.modeling_roberta.create_position_ids_from_input_ids
- def create_position_ids_from_input_ids(input_ids, padding_idx, past_key_values_length=0):
- """
- Replace non-padding symbols with their position numbers. Position numbers begin at padding_idx+1. Padding symbols
- are ignored. This is modified from fairseq's `utils.make_positions`.
- Args:
- x: torch.Tensor x:
- Returns: torch.Tensor
- """
- # The series of casts and type-conversions here are carefully balanced to both work with ONNX export and XLA.
- mask = input_ids.ne(padding_idx).int()
- incremental_indices = (torch.cumsum(mask, dim=1).type_as(mask) + past_key_values_length) * mask
- return incremental_indices.long() + padding_idx
- KOSMOS2_START_DOCSTRING = r"""
- This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
- library implements for all its model (such as downloading or saving, resizing the input embeddings, pruning heads
- etc.)
- This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
- Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage
- and behavior.
- Parameters:
- config ([`Kosmos2Config`]): Model configuration class with all the parameters of the model.
- Initializing with a config file does not load the weights associated with the model, only the
- configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights.
- """
- KOSMOS2_VISION_INPUTS_DOCSTRING = r"""
- Args:
- pixel_values (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)`):
- Pixel values. Pixel values can be obtained using [`AutoImageProcessor`]. See
- [`CLIPImageProcessor.__call__`] for details.
- output_attentions (`bool`, *optional*):
- Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
- tensors for more detail.
- output_hidden_states (`bool`, *optional*):
- Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
- more detail.
- interpolate_pos_encoding (`bool`, *optional*, defaults `False`):
- Whether to interpolate the pre-trained position encodings.
- return_dict (`bool`, *optional*):
- Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
- """
- KOSMOS2_TEXT_INPUTS_DOCSTRING = r"""
- Args:
- input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
- Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you provide
- it.
- Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
- [`PreTrainedTokenizer.__call__`] for details.
- [What are input IDs?](../glossary#input-ids)
- attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
- Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- - 1 for tokens that are **not masked**,
- - 0 for tokens that are **masked**.
- [What are attention masks?](../glossary#attention-mask)
- image_embeds: (`torch.FloatTensor` of shape `(batch_size, latent_query_num, hidden_size)`, *optional*):
- Sequence of hidden-states at the output of `Kosmos2ImageToTextProjection`.
- image_embeds_position_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
- Mask to indicate the location in a sequence to insert the image features . Mask values selected in `[0,
- 1]`:
- - 1 for places where to put the image features,
- - 0 for places that are not for image features (i.e. for text tokens).
- encoder_hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
- Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention if
- the model is configured as a decoder.
- encoder_attention_mask (`torch.FloatTensor` of shape `(batch_size, sequence_length)`, *optional*):
- Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used in
- the cross-attention if the model is configured as a decoder. Mask values selected in `[0, 1]`:
- - 1 for tokens that are **not masked**,
- - 0 for tokens that are **masked**.
- head_mask (`torch.FloatTensor` of shape `(num_heads,)` or `(num_layers, num_heads)`, *optional*):
- Mask to nullify selected heads of the self-attention modules. Mask values selected in `[0, 1]`:
- - 1 indicates the head is **not masked**,
- - 0 indicates the head is **masked**.
- cross_attn_head_mask (`torch.Tensor` of shape `(decoder_layers, decoder_attention_heads)`, *optional*):
- Mask to nullify selected heads of the cross-attention modules. Mask values selected in `[0, 1]`:
- - 1 indicates the head is **not masked**,
- - 0 indicates the head is **masked**.
- past_key_values (`tuple(tuple(torch.FloatTensor))` of length `config.n_layers` with each tuple having 4 tensors of shape `(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`):
- Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding.
- If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that
- don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all
- `decoder_input_ids` of shape `(batch_size, sequence_length)`.
- inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
- Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
- is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
- model's internal embedding lookup matrix.
- position_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
- Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0,
- config.max_position_embeddings - 1]`.
- [What are position IDs?](../glossary#position-ids)
- use_cache (`bool`, *optional*):
- If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see
- `past_key_values`).
- output_attentions (`bool`, *optional*):
- Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
- tensors for more detail.
- output_hidden_states (`bool`, *optional*):
- Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
- more detail.
- return_dict (`bool`, *optional*):
- Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
- """
- KOSMOS2_INPUTS_DOCSTRING = r"""
- Args:
- pixel_values (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)`):
- Pixel values. Pixel values can be obtained using [`AutoImageProcessor`]. See
- [`CLIPImageProcessor.__call__`] for details.
- input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
- Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you provide
- it.
- Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
- [`PreTrainedTokenizer.__call__`] for details.
- [What are input IDs?](../glossary#input-ids)
- image_embeds_position_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
- Mask to indicate the location in a sequence to insert the image features . Mask values selected in `[0,
- 1]`:
- - 1 for places where to put the image features,
- - 0 for places that are not for image features (i.e. for text tokens).
- attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
- Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- - 1 for tokens that are **not masked**,
- - 0 for tokens that are **masked**.
- [What are attention masks?](../glossary#attention-mask)
- head_mask (`torch.FloatTensor` of shape `(num_heads,)` or `(num_layers, num_heads)`, *optional*):
- Mask to nullify selected heads of the self-attention modules. Mask values selected in `[0, 1]`:
- - 1 indicates the head is **not masked**,
- - 0 indicates the head is **masked**.
- past_key_values (`tuple(tuple(torch.FloatTensor))` of length `config.n_layers` with each tuple having 4 tensors of shape `(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`):
- Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding.
- If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that
- don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all
- `decoder_input_ids` of shape `(batch_size, sequence_length)`.
- image_embeds: (`torch.FloatTensor` of shape `(batch_size, latent_query_num, hidden_size)`, *optional*):
- Sequence of hidden-states at the output of `Kosmos2ImageToTextProjection`.
- inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
- Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
- is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
- model's internal embedding lookup matrix.
- position_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
- Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0,
- config.max_position_embeddings - 1]`.
- [What are position IDs?](../glossary#position-ids)
- use_cache (`bool`, *optional*):
- If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see
- `past_key_values`).
- output_attentions (`bool`, *optional*):
- Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
- tensors for more detail.
- output_hidden_states (`bool`, *optional*):
- Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
- more detail.
- interpolate_pos_encoding (`bool`, *optional*, defaults `False`):
- Whether to interpolate the pre-trained position encodings.
- return_dict (`bool`, *optional*):
- Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
- """
- @dataclass
- class Kosmos2ModelOutput(ModelOutput):
- """
- Base class for text model's outputs that also contains a pooling of the last hidden states.
- Args:
- last_hidden_state (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
- Sequence of hidden-states at the output of the last layer of the model.
- hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
- Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
- one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.
- Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
- attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
- Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
- sequence_length)`.
- Attentions weights after the attention softmax, used to compute the weighted average in the self-attention
- heads.
- image_embeds (`torch.FloatTensor` of shape `(batch_size, latent_query_num, hidden_size)`, *optional*):
- Sequence of hidden-states at the output of `Kosmos2ImageToTextProjection`.
- projection_attentions (`tuple(torch.FloatTensor)`, *optional*):
- Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
- sequence_length)`.
- Attentions weights given by `Kosmos2ImageToTextProjection`, after the attention softmax, used to compute
- the weighted average in the self-attention heads.
- vision_model_output(`BaseModelOutputWithPooling`, *optional*):
- The output of the [`Kosmos2VisionModel`].
- past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
- Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
- `(batch_size, num_heads, sequence_length, embed_size_per_head)`) and optionally if
- `config.is_encoder_decoder=True` 2 additional tensors of shape `(batch_size, num_heads,
- encoder_sequence_length, embed_size_per_head)`.
- Contains pre-computed hidden-states (key and values in the self-attention blocks and optionally if
- `config.is_encoder_decoder=True` in the cross-attention blocks) that can be used (see `past_key_values`
- input) to speed up sequential decoding.
- """
- last_hidden_state: torch.FloatTensor = None
- past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None
- hidden_states: Optional[Tuple[torch.FloatTensor]] = None
- attentions: Optional[Tuple[torch.FloatTensor]] = None
- image_embeds: Optional[torch.FloatTensor] = None
- projection_attentions: Optional[Tuple[torch.FloatTensor]] = None
- vision_model_output: BaseModelOutputWithPooling = None
- def to_tuple(self) -> Tuple[Any]:
- return tuple(
- self[k] if k not in ["text_model_output", "vision_model_output"] else getattr(self, k).to_tuple()
- for k in self.keys()
- )
- @dataclass
- class Kosmos2ForConditionalGenerationModelOutput(ModelOutput):
- """
- Model output class for `Kosmos2ForConditionalGeneration`.
- Args:
- loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
- Language modeling loss (for next-token prediction).
- logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`):
- Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
- hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
- Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
- one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.
- Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
- attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
- Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
- sequence_length)`.
- Attentions weights after the attention softmax, used to compute the weighted average in the self-attention
- heads.
- image_embeds (`torch.FloatTensor` of shape `(batch_size, latent_query_num, hidden_size)`, *optional*):
- Sequence of hidden-states at the output of `Kosmos2ImageToTextProjection`.
- projection_attentions (`tuple(torch.FloatTensor)`, *optional*):
- Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
- sequence_length)`.
- Attentions weights given by `Kosmos2ImageToTextProjection`, after the attention softmax, used to compute
- the weighted average in the self-attention heads.
- vision_model_output(`BaseModelOutputWithPooling`, *optional*):
- The output of the [`Kosmos2VisionModel`].
- past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
- Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
- `(batch_size, num_heads, sequence_length, embed_size_per_head)`) and optionally if
- `config.is_encoder_decoder=True` 2 additional tensors of shape `(batch_size, num_heads,
- encoder_sequence_length, embed_size_per_head)`.
- Contains pre-computed hidden-states (key and values in the self-attention blocks and optionally if
- `config.is_encoder_decoder=True` in the cross-attention blocks) that can be used (see `past_key_values`
- input) to speed up sequential decoding.
- """
- loss: Optional[torch.FloatTensor] = None
- logits: torch.FloatTensor = None
- past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None
- hidden_states: Optional[Tuple[torch.FloatTensor]] = None
- attentions: Optional[Tuple[torch.FloatTensor]] = None
- image_embeds: Optional[torch.FloatTensor] = None
- projection_attentions: Optional[Tuple[torch.FloatTensor]] = None
- vision_model_output: BaseModelOutputWithPooling = None
- def to_tuple(self) -> Tuple[Any]:
- return tuple(
- self[k] if k not in ["text_model_output", "vision_model_output"] else getattr(self, k).to_tuple()
- for k in self.keys()
- )
- # Copied from transformers.models.clip.modeling_clip.CLIPVisionEmbeddings with CLIP->Kosmos2
- class Kosmos2VisionEmbeddings(nn.Module):
- def __init__(self, config: Kosmos2VisionConfig):
- super().__init__()
- self.config = config
- self.embed_dim = config.hidden_size
- self.image_size = config.image_size
- self.patch_size = config.patch_size
- self.class_embedding = nn.Parameter(torch.randn(self.embed_dim))
- self.patch_embedding = nn.Conv2d(
- in_channels=config.num_channels,
- out_channels=self.embed_dim,
- kernel_size=self.patch_size,
- stride=self.patch_size,
- bias=False,
- )
- self.num_patches = (self.image_size // self.patch_size) ** 2
- self.num_positions = self.num_patches + 1
- self.position_embedding = nn.Embedding(self.num_positions, self.embed_dim)
- self.register_buffer("position_ids", torch.arange(self.num_positions).expand((1, -1)), persistent=False)
- def interpolate_pos_encoding(self, embeddings: torch.Tensor, height: int, width: int) -> torch.Tensor:
- """
- This method allows to interpolate the pre-trained position encodings, to be able to use the model on higher resolution
- images. This method is also adapted to support torch.jit tracing.
- Adapted from:
- - https://github.com/facebookresearch/dino/blob/de9ee3df6cf39fac952ab558447af1fa1365362a/vision_transformer.py#L174-L194, and
- - https://github.com/facebookresearch/dinov2/blob/e1277af2ba9496fbadf7aec6eba56e8d882d1e35/dinov2/models/vision_transformer.py#L179-L211
- """
- num_patches = embeddings.shape[1] - 1
- position_embedding = self.position_embedding.weight.unsqueeze(0)
- num_positions = position_embedding.shape[1] - 1
- # always interpolate when tracing to ensure the exported model works for dynamic input shapes
- if not torch.jit.is_tracing() and num_patches == num_positions and height == width:
- return self.position_embedding(self.position_ids)
- class_pos_embed = position_embedding[:, :1]
- patch_pos_embed = position_embedding[:, 1:]
- dim = embeddings.shape[-1]
- new_height = height // self.patch_size
- new_width = width // self.patch_size
- sqrt_num_positions = torch_int(num_positions**0.5)
- patch_pos_embed = patch_pos_embed.reshape(1, sqrt_num_positions, sqrt_num_positions, dim)
- patch_pos_embed = patch_pos_embed.permute(0, 3, 1, 2)
- patch_pos_embed = nn.functional.interpolate(
- patch_pos_embed,
- size=(new_height, new_width),
- mode="bicubic",
- align_corners=False,
- )
- patch_pos_embed = patch_pos_embed.permute(0, 2, 3, 1).view(1, -1, dim)
- return torch.cat((class_pos_embed, patch_pos_embed), dim=1)
- def forward(self, pixel_values: torch.FloatTensor, interpolate_pos_encoding=False) -> torch.Tensor:
- batch_size, _, height, width = pixel_values.shape
- if not interpolate_pos_encoding and (height != self.image_size or width != self.image_size):
- raise ValueError(
- f"Input image size ({height}*{width}) doesn't match model" f" ({self.image_size}*{self.image_size})."
- )
- target_dtype = self.patch_embedding.weight.dtype
- patch_embeds = self.patch_embedding(pixel_values.to(dtype=target_dtype)) # shape = [*, width, grid, grid]
- patch_embeds = patch_embeds.flatten(2).transpose(1, 2)
- class_embeds = self.class_embedding.expand(batch_size, 1, -1)
- embeddings = torch.cat([class_embeds, patch_embeds], dim=1)
- if interpolate_pos_encoding:
- embeddings = embeddings + self.interpolate_pos_encoding(embeddings, height, width)
- else:
- embeddings = embeddings + self.position_embedding(self.position_ids)
- return embeddings
- # Copied from transformers.models.clip.modeling_clip.CLIPAttention with CLIP->Kosmos2Vision
- class Kosmos2VisionAttention(nn.Module):
- """Multi-headed attention from 'Attention Is All You Need' paper"""
- def __init__(self, config):
- super().__init__()
- self.config = config
- self.embed_dim = config.hidden_size
- self.num_heads = config.num_attention_heads
- self.head_dim = self.embed_dim // self.num_heads
- if self.head_dim * self.num_heads != self.embed_dim:
- raise ValueError(
- f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:"
- f" {self.num_heads})."
- )
- self.scale = self.head_dim**-0.5
- self.dropout = config.attention_dropout
- self.k_proj = nn.Linear(self.embed_dim, self.embed_dim)
- self.v_proj = nn.Linear(self.embed_dim, self.embed_dim)
- self.q_proj = nn.Linear(self.embed_dim, self.embed_dim)
- self.out_proj = nn.Linear(self.embed_dim, self.embed_dim)
- def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
- return tensor.view(bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous()
- def forward(
- self,
- hidden_states: torch.Tensor,
- attention_mask: Optional[torch.Tensor] = None,
- causal_attention_mask: Optional[torch.Tensor] = None,
- output_attentions: Optional[bool] = False,
- ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
- """Input shape: Batch x Time x Channel"""
- bsz, tgt_len, embed_dim = hidden_states.size()
- # get query proj
- query_states = self.q_proj(hidden_states) * self.scale
- key_states = self._shape(self.k_proj(hidden_states), -1, bsz)
- value_states = self._shape(self.v_proj(hidden_states), -1, bsz)
- proj_shape = (bsz * self.num_heads, -1, self.head_dim)
- query_states = self._shape(query_states, tgt_len, bsz).view(*proj_shape)
- key_states = key_states.view(*proj_shape)
- value_states = value_states.view(*proj_shape)
- src_len = key_states.size(1)
- attn_weights = torch.bmm(query_states, key_states.transpose(1, 2))
- if attn_weights.size() != (bsz * self.num_heads, tgt_len, src_len):
- raise ValueError(
- f"Attention weights should be of size {(bsz * self.num_heads, tgt_len, src_len)}, but is"
- f" {attn_weights.size()}"
- )
- # apply the causal_attention_mask first
- if causal_attention_mask is not None:
- if causal_attention_mask.size() != (bsz, 1, tgt_len, src_len):
- raise ValueError(
- f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is"
- f" {causal_attention_mask.size()}"
- )
- attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) + causal_attention_mask
- attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)
- if attention_mask is not None:
- if attention_mask.size() != (bsz, 1, tgt_len, src_len):
- raise ValueError(
- f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is {attention_mask.size()}"
- )
- attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) + attention_mask
- attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)
- attn_weights = nn.functional.softmax(attn_weights, dim=-1)
- if output_attentions:
- # this operation is a bit akward, but it's required to
- # make sure that attn_weights keeps its gradient.
- # In order to do so, attn_weights have to reshaped
- # twice and have to be reused in the following
- attn_weights_reshaped = attn_weights.view(bsz, self.num_heads, tgt_len, src_len)
- attn_weights = attn_weights_reshaped.view(bsz * self.num_heads, tgt_len, src_len)
- else:
- attn_weights_reshaped = None
- attn_probs = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training)
- attn_output = torch.bmm(attn_probs, value_states)
- if attn_output.size() != (bsz * self.num_heads, tgt_len, self.head_dim):
- raise ValueError(
- f"`attn_output` should be of size {(bsz, self.num_heads, tgt_len, self.head_dim)}, but is"
- f" {attn_output.size()}"
- )
- attn_output = attn_output.view(bsz, self.num_heads, tgt_len, self.head_dim)
- attn_output = attn_output.transpose(1, 2)
- attn_output = attn_output.reshape(bsz, tgt_len, embed_dim)
- attn_output = self.out_proj(attn_output)
- return attn_output, attn_weights_reshaped
- # Copied from transformers.models.clip.modeling_clip.CLIPMLP with CLIP->Kosmos2Vision
- class Kosmos2VisionMLP(nn.Module):
- def __init__(self, config):
- super().__init__()
- self.config = config
- self.activation_fn = ACT2FN[config.hidden_act]
- self.fc1 = nn.Linear(config.hidden_size, config.intermediate_size)
- self.fc2 = nn.Linear(config.intermediate_size, config.hidden_size)
- def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
- hidden_states = self.fc1(hidden_states)
- hidden_states = self.activation_fn(hidden_states)
- hidden_states = self.fc2(hidden_states)
- return hidden_states
- # Copied from transformers.models.altclip.modeling_altclip.AltCLIPEncoderLayer with AltCLIP->Kosmos2Vision
- class Kosmos2VisionEncoderLayer(nn.Module):
- def __init__(self, config: Kosmos2VisionConfig):
- super().__init__()
- self.embed_dim = config.hidden_size
- self.self_attn = Kosmos2VisionAttention(config)
- self.layer_norm1 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
- self.mlp = Kosmos2VisionMLP(config)
- self.layer_norm2 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
- def forward(
- self,
- hidden_states: torch.Tensor,
- attention_mask: torch.Tensor,
- causal_attention_mask: torch.Tensor,
- output_attentions: Optional[bool] = False,
- ) -> Tuple[torch.FloatTensor]:
- """
- Args:
- hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
- attention_mask (`torch.FloatTensor`): attention mask of size
- `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
- `(config.encoder_attention_heads,)`.
- output_attentions (`bool`, *optional*):
- Whether or not to return the attentions tensors of all attention layers. See `attentions` under
- returned tensors for more detail.
- """
- residual = hidden_states
- hidden_states = self.layer_norm1(hidden_states)
- hidden_states, attn_weights = self.self_attn(
- hidden_states=hidden_states,
- attention_mask=attention_mask,
- causal_attention_mask=causal_attention_mask,
- output_attentions=output_attentions,
- )
- hidden_states = residual + hidden_states
- residual = hidden_states
- hidden_states = self.layer_norm2(hidden_states)
- hidden_states = self.mlp(hidden_states)
- hidden_states = residual + hidden_states
- outputs = (hidden_states,)
- if output_attentions:
- outputs += (attn_weights,)
- return outputs
- # Copied from transformers.models.altclip.modeling_altclip.AltCLIPEncoder with AltCLIP->Kosmos2Vision
- class Kosmos2VisionEncoder(nn.Module):
- """
- Transformer encoder consisting of `config.num_hidden_layers` self attention layers. Each layer is a
- [`Kosmos2VisionEncoderLayer`].
- Args:
- config: Kosmos2VisionConfig
- """
- def __init__(self, config: Kosmos2VisionConfig):
- super().__init__()
- self.config = config
- self.layers = nn.ModuleList([Kosmos2VisionEncoderLayer(config) for _ in range(config.num_hidden_layers)])
- self.gradient_checkpointing = False
- def forward(
- self,
- inputs_embeds,
- attention_mask: Optional[torch.Tensor] = None,
- causal_attention_mask: Optional[torch.Tensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- ) -> Union[Tuple, BaseModelOutput]:
- r"""
- Args:
- inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
- Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation.
- This is useful if you want more control over how to convert `input_ids` indices into associated vectors
- than the model's internal embedding lookup matrix.
- attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
- Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- - 1 for tokens that are **not masked**,
- - 0 for tokens that are **masked**.
- [What are attention masks?](../glossary#attention-mask)
- causal_attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
- Causal mask for the text model. Mask values selected in `[0, 1]`:
- - 1 for tokens that are **not masked**,
- - 0 for tokens that are **masked**.
- [What are attention masks?](../glossary#attention-mask)
- output_attentions (`bool`, *optional*):
- Whether or not to return the attentions tensors of all attention layers. See `attentions` under
- returned tensors for more detail.
- output_hidden_states (`bool`, *optional*):
- Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors
- for more detail.
- return_dict (`bool`, *optional*):
- Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
- """
- output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
- output_hidden_states = (
- output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
- )
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- encoder_states = () if output_hidden_states else None
- all_attentions = () if output_attentions else None
- hidden_states = inputs_embeds
- for idx, encoder_layer in enumerate(self.layers):
- if output_hidden_states:
- encoder_states = encoder_states + (hidden_states,)
- if self.gradient_checkpointing and self.training:
- layer_outputs = self._gradient_checkpointing_func(
- encoder_layer.__call__,
- hidden_states,
- attention_mask,
- causal_attention_mask,
- output_attentions,
- )
- else:
- layer_outputs = encoder_layer(
- hidden_states,
- attention_mask,
- causal_attention_mask,
- output_attentions=output_attentions,
- )
- hidden_states = layer_outputs[0]
- if output_attentions:
- all_attentions = all_attentions + (layer_outputs[1],)
- if output_hidden_states:
- encoder_states = encoder_states + (hidden_states,)
- if not return_dict:
- return tuple(v for v in [hidden_states, encoder_states, all_attentions] if v is not None)
- return BaseModelOutput(
- last_hidden_state=hidden_states, hidden_states=encoder_states, attentions=all_attentions
- )
- # Similar to `transformers.models.clip.modeling_clip.CLIPVisionTransformer` but without docstring for `forward`
- class Kosmos2VisionTransformer(nn.Module):
- # Copied from transformers.models.altclip.modeling_altclip.AltCLIPVisionTransformer.__init__ with AltCLIPVision->Kosmos2Vision,ALTCLIP_VISION->KOSMOS2_VISION,AltCLIP->Kosmos2Vision
- def __init__(self, config: Kosmos2VisionConfig):
- super().__init__()
- self.config = config
- embed_dim = config.hidden_size
- self.embeddings = Kosmos2VisionEmbeddings(config)
- self.pre_layrnorm = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
- self.encoder = Kosmos2VisionEncoder(config)
- self.post_layernorm = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
- def forward(
- self,
- pixel_values: Optional[torch.FloatTensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- interpolate_pos_encoding: bool = False,
- return_dict: Optional[bool] = None,
- ) -> Union[Tuple, BaseModelOutputWithPooling]:
- output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
- output_hidden_states = (
- output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
- )
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- if pixel_values is None:
- raise ValueError("You have to specify pixel_values")
- hidden_states = self.embeddings(pixel_values, interpolate_pos_encoding=interpolate_pos_encoding)
- hidden_states = self.pre_layrnorm(hidden_states)
- encoder_outputs = self.encoder(
- inputs_embeds=hidden_states,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- last_hidden_state = encoder_outputs[0]
- pooled_output = last_hidden_state[:, 0, :]
- pooled_output = self.post_layernorm(pooled_output)
- if not return_dict:
- return (last_hidden_state, pooled_output) + encoder_outputs[1:]
- return BaseModelOutputWithPooling(
- last_hidden_state=last_hidden_state,
- pooler_output=pooled_output,
- hidden_states=encoder_outputs.hidden_states,
- attentions=encoder_outputs.attentions,
- )
- # Similar to `transformers.models.m2m_100.modeling_m2m_100.M2M100SinusoidalPositionalEmbedding` but allowing to pass `position_ids`
- class Kosmos2TextSinusoidalPositionalEmbedding(nn.Module):
- """This module produces sinusoidal positional embeddings of any length."""
- # Copied from transformers.models.m2m_100.modeling_m2m_100.M2M100SinusoidalPositionalEmbedding.__init__
- def __init__(self, num_positions: int, embedding_dim: int, padding_idx: Optional[int] = None):
- super().__init__()
- self.offset = 2
- self.embedding_dim = embedding_dim
- self.padding_idx = padding_idx
- self.make_weights(num_positions + self.offset, embedding_dim, padding_idx)
- # Copied from transformers.models.m2m_100.modeling_m2m_100.M2M100SinusoidalPositionalEmbedding.make_weights
- def make_weights(self, num_embeddings: int, embedding_dim: int, padding_idx: Optional[int] = None):
- emb_weights = self.get_embedding(num_embeddings, embedding_dim, padding_idx)
- if hasattr(self, "weights"):
- # in forward put the weights on the correct dtype and device of the param
- emb_weights = emb_weights.to(dtype=self.weights.dtype, device=self.weights.device)
- self.register_buffer("weights", emb_weights, persistent=False)
- @staticmethod
- # Copied from transformers.models.m2m_100.modeling_m2m_100.M2M100SinusoidalPositionalEmbedding.get_embedding
- def get_embedding(num_embeddings: int, embedding_dim: int, padding_idx: Optional[int] = None):
- """
- Build sinusoidal embeddings.
- This matches the implementation in tensor2tensor, but differs slightly from the description in Section 3.5 of
- "Attention Is All You Need".
- """
- half_dim = embedding_dim // 2
- emb = math.log(10000) / (half_dim - 1)
- emb = torch.exp(torch.arange(half_dim, dtype=torch.int64).float() * -emb)
- emb = torch.arange(num_embeddings, dtype=torch.int64).float().unsqueeze(1) * emb.unsqueeze(0)
- emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1).view(num_embeddings, -1)
- if embedding_dim % 2 == 1:
- # zero pad
- emb = torch.cat([emb, torch.zeros(num_embeddings, 1)], dim=1)
- if padding_idx is not None:
- emb[padding_idx, :] = 0
- return emb.to(torch.get_default_dtype())
- @torch.no_grad()
- def forward(
- self,
- input_ids: torch.Tensor = None,
- inputs_embeds: torch.Tensor = None,
- past_key_values_length: int = 0,
- position_ids: torch.Tensor = None,
- ):
- if input_ids is not None:
- bsz, seq_len = input_ids.size()
- if position_ids is None:
- # Create the position ids from the input token ids. Any padded tokens remain padded.
- position_ids = create_position_ids_from_input_ids(
- input_ids, self.padding_idx, past_key_values_length
- ).to(input_ids.device)
- else:
- bsz, seq_len = inputs_embeds.size()[:-1]
- if position_ids is None:
- position_ids = self.create_position_ids_from_inputs_embeds(inputs_embeds, past_key_values_length)
- # expand embeddings if needed
- max_pos = self.padding_idx + 1 + seq_len + past_key_values_length
- if max_pos > self.weights.size(0):
- self.make_weights(max_pos + self.offset, self.embedding_dim, self.padding_idx)
- return self.weights.index_select(0, position_ids.view(-1)).view(bsz, seq_len, self.weights.shape[-1]).detach()
- # Copied from transformers.models.m2m_100.modeling_m2m_100.M2M100SinusoidalPositionalEmbedding.create_position_ids_from_inputs_embeds
- def create_position_ids_from_inputs_embeds(self, inputs_embeds, past_key_values_length):
- """
- We are provided embeddings directly. We cannot infer which are padded so just generate sequential position ids.
- Args:
- inputs_embeds: torch.Tensor
- Returns: torch.Tensor
- """
- input_shape = inputs_embeds.size()[:-1]
- sequence_length = input_shape[1]
- position_ids = torch.arange(
- self.padding_idx + 1, sequence_length + self.padding_idx + 1, dtype=torch.long, device=inputs_embeds.device
- )
- return position_ids.unsqueeze(0).expand(input_shape).contiguous() + past_key_values_length
- class KosmosTextAttention(nn.Module):
- """Multi-headed attention from 'Attention Is All You Need' paper"""
- # Similar to transformers.models.bart.modeling_bart.BartAttention.__init__ except an additional `inner_attn_ln`.
- def __init__(
- self,
- config,
- embed_dim: int,
- num_heads: int,
- dropout: float = 0.0,
- is_decoder: bool = False,
- add_inner_attn_layernorm: bool = False,
- bias: bool = True,
- ):
- super().__init__()
- self.embed_dim = embed_dim
- self.num_heads = num_heads
- self.dropout = dropout
- self.head_dim = embed_dim // num_heads
- if (self.head_dim * num_heads) != self.embed_dim:
- raise ValueError(
- f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim}"
- f" and `num_heads`: {num_heads})."
- )
- self.scaling = self.head_dim**-0.5
- self.is_decoder = is_decoder
- self.k_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
- self.v_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
- self.q_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
- self.out_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
- # End opy
- self.inner_attn_ln = None
- if add_inner_attn_layernorm:
- self.inner_attn_ln = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
- def _shape(self, projection: torch.Tensor) -> torch.Tensor:
- new_projection_shape = projection.size()[:-1] + (self.num_heads, self.head_dim)
- # move heads to 2nd position (B, T, H * D) -> (B, T, H, D) -> (B, H, T, D)
- new_projection = projection.view(new_projection_shape).permute(0, 2, 1, 3)
- return new_projection
- def forward(
- self,
- hidden_states: torch.Tensor,
- encoder_hidden_states: Optional[torch.Tensor] = None,
- past_key_value: Optional[Tuple[torch.Tensor]] = None,
- attention_mask: Optional[torch.Tensor] = None,
- layer_head_mask: Optional[torch.Tensor] = None,
- output_attentions: bool = False,
- ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
- """Input shape: Batch x Time x Channel"""
- # if key_value_states are provided this layer is used as a cross-attention layer
- # for the decoder
- is_cross_attention = encoder_hidden_states is not None
- batch_size, seq_length = hidden_states.shape[:2]
- # use encoder_hidden_states if cross attention
- current_states = encoder_hidden_states if encoder_hidden_states is not None else hidden_states
- # checking that the `sequence_length` of the `past_key_value` is the same as the he provided
- # `encoder_hidden_states` to support prefix tuning
- if is_cross_attention and past_key_value and past_key_value[0].shape[2] == current_states.shape[1]:
- # reuse k,v, cross_attentions
- key_states = past_key_value[0]
- value_states = past_key_value[1]
- else:
- key_states = self._shape(self.k_proj(current_states))
- value_states = self._shape(self.v_proj(current_states))
- if past_key_value is not None and not is_cross_attention:
- # reuse k, v, self_attention
- key_states = torch.cat([past_key_value[0], key_states], dim=2)
- value_states = torch.cat([past_key_value[1], value_states], dim=2)
- query_states = self._shape(self.q_proj(hidden_states) * self.scaling)
- attn_weights = torch.matmul(query_states, key_states.transpose(-1, -2))
- if self.is_decoder:
- # if cross_attention save Tuple(torch.Tensor, torch.Tensor) of all cross attention key/value_states.
- # Further calls to cross_attention layer can then reuse all cross-attention
- # key/value_states (first "if" case)
- # if uni-directional self-attention (decoder) save Tuple(torch.Tensor, torch.Tensor) of
- # all previous decoder key/value_states. Further calls to uni-directional self-attention
- # can concat previous decoder key/value_states to current projected key/value_states (third "elif" case)
- # if encoder bi-directional self-attention `past_key_value` is always `None`
- past_key_value = (key_states, value_states)
- src_len = key_states.size(2)
- if attention_mask is not None:
- if attention_mask.size() != (batch_size, 1, seq_length, src_len):
- raise ValueError(
- f"Attention mask should be of size {(batch_size, 1, seq_length, src_len)}, but is {attention_mask.size()}"
- )
- attn_weights = attn_weights + attention_mask
- attn_weights = nn.functional.softmax(attn_weights, dim=-1)
- # Mask heads if we want to
- if layer_head_mask is not None:
- attn_weights = attn_weights * layer_head_mask
- attn_weights = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training)
- # attn_output = torch.bmm(attn_probs, value_states) ?
- context_states = torch.matmul(attn_weights, value_states)
- # attn_output = attn_output.view(bsz, self.num_heads, tgt_len, self.head_dim) ?
- context_states = context_states.permute(0, 2, 1, 3).contiguous().view(batch_size, seq_length, -1)
- if self.inner_attn_ln is not None:
- context_states = self.inner_attn_ln(context_states)
- attn_output = self.out_proj(context_states)
- return attn_output, attn_weights, past_key_value
- class Kosmos2TextFFN(nn.Module):
- def __init__(self, config: Kosmos2TextConfig):
- super().__init__()
- self.dropout = config.dropout
- self.activation_fn = ACT2FN[config.activation_function]
- self.activation_dropout = config.activation_dropout
- self.fc1 = nn.Linear(config.embed_dim, config.ffn_dim)
- self.fc2 = nn.Linear(config.ffn_dim, config.embed_dim)
- self.ffn_layernorm = nn.LayerNorm(config.ffn_dim, eps=config.layer_norm_eps)
- def forward(self, hidden_states):
- hidden_states = self.activation_fn(self.fc1(hidden_states))
- hidden_states = nn.functional.dropout(hidden_states, p=self.activation_dropout, training=self.training)
- hidden_states = self.ffn_layernorm(hidden_states)
- hidden_states = self.fc2(hidden_states)
- hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
- return hidden_states
- class Kosmos2TextBlock(nn.Module):
- def __init__(self, config: Kosmos2TextConfig):
- super().__init__()
- self.embed_dim = config.embed_dim
- self.self_attn = KosmosTextAttention(
- config,
- embed_dim=self.embed_dim,
- num_heads=config.attention_heads,
- dropout=config.attention_dropout,
- is_decoder=True,
- add_inner_attn_layernorm=True,
- )
- self.dropout = config.dropout
- self.self_attn_layer_norm = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
- if config.add_cross_attention:
- self.encoder_attn = KosmosTextAttention(
- config,
- embed_dim=self.embed_dim,
- num_heads=config.attention_heads,
- dropout=config.attention_dropout,
- is_decoder=True,
- add_inner_attn_layernorm=False,
- )
- self.encoder_attn_layer_norm = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
- self.ffn = Kosmos2TextFFN(config)
- self.final_layer_norm = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
- def forward(
- self,
- hidden_states: torch.Tensor,
- attention_mask: Optional[torch.Tensor] = None,
- encoder_hidden_states: Optional[torch.Tensor] = None,
- encoder_attention_mask: Optional[torch.Tensor] = None,
- layer_head_mask: Optional[torch.Tensor] = None,
- cross_attn_layer_head_mask: Optional[torch.Tensor] = None,
- past_key_value: Optional[Tuple[torch.Tensor]] = None,
- output_attentions: Optional[bool] = False,
- use_cache: Optional[bool] = True,
- ) -> Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]]:
- residual = hidden_states
- # Self Attention
- # decoder uni-directional self-attention cached key/values tuple is at positions 1,2
- self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None
- hidden_states = self.self_attn_layer_norm(hidden_states)
- # add present self-attn cache to positions 1,2 of present_key_value tuple
- hidden_states, self_attn_weights, present_key_value = self.self_attn(
- hidden_states=hidden_states,
- past_key_value=self_attn_past_key_value,
- attention_mask=attention_mask,
- layer_head_mask=layer_head_mask,
- output_attentions=output_attentions,
- )
- hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
- hidden_states = residual + hidden_states
- # Cross-Attention Block
- cross_attn_present_key_value = None
- cross_attn_weights = None
- if encoder_hidden_states is not None:
- if not hasattr(self, "encoder_attn"):
- raise ValueError(
- f"If `encoder_hidden_states` are passed, {self} has to be instantiated with cross-attention layers"
- " by setting `config.add_cross_attention=True`"
- )
- residual = hidden_states
- hidden_states = self.encoder_attn_layer_norm(hidden_states)
- # cross_attn cached key/values tuple is at positions 3,4 of present_key_value tuple
- cross_attn_past_key_value = past_key_value[-2:] if past_key_value is not None else None
- hidden_states, cross_attn_weights, cross_attn_present_key_value = self.encoder_attn(
- hidden_states=hidden_states,
- encoder_hidden_states=encoder_hidden_states,
- attention_mask=encoder_attention_mask,
- layer_head_mask=cross_attn_layer_head_mask,
- past_key_value=cross_attn_past_key_value,
- output_attentions=output_attentions,
- )
- hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
- hidden_states = residual + hidden_states
- # add cross-attn to positions 3,4 of present_key_value tuple
- present_key_value = present_key_value + cross_attn_present_key_value
- # Fully Connected
- residual = hidden_states
- hidden_states = self.final_layer_norm(hidden_states)
- # FFN
- hidden_states = self.ffn(hidden_states)
- hidden_states = residual + hidden_states
- outputs = (hidden_states,)
- if output_attentions:
- outputs += (self_attn_weights, cross_attn_weights)
- if use_cache:
- outputs += (present_key_value,)
- return outputs
- class Kosmos2TextTransformer(nn.Module):
- """
- Transformer decoder consisting of `config.layers` layers. Each layer is a [`Kosmos2TextBlock`].
- Args:
- config: Kosmos2TextConfig
- """
- def __init__(self, config: Kosmos2TextConfig):
- super().__init__()
- self.config = config
- self.dropout = config.dropout
- self.layerdrop = config.layerdrop
- self.embed_scale = math.sqrt(config.embed_dim) if config.scale_embedding else 1.0
- self.embed_tokens = nn.Embedding(config.vocab_size, config.embed_dim, padding_idx=config.pad_token_id)
- self.embed_positions = Kosmos2TextSinusoidalPositionalEmbedding(
- num_positions=config.max_position_embeddings,
- embedding_dim=config.embed_dim,
- padding_idx=config.pad_token_id,
- )
- self.layers = nn.ModuleList([Kosmos2TextBlock(config) for _ in range(config.layers)])
- self.layer_norm = nn.LayerNorm(config.embed_dim, config.layer_norm_eps)
- self.gradient_checkpointing = False
- def _prepare_decoder_attention_mask(self, attention_mask, input_shape, inputs_embeds, past_key_values_length):
- # create causal mask
- # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
- combined_attention_mask = None
- if input_shape[-1] > 1:
- combined_attention_mask = _make_causal_mask(
- input_shape,
- inputs_embeds.dtype,
- device=inputs_embeds.device,
- past_key_values_length=past_key_values_length,
- )
- if attention_mask is not None:
- # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
- expanded_attn_mask = _expand_mask(attention_mask, inputs_embeds.dtype, tgt_len=input_shape[-1]).to(
- inputs_embeds.device
- )
- combined_attention_mask = (
- expanded_attn_mask if combined_attention_mask is None else expanded_attn_mask + combined_attention_mask
- )
- return combined_attention_mask
- def forward_embedding(
- self,
- input_ids,
- inputs_embeds: torch.Tensor = None,
- image_embeds: torch.Tensor = None,
- img_input_mask: torch.Tensor = None,
- past_key_values_length: int = 0,
- position_ids: torch.Tensor = None,
- ):
- # The argument `inputs_embeds` should be the one without being multiplied by `self.embed_scale`.
- if inputs_embeds is None:
- inputs_embeds = self.embed_tokens(input_ids)
- if image_embeds is not None:
- inputs_embeds[img_input_mask.to(dtype=torch.bool)] = image_embeds.to(inputs_embeds.device).view(
- -1, image_embeds.size(-1)
- )
- inputs_embeds = inputs_embeds * self.embed_scale
- # embed positions
- positions = self.embed_positions(
- input_ids=input_ids,
- inputs_embeds=inputs_embeds,
- past_key_values_length=past_key_values_length,
- position_ids=position_ids,
- )
- positions = positions.to(inputs_embeds.device)
- hidden_states = inputs_embeds + positions
- hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
- return hidden_states
- def forward(
- self,
- input_ids: Optional[torch.Tensor] = None,
- attention_mask: Optional[torch.Tensor] = None,
- image_embeds: Optional[torch.Tensor] = None,
- image_embeds_position_mask: Optional[torch.Tensor] = None,
- encoder_hidden_states: Optional[torch.Tensor] = None,
- encoder_attention_mask: Optional[torch.Tensor] = None,
- head_mask: Optional[torch.Tensor] = None,
- cross_attn_head_mask: Optional[torch.Tensor] = None,
- past_key_values: Optional[List[torch.FloatTensor]] = None,
- inputs_embeds: Optional[torch.Tensor] = None,
- position_ids: Optional[torch.Tensor] = None,
- use_cache: Optional[bool] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- ) -> Union[Tuple, BaseModelOutputWithPastAndCrossAttentions]:
- output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
- output_hidden_states = (
- output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
- )
- use_cache = use_cache if use_cache is not None else self.config.use_cache
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- if input_ids is not None and inputs_embeds is not None:
- raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
- elif input_ids is not None:
- input_shape = input_ids.shape
- input_ids = input_ids.view(-1, input_shape[-1])
- elif inputs_embeds is not None:
- input_shape = inputs_embeds.size()[:-1]
- else:
- raise ValueError("You have to specify either input_ids or inputs_embeds")
- # past_key_values_length
- past_key_values_length = past_key_values[0][0].shape[2] if past_key_values is not None else 0
- # We don't need img info. when `past_key_values_length` > 0
- if past_key_values_length > 0:
- image_embeds = None
- image_embeds_position_mask = None
- hidden_states = self.forward_embedding(
- input_ids=input_ids,
- inputs_embeds=inputs_embeds,
- image_embeds=image_embeds,
- img_input_mask=image_embeds_position_mask,
- past_key_values_length=past_key_values_length,
- position_ids=position_ids,
- )
- attention_mask = self._prepare_decoder_attention_mask(
- attention_mask, input_shape, hidden_states, past_key_values_length
- )
- # expand encoder attention mask
- if encoder_hidden_states is not None and encoder_attention_mask is not None:
- # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
- encoder_attention_mask = _expand_mask(encoder_attention_mask, inputs_embeds.dtype, tgt_len=input_shape[-1])
- hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
- if self.gradient_checkpointing and self.training:
- if use_cache:
- logger.warning_once(
- "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
- )
- use_cache = False
- # decoder layers
- all_hidden_states = () if output_hidden_states else None
- all_self_attns = () if output_attentions else None
- all_cross_attentions = () if (output_attentions and encoder_hidden_states is not None) else None
- present_key_value_states = () if use_cache else None
- # check if head_mask/cross_attn_head_mask has a correct number of layers specified if desired
- for attn_mask, mask_name in zip([head_mask, cross_attn_head_mask], ["head_mask", "cross_attn_head_mask"]):
- if attn_mask is not None:
- if attn_mask.size()[0] != (len(self.layers)):
- raise ValueError(
- f"The `{mask_name}` should be specified for {len(self.layers)} layers, but it is for"
- f" {head_mask.size()[0]}."
- )
- for idx, decoder_layer in enumerate(self.layers):
- # add LayerDrop (see https://arxiv.org/abs/1909.11556 for description)
- if output_hidden_states:
- all_hidden_states += (hidden_states,)
- if self.training:
- dropout_probability = torch.rand([])
- if dropout_probability < self.layerdrop:
- continue
- past_key_value = past_key_values[idx] if past_key_values is not None else None
- if self.gradient_checkpointing and self.training:
- layer_outputs = self._gradient_checkpointing_func(
- decoder_layer.__call__,
- hidden_states,
- attention_mask,
- encoder_hidden_states,
- encoder_attention_mask,
- head_mask[idx] if head_mask is not None else None,
- cross_attn_head_mask[idx] if cross_attn_head_mask is not None else None,
- None,
- output_attentions,
- use_cache,
- )
- else:
- layer_outputs = decoder_layer(
- hidden_states,
- attention_mask=attention_mask,
- encoder_hidden_states=encoder_hidden_states,
- encoder_attention_mask=encoder_attention_mask,
- layer_head_mask=(head_mask[idx] if head_mask is not None else None),
- cross_attn_layer_head_mask=(
- cross_attn_head_mask[idx] if cross_attn_head_mask is not None else None
- ),
- past_key_value=past_key_value,
- output_attentions=output_attentions,
- use_cache=use_cache,
- )
- hidden_states = layer_outputs[0]
- if use_cache:
- present_key_value_states += (layer_outputs[3 if output_attentions else 1],)
- if output_attentions:
- all_self_attns += (layer_outputs[1],)
- if encoder_hidden_states is not None:
- all_cross_attentions += (layer_outputs[2],)
- # add final layer norm
- hidden_states = self.layer_norm(hidden_states)
- # add hidden states from the last decoder layer
- if output_hidden_states:
- all_hidden_states += (hidden_states,)
- if not return_dict:
- return tuple(
- v
- for v in [
- hidden_states,
- present_key_value_states,
- all_hidden_states,
- all_self_attns,
- all_cross_attentions,
- ]
- if v is not None
- )
- return BaseModelOutputWithPastAndCrossAttentions(
- last_hidden_state=hidden_states,
- past_key_values=present_key_value_states,
- hidden_states=all_hidden_states,
- attentions=all_self_attns,
- cross_attentions=all_cross_attentions,
- )
- class Kosmos2PreTrainedModel(PreTrainedModel):
- """
- An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained
- models.
- """
- config_class = Kosmos2Config
- supports_gradient_checkpointing = True
- _no_split_modules = ["Kosmos2VisionEncoderLayer", "Kosmos2TextBlock"]
- def _init_weights(self, module):
- """Initialize the weights"""
- if isinstance(self, Kosmos2VisionModel):
- factor = self.config.initializer_factor
- elif isinstance(self, (Kosmos2Model, Kosmos2ForConditionalGeneration)):
- factor = self.config.vision_config.initializer_factor
- if isinstance(self, (Kosmos2TextModel, Kosmos2TextForCausalLM)):
- std = self.config.init_std
- elif isinstance(self, (Kosmos2Model, Kosmos2ForConditionalGeneration)):
- std = self.config.text_config.init_std
- if isinstance(module, Kosmos2VisionEmbeddings):
- nn.init.normal_(module.class_embedding, mean=0.0, std=module.embed_dim**-0.5 * factor)
- nn.init.normal_(module.patch_embedding.weight, std=module.config.initializer_range * factor)
- nn.init.normal_(module.position_embedding.weight, std=module.config.initializer_range * factor)
- elif isinstance(module, Kosmos2VisionAttention):
- in_proj_std = (module.embed_dim**-0.5) * ((2 * module.config.num_hidden_layers) ** -0.5) * factor
- out_proj_std = (module.embed_dim**-0.5) * factor
- nn.init.normal_(module.q_proj.weight, std=in_proj_std)
- nn.init.normal_(module.k_proj.weight, std=in_proj_std)
- nn.init.normal_(module.v_proj.weight, std=in_proj_std)
- nn.init.normal_(module.out_proj.weight, std=out_proj_std)
- if module.q_proj.bias is not None:
- module.q_proj.bias.data.zero_()
- if module.k_proj.bias is not None:
- module.k_proj.bias.data.zero_()
- if module.v_proj.bias is not None:
- module.v_proj.bias.data.zero_()
- if module.out_proj.bias is not None:
- module.out_proj.bias.data.zero_()
- elif isinstance(module, Kosmos2VisionMLP):
- in_proj_std = (module.config.hidden_size**-0.5) * ((2 * module.config.num_hidden_layers) ** -0.5) * factor
- fc_std = (2 * module.config.hidden_size) ** -0.5 * factor
- nn.init.normal_(module.fc1.weight, std=fc_std)
- nn.init.normal_(module.fc2.weight, std=in_proj_std)
- if module.fc1.bias is not None:
- module.fc1.bias.data.zero_()
- if module.fc2.bias is not None:
- module.fc2.bias.data.zero_()
- elif isinstance(module, Kosmos2VisionEncoderLayer):
- module.layer_norm1.bias.data.zero_()
- module.layer_norm1.weight.data.fill_(1.0)
- module.layer_norm2.bias.data.zero_()
- module.layer_norm2.weight.data.fill_(1.0)
- elif isinstance(module, Kosmos2VisionTransformer):
- module.pre_layrnorm.bias.data.zero_()
- module.pre_layrnorm.weight.data.fill_(1.0)
- module.post_layernorm.bias.data.zero_()
- module.post_layernorm.weight.data.fill_(1.0)
- elif isinstance(module, KosmosTextAttention):
- nn.init.normal_(module.q_proj.weight, std=std)
- nn.init.normal_(module.k_proj.weight, std=std)
- nn.init.normal_(module.v_proj.weight, std=std)
- nn.init.normal_(module.out_proj.weight, std=std)
- if module.q_proj.bias is not None:
- module.q_proj.bias.data.zero_()
- if module.k_proj.bias is not None:
- module.k_proj.bias.data.zero_()
- if module.v_proj.bias is not None:
- module.v_proj.bias.data.zero_()
- if module.out_proj.bias is not None:
- module.out_proj.bias.data.zero_()
- elif isinstance(module, Kosmos2TextFFN):
- nn.init.normal_(module.fc1.weight, std=std)
- nn.init.normal_(module.fc2.weight, std=std)
- if module.fc1.bias is not None:
- module.fc1.bias.data.zero_()
- if module.fc2.bias is not None:
- module.fc2.bias.data.zero_()
- elif isinstance(module, Kosmos2TextForCausalLM):
- nn.init.normal_(module.lm_head.weight, std=std)
- if module.lm_head.bias is not None:
- module.lm_head.bias.data.zero_()
- elif isinstance(module, Kosmos2ImageToTextProjection):
- nn.init.normal_(module.dense.weight, std=std)
- if module.dense.bias is not None:
- module.dense.bias.data.zero_()
- elif isinstance(module, Kosmos2TextTransformer):
- module.embed_tokens.weight.data.normal_(mean=0.0, std=std)
- if module.embed_tokens.padding_idx is not None:
- module.embed_tokens.weight.data[module.embed_tokens.padding_idx].zero_()
- class Kosmos2VisionModel(Kosmos2PreTrainedModel):
- config_class = Kosmos2VisionConfig
- main_input_name = "pixel_values"
- # Copied from transformers.models.clip.modeling_clip.CLIPVisionModel.__init__ with CLIP_VISION->KOSMOS2_VISION,CLIP->Kosmos2,self.vision_model->self.model
- def __init__(self, config: Kosmos2VisionConfig):
- super().__init__(config)
- self.model = Kosmos2VisionTransformer(config)
- # Initialize weights and apply final processing
- self.post_init()
- # Copied from transformers.models.clip.modeling_clip.CLIPVisionModel.get_input_embeddings with CLIP_VISION->KOSMOS2_VISION,CLIP->Kosmos2,self.vision_model->self.model
- def get_input_embeddings(self) -> nn.Module:
- return self.model.embeddings.patch_embedding
- @add_start_docstrings_to_model_forward(KOSMOS2_VISION_INPUTS_DOCSTRING)
- @replace_return_docstrings(output_type=BaseModelOutputWithPooling, config_class=Kosmos2VisionConfig)
- def forward(
- self,
- pixel_values: Optional[torch.FloatTensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- interpolate_pos_encoding: bool = False,
- return_dict: Optional[bool] = None,
- ) -> Union[Tuple, BaseModelOutputWithPooling]:
- r"""
- Returns:
- """
- return self.model(
- pixel_values=pixel_values,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- interpolate_pos_encoding=interpolate_pos_encoding,
- return_dict=return_dict,
- )
- class Kosmos2TextModel(Kosmos2PreTrainedModel):
- config_class = Kosmos2TextConfig
- def __init__(self, config: Kosmos2TextConfig):
- super().__init__(config)
- self.model = Kosmos2TextTransformer(config)
- # Initialize weights and apply final processing
- self.post_init()
- def get_input_embeddings(self) -> nn.Module:
- return self.model.embed_tokens
- def set_input_embeddings(self, value):
- self.model.embed_tokens = value
- @add_start_docstrings_to_model_forward(KOSMOS2_TEXT_INPUTS_DOCSTRING)
- @replace_return_docstrings(output_type=BaseModelOutputWithPastAndCrossAttentions, config_class=Kosmos2TextConfig)
- def forward(
- self,
- input_ids: Optional[torch.Tensor] = None,
- attention_mask: Optional[torch.Tensor] = None,
- image_embeds: Optional[torch.Tensor] = None,
- image_embeds_position_mask: Optional[torch.Tensor] = None,
- encoder_hidden_states: Optional[torch.Tensor] = None,
- encoder_attention_mask: Optional[torch.Tensor] = None,
- head_mask: Optional[torch.Tensor] = None,
- cross_attn_head_mask: Optional[torch.Tensor] = None,
- past_key_values: Optional[List[torch.FloatTensor]] = None,
- inputs_embeds: Optional[torch.Tensor] = None,
- position_ids: Optional[torch.Tensor] = None,
- use_cache: Optional[bool] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- ) -> Union[Tuple, BaseModelOutputWithPastAndCrossAttentions]:
- r"""
- Returns:
- """
- return self.model(
- input_ids=input_ids,
- attention_mask=attention_mask,
- image_embeds=image_embeds,
- image_embeds_position_mask=image_embeds_position_mask,
- encoder_hidden_states=encoder_hidden_states,
- encoder_attention_mask=encoder_attention_mask,
- head_mask=head_mask,
- cross_attn_head_mask=cross_attn_head_mask,
- past_key_values=past_key_values,
- inputs_embeds=inputs_embeds,
- position_ids=position_ids,
- use_cache=use_cache,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- @add_start_docstrings(
- """
- The text model from KOSMOS-2 with a language modeling head on top (linear layer with weights tied to the input
- embeddings).
- """,
- KOSMOS2_START_DOCSTRING,
- )
- class Kosmos2TextForCausalLM(Kosmos2PreTrainedModel, GenerationMixin):
- config_class = Kosmos2TextConfig
- _tied_weights_keys = ["lm_head.weight"]
- def __init__(self, config: Kosmos2TextConfig):
- super().__init__(config)
- self.model = Kosmos2TextTransformer(config)
- self.lm_head = nn.Linear(in_features=config.embed_dim, out_features=config.vocab_size, bias=False)
- # Initialize weights and apply final processing
- self.post_init()
- def get_input_embeddings(self) -> nn.Module:
- return self.model.embed_tokens
- def set_input_embeddings(self, value):
- self.model.embed_tokens = value
- def get_output_embeddings(self) -> nn.Module:
- return self.lm_head
- def set_output_embeddings(self, new_embeddings):
- self.lm_head = new_embeddings
- @add_start_docstrings_to_model_forward(KOSMOS2_TEXT_INPUTS_DOCSTRING)
- @replace_return_docstrings(output_type=CausalLMOutputWithCrossAttentions, config_class=Kosmos2TextConfig)
- def forward(
- self,
- input_ids: Optional[torch.Tensor] = None,
- attention_mask: Optional[torch.Tensor] = None,
- image_embeds: Optional[torch.Tensor] = None,
- image_embeds_position_mask: Optional[torch.Tensor] = None,
- encoder_hidden_states: Optional[torch.Tensor] = None,
- encoder_attention_mask: Optional[torch.Tensor] = None,
- head_mask: Optional[torch.Tensor] = None,
- cross_attn_head_mask: Optional[torch.Tensor] = None,
- past_key_values: Optional[List[torch.FloatTensor]] = None,
- inputs_embeds: Optional[torch.Tensor] = None,
- position_ids: Optional[torch.Tensor] = None,
- labels: Optional[torch.LongTensor] = None,
- use_cache: Optional[bool] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- ) -> Union[Tuple, CausalLMOutputWithCrossAttentions]:
- r"""
- labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
- Labels for computing the left-to-right language modeling loss (next word prediction). Indices should be in
- `[-100, 0, ..., config.vocab_size]` (see `input_ids` docstring) Tokens with indices set to `-100` are
- ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`
- Returns:
- """
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- if labels is not None:
- if use_cache:
- logger.warning("The `use_cache` argument is changed to `False` since `labels` is provided.")
- use_cache = False
- outputs = self.model(
- input_ids=input_ids,
- attention_mask=attention_mask,
- image_embeds=image_embeds,
- image_embeds_position_mask=image_embeds_position_mask,
- encoder_hidden_states=encoder_hidden_states,
- encoder_attention_mask=encoder_attention_mask,
- head_mask=head_mask,
- cross_attn_head_mask=cross_attn_head_mask,
- past_key_values=past_key_values,
- inputs_embeds=inputs_embeds,
- position_ids=position_ids,
- use_cache=use_cache,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- lm_logits = self.lm_head(outputs[0])
- loss = None
- if labels is not None:
- # move labels to correct device to enable model parallelism
- labels = labels.to(lm_logits.device)
- # Shift so that tokens < n predict n
- shift_logits = lm_logits[..., :-1, :].contiguous()
- shift_labels = labels[..., 1:].contiguous()
- batch_size, seq_length, vocab_size = shift_logits.shape
- # Flatten the tokens
- loss_fct = CrossEntropyLoss()
- loss = loss_fct(
- shift_logits.view(batch_size * seq_length, vocab_size), shift_labels.view(batch_size * seq_length)
- )
- if not return_dict:
- output = (lm_logits,) + outputs[1:]
- return (loss,) + output if loss is not None else output
- return CausalLMOutputWithCrossAttentions(
- loss=loss,
- logits=lm_logits,
- past_key_values=outputs.past_key_values,
- hidden_states=outputs.hidden_states,
- attentions=outputs.attentions,
- cross_attentions=outputs.cross_attentions,
- )
- def prepare_inputs_for_generation(
- self,
- input_ids,
- image_embeds=None,
- image_embeds_position_mask=None,
- past_key_values=None,
- attention_mask=None,
- use_cache=None,
- **model_kwargs,
- ):
- # Overwritten -- in specific circumstances we don't want to forward image inputs to the model
- input_shape = input_ids.shape
- # if model is used as a decoder in encoder-decoder model, the decoder attention mask is created on the fly
- if attention_mask is None:
- attention_mask = input_ids.new_ones(input_shape)
- position_ids = None
- # cut input_ids if past_key_values is used
- if past_key_values is not None:
- position_ids = create_position_ids_from_input_ids(
- input_ids,
- padding_idx=self.config.pad_token_id,
- past_key_values_length=0,
- )[:, -1:]
- input_ids = input_ids[:, -1:]
- # the image info. is already encoded into the past keys/values
- image_embeds = None
- image_embeds_position_mask = None
- elif image_embeds_position_mask is not None:
- # appending `False` to `image_embeds_position_mask` (because `input_ids` grows during generation)
- batch_size, seq_len = input_ids.size()
- mask_len = image_embeds_position_mask.size()[-1]
- image_embeds_position_mask = torch.cat(
- (
- image_embeds_position_mask,
- torch.zeros(size=(batch_size, seq_len - mask_len), dtype=torch.bool, device=input_ids.device),
- ),
- dim=1,
- )
- return {
- "input_ids": input_ids,
- "image_embeds": image_embeds,
- "image_embeds_position_mask": image_embeds_position_mask,
- "past_key_values": past_key_values,
- "attention_mask": attention_mask,
- "position_ids": position_ids,
- "use_cache": use_cache,
- }
- @staticmethod
- # Copied from transformers.models.umt5.modeling_umt5.UMT5ForConditionalGeneration._reorder_cache
- def _reorder_cache(past_key_values, beam_idx):
- reordered_past = ()
- for layer_past in past_key_values:
- reordered_past += (
- tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past),
- )
- return reordered_past
- class Kosmos2ImageToTextProjection(nn.Module):
- """The layer that transforms the image model's output to part of the text model's input (namely, image features)"""
- def __init__(self, config: Kosmos2Config):
- super().__init__()
- self.dense = nn.Linear(config.vision_config.hidden_size, config.text_config.embed_dim)
- self.latent_query = nn.Parameter(torch.randn(config.latent_query_num, config.text_config.embed_dim))
- self.x_attn = KosmosTextAttention(
- config.text_config,
- config.text_config.embed_dim,
- config.text_config.attention_heads,
- dropout=config.text_config.attention_dropout,
- is_decoder=False,
- add_inner_attn_layernorm=False,
- )
- def forward(self, features):
- hidden_states = self.dense(features)
- # shape = [batch, latent_query_num, h_dim]
- latent_query = self.latent_query.unsqueeze(0).expand(hidden_states.size(0), -1, -1)
- key_value_states = torch.cat([hidden_states, latent_query], dim=1)
- hidden_states, attn_weights, _ = self.x_attn(
- hidden_states=latent_query,
- encoder_hidden_states=key_value_states,
- past_key_value=None,
- attention_mask=None,
- output_attentions=None,
- )
- return hidden_states, attn_weights
- @add_start_docstrings(
- """
- KOSMOS-2 Model for generating text and image features. The model consists of a vision encoder and a language model.
- """,
- KOSMOS2_START_DOCSTRING,
- )
- class Kosmos2Model(Kosmos2PreTrainedModel):
- config_class = Kosmos2Config
- main_input_name = "pixel_values"
- def __init__(self, config: Kosmos2Config):
- super().__init__(config)
- self.text_model = Kosmos2TextModel(config.text_config)
- self.vision_model = Kosmos2VisionModel(config.vision_config)
- self.image_to_text_projection = Kosmos2ImageToTextProjection(config)
- # Initialize weights and apply final processing
- self.post_init()
- def get_input_embeddings(self) -> nn.Module:
- return self.text_model.model.embed_tokens
- def set_input_embeddings(self, value):
- self.text_model.model.embed_tokens = value
- @add_start_docstrings_to_model_forward(KOSMOS2_INPUTS_DOCSTRING)
- @replace_return_docstrings(output_type=Kosmos2ModelOutput, config_class=_CONFIG_FOR_DOC)
- def forward(
- self,
- pixel_values: Optional[torch.Tensor] = None,
- input_ids: Optional[torch.Tensor] = None,
- image_embeds_position_mask: Optional[torch.Tensor] = None,
- attention_mask: Optional[torch.Tensor] = None,
- head_mask: Optional[torch.Tensor] = None,
- past_key_values: Optional[List[torch.FloatTensor]] = None,
- image_embeds: Optional[torch.Tensor] = None,
- inputs_embeds: Optional[torch.Tensor] = None,
- position_ids: Optional[torch.Tensor] = None,
- use_cache: Optional[bool] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- interpolate_pos_encoding: bool = False,
- return_dict: Optional[bool] = None,
- ) -> Union[Tuple, Kosmos2ModelOutput]:
- r"""
- Returns:
- Examples:
- ```python
- >>> from PIL import Image
- >>> import requests
- >>> from transformers import AutoProcessor, Kosmos2Model
- >>> model = Kosmos2Model.from_pretrained("microsoft/kosmos-2-patch14-224")
- >>> processor = AutoProcessor.from_pretrained("microsoft/kosmos-2-patch14-224")
- >>> url = "https://huggingface.co/microsoft/kosmos-2-patch14-224/resolve/main/snowman.jpg"
- >>> image = Image.open(requests.get(url, stream=True).raw)
- >>> text = (
- ... "<grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863>"
- ... "</object> warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911>"
- ... "</object>"
- ... )
- >>> inputs = processor(text=text, images=image, return_tensors="pt", add_eos_token=True)
- >>> last_hidden_state = model(
- ... pixel_values=inputs["pixel_values"],
- ... input_ids=inputs["input_ids"],
- ... attention_mask=inputs["attention_mask"],
- ... image_embeds_position_mask=inputs["image_embeds_position_mask"],
- ... ).last_hidden_state
- >>> list(last_hidden_state.shape)
- [1, 91, 2048]
- ```"""
- output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
- output_hidden_states = (
- output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
- )
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- vision_model_output = None
- projection_attentions = None
- if image_embeds is None:
- if pixel_values is None:
- raise ValueError("You have to specify either `pixel_values` or `image_embeds`.")
- vision_model_output = self.vision_model(
- pixel_values=pixel_values,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- interpolate_pos_encoding=interpolate_pos_encoding,
- return_dict=return_dict,
- )
- # The whole `last_hidden_state` through `post_layernorm` instead of just `pooled_output`.
- image_embeds = self.vision_model.model.post_layernorm(vision_model_output[0])
- # normalized features
- image_embeds = nn.functional.normalize(image_embeds, dim=-1)
- image_embeds, projection_attentions = self.image_to_text_projection(image_embeds)
- outputs = self.text_model(
- input_ids=input_ids,
- attention_mask=attention_mask,
- image_embeds=image_embeds,
- image_embeds_position_mask=image_embeds_position_mask,
- head_mask=head_mask,
- past_key_values=past_key_values,
- inputs_embeds=inputs_embeds,
- position_ids=position_ids,
- use_cache=use_cache,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- if not return_dict:
- outputs = outputs + (image_embeds, projection_attentions, vision_model_output)
- return tuple(output for output in outputs if output is not None)
- return Kosmos2ModelOutput(
- last_hidden_state=outputs.last_hidden_state,
- past_key_values=outputs.past_key_values,
- hidden_states=outputs.hidden_states,
- attentions=outputs.attentions,
- image_embeds=image_embeds,
- projection_attentions=projection_attentions,
- vision_model_output=vision_model_output,
- )
- @add_start_docstrings(
- """
- KOSMOS-2 Model for generating text and bounding boxes given an image. The model consists of a vision encoder and a
- language model.
- """,
- KOSMOS2_START_DOCSTRING,
- )
- class Kosmos2ForConditionalGeneration(Kosmos2PreTrainedModel, GenerationMixin):
- config_class = Kosmos2Config
- main_input_name = "pixel_values"
- _tied_weights_keys = ["text_model.lm_head.weight"]
- def __init__(self, config: Kosmos2Config):
- super().__init__(config)
- self.text_model = Kosmos2TextForCausalLM(config.text_config)
- self.vision_model = Kosmos2VisionModel(config.vision_config)
- self.image_to_text_projection = Kosmos2ImageToTextProjection(config)
- # Initialize weights and apply final processing
- self.post_init()
- def get_input_embeddings(self) -> nn.Module:
- return self.text_model.model.embed_tokens
- def set_input_embeddings(self, value):
- self.text_model.model.embed_tokens = value
- def get_output_embeddings(self) -> nn.Module:
- return self.text_model.get_output_embeddings()
- def set_output_embeddings(self, new_embeddings):
- self.text_model.set_output_embeddings(new_embeddings)
- @add_start_docstrings_to_model_forward(KOSMOS2_INPUTS_DOCSTRING)
- @replace_return_docstrings(output_type=Kosmos2ForConditionalGenerationModelOutput, config_class=_CONFIG_FOR_DOC)
- def forward(
- self,
- pixel_values: Optional[torch.Tensor] = None,
- input_ids: Optional[torch.Tensor] = None,
- image_embeds_position_mask: Optional[torch.Tensor] = None,
- attention_mask: Optional[torch.Tensor] = None,
- head_mask: Optional[torch.Tensor] = None,
- past_key_values: Optional[List[torch.FloatTensor]] = None,
- image_embeds: Optional[torch.Tensor] = None,
- inputs_embeds: Optional[torch.Tensor] = None,
- position_ids: Optional[torch.Tensor] = None,
- labels: Optional[torch.LongTensor] = None,
- use_cache: Optional[bool] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- ) -> Union[Tuple, Kosmos2ForConditionalGenerationModelOutput]:
- r"""
- labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
- Labels for computing the left-to-right language modeling loss (next word prediction). Indices should be in
- `[-100, 0, ..., config.vocab_size]` (see `input_ids` docstring) Tokens with indices set to `-100` are
- ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`
- Returns:
- Examples:
- ```python
- >>> from PIL import Image
- >>> import requests
- >>> from transformers import AutoProcessor, Kosmos2ForConditionalGeneration
- >>> model = Kosmos2ForConditionalGeneration.from_pretrained("microsoft/kosmos-2-patch14-224")
- >>> processor = AutoProcessor.from_pretrained("microsoft/kosmos-2-patch14-224")
- >>> url = "https://huggingface.co/microsoft/kosmos-2-patch14-224/resolve/main/snowman.jpg"
- >>> image = Image.open(requests.get(url, stream=True).raw)
- >>> prompt = "<grounding> An image of"
- >>> inputs = processor(text=prompt, images=image, return_tensors="pt")
- >>> generated_ids = model.generate(
- ... pixel_values=inputs["pixel_values"],
- ... input_ids=inputs["input_ids"],
- ... attention_mask=inputs["attention_mask"],
- ... image_embeds=None,
- ... image_embeds_position_mask=inputs["image_embeds_position_mask"],
- ... use_cache=True,
- ... max_new_tokens=64,
- ... )
- >>> generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
- >>> processed_text = processor.post_process_generation(generated_text, cleanup_and_extract=False)
- >>> processed_text
- '<grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911></object>.'
- >>> caption, entities = processor.post_process_generation(generated_text)
- >>> caption
- 'An image of a snowman warming himself by a fire.'
- >>> entities
- [('a snowman', (12, 21), [(0.390625, 0.046875, 0.984375, 0.828125)]), ('a fire', (41, 47), [(0.171875, 0.015625, 0.484375, 0.890625)])]
- ```"""
- output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
- output_hidden_states = (
- output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
- )
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- vision_model_output = None
- projection_attentions = None
- if image_embeds is None:
- if pixel_values is None:
- raise ValueError("You have to specify either `pixel_values` or `image_embeds`.")
- vision_model_output = self.vision_model(
- pixel_values=pixel_values,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- # The whole `last_hidden_state` through `post_layernorm` instead of just `pooled_output`.
- image_embeds = self.vision_model.model.post_layernorm(vision_model_output[0])
- # normalized features
- image_embeds = nn.functional.normalize(image_embeds, dim=-1)
- image_embeds, projection_attentions = self.image_to_text_projection(image_embeds)
- lm_outputs = self.text_model(
- input_ids=input_ids,
- attention_mask=attention_mask,
- image_embeds=image_embeds,
- image_embeds_position_mask=image_embeds_position_mask,
- head_mask=head_mask,
- past_key_values=past_key_values,
- inputs_embeds=inputs_embeds,
- position_ids=position_ids,
- labels=labels,
- use_cache=use_cache,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- if not return_dict:
- outputs = lm_outputs + (image_embeds, projection_attentions, vision_model_output)
- return tuple(output for output in outputs if output is not None)
- return Kosmos2ForConditionalGenerationModelOutput(
- loss=lm_outputs.loss,
- logits=lm_outputs.logits,
- past_key_values=lm_outputs.past_key_values,
- hidden_states=lm_outputs.hidden_states,
- attentions=lm_outputs.attentions,
- image_embeds=image_embeds,
- projection_attentions=projection_attentions,
- vision_model_output=vision_model_output,
- )
- def generate(
- self,
- pixel_values: Optional[torch.Tensor] = None,
- image_embeds_position_mask: Optional[torch.Tensor] = None,
- input_ids: Optional[torch.Tensor] = None,
- attention_mask: Optional[torch.Tensor] = None,
- image_embeds: Optional[torch.Tensor] = None,
- **kwargs,
- ):
- # in order to allow `inputs` argument (as in `GenerationMixin`)
- inputs = kwargs.pop("inputs", None)
- if pixel_values is not None and inputs is not None:
- raise ValueError(
- f"`inputs`: {inputs} were passed alongside `pixel_values` which is not allowed."
- f"Make sure to either pass `inputs` or pixel_values=..."
- )
- if pixel_values is None and inputs is not None:
- pixel_values = inputs
- if image_embeds is None:
- vision_model_output = self.vision_model(pixel_values)
- # The whole `last_hidden_state` through `post_layernorm` instead of just `pooled_output`.
- image_embeds = self.vision_model.model.post_layernorm(vision_model_output[0])
- # normalized features
- image_embeds = nn.functional.normalize(image_embeds, dim=-1)
- image_embeds, projection_attentions = self.image_to_text_projection(image_embeds)
- output = self.text_model.generate(
- input_ids=input_ids,
- attention_mask=attention_mask,
- image_embeds=image_embeds,
- image_embeds_position_mask=image_embeds_position_mask,
- **kwargs,
- )
- return output
|