-
Notifications
You must be signed in to change notification settings - Fork 165
Expand file tree
/
Copy pathresults.py
More file actions
234 lines (192 loc) · 9.69 KB
/
results.py
File metadata and controls
234 lines (192 loc) · 9.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Literal, get_args
import data_designer.lazy_heavy_imports as lazy
from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults
from data_designer.config.config_builder import DataDesignerConfigBuilder
from data_designer.config.dataset_metadata import DatasetMetadata
from data_designer.config.errors import InvalidFileFormatError
from data_designer.config.utils.visualization import WithRecordSamplerMixin
from data_designer.engine.dataset_builders.errors import ArtifactStorageError
from data_designer.engine.storage.artifact_storage import ArtifactStorage
from data_designer.integrations.huggingface.client import HuggingFaceHubClient
if TYPE_CHECKING:
import pandas as pd
from data_designer.engine.dataset_builders.utils.task_model import TaskTrace
ExportFormat = Literal["jsonl", "csv", "parquet"]
SUPPORTED_EXPORT_FORMATS: tuple[str, ...] = get_args(ExportFormat)
class DatasetCreationResults(WithRecordSamplerMixin):
"""Results container for a Data Designer dataset creation run.
This class provides access to the generated dataset, profiling analysis, and
visualization utilities. It is returned by the DataDesigner.create() method
and implements ResultsProtocol of the DataDesigner interface.
"""
def __init__(
self,
*,
artifact_storage: ArtifactStorage,
analysis: DatasetProfilerResults,
config_builder: DataDesignerConfigBuilder,
dataset_metadata: DatasetMetadata,
task_traces: list[TaskTrace] | None = None,
):
"""Creates a new instance with results based on a dataset creation run.
Args:
artifact_storage: Storage manager for accessing generated artifacts.
analysis: Profiling results for the generated dataset.
config_builder: Configuration builder used to create the dataset.
dataset_metadata: Metadata about the generated dataset (e.g., seed column names).
task_traces: Optional list of TaskTrace objects from the async scheduler.
"""
self.artifact_storage = artifact_storage
self._analysis = analysis
self._config_builder = config_builder
self.dataset_metadata = dataset_metadata
self.task_traces: list[TaskTrace] = task_traces or []
def load_analysis(self) -> DatasetProfilerResults:
"""Load the profiling analysis results for the generated dataset.
Returns:
DatasetProfilerResults containing statistical analysis and quality metrics
for each column in the generated dataset.
"""
return self._analysis
def load_dataset(self) -> pd.DataFrame:
"""Load the generated dataset as a pandas DataFrame.
Returns:
A pandas DataFrame containing the full generated dataset.
"""
return self.artifact_storage.load_dataset()
def load_processor_dataset(self, processor_name: str) -> pd.DataFrame:
"""Load the dataset generated by a processor.
This only works for processors that write their artifacts in Parquet format.
Args:
processor_name: The name of the processor to load the dataset from.
Returns:
A pandas DataFrame containing the dataset generated by the processor.
"""
return self.artifact_storage.load_processor_dataset(processor_name)
def get_path_to_processor_artifacts(self, processor_name: str) -> Path:
"""Get the path to the artifacts generated by a processor.
Args:
processor_name: The name of the processor to load the artifact from.
Returns:
The path to the artifacts.
"""
if not self.artifact_storage.processors_outputs_path.exists():
raise ArtifactStorageError(f"Processor {processor_name} has no artifacts.")
return self.artifact_storage.processors_outputs_path / processor_name
def export(self, path: Path | str, *, format: ExportFormat | None = None) -> Path:
"""Export the generated dataset to a single file by streaming batch files.
The output format is inferred from the file extension when *format* is
omitted. Pass *format* explicitly to override the extension (e.g. write a
``.txt`` file as JSONL).
Unlike :meth:`load_dataset`, this method never materialises the full dataset
in memory — it reads batch parquet files one at a time and appends each to
the output file, keeping peak memory proportional to a single batch.
Args:
path: Output file path. The exact path is used as-is; the extension is
not rewritten.
format: Output format. One of ``'jsonl'``, ``'csv'``, or ``'parquet'``.
When omitted, the format is inferred from the file extension.
Returns:
Path to the written file.
Raises:
InvalidFileFormatError: If the format cannot be determined or is not
one of the supported values.
ArtifactStorageError: If no batch parquet files are found.
Example:
>>> results = data_designer.create(config, num_records=1000)
>>> results.export("output.jsonl")
PosixPath('output.jsonl')
>>> results.export("output.csv")
PosixPath('output.csv')
>>> results.export("output.txt", format="jsonl")
PosixPath('output.txt')
"""
path = Path(path)
resolved_format: str = format if format is not None else path.suffix.lstrip(".")
if resolved_format not in SUPPORTED_EXPORT_FORMATS:
raise InvalidFileFormatError(
f"Unsupported export format: {resolved_format!r}. Choose one of: {', '.join(SUPPORTED_EXPORT_FORMATS)}."
)
batch_files = sorted(self.artifact_storage.final_dataset_path.glob("batch_*.parquet"))
if not batch_files:
raise ArtifactStorageError("No batch parquet files found to export.")
if resolved_format == "jsonl":
_export_jsonl(batch_files, path)
elif resolved_format == "csv":
_export_csv(batch_files, path)
elif resolved_format == "parquet":
_export_parquet(batch_files, path)
return path
def push_to_hub(
self,
repo_id: str,
description: str,
*,
token: str | None = None,
private: bool = False,
tags: list[str] | None = None,
) -> str:
"""Push dataset to HuggingFace Hub.
Uploads all artifacts including:
- Main parquet batch files (data subset)
- Processor output batch files ({processor_name} subsets)
- Configuration (builder_config.json)
- Metadata (metadata.json)
- Auto-generated dataset card (README.md)
Args:
repo_id: HuggingFace repo ID (e.g., "username/my-dataset")
description: Custom description text for the dataset card.
Appears after the title.
token: HuggingFace API token. If None, the token is automatically
resolved from HF_TOKEN environment variable or cached credentials
from `hf auth login`.
private: Create private repo
tags: Additional custom tags for the dataset.
Returns:
URL to the uploaded dataset
Example:
>>> results = data_designer.create(config, num_records=1000)
>>> description = "This dataset contains synthetic conversations for training chatbots."
>>> results.push_to_hub("username/my-synthetic-dataset", description, tags=["chatbot", "conversation"])
'https://huggingface.co/datasets/username/my-synthetic-dataset'
"""
client = HuggingFaceHubClient(token=token)
return client.upload_dataset(
repo_id=repo_id,
base_dataset_path=self.artifact_storage.base_dataset_path,
private=private,
description=description,
tags=tags,
)
def _export_jsonl(batch_files: list[Path], output: Path) -> None:
"""Write *batch_files* to *output* as JSONL, one record per line.
Each batch is appended in turn so peak memory stays proportional to one batch.
"""
with output.open("w", encoding="utf-8") as f:
for batch_file in batch_files:
chunk = lazy.pd.read_parquet(batch_file)
content = chunk.to_json(orient="records", lines=True, force_ascii=False, date_format="iso")
f.write(content)
if not content.endswith("\n"):
f.write("\n")
def _export_csv(batch_files: list[Path], output: Path) -> None:
"""Write *batch_files* to *output* as CSV with a single header row."""
for i, batch_file in enumerate(batch_files):
chunk = lazy.pd.read_parquet(batch_file)
chunk.to_csv(output, mode="a" if i > 0 else "w", header=(i == 0), index=False)
def _export_parquet(batch_files: list[Path], output: Path) -> None:
"""Write *batch_files* to *output* as a single Parquet file.
Schemas are unified across batches before writing so that columns with minor
type drift (e.g. ``int64`` vs ``float64`` across batches) are cast to a
consistent schema rather than causing a write error.
"""
schemas = [lazy.pq.read_schema(f) for f in batch_files]
unified_schema = lazy.pa.unify_schemas(schemas)
with lazy.pq.ParquetWriter(output, unified_schema) as writer:
for batch_file in batch_files:
table = lazy.pq.read_table(batch_file)
writer.write_table(table.cast(unified_schema))