# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# This file was automatically generated from examples/modular-transformers/modular_new_task_model.py.
# Do NOT edit this file manually as any edits will be overwritten by the generation of
# the file from the modular. If any change is needed, please apply it to the
# modular_new_task_model.py file directly. One of our CI checks enforces this.
# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
from collections.abc import Callable
from dataclasses import dataclass
from typing import ClassVar
import torch
from torch import nn
from ...cache_utils import Cache
from ...configuration_utils import PreTrainedConfig
from ...generation import GenerationMixin
from ...masking_utils import create_masks_for_generate
from ...modeling_flash_attention_utils import FlashAttentionKwargs
from ...modeling_outputs import BaseModelOutputWithPast, BaseModelOutputWithPooling
from ...modeling_utils import PreTrainedModel
from ...processing_utils import Unpack
from ...utils import ModelOutput, TransformersKwargs, auto_docstring, can_return_tuple, logging, torch_compilable_check
from ...utils.deprecation import deprecate_kwarg
from ..auto import AutoModel
from .configuration_new_task_model import NewTaskModelConfig
logger = logging.get_logger(__name__)
@dataclass
@auto_docstring(
custom_intro="""
Base class for NewTaskModel outputs, with hidden states and attentions.
"""
)
class NewTaskModelModelOutputWithPast(BaseModelOutputWithPast):
r"""
image_hidden_states (`torch.FloatTensor`, *optional*):
A `torch.FloatTensor` of size `(batch_size, num_images, sequence_length, hidden_size)`.
Image hidden states produced by the vision encoder after projecting the last hidden state.
"""
image_hidden_states: torch.FloatTensor | None = None
@dataclass
@auto_docstring(
custom_intro="""
Base class for NewTaskModel causal language model (or autoregressive) outputs.
"""
)
class NewTaskModelCausalLMOutputWithPast(ModelOutput):
r"""
loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
Language modeling loss (for next-token prediction).
logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.text_config.vocab_size)`):
Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
past_key_values (`Cache`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
It is a [`~cache_utils.Cache`] instance. For more details, see our [kv cache guide](https://huggingface.co/docs/transformers/en/kv_cache).
Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see
`past_key_values` input) to speed up sequential decoding.
image_hidden_states (`torch.FloatTensor`, *optional*):
A `torch.FloatTensor` of size `(batch_size, num_images, sequence_length, hidden_size)`.
Image hidden states produced by the vision encoder after projecting the last hidden state.
"""
loss: torch.FloatTensor | None = None
logits: torch.FloatTensor | None = None
past_key_values: Cache | None = None
hidden_states: tuple[torch.FloatTensor] | None = None
attentions: tuple[torch.FloatTensor] | None = None
image_hidden_states: torch.FloatTensor | None = None
class NewTaskModelMultiModalProjector(nn.Module):
def __init__(self, config: NewTaskModelConfig):
super().__init__()
self.linear = nn.Linear(config.vision_config.hidden_size, config.vision_config.projection_dim, bias=True)
def forward(self, image_features):
hidden_states = self.linear(image_features)
return hidden_states
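# Editor's note (illustrative, not generated code): this projector is a single linear map from
# (batch, num_patches, vision_config.hidden_size) to (batch, num_patches, vision_config.projection_dim);
# the projection width has to match the text embedding size so the projected features can be scattered
# into `inputs_embeds` in the model's forward pass below.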
@auto_docstring
class NewTaskModelPreTrainedModel(PreTrainedModel):
config: NewTaskModelConfig
base_model_prefix = "model"
input_modalities = ("image", "text")
supports_gradient_checkpointing = True
_no_split_modules = ["NewTaskModelMultiModalProjector"]
_skip_keys_device_placement = "past_key_values"
_can_compile_fullgraph = False
_supports_flash_attn = True
_supports_sdpa = True
_supports_flex_attn = True
_supports_attention_backend = True
def token_type_ids_mask_function(group_ids: torch.Tensor) -> Callable:
"""
Returns a mask function that unmasks attention between query and key positions assigned to the same
vision group (`group_ids >= 0`), enabling bidirectional attention inside each image block.
Args:
group_ids (`torch.Tensor`):
A tensor of shape `(bs, len)` assigning each token to a vision group. Tokens with the same group
come from the same input image. Text is denoted by `-1`.
"""
def inner_mask(batch_idx: int, head_idx: int, q_idx: int, kv_idx: int) -> bool:
seq_length = group_ids.shape[-1]
# clamp indices because with static cache they can go beyond `group_ids.shape[-1]`
q_idx_clamped = q_idx.clamp(max=seq_length - 1)
kv_idx_clamped = kv_idx.clamp(max=seq_length - 1)
# Unmask if the q and kv come from same group which is not -1 (i.e. non-text)
q_group = group_ids[batch_idx, q_idx_clamped]
kv_group = group_ids[batch_idx, kv_idx_clamped]
q_group = torch.where(q_idx < seq_length, q_group, -1)
kv_group = torch.where(kv_idx < seq_length, kv_group, -1)
return (q_group == kv_group) & (q_group >= 0)
return inner_mask
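# Editor's sketch (illustrative values only): for group_ids = [[-1, -1, 0, 0, 1, 1, -1]], inner_mask is True
# for (q_idx=2, kv_idx=3) and (q_idx=3, kv_idx=2) (same image group 0), False for (q_idx=4, kv_idx=2)
# (different groups) and for any pair touching a text position (group -1). Used as an `or_mask_function`
# on top of the causal mask, this makes attention bidirectional within each image block.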
@deprecate_kwarg("input_embeds", version="5.6.0", new_name="inputs_embeds")
def create_causal_mask_mapping(
config: PreTrainedConfig,
inputs_embeds: torch.Tensor,
attention_mask: torch.Tensor | None,
past_key_values: Cache | None,
position_ids: torch.Tensor | None,
token_type_ids: torch.Tensor | None = None,
pixel_values: torch.FloatTensor | None = None,
is_training: bool | None = False,
is_first_iteration: bool | None = None,
**kwargs,
) -> dict:
"""
Overwrites the base `create_masks_for_generate` with `token_type_ids` masking to create the causal mask mapping
for all kinds of forward passes. NewTaskModel uses a bidirectional mask on the prompt tokens.
Uses `pixel_values` as an optional input to disambiguate edge cases.
"""
if is_training and token_type_ids is None:
raise ValueError("`token_type_ids` is required as a model input when training")
mask_kwargs = {
"config": config.get_text_config(),
"inputs_embeds": inputs_embeds,
"attention_mask": attention_mask,
"past_key_values": past_key_values,
"position_ids": position_ids,
}
# Infer whether this is the prefill or decoding stage when the flag isn't passed. This only happens when the
# mask is constructed from a `forward` call: users may be running generation with a custom loop, in which case
# `is_first_iteration` cannot be passed down, so we have to infer it in an imperfect way.
# NOTE: Determining prefill in that case requires checking data values, which is not compile-compatible.
is_first_iteration = is_first_iteration or (
past_key_values is None or not past_key_values.is_initialized or pixel_values is not None
)
if is_first_iteration or not kwargs.get("use_cache", True):
if token_type_ids is not None:
# The logic below was originally written for Gemma3, where `token_type_ids` is reversed. Let's reverse
# it here so we can use exactly the same logic.
token_type_ids = 1 - token_type_ids
else:
logger.warning_once(
"It is a prefill stage but The `token_type_ids` is not provided. We recommend "
"passing `token_type_ids` to the model to prevent bad attention masking."
)
# NOTE: this branch can't be reached when training because `token_type_ids` is required as a model input.
token_type_ids = torch.ones_like(inputs_embeds)[:, :, 0]
# Logic originally copied from Gemma3. It holds up for NewTaskModel as well because NewTaskModel assumes up to one image
# per prompt AND we reverse `token_type_ids` above. Gemma3 uses a bidirectional mask for images, tagged through
# `token_type_ids` 1s.
if token_type_ids is not None and is_first_iteration:
# We need to pass an additional mask function to account for token type ids, and it needs to be an `or`
# (to undo the causal masking).
# First find where a new image block starts: 1 if the token is an image token and the previous one is not.
# Images cannot attend to future images, but each image can attend to all previous images and to itself
# bidirectionally.
is_image = (token_type_ids == 1).to(inputs_embeds.device)
is_previous_image = nn.functional.pad(is_image, (1, 0), value=0)[:, :-1]
new_image_start = is_image & ~is_previous_image
group_ids = torch.cumsum(new_image_start.int(), dim=1) - 1
group_ids = torch.where(is_image, group_ids, torch.full_like(token_type_ids, -1))
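# Editor's sketch (illustrative values only, using the already-reversed token_type_ids where 1 marks the
# bidirectional image positions): for token_type_ids = [[1, 1, 1, 0, 0]],
#   is_image        -> [[True, True, True, False, False]]
#   new_image_start -> [[True, False, False, False, False]]
#   group_ids       -> [[0, 0, 0, -1, -1]]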
mask_kwargs["or_mask_function"] = token_type_ids_mask_function(group_ids)
return create_masks_for_generate(**mask_kwargs)
@auto_docstring(
custom_intro="""
The base NewTaskModel model, consisting of a vision backbone and a language model without a language modeling head.
"""
)
class NewTaskModelModel(NewTaskModelPreTrainedModel):
# we are filtering the logits/labels so we shouldn't divide the loss based on num_items_in_batch
accepts_loss_kwargs = False
def __init__(self, config: NewTaskModelConfig):
super().__init__(config)
self.vision_tower = AutoModel.from_config(config=config.vision_config)
self.multi_modal_projector = NewTaskModelMultiModalProjector(config)
self.vocab_size = config.text_config.vocab_size
language_model = AutoModel.from_config(config=config.text_config)
self.language_model = language_model
self.text_config_dtype = self.config.get_text_config().dtype or self.dtype
self.post_init()
def get_input_embeddings(self):
return self.language_model.get_input_embeddings()
def set_input_embeddings(self, value):
self.language_model.set_input_embeddings(value)
@can_return_tuple
@auto_docstring(
custom_intro="Obtains image last hidden states from the vision tower and apply multimodal projection."
)
def get_image_features(
self, pixel_values: torch.FloatTensor, **kwargs: Unpack[TransformersKwargs]
) -> tuple | BaseModelOutputWithPooling:
image_outputs = self.vision_tower(pixel_values, **kwargs)
selected_image_feature = image_outputs.last_hidden_state
image_features = self.multi_modal_projector(selected_image_feature)
image_outputs.pooler_output = image_features
return image_outputs
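# Editor's note (illustrative): the projected features overwrite `pooler_output`, so callers of
# get_image_features can read either the raw vision states (`last_hidden_state`) or the projected
# features (`pooler_output`) from the same returned object.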
def get_placeholder_mask(
self, input_ids: torch.LongTensor, inputs_embeds: torch.FloatTensor, image_features: torch.FloatTensor
):
"""
Obtains multimodal placeholder mask from `input_ids` or `inputs_embeds`, and checks that the placeholder token count is
equal to the length of multimodal features. If the lengths are different, an error is raised.
"""
if input_ids is None:
special_image_mask = inputs_embeds == self.get_input_embeddings()(
torch.tensor(self.config.image_token_id, dtype=torch.long, device=inputs_embeds.device)
)
special_image_mask = special_image_mask.all(-1)
else:
special_image_mask = input_ids == self.config.image_token_id
n_image_tokens = special_image_mask.sum()
n_image_features = image_features.shape[0] * image_features.shape[1]
special_image_mask = special_image_mask.unsqueeze(-1).to(inputs_embeds.device)
torch_compilable_check(
n_image_tokens * inputs_embeds.shape[-1] == image_features.numel(),
f"Image features and image tokens do not match, tokens: {n_image_tokens}, features: {n_image_features}",
)
return special_image_mask
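# Editor's sketch (hypothetical sizes, for illustration): with one image expanded to 256 placeholder tokens
# and a text hidden size of 2048, image_features has shape (1, 256, 2048) and input_ids must contain exactly
# 256 occurrences of config.image_token_id; otherwise the torch_compilable_check above reports the mismatch.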
@can_return_tuple
@auto_docstring
def forward(
self,
input_ids: torch.LongTensor | None = None,
pixel_values: torch.FloatTensor | None = None,
attention_mask: torch.Tensor | None = None,
position_ids: torch.LongTensor | None = None,
past_key_values: Cache | None = None,
token_type_ids: torch.LongTensor | None = None,
inputs_embeds: torch.FloatTensor | None = None,
labels: torch.LongTensor | None = None,
use_cache: bool | None = None,
**kwargs: Unpack[FlashAttentionKwargs],
) -> tuple | NewTaskModelModelOutputWithPast:
r"""
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
config.text_config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
(masked), the loss is only computed for the tokens with labels in `[0, ..., config.text_config.vocab_size]`.
Example:
```python
>>> from PIL import Image
>>> import httpx
>>> from io import BytesIO
>>> from transformers import AutoProcessor, NewTaskModelForConditionalGeneration
>>> model = NewTaskModelForConditionalGeneration.from_pretrained("google/new_task_model2-3b-mix-224")
>>> processor = AutoProcessor.from_pretrained("google/new_task_model2-3b-mix-224")
>>> prompt = "Where is the cat standing?"
>>> url = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/pipeline-cat-chonk.jpeg"
>>> with httpx.stream("GET", url) as response:
... image = Image.open(BytesIO(response.read()))
>>> inputs = processor(images=image, text=prompt, return_tensors="pt")
>>> # Generate
>>> generate_ids = model.generate(**inputs)
>>> processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
"Where is the cat standing?\nsnow"
```"""
if (input_ids is None) ^ (inputs_embeds is not None):
raise ValueError("You must specify exactly one of input_ids or inputs_embeds")
# Replace the image token id with 0 (pad) if it is out of vocabulary, to avoid index errors
if input_ids is not None and self.config.image_token_id >= self.vocab_size:
special_image_mask = input_ids == self.config.image_token_id
llm_input_ids = input_ids.clone()
llm_input_ids[special_image_mask] = 0
else:
llm_input_ids = input_ids
if inputs_embeds is None:
inputs_embeds = self.get_input_embeddings()(llm_input_ids)
if position_ids is None:
past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
position_ids = torch.arange(inputs_embeds.shape[1], device=inputs_embeds.device) + past_seen_tokens
position_ids = position_ids.unsqueeze(0) + 1 # NewTaskModel positions are 1-indexed
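# Editor's note (illustrative): e.g. a 4-token prompt with no cache gets position_ids [[1, 2, 3, 4]]
# instead of the usual 0-indexed [[0, 1, 2, 3]].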
# Merge text and images
if pixel_values is not None:
image_features = self.get_image_features(pixel_values).pooler_output
image_features = image_features.to(inputs_embeds.device, inputs_embeds.dtype)
special_image_mask = self.get_placeholder_mask(
input_ids, inputs_embeds=inputs_embeds, image_features=image_features
)
inputs_embeds = inputs_embeds.masked_scatter(special_image_mask, image_features)
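# Editor's note (illustrative): masked_scatter fills exactly the placeholder positions with the flattened
# image features, so inputs_embeds keeps its (batch, seq_len, hidden_size) shape while image positions now
# carry projected vision features instead of the placeholder token embedding.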
# It may already have been prepared by e.g. `generate`
if not isinstance(causal_mask_mapping := attention_mask, dict):
causal_mask_mapping = create_causal_mask_mapping(
self.config,
inputs_embeds,
attention_mask,
past_key_values,
position_ids,
token_type_ids,
pixel_values,
is_training=self.training,
)
outputs = self.language_model(
attention_mask=causal_mask_mapping,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
**kwargs,
)
return NewTaskModelModelOutputWithPast(
last_hidden_state=outputs.last_hidden_state,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
image_hidden_states=image_features if pixel_values is not None else None,
)
@auto_docstring(
custom_intro="""
The base NewTaskModel model, consisting of a vision backbone and a language model without a language modeling head.
"""
)
class NewTaskModelForNewTask(NewTaskModelPreTrainedModel, GenerationMixin):
_tied_weights_keys = {"lm_head.weight": "model.language_model.embed_tokens.weight"}
main_input_name: ClassVar[str] = "doc_input_ids" # transformers-related
def __init__(self, config):
super().__init__(config)
self.model = NewTaskModelModel(config)
self.lm_head = nn.Linear(config.text_config.hidden_size, config.text_config.vocab_size, bias=False)
self.embedding_dim = self.config.embedding_dim
self.custom_text_proj = nn.Linear(self.config.text_config.hidden_size, self.embedding_dim)
self.post_init()
@auto_docstring
def get_image_features(self, pixel_values: torch.FloatTensor, **kwargs: Unpack[TransformersKwargs]):
return self.model.get_image_features(pixel_values, **kwargs)
@can_return_tuple
@auto_docstring
def forward(
self,
input_ids: torch.LongTensor | None = None,
pixel_values: torch.FloatTensor | None = None,
attention_mask: torch.Tensor | None = None,
position_ids: torch.LongTensor | None = None,
past_key_values: Cache | None = None,
token_type_ids: torch.LongTensor | None = None,
inputs_embeds: torch.FloatTensor | None = None,
labels: torch.LongTensor | None = None,
use_cache: bool | None = None,
output_attentions: bool | None = None,
output_hidden_states: bool | None = None,
return_dict: bool | None = None,
num_logits_to_keep: int = 0,
) -> tuple | NewTaskModelCausalLMOutputWithPast:
r"""
Returns:
"""
vlm_outputs = super().forward(
input_ids=input_ids,
pixel_values=pixel_values,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
token_type_ids=token_type_ids,
inputs_embeds=inputs_embeds,
labels=labels,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=True,
return_dict=True,
num_logits_to_keep=num_logits_to_keep,
)
last_hidden_states = vlm_outputs.hidden_states[-1] # (batch_size, sequence_length, hidden_size)
proj = self.custom_text_proj(last_hidden_states) # (batch_size, sequence_length, dim)
# L2 normalization
embeddings = proj / proj.norm(dim=-1, keepdim=True) # (batch_size, sequence_length, dim)
if attention_mask is not None:
embeddings = embeddings * attention_mask.unsqueeze(-1) # (batch_size, sequence_length, dim)
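# Editor's note (illustrative): the task head returns one L2-normalized vector per token (a multi-vector
# embedding of shape (batch_size, sequence_length, embedding_dim)); padding positions are zeroed out via the
# attention mask so they contribute nothing to downstream similarity scores.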
return (embeddings,) + vlm_outputs
def prepare_inputs_for_generation(
self,
input_ids,
past_key_values=None,
inputs_embeds=None,
position_ids=None,
pixel_values=None,
attention_mask=None,
token_type_ids=None,
use_cache=True,
logits_to_keep=None,
labels=None,
is_first_iteration=False,
**kwargs,
):
# Overwritten -- custom `position_ids` and `pixel_values` handling
model_inputs = super().prepare_inputs_for_generation(
input_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
attention_mask=attention_mask,
position_ids=position_ids,
use_cache=use_cache,
logits_to_keep=logits_to_keep,
token_type_ids=token_type_ids,
is_first_iteration=is_first_iteration,
**kwargs,
)
# position_ids in NewTaskModel are 1-indexed
if model_inputs.get("position_ids") is not None:
# NOTE: we need this op out-of-place, otherwise it modifies the `model_kwargs` dict used in `generate` in-place!
model_inputs["position_ids"] = model_inputs["position_ids"] + 1
# Pixel values are used only in the first iteration, if available.
# In subsequent iterations they are already merged with the text and cached.
# NOTE: the first iteration doesn't have to be prefill; it can be the first iteration with a new question
# and a cached system prompt (continuing generation from a cache).
# NOTE: with use_cache=False, pixel_values are always needed.
if is_first_iteration or not use_cache:
model_inputs["pixel_values"] = pixel_values
return model_inputs
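# Editor's note (illustrative): during `generate`, pixel_values are consumed once. On the first iteration
# (or whenever use_cache=False) they are forwarded so the image features get merged into inputs_embeds;
# on later iterations the merged states live in past_key_values, so pixel_values are dropped from model_inputs.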
@staticmethod
@deprecate_kwarg("input_embeds", version="5.6.0", new_name="inputs_embeds")
def create_masks_for_generate(
config: PreTrainedConfig,
inputs_embeds: torch.Tensor,
attention_mask: torch.Tensor | None,
past_key_values: Cache | None,
position_ids: torch.Tensor | None,
token_type_ids: torch.Tensor | None = None,
is_first_iteration: bool | None = False,
**kwargs,
) -> dict:
# Uses the overwritten `create_masks_for_generate` with `token_type_ids` masking
return create_causal_mask_mapping(
config,
inputs_embeds,
attention_mask,
past_key_values,
position_ids,
token_type_ids,
is_first_iteration=is_first_iteration,
**{k: v for k, v in kwargs.items() if k != "pixel_values"},
)
def resize_token_embeddings(
self, new_num_tokens: int | None = None, pad_to_multiple_of=None, mean_resizing=True
) -> nn.Embedding:
model_embeds = self.language_model.resize_token_embeddings(new_num_tokens, pad_to_multiple_of, mean_resizing)
# Update vocab size
self.config.text_config.vocab_size = model_embeds.num_embeddings
self.config.vocab_size = model_embeds.num_embeddings
self.vocab_size = model_embeds.num_embeddings
return model_embeds
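# Editor's sketch (illustrative usage only, not part of the generated file; the checkpoint id below is
# hypothetical):
#
#   from transformers import AutoProcessor
#
#   processor = AutoProcessor.from_pretrained("org/new-task-model-checkpoint")  # hypothetical id
#   model = NewTaskModelForNewTask.from_pretrained("org/new-task-model-checkpoint")  # hypothetical id
#   inputs = processor(images=image, text="Describe the image.", return_tensors="pt")
#   embeddings = model(**inputs)[0]  # (batch_size, sequence_length, embedding_dim), L2-normalized per token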