Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/data-designer/src/data_designer/cli/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

from __future__ import annotations

import click
import typer

from data_designer.cli.controllers.generation_controller import GenerationController
from data_designer.config.utils.constants import DEFAULT_NUM_RECORDS
from data_designer.interface.results import SUPPORTED_EXPORT_FORMATS


def create_command(
Expand Down Expand Up @@ -35,6 +37,17 @@ def create_command(
"-o",
help="Path where generated artifacts will be stored. Defaults to ./artifacts.",
),
output_format: str | None = typer.Option(
None,
"--output-format",
"-f",
click_type=click.Choice(list(SUPPORTED_EXPORT_FORMATS)),
help=(
"Export the dataset to a single file after generation. "
"Supported formats: jsonl, csv, parquet. "
"The file is written to <artifact-path>/<dataset-name>/dataset.<format>."
),
),
) -> None:
"""Create a full dataset and save results to disk.
Expand All @@ -60,4 +73,5 @@ def create_command(
num_records=num_records,
dataset_name=dataset_name,
artifact_path=artifact_path,
output_format=output_format,
)
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def run_create(
num_records: int,
dataset_name: str,
artifact_path: str | None,
output_format: str | None = None,
) -> None:
"""Load config, create a full dataset, and save results to disk.

Expand All @@ -124,7 +125,17 @@ def run_create(
num_records: Number of records to generate.
dataset_name: Name for the generated dataset folder.
artifact_path: Path where generated artifacts will be stored, or None for default.
output_format: If set, export the dataset to a single file in this format after
generation. One of 'jsonl', 'csv', 'parquet'.
"""
from data_designer.interface.results import SUPPORTED_EXPORT_FORMATS

if output_format is not None and output_format not in SUPPORTED_EXPORT_FORMATS:
Comment thread
przemekboruta marked this conversation as resolved.
print_error(
f"Unsupported export format: {output_format!r}. Choose one of: {', '.join(SUPPORTED_EXPORT_FORMATS)}."
)
raise typer.Exit(code=1)

config_builder = self._load_config(config_source)

resolved_artifact_path = Path(artifact_path) if artifact_path else Path.cwd() / "artifacts"
Expand All @@ -147,16 +158,27 @@ def run_create(
print_error(f"Dataset creation failed: {e}")
raise typer.Exit(code=1)

dataset = results.load_dataset()
num_records = len(results.load_dataset())

analysis = results.load_analysis()
if analysis is not None:
console.print()
analysis.to_report()

console.print()
print_success(f"Dataset created — {len(dataset)} record(s) generated")
console.print(f" Artifacts saved to: [bold]{results.artifact_storage.base_dataset_path}[/bold]")

if output_format is not None:
Comment thread
przemekboruta marked this conversation as resolved.
export_path = results.artifact_storage.base_dataset_path / f"dataset.{output_format}"
try:
results.export(export_path)
except Exception as e:
print_error(f"Export failed: {e}")
raise typer.Exit(code=1)
console.print(f" Exported to: [bold]{export_path}[/bold]")

console.print()
print_success(f"Dataset created — {num_records} record(s) generated")
console.print()

def _load_config(self, config_source: str) -> DataDesignerConfigBuilder:
Expand Down
94 changes: 93 additions & 1 deletion packages/data-designer/src/data_designer/interface/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal, get_args

import data_designer.lazy_heavy_imports as lazy
from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults
from data_designer.config.config_builder import DataDesignerConfigBuilder
from data_designer.config.dataset_metadata import DatasetMetadata
from data_designer.config.errors import InvalidFileFormatError
from data_designer.config.utils.visualization import WithRecordSamplerMixin
from data_designer.engine.dataset_builders.errors import ArtifactStorageError
from data_designer.engine.storage.artifact_storage import ArtifactStorage
Expand All @@ -19,6 +21,9 @@

from data_designer.engine.dataset_builders.utils.task_model import TaskTrace

# Single-file export formats accepted by DatasetCreationResults.export().
ExportFormat = Literal["jsonl", "csv", "parquet"]
# Runtime tuple derived from the Literal above so the two can never drift apart.
SUPPORTED_EXPORT_FORMATS: tuple[str, ...] = get_args(ExportFormat)


class DatasetCreationResults(WithRecordSamplerMixin):
"""Results container for a Data Designer dataset creation run.
Expand Down Expand Up @@ -95,6 +100,57 @@ def get_path_to_processor_artifacts(self, processor_name: str) -> Path:
raise ArtifactStorageError(f"Processor {processor_name} has no artifacts.")
return self.artifact_storage.processors_outputs_path / processor_name

def export(self, path: Path | str, *, format: ExportFormat | None = None) -> Path:
    """Export the generated dataset to a single file by streaming batch files.

    The output format is inferred from the file extension when *format* is
    omitted. Pass *format* explicitly to override the extension (e.g. write a
    ``.txt`` file as JSONL). Matching is case-insensitive, so ``output.JSONL``
    behaves the same as ``output.jsonl``.

    Unlike :meth:`load_dataset`, this method never materialises the full dataset
    in memory — it reads batch parquet files one at a time and appends each to
    the output file, keeping peak memory proportional to a single batch.

    Args:
        path: Output file path. The exact path is used as-is; the extension is
            not rewritten.
        format: Output format. One of ``'jsonl'``, ``'csv'``, or ``'parquet'``.
            When omitted, the format is inferred from the file extension.

    Returns:
        Path to the written file.

    Raises:
        InvalidFileFormatError: If the format cannot be determined or is not
            one of the supported values.
        ArtifactStorageError: If no batch parquet files are found.

    Example:
        >>> results = data_designer.create(config, num_records=1000)
        >>> results.export("output.jsonl")
        PosixPath('output.jsonl')
        >>> results.export("output.csv")
        PosixPath('output.csv')
        >>> results.export("output.txt", format="jsonl")
        PosixPath('output.txt')
    """
    path = Path(path)
    # Lower-case the resolved value so both extension inference and an
    # explicitly passed format are case-insensitive ("output.JSONL" works).
    resolved_format: str = (format if format is not None else path.suffix.lstrip(".")).lower()
    if resolved_format not in SUPPORTED_EXPORT_FORMATS:
        raise InvalidFileFormatError(
            f"Unsupported export format: {resolved_format!r}. Choose one of: {', '.join(SUPPORTED_EXPORT_FORMATS)}."
        )
    batch_files = sorted(self.artifact_storage.final_dataset_path.glob("batch_*.parquet"))
    if not batch_files:
        raise ArtifactStorageError("No batch parquet files found to export.")
    # Dispatch table keeps the per-format writers in one place; its keys
    # mirror SUPPORTED_EXPORT_FORMATS, which was validated above.
    exporters = {"jsonl": _export_jsonl, "csv": _export_csv, "parquet": _export_parquet}
    exporters[resolved_format](batch_files, path)
    return path

def push_to_hub(
self,
repo_id: str,
Expand Down Expand Up @@ -140,3 +196,39 @@ def push_to_hub(
description=description,
tags=tags,
)


def _export_jsonl(batch_files: list[Path], output: Path) -> None:
    """Write *batch_files* to *output* as JSONL, one record per line.

    Each batch is appended in turn so peak memory stays proportional to one batch.
    """
    with output.open("w", encoding="utf-8") as f:
        for batch_file in batch_files:
            chunk = lazy.pd.read_parquet(batch_file)
            # An empty batch serialises to "" — writing the newline terminator
            # for it would inject a blank line, which breaks JSONL consumers.
            if chunk.empty:
                continue
            content = chunk.to_json(orient="records", lines=True, force_ascii=False, date_format="iso")
            f.write(content)
            # pandas does not terminate the final record with a newline;
            # add one so the next batch starts on its own line.
            if not content.endswith("\n"):
                f.write("\n")


def _export_csv(batch_files: list[Path], output: Path) -> None:
    """Write *batch_files* to *output* as CSV with a single header row.

    Only the first batch emits a header; later batches are appended. Each
    subsequent batch is reordered to the first batch's column order so rows
    cannot land under the wrong header when batches agree on columns but not
    on their order. A batch missing one of those columns raises KeyError.
    """
    columns: list[str] | None = None
    for i, batch_file in enumerate(batch_files):
        chunk = lazy.pd.read_parquet(batch_file)
        if columns is None:
            columns = list(chunk.columns)
        else:
            # Align to the header written by the first batch.
            chunk = chunk[columns]
        chunk.to_csv(output, mode="a" if i > 0 else "w", header=(i == 0), index=False)


def _export_parquet(batch_files: list[Path], output: Path) -> None:
    """Write *batch_files* to *output* as a single Parquet file.

    Schemas are unified across batches before writing so that columns with minor
    type drift (e.g. ``int64`` vs ``float64`` across batches) are cast to a
    consistent schema rather than causing a write error.
    """
    # Unify all batch schemas up front; every table is cast to this before writing.
    unified_schema = lazy.pa.unify_schemas([lazy.pq.read_schema(batch_file) for batch_file in batch_files])
    with lazy.pq.ParquetWriter(output, unified_schema) as writer:
        for batch_file in batch_files:
            writer.write_table(lazy.pq.read_table(batch_file).cast(unified_schema))
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ def test_create_command_delegates_to_controller(mock_ctrl_cls: MagicMock) -> Non
mock_ctrl = MagicMock()
mock_ctrl_cls.return_value = mock_ctrl

create_command(config_source="config.yaml", num_records=10, dataset_name="dataset", artifact_path=None)
create_command(
config_source="config.yaml", num_records=10, dataset_name="dataset", artifact_path=None, output_format=None
)

mock_ctrl_cls.assert_called_once()
mock_ctrl.run_create.assert_called_once_with(
config_source="config.yaml",
num_records=10,
dataset_name="dataset",
artifact_path=None,
output_format=None,
)


Expand All @@ -40,13 +43,15 @@ def test_create_command_passes_custom_options(mock_ctrl_cls: MagicMock) -> None:
num_records=100,
dataset_name="my_data",
artifact_path="/custom/output",
output_format=None,
)

mock_ctrl.run_create.assert_called_once_with(
config_source="config.py",
num_records=100,
dataset_name="my_data",
artifact_path="/custom/output",
output_format=None,
)


Expand All @@ -56,11 +61,37 @@ def test_create_command_default_artifact_path_is_none(mock_ctrl_cls: MagicMock)
mock_ctrl = MagicMock()
mock_ctrl_cls.return_value = mock_ctrl

create_command(config_source="config.yaml", num_records=5, dataset_name="ds", artifact_path=None)
create_command(
config_source="config.yaml", num_records=5, dataset_name="ds", artifact_path=None, output_format=None
)

mock_ctrl.run_create.assert_called_once_with(
config_source="config.yaml",
num_records=5,
dataset_name="ds",
artifact_path=None,
output_format=None,
)


@patch("data_designer.cli.commands.create.GenerationController")
def test_create_command_passes_output_format(mock_ctrl_cls: MagicMock) -> None:
"""Test create_command forwards --output-format to the controller."""
mock_ctrl = MagicMock()
mock_ctrl_cls.return_value = mock_ctrl

create_command(
config_source="config.yaml",
num_records=10,
dataset_name="dataset",
artifact_path=None,
output_format="jsonl",
)

mock_ctrl.run_create.assert_called_once_with(
config_source="config.yaml",
num_records=10,
dataset_name="dataset",
artifact_path=None,
output_format="jsonl",
)
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,64 @@ def test_run_create_skips_report_when_analysis_is_none(mock_load_config: MagicMo
# load_analysis() returns None, so to_report() must not be called.
# If the code ignores the None check, an AttributeError propagates and the test fails.
mock_results.load_analysis.assert_called_once()


@patch(f"{_CTRL}.DataDesigner")
@patch(f"{_CTRL}.load_config_builder")
def test_run_create_with_output_format_happy_path(mock_load_config: MagicMock, mock_dd_cls: MagicMock) -> None:
"""export() is called with the correct path and format when --output-format is given."""
mock_load_config.return_value = MagicMock(spec=DataDesignerConfigBuilder)
mock_dd = MagicMock()
mock_dd_cls.return_value = mock_dd
mock_results = _make_mock_create_results(5)
mock_dd.create.return_value = mock_results

controller = GenerationController()
controller.run_create(
config_source="config.yaml",
num_records=5,
dataset_name="dataset",
artifact_path=None,
output_format="jsonl",
)

mock_results.export.assert_called_once_with(
Path("/output/artifacts/dataset") / "dataset.jsonl",
)


def test_run_create_invalid_output_format_exits() -> None:
    """Bad --output-format exits with code 1 before generation starts."""
    with pytest.raises(typer.Exit) as exc_info:
        GenerationController().run_create(
            config_source="config.yaml",
            num_records=10,
            dataset_name="dataset",
            artifact_path=None,
            output_format="xlsx",
        )
    assert exc_info.value.exit_code == 1


@patch(f"{_CTRL}.DataDesigner")
@patch(f"{_CTRL}.load_config_builder")
def test_run_create_export_failure_exits(mock_load_config: MagicMock, mock_dd_cls: MagicMock) -> None:
"""If export() raises, run_create exits with code 1."""
mock_load_config.return_value = MagicMock(spec=DataDesignerConfigBuilder)
mock_dd = MagicMock()
mock_dd_cls.return_value = mock_dd
mock_results = _make_mock_create_results(5)
mock_results.export.side_effect = RuntimeError("disk full")
mock_dd.create.return_value = mock_results

controller = GenerationController()
with pytest.raises(typer.Exit) as exc_info:
controller.run_create(
config_source="config.yaml",
num_records=5,
dataset_name="dataset",
artifact_path=None,
output_format="csv",
)
assert exc_info.value.exit_code == 1
1 change: 1 addition & 0 deletions packages/data-designer/tests/cli/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ def test_app_dispatches_lazy_create_command(mock_controller_cls: Mock) -> None:
num_records=DEFAULT_NUM_RECORDS,
dataset_name="dataset",
artifact_path=None,
output_format=None,
)
Loading
Loading