Commit a87e8d1

fix(scrapy)!: serialize requests and HTTP cache as JSON instead of pickle (#951)
Scrapy requests (in the Apify request queue) and cached responses (in the Scrapy HTTP cache) were serialized with pickle, but both storages hold JSON. This switches them to JSON via a shared serializer (`apify/scrapy/_serialization.py`).

Request serialization:
- Binary fields (`body`, `headers`) are base64-encoded; pydantic models such as Crawlee's `UserData` are dumped via `model_dump`.
- A non-JSON-serializable `meta`/`cb_kwargs` is logged and the request skipped, instead of being silently corrupted.
- A serialized `_class` is honored only when it is already imported as a `scrapy.Request` subclass.

HTTP cache, so existing caches survive the upgrade:
- A legacy pickle entry fails to load, so reads treat it as a cache miss and re-fetch it, instead of crashing the download.
- The cleanup sweep now deletes stale, expired, or malformed entries. It previously overwrote them with `set_value(..., None)`, which under apify-client v3 stores JSON `null` rather than deleting, leaking dead records.

**BREAKING CHANGE:** requests and HTTP cache entries are now stored as JSON, not pickle. Entries written by older versions are ignored (re-fetched), not read.
1 parent 1c6ad7a commit a87e8d1

12 files changed

Lines changed: 1136 additions & 100 deletions

docs/03_guides/06_scrapy.mdx

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ The Apify SDK provides several custom components to support integration with the
 - <ApiLink to="class/ApifyScheduler">`apify.scrapy.ApifyScheduler`</ApiLink> - Replaces Scrapy's default [scheduler](https://docs.scrapy.org/en/latest/topics/scheduler.html) with one that uses Apify's [request queue](https://docs.apify.com/platform/storage/request-queue) for storing requests. It manages enqueuing, dequeuing, and maintaining the state and priority of requests.
 - <ApiLink to="class/ActorDatasetPushPipeline">`apify.scrapy.ActorDatasetPushPipeline`</ApiLink> - A Scrapy [item pipeline](https://docs.scrapy.org/en/latest/topics/item-pipeline.html) that pushes scraped items to Apify's [dataset](https://docs.apify.com/platform/storage/dataset). When enabled, every item produced by the spider is sent to the dataset.
 - <ApiLink to="class/ApifyHttpProxyMiddleware">`apify.scrapy.ApifyHttpProxyMiddleware`</ApiLink> - A Scrapy [middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html) that manages proxy configurations. This middleware replaces Scrapy's default `HttpProxyMiddleware` to facilitate the use of Apify's proxy service.
-- <ApiLink to="class/ApifyCacheStorage">`apify.scrapy.extensions.ApifyCacheStorage`</ApiLink> - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). Make sure to set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings, or caching won't work.
+- <ApiLink to="class/ApifyCacheStorage">`apify.scrapy.extensions.ApifyCacheStorage`</ApiLink> - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). To enable caching, set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings. By default, when the spider closes, up to 100 expired and unreadable entries per run are cleaned up. To change this number, update `APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS`.
 
 Additional helper functions in the [`apify.scrapy`](https://github.com/apify/apify-sdk-python/tree/master/src/apify/scrapy) subpackage include:
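For readers following the `ApifyCacheStorage` entry in this hunk, here is a minimal `settings.py` sketch of the settings it mentions. `HTTPCACHE_ENABLED`, `HTTPCACHE_EXPIRATION_SECS`, and `HTTPCACHE_STORAGE` are standard Scrapy settings, and `APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS` is the setting this commit introduces. The concrete values are illustrative assumptions, not recommendations from the commit.

```python
# settings.py (sketch; the values below are illustrative assumptions)

# Turn on Scrapy's built-in HTTP cache middleware.
HTTPCACHE_ENABLED = True

# Entries older than this many seconds count as expired (here: 2 hours).
HTTPCACHE_EXPIRATION_SECS = 7200

# Store cached responses in an Apify key-value store.
HTTPCACHE_STORAGE = 'apify.scrapy.extensions.ApifyCacheStorage'

# Optional: raise the per-close cleanup bound from its default of 100.
APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS = 500
```

The cleanup bound only limits how much storage is reclaimed per spider close; according to the comments in this commit, expired entries are already treated as cache misses on read.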

docs/04_upgrading/upgrading_to_v4.md

Lines changed: 33 additions & 0 deletions
@@ -198,3 +198,36 @@ await client.run('my-run').charge('my-event', count=5)
 ### Async `iterate_*` are no longer coroutine functions
 
 `DatasetClientAsync.iterate_items()` and `KeyValueStoreClientAsync.iterate_keys()` are now plain `def` functions returning `AsyncIterator[T]`. Consumer code (`async for ...`) is unchanged; if you annotate the call's return value, change `AsyncGenerator[T, None]` to `AsyncIterator[T]`.
+
+## Scrapy requests and HTTP cache stored as JSON
+
+This applies only if you use the Scrapy integration (`apify[scrapy]`).
+
+The integration now serializes data as JSON instead of pickle. `ApifyScheduler` stores Scrapy requests in the Apify request queue (under `user_data['scrapy_request']`), and `ApifyCacheStorage` stores HTTP cache entries in the key-value store. Both now hold JSON. Unlike pickle, JSON stays stable across Python and library versions.
+
+### Persisted data from before the upgrade is not read back
+
+Data written by an older SDK uses the pickle format, which v4 does not load. The two storages handle this differently:
+
+- HTTP cache: a legacy entry is treated as a cache miss. Scrapy re-fetches the page and re-stores it as JSON, so the cache heals itself. No action is needed.
+- Request queue: a request stored by an older SDK cannot be reconstructed, so it is skipped and the failure is logged. This matters only when pre-upgrade requests are still in the queue, for example after a run is migrated or restarted, or when you reuse a named request queue. A fresh run is not affected.
+
+### `meta` and `cb_kwargs` must be JSON-serializable
+
+Pickle could store arbitrary Python objects. JSON cannot, so the values in a request's `meta` and `cb_kwargs` are now subject to JSON's type system:
+
+- A `tuple` comes back as a `list`.
+- Non-string `dict` keys come back as strings, so `{1: 'a'}` becomes `{'1': 'a'}`.
+- A value JSON cannot represent (`datetime`, `set`, `Decimal`, a custom object) is no longer stored silently. The request is skipped and the failure is logged. Pydantic models are still supported and are dumped with `model_dump()`.
+
+Convert such values to a JSON-friendly form before yielding the request:
+
+```python
+from datetime import datetime
+
+# Before (v3): relied on pickle to store the datetime object.
+yield scrapy.Request(url, meta={'since': datetime(2024, 1, 1)})
+
+# After (v4): store a JSON-serializable value.
+yield scrapy.Request(url, meta={'since': datetime(2024, 1, 1).isoformat()})
+```
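The coercions listed in the upgrade notes can be reproduced with the standard `json` module alone. The sketch below shows them, plus one possible way to pre-convert values before putting them in `meta` or `cb_kwargs`. The helper `to_json_friendly` is hypothetical and not part of the SDK, and the chosen conversions (ISO strings for `datetime`, strings for `Decimal`, sorted lists for sets) are assumptions that a given spider may want to change.

```python
import json
from datetime import datetime
from decimal import Decimal
from typing import Any


def to_json_friendly(value: Any) -> Any:
    """Hypothetical helper: recursively convert common non-JSON types."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    if isinstance(value, (set, frozenset)):
        # Sorting gives a stable order; it assumes the members are mutually comparable.
        return sorted(to_json_friendly(v) for v in value)
    if isinstance(value, (list, tuple)):
        return [to_json_friendly(v) for v in value]
    if isinstance(value, dict):
        return {str(k): to_json_friendly(v) for k, v in value.items()}
    return value


# Coercions JSON applies on its own: tuple -> list, int key -> str key.
round_tripped = json.loads(json.dumps({'pair': (1, 2), 'by_id': {1: 'a'}}))

# Values JSON cannot represent raise TypeError until they are converted.
meta = {'since': datetime(2024, 1, 1), 'price': Decimal('9.99'), 'tags': {'b', 'a'}}
try:
    json.dumps(meta)
    raw_meta_is_serializable = True
except TypeError:
    raw_meta_is_serializable = False

safe_meta = to_json_friendly(meta)
encoded = json.dumps(safe_meta)  # succeeds once every value is JSON-friendly
```

The conversion is one-way: the callback that receives `meta` has to parse the ISO string or the decimal string back into the type it needs.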

src/apify/scrapy/_serialization.py

Lines changed: 138 additions & 0 deletions
@@ -0,0 +1,138 @@
+"""JSON serialization of Scrapy requests and cached responses for storage on the Apify platform.
+
+Scrapy requests and cached responses are stored in the Apify request queue and key-value store which hold JSON,
+so they are serialized as JSON here rather than pickled.
+
+Only `body` (`bytes`) and `headers` (`{bytes: [bytes]}`) are not natively JSON-serializable; both sit at fixed keys
+and are base64-encoded in place. A `str` `body` is encoded as its UTF-8 bytes and comes back as `bytes`, matching
+Scrapy, which always stores `body` as `bytes`. Pydantic models such as Crawlee's `UserData` are dumped via
+`model_dump()`. Everything else, notably `meta` and `cb_kwargs`, must already be JSON-serializable, otherwise
+serialization fails with a clear error naming the offending value. No in-band sentinel is used, so no user value
+can collide with the encoding.
+
+Known limitations of the pickle -> JSON switch (a documented breaking change): JSON has fewer types than pickle,
+so values in `meta`/`cb_kwargs` are subject to JSON's coercions. A `tuple` round-trips as a `list` and non-string
+`dict` keys round-trip as strings (e.g. `{1: 'a'}` becomes `{'1': 'a'}`). Values JSON cannot represent at all
+(`datetime`, `set`, `Decimal`, arbitrary objects, ...) are not coerced silently: serialization raises and the request
+is skipped loudly rather than stored in a corrupted form.
+"""
+
+from __future__ import annotations
+
+import base64
+import json
+from typing import Any
+
+from pydantic import BaseModel
+
+# Cap the offending value's repr in a serialization error message so a huge value cannot bloat the log.
+_MAX_ERROR_VALUE_REPR_LEN = 200
+
+
+def encode_to_json(data: dict[str, Any]) -> str:
+    """Serialize a Scrapy request/response dict to a JSON string.
+
+    The `body` and `headers` fields are base64-encoded in place (a `str` `body` via its UTF-8 bytes); pydantic
+    models are dumped to plain dicts. A `TypeError` is raised if any other value cannot be JSON-encoded.
+
+    Args:
+        data: The dict to serialize, e.g. the output of `scrapy.Request.to_dict()`.
+
+    Returns:
+        The JSON-encoded string.
+    """
+    if not isinstance(data, dict):
+        raise TypeError(f'Expected a dict to serialize, got {type(data)}')
+
+    safe = dict(data)
+
+    # `body` is base64-encoded so binary payloads survive; a `str` body is taken as its UTF-8 bytes, which keeps
+    # encode/decode symmetric (decode always base64-decodes `body` back to `bytes`).
+    body = safe.get('body')
+    if isinstance(body, (bytes, str)):
+        raw_body = body.encode('utf-8') if isinstance(body, str) else body
+        safe['body'] = base64.b64encode(raw_body).decode('ascii')
+
+    if isinstance(safe.get('headers'), dict):
+        safe['headers'] = _encode_headers(safe['headers'])
+
+    try:
+        # `ensure_ascii=False` keeps non-ASCII URLs/meta as their UTF-8 form instead of `\uXXXX` escapes, which
+        # would otherwise roughly double the size of non-Latin text in storage.
+        return json.dumps(safe, default=_json_default, ensure_ascii=False)
+    except TypeError as exc:
+        raise TypeError(
+            'Failed to JSON-serialize a Scrapy request/response for storage on the Apify platform. '
+            'All values in `meta` and `cb_kwargs` must be JSON-serializable (str, int, float, bool, None, '
+            'list, dict, or a pydantic model).'
+        ) from exc
+
+
+def decode_from_json(text: str) -> Any:
+    """Reconstruct a Scrapy request/response dict from a string produced by `encode_to_json`.
+
+    The base64-encoded `body` and `headers` fields are decoded back to their `bytes` representation.
+
+    Args:
+        text: The JSON-encoded string.
+
+    Returns:
+        The decoded object (a dict for valid request/response payloads).
+    """
+    data = json.loads(text)
+    if not isinstance(data, dict):
+        return data
+
+    # `validate=True` makes a non-base64 body raise loudly instead of silently decoding to garbage.
+    if isinstance(data.get('body'), str):
+        data['body'] = base64.b64decode(data['body'], validate=True)
+
+    if isinstance(data.get('headers'), dict):
+        data['headers'] = _decode_headers(data['headers'])
+
+    return data
+
+
+def _json_default(obj: Any) -> Any:
+    """Fallback for values `json.dumps` cannot serialize: pydantic models are dumped, anything else raises.
+
+    The error names the offending value (type and a truncated repr) so a failed serialization points straight
+    at the bad `meta`/`cb_kwargs` entry instead of just reporting that something failed.
+    """
+    if isinstance(obj, BaseModel):
+        return obj.model_dump(by_alias=True)
+    value_repr = repr(obj)
+    if len(value_repr) > _MAX_ERROR_VALUE_REPR_LEN:
+        value_repr = value_repr[:_MAX_ERROR_VALUE_REPR_LEN] + '...'
+    raise TypeError(f'Object of type {type(obj).__name__} is not JSON-serializable: {value_repr}')
+
+
+def _encode_headers(headers: dict[Any, Any]) -> dict[str, list[str]]:
+    """Encode a Scrapy `{bytes: [bytes]}` headers mapping to a JSON-safe `{str: [base64-str]}`."""
+    encoded: dict[str, list[str]] = {}
+    for key, value in headers.items():
+        str_key = key.decode('latin-1') if isinstance(key, bytes) else key
+        values = value if isinstance(value, (list, tuple)) else [value]
+        encoded[str_key] = [_b64encode_value(item) for item in values]
+    return encoded
+
+
+def _decode_headers(headers: dict[str, Any]) -> dict[bytes, list[bytes]]:
+    """Reverse `_encode_headers`, restoring the `{bytes: [bytes]}` mapping Scrapy expects."""
+    decoded: dict[bytes, list[bytes]] = {}
+    for key, value in headers.items():
+        bytes_key = key.encode('latin-1') if isinstance(key, str) else key
+        values = value if isinstance(value, list) else [value]
+        decoded[bytes_key] = [base64.b64decode(item, validate=True) for item in values]
+    return decoded
+
+
+def _b64encode_value(value: Any) -> str:
+    """Base64-encode a single header value.
+
+    Scrapy stores header values as `bytes`; a `str` is encoded as its UTF-8 bytes. Any other type is coerced with
+    `str()` as a lenient last resort. That coercion is lossy (e.g. `5` becomes `b'5'`), but Scrapy does not produce
+    non-`bytes`/`str` header values, so it is not hit on the real path.
+    """
+    raw = value if isinstance(value, bytes) else str(value).encode('utf-8')
+    return base64.b64encode(raw).decode('ascii')
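To make the encoding scheme in this file concrete, the following is a simplified, standalone restatement of it, not the module itself. It assumes header keys are `bytes` and header values are lists of `bytes`, and it omits the pydantic fallback and the error wrapping. The names `encode` and `decode` are placeholders for `encode_to_json` and `decode_from_json`.

```python
from __future__ import annotations

import base64
import json
from typing import Any


def encode(data: dict[str, Any]) -> str:
    """Simplified stand-in for `encode_to_json`: base64 `body` and `headers` only."""
    safe = dict(data)
    body = safe.get('body')
    if isinstance(body, (bytes, str)):
        raw = body.encode('utf-8') if isinstance(body, str) else body
        safe['body'] = base64.b64encode(raw).decode('ascii')
    if isinstance(safe.get('headers'), dict):
        # Assumes `{bytes: [bytes]}`; the real module is more lenient.
        safe['headers'] = {
            key.decode('latin-1'): [base64.b64encode(v).decode('ascii') for v in values]
            for key, values in safe['headers'].items()
        }
    return json.dumps(safe, ensure_ascii=False)


def decode(text: str) -> dict[str, Any]:
    """Simplified stand-in for `decode_from_json`."""
    data = json.loads(text)
    if isinstance(data.get('body'), str):
        data['body'] = base64.b64decode(data['body'], validate=True)
    if isinstance(data.get('headers'), dict):
        data['headers'] = {
            key.encode('latin-1'): [base64.b64decode(v, validate=True) for v in values]
            for key, values in data['headers'].items()
        }
    return data


response = {
    'url': 'https://example.com/',
    'status': 200,
    'headers': {b'Content-Type': [b'text/html']},
    'body': b'\xff\xfe binary',  # not valid UTF-8, so it could not be stored as a plain JSON string
}
text = encode(response)
restored = decode(text)  # equal to `response`, with `bytes` restored

# A `str` body is not symmetric by design: it comes back as its UTF-8 bytes.
str_body_restored = decode(encode({'body': 'héllo'}))['body']
```

Because `body` and `headers` sit at fixed keys, no marker is needed inside the payload to say which strings are base64, which is what the module docstring means by having no in-band sentinel.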

src/apify/scrapy/extensions/_httpcache.py

Lines changed: 49 additions & 33 deletions
@@ -2,7 +2,6 @@
 
 import gzip
 import io
-import pickle
 import re
 import struct
 from logging import getLogger
@@ -14,6 +13,7 @@
 
 from apify import Configuration
 from apify.scrapy._async_thread import AsyncThread
+from apify.scrapy._serialization import decode_from_json, encode_to_json
 from apify.storage_clients import ApifyStorageClient
 from apify.storages import KeyValueStore
 
@@ -29,14 +29,14 @@
 class ApifyCacheStorage:
     """A Scrapy cache storage that uses the Apify `KeyValueStore` to store responses.
 
-    It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches
-    responses to requests. See HTTPCache middleware settings (prefixed with `HTTPCACHE_`)
-    in the Scrapy documentation for more information. Requires the asyncio Twisted reactor
-    to be installed.
+    It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches responses to requests.
+    See HTTPCache middleware settings (prefixed with `HTTPCACHE_`) in the Scrapy documentation for more information.
+    Requires the asyncio Twisted reactor to be installed.
     """
 
     def __init__(self, settings: BaseSettings) -> None:
-        self._expiration_max_items = 100
+        # Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`).
+        self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100)
         self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')
         self._spider: Spider | None = None
         self._kvs: KeyValueStore | None = None
@@ -79,23 +79,26 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None:
         async def expire_kvs() -> None:
             if self._kvs is None:
                 raise ValueError('Key value store not initialized')
-            i = 0
+            # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
+            # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
+            # an expired entry as a cache miss.
+            processed = 0
             async for item in self._kvs.iterate_keys():
+                if processed >= self._expiration_max_items:
+                    break
+                processed += 1
                 value = await self._kvs.get_value(item.key)
                 try:
                     gzip_time = read_gzip_time(value)
                 except Exception as e:
                     logger.warning(f'Malformed cache item {item.key}: {e}')
-                    await self._kvs.set_value(item.key, None)
+                    await self._kvs.delete_value(item.key)
                 else:
                     if self._expiration_secs < current_time - gzip_time:
                         logger.debug(f'Expired cache item {item.key}')
-                        await self._kvs.set_value(item.key, None)
+                        await self._kvs.delete_value(item.key)
                     else:
                         logger.debug(f'Valid cache item {item.key}')
-                if i == self._expiration_max_items:
-                    break
-                i += 1
 
         self._async_thread.run_coro(expire_kvs())
 
@@ -127,17 +130,25 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
 
         if current_time is None:
             current_time = int(time())
-        if 0 < self._expiration_secs < current_time - read_gzip_time(value):
-            logger.debug('Cache expired', extra={'request': request})
+
+        # A malformed or legacy cache entry must not crash retrieval; treat it as a cache miss so Scrapy re-fetches
+        # and re-stores it in the current format. The field reads stay inside the `try` as well: a value that decodes
+        # to a dict missing any expected key (a forward/older format, or a truncated-but-valid JSON payload) must
+        # also degrade to a miss rather than raising an uncaught `KeyError`.
+        try:
+            if 0 < self._expiration_secs < current_time - read_gzip_time(value):
+                logger.debug('Cache expired', extra={'request': request})
+                return None
+            data = from_gzip(value)
+            url = data['url']
+            status = data['status']
+            headers = Headers(data['headers'])
+            body = data['body']
+        except Exception as exc:
+            logger.warning(f'Ignoring malformed cache entry {key!r}: {exc}', extra={'request': request})
             return None
 
-        data = from_gzip(value)
-        url = data['url']
-        status = data['status']
-        headers = Headers(data['headers'])
-        body = data['body']
         respcls = responsetypes.from_args(headers=headers, url=url, body=body)
-
         logger.debug('Cache hit', extra={'request': request})
         return respcls(url=url, headers=headers, status=status, body=body)
 
@@ -162,18 +173,25 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non
 
 
 def to_gzip(data: dict, mtime: int | None = None) -> bytes:
-    """Dump a dictionary to a gzip-compressed byte stream."""
+    """Dump a dictionary to a gzip-compressed JSON byte stream.
+
+    Cache entries live in the Apify key-value store, which holds JSON, so they are serialized as JSON rather
+    than pickled. See `apify.scrapy._serialization` for the encoding.
+    """
+    payload = encode_to_json(data).encode('utf-8')
     with io.BytesIO() as byte_stream:
         with gzip.GzipFile(fileobj=byte_stream, mode='wb', mtime=mtime) as gzip_file:
-            pickle.dump(data, gzip_file, protocol=4)
+            gzip_file.write(payload)
         return byte_stream.getvalue()
 
 
 def from_gzip(gzip_bytes: bytes) -> dict:
-    """Load a dictionary from a gzip-compressed byte stream."""
+    """Load a dictionary from a gzip-compressed JSON byte stream."""
     with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file:
-        data: dict = pickle.load(gzip_file)
-        return data
+        data = decode_from_json(gzip_file.read().decode('utf-8'))
+    if not isinstance(data, dict):
+        raise TypeError(f'Expected a dict from the cached payload, got {type(data)}')
+    return data
 
 
 def read_gzip_time(gzip_bytes: bytes) -> int:
@@ -187,17 +205,15 @@ def read_gzip_time(gzip_bytes: bytes) -> int:
 def get_kvs_name(spider_name: str, max_length: int = 60) -> str:
     """Get the key value store name for a spider.
 
-    The key value store name is derived from the spider name by replacing all special characters
-    with hyphens and trimming leading and trailing hyphens. The resulting name is prefixed with
-    'httpcache-' and truncated to the maximum length.
+    The key value store name is derived from the spider name by replacing all special characters with hyphens
+    and trimming leading and trailing hyphens. The resulting name is prefixed with 'httpcache-' and truncated
+    to the maximum length.
 
-    The documentation
-    [about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages)
+    The documentation [about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages)
     mentions that names can be up to 63 characters long, so the default max length is set to 60.
 
-    Such naming isn't unique per spider, but should be sufficiently unique for most use cases.
-    The name of the key value store should indicate to which spider it belongs, e.g. in
-    the listing in the Apify's console.
+    Such naming isn't unique per spider, but should be sufficiently unique for most use cases. The name
+    of the key-value store should indicate to which spider it belongs, e.g. in the listing in the Apify's console.
 
     Args:
         spider_name: Value of the Spider instance's name attribute.
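The cache entries in this file are gzip streams whose header timestamp doubles as the "stored at" time used for expiry. The body of `read_gzip_time` is not shown in this diff, so the sketch below is an assumption about how such a read can work: per RFC 1952, the gzip header carries MTIME as a little-endian unsigned 32-bit integer at bytes 4 to 8. The names `to_gzip_json`, `read_mtime`, and `is_expired` are hypothetical, and plain `json.dumps` stands in for `encode_to_json`.

```python
from __future__ import annotations

import gzip
import io
import json
import struct


def to_gzip_json(data: dict, mtime: int | None = None) -> bytes:
    """Write `data` as gzip-compressed JSON, stamping `mtime` into the gzip header."""
    buffer = io.BytesIO()
    with gzip.GzipFile(fileobj=buffer, mode='wb', mtime=mtime) as gzip_file:
        gzip_file.write(json.dumps(data).encode('utf-8'))
    return buffer.getvalue()


def read_mtime(gzip_bytes: bytes) -> int:
    """Read MTIME from the gzip header without decompressing the payload."""
    return struct.unpack('<I', gzip_bytes[4:8])[0]


def is_expired(gzip_bytes: bytes, expiration_secs: int, now: int) -> bool:
    """Same comparison as `retrieve_response`: a non-positive limit disables expiry."""
    return 0 < expiration_secs < now - read_mtime(gzip_bytes)


entry = to_gzip_json({'url': 'https://example.com/', 'status': 200}, mtime=1_000)
stored_at = read_mtime(entry)

expired_after_30s = is_expired(entry, expiration_secs=60, now=1_030)    # age 30 < 60
expired_after_100s = is_expired(entry, expiration_secs=60, now=1_100)   # age 100 > 60
expired_when_disabled = is_expired(entry, expiration_secs=0, now=10**9)  # limit 0 disables expiry
```

Reading only the header keeps the expiry check cheap and independent of the payload format, which is why the cleanup sweep in this commit can still find and delete legacy pickle entries it can no longer decode.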
