Skip to content

Commit c0a74b9

Browse files
vdusek and claude authored
fix: Eliminate race condition in _fetch_requests_from_url (#796)
## Summary - `_fetch_requests_from_url` used `add_done_callback` + `asyncio.create_task` to process HTTP responses, but `asyncio.gather` only awaited the HTTP request tasks — not the callback-spawned processing tasks - This caused `created_requests` to be returned before processing completed, yielding empty or incomplete results - Refactored to gather HTTP responses first, then process each response sequentially, ensuring all extracted URLs are collected before returning ## Test plan - [x] CI passes Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 28b87ec commit c0a74b9

File tree

1 file changed

+37
-61
lines changed

1 file changed

+37
-61
lines changed

src/apify/request_loaders/_apify_request_list.py

Lines changed: 37 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import asyncio
44
import re
5-
from asyncio import Task
5+
from itertools import chain
66
from typing import Annotated, Any
77

88
from pydantic import BaseModel, Field, TypeAdapter
@@ -44,8 +44,10 @@ class ApifyRequestList(RequestList):
4444
Method open is used to create RequestList from actor's requestListSources input.
4545
"""
4646

47-
@staticmethod
47+
@classmethod
4848
async def open(
49+
cls,
50+
*,
4951
name: str | None = None,
5052
request_list_sources_input: list[dict[str, Any]] | None = None,
5153
http_client: HttpClient | None = None,
@@ -73,12 +75,7 @@ async def open(
7375
```
7476
"""
7577
request_list_sources_input = request_list_sources_input or []
76-
return await ApifyRequestList._create_request_list(name, request_list_sources_input, http_client)
7778

78-
@staticmethod
79-
async def _create_request_list(
80-
name: str | None, request_list_sources_input: list[dict[str, Any]], http_client: HttpClient | None
81-
) -> ApifyRequestList:
8279
if not http_client:
8380
http_client = ImpitHttpClient()
8481

@@ -87,15 +84,30 @@ async def _create_request_list(
8784
simple_url_inputs = [url_input for url_input in url_inputs if isinstance(url_input, _SimpleUrlInput)]
8885
remote_url_inputs = [url_input for url_input in url_inputs if isinstance(url_input, _RequestsFromUrlInput)]
8986

90-
simple_url_requests = ApifyRequestList._create_requests_from_input(simple_url_inputs)
91-
remote_url_requests = await ApifyRequestList._fetch_requests_from_url(
92-
remote_url_inputs, http_client=http_client
93-
)
87+
simple_url_requests = cls._create_requests_from_input(simple_url_inputs)
88+
remote_url_requests = await cls._fetch_requests_from_url(remote_url_inputs, http_client)
9489

9590
return ApifyRequestList(name=name, requests=simple_url_requests + remote_url_requests)
9691

92+
@classmethod
93+
async def _fetch_requests_from_url(
94+
cls,
95+
remote_url_requests_inputs: list[_RequestsFromUrlInput],
96+
http_client: HttpClient,
97+
) -> list[Request]:
98+
"""Create list of requests from url.
99+
100+
Send GET requests to urls defined in each requests_from_url of remote_url_requests_inputs. Extract links from
101+
each response body using URL_NO_COMMAS_REGEX regex. Create list of Requests from collected links and additional
102+
inputs stored in other attributes of each remote_url_requests_inputs.
103+
"""
104+
tasks = [cls._process_remote_url(request_input, http_client) for request_input in remote_url_requests_inputs]
105+
results = await asyncio.gather(*tasks)
106+
return list(chain.from_iterable(results))
107+
97108
@staticmethod
98109
def _create_requests_from_input(simple_url_inputs: list[_SimpleUrlInput]) -> list[Request]:
110+
"""Create `Request` objects from simple URL inputs."""
99111
return [
100112
Request.from_url(
101113
method=request_input.method,
@@ -108,55 +120,19 @@ def _create_requests_from_input(simple_url_inputs: list[_SimpleUrlInput]) -> lis
108120
]
109121

110122
@staticmethod
111-
async def _fetch_requests_from_url(
112-
remote_url_requests_inputs: list[_RequestsFromUrlInput],
113-
http_client: HttpClient,
114-
) -> list[Request]:
115-
"""Create list of requests from url.
116-
117-
Send GET requests to urls defined in each requests_from_url of remote_url_requests_inputs. Run extracting
118-
callback on each response body and use URL_NO_COMMAS_REGEX regex to find all links. Create list of Requests from
119-
collected links and additional inputs stored in other attributes of each remote_url_requests_inputs.
120-
"""
121-
created_requests: list[Request] = []
122-
123-
async def create_requests_from_response(request_input: _RequestsFromUrlInput, task: Task) -> None:
124-
"""Extract links from response body and use them to create `Request` objects.
125-
126-
Use the regular expression to find all matching links in the response body, then create `Request`
127-
objects from these links and the provided input attributes.
128-
"""
129-
response = await (task.result()).read()
130-
matches = re.finditer(URL_NO_COMMAS_REGEX, response.decode('utf-8'))
131-
132-
created_requests.extend(
133-
[
134-
Request.from_url(
135-
match.group(0),
136-
method=request_input.method,
137-
payload=request_input.payload.encode('utf-8'),
138-
headers=request_input.headers,
139-
user_data=request_input.user_data,
140-
)
141-
for match in matches
142-
]
143-
)
123+
async def _process_remote_url(request_input: _RequestsFromUrlInput, http_client: HttpClient) -> list[Request]:
124+
"""Fetch a remote URL and extract links from the response body."""
125+
http_response = await http_client.send_request(method='GET', url=request_input.requests_from_url)
126+
response_body = await http_response.read()
127+
matches = re.finditer(URL_NO_COMMAS_REGEX, response_body.decode('utf-8'))
144128

145-
remote_url_requests = []
146-
for remote_url_requests_input in remote_url_requests_inputs:
147-
get_response_task = asyncio.create_task(
148-
http_client.send_request(
149-
method='GET',
150-
url=remote_url_requests_input.requests_from_url,
151-
)
152-
)
153-
154-
get_response_task.add_done_callback(
155-
lambda task, inp=remote_url_requests_input: asyncio.create_task(
156-
create_requests_from_response(inp, task)
157-
)
129+
return [
130+
Request.from_url(
131+
url=match.group(0),
132+
method=request_input.method,
133+
payload=request_input.payload.encode('utf-8'),
134+
headers=request_input.headers,
135+
user_data=request_input.user_data,
158136
)
159-
remote_url_requests.append(get_response_task)
160-
161-
await asyncio.gather(*remote_url_requests)
162-
return created_requests
137+
for match in matches
138+
]

0 commit comments

Comments
 (0)