74 changes: 68 additions & 6 deletions haystack/components/fetchers/link_content.py
@@ -26,6 +26,8 @@

DEFAULT_USER_AGENT = f"haystack/LinkContentFetcher/{__version__}"

DEFAULT_MAX_RESPONSE_SIZE = 10 * 1024 * 1024 # 10 MiB

REQUEST_HEADERS = {
"accept": "*/*",
"User-Agent": DEFAULT_USER_AGENT,
@@ -121,6 +123,7 @@ def __init__(
        http2: bool = False,
        client_kwargs: dict | None = None,
        request_headers: dict[str, str] | None = None,
        max_response_size: int | None = DEFAULT_MAX_RESPONSE_SIZE,
    ) -> None:
        """
        Initializes the component.
@@ -135,6 +138,10 @@ def __init__(
            Requires the 'h2' package to be installed (via `pip install httpx[http2]`).
        :param client_kwargs: Additional keyword arguments to pass to the httpx client.
            If `None`, default values are used.
        :param max_response_size: Maximum number of bytes to read from a single response body.
            If a server returns more bytes than this limit (or advertises a larger
            `Content-Length`), the request is aborted with an error. Defaults to
            10 MiB. Pass `None` to disable the limit and read the full body.
        """
        self.raise_on_failure = raise_on_failure
        self.user_agents = user_agents or [DEFAULT_USER_AGENT]
@@ -144,6 +151,7 @@ def __init__(
        self.http2 = http2
        self.client_kwargs = client_kwargs or {}
        self.request_headers = request_headers or {}
        self.max_response_size = max_response_size

        # Configure default client settings
        self.client_kwargs.setdefault("timeout", timeout)
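
With these constructor changes, the cap is a one-line opt-in. A minimal usage sketch; the 1 KiB cap and the URL are illustrative assumptions, not part of the diff:

```python
from haystack.components.fetchers import LinkContentFetcher

# Cap each response body at 1 KiB (illustrative; the component defaults to 10 MiB).
fetcher = LinkContentFetcher(max_response_size=1024)

# Oversized bodies abort the download with an httpx.RequestError during run().
result = fetcher.run(urls=["https://example.com"])
print(result["streams"])  # list of ByteStream objects for the fetched pages

# Pass None to restore the old unbounded behavior.
unbounded = LinkContentFetcher(max_response_size=None)
```
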
@@ -189,12 +197,68 @@ def __init__(
            after=self._switch_user_agent,
        )
        def get_response(url: str) -> httpx.Response:
-            response = self._client.get(url, headers=self._get_headers())
-            response.raise_for_status()
-            return response
+            return self._stream_get_capped(url)

        self._get_response: Callable = get_response

    def _check_advertised_length(self, response: httpx.Response) -> None:
        if self.max_response_size is None:
            return
        cl_str = response.headers.get("content-length")
        if cl_str is None:
            return
        try:
            cl = int(cl_str)
        except ValueError:
            return
        if cl > self.max_response_size:
            raise httpx.RequestError(
                f"Response body advertises {cl} bytes, exceeding max_response_size="
                f"{self.max_response_size}. Aborting download.",
                request=response.request,
            )

    def _stream_get_capped(self, url: str) -> httpx.Response:
        """
        Issue a GET that aborts if the response body exceeds ``self.max_response_size``.

        Uses ``httpx.Client.stream`` so that bytes are read incrementally and the connection
        can be torn down before a malicious or unexpectedly large body is fully received.
        The captured bytes are stashed back on the response so the existing handlers
        (``response.text`` / ``response.content``) keep working unchanged.
        """
        with self._client.stream("GET", url, headers=self._get_headers()) as response:
            response.raise_for_status()
            self._check_advertised_length(response)
            buffer = bytearray()
            for chunk in response.iter_bytes():
                if self.max_response_size is not None and len(buffer) + len(chunk) > self.max_response_size:
                    raise httpx.RequestError(
                        f"Response body exceeded max_response_size={self.max_response_size} bytes. "
                        "Aborting download.",
                        request=response.request,
                    )
                buffer.extend(chunk)
            response._content = bytes(buffer)
            return response

    async def _stream_get_capped_async(self, url: str, client: httpx.AsyncClient) -> httpx.Response:
        """Async variant of :meth:`_stream_get_capped`."""
        async with client.stream("GET", url, headers=self._get_headers()) as response:
            response.raise_for_status()
            self._check_advertised_length(response)
            buffer = bytearray()
            async for chunk in response.aiter_bytes():
                if self.max_response_size is not None and len(buffer) + len(chunk) > self.max_response_size:
                    raise httpx.RequestError(
                        f"Response body exceeded max_response_size={self.max_response_size} bytes. "
                        "Aborting download.",
                        request=response.request,
                    )
                buffer.extend(chunk)
            response._content = bytes(buffer)
            return response

    def _get_headers(self) -> dict[str, str]:
        """
        Build headers with precedence
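
Both helpers above share one core pattern: stream the body, count bytes, and bail out before buffering an oversized payload. A standalone sketch of that pattern against plain httpx, assuming a 1 MiB cap and example.com as a stand-in URL:

```python
import httpx

CAP = 1024 * 1024  # 1 MiB, chosen for this sketch

with httpx.Client() as client:
    with client.stream("GET", "https://example.com") as response:
        response.raise_for_status()
        buffer = bytearray()
        for chunk in response.iter_bytes():
            # Abort before the buffer can grow past the cap.
            if len(buffer) + len(chunk) > CAP:
                raise httpx.RequestError("body exceeds cap", request=response.request)
            buffer.extend(chunk)
        # Stash the bytes so .content / .text work after the stream closes;
        # without this, httpx raises ResponseNotRead on access.
        response._content = bytes(buffer)

print(len(response.content), "bytes read")
```

Writing to the private `_content` attribute is the same trick the PR uses, so downstream code that calls `response.text` or `response.content` needs no changes.
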
@@ -404,9 +468,7 @@ async def _get_response_async(self, url: str, client: httpx.AsyncClient) -> httpx.Response:

        while attempt <= self.retry_attempts:
            try:
-                response = await client.get(url, headers=self._get_headers())
-                response.raise_for_status()
-                return response
+                return await self._stream_get_capped_async(url, client)
            except (httpx.HTTPStatusError, httpx.RequestError) as e:
                last_exception = e
                attempt += 1
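
Note that both fetch paths sit inside retry logic (a tenacity decorator on the sync side, this manual loop on the async side), so a cap violation is retried like any other `httpx.RequestError` before it surfaces. A toy sketch of that interaction, assuming tenacity semantics matching the sync path's decorator:

```python
import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt

attempts = 0

@retry(reraise=True, stop=stop_after_attempt(3), retry=retry_if_exception_type(httpx.RequestError))
def fetch() -> bytes:
    global attempts
    attempts += 1
    # Stand-in for a capped download that always overflows.
    raise httpx.RequestError("body exceeds cap")

try:
    fetch()
except httpx.RequestError:
    print(f"gave up after {attempts} attempts")  # -> 3
```
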
@@ -0,0 +1,11 @@
---
enhancements:
  - |
    ``LinkContentFetcher`` now caps the size of fetched response bodies. A new
    ``max_response_size`` constructor parameter (default: 10 MiB) bounds the
    number of bytes read from each URL; responses that advertise a larger
    ``Content-Length``, or that stream more bytes than the cap before EOF, are
    aborted with an ``httpx.RequestError``. Pass ``max_response_size=None`` to
    restore the previous unbounded behavior. Both the synchronous and
    asynchronous fetch paths now stream the response and stop reading as
    soon as the cap is exceeded.
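
From the caller's side, the new failure mode surfaces like any other fetch error. A sketch of handling it, assuming a placeholder URL and that ``raise_on_failure=True`` lets the error propagate for a single URL:

```python
import httpx
from haystack.components.fetchers import LinkContentFetcher

# A deliberately small cap so an ordinary page trips it.
fetcher = LinkContentFetcher(max_response_size=1024, raise_on_failure=True)

try:
    fetcher.run(urls=["https://example.com/large-page"])  # placeholder URL
except httpx.RequestError as exc:
    print(f"Download aborted: {exc}")
```
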