
Commit c48ecea

fselmod and danceratopz authored
feat(test-tools): t8n file streaming optimizations (#2751)
* feat(test-tools): file-based t8n stream; replacing in-memory stdout

  Replaces Python's stdout-buffered, multi-pass parse of t8n output (which held up to ~5 copies of the alloc in flight: raw stdout bytes, parsed dict, re-serialized string, re-parsed dict, validated Alloc) with a file-based `LazyAllocFile` that streams `{address: account_dict}` entries incrementally via ``ijson`` and validates each ``Account`` one at a time, preserving every existing Pydantic validator. This cuts per-test peak RSS on the unchunkified benchmark from 5.38 GB → 3.40 GB (measured, fixtures byte-identical) — ~2 GB of Python-heap pressure removed per xdist worker.

* fix(test-tools): fix issues with evm-dump-dir and ijson updates

* fix: updates from comments on PR #2751

* feat(test-tools): stream chained-block t8n input via file path

* fix(test-client-clis): ensure output dir exists in evm-dump script

* fix(test-tools): force GC in `LazyAllocFile` keepalive test under PyPy

  `test_lazy_alloc_file_keepalive_pins_temp_dir` asserted that `del lazy` immediately wipes the temp directory pinned by `LazyAllocFile._keepalive`. That holds on CPython, where reference-counted finalization runs `TemporaryDirectory.__del__` -> `cleanup()` synchronously when the last reference drops. PyPy uses a generational garbage collector with no refcounting, so after `del lazy` the `TemporaryDirectory` is unreachable but not yet finalized, and the directory is still on disk when the next assertion runs. Add an explicit `gc.collect()` to trigger the finalizer deterministically on both interpreters.

* fix(test-client-clis): release `LazyAllocFile` keepalive on `OutputCache` insert

  The PR introducing the file-based t8n alloc streaming path pins each producing call's `TemporaryDirectory` onto the resulting `LazyAllocFile` via `_keepalive`, so the next chained block can read `output/alloc.json` directly without re-serialization.

  That lifetime is correct for the live, uncached-handoff path: the predecessor's temp dir is dropped as soon as the consumer call returns and the previous `TransitionToolOutput` falls out of scope. `OutputCache`, however, keeps every `TransitionToolOutput` produced during a single test alive for the duration of that test (single-key cache, cleared on key change). When chained-block tests run with caching enabled, every cached subcall retains its own `output/alloc.json` plus the entire surrounding `TemporaryDirectory` on `/tmp`. This is `O(N)` for an `N`-block chained test, where each `alloc.json` can be hundreds of MB to several GB; for the unchunkified benchmark scenario this could easily exhaust `/tmp` on shared CI runners.

  Materialize the streamed alloc into the cached `Alloc` and clear `_keepalive` before storing the result. By the time we cache, the chained handoff has already happened (the consumer block has finished its `subprocess.run`), so the on-disk file is no longer needed for zero-copy chaining: the only future readers are cache replays from another fixture format, which want a parsed `Alloc` anyway.

  Tradeoffs:

  - Cache replays now pay the `ijson` parse cost on insert rather than on first `.get()`. This is the same parse the live path performs lazily; doing it eagerly at cache-set is fine because we know the caller has just finished using the result, so amortizing the parse here avoids a later surprise during replay.
  - The cached entry retains the parsed `Alloc` in Python heap rather than the `alloc.json` on disk. For a typical chained benchmark this is a strict improvement: one parsed `Alloc` is smaller than the equivalent JSON text plus the `TemporaryDirectory` overhead, and the peak-RSS savings of the original streaming PR are preserved during the live run (the materialization happens after the producing call has already released its working buffers).
  - The streaming benefit during the first run is fully preserved; only the cached-replay path materializes.

  A move-the-keepalive-to-the-consumer alternative was considered and rejected: the consumer block currently sees only the predecessor's `Path`, not the `LazyAllocFile` instance, so threading explicit cleanup back to the predecessor would be invasive and tangled with the `TransitionToolData` flow. Releasing at cache-insertion is the lowest-blast-radius point: it's the moment we know the file isn't needed for chained handoff anymore, and it's localized to `OutputCache.set` rather than spread across the two `_evaluate_*` paths.

* fix(test-client-clis): reject non-object top-level JSON in `LazyAllocFile.validate`

  `ijson.kvitems(f, "")` only yields key-value pairs when the top-level JSON value is an object: for `null`, `[]`, scalars, or other valid-but-non-object inputs, the iterator silently yields zero pairs. With the streaming alloc parser landed in this branch, that means `Alloc.model_validate({})` quietly succeeds and the t8n caller receives an empty post-state instead of an error.

  The pre-streaming path (`LazyAllocStr.validate`, which calls `Alloc.model_validate_json`) raised on those same inputs, so the streaming PR is a regression in error fidelity even though success-path behavior is unchanged: the t8n binaries we drive (geth, evmone, execution-specs) all produce object-shaped `alloc.json` by contract, so this failure mode does not fire under normal operation. The risk is the failure shape: a t8n that crashes after redirecting stdout, an output path that gets overwritten with an error JSON, or any future divergence from the contract would silently zero the post-state, and the test would only fail downstream with a misleading consensus mismatch rather than a clean "alloc.json is malformed" at the point of corruption. Silent corruption that surfaces far from its source is the costly kind of debugging.

  Probe the first parse event from `ijson.parse` and raise `ValueError` if it is not `start_map`, then seek back to zero and let `ijson.kvitems` consume the stream as before.

  Tradeoffs:

  - One extra parse event per call. `ijson.parse` is event-driven; pulling a single event then re-seeking is effectively free relative to the streamed body parse, and we keep the `kvitems`-based hot path so the entry-by-entry validation pattern is unchanged.
  - `ValueError` rather than reusing `ijson.IncompleteJSONError`: the input here is well-formed JSON (just the wrong shape), so reporting it as an incomplete-stream error would itself be misleading. The existing malformed-JSON tests continue to surface as `IncompleteJSONError` from inside `ijson.parse` because those inputs raise before we ever read a complete first event: the new guard only takes effect for valid JSON of the wrong type.
  - The legitimately empty alloc (`{}`) is preserved: the guard accepts `start_map` and the rest of the function returns an empty `Alloc` exactly as before. New regression test covers this.

  Three new parametrized cases (`null`, `[]`, `42`) cover the silent-zero scenarios, and a positive test for `{}` pins the empty-object behavior so future tightening cannot accidentally reject it.

---------

Co-authored-by: danceratopz <danceratopz@gmail.com>
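The CPython-versus-PyPy finalization behavior behind the `gc.collect()` fix can be reproduced with a stdlib-only sketch. The `LazyFileHandle` class below is a toy stand-in for the keepalive pattern, not the real `LazyAllocFile`:

```python
import gc
import os
import tempfile


class LazyFileHandle:
    """Toy stand-in for the keepalive pattern: pinning the
    TemporaryDirectory on the instance keeps the file on disk for exactly
    as long as the handle is reachable."""

    def __init__(self) -> None:
        self._keepalive = tempfile.TemporaryDirectory()
        self.path = os.path.join(self._keepalive.name, "alloc.json")
        with open(self.path, "w") as f:
            f.write("{}")


lazy = LazyFileHandle()
pinned_path = lazy.path
assert os.path.exists(pinned_path)  # keepalive pins the temp dir

del lazy
# CPython's refcounting finalizes the TemporaryDirectory immediately here;
# PyPy's generational GC needs an explicit collect before the finalizer
# fires, which is exactly the fix the commit message describes.
gc.collect()
gone = not os.path.exists(pinned_path)
print(gone)
```

With the explicit `gc.collect()`, the directory is gone on both interpreters by the time the next assertion runs.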
1 parent 1fdd7e3 commit c48ecea

6 files changed

Lines changed: 644 additions & 87 deletions


packages/testing/pyproject.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -52,6 +52,7 @@ dependencies = [
     "ckzg>=2.1.3,<3",
     "tenacity>=9.0.0,<10",
     "Jinja2>=3,<4",
+    "ijson>=3.3,<4",
 ]

 [project.urls]
```

packages/testing/src/execution_testing/client_clis/cli_types.py

Lines changed: 118 additions & 36 deletions
```diff
@@ -1,7 +1,9 @@
 """Types used in the transition tool interactions."""

 import json
-from dataclasses import dataclass
+import shutil
+import tempfile
+from dataclasses import dataclass, field
 from pathlib import Path
 from typing import (
     Annotated,
@@ -10,13 +12,16 @@
     Generic,
     List,
     NamedTuple,
+    Optional,
     Self,
     TypeVar,
 )

+import ijson  # type: ignore[import-untyped]
 from pydantic import Field, PlainSerializer, PlainValidator

 from execution_testing.base_types import (
+    Account,
     Bloom,
     Bytes,
     CamelModel,
```
```diff
@@ -458,6 +463,55 @@ def validate(self) -> Alloc:
         return Alloc.model_validate_json(self.raw)


+@dataclass(kw_only=True)
+class LazyAllocFile(LazyAlloc[Path]):
+    """
+    Lazy allocation backed by a filesystem path.
+
+    Parses `{address: account_or_null}` entries from the file incrementally
+    via `ijson.kvitems` and validates each `Account` one at a time through
+    `Account.model_validate`. The full mapping of validated accounts is still
+    accumulated before `Alloc.model_validate` is called, so peak memory
+    scales with the size of the alloc — but the raw JSON string is never
+    held in Python memory, and there is no re-serialize / re-parse round
+    trip, which is where `LazyAllocStr` incurs its multi-GB peak.
+
+    The optional ``_keepalive`` field holds the producing t8n call's
+    ``TemporaryDirectory`` so the on-disk alloc.json survives until this
+    LazyAllocFile is dropped. That lets a chained next-block t8n call
+    consume the alloc directly from disk (via ``--input.alloc=<path>`` for
+    geth, or ``shutil.copyfile`` for filesystem t8ns) without round-tripping
+    through ``Alloc.get().model_dump_json()`` in Python.
+    """
+
+    _keepalive: Optional[tempfile.TemporaryDirectory] = field(default=None)
+
+    def validate(self) -> Alloc:
+        """Validate the alloc by streaming entries from the backing file."""
+        accumulated: Dict[str, Account | None] = {}
+        with open(self.raw, "rb") as f:
+            # `ijson.kvitems(f, "")` silently yields nothing for non-object
+            # top-level JSON (`null`, `[]`, scalars), which would turn a
+            # corrupted alloc.json into an empty post-state. Probe the first
+            # parse event so the streaming path matches the fail-loud
+            # behavior of `LazyAllocStr.validate` /
+            # `Alloc.model_validate_json`.
+            first = next(ijson.parse(f), None)
+            if first is None or first[1] != "start_map":
+                raise ValueError(
+                    f"Expected JSON object at top level of {self.raw}"
+                )
+            f.seek(0)
+            for address_str, account_data in ijson.kvitems(f, ""):
+                if account_data is None:
+                    accumulated[address_str] = None
+                else:
+                    accumulated[address_str] = Account.model_validate(
+                        account_data
+                    )
+        return Alloc.model_validate(accumulated)
+
+
 @dataclass
 class TransitionToolInput:
     """Transition tool input."""
```
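The fail-loud guard in `LazyAllocFile.validate` can be sketched without `ijson` using only the standard library. The `parse_alloc_object` helper below is hypothetical: the real code probes ijson's first parse event for `start_map`, while this analogue peeks at the first non-whitespace byte before handing off to `json.load`:

```python
import io
import json


def parse_alloc_object(stream) -> dict:
    """Fail loudly unless the top-level JSON value is an object.

    Stdlib analogue of the start_map guard: `null`, `[]`, and scalars are
    valid JSON, but an alloc must be an object, so anything whose first
    non-whitespace byte is not `{` is rejected up front.
    """
    first = stream.read(1)
    while first.isspace():
        first = stream.read(1)
    if first != b"{":
        raise ValueError("Expected JSON object at top level")
    stream.seek(0)  # rewind so the full parse sees the whole document
    return json.load(stream)


# The three silent-zero shapes from the commit message are all rejected:
rejected = []
for bad in (b"null", b"[]", b"42"):
    try:
        parse_alloc_object(io.BytesIO(bad))
    except ValueError:
        rejected.append(bad.decode())

empty_alloc = parse_alloc_object(io.BytesIO(b"{}"))  # `{}` stays legal
print(rejected, empty_alloc)
```

The same design choice applies as in the real code: the wrong-shape case raises `ValueError` rather than a parse error, because the input is well-formed JSON of the wrong type.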
```diff
@@ -473,13 +527,20 @@ def to_files(
         """
         Prepare the input in a directory path in the file system for
         consumption by the t8n tool.
+
+        For ``LazyAllocFile`` inputs whose backing file is still on disk
+        (chained-block handoff: previous t8n call's temp dir is pinned via
+        the keepalive field), the alloc is copied byte-for-byte rather than
+        round-tripped through ``Alloc.get().model_dump_json()``.
         """
-        if isinstance(self.alloc, Alloc):
-            alloc_contents = self.alloc.model_dump_json(**model_dump_config)
-        elif isinstance(self.alloc, LazyAllocStr):
-            alloc_contents = self.alloc.raw
+        alloc_path = directory_path / "alloc.json"
+        if (
+            isinstance(self.alloc, LazyAllocFile)
+            and Path(self.alloc.raw).exists()
+        ):
+            shutil.copyfile(self.alloc.raw, alloc_path)
         else:
-            raise Exception(f"Invalid alloc type: {type(self.alloc)}")
+            alloc_path.write_text(self._serialize_alloc(**model_dump_config))

         env_contents = self.env.model_dump_json(**model_dump_config)
         txs_contents = (
```
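The fast path in `to_files` above can be illustrated with a stdlib-only sketch of the byte-for-byte handoff; the paths and alloc contents here are made up for illustration:

```python
import shutil
import tempfile
from pathlib import Path

# When the previous t8n call's alloc.json is still on disk (pinned by the
# keepalive), hand it to the next call by copying bytes instead of parsing
# and re-serializing a potentially multi-GB JSON document.
with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
    prev_alloc = Path(src) / "alloc.json"
    prev_alloc.write_text('{"0x00": {"balance": "0x01"}}')

    alloc_path = Path(dst) / "alloc.json"
    if prev_alloc.exists():
        shutil.copyfile(prev_alloc, alloc_path)  # zero-parse handoff
    else:
        alloc_path.write_text("{}")  # fallback: re-serialize the parsed alloc

    copied = alloc_path.read_text()
print(copied)
```

The `exists()` check mirrors the real guard: if the backing temp dir has already been cleaned up, the code falls back to serializing the parsed alloc.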
```diff
@@ -489,33 +550,41 @@
             )
             + "]"
         )
-        input_contents: Dict[str, str] = {
-            "alloc": alloc_contents,
-            "env": env_contents,
-            "txs": txs_contents,
-        }
-        if self.blob_params is not None:
-            input_contents["blobParams"] = self.blob_params.model_dump_json(
-                **model_dump_config
-            )

-        input_paths: Dict[str, str] = {}
-        for content_type, contents in input_contents.items():
-            file_path = directory_path / f"{content_type}.json"
+        input_paths: Dict[str, str] = {"alloc": str(alloc_path)}
+        for name, contents in (("env", env_contents), ("txs", txs_contents)):
+            file_path = directory_path / f"{name}.json"
             file_path.write_text(contents)
-            input_paths[content_type] = str(file_path)
+            input_paths[name] = str(file_path)
+        if self.blob_params is not None:
+            blob_path = directory_path / "blobParams.json"
+            blob_path.write_text(
+                self.blob_params.model_dump_json(**model_dump_config)
+            )
+            input_paths["blobParams"] = str(blob_path)

         return input_paths

-    def model_dump_json(self, **model_dump_config: Any) -> str:
-        """Dump the model in string JSON format."""
+    def _serialize_alloc(self, **model_dump_config: Any) -> str:
+        """Serialize ``self.alloc`` to a JSON string."""
         if isinstance(self.alloc, Alloc):
-            alloc_contents = self.alloc.model_dump_json(**model_dump_config)
-        elif isinstance(self.alloc, LazyAllocStr):
-            alloc_contents = self.alloc.raw
-        else:
-            raise Exception(f"Invalid alloc type: {type(self.alloc)}")
+            return self.alloc.model_dump_json(**model_dump_config)
+        if isinstance(self.alloc, LazyAllocStr):
+            return self.alloc.raw
+        if isinstance(self.alloc, LazyAllocFile):
+            return self.alloc.get().model_dump_json(**model_dump_config)
+        raise Exception(f"Invalid alloc type: {type(self.alloc)}")
+
+    def model_dump_json(
+        self, *, exclude_alloc: bool = False, **model_dump_config: Any
+    ) -> str:
+        """
+        Dump the model in string JSON format.

+        Pass ``exclude_alloc=True`` when the t8n is reading the alloc from a
+        file path instead of the stdin bundle, to avoid building a multi-GB
+        JSON string in Python memory for chained-block handoffs.
+        """
         env_contents = self.env.model_dump_json(**model_dump_config)
         txs_contents = (
             "["
```
524593
)
525594
+ "]"
526595
)
527-
input_contents: Dict[str, str] = {
528-
"alloc": alloc_contents,
529-
"env": env_contents,
530-
"txs": txs_contents,
531-
}
596+
input_contents: Dict[str, str] = {}
597+
if not exclude_alloc:
598+
input_contents["alloc"] = self._serialize_alloc(
599+
**model_dump_config
600+
)
601+
input_contents["env"] = env_contents
602+
input_contents["txs"] = txs_contents
532603
if self.blob_params is not None:
533604
input_contents["blobParams"] = self.blob_params.model_dump_json(
534605
**model_dump_config
535606
)
536-
contents: List[str] = []
537-
for content_type, type_contents in input_contents.items():
538-
contents.append(f'"{content_type}": {type_contents}')
607+
contents: List[str] = [
608+
f'"{content_type}": {type_contents}'
609+
for content_type, type_contents in input_contents.items()
610+
]
539611
return "{" + ",".join(contents) + "}"
540612

541613
def model_dump(self, mode: str, **model_dump_config: Any) -> Any:
@@ -547,6 +619,10 @@ def model_dump(self, mode: str, **model_dump_config: Any) -> Any:
547619
)
548620
elif isinstance(self.alloc, LazyAllocJson):
549621
alloc_contents = self.alloc.raw
622+
elif isinstance(self.alloc, LazyAllocFile):
623+
alloc_contents = self.alloc.get().model_dump(
624+
mode=mode, **model_dump_config
625+
)
550626
else:
551627
raise Exception(f"Invalid alloc type: {type(self.alloc)}")
552628

```diff
@@ -582,13 +658,19 @@ def model_validate_files(
         """
         Validate the model from the file system where each key is a
         different JSON file.
+
+        `alloc.json` is referenced by path and parsed incrementally on
+        `.get()` via `LazyAllocFile`, so the full file is never held in
+        memory alongside the validated `Alloc`.
         """
-        alloc_data = (directory_path / "alloc.json").read_text()
         result_data = (directory_path / "result.json").read_text()
         result = Result.model_validate_json(
             json_data=result_data, context=context
         )
-        alloc = LazyAllocStr(raw=alloc_data, _state_root=result.state_root)
+        alloc = LazyAllocFile(
+            raw=directory_path / "alloc.json",
+            _state_root=result.state_root,
+        )
         output = cls(result=result, alloc=alloc)
         return output
```

packages/testing/src/execution_testing/client_clis/file_utils.py

Lines changed: 36 additions & 20 deletions
```diff
@@ -1,6 +1,7 @@
 """Methods to work with the filesystem and json."""

 import os
+import shutil
 import stat
 from json import dump
 from pathlib import Path
@@ -9,6 +10,7 @@
 from pydantic import BaseModel, RootModel

 from execution_testing.client_clis.cli_types import (
+    LazyAllocFile,
     LazyAllocJson,
     LazyAllocStr,
     TransitionToolInput,
```
```diff
@@ -28,27 +30,41 @@ def dump_files_to_directory(output_path: Path, files: Dict[str, Any]) -> None:
         if rel_path:
             os.makedirs(output_path / rel_path, exist_ok=True)
         file_path = output_path / file_rel_path
-        with open(file_path, "w") as f:
-            if isinstance(file_contents, (LazyAllocStr, LazyAllocJson)):
-                if isinstance(file_contents, LazyAllocJson):
-                    dump(file_contents.raw, f, ensure_ascii=True, indent=4)
-                else:
-                    f.write(file_contents.raw)
-
-            elif isinstance(
-                file_contents, (BaseModel, RootModel, TransitionToolInput)
-            ):
-                f.write(
-                    file_contents.model_dump_json(
-                        indent=4,
-                        exclude_none=True,
-                        by_alias=True,
-                    )
+        if (
+            isinstance(file_contents, LazyAllocFile)
+            and Path(file_contents.raw).exists()
+        ):
+            shutil.copyfile(file_contents.raw, file_path)
+        elif isinstance(file_contents, LazyAllocFile):
+            # Backing temp dir was cleaned up after a previous `.get()`
+            # (e.g. chained-block t8n on the next block); fall back to
+            # the cached Alloc so debug dumps still capture the input.
+            file_path.write_text(
+                file_contents.get().model_dump_json(
+                    indent=4, exclude_none=True, by_alias=True
                 )
-            elif isinstance(file_contents, str):
-                f.write(file_contents)
-            else:
-                dump(file_contents, f, ensure_ascii=True, indent=4)
+            )
+        else:
+            with open(file_path, "w") as f:
+                if isinstance(file_contents, (LazyAllocStr, LazyAllocJson)):
+                    if isinstance(file_contents, LazyAllocJson):
+                        dump(file_contents.raw, f, ensure_ascii=True, indent=4)
+                    else:
+                        f.write(file_contents.raw)
+                elif isinstance(
+                    file_contents, (BaseModel, RootModel, TransitionToolInput)
+                ):
+                    f.write(
+                        file_contents.model_dump_json(
+                            indent=4,
+                            exclude_none=True,
+                            by_alias=True,
+                        )
+                    )
+                elif isinstance(file_contents, str):
+                    f.write(file_contents)
+                else:
+                    dump(file_contents, f, ensure_ascii=True, indent=4)
         if flags:
             file_mode = os.stat(file_path).st_mode
             if "x" in flags:
```