Skip to content

Commit 01305dd

Browse files
committed
Refactor utils
1 parent c338062 commit 01305dd

File tree

18 files changed

+683
-817
lines changed

18 files changed

+683
-817
lines changed

src/apify_client/_http_client.py

Lines changed: 122 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,36 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import gzip
45
import json as jsonlib
56
import logging
67
import os
8+
import random
79
import sys
10+
import time
811
from datetime import datetime, timezone
912
from http import HTTPStatus
1013
from importlib import metadata
11-
from typing import TYPE_CHECKING, Any
14+
from typing import TYPE_CHECKING, Any, TypeVar
1215
from urllib.parse import urlencode
1316

1417
import impit
1518

1619
from apify_client._logging import log_context, logger_name
1720
from apify_client._statistics import ClientStatistics
18-
from apify_client._utils import is_retryable_error, retry_with_exp_backoff, retry_with_exp_backoff_async
19-
from apify_client.errors import ApifyApiError
21+
from apify_client.errors import ApifyApiError, InvalidResponseBodyError
2022

2123
if TYPE_CHECKING:
22-
from collections.abc import Callable
24+
from collections.abc import Awaitable, Callable
2325

2426
from apify_client._config import ClientConfig
2527
from apify_client._consts import JsonSerializable
2628

2729
DEFAULT_BACKOFF_EXPONENTIAL_FACTOR = 2
2830
DEFAULT_BACKOFF_RANDOM_FACTOR = 1
2931

32+
T = TypeVar('T')
33+
3034
logger = logging.getLogger(logger_name)
3135

3236

@@ -99,6 +103,26 @@ def _parse_params(params: dict | None) -> dict | None:
99103

100104
return parsed_params
101105

106+
@staticmethod
def _is_retryable_error(exc: Exception) -> bool:
    """Decide whether an exception represents a transient failure worth retrying.

    Args:
        exc: The exception raised by the HTTP request.

    Returns:
        True when the exception is one of the retryable kinds
        (invalid response body or an impit transport-level error).
    """
    # Transient conditions: a garbled/truncated response body, or an
    # impit network / timeout / protocol error from the transport layer.
    retryable_types = (
        InvalidResponseBodyError,
        impit.NetworkError,
        impit.TimeoutException,
        impit.RemoteProtocolError,
    )
    return isinstance(exc, retryable_types)
125+
102126
def _prepare_request_call(
103127
self,
104128
headers: dict | None = None,
@@ -201,7 +225,7 @@ def _make_request(stop_retrying: Callable, attempt: int) -> impit.Response:
201225

202226
except Exception as exc:
203227
logger.debug('Request threw exception', exc_info=exc)
204-
if not is_retryable_error(exc):
228+
if not self._is_retryable_error(exc):
205229
logger.debug('Exception is not retryable', exc_info=exc)
206230
stop_retrying()
207231
raise
@@ -217,14 +241,59 @@ def _make_request(stop_retrying: Callable, attempt: int) -> impit.Response:
217241
response.read()
218242
raise ApifyApiError(response, attempt, method=method)
219243

220-
return retry_with_exp_backoff(
244+
return self._retry_with_exp_backoff(
221245
_make_request,
222246
max_retries=self._config.max_retries,
223247
backoff_base_millis=self._config.min_delay_between_retries_millis,
224248
backoff_factor=DEFAULT_BACKOFF_EXPONENTIAL_FACTOR,
225249
random_factor=DEFAULT_BACKOFF_RANDOM_FACTOR,
226250
)
227251

252+
@staticmethod
253+
def _retry_with_exp_backoff(
254+
func: Callable[[Callable[[], None], int], T],
255+
*,
256+
max_retries: int = 8,
257+
backoff_base_millis: int = 500,
258+
backoff_factor: float = 2,
259+
random_factor: float = 1,
260+
) -> T:
261+
"""Retry a function with exponential backoff.
262+
263+
Args:
264+
func: Function to retry. Receives a stop_retrying callback and attempt number.
265+
max_retries: Maximum number of retry attempts.
266+
backoff_base_millis: Base backoff delay in milliseconds.
267+
backoff_factor: Exponential backoff multiplier (1-10).
268+
random_factor: Random jitter factor (0-1).
269+
270+
Returns:
271+
The return value of the function.
272+
"""
273+
random_factor = min(max(0, random_factor), 1)
274+
backoff_factor = min(max(1, backoff_factor), 10)
275+
swallow = True
276+
277+
def stop_retrying() -> None:
278+
nonlocal swallow
279+
swallow = False
280+
281+
for attempt in range(1, max_retries + 1):
282+
try:
283+
return func(stop_retrying, attempt)
284+
except Exception:
285+
if not swallow:
286+
raise
287+
288+
random_sleep_factor = random.uniform(1, 1 + random_factor)
289+
backoff_base_secs = backoff_base_millis / 1000
290+
backoff_exp_factor = backoff_factor ** (attempt - 1)
291+
292+
sleep_time_secs = random_sleep_factor * backoff_base_secs * backoff_exp_factor
293+
time.sleep(sleep_time_secs)
294+
295+
return func(stop_retrying, max_retries + 1)
296+
228297

229298
class HttpClientAsync(_BaseHttpClient):
230299
async def call(
@@ -279,7 +348,7 @@ async def _make_request(stop_retrying: Callable, attempt: int) -> impit.Response
279348

280349
except Exception as exc:
281350
logger.debug('Request threw exception', exc_info=exc)
282-
if not is_retryable_error(exc):
351+
if not self._is_retryable_error(exc):
283352
logger.debug('Exception is not retryable', exc_info=exc)
284353
stop_retrying()
285354
raise
@@ -295,10 +364,55 @@ async def _make_request(stop_retrying: Callable, attempt: int) -> impit.Response
295364
await response.aread()
296365
raise ApifyApiError(response, attempt, method=method)
297366

298-
return await retry_with_exp_backoff_async(
367+
return await self._retry_with_exp_backoff(
299368
_make_request,
300369
max_retries=self._config.max_retries,
301370
backoff_base_millis=self._config.min_delay_between_retries_millis,
302371
backoff_factor=DEFAULT_BACKOFF_EXPONENTIAL_FACTOR,
303372
random_factor=DEFAULT_BACKOFF_RANDOM_FACTOR,
304373
)
374+
375+
@staticmethod
376+
async def _retry_with_exp_backoff(
377+
func: Callable[[Callable[[], None], int], Awaitable[T]],
378+
*,
379+
max_retries: int = 8,
380+
backoff_base_millis: int = 500,
381+
backoff_factor: float = 2,
382+
random_factor: float = 1,
383+
) -> T:
384+
"""Retry a function with exponential backoff.
385+
386+
Args:
387+
func: Function to retry. Receives a stop_retrying callback and attempt number.
388+
max_retries: Maximum number of retry attempts.
389+
backoff_base_millis: Base backoff delay in milliseconds.
390+
backoff_factor: Exponential backoff multiplier (1-10).
391+
random_factor: Random jitter factor (0-1).
392+
393+
Returns:
394+
The return value of the function.
395+
"""
396+
random_factor = min(max(0, random_factor), 1)
397+
backoff_factor = min(max(1, backoff_factor), 10)
398+
swallow = True
399+
400+
def stop_retrying() -> None:
401+
nonlocal swallow
402+
swallow = False
403+
404+
for attempt in range(1, max_retries + 1):
405+
try:
406+
return await func(stop_retrying, attempt)
407+
except Exception:
408+
if not swallow:
409+
raise
410+
411+
random_sleep_factor = random.uniform(1, 1 + random_factor)
412+
backoff_base_secs = backoff_base_millis / 1000
413+
backoff_exp_factor = backoff_factor ** (attempt - 1)
414+
415+
sleep_time_secs = random_sleep_factor * backoff_base_secs * backoff_exp_factor
416+
await asyncio.sleep(sleep_time_secs)
417+
418+
return await func(stop_retrying, max_retries + 1)

src/apify_client/_resource_clients/_resource_client.py

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
from __future__ import annotations
22

3+
import asyncio
4+
import math
5+
import time
6+
from datetime import datetime, timezone
37
from typing import TYPE_CHECKING, Any
48
from urllib.parse import urlencode
59

10+
from apify_client._consts import DEFAULT_WAIT_FOR_FINISH_SEC, DEFAULT_WAIT_WHEN_JOB_NOT_EXIST_SEC, ActorJobStatus
611
from apify_client._logging import WithLogDetailsClient
7-
from apify_client._utils import to_safe_id
12+
from apify_client._utils import catch_not_found_or_throw, response_to_dict, to_safe_id
13+
from apify_client.errors import ApifyApiError, ApifyClientError
814

915
if TYPE_CHECKING:
1016
from apify_client._client_classes import ClientRegistry, ClientRegistryAsync
@@ -117,6 +123,77 @@ def _build_params(self, **kwargs: Any) -> dict:
117123
merged = {**self._default_params, **kwargs}
118124
return {k: v for k, v in merged.items() if v is not None}
119125

126+
def _wait_for_finish(
    self,
    url: str,
    params: dict,
    wait_secs: int | None = None,
) -> dict | None:
    """Wait synchronously for an Actor job (run or build) to finish.

    Polls the job status until it reaches a terminal state or timeout.
    Handles 404 errors gracefully (job might not exist yet in replicas).

    Args:
        url: Full URL to the job endpoint.
        params: Base query parameters to include in each request.
        wait_secs: Maximum seconds to wait (None = indefinite).

    Returns:
        Job data dict when finished, or None if job doesn't exist after
        DEFAULT_WAIT_WHEN_JOB_NOT_EXIST_SEC seconds.

    Raises:
        ApifyApiError: If API returns errors other than 404.
        ApifyClientError: If the API response is not a dict as expected.
    """
    started_at = datetime.now(timezone.utc)
    should_repeat = True
    job: dict | None = None
    seconds_elapsed = 0

    while should_repeat:
        # Ask the server to long-poll; when the caller set a budget,
        # only request the remaining portion of it.
        wait_for_finish = DEFAULT_WAIT_FOR_FINISH_SEC
        if wait_secs is not None:
            wait_for_finish = wait_secs - seconds_elapsed

        try:
            response = self._http_client.call(
                url=url,
                method='GET',
                params={**params, 'waitForFinish': wait_for_finish},
            )
            # The API wraps the payload in a 'data' envelope; unwrap when present.
            job_response = response_to_dict(response)
            job = job_response.get('data') if isinstance(job_response, dict) else job_response
            seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())

            if not isinstance(job, dict):
                raise ApifyClientError(
                    f'Unexpected response format received from the API. '
                    f'Expected dict with "status" field, got: {type(job).__name__}'
                )

            # Stop polling once the job reached a terminal status or the budget ran out.
            is_terminal = ActorJobStatus(job['status']).is_terminal
            is_timed_out = wait_secs is not None and seconds_elapsed >= wait_secs
            if is_terminal or is_timed_out:
                should_repeat = False

            if not should_repeat:
                # Early return here so that we avoid the sleep below if not needed
                return job

        except ApifyApiError as exc:
            # Swallows 404s only; any other API error propagates to the caller.
            catch_not_found_or_throw(exc)

            # If there are still not found errors after DEFAULT_WAIT_WHEN_JOB_NOT_EXIST_SEC, we give up
            # and return None. In such case, the requested record probably really doesn't exist.
            if seconds_elapsed > DEFAULT_WAIT_WHEN_JOB_NOT_EXIST_SEC:
                return None

        # It might take some time for database replicas to get up-to-date so sleep a bit before retrying
        time.sleep(0.25)

    return job
196+
120197

121198
class ResourceClientAsync(metaclass=WithLogDetailsClient):
122199
"""Base class for asynchronous resource clients.
@@ -223,3 +300,74 @@ def _build_params(self, **kwargs: Any) -> dict:
223300
"""
224301
merged = {**self._default_params, **kwargs}
225302
return {k: v for k, v in merged.items() if v is not None}
303+
304+
async def _wait_for_finish(
    self,
    url: str,
    params: dict,
    wait_secs: int | None = None,
) -> dict | None:
    """Wait asynchronously for an Actor job (run or build) to finish.

    Polls the job status until it reaches a terminal state or timeout.
    Handles 404 errors gracefully (job might not exist yet in replicas).

    Args:
        url: Full URL to the job endpoint.
        params: Base query parameters to include in each request.
        wait_secs: Maximum seconds to wait (None = indefinite).

    Returns:
        Job data dict when finished, or None if job doesn't exist after
        DEFAULT_WAIT_WHEN_JOB_NOT_EXIST_SEC seconds.

    Raises:
        ApifyApiError: If API returns errors other than 404.
        ApifyClientError: If the API response is not a dict as expected.
    """
    started_at = datetime.now(timezone.utc)
    should_repeat = True
    job: dict | None = None
    seconds_elapsed = 0

    while should_repeat:
        # Ask the server to long-poll; when the caller set a budget,
        # only request the remaining portion of it.
        wait_for_finish = DEFAULT_WAIT_FOR_FINISH_SEC
        if wait_secs is not None:
            wait_for_finish = wait_secs - seconds_elapsed

        try:
            response = await self._http_client.call(
                url=url,
                method='GET',
                params={**params, 'waitForFinish': wait_for_finish},
            )
            # The API wraps the payload in a 'data' envelope; unwrap when present.
            job_response = response_to_dict(response)
            job = job_response.get('data') if isinstance(job_response, dict) else job_response
            # Update elapsed time right after the call returns, matching the
            # sync implementation (previously computed after the shape check).
            seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())

            if not isinstance(job, dict):
                raise ApifyClientError(
                    f'Unexpected response format received from the API. '
                    f'Expected dict with "status" field, got: {type(job).__name__}'
                )

            # Stop polling once the job reached a terminal status or the budget ran out.
            is_terminal = ActorJobStatus(job['status']).is_terminal
            is_timed_out = wait_secs is not None and seconds_elapsed >= wait_secs
            if is_terminal or is_timed_out:
                should_repeat = False

            if not should_repeat:
                # Early return here so that we avoid the sleep below if not needed
                return job

        except ApifyApiError as exc:
            # Swallows 404s only; any other API error propagates to the caller.
            catch_not_found_or_throw(exc)

            # If there are still not found errors after DEFAULT_WAIT_WHEN_JOB_NOT_EXIST_SEC, we give up
            # and return None. In such case, the requested record probably really doesn't exist.
            if seconds_elapsed > DEFAULT_WAIT_WHEN_JOB_NOT_EXIST_SEC:
                return None

        # It might take some time for database replicas to get up-to-date so sleep a bit before retrying
        await asyncio.sleep(0.25)

    return job

0 commit comments

Comments
 (0)