Skip to content

Commit 6c1b503

Browse files
committed
feat(results): stream batch files in export() to avoid OOM on large datasets
- Rewrite export() to read batch parquet files one at a time instead of materialising the full dataset via load_dataset(); peak memory is now proportional to a single batch regardless of dataset size
- Infer output format from file extension by default; format= parameter kept as an explicit override (e.g. writing .txt as JSONL)
- _export_parquet unifies schemas across batches (pa.unify_schemas) to handle type drift (e.g. int64 vs float64 in the same column)
- Drop format= from the controller's export() call — path already carries the correct extension
- Rewrite export tests around real batch parquet files (stub_batch_dir fixture); add tests for multi-batch output, schema unification, unknown extension, empty batch directory, and explicit format override
1 parent 50a93d9 commit 6c1b503

4 files changed

Lines changed: 159 additions & 40 deletions

File tree

packages/data-designer/src/data_designer/cli/controllers/generation_controller.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def run_create(
171171
if output_format is not None:
172172
export_path = results.artifact_storage.base_dataset_path / f"dataset.{output_format}"
173173
try:
174-
results.export(export_path, format=output_format) # type: ignore[arg-type]
174+
results.export(export_path)
175175
except Exception as e:
176176
print_error(f"Export failed: {e}")
177177
raise typer.Exit(code=1)

packages/data-designer/src/data_designer/interface/results.py

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pathlib import Path
77
from typing import TYPE_CHECKING, Literal, get_args
88

9+
import data_designer.lazy_heavy_imports as lazy
910
from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults
1011
from data_designer.config.config_builder import DataDesignerConfigBuilder
1112
from data_designer.config.dataset_metadata import DatasetMetadata
@@ -99,40 +100,55 @@ def get_path_to_processor_artifacts(self, processor_name: str) -> Path:
99100
raise ArtifactStorageError(f"Processor {processor_name} has no artifacts.")
100101
return self.artifact_storage.processors_outputs_path / processor_name
101102

102-
def export(self, path: Path | str, *, format: ExportFormat = "jsonl") -> Path:
103-
"""Export the generated dataset to a single file.
103+
def export(self, path: Path | str, *, format: ExportFormat | None = None) -> Path:
104+
"""Export the generated dataset to a single file by streaming batch files.
105+
106+
The output format is inferred from the file extension when *format* is
107+
omitted. Pass *format* explicitly to override the extension (e.g. write a
108+
``.txt`` file as JSONL).
109+
110+
Unlike :meth:`load_dataset`, this method never materialises the full dataset
111+
in memory — it reads batch parquet files one at a time and appends each to
112+
the output file, keeping peak memory proportional to a single batch.
104113
105114
Args:
106-
path: Output file path. The extension is not inferred from *format* —
107-
the exact path is used as-is.
115+
path: Output file path. The exact path is used as-is; the extension is
116+
not rewritten.
108117
format: Output format. One of ``'jsonl'``, ``'csv'``, or ``'parquet'``.
109-
Defaults to ``'jsonl'``.
118+
When omitted, the format is inferred from the file extension.
110119
111120
Returns:
112121
Path to the written file.
113122
114123
Raises:
115-
InvalidFileFormatError: If an unsupported format is requested.
124+
InvalidFileFormatError: If the format cannot be determined or is not
125+
one of the supported values.
126+
ArtifactStorageError: If no batch parquet files are found.
116127
117128
Example:
118129
>>> results = data_designer.create(config, num_records=1000)
119130
>>> results.export("output.jsonl")
120131
PosixPath('output.jsonl')
121-
>>> results.export("output.csv", format="csv")
132+
>>> results.export("output.csv")
122133
PosixPath('output.csv')
134+
>>> results.export("output.txt", format="jsonl")
135+
PosixPath('output.txt')
123136
"""
124-
if format not in SUPPORTED_EXPORT_FORMATS:
137+
path = Path(path)
138+
resolved_format: str = format if format is not None else path.suffix.lstrip(".")
139+
if resolved_format not in SUPPORTED_EXPORT_FORMATS:
125140
raise InvalidFileFormatError(
126-
f"Unsupported export format: {format!r}. Choose one of: {', '.join(SUPPORTED_EXPORT_FORMATS)}."
141+
f"Unsupported export format: {resolved_format!r}. Choose one of: {', '.join(SUPPORTED_EXPORT_FORMATS)}."
127142
)
128-
path = Path(path)
129-
df = self.load_dataset()
130-
if format == "jsonl":
131-
df.to_json(path, orient="records", lines=True, force_ascii=False, date_format="iso")
132-
elif format == "csv":
133-
df.to_csv(path, index=False)
134-
elif format == "parquet":
135-
df.to_parquet(path, index=False)
143+
batch_files = sorted(self.artifact_storage.final_dataset_path.glob("batch_*.parquet"))
144+
if not batch_files:
145+
raise ArtifactStorageError("No batch parquet files found to export.")
146+
if resolved_format == "jsonl":
147+
_export_jsonl(batch_files, path)
148+
elif resolved_format == "csv":
149+
_export_csv(batch_files, path)
150+
elif resolved_format == "parquet":
151+
_export_parquet(batch_files, path)
136152
return path
137153

138154
def push_to_hub(
@@ -180,3 +196,39 @@ def push_to_hub(
180196
description=description,
181197
tags=tags,
182198
)
199+
200+
201+
def _export_jsonl(batch_files: list[Path], output: Path) -> None:
202+
"""Write *batch_files* to *output* as JSONL, one record per line.
203+
204+
Each batch is appended in turn so peak memory stays proportional to one batch.
205+
"""
206+
with output.open("w", encoding="utf-8") as f:
207+
for batch_file in batch_files:
208+
chunk = lazy.pd.read_parquet(batch_file)
209+
content = chunk.to_json(orient="records", lines=True, force_ascii=False, date_format="iso")
210+
f.write(content)
211+
if not content.endswith("\n"):
212+
f.write("\n")
213+
214+
215+
def _export_csv(batch_files: list[Path], output: Path) -> None:
216+
"""Write *batch_files* to *output* as CSV with a single header row."""
217+
for i, batch_file in enumerate(batch_files):
218+
chunk = lazy.pd.read_parquet(batch_file)
219+
chunk.to_csv(output, mode="a" if i > 0 else "w", header=(i == 0), index=False)
220+
221+
222+
def _export_parquet(batch_files: list[Path], output: Path) -> None:
223+
"""Write *batch_files* to *output* as a single Parquet file.
224+
225+
Schemas are unified across batches before writing so that columns with minor
226+
type drift (e.g. ``int64`` vs ``float64`` across batches) are cast to a
227+
consistent schema rather than causing a write error.
228+
"""
229+
schemas = [lazy.pq.read_schema(f) for f in batch_files]
230+
unified_schema = lazy.pa.unify_schemas(schemas)
231+
with lazy.pq.ParquetWriter(output, unified_schema) as writer:
232+
for batch_file in batch_files:
233+
table = lazy.pq.read_table(batch_file)
234+
writer.write_table(table.cast(unified_schema))

packages/data-designer/tests/cli/controllers/test_generation_controller.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -795,7 +795,6 @@ def test_run_create_with_output_format_happy_path(mock_load_config: MagicMock, m
795795

796796
mock_results.export.assert_called_once_with(
797797
Path("/output/artifacts/dataset") / "dataset.jsonl",
798-
format="jsonl",
799798
)
800799

801800

packages/data-designer/tests/interface/test_results.py

Lines changed: 89 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from data_designer.config.preview_results import PreviewResults
1818
from data_designer.config.utils.errors import DatasetSampleDisplayError
1919
from data_designer.config.utils.visualization import display_sample_record as display_fn
20+
from data_designer.engine.dataset_builders.errors import ArtifactStorageError
2021
from data_designer.engine.storage.artifact_storage import ArtifactStorage
2122
from data_designer.interface.results import DatasetCreationResults
2223

@@ -262,62 +263,129 @@ def test_load_dataset_independent_of_record_sampler_cache(stub_dataset_creation_
262263
stub_artifact_storage.load_dataset.assert_called_once()
263264

264265

266+
@pytest.fixture
267+
def stub_batch_dir(stub_dataframe, tmp_path):
268+
"""Directory with two batch parquet files split from stub_dataframe.
269+
270+
Splitting into two batches exercises the multi-batch streaming path in export().
271+
"""
272+
batch_dir = tmp_path / "parquet-files"
273+
batch_dir.mkdir()
274+
mid = len(stub_dataframe) // 2
275+
stub_dataframe.iloc[:mid].to_parquet(batch_dir / "batch_00000.parquet", index=False)
276+
stub_dataframe.iloc[mid:].to_parquet(batch_dir / "batch_00001.parquet", index=False)
277+
return batch_dir
278+
279+
265280
@pytest.mark.parametrize("fmt", ["jsonl", "csv", "parquet"])
266-
def test_export_writes_file(stub_dataset_creation_results, tmp_path, fmt):
267-
"""export() writes a file in the requested format."""
281+
def test_export_writes_file(stub_dataset_creation_results, stub_batch_dir, tmp_path, fmt) -> None:
282+
"""export() writes a non-empty file for each supported format."""
283+
stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir
268284
out = tmp_path / f"out.{fmt}"
269-
result = stub_dataset_creation_results.export(out, format=fmt)
285+
result = stub_dataset_creation_results.export(out)
270286
assert result == out
271287
assert out.exists()
272288
assert out.stat().st_size > 0
273289

274290

275-
def test_export_jsonl_content(stub_dataset_creation_results, stub_dataframe, tmp_path):
276-
"""JSONL export writes one JSON object per line."""
291+
def test_export_jsonl_content(stub_dataset_creation_results, stub_dataframe, stub_batch_dir, tmp_path) -> None:
292+
"""JSONL export writes one valid JSON object per line, covering all records."""
293+
stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir
277294
out = tmp_path / "out.jsonl"
278-
stub_dataset_creation_results.export(out, format="jsonl")
295+
stub_dataset_creation_results.export(out)
279296
lines = out.read_text(encoding="utf-8").splitlines()
280297
assert len(lines) == len(stub_dataframe)
281-
# Each line must be valid JSON
282298
for line in lines:
283299
json.loads(line)
284300

285301

286-
def test_export_csv_content(stub_dataset_creation_results, stub_dataframe, tmp_path):
287-
"""CSV export has a header row and one data row per record."""
302+
def test_export_csv_content(stub_dataset_creation_results, stub_dataframe, stub_batch_dir, tmp_path) -> None:
303+
"""CSV export produces a single header row and one data row per record."""
304+
stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir
288305
out = tmp_path / "out.csv"
289-
stub_dataset_creation_results.export(out, format="csv")
306+
stub_dataset_creation_results.export(out)
290307
loaded = lazy.pd.read_csv(out)
291308
assert list(loaded.columns) == list(stub_dataframe.columns)
292309
assert len(loaded) == len(stub_dataframe)
293310

294311

295-
def test_export_parquet_content(stub_dataset_creation_results, stub_dataframe, tmp_path):
296-
"""Parquet export round-trips to the original DataFrame."""
312+
def test_export_parquet_content(stub_dataset_creation_results, stub_dataframe, stub_batch_dir, tmp_path) -> None:
313+
"""Parquet export round-trips to the original DataFrame across two batches."""
314+
stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir
297315
out = tmp_path / "out.parquet"
298-
stub_dataset_creation_results.export(out, format="parquet")
316+
stub_dataset_creation_results.export(out)
299317
loaded = lazy.pd.read_parquet(out)
300-
lazy.pd.testing.assert_frame_equal(loaded.reset_index(drop=True), stub_dataframe.reset_index(drop=True))
318+
lazy.pd.testing.assert_frame_equal(
319+
loaded.reset_index(drop=True),
320+
stub_dataframe.reset_index(drop=True),
321+
)
301322

302323

303-
def test_export_default_format_is_jsonl(stub_dataset_creation_results, tmp_path):
304-
"""export() defaults to JSONL when no format is given."""
324+
def test_export_infers_format_from_extension(stub_dataset_creation_results, stub_batch_dir, tmp_path) -> None:
325+
"""export() infers the output format from the file extension when format is omitted."""
326+
stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir
305327
out = tmp_path / "out.jsonl"
306328
stub_dataset_creation_results.export(out)
307329
lines = out.read_text(encoding="utf-8").splitlines()
308-
# All lines must be valid JSON
309330
for line in lines:
310331
json.loads(line)
311332

312333

313-
def test_export_unsupported_format_raises(stub_dataset_creation_results, tmp_path):
314-
"""export() raises InvalidFileFormatError for unknown formats."""
334+
def test_export_explicit_format_overrides_extension(
335+
stub_dataset_creation_results, stub_dataframe, stub_batch_dir, tmp_path
336+
) -> None:
337+
"""Passing format= explicitly overrides extension-based inference."""
338+
stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir
339+
out = tmp_path / "data.txt"
340+
stub_dataset_creation_results.export(out, format="jsonl")
341+
lines = out.read_text(encoding="utf-8").splitlines()
342+
assert len(lines) == len(stub_dataframe)
343+
for line in lines:
344+
json.loads(line)
345+
346+
347+
def test_export_parquet_schema_unification(stub_dataset_creation_results, tmp_path) -> None:
348+
"""Parquet export unifies schemas across batches with diverging column types."""
349+
batch_dir = tmp_path / "parquet-files"
350+
batch_dir.mkdir()
351+
# Batch 0: 'value' as int64; Batch 1: 'value' as float64 (type drift)
352+
lazy.pd.DataFrame({"value": lazy.pd.array([1, 2], dtype="int64")}).to_parquet(
353+
batch_dir / "batch_00000.parquet", index=False
354+
)
355+
lazy.pd.DataFrame({"value": lazy.pd.array([3.0, 4.0], dtype="float64")}).to_parquet(
356+
batch_dir / "batch_00001.parquet", index=False
357+
)
358+
stub_dataset_creation_results.artifact_storage.final_dataset_path = batch_dir
359+
out = tmp_path / "out.parquet"
360+
stub_dataset_creation_results.export(out)
361+
loaded = lazy.pd.read_parquet(out)
362+
assert list(loaded["value"]) == [1.0, 2.0, 3.0, 4.0]
363+
364+
365+
def test_export_unknown_extension_raises(stub_dataset_creation_results, tmp_path) -> None:
366+
"""export() raises InvalidFileFormatError when the extension is not a supported format."""
315367
with pytest.raises(InvalidFileFormatError, match="Unsupported export format"):
316-
stub_dataset_creation_results.export(tmp_path / "out.xyz", format="xlsx") # type: ignore[arg-type]
368+
stub_dataset_creation_results.export(tmp_path / "out.xyz")
369+
370+
371+
def test_export_unsupported_explicit_format_raises(stub_dataset_creation_results, tmp_path) -> None:
372+
"""export() raises InvalidFileFormatError for an explicit unsupported format override."""
373+
with pytest.raises(InvalidFileFormatError, match="Unsupported export format"):
374+
stub_dataset_creation_results.export(tmp_path / "out.jsonl", format="xlsx") # type: ignore[arg-type]
375+
376+
377+
def test_export_no_batch_files_raises(stub_dataset_creation_results, tmp_path) -> None:
378+
"""export() raises ArtifactStorageError when the batch directory is empty."""
379+
empty_dir = tmp_path / "parquet-files"
380+
empty_dir.mkdir()
381+
stub_dataset_creation_results.artifact_storage.final_dataset_path = empty_dir
382+
with pytest.raises(ArtifactStorageError, match="No batch parquet files found"):
383+
stub_dataset_creation_results.export(tmp_path / "out.jsonl")
317384

318385

319-
def test_export_returns_path_object(stub_dataset_creation_results, tmp_path):
386+
def test_export_returns_path_object(stub_dataset_creation_results, stub_batch_dir, tmp_path) -> None:
320387
"""export() returns a Path regardless of whether str or Path was passed."""
388+
stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir
321389
out = tmp_path / "out.jsonl"
322390
result = stub_dataset_creation_results.export(str(out))
323391
assert isinstance(result, Path)

0 commit comments

Comments (0)