|
| 1 | +# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai> |
| 2 | +# |
| 3 | +# SPDX-License-Identifier: Apache-2.0 |
| 4 | +from dataclasses import replace |
| 5 | +from typing import Any, Dict, List, Optional, Tuple |
| 6 | + |
| 7 | +import requests |
| 8 | +from haystack import Document, component, default_from_dict, default_to_dict, logging |
| 9 | +from haystack.components.converters.image.image_utils import ( |
| 10 | + _batch_convert_pdf_pages_to_images, |
| 11 | + _encode_image_to_base64, |
| 12 | + _extract_image_sources_info, |
| 13 | + _PDFPageInfo, |
| 14 | +) |
| 15 | +from haystack.dataclasses import ByteStream |
| 16 | +from haystack.utils import Secret, deserialize_secrets_inplace |
| 17 | + |
| 18 | +logger = logging.getLogger(__name__) |
| 19 | + |
| 20 | +JINA_API_URL: str = "https://api.jina.ai/v1/embeddings" |
| 21 | + |
| 22 | + |
@component
class JinaDocumentImageEmbedder:
    """
    A component for computing Document embeddings based on images using Jina AI multimodal models.

    The embedding of each Document is stored in the `embedding` field of the Document.

    The JinaDocumentImageEmbedder supports models from the jina-clip series and jina-embeddings-v4
    which can encode images into vector representations in the same embedding space as text.

    Usage example:
    ```python
    from haystack import Document
    from haystack_integrations.components.embedders.jina import JinaDocumentImageEmbedder

    # Make sure that the environment variable JINA_API_KEY is set

    embedder = JinaDocumentImageEmbedder(model="jina-clip-v2")

    documents = [
        Document(content="A photo of a cat", meta={"file_path": "cat.jpg"}),
        Document(content="A photo of a dog", meta={"file_path": "dog.jpg"}),
    ]

    result = embedder.run(documents=documents)
    documents_with_embeddings = result["documents"]
    print(documents_with_embeddings[0].embedding)

    # [0.017020374536514282, -0.023255806416273117, ...]
    ```
    """

    def __init__(
        self,
        *,
        api_key: Secret = Secret.from_env_var("JINA_API_KEY"),  # noqa: B008
        model: str = "jina-clip-v2",
        file_path_meta_field: str = "file_path",
        root_path: Optional[str] = None,
        embedding_dimension: Optional[int] = None,
        image_size: Optional[Tuple[int, int]] = None,
        batch_size: int = 5,
    ):
        """
        Create a JinaDocumentImageEmbedder component.

        :param api_key: The Jina API key. It can be explicitly provided or automatically read from the
            environment variable `JINA_API_KEY` (recommended).
        :param model: The name of the Jina multimodal model to use.
            Supported models include:
            - "jina-clip-v1"
            - "jina-clip-v2" (default)
            - "jina-embeddings-v4"
            Check the list of available models on [Jina documentation](https://jina.ai/embeddings/).
        :param file_path_meta_field: The metadata field in the Document that contains the file path to the image or PDF.
        :param root_path: The root directory path where document files are located. If provided, file paths in
            document metadata will be resolved relative to this path. If None, file paths are treated as absolute paths.
        :param embedding_dimension: Number of desired dimensions for the embedding.
            Smaller dimensions are easier to store and retrieve, with minimal performance impact thanks to MRL.
            Only supported by jina-embeddings-v4.
        :param image_size: If provided, resizes the image to fit within the specified dimensions (width, height) while
            maintaining aspect ratio. This reduces file size, memory usage, and processing time.
        :param batch_size: Number of images to send in each API request. Defaults to 5. Must be positive.

        :raises ValueError:
            If `batch_size` is not a positive integer.
        """
        # A non-positive batch size would either crash `range()` (0) or make `run` silently
        # return no documents at all (negative), so reject it up front.
        if batch_size <= 0:
            msg = f"batch_size must be a positive integer, got {batch_size}."
            raise ValueError(msg)

        resolved_api_key = api_key.resolve_value()

        # The Secret itself (not the resolved value) is kept so that `to_dict` can serialize it.
        self.api_key = api_key
        self.model_name = model
        self.file_path_meta_field = file_path_meta_field
        self.root_path = root_path or ""
        self.embedding_dimension = embedding_dimension
        self.image_size = image_size
        self.batch_size = batch_size

        # One session is reused for every request so the auth headers are configured only once.
        self._session = requests.Session()
        self._session.headers.update(
            {
                "Authorization": f"Bearer {resolved_api_key}",
                "Accept-Encoding": "identity",
                "Content-type": "application/json",
            }
        )

    def _get_telemetry_data(self) -> Dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.
        """
        return {"model": self.model_name}

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            api_key=self.api_key.to_dict(),
            model=self.model_name,
            file_path_meta_field=self.file_path_meta_field,
            root_path=self.root_path,
            embedding_dimension=self.embedding_dimension,
            image_size=self.image_size,
            batch_size=self.batch_size,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "JinaDocumentImageEmbedder":
        """
        Deserializes the component from a dictionary.

        :param data:
            Dictionary to deserialize from.
        :returns:
            Deserialized component.
        """
        # The serialized API key must be turned back into a Secret before the constructor is called.
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
        return default_from_dict(cls, data)

    def _extract_images_to_embed(self, documents: List[Document]) -> List[str]:
        """
        Validates the input documents and extracts the images to embed in the format expected by the Jina API.

        Each image is returned as a `data:<mime type>;base64,<payload>` string, in the same order
        as the input documents.

        :param documents:
            Documents to embed.

        :returns:
            List of images to embed in the format expected by the Jina API.

        :raises TypeError:
            If the input is not a list of `Documents`.
        :raises ValueError:
            If a PDF document does not specify a page number.
        :raises RuntimeError:
            If the conversion of some documents fails.
        """
        if not isinstance(documents, list) or not all(isinstance(d, Document) for d in documents):
            msg = (
                "JinaDocumentImageEmbedder expects a list of Documents as input. "
                "In case you want to embed a string, please use the JinaTextEmbedder."
            )
            raise TypeError(msg)

        images_source_info = _extract_image_sources_info(
            documents=documents, file_path_meta_field=self.file_path_meta_field, root_path=self.root_path
        )

        # Pre-sized so results can be written by document index: plain images are filled in
        # immediately, PDF pages only after the batch conversion below.
        images_to_embed: List[Optional[str]] = [None] * len(documents)
        pdf_page_infos: List[_PDFPageInfo] = []

        for doc_idx, image_source_info in enumerate(images_source_info):
            if image_source_info["mime_type"] == "application/pdf":
                # Store PDF documents for later processing
                page_number = image_source_info.get("page_number")
                if page_number is None:
                    msg = f"Page number is required for PDF document at index {doc_idx}"
                    raise ValueError(msg)
                pdf_page_info: _PDFPageInfo = {
                    "doc_idx": doc_idx,
                    "path": image_source_info["path"],
                    "page_number": page_number,
                }
                pdf_page_infos.append(pdf_page_info)
            else:
                # Process images directly
                image_byte_stream = ByteStream.from_file_path(
                    filepath=image_source_info["path"], mime_type=image_source_info["mime_type"]
                )
                mime_type, base64_image = _encode_image_to_base64(bytestream=image_byte_stream, size=self.image_size)
                images_to_embed[doc_idx] = f"data:{mime_type};base64,{base64_image}"

        base64_jpeg_images_by_doc_idx = _batch_convert_pdf_pages_to_images(
            pdf_page_infos=pdf_page_infos, return_base64=True, size=self.image_size
        )
        for doc_idx, base64_jpeg_image in base64_jpeg_images_by_doc_idx.items():
            images_to_embed[doc_idx] = f"data:image/jpeg;base64,{base64_jpeg_image}"

        # Any slot still empty means no image could be produced for that document.
        none_images_doc_ids = [documents[doc_idx].id for doc_idx, image in enumerate(images_to_embed) if image is None]
        if none_images_doc_ids:
            msg = f"Conversion failed for some documents. Document IDs: {none_images_doc_ids}."
            raise RuntimeError(msg)

        # Every entry is known to be a string at this point; the filter only narrows the type.
        return [image for image in images_to_embed if image is not None]

    def _embed_batch(self, batch_images: List[str], base_payload: Dict[str, Any]) -> List[Any]:
        """
        Sends one batch of images to the Jina API and returns one embedding per image.

        :param batch_images:
            Images of the batch, in the format produced by `_extract_images_to_embed`.
        :param base_payload:
            Request fields shared by all batches (model name and optional parameters).

        :returns:
            The `embedding` value of every item in the response `data`, in response order.

        :raises RuntimeError:
            If the request fails, the response is not valid JSON, the response carries no `data`,
            or the number of returned embeddings differs from the number of images sent.
        """
        try:
            response = self._session.post(JINA_API_URL, json={"input": batch_images, **base_payload})
            resp = response.json()
        except Exception as e:
            # Deliberately broad: any transport or decoding failure surfaces as a RuntimeError,
            # with the original exception chained for debugging.
            msg = f"Error calling Jina API: {e}"
            raise RuntimeError(msg) from e

        if not isinstance(resp, dict):
            # A JSON body that is not an object cannot contain `data`; report it as an API error
            # instead of failing with an AttributeError on `.get`.
            msg = f"Jina API error: {resp}"
            raise RuntimeError(msg)

        if "data" not in resp:
            error_msg = resp.get("detail", "Unknown error occurred")
            msg = f"Jina API error: {error_msg}"
            raise RuntimeError(msg)

        batch_embeddings = [item["embedding"] for item in resp["data"]]

        # Documents and embeddings are paired by position later on, so a count mismatch would
        # silently drop or misalign documents.
        if len(batch_embeddings) != len(batch_images):
            msg = (
                f"Jina API error: expected {len(batch_images)} embeddings but received {len(batch_embeddings)}."
            )
            raise RuntimeError(msg)

        return batch_embeddings

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """
        Embed a list of image documents.

        The input documents are not modified; copies carrying the embedding are returned.

        :param documents:
            Documents to embed.

        :returns:
            A dictionary with the following keys:
            - `documents`: Documents with embeddings.

        :raises TypeError:
            If the input is not a list of `Documents`.
        :raises ValueError:
            If a PDF document does not specify a page number.
        :raises RuntimeError:
            If the conversion of some documents fails or the Jina API call fails.
        """
        if not documents:
            return {"documents": []}

        images_to_embed = self._extract_images_to_embed(documents)

        # The model and optional parameters are identical for every batch, so build them once.
        base_payload: Dict[str, Any] = {"model": self.model_name}
        if self.embedding_dimension is not None:
            base_payload["dimensions"] = self.embedding_dimension

        embeddings: List[Any] = []

        # Process images in batches
        for start in range(0, len(images_to_embed), self.batch_size):
            batch_images = images_to_embed[start : start + self.batch_size]
            embeddings.extend(self._embed_batch(batch_images, base_payload))

        docs_with_embeddings = []
        for doc, emb in zip(documents, embeddings):
            # we store this information for later inspection
            new_meta = {
                **doc.meta,
                "embedding_source": {"type": "image", "file_path_meta_field": self.file_path_meta_field},
            }
            new_doc = replace(doc, meta=new_meta, embedding=emb)
            docs_with_embeddings.append(new_doc)

        return {"documents": docs_with_embeddings}
0 commit comments