33of generated fixtures.
44"""
55
6+ import heapq
67import json
78import os
89import re
1314 IO ,
1415 ClassVar ,
1516 Dict ,
17+ Generator ,
1618 List ,
1719 Literal ,
1820 Optional ,
2628from .file import Fixtures
2729
2830
31+ def _sorted_entries_from_partial (
32+ partial_path : Path ,
33+ ) -> Generator [Tuple [str , str ], None , None ]:
34+ """
35+ Generator yielding (key, value) pairs from a partial file, sorted by key.
36+
37+ Loads one partial file into memory at a time (not all partials together).
38+ Each worker's partial file is typically small relative to the total.
39+ """
40+ entries = []
41+ with open (partial_path ) as f :
42+ for line in f :
43+ line = line .strip ()
44+ if line :
45+ entry = json .loads (line )
46+ entries .append ((entry ["k" ], entry ["v" ]))
47+ entries .sort (key = lambda x : x [0 ])
48+ yield from entries
49+
50+
2951def merge_partial_fixture_files (output_dir : Path ) -> None :
3052 """
3153 Merge all partial fixture JSONL files into final JSON fixture files.
3254
3355 Called at session end after all workers have written their partials.
3456 Each partial file contains JSONL lines: {"k": fixture_id, "v": json_str}
57+
58+ Uses k-way merge: each partial file is sorted individually, then merged
59+ using heapq.merge. This keeps memory usage proportional to the largest
60+ single partial file, not the total of all partials.
3561 """
3662 # Find all partial files
3763 partial_files = list (output_dir .rglob ("*.partial.*.jsonl" ))
@@ -56,29 +82,25 @@ def merge_partial_fixture_files(output_dir: Path) -> None:
5682
5783 # Merge each group into its target file
5884 for target_path , partials in partials_by_target .items ():
59- entries : Dict [str , str ] = {}
60-
61- # Read all partial files
62- for partial in partials :
63- with open (partial ) as f :
64- for line in f :
65- line = line .strip ()
66- if not line :
67- continue
68- entry = json .loads (line )
69- entries [entry ["k" ]] = entry ["v" ]
70-
71- # Write final JSON file
72- sorted_keys = sorted (entries .keys ())
73- last_idx = len (sorted_keys ) - 1
74- with open (target_path , "w" ) as f :
75- f .write ("{\n " )
76- for i , key in enumerate (sorted_keys ):
85+ # K-way merge: sort each partial individually, then merge streams
86+ # Memory = O(largest single partial), not O(sum of all partials)
87+ sorted_iterators = [_sorted_entries_from_partial (p ) for p in partials ]
88+ merged = heapq .merge (* sorted_iterators , key = lambda x : x [0 ])
89+
90+ # Stream merged entries to output file
91+ with open (target_path , "w" ) as out_f :
92+ out_f .write ("{\n " )
93+ first = True
94+ for key , value in merged :
95+ if not first :
96+ out_f .write (",\n " )
97+ first = False
7798 key_json = json .dumps (key )
78- value_indented = entries [key ].replace ("\n " , "\n " )
79- f .write (f" { key_json } : { value_indented } " )
80- f .write (",\n " if i < last_idx else "\n " )
81- f .write ("}" )
99+ value_indented = value .replace ("\n " , "\n " )
100+ out_f .write (f" { key_json } : { value_indented } " )
101+ if not first :
102+ out_f .write ("\n " )
103+ out_f .write ("}" )
82104
83105 # Clean up partial files
84106 for partial in partials :
0 commit comments