| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519 |
- # coding=utf-8
- # Copyright 2024 The HuggingFace Inc. team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Image processor class for Pixtral."""
- from typing import Any, Callable, Dict, List, Optional, Tuple, Union
- import numpy as np
- from ...image_processing_utils import BaseImageProcessor, BatchFeature, get_size_dict
- from ...image_transforms import (
- resize,
- to_channel_dimension_format,
- )
- from ...image_utils import (
- ChannelDimension,
- ImageInput,
- PILImageResampling,
- get_image_size,
- infer_channel_dimension_format,
- is_scaled_image,
- is_valid_image,
- to_numpy_array,
- valid_images,
- validate_kwargs,
- validate_preprocess_arguments,
- )
- from ...utils import TensorType, is_torch_device, is_torch_dtype, is_torch_tensor, is_vision_available, logging
- from ...utils.import_utils import requires_backends
# Module-level logger, named after this module.
logger = logging.get_logger(__name__)

# PIL is imported only when the vision backend is installed; `convert_to_rgb` in this module checks for
# that backend through `requires_backends` before it touches `PIL`.
if is_vision_available():
    import PIL
class BatchMixFeature(BatchFeature):
    """
    A `BatchFeature` whose values may be (nested) lists of tensors instead of a single stacked tensor, e.g. images
    of different sizes that cannot be batched together.
    """

    def to(self, *args, **kwargs) -> "BatchMixFeature":
        """
        Send all values to device by calling `v.to(*args, **kwargs)` (PyTorch only). This should support casting in
        different `dtypes` and sending the `BatchFeature` to a different `device`.

        Only floating point tensors are cast; other tensors are at most moved to the requested device. Values that
        are not tensors (and lists that contain no tensors, such as lists of image sizes) are kept unchanged.

        Args:
            args (`Tuple`):
                Will be passed to the `to(...)` function of the tensors.
            kwargs (`Dict`, *optional*):
                Will be passed to the `to(...)` function of the tensors.

        Returns:
            [`BatchMixFeature`]: The same instance after modification.

        Raises:
            ValueError: If the first positional argument is neither a dtype nor a device-like value.
        """
        requires_backends(self, ["torch"])
        import torch  # noqa

        device = kwargs.get("device")
        # Check if the args are a device or a dtype
        if device is None and len(args) > 0:
            # device should be always the first argument
            arg = args[0]
            if is_torch_dtype(arg):
                # The first argument is a dtype
                pass
            elif isinstance(arg, str) or is_torch_device(arg) or isinstance(arg, int):
                device = arg
            else:
                # it's something else
                raise ValueError(f"Attempting to cast a BatchFeature to type {str(arg)}. This is not supported.")

        def _move(tensor):
            # We cast only floating point tensors to avoid issues with tokenizers casting `LongTensor` to
            # `FloatTensor`; every other tensor is only sent to the device (when one was requested).
            if torch.is_floating_point(tensor):
                return tensor.to(*args, **kwargs)
            if device is not None:
                return tensor.to(device=device)
            return tensor

        def _move_list(values):
            # NOTE(review): tensors found one level down (`[[t0, t1], [t2]]`) are returned as a flat list
            # (`[t0, t1, t2]`). This flattening is kept from the previous implementation because callers may
            # depend on it -- confirm whether it is intended.
            moved = []
            for sample in values:
                if is_torch_tensor(sample):
                    # Flat list of tensors: move each tensor as a whole instead of iterating over its rows.
                    moved.append(_move(sample))
                elif isinstance(sample, (list, tuple)):
                    moved.extend(_move(element) for element in sample if is_torch_tensor(element))
            # A list without any tensor (e.g. `image_sizes`) carries plain Python data: keep it as is rather
            # than replacing it with an empty list.
            return moved if moved else values

        new_data = {}
        for k, v in self.items():
            if isinstance(v, list):
                new_data[k] = _move_list(v)
            elif isinstance(v, torch.Tensor):
                new_data[k] = _move(v)
            else:
                new_data[k] = v
        self.data = new_data
        return self
# Adapted from transformers.models.idefics2.image_processing_idefics2.make_list_of_images
def make_list_of_images(images: ImageInput) -> List[List[np.ndarray]]:
    """
    Normalize the accepted image inputs to a list of samples, where each sample is a list of images.

    The images themselves are returned untouched; only the nesting is adjusted.

    Args:
        images (`ImageInput`):
            A single image, a list of images (treated as one sample), or a list of lists of images (one inner list
            per sample).

    Returns:
        A list of lists of images.

    Raises:
        ValueError: If `images` is none of the accepted forms, including an empty list or a list whose first
            sample is empty.
    """
    # A single image becomes one sample holding one image.
    if is_valid_image(images):
        return [[images]]

    if isinstance(images, (list, tuple)) and len(images) > 0:
        first = images[0]
        # A list of images is a single sample.
        if is_valid_image(first):
            return [images]
        # A list of samples is already in the right format. The emptiness check makes an empty first sample
        # hit the `ValueError` below instead of an `IndexError`.
        if isinstance(first, (list, tuple)) and len(first) > 0 and is_valid_image(first[0]):
            return images

    raise ValueError("Invalid input type. Must be a single image, a list of images, or a list of batches of images.")
- # Adapted from function in image_transforms.py to ensure any transparent pixels are converted to white.
def convert_to_rgb(image: ImageInput) -> ImageInput:
    """
    Convert a PIL image to RGB, compositing it over a white background so that transparent pixels turn white.

    Args:
        image (Image):
            The image to convert.

    Returns:
        The RGB image. Inputs that are not `PIL.Image.Image` instances, or that are already in RGB mode, are
        returned unchanged.
    """
    requires_backends(convert_to_rgb, ["vision"])

    # Nothing to do for non-PIL inputs or for images that are already RGB.
    if not isinstance(image, PIL.Image.Image) or image.mode == "RGB":
        return image

    # Go through RGBA so the image can serve as its own paste mask.
    rgba_image = image.convert("RGBA")
    canvas = PIL.Image.new("RGBA", rgba_image.size, "WHITE")
    canvas.paste(rgba_image, (0, 0), rgba_image)
    return canvas.convert("RGB")
- def _num_image_tokens(image_size: Tuple[int, int], patch_size: Tuple[int, int]) -> int:
- """
- Calculate the number of image tokens given the image size and patch size.
- Args:
- image_size (`Tuple[int, int]`):
- The size of the image as `(height, width)`.
- patch_size (`Tuple[int, int]`):
- The patch size as `(height, width)`.
- Returns:
- `int`: The number of image tokens.
- """
- height, width = image_size
- patch_height, patch_width = patch_size if isinstance(patch_size, (tuple, list)) else (patch_size, patch_size)
- num_width_tokens = (width - 1) // patch_width + 1
- num_height_tokens = (height - 1) // patch_height + 1
- return num_height_tokens, num_width_tokens
def get_resize_output_image_size(
    input_image: np.ndarray,
    size: Union[int, Tuple[int, int], List[int], Tuple[int]],
    patch_size: Union[int, Tuple[int, int], List[int], Tuple[int]],
    input_data_format: Optional[Union[str, ChannelDimension]] = None,
) -> tuple:
    """
    Find the target (height, width) dimension of the output image after resizing given the input image and the desired
    size.

    The image is scaled down (keeping its aspect ratio) only if it exceeds the maximum size; the result is then
    rounded up to a whole number of patches in each dimension.

    Args:
        input_image (`np.ndarray`):
            The image to resize.
        size (`int` or `Tuple[int, int]`):
            Max image size an input image can be, as `(max_height, max_width)`. If size is an integer,
            `(size, size)` will be used.
        patch_size (`int` or `Tuple[int, int]`):
            The patch_size as `(height, width)` to use for resizing the image. If patch_size is an integer,
            `(patch_size, patch_size)` will be used.
        input_data_format (`ChannelDimension`, *optional*):
            The channel dimension format of the input image. If unset, will use the inferred format from the input.

    Returns:
        `tuple`: The target (height, width) dimension of the output image after resizing.
    """
    max_height, max_width = size if isinstance(size, (tuple, list)) else (size, size)
    patch_height, patch_width = patch_size if isinstance(patch_size, (tuple, list)) else (patch_size, patch_size)
    height, width = get_image_size(input_image, input_data_format)

    ratio = max(height / max_height, width / max_width)

    if ratio > 1:
        # The original implementation uses `round`, which utilises bankers rounding and can lead to surprising
        # results, so `ceil` is used here. `height / ratio` can come out a hair above the limit because of
        # floating point error, in which case `ceil` would overshoot by one pixel: clamp to the maximum.
        height = min(int(np.ceil(height / ratio)), int(max_height))
        width = min(int(np.ceil(width / ratio)), int(max_width))

    num_height_tokens, num_width_tokens = _num_image_tokens((height, width), (patch_height, patch_width))
    return num_height_tokens * patch_height, num_width_tokens * patch_width
- # Hack to get tensor conversion used in BatchFeature without batching the images
def _get_is_as_tensor_fns(tensor_type: Union[str, TensorType]) -> Tuple[Callable, Callable]:
    """
    Fetch the pair returned by `BatchFeature._get_is_as_tensor_fns` for `tensor_type`, using a throwaway
    `BatchFeature` instance so that no data has to be wrapped (and batched) first.
    """
    scratch_feature = BatchFeature()
    return scratch_feature._get_is_as_tensor_fns(tensor_type)
def convert_to_tensor(array, tensor_type: Optional[Union[str, TensorType]]) -> Any:
    """
    Convert a single array to the requested tensor type.

    Args:
        array:
            The value to convert.
        tensor_type (`str` or `TensorType`, *optional*):
            The framework to convert to. If `None`, no conversion is requested and `array` is returned unchanged.

    Returns:
        `array` itself when it already is a tensor of the requested type (or when `tensor_type` is `None`),
        otherwise the converted tensor.
    """
    # `preprocess` forwards its `return_tensors` argument here, and that argument defaults to `None`: there is
    # no framework to look conversion functions up for, so hand the input back untouched.
    if tensor_type is None:
        return array

    is_tensor, as_tensor = _get_is_as_tensor_fns(tensor_type)
    if is_tensor(array):
        return array
    return as_tensor(array)
class PixtralImageProcessor(BaseImageProcessor):
    r"""
    Constructs a Pixtral image processor.

    Args:
        do_resize (`bool`, *optional*, defaults to `True`):
            Whether to resize the image's (height, width) dimensions to the specified `size`. Can be overridden by
            `do_resize` in the `preprocess` method.
        size (`Dict[str, int]` *optional*, defaults to `{"longest_edge": 1024}`):
            Size of the maximum dimension of either the height or width dimension of the image. Used to control how
            images are resized. If either the height or width are greater than `size["longest_edge"]` then both the
            height and width are rescaled by `height / ratio`, `width /ratio` where
            `ratio = max(height / longest_edge, width / longest_edge)`
        patch_size (`Dict[str, int]` *optional*, defaults to `{"height": 16, "width": 16}`):
            Size of the patches in the model, used to calculate the output image size. Can be overridden by
            `patch_size` in the `preprocess` method.
        resample (`PILImageResampling`, *optional*, defaults to `Resampling.BICUBIC`):
            Resampling filter to use if resizing the image. Can be overridden by `resample` in the `preprocess` method.
        do_rescale (`bool`, *optional*, defaults to `True`):
            Whether to rescale the image by the specified scale `rescale_factor`. Can be overridden by `do_rescale` in
            the `preprocess` method.
        rescale_factor (`int` or `float`, *optional*, defaults to `1/255`):
            Scale factor to use if rescaling the image. Can be overridden by `rescale_factor` in the `preprocess`
            method.
        do_normalize (`bool`, *optional*, defaults to `True`):
            Whether to normalize the image. Can be overridden by `do_normalize` in the `preprocess` method.
        image_mean (`float` or `List[float]`, *optional*, defaults to `[0.48145466, 0.4578275, 0.40821073]`):
            Mean to use if normalizing the image. This is a float or list of floats the length of the number of
            channels in the image. Can be overridden by the `image_mean` parameter in the `preprocess` method.
        image_std (`float` or `List[float]`, *optional*, defaults to `[0.26862954, 0.26130258, 0.27577711]`):
            Standard deviation to use if normalizing the image. This is a float or list of floats the length of the
            number of channels in the image. Can be overridden by the `image_std` parameter in the `preprocess` method.
        do_convert_rgb (`bool`, *optional*, defaults to `True`):
            Whether to convert the image to RGB.
    """

    model_input_names = ["pixel_values"]

    def __init__(
        self,
        do_resize: bool = True,
        size: Optional[Dict[str, int]] = None,
        patch_size: Optional[Dict[str, int]] = None,
        resample: PILImageResampling = PILImageResampling.BICUBIC,
        do_rescale: bool = True,
        rescale_factor: Union[int, float] = 1 / 255,
        do_normalize: bool = True,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_convert_rgb: bool = True,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        size = size if size is not None else {"longest_edge": 1024}
        patch_size = patch_size if patch_size is not None else {"height": 16, "width": 16}
        patch_size = get_size_dict(patch_size, default_to_square=True)

        self.do_resize = do_resize
        self.size = size
        self.patch_size = patch_size
        self.resample = resample
        self.do_rescale = do_rescale
        self.rescale_factor = rescale_factor
        self.do_normalize = do_normalize
        self.image_mean = image_mean if image_mean is not None else [0.48145466, 0.4578275, 0.40821073]
        self.image_std = image_std if image_std is not None else [0.26862954, 0.26130258, 0.27577711]
        self.do_convert_rgb = do_convert_rgb
        # Names accepted by `preprocess`; anything else passed through `**kwargs` is reported by `validate_kwargs`.
        self._valid_processor_keys = [
            "images",
            "do_resize",
            "size",
            "patch_size",
            "resample",
            "do_rescale",
            "rescale_factor",
            "do_normalize",
            "image_mean",
            "image_std",
            "do_convert_rgb",
            "return_tensors",
            "data_format",
            "input_data_format",
        ]

    def resize(
        self,
        image: np.ndarray,
        size: Dict[str, int],
        patch_size: Dict[str, int],
        resample: PILImageResampling = PILImageResampling.BICUBIC,
        data_format: Optional[Union[str, ChannelDimension]] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
        **kwargs,
    ) -> np.ndarray:
        """
        Resize an image. If the image is larger than the maximum size it is scaled down, keeping the input aspect
        ratio, until it fits; the output height and width are then rounded up to a multiple of the patch size.

        Args:
            image (`np.ndarray`):
                Image to resize.
            size (`Dict[str, int]`):
                Dict containing the longest possible edge of the image (`"longest_edge"`), or the maximum
                `"height"` and `"width"`.
            patch_size (`Dict[str, int]`):
                Patch size, with keys `"height"` and `"width"`, used to calculate the size of the output image.
            resample (`PILImageResampling`, *optional*, defaults to `PILImageResampling.BICUBIC`):
                Resampling filter to use when resizing the image.
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format of the image. If not provided, it will be the same as the input image.
            input_data_format (`ChannelDimension` or `str`, *optional*):
                The channel dimension format of the input image. If not provided, it will be inferred.

        Returns:
            `np.ndarray`: The resized image.

        Raises:
            ValueError: If `size` or `patch_size` lacks the expected keys.
        """
        if "longest_edge" in size:
            max_size = (size["longest_edge"], size["longest_edge"])
        elif "height" in size and "width" in size:
            max_size = (size["height"], size["width"])
        else:
            raise ValueError("size must contain either 'longest_edge' or 'height' and 'width'.")

        if "height" in patch_size and "width" in patch_size:
            patch_hw = (patch_size["height"], patch_size["width"])
        else:
            raise ValueError("patch_size must contain 'height' and 'width'.")

        output_size = get_resize_output_image_size(
            image,
            size=max_size,
            patch_size=patch_hw,
            input_data_format=input_data_format,
        )
        return resize(
            image,
            size=output_size,
            resample=resample,
            data_format=data_format,
            input_data_format=input_data_format,
            **kwargs,
        )

    def preprocess(
        self,
        images: ImageInput,
        do_resize: Optional[bool] = None,
        size: Optional[Dict[str, int]] = None,
        patch_size: Optional[Dict[str, int]] = None,
        resample: Optional[PILImageResampling] = None,
        do_rescale: Optional[bool] = None,
        rescale_factor: Optional[float] = None,
        do_normalize: Optional[bool] = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_convert_rgb: Optional[bool] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
        **kwargs,
    ) -> "BatchMixFeature":
        """
        Preprocess an image or batch of images.

        Args:
            images (`ImageInput`):
                Image to preprocess. Expects a single or batch of images with pixel values ranging from 0 to 255. If
                passing in images with pixel values between 0 and 1, set `do_rescale=False`.
            do_resize (`bool`, *optional*, defaults to `self.do_resize`):
                Whether to resize the image.
            size (`Dict[str, int]`, *optional*, defaults to `self.size`):
                Describes the maximum input dimensions to the model.
            patch_size (`Dict[str, int]`, *optional*, defaults to `self.patch_size`):
                Patch size in the model. Used to calculate the image after resizing.
            resample (`int`, *optional*, defaults to `self.resample`):
                Resampling filter to use if resizing the image. This can be one of the enum `PILImageResampling`. Only
                has an effect if `do_resize` is set to `True`.
            do_rescale (`bool`, *optional*, defaults to `self.do_rescale`):
                Whether to rescale the image.
            rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`):
                Rescale factor to rescale the image by if `do_rescale` is set to `True`.
            do_normalize (`bool`, *optional*, defaults to `self.do_normalize`):
                Whether to normalize the image.
            image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`):
                Image mean to use for normalization. Only has an effect if `do_normalize` is set to `True`.
            image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`):
                Image standard deviation to use for normalization. Only has an effect if `do_normalize` is set to
                `True`.
            do_convert_rgb (`bool`, *optional*, defaults to `self.do_convert_rgb`):
                Whether to convert the image to RGB.
            return_tensors (`str` or `TensorType`, *optional*):
                The type of tensor each image is converted to. Images are converted one by one and are not
                stacked into a batch. Can be one of:
                - Unset: Leave each image as an `np.ndarray`.
                - `TensorType.TENSORFLOW` or `'tf'`: Convert each image to a `tf.Tensor`.
                - `TensorType.PYTORCH` or `'pt'`: Convert each image to a `torch.Tensor`.
                - `TensorType.NUMPY` or `'np'`: Convert each image to an `np.ndarray`.
                - `TensorType.JAX` or `'jax'`: Convert each image to a `jax.numpy.ndarray`.
            data_format (`ChannelDimension` or `str`, *optional*, defaults to `ChannelDimension.FIRST`):
                The channel dimension format for the output image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                - Unset: Use the channel dimension format of the input image.
            input_data_format (`ChannelDimension` or `str`, *optional*):
                The channel dimension format for the input image. If unset, the channel dimension format is inferred
                from the input image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                - `"none"` or `ChannelDimension.NONE`: image in (height, width) format.

        Returns:
            [`BatchMixFeature`]: A feature holding `pixel_values` (one list of processed images per sample) and
            `image_sizes` (the matching `(height, width)` of every processed image).

        Raises:
            ValueError: If any input is not a valid image.
        """
        patch_size = patch_size if patch_size is not None else self.patch_size
        patch_size = get_size_dict(patch_size, default_to_square=True)

        do_resize = do_resize if do_resize is not None else self.do_resize
        size = size if size is not None else self.size
        resample = resample if resample is not None else self.resample
        do_rescale = do_rescale if do_rescale is not None else self.do_rescale
        rescale_factor = rescale_factor if rescale_factor is not None else self.rescale_factor
        do_normalize = do_normalize if do_normalize is not None else self.do_normalize
        image_mean = image_mean if image_mean is not None else self.image_mean
        image_std = image_std if image_std is not None else self.image_std
        do_convert_rgb = do_convert_rgb if do_convert_rgb is not None else self.do_convert_rgb

        validate_kwargs(captured_kwargs=kwargs.keys(), valid_processor_keys=self._valid_processor_keys)

        images_list = make_list_of_images(images)

        # Check every sample, not only the first one, so a bad image anywhere is reported up front.
        if not all(valid_images(sample) for sample in images_list):
            raise ValueError(
                "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
                "torch.Tensor, tf.Tensor or jax.ndarray."
            )

        validate_preprocess_arguments(
            do_rescale=do_rescale,
            rescale_factor=rescale_factor,
            do_normalize=do_normalize,
            image_mean=image_mean,
            image_std=image_std,
            do_resize=do_resize,
            size=size,
            resample=resample,
        )

        if do_convert_rgb:
            images_list = [[convert_to_rgb(image) for image in sample] for sample in images_list]

        # All transformations expect numpy arrays.
        images_list = [[to_numpy_array(image) for image in sample] for sample in images_list]

        if do_rescale and is_scaled_image(images_list[0][0]):
            logger.warning_once(
                "It looks like you are trying to rescale already rescaled images. If the input"
                " images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again."
            )

        if input_data_format is None:
            # We assume that all images have the same channel dimension format.
            input_data_format = infer_channel_dimension_format(images_list[0][0])

        batch_images = []
        batch_image_sizes = []
        for sample_images in images_list:
            processed_images = []
            processed_sizes = []
            for image in sample_images:
                if do_resize:
                    image = self.resize(
                        image=image,
                        size=size,
                        patch_size=patch_size,
                        resample=resample,
                        input_data_format=input_data_format,
                    )

                if do_rescale:
                    image = self.rescale(image=image, scale=rescale_factor, input_data_format=input_data_format)

                if do_normalize:
                    image = self.normalize(
                        image=image, mean=image_mean, std=image_std, input_data_format=input_data_format
                    )

                # Record the size while the image is still in the input channel layout.
                processed_sizes.append(get_image_size(image, input_data_format))

                # `data_format=None` is documented as "keep the input layout", so there is nothing to convert.
                if data_format is not None:
                    image = to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format)

                # Convert to tensor type outside of BatchFeature to avoid batching the images of different sizes.
                # With `return_tensors=None` the images stay numpy arrays.
                if return_tensors is not None:
                    image = convert_to_tensor(image, return_tensors)

                processed_images.append(image)

            batch_images.append(processed_images)
            batch_image_sizes.append(processed_sizes)

        return BatchMixFeature(data={"pixel_values": batch_images, "image_sizes": batch_image_sizes}, tensor_type=None)
|