Skip to content

Commit 9ba242c

Browse files
feat(jina): add async support and migrate from requests to httpx (#3144)
* feat(jina): add async support to JinaTextEmbedder and migrate to httpx Replace `requests` with `httpx` for HTTP calls and add `run_async` method to `JinaTextEmbedder` for use with Haystack's `AsyncPipeline`. Ref: #3134 * feat(jina): add async support to JinaDocumentEmbedder and migrate to httpx Replace `requests` with `httpx` for HTTP calls and add `run_async` method to `JinaDocumentEmbedder` for use with Haystack's `AsyncPipeline`. Ref: #3134 * feat(jina): add async support to JinaRanker and migrate to httpx Replace `requests` with `httpx` for HTTP calls and add `run_async` method to `JinaRanker` for use with Haystack's `AsyncPipeline`. Ref: #3134 * feat(jina): add async support to JinaReaderConnector and migrate to httpx Replace `requests` with `httpx` for HTTP calls and add `run_async` method to `JinaReaderConnector` for use with Haystack's `AsyncPipeline`. Ref: #3134 * feat(jina): add async support to JinaDocumentImageEmbedder and migrate to httpx Replace `requests` with `httpx` for HTTP calls and add `run_async` method to `JinaDocumentImageEmbedder` for use with Haystack's `AsyncPipeline`. Ref: #3134 * build(jina): replace requests dependency with httpx The `requests` library is no longer used after migrating all components to `httpx` for both sync and async HTTP calls. Ref: #3134 * fix(jina): fix async test to actually call run_async and normalize variable naming - Make test_run_async_wrong_input_format async and call run_async() - Rename resp -> response in _embed_batch_async for consistency Ref: #3134 * build(jina): pin httpx>=0.24.0 Ref: #3134 * build(jina): pin httpcore>=1.0.8 for Python 3.14 compatibility httpcore < 1.0.8 fails to import on Python 3.14 due to typing.Union changes. httpx only requires httpcore==1.* without a floor, so we add an explicit minimum. 
Ref: #3134 * build(jina): bump httpx>=0.28.0 for Python 3.13+ compatibility httpx < 0.28.0 imports the removed `cgi` module on Python 3.13+ and depends on httpcore versions incompatible with Python 3.14. Bumping to >=0.28.0 fixes both issues, replacing the separate httpcore pin. Ref: #3134 --------- Co-authored-by: bogdankostic <bogdankostic@web.de>
1 parent d9d7f6c commit 9ba242c

11 files changed

Lines changed: 736 additions & 277 deletions

File tree

integrations/jina/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ classifiers = [
2626
"Programming Language :: Python :: Implementation :: CPython",
2727
"Programming Language :: Python :: Implementation :: PyPy",
2828
]
29-
dependencies = ["requests>=2.25.0", "haystack-ai>=2.22.0"]
29+
dependencies = ["httpx>=0.28.0", "haystack-ai>=2.22.0"]
3030

3131
[project.urls]
3232
Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/jina#readme"

integrations/jina/src/haystack_integrations/components/connectors/jina/reader.py

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5-
import json
65
from typing import Any
76
from urllib.parse import quote
87

9-
import requests
8+
import httpx
109
from haystack import Document, component, default_from_dict, default_to_dict
1110
from haystack.utils import Secret, deserialize_secrets_inplace
1211

@@ -105,19 +104,7 @@ def _json_to_document(self, data: dict) -> Document:
105104
document = Document(content=content, meta=data)
106105
return document
107106

108-
@component.output_types(documents=list[Document])
109-
def run(self, query: str, headers: dict[str, str] | None = None) -> dict[str, list[Document]]:
110-
"""
111-
Process the query/URL using the Jina AI reader service.
112-
113-
:param query: The query string or URL to process.
114-
:param headers: Optional headers to include in the request for customization. Refer to the
115-
[Jina Reader documentation](https://jina.ai/reader/) for more information.
116-
117-
:returns:
118-
A dictionary with the following keys:
119-
- `documents`: A list of `Document` objects.
120-
"""
107+
def _prepare_request(self, query: str, headers: dict[str, str] | None = None) -> tuple[str, dict[str, str]]:
121108
headers = headers or {}
122109
headers["Authorization"] = f"Bearer {self.api_key.resolve_value()}"
123110

@@ -127,17 +114,60 @@ def run(self, query: str, headers: dict[str, str] | None = None) -> dict[str, li
127114
endpoint_url = READER_ENDPOINT_URL_BY_MODE[self.mode]
128115
encoded_target = quote(query, safe="")
129116
url = f"{endpoint_url}{encoded_target}"
117+
return url, headers
130118

131-
response = requests.get(url, headers=headers, timeout=60)
132-
119+
def _parse_response(self, response: httpx.Response, query: str) -> dict[str, list[Document]]:
    """
    Convert the HTTP response of the Jina Reader service into Documents.

    :param response: The response returned by the reader endpoint.
    :param query: The query string or URL that was sent; stored in the Document meta for raw responses.

    :returns:
        A dictionary with the following keys:
        - `documents`: A list of `Document` objects.
    """
    # raw response: we just return a single Document with text
    if not self.json_response:
        # `.get` avoids a KeyError when the server omits the Content-Type header;
        # the meta entry is then `None` instead of the whole call failing.
        meta = {"content_type": response.headers.get("Content-Type"), "query": query}
        return {"documents": [Document(content=response.text, meta=meta)]}

    response_json = response.json().get("data", {})
    if self.mode == JinaReaderMode.SEARCH:
        # search mode: `data` is iterated and every record becomes its own Document
        documents = [self._json_to_document(record) for record in response_json]
        return {"documents": documents}

    # any other mode: the whole `data` payload becomes a single Document
    return {"documents": [self._json_to_document(response_json)]}
131+
132+
@component.output_types(documents=list[Document])
def run(self, query: str, headers: dict[str, str] | None = None) -> dict[str, list[Document]]:
    """
    Send the query/URL to the Jina AI reader service and return the result as Documents.

    :param query: The query string or URL to process.
    :param headers: Optional headers to include in the request for customization. Refer to the
        [Jina Reader documentation](https://jina.ai/reader/) for more information.

    :returns:
        A dictionary with the following keys:
        - `documents`: A list of `Document` objects.
    """
    target_url, outgoing_headers = self._prepare_request(query, headers)

    # A short-lived client is opened per call and closed as soon as the response is received.
    with httpx.Client() as http:
        reader_response = http.get(target_url, headers=outgoing_headers, timeout=60)

    return self._parse_response(reader_response, query)
151+
152+
@component.output_types(documents=list[Document])
async def run_async(self, query: str, headers: dict[str, str] | None = None) -> dict[str, list[Document]]:
    """
    Asynchronously send the query/URL to the Jina AI reader service and return the result as Documents.

    Asynchronous counterpart of `run`: same parameters and return value, but awaitable.

    :param query: The query string or URL to process.
    :param headers: Optional headers to include in the request for customization. Refer to the
        [Jina Reader documentation](https://jina.ai/reader/) for more information.

    :returns:
        A dictionary with the following keys:
        - `documents`: A list of `Document` objects.
    """
    target_url, outgoing_headers = self._prepare_request(query, headers)

    # A short-lived async client is opened per call and closed as soon as the response is received.
    async with httpx.AsyncClient() as http:
        reader_response = await http.get(target_url, headers=outgoing_headers, timeout=60)

    return self._parse_response(reader_response, query)

integrations/jina/src/haystack_integrations/components/embedders/jina/document_embedder.py

Lines changed: 115 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dataclasses import replace
55
from typing import Any
66

7-
import requests
7+
import httpx
88
from haystack import Document, component, default_from_dict, default_to_dict
99
from haystack.utils import Secret, deserialize_secrets_inplace
1010
from tqdm import tqdm
@@ -89,14 +89,11 @@ def __init__(
8989
self.progress_bar = progress_bar
9090
self.meta_fields_to_embed = meta_fields_to_embed or []
9191
self.embedding_separator = embedding_separator
92-
self._session = requests.Session()
93-
self._session.headers.update(
94-
{
95-
"Authorization": f"Bearer {resolved_api_key}",
96-
"Accept-Encoding": "identity",
97-
"Content-type": "application/json",
98-
}
99-
)
92+
self._headers = {
93+
"Authorization": f"Bearer {resolved_api_key}",
94+
"Accept-Encoding": "identity",
95+
"Content-type": "application/json",
96+
}
10097
self.task = task
10198
self.dimensions = dimensions
10299
self.late_chunking = late_chunking
@@ -164,40 +161,96 @@ def _prepare_texts_to_embed(self, documents: list[Document]) -> list[str]:
164161
texts_to_embed.append(text_to_embed)
165162
return texts_to_embed
166163

164+
def _validate_input(self, documents: list[Document]) -> None:
165+
if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)):
166+
msg = (
167+
"JinaDocumentEmbedder expects a list of Documents as input."
168+
"In case you want to embed a string, please use the JinaTextEmbedder."
169+
)
170+
raise TypeError(msg)
171+
172+
def _prepare_parameters(self) -> dict[str, Any]:
173+
parameters: dict[str, Any] = {}
174+
if self.task is not None:
175+
parameters["task"] = self.task
176+
if self.dimensions is not None:
177+
parameters["dimensions"] = self.dimensions
178+
if self.late_chunking is not None:
179+
parameters["late_chunking"] = self.late_chunking
180+
return parameters
181+
182+
@staticmethod
183+
def _process_batch_response(
184+
response: dict[str, Any], all_embeddings: list[list[float]], metadata: dict[str, Any]
185+
) -> None:
186+
if "data" not in response:
187+
raise RuntimeError(response["detail"])
188+
189+
# Sort resulting embeddings by index
190+
sorted_embeddings = sorted(response["data"], key=lambda e: e["index"])
191+
embeddings = [result["embedding"] for result in sorted_embeddings]
192+
all_embeddings.extend(embeddings)
193+
if "model" not in metadata:
194+
metadata["model"] = response["model"]
195+
if "usage" not in metadata:
196+
metadata["usage"] = dict(response["usage"].items())
197+
else:
198+
metadata["usage"]["prompt_tokens"] += response["usage"]["prompt_tokens"]
199+
metadata["usage"]["total_tokens"] += response["usage"]["total_tokens"]
200+
167201
def _embed_batch(
    self, texts_to_embed: list[str], batch_size: int, parameters: dict | None = None
) -> tuple[list[list[float]], dict[str, Any]]:
    """
    Embed a list of texts in batches.

    :param texts_to_embed: The texts to send to the embedding endpoint.
    :param batch_size: Number of texts sent per request.
    :param parameters: Optional extra fields merged into every request payload.
    :returns: A tuple of the collected embeddings and the metadata accumulated by
        `_process_batch_response`.
    """
    all_embeddings: list[list[float]] = []
    metadata: dict[str, Any] = {}
    # httpx applies a 5 second default timeout, whereas the previous requests.Session had none.
    # Use the same 60 seconds the reader connector passes so larger batches are not cut off.
    with httpx.Client(timeout=60) as client:
        for i in tqdm(
            range(0, len(texts_to_embed), batch_size),
            disable=not self.progress_bar,
            desc="Calculating embeddings",
        ):
            batch = texts_to_embed[i : i + batch_size]
            response = client.post(
                self.base_url,
                json={"input": batch, "model": self.model_name, **(parameters or {})},
                headers=self._headers,
            ).json()
            self._process_batch_response(response, all_embeddings, metadata)

    return all_embeddings, metadata
222+
223+
async def _embed_batch_async(
    self, texts_to_embed: list[str], batch_size: int, parameters: dict | None = None
) -> tuple[list[list[float]], dict[str, Any]]:
    """
    Asynchronously embed a list of texts in batches.

    Batches are awaited one after another, not concurrently.

    :param texts_to_embed: The texts to send to the embedding endpoint.
    :param batch_size: Number of texts sent per request.
    :param parameters: Optional extra fields merged into every request payload.
    :returns: A tuple of the collected embeddings and the metadata accumulated by
        `_process_batch_response`.
    """
    all_embeddings: list[list[float]] = []
    metadata: dict[str, Any] = {}
    # httpx applies a 5 second default timeout; use the same 60 seconds the reader
    # connector passes so larger batches are not cut off.
    async with httpx.AsyncClient(timeout=60) as client:
        for i in tqdm(
            range(0, len(texts_to_embed), batch_size),
            disable=not self.progress_bar,
            desc="Calculating embeddings",
        ):
            batch = texts_to_embed[i : i + batch_size]
            response = await client.post(
                self.base_url,
                json={"input": batch, "model": self.model_name, **(parameters or {})},
                headers=self._headers,
            )
            self._process_batch_response(response.json(), all_embeddings, metadata)

    return all_embeddings, metadata
200244

245+
@staticmethod
246+
def _build_result(
247+
documents: list[Document], embeddings: list[list[float]], metadata: dict[str, Any]
248+
) -> dict[str, Any]:
249+
new_documents: list[Document] = []
250+
for doc, emb in zip(documents, embeddings, strict=True):
251+
new_documents.append(replace(doc, embedding=emb))
252+
return {"documents": new_documents, "meta": metadata}
253+
201254
@component.output_types(documents=list[Document], meta=dict[str, Any])
202255
def run(self, documents: list[Document]) -> dict[str, Any]:
203256
"""
@@ -209,27 +262,36 @@ def run(self, documents: list[Document]) -> dict[str, Any]:
209262
- `meta`: A dictionary with metadata including the model name and usage statistics.
210263
:raises TypeError: If the input is not a list of Documents.
211264
"""
212-
if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)):
213-
msg = (
214-
"JinaDocumentEmbedder expects a list of Documents as input."
215-
"In case you want to embed a string, please use the JinaTextEmbedder."
216-
)
217-
raise TypeError(msg)
265+
self._validate_input(documents)
218266

219267
texts_to_embed = self._prepare_texts_to_embed(documents=documents)
220-
parameters: dict[str, Any] = {}
221-
if self.task is not None:
222-
parameters["task"] = self.task
223-
if self.dimensions is not None:
224-
parameters["dimensions"] = self.dimensions
225-
if self.late_chunking is not None:
226-
parameters["late_chunking"] = self.late_chunking
268+
parameters = self._prepare_parameters()
227269
embeddings, metadata = self._embed_batch(
228270
texts_to_embed=texts_to_embed, batch_size=self.batch_size, parameters=parameters
229271
)
230272

231-
new_documents: list[Document] = []
232-
for doc, emb in zip(documents, embeddings, strict=True):
233-
new_documents.append(replace(doc, embedding=emb))
273+
return self._build_result(documents, embeddings, metadata)
234274

235-
return {"documents": new_documents, "meta": metadata}
275+
@component.output_types(documents=list[Document], meta=dict[str, Any])
async def run_async(self, documents: list[Document]) -> dict[str, Any]:
    """
    Asynchronously compute the embeddings for a list of Documents.

    Asynchronous counterpart of `run`: same parameters and return value, but awaitable.

    :param documents: A list of Documents to embed.
    :returns: A dictionary with following keys:
        - `documents`: List of Documents, each with an `embedding` field containing the computed embedding.
        - `meta`: A dictionary with metadata including the model name and usage statistics.
    :raises TypeError: If the input is not a list of Documents.
    """
    # Reject bad input before any request is made.
    self._validate_input(documents)

    texts = self._prepare_texts_to_embed(documents=documents)
    request_parameters = self._prepare_parameters()
    vectors, usage_metadata = await self._embed_batch_async(
        texts_to_embed=texts, batch_size=self.batch_size, parameters=request_parameters
    )

    return self._build_result(documents, vectors, usage_metadata)

0 commit comments

Comments
 (0)