Skip to content

Commit ae12935

Browse files
authored
fix(scrapy): async-thread startup race, shutdown lifecycle, and timeout setting (#979)
## Description Fixes several defects in the Scrapy integration's background event-loop thread (`AsyncThread`), the scheduler, and the HTTP cache storage, and makes the loop timeout configurable. ## Fixes - **`run_coro` startup race** — the `is_running()` guard fired spuriously when a coroutine was submitted before the loop thread reached `run_forever()` (observed ~122/500 in `scheduler.open()`). It now guards on `is_closed()`. A coroutine queued on a not-yet-running loop runs once the loop starts; only a closed loop raises. - **`close()` thread leak** — if task cancellation timed out or raised, the loop was never stopped or joined. Stop, join, and the forced-shutdown fallback now run in a `finally`, and the original error still propagates. - **`close()` second call** — a repeated close raised `RuntimeError: Event loop is closed`. An `is_closed()` early-return makes it a no-op. - **`close()` ignored its `timeout`** for the cancellation step (it used the constructor default). It now passes the caller's timeout through. - **`run_coro` timeout** left the coroutine running. It now cancels the future on timeout. - **HTTP cache open/cleanup thread leaks** — `open_spider` now closes the thread if opening the key-value store fails (matching `ApifyScheduler.open`). The expiration sweep runs inside `try` with `close()` in a `finally`. - **Configurable timeout (#955)** — new `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting, wired into the scheduler (via `from_crawler`) and the cache storage. ## Error logging The integration now follows consistent conventions for caught exceptions: - **`except … as exc:` → `logger.warning(f'… {exc}')`, swallowed** — for *expected, recoverable* conditions handled locally: a malformed or legacy stored payload skipped as a cache/queue miss, or non-UTF-8 headers preserved in the serialized request. A short message plus the exception text, with no traceback, because it is not a bug. - **`except Exception:` → `logger.exception('…')`, swallowed** — for *unexpected* failures handled at a terminal point: the cleanup sweep, shutdown, or skip-and-continue. `logger.exception` attaches the full traceback, and nothing re-raises because the error is handled here. - **`except …:` → `raise` (no logging)** — when the error is re-raised and the caller or Scrapy logs it with a traceback anyway. `run_coro`'s timeout path cancels the future and re-raises without logging, so the failure is reported once. - **`except Exception:` → `logger.exception('…'); raise`** — the boundary log, used only where local context materially helps *and* the propagated error would otherwise be logged only generically or not at all. The scheduler's `next_request` / `enqueue_request` / `has_pending_requests` are called synchronously by the Scrapy engine (not inside a Deferred), so without this log the Apify-specific context would be lost. **Why `logger.exception` replaced `traceback.print_exc()`:** `traceback.print_exc()` writes a bare traceback straight to stderr, bypassing logging entirely. It has no level, no logger name, no message, and ignores Scrapy's and the SDK's log configuration and handlers. `logger.exception(msg)` logs at ERROR through the configured logging, so it is routed, formatted, and filterable like every other log line. It adds a message explaining *what* failed and still attaches the full traceback automatically, which makes including the exception object in the message (`{exc}`) redundant (ruff TRY401). ## Tests New `tests/unit/scrapy/test_async_thread.py` covers the startup race, run-after-close, timeout cancellation, idempotent close, the caller timeout reaching the shutdown step, and stop/join when task cancellation fails. The scheduler and HTTP cache test modules gain coverage for the timeout setting, closing the thread on open failure, and the cleanup-failure path still closing the thread.
1 parent 62eb505 commit ae12935

7 files changed

Lines changed: 487 additions & 84 deletions

File tree

src/apify/scrapy/_async_thread.py

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,51 +52,63 @@ def run_coro(
5252
The result returned by the coroutine.
5353
5454
Raises:
55-
RuntimeError: If the event loop is not running.
55+
RuntimeError: If the event loop has been closed.
5656
TimeoutError: If the coroutine does not complete within the timeout.
5757
Exception: Any exception raised during coroutine execution.
5858
"""
5959
if timeout == 'default':
6060
timeout = self._default_timeout
6161

62-
if not self._eventloop.is_running():
63-
raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is not running.')
62+
if self._eventloop.is_closed():
63+
raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is closed.')
6464

6565
# Submit the coroutine to the event loop running in the other thread.
6666
future = asyncio.run_coroutine_threadsafe(coro, self._eventloop)
6767
try:
6868
# Wait for the coroutine's result until the specified timeout.
6969
return future.result(timeout=timeout.total_seconds())
70-
except futures.TimeoutError as exc:
71-
logger.exception('Coroutine execution timed out.', exc_info=exc)
72-
raise
73-
except Exception as exc:
74-
logger.exception('Coroutine execution raised an exception.', exc_info=exc)
70+
except futures.TimeoutError:
71+
# `future.result` gave up, but the coroutine keeps running on the loop; cancel it so it does
72+
# not outlive the timeout. The propagated error is logged once by the caller (or Scrapy), so
73+
# this method does not log it itself.
74+
future.cancel()
7575
raise
7676

77-
def close(self, timeout: timedelta = timedelta(seconds=60)) -> None:
77+
def close(self, timeout: timedelta | None = None) -> None:
7878
"""Close the event loop and its thread gracefully.
7979
8080
This method cancels all pending tasks, stops the event loop, and waits for the thread to exit.
8181
If the thread does not exit within the given timeout, a forced shutdown is attempted.
8282
8383
Args:
84-
timeout: The maximum number of seconds to wait for the event loop thread to exit.
84+
timeout: The maximum time to wait for the event loop thread to exit. Pass `None` to use the
85+
`default_timeout` passed to the constructor.
8586
"""
86-
if self._eventloop.is_running():
87-
# Cancel all pending tasks in the event loop.
88-
self.run_coro(self._shutdown_tasks())
87+
if timeout is None:
88+
timeout = self._default_timeout
89+
90+
# A repeated close (e.g. a retried shutdown) would call into the already-closed loop and raise
91+
# `RuntimeError: Event loop is closed`. The loop closes itself once it stops, so a second close
92+
# is a no-op.
93+
if self._eventloop.is_closed():
94+
return
8995

90-
# Schedule the event loop to stop.
91-
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
96+
try:
97+
if self._eventloop.is_running():
98+
# Cancel all pending tasks in the event loop, honouring the caller's timeout.
99+
self.run_coro(self._shutdown_tasks(), timeout=timeout)
100+
finally:
101+
# Stop the loop and join its thread even if cancelling the pending tasks above raised or timed
102+
# out. Skipping this would leave the loop running and leak its thread.
103+
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
92104

93-
# Wait for the event loop thread to finish execution.
94-
self._thread.join(timeout=timeout.total_seconds())
105+
# Wait for the event loop thread to finish execution.
106+
self._thread.join(timeout=timeout.total_seconds())
95107

96-
# If the thread is still running after the timeout, force a shutdown.
97-
if self._thread.is_alive():
98-
logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...')
99-
self._force_exit_event_loop()
108+
# If the thread is still running after the timeout, force a shutdown.
109+
if self._thread.is_alive():
110+
logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...')
111+
self._force_exit_event_loop()
100112

101113
def _start_event_loop(self) -> None:
102114
"""Set up and run the asyncio event loop in the dedicated thread."""
@@ -125,5 +137,5 @@ def _force_exit_event_loop(self) -> None:
125137
logger.info('Forced shutdown of the event loop and its thread...')
126138
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
127139
self._thread.join(timeout=5)
128-
except Exception as exc:
129-
logger.exception('Exception occurred during forced event loop shutdown.', exc_info=exc)
140+
except Exception:
141+
logger.exception('Exception occurred during forced event loop shutdown.')

src/apify/scrapy/extensions/_httpcache.py

Lines changed: 133 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io
55
import re
66
import struct
7+
from datetime import timedelta
78
from logging import getLogger
89
from time import time
910
from typing import TYPE_CHECKING
@@ -35,16 +36,36 @@ class ApifyCacheStorage:
3536
"""
3637

3738
def __init__(self, settings: BaseSettings) -> None:
38-
# Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`).
3939
self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100)
40+
"""Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`)."""
41+
42+
self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60))
43+
"""Caps how long each coroutine run on the background event loop may take."""
44+
4045
self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')
46+
"""Seconds a cached entry stays fresh; older entries are treated as expired, and `0` disables expiration."""
47+
4148
self._spider: Spider | None = None
49+
"""The Scrapy `Spider` this cache storage is bound to; set in `open_spider`."""
50+
4251
self._kvs: KeyValueStore | None = None
52+
"""The Apify `KeyValueStore` backing the cache; opened in `open_spider`."""
53+
4354
self._fingerprinter: RequestFingerprinterProtocol | None = None
55+
"""Scrapy's request fingerprinter, used to derive the cache key for each request."""
56+
4457
self._async_thread: AsyncThread | None = None
58+
"""Background event-loop thread that runs the storage coroutines from Scrapy's synchronous callbacks."""
4559

4660
def open_spider(self, spider: Spider) -> None:
47-
"""Open the cache storage for a spider."""
61+
"""Open the cache storage for a spider.
62+
63+
Starts the background event-loop thread and opens the spider's key-value store. If opening the store
64+
fails, the freshly started thread is closed so it is not leaked.
65+
66+
Args:
67+
spider: The spider the cache storage is being opened for.
68+
"""
4869
logger.debug('Using Apify key value cache storage', extra={'spider': spider})
4970
self._spider = spider
5071
self._fingerprinter = spider.crawler.request_fingerprinter
@@ -62,58 +83,75 @@ async def open_kvs() -> KeyValueStore:
6283
return await KeyValueStore.open(name=kvs_name)
6384

6485
logger.debug("Starting background thread for cache storage's event loop")
65-
self._async_thread = AsyncThread()
86+
self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout)
6687
logger.debug(f"Opening cache storage's {kvs_name!r} key value store")
67-
self._kvs = self._async_thread.run_coro(open_kvs())
88+
89+
try:
90+
self._kvs = self._async_thread.run_coro(open_kvs())
91+
except Exception:
92+
logger.exception('Failed to open the cache key-value store.')
93+
# Opening the key-value store failed, so close the freshly started async thread instead of
94+
# leaking its event-loop thread (`close_spider` may never run if `open_spider` fails). Guard
95+
# the close so a secondary failure here cannot mask the original error.
96+
try:
97+
self._async_thread.close()
98+
except Exception:
99+
logger.exception('Failed to close the async thread after a failed cache storage open.')
100+
raise
68101

69102
def close_spider(self, _: Spider, current_time: int | None = None) -> None:
70-
"""Close the cache storage for a spider."""
103+
"""Close the cache storage for a spider.
104+
105+
Runs a best-effort cleanup sweep that deletes expired entries when expiration is enabled, then shuts
106+
down the background event-loop thread. The thread is always closed, even if the sweep fails.
107+
108+
Args:
109+
_: The spider being closed. Part of Scrapy's storage interface, unused here.
110+
current_time: Unix time in seconds used as the current time when deciding which entries have
111+
expired. Defaults to the current time.
112+
"""
71113
if self._async_thread is None:
72114
raise ValueError('Async thread not initialized')
73115

116+
if current_time is None:
117+
current_time = int(time())
118+
74119
logger.info(f'Cleaning up cache items (max {self._expiration_max_items})')
75-
if self._expiration_secs > 0:
76-
if current_time is None:
77-
current_time = int(time())
78-
79-
async def expire_kvs() -> None:
80-
if self._kvs is None:
81-
raise ValueError('Key value store not initialized')
82-
# Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
83-
# so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
84-
# an expired entry as a cache miss.
85-
processed = 0
86-
async for item in self._kvs.iterate_keys():
87-
if processed >= self._expiration_max_items:
88-
break
89-
processed += 1
90-
value = await self._kvs.get_value(item.key)
91-
try:
92-
gzip_time = read_gzip_time(value)
93-
except Exception as e:
94-
logger.warning(f'Malformed cache item {item.key}: {e}')
95-
await self._kvs.delete_value(item.key)
96-
else:
97-
if self._expiration_secs < current_time - gzip_time:
98-
logger.debug(f'Expired cache item {item.key}')
99-
await self._kvs.delete_value(item.key)
100-
else:
101-
logger.debug(f'Valid cache item {item.key}')
102-
103-
self._async_thread.run_coro(expire_kvs())
104-
105-
logger.debug('Closing cache storage')
120+
121+
# Best-effort: a cleanup failure is logged and swallowed (the sweep only reclaims storage, so failing it
122+
# must not turn a normal spider close into an error), and `close` always runs in the `finally`, so
123+
# neither the failure nor an early return can leak the event-loop thread.
106124
try:
107-
self._async_thread.close()
108-
except KeyboardInterrupt:
109-
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
125+
if self._expiration_secs > 0:
126+
self._async_thread.run_coro(self._expire_kvs(current_time))
110127
except Exception:
111-
logger.exception('Exception occurred while shutting down cache storage')
128+
logger.exception('Failed to clean up expired cache items.')
112129
finally:
113-
logger.debug('Cache storage closed')
130+
logger.debug('Closing cache storage')
131+
try:
132+
self._async_thread.close()
133+
except KeyboardInterrupt:
134+
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
135+
except Exception:
136+
logger.exception('Exception occurred while shutting down cache storage')
137+
finally:
138+
logger.debug('Cache storage closed')
114139

115140
def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None:
116-
"""Retrieve a response from the cache storage."""
141+
"""Retrieve a cached response for a request.
142+
143+
A malformed, legacy, or expired cache entry is treated as a miss, so Scrapy re-fetches the request and
144+
re-stores it in the current format.
145+
146+
Args:
147+
_: The spider making the request. Part of Scrapy's storage interface, unused here.
148+
request: The request to look up in the cache.
149+
current_time: Unix time in seconds used as the current time when checking whether the entry has
150+
expired. Defaults to the current time.
151+
152+
Returns:
153+
The cached response on a hit, or `None` on a miss, an expired entry, or an unreadable entry.
154+
"""
117155
if self._async_thread is None:
118156
raise ValueError('Async thread not initialized')
119157
if self._kvs is None:
@@ -122,7 +160,13 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
122160
raise ValueError('Request fingerprinter not initialized')
123161

124162
key = self._fingerprinter.fingerprint(request).hex()
125-
value = self._async_thread.run_coro(self._kvs.get_value(key))
163+
# Log here before re-raising: this coroutine ran on a separate event-loop thread, and the failure is
164+
# otherwise easy to lose as it crosses that thread boundary back into Scrapy's synchronous machinery.
165+
try:
166+
value = self._async_thread.run_coro(self._kvs.get_value(key))
167+
except Exception:
168+
logger.exception('Failed to retrieve a response from the cache.')
169+
raise
126170

127171
if value is None:
128172
logger.debug('Cache miss', extra={'request': request})
@@ -139,6 +183,7 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
139183
if 0 < self._expiration_secs < current_time - read_gzip_time(value):
140184
logger.debug('Cache expired', extra={'request': request})
141185
return None
186+
142187
data = from_gzip(value)
143188
url = data['url']
144189
status = data['status']
@@ -153,7 +198,13 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
153198
return respcls(url=url, headers=headers, status=status, body=body)
154199

155200
def store_response(self, _: Spider, request: Request, response: Response) -> None:
156-
"""Store a response in the cache storage."""
201+
"""Store a response in the cache storage.
202+
203+
Args:
204+
_: The spider that produced the response. Part of Scrapy's storage interface, unused here.
205+
request: The request the response belongs to. Its fingerprint is used as the cache key.
206+
response: The response to store in the cache.
207+
"""
157208
if self._async_thread is None:
158209
raise ValueError('Async thread not initialized')
159210
if self._kvs is None:
@@ -169,7 +220,42 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non
169220
'body': response.body,
170221
}
171222
value = to_gzip(data)
172-
self._async_thread.run_coro(self._kvs.set_value(key, value))
223+
# Log here before re-raising: this coroutine ran on a separate event-loop thread, and the failure is
224+
# otherwise easy to lose as it crosses that thread boundary back into Scrapy's synchronous machinery.
225+
try:
226+
self._async_thread.run_coro(self._kvs.set_value(key, value))
227+
except Exception:
228+
logger.exception('Failed to store a response in the cache.')
229+
raise
230+
231+
async def _expire_kvs(self, current_time: int) -> None:
232+
"""Sweep the cache key-value store, deleting expired or unreadable entries.
233+
234+
Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, so stale
235+
entries may linger. This only reclaims storage; `retrieve_response` already treats an expired entry as
236+
a cache miss.
237+
"""
238+
if self._kvs is None:
239+
raise ValueError('Key value store not initialized')
240+
241+
processed = 0
242+
243+
async for item in self._kvs.iterate_keys():
244+
if processed >= self._expiration_max_items:
245+
break
246+
247+
processed += 1
248+
value = await self._kvs.get_value(item.key)
249+
250+
try:
251+
gzip_time = read_gzip_time(value)
252+
except Exception as exc:
253+
logger.warning(f'Malformed cache item {item.key}: {exc}')
254+
await self._kvs.delete_value(item.key)
255+
else:
256+
if self._expiration_secs < current_time - gzip_time:
257+
logger.debug(f'Expired cache item {item.key}')
258+
await self._kvs.delete_value(item.key)
173259

174260

175261
def to_gzip(data: dict, mtime: int | None = None) -> bytes:
@@ -219,7 +305,8 @@ def get_kvs_name(spider_name: str, max_length: int = 60) -> str:
219305
spider_name: Value of the Spider instance's name attribute.
220306
max_length: Maximum length of the key value store name.
221307
222-
Returns: Key value store name.
308+
Returns:
309+
Key value store name.
223310
224311
Raises:
225312
ValueError: If the spider name contains only special characters.

src/apify/scrapy/requests.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ
129129

130130
apify_request = ApifyRequest.from_url(**request_kwargs)
131131

132-
except Exception as exc:
133-
logger.warning(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; {exc}')
132+
except Exception:
133+
logger.exception(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; skipping it.')
134134
return None
135135

136136
# Serialize the Scrapy request as JSON under 'scrapy_request'. Kept outside the broad except above so

0 commit comments

Comments
 (0)