diff --git a/packages/data-designer/src/data_designer/cli/commands/create.py b/packages/data-designer/src/data_designer/cli/commands/create.py index 3bf3265f..ec0aa357 100644 --- a/packages/data-designer/src/data_designer/cli/commands/create.py +++ b/packages/data-designer/src/data_designer/cli/commands/create.py @@ -3,10 +3,12 @@ from __future__ import annotations +import click import typer from data_designer.cli.controllers.generation_controller import GenerationController from data_designer.config.utils.constants import DEFAULT_NUM_RECORDS +from data_designer.interface.results import SUPPORTED_EXPORT_FORMATS def create_command( @@ -35,6 +37,17 @@ def create_command( "-o", help="Path where generated artifacts will be stored. Defaults to ./artifacts.", ), + output_format: str | None = typer.Option( + None, + "--output-format", + "-f", + click_type=click.Choice(list(SUPPORTED_EXPORT_FORMATS)), + help=( + "Export the dataset to a single file after generation. " + "Supported formats: jsonl, csv, parquet. " + "The file is written to //dataset.." + ), + ), ) -> None: """Create a full dataset and save results to disk. @@ -60,4 +73,5 @@ def create_command( num_records=num_records, dataset_name=dataset_name, artifact_path=artifact_path, + output_format=output_format, ) diff --git a/packages/data-designer/src/data_designer/cli/controllers/generation_controller.py b/packages/data-designer/src/data_designer/cli/controllers/generation_controller.py index 74a44c3c..07f8c995 100644 --- a/packages/data-designer/src/data_designer/cli/controllers/generation_controller.py +++ b/packages/data-designer/src/data_designer/cli/controllers/generation_controller.py @@ -116,6 +116,7 @@ def run_create( num_records: int, dataset_name: str, artifact_path: str | None, + output_format: str | None = None, ) -> None: """Load config, create a full dataset, and save results to disk. @@ -124,7 +125,17 @@ def run_create( num_records: Number of records to generate. dataset_name: Name for the generated dataset folder. artifact_path: Path where generated artifacts will be stored, or None for default. + output_format: If set, export the dataset to a single file in this format after + generation. One of 'jsonl', 'csv', 'parquet'. """ + from data_designer.interface.results import SUPPORTED_EXPORT_FORMATS + + if output_format is not None and output_format not in SUPPORTED_EXPORT_FORMATS: + print_error( + f"Unsupported export format: {output_format!r}. Choose one of: {', '.join(SUPPORTED_EXPORT_FORMATS)}." + ) + raise typer.Exit(code=1) + config_builder = self._load_config(config_source) resolved_artifact_path = Path(artifact_path) if artifact_path else Path.cwd() / "artifacts" @@ -147,7 +158,7 @@ def run_create( print_error(f"Dataset creation failed: {e}") raise typer.Exit(code=1) - dataset = results.load_dataset() + num_records = len(results.load_dataset()) analysis = results.load_analysis() if analysis is not None: @@ -155,8 +166,19 @@ def run_create( analysis.to_report() console.print() - print_success(f"Dataset created — {len(dataset)} record(s) generated") console.print(f" Artifacts saved to: [bold]{results.artifact_storage.base_dataset_path}[/bold]") + + if output_format is not None: + export_path = results.artifact_storage.base_dataset_path / f"dataset.{output_format}" + try: + results.export(export_path) + except Exception as e: + print_error(f"Export failed: {e}") + raise typer.Exit(code=1) + console.print(f" Exported to: [bold]{export_path}[/bold]") + + console.print() + print_success(f"Dataset created — {num_records} record(s) generated") console.print() def _load_config(self, config_source: str) -> DataDesignerConfigBuilder: diff --git a/packages/data-designer/src/data_designer/interface/results.py b/packages/data-designer/src/data_designer/interface/results.py index 07692ff0..5670f354 100644 --- a/packages/data-designer/src/data_designer/interface/results.py +++ b/packages/data-designer/src/data_designer/interface/results.py @@ -4,11 +4,13 @@ from __future__ import annotations from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal, get_args +import data_designer.lazy_heavy_imports as lazy from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults from data_designer.config.config_builder import DataDesignerConfigBuilder from data_designer.config.dataset_metadata import DatasetMetadata +from data_designer.config.errors import InvalidFileFormatError from data_designer.config.utils.visualization import WithRecordSamplerMixin from data_designer.engine.dataset_builders.errors import ArtifactStorageError from data_designer.engine.storage.artifact_storage import ArtifactStorage @@ -19,6 +21,9 @@ from data_designer.engine.dataset_builders.utils.task_model import TaskTrace +ExportFormat = Literal["jsonl", "csv", "parquet"] +SUPPORTED_EXPORT_FORMATS: tuple[str, ...] = get_args(ExportFormat) + class DatasetCreationResults(WithRecordSamplerMixin): """Results container for a Data Designer dataset creation run. @@ -95,6 +100,57 @@ def get_path_to_processor_artifacts(self, processor_name: str) -> Path: raise ArtifactStorageError(f"Processor {processor_name} has no artifacts.") return self.artifact_storage.processors_outputs_path / processor_name + def export(self, path: Path | str, *, format: ExportFormat | None = None) -> Path: + """Export the generated dataset to a single file by streaming batch files. + + The output format is inferred from the file extension when *format* is + omitted. Pass *format* explicitly to override the extension (e.g. write a + ``.txt`` file as JSONL). + + Unlike :meth:`load_dataset`, this method never materialises the full dataset + in memory — it reads batch parquet files one at a time and appends each to + the output file, keeping peak memory proportional to a single batch. + + Args: + path: Output file path. The exact path is used as-is; the extension is + not rewritten. + format: Output format. One of ``'jsonl'``, ``'csv'``, or ``'parquet'``. + When omitted, the format is inferred from the file extension. + + Returns: + Path to the written file. + + Raises: + InvalidFileFormatError: If the format cannot be determined or is not + one of the supported values. + ArtifactStorageError: If no batch parquet files are found. + + Example: + >>> results = data_designer.create(config, num_records=1000) + >>> results.export("output.jsonl") + PosixPath('output.jsonl') + >>> results.export("output.csv") + PosixPath('output.csv') + >>> results.export("output.txt", format="jsonl") + PosixPath('output.txt') + """ + path = Path(path) + resolved_format: str = format if format is not None else path.suffix.lstrip(".") + if resolved_format not in SUPPORTED_EXPORT_FORMATS: + raise InvalidFileFormatError( + f"Unsupported export format: {resolved_format!r}. Choose one of: {', '.join(SUPPORTED_EXPORT_FORMATS)}." + ) + batch_files = sorted(self.artifact_storage.final_dataset_path.glob("batch_*.parquet")) + if not batch_files: + raise ArtifactStorageError("No batch parquet files found to export.") + if resolved_format == "jsonl": + _export_jsonl(batch_files, path) + elif resolved_format == "csv": + _export_csv(batch_files, path) + elif resolved_format == "parquet": + _export_parquet(batch_files, path) + return path + def push_to_hub( self, repo_id: str, @@ -140,3 +196,39 @@ def push_to_hub( description=description, tags=tags, ) + + +def _export_jsonl(batch_files: list[Path], output: Path) -> None: + """Write *batch_files* to *output* as JSONL, one record per line. + + Each batch is appended in turn so peak memory stays proportional to one batch. + """ + with output.open("w", encoding="utf-8") as f: + for batch_file in batch_files: + chunk = lazy.pd.read_parquet(batch_file) + content = chunk.to_json(orient="records", lines=True, force_ascii=False, date_format="iso") + f.write(content) + if not content.endswith("\n"): + f.write("\n") + + +def _export_csv(batch_files: list[Path], output: Path) -> None: + """Write *batch_files* to *output* as CSV with a single header row.""" + for i, batch_file in enumerate(batch_files): + chunk = lazy.pd.read_parquet(batch_file) + chunk.to_csv(output, mode="a" if i > 0 else "w", header=(i == 0), index=False) + + +def _export_parquet(batch_files: list[Path], output: Path) -> None: + """Write *batch_files* to *output* as a single Parquet file. + + Schemas are unified across batches before writing so that columns with minor + type drift (e.g. ``int64`` vs ``float64`` across batches) are cast to a + consistent schema rather than causing a write error. + """ + schemas = [lazy.pq.read_schema(f) for f in batch_files] + unified_schema = lazy.pa.unify_schemas(schemas) + with lazy.pq.ParquetWriter(output, unified_schema) as writer: + for batch_file in batch_files: + table = lazy.pq.read_table(batch_file) + writer.write_table(table.cast(unified_schema)) diff --git a/packages/data-designer/tests/cli/commands/test_create_command.py b/packages/data-designer/tests/cli/commands/test_create_command.py index 30cae9bc..fc779df7 100644 --- a/packages/data-designer/tests/cli/commands/test_create_command.py +++ b/packages/data-designer/tests/cli/commands/test_create_command.py @@ -18,7 +18,9 @@ def test_create_command_delegates_to_controller(mock_ctrl_cls: MagicMock) -> Non mock_ctrl = MagicMock() mock_ctrl_cls.return_value = mock_ctrl - create_command(config_source="config.yaml", num_records=10, dataset_name="dataset", artifact_path=None) + create_command( + config_source="config.yaml", num_records=10, dataset_name="dataset", artifact_path=None, output_format=None + ) mock_ctrl_cls.assert_called_once() mock_ctrl.run_create.assert_called_once_with( @@ -26,6 +28,7 @@ def test_create_command_delegates_to_controller(mock_ctrl_cls: MagicMock) -> Non num_records=10, dataset_name="dataset", artifact_path=None, + output_format=None, ) @@ -40,6 +43,7 @@ def test_create_command_passes_custom_options(mock_ctrl_cls: MagicMock) -> None: num_records=100, dataset_name="my_data", artifact_path="/custom/output", + output_format=None, ) mock_ctrl.run_create.assert_called_once_with( @@ -47,6 +51,7 @@ def test_create_command_passes_custom_options(mock_ctrl_cls: MagicMock) -> None: num_records=100, dataset_name="my_data", artifact_path="/custom/output", + output_format=None, ) @@ -56,11 +61,37 @@ def test_create_command_default_artifact_path_is_none(mock_ctrl_cls: MagicMock) mock_ctrl = MagicMock() mock_ctrl_cls.return_value = mock_ctrl - create_command(config_source="config.yaml", num_records=5, dataset_name="ds", artifact_path=None) + create_command( + config_source="config.yaml", num_records=5, dataset_name="ds", artifact_path=None, output_format=None + ) mock_ctrl.run_create.assert_called_once_with( config_source="config.yaml", num_records=5, dataset_name="ds", artifact_path=None, + output_format=None, + ) + + +@patch("data_designer.cli.commands.create.GenerationController") +def test_create_command_passes_output_format(mock_ctrl_cls: MagicMock) -> None: + """Test create_command forwards --output-format to the controller.""" + mock_ctrl = MagicMock() + mock_ctrl_cls.return_value = mock_ctrl + + create_command( + config_source="config.yaml", + num_records=10, + dataset_name="dataset", + artifact_path=None, + output_format="jsonl", + ) + + mock_ctrl.run_create.assert_called_once_with( + config_source="config.yaml", + num_records=10, + dataset_name="dataset", + artifact_path=None, + output_format="jsonl", ) diff --git a/packages/data-designer/tests/cli/controllers/test_generation_controller.py b/packages/data-designer/tests/cli/controllers/test_generation_controller.py index de4918cf..87408bde 100644 --- a/packages/data-designer/tests/cli/controllers/test_generation_controller.py +++ b/packages/data-designer/tests/cli/controllers/test_generation_controller.py @@ -772,3 +772,64 @@ def test_run_create_skips_report_when_analysis_is_none(mock_load_config: MagicMo # load_analysis() returns None, so to_report() must not be called. # If the code ignores the None check, an AttributeError propagates and the test fails. mock_results.load_analysis.assert_called_once() + + +@patch(f"{_CTRL}.DataDesigner") +@patch(f"{_CTRL}.load_config_builder") +def test_run_create_with_output_format_happy_path(mock_load_config: MagicMock, mock_dd_cls: MagicMock) -> None: + """export() is called with the correct path and format when --output-format is given.""" + mock_load_config.return_value = MagicMock(spec=DataDesignerConfigBuilder) + mock_dd = MagicMock() + mock_dd_cls.return_value = mock_dd + mock_results = _make_mock_create_results(5) + mock_dd.create.return_value = mock_results + + controller = GenerationController() + controller.run_create( + config_source="config.yaml", + num_records=5, + dataset_name="dataset", + artifact_path=None, + output_format="jsonl", + ) + + mock_results.export.assert_called_once_with( + Path("/output/artifacts/dataset") / "dataset.jsonl", + ) + + +def test_run_create_invalid_output_format_exits() -> None: + """Bad --output-format exits with code 1 before generation starts.""" + controller = GenerationController() + with pytest.raises(typer.Exit) as exc_info: + controller.run_create( + config_source="config.yaml", + num_records=10, + dataset_name="dataset", + artifact_path=None, + output_format="xlsx", + ) + assert exc_info.value.exit_code == 1 + + +@patch(f"{_CTRL}.DataDesigner") +@patch(f"{_CTRL}.load_config_builder") +def test_run_create_export_failure_exits(mock_load_config: MagicMock, mock_dd_cls: MagicMock) -> None: + """If export() raises, run_create exits with code 1.""" + mock_load_config.return_value = MagicMock(spec=DataDesignerConfigBuilder) + mock_dd = MagicMock() + mock_dd_cls.return_value = mock_dd + mock_results = _make_mock_create_results(5) + mock_results.export.side_effect = RuntimeError("disk full") + mock_dd.create.return_value = mock_results + + controller = GenerationController() + with pytest.raises(typer.Exit) as exc_info: + controller.run_create( + config_source="config.yaml", + num_records=5, + dataset_name="dataset", + artifact_path=None, + output_format="csv", + ) + assert exc_info.value.exit_code == 1 diff --git a/packages/data-designer/tests/cli/test_main.py b/packages/data-designer/tests/cli/test_main.py index 7a4ec555..83866c22 100644 --- a/packages/data-designer/tests/cli/test_main.py +++ b/packages/data-designer/tests/cli/test_main.py @@ -40,4 +40,5 @@ def test_app_dispatches_lazy_create_command(mock_controller_cls: Mock) -> None: num_records=DEFAULT_NUM_RECORDS, dataset_name="dataset", artifact_path=None, + output_format=None, ) diff --git a/packages/data-designer/tests/interface/test_results.py b/packages/data-designer/tests/interface/test_results.py index a28dd987..f0946ef4 100644 --- a/packages/data-designer/tests/interface/test_results.py +++ b/packages/data-designer/tests/interface/test_results.py @@ -3,6 +3,8 @@ from __future__ import annotations +import json +from pathlib import Path from unittest.mock import MagicMock, patch import pytest @@ -11,9 +13,11 @@ from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults from data_designer.config.config_builder import DataDesignerConfigBuilder from data_designer.config.dataset_metadata import DatasetMetadata +from data_designer.config.errors import InvalidFileFormatError from data_designer.config.preview_results import PreviewResults from data_designer.config.utils.errors import DatasetSampleDisplayError from data_designer.config.utils.visualization import display_sample_record as display_fn +from data_designer.engine.dataset_builders.errors import ArtifactStorageError from data_designer.engine.storage.artifact_storage import ArtifactStorage from data_designer.interface.results import DatasetCreationResults @@ -259,6 +263,134 @@ def test_load_dataset_independent_of_record_sampler_cache(stub_dataset_creation_ stub_artifact_storage.load_dataset.assert_called_once() +@pytest.fixture +def stub_batch_dir(stub_dataframe, tmp_path): + """Directory with two batch parquet files split from stub_dataframe. + + Splitting into two batches exercises the multi-batch streaming path in export(). + """ + batch_dir = tmp_path / "parquet-files" + batch_dir.mkdir() + mid = len(stub_dataframe) // 2 + stub_dataframe.iloc[:mid].to_parquet(batch_dir / "batch_00000.parquet", index=False) + stub_dataframe.iloc[mid:].to_parquet(batch_dir / "batch_00001.parquet", index=False) + return batch_dir + + +@pytest.mark.parametrize("fmt", ["jsonl", "csv", "parquet"]) +def test_export_writes_file(stub_dataset_creation_results, stub_batch_dir, tmp_path, fmt) -> None: + """export() writes a non-empty file for each supported format.""" + stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir + out = tmp_path / f"out.{fmt}" + result = stub_dataset_creation_results.export(out) + assert result == out + assert out.exists() + assert out.stat().st_size > 0 + + +def test_export_jsonl_content(stub_dataset_creation_results, stub_dataframe, stub_batch_dir, tmp_path) -> None: + """JSONL export writes one valid JSON object per line, covering all records.""" + stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir + out = tmp_path / "out.jsonl" + stub_dataset_creation_results.export(out) + lines = out.read_text(encoding="utf-8").splitlines() + assert len(lines) == len(stub_dataframe) + for line in lines: + json.loads(line) + + +def test_export_csv_content(stub_dataset_creation_results, stub_dataframe, stub_batch_dir, tmp_path) -> None: + """CSV export produces a single header row and one data row per record.""" + stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir + out = tmp_path / "out.csv" + stub_dataset_creation_results.export(out) + loaded = lazy.pd.read_csv(out) + assert list(loaded.columns) == list(stub_dataframe.columns) + assert len(loaded) == len(stub_dataframe) + + +def test_export_parquet_content(stub_dataset_creation_results, stub_dataframe, stub_batch_dir, tmp_path) -> None: + """Parquet export round-trips to the original DataFrame across two batches.""" + stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir + out = tmp_path / "out.parquet" + stub_dataset_creation_results.export(out) + loaded = lazy.pd.read_parquet(out) + lazy.pd.testing.assert_frame_equal( + loaded.reset_index(drop=True), + stub_dataframe.reset_index(drop=True), + ) + + +def test_export_infers_format_from_extension(stub_dataset_creation_results, stub_batch_dir, tmp_path) -> None: + """export() infers the output format from the file extension when format is omitted.""" + stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir + out = tmp_path / "out.jsonl" + stub_dataset_creation_results.export(out) + lines = out.read_text(encoding="utf-8").splitlines() + for line in lines: + json.loads(line) + + +def test_export_explicit_format_overrides_extension( + stub_dataset_creation_results, stub_dataframe, stub_batch_dir, tmp_path +) -> None: + """Passing format= explicitly overrides extension-based inference.""" + stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir + out = tmp_path / "data.txt" + stub_dataset_creation_results.export(out, format="jsonl") + lines = out.read_text(encoding="utf-8").splitlines() + assert len(lines) == len(stub_dataframe) + for line in lines: + json.loads(line) + + +def test_export_parquet_schema_unification(stub_dataset_creation_results, tmp_path) -> None: + """Parquet export unifies schemas across batches with diverging column types.""" + batch_dir = tmp_path / "parquet-files" + batch_dir.mkdir() + # Batch 0: 'value' as int64; Batch 1: 'value' as float64 (type drift) + lazy.pd.DataFrame({"value": lazy.pd.array([1, 2], dtype="int64")}).to_parquet( + batch_dir / "batch_00000.parquet", index=False + ) + lazy.pd.DataFrame({"value": lazy.pd.array([3.0, 4.0], dtype="float64")}).to_parquet( + batch_dir / "batch_00001.parquet", index=False + ) + stub_dataset_creation_results.artifact_storage.final_dataset_path = batch_dir + out = tmp_path / "out.parquet" + stub_dataset_creation_results.export(out) + loaded = lazy.pd.read_parquet(out) + assert list(loaded["value"]) == [1.0, 2.0, 3.0, 4.0] + + +def test_export_unknown_extension_raises(stub_dataset_creation_results, tmp_path) -> None: + """export() raises InvalidFileFormatError when the extension is not a supported format.""" + with pytest.raises(InvalidFileFormatError, match="Unsupported export format"): + stub_dataset_creation_results.export(tmp_path / "out.xyz") + + +def test_export_unsupported_explicit_format_raises(stub_dataset_creation_results, tmp_path) -> None: + """export() raises InvalidFileFormatError for an explicit unsupported format override.""" + with pytest.raises(InvalidFileFormatError, match="Unsupported export format"): + stub_dataset_creation_results.export(tmp_path / "out.jsonl", format="xlsx") # type: ignore[arg-type] + + +def test_export_no_batch_files_raises(stub_dataset_creation_results, tmp_path) -> None: + """export() raises ArtifactStorageError when the batch directory is empty.""" + empty_dir = tmp_path / "parquet-files" + empty_dir.mkdir() + stub_dataset_creation_results.artifact_storage.final_dataset_path = empty_dir + with pytest.raises(ArtifactStorageError, match="No batch parquet files found"): + stub_dataset_creation_results.export(tmp_path / "out.jsonl") + + +def test_export_returns_path_object(stub_dataset_creation_results, stub_batch_dir, tmp_path) -> None: + """export() returns a Path regardless of whether str or Path was passed.""" + stub_dataset_creation_results.artifact_storage.final_dataset_path = stub_batch_dir + out = tmp_path / "out.jsonl" + result = stub_dataset_creation_results.export(str(out)) + assert isinstance(result, Path) + + def test_preview_results_dataset_metadata() -> None: """Test that PreviewResults uses DatasetMetadata in display_sample_record.""" config_builder = MagicMock(spec=DataDesignerConfigBuilder)