Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions haystack/components/connectors/openapi_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class OpenAPIServiceConnector:
<!-- test-ignore -->
```python
import json
import requests
import httpx

from haystack.components.connectors import OpenAPIServiceConnector
from haystack.dataclasses import ChatMessage, ToolCall
Expand All @@ -187,7 +187,7 @@ class OpenAPIServiceConnector:
message = ChatMessage.from_assistant(tool_calls=[tool_call])

serper_token = Secret.from_env_var("SERPERDEV_API_KEY").resolve_value()
serperdev_openapi_spec = json.loads(requests.get("https://bit.ly/serper_dev_spec").text)
serperdev_openapi_spec = json.loads(httpx.get("https://bit.ly/serper_dev_spec", follow_redirects=True).text)
service_connector = OpenAPIServiceConnector()
result = service_connector.run(
messages=[message],
Expand Down
48 changes: 28 additions & 20 deletions haystack/components/rankers/hugging_face_tei.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from typing import Any
from urllib.parse import urljoin

import httpx

from haystack import Document, component, default_from_dict, default_to_dict
from haystack.utils import Secret
from haystack.utils.misc import _deduplicate_documents
Expand Down Expand Up @@ -133,7 +135,7 @@ def _compose_response(
:returns: A dictionary with the following keys:
- `documents`: A list of reranked documents.

:raises requests.exceptions.RequestException:
:raises RuntimeError:
- If the API request fails.

:raises RuntimeError:
Expand Down Expand Up @@ -186,7 +188,7 @@ def run(
:returns: A dictionary with the following keys:
- `documents`: A list of reranked documents.

:raises requests.exceptions.RequestException:
:raises RuntimeError:
- If the API request fails.

:raises RuntimeError:
Expand All @@ -211,15 +213,18 @@ def run(
headers["Authorization"] = f"Bearer {self.token.resolve_value()}"

# Call the external service with retry
response = request_with_retry(
method="POST",
url=urljoin(self.url, "/rerank"),
json=payload,
timeout=self.timeout,
headers=headers,
attempts=self.max_retries,
status_codes_to_retry=self.retry_status_codes,
)
try:
response = request_with_retry(
method="POST",
url=urljoin(self.url, "/rerank"),
json=payload,
timeout=self.timeout,
headers=headers,
attempts=self.max_retries,
status_codes_to_retry=self.retry_status_codes,
)
except httpx.HTTPStatusError as e:
raise RuntimeError(f"HuggingFaceTEIRanker API call failed. Error: {e}, Response: {e.response.text}") from e

result: dict[str, str] | list[dict[str, Any]] = response.json()

Expand Down Expand Up @@ -270,15 +275,18 @@ async def run_async(
headers["Authorization"] = f"Bearer {self.token.resolve_value()}"

# Call the external service with retry
response = await async_request_with_retry(
method="POST",
url=urljoin(self.url, "/rerank"),
json=payload,
timeout=self.timeout,
headers=headers,
attempts=self.max_retries,
status_codes_to_retry=self.retry_status_codes,
)
try:
response = await async_request_with_retry(
method="POST",
url=urljoin(self.url, "/rerank"),
json=payload,
timeout=self.timeout,
headers=headers,
attempts=self.max_retries,
status_codes_to_retry=self.retry_status_codes,
)
except httpx.HTTPStatusError as e:
raise RuntimeError(f"HuggingFaceTEIRanker API call failed. Error: {e}, Response: {e.response.text}") from e

result: dict[str, str] | list[dict[str, Any]] = response.json()

Expand Down
10 changes: 10 additions & 0 deletions haystack/components/websearch/searchapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ def run(self, query: str) -> dict[str, list[Document] | list[str]]:
except httpx.ConnectTimeout as error:
raise TimeoutError(f"Request to {self.__class__.__name__} timed out.") from error

except httpx.HTTPStatusError as e:
raise SearchApiError(
f"An error occurred while querying {self.__class__.__name__}. Error: {e}, Response: {e.response.text}"
) from e

except httpx.HTTPError as e:
raise SearchApiError(f"An error occurred while querying {self.__class__.__name__}. Error: {e}") from e

Expand Down Expand Up @@ -149,6 +154,11 @@ async def run_async(self, query: str) -> dict[str, list[Document] | list[str]]:
except httpx.ConnectTimeout as error:
raise TimeoutError(f"Request to {self.__class__.__name__} timed out.") from error

except httpx.HTTPStatusError as e:
raise SearchApiError(
f"An error occurred while querying {self.__class__.__name__}. Error: {e}, Response: {e.response.text}"
) from e

except httpx.HTTPError as e:
raise SearchApiError(f"An error occurred while querying {self.__class__.__name__}. Error: {e}") from e

Expand Down
10 changes: 10 additions & 0 deletions haystack/components/websearch/serper_dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ def run(self, query: str) -> dict[str, list[Document] | list[str]]:
except httpx.ConnectTimeout as error:
raise TimeoutError(f"Request to {self.__class__.__name__} timed out.") from error

except httpx.HTTPStatusError as e:
raise SerperDevError(
f"An error occurred while querying {self.__class__.__name__}. Error: {e}, Response: {e.response.text}"
) from e

except httpx.HTTPError as e:
raise SerperDevError(f"An error occurred while querying {self.__class__.__name__}. Error: {e}") from e

Expand Down Expand Up @@ -194,6 +199,11 @@ async def run_async(self, query: str) -> dict[str, list[Document] | list[str]]:
except httpx.ConnectTimeout as error:
raise TimeoutError(f"Request to {self.__class__.__name__} timed out.") from error

except httpx.HTTPStatusError as e:
raise SerperDevError(
f"An error occurred while querying {self.__class__.__name__}. Error: {e}, Response: {e.response.text}"
) from e

except httpx.HTTPError as e:
raise SerperDevError(f"An error occurred while querying {self.__class__.__name__}. Error: {e}") from e

Expand Down
4 changes: 2 additions & 2 deletions haystack/core/pipeline/draw.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import zlib
from typing import Any

import httpx
import networkx
import requests

from haystack import logging
from haystack.core.errors import PipelineDrawingError
Expand Down Expand Up @@ -234,7 +234,7 @@ def _to_mermaid_image(

logger.debug("Rendering graph at {url}", url=url)
try:
resp = requests.get(url, timeout=timeout)
resp = httpx.get(url, timeout=timeout)
if resp.status_code >= 400:
logger.warning(
"Failed to draw the pipeline: {server_url} returned status {status_code}",
Expand Down
43 changes: 14 additions & 29 deletions haystack/utils/requests_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
from typing import Any

import httpx
import requests
from tenacity import after_log, before_log, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

logger = logging.getLogger(__file__)


def request_with_retry(
attempts: int = 3, status_codes_to_retry: list[int] | None = None, **kwargs: Any
) -> requests.Response:
) -> httpx.Response:
"""
Executes an HTTP request with a configurable exponential backoff retry on failures.

Expand All @@ -35,26 +34,11 @@ def request_with_retry(
# Sending an HTTP request with custom timeout in seconds
res = request_with_retry(method="GET", url="https://example.com", timeout=5)

# Sending an HTTP request with custom authorization handling
class CustomAuth(requests.auth.AuthBase):
def __call__(self, r):
r.headers["authorization"] = "Basic <my_token_here>"
return r

res = request_with_retry(method="GET", url="https://example.com", auth=CustomAuth())

# All of the above combined
res = request_with_retry(
method="GET",
url="https://example.com",
auth=CustomAuth(),
attempts=10,
status_codes_to_retry=[408, 503],
timeout=5
)
# Sending an HTTP request with custom headers
res = request_with_retry(method="GET", url="https://example.com", headers={"Authorization": "Bearer <token>"})

# Sending a POST request
res = request_with_retry(method="POST", url="https://example.com", data={"key": "value"}, attempts=10)
res = request_with_retry(method="POST", url="https://example.com", json={"key": "value"}, attempts=10)

# Retry all 5xx status codes
res = request_with_retry(method="GET", url="https://example.com", status_codes_to_retry=list(range(500, 600)))
Expand All @@ -66,9 +50,9 @@ def __call__(self, r):
List of HTTP status codes that will trigger a retry.
When param is `None`, HTTP 408, 418, 429 and 503 will be retried.
:param kwargs:
Optional arguments that `request` accepts.
Optional arguments that `httpx.Client.request` accepts.
:returns:
The `Response` object.
The `httpx.Response` object.
"""

if status_codes_to_retry is None:
Expand All @@ -77,20 +61,21 @@ def __call__(self, r):
@retry(
reraise=True,
wait=wait_exponential(),
retry=retry_if_exception_type((requests.HTTPError, TimeoutError)),
retry=retry_if_exception_type((httpx.HTTPError, TimeoutError)),
stop=stop_after_attempt(attempts),
before=before_log(logger, logging.DEBUG),
after=after_log(logger, logging.DEBUG),
)
def run() -> requests.Response:
def run() -> httpx.Response:
timeout = kwargs.pop("timeout", 10)
res = requests.request(**kwargs, timeout=timeout)
with httpx.Client() as client:
res = client.request(**kwargs, timeout=timeout)

if res.status_code in status_codes_to_retry:
# We raise only for the status codes that must trigger a retry
res.raise_for_status()
if res.status_code in status_codes_to_retry:
# We raise only for the status codes that must trigger a retry
res.raise_for_status()

return res
return res

res = run()
# We raise here too in case the request failed with a status code that
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ dependencies = [
"more-itertools", # TextDocumentSplitter
"networkx", # Pipeline graphs
"typing_extensions>=4.7", # Extended typing features (NotRequired, etc.)
"requests",
"httpx",
"numpy",
"python-dateutil",
"jsonschema", # JsonSchemaValidator, Tool
Expand Down
13 changes: 13 additions & 0 deletions releasenotes/notes/httpx-3f5560ae23ab91b1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
enhancements:
- |
Standardize HTTP request handling in Haystack by adopting ``httpx`` for both synchronous and asynchronous requests,
replacing ``requests``. Error reporting for failed requests has also been improved: exceptions now include
additional details alongside the reason field.
upgrade:
- |
As part of the migration from ``requests`` to ``httpx``, ``request_with_retry`` and ``async_request_with_retry``
(in ``haystack.utils.requests_utils``) no longer raise ``requests.exceptions.RequestException`` on failure;
they now raise ``httpx.HTTPError`` instead. This also affects ``HuggingFaceTEIRanker``, which relies on these
utilities. Users catching ``requests.exceptions.RequestException`` should update their code to catch
``httpx.HTTPError``.
9 changes: 5 additions & 4 deletions test/components/connectors/test_openapi_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from typing import Any
from unittest.mock import MagicMock, Mock, patch

import httpx
import pytest
import requests
from openapi3 import OpenAPI

from haystack import Pipeline
Expand Down Expand Up @@ -266,10 +266,11 @@ def prepare_fc_params(openai_functions_schema: dict[str, Any]) -> dict[str, Any]
pipe.connect("openapi_container.service_response", "final_prompt_adapter.service_response")
pipe.connect("final_prompt_adapter", "llm.messages")

serperdev_spec = requests.get(
"https://gist.githubusercontent.com/vblagoje/241a000f2a77c76be6efba71d49e2856/raw/722ccc7fe6170a744afce3e3fb3a30fdd095c184/serper.json"
serperdev_spec = httpx.get(
"https://gist.githubusercontent.com/vblagoje/241a000f2a77c76be6efba71d49e2856/raw/722ccc7fe6170a744afce3e3fb3a30fdd095c184/serper.json",
follow_redirects=True,
).json()
system_prompt = requests.get("https://bit.ly/serper_dev_system").text
system_prompt = httpx.get("https://bit.ly/serper_dev_system", follow_redirects=True).text

query = "Why did Elon Musk sue OpenAI?"

Expand Down
11 changes: 5 additions & 6 deletions test/components/rankers/test_hugging_face_tei.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import httpx
import pytest
import requests

from haystack import Document
from haystack.components.rankers.hugging_face_tei import HuggingFaceTEIRanker, TruncationDirection
Expand Down Expand Up @@ -93,7 +92,7 @@ def test_empty_documents(self, del_hf_env_vars):
def test_run_with_mock(self, mock_request, del_hf_env_vars):
"""Test run method with mocked API response"""
# Setup mock response
mock_response = MagicMock(spec=requests.Response)
mock_response = MagicMock(spec=httpx.Response)
mock_response.json.return_value = [
{"index": 2, "score": 0.95},
{"index": 1, "score": 0.85},
Expand Down Expand Up @@ -141,7 +140,7 @@ def test_run_with_mock(self, mock_request, del_hf_env_vars):
def test_run_with_truncation_direction(self, mock_request, del_hf_env_vars):
"""Test run method with truncation direction parameter"""
# Setup mock response
mock_response = MagicMock(spec=requests.Response)
mock_response = MagicMock(spec=httpx.Response)
mock_response.json.return_value = [{"index": 0, "score": 0.95}]
mock_request.return_value = mock_response

Expand Down Expand Up @@ -174,7 +173,7 @@ def test_run_with_truncation_direction(self, mock_request, del_hf_env_vars):
def test_run_with_custom_top_k(self, mock_request, del_hf_env_vars):
"""Test run method with custom top_k parameter"""
# Setup mock response with 5 documents
mock_response = MagicMock(spec=requests.Response)
mock_response = MagicMock(spec=httpx.Response)
mock_response.json.return_value = [
{"index": 4, "score": 0.95},
{"index": 3, "score": 0.90},
Expand Down Expand Up @@ -210,7 +209,7 @@ def test_run_with_custom_top_k(self, mock_request, del_hf_env_vars):
@patch("haystack.components.rankers.hugging_face_tei.request_with_retry")
def test_run_deduplicates_documents(self, mock_request, del_hf_env_vars):
"""Test that duplicate documents are removed before sending to the API."""
mock_response = MagicMock(spec=requests.Response)
mock_response = MagicMock(spec=httpx.Response)
mock_response.json.return_value = [{"index": 1, "score": 0.9}, {"index": 0, "score": 0.2}]
mock_request.return_value = mock_response

Expand Down Expand Up @@ -241,7 +240,7 @@ def test_run_deduplicates_documents(self, mock_request, del_hf_env_vars):
def test_error_handling(self, mock_request, del_hf_env_vars):
"""Test error handling in the ranker"""
# Setup mock response with error
mock_response = MagicMock(spec=requests.Response)
mock_response = MagicMock(spec=httpx.Response)
mock_response.json.return_value = {"error": "Some error occurred", "error_type": "TestError"}
mock_request.return_value = mock_response

Expand Down
Loading
Loading