|
1 | 1 | import asyncio |
| 2 | +import json |
2 | 3 | import os |
| 4 | +import tempfile |
3 | 5 | from collections.abc import Callable |
4 | 6 | from typing import Any |
5 | 7 |
|
6 | 8 | from alphatrion.runtime.contextvars import current_exp_id, current_run_id |
7 | 9 | from alphatrion.runtime.runtime import global_runtime |
8 | 10 | from alphatrion.snapshot.snapshot import ( |
9 | | - ExecutionKind, |
10 | | - build_run_execution, |
11 | 11 | checkpoint_path, |
12 | | - snapshot_path, |
13 | 12 | ) |
14 | 13 |
|
15 | 14 | BEST_RESULT_PATH = "best_result_path" |
@@ -151,79 +150,48 @@ async def log_metrics(metrics: dict[str, float]) -> bool: |
151 | 150 | return is_best_metric |
152 | 151 |
|
153 | 152 |
|
154 | | -# log_result is used to log the result of a run/experiment, |
155 | | -# including both input and output, e.g. you want to save the code snippet. |
156 | | -# It will be stored in the object storage as a JSON file if object storage |
157 | | -# is enabled or locally otherwise. |
158 | | -# NOTE: will be deprecated in the v0.3.0, use log_dataset instead. |
159 | | -async def log_result( |
160 | | - output: dict[str, Any], |
161 | | - input: dict[str, Any] | None = None, |
162 | | - phase: str = "success", |
163 | | - kind: ExecutionKind = ExecutionKind.RUN, |
164 | | -): |
165 | | - result = None |
166 | | - |
167 | | - if kind == ExecutionKind.RUN: |
168 | | - result = build_run_execution(output=output, input=input, phase=phase) |
169 | | - else: |
170 | | - raise NotImplementedError( |
171 | | - f"Logging record of kind {result.kind} is not implemented yet." |
172 | | - ) |
173 | | - |
174 | | - # Can I get the file size to store in the database? |
| 153 | +# log_records is used to log a list of records, which is similar to log_metrics |
| 154 | +# but for tracing the execution of the code. |
| 155 | +# async def log_records(): |
175 | 156 |
|
176 | | - path = snapshot_path() |
177 | | - if os.path.exists(path) is False: |
178 | | - os.makedirs(path, exist_ok=True) |
179 | 157 |
|
180 | | - # Will eventually be cleanup on Experiment done() if AUTO_CLEANUP is enabled. |
181 | | - # Considering the record file is small, we just save it locally first. |
182 | | - # If this changes in the future, we should delete them after uploading. |
183 | | - with open(os.path.join(path, "result.json"), "w") as f: |
184 | | - f.write(result.model_dump_json()) |
async def log_dataset(
    name: str,
    data: dict[str, Any],
):
    """
    Log dataset to the database and artifact registry.

    The data is serialized to a JSON file in a temporary directory,
    uploaded via ``log_artifact`` under the "dataset" repo, and then
    registered in the metadata database together with its size and the
    current experiment/run context.

    :param name: the name of the dataset. Also used as the uploaded
        file name, so it should be a plain file name without path
        separators — TODO confirm whether callers may pass nested paths.
    :param data: the data to be logged, currently support dict only,
        will support more types in the future.
    :raises NotImplementedError: if ``data`` is not a dict.
    """
    runtime = global_runtime()

    # Guard clause: only dict payloads are supported for now.
    if not isinstance(data, dict):
        raise NotImplementedError(
            f"Logging dataset of type {type(data)} is not implemented yet."
        )

    with tempfile.TemporaryDirectory() as tmpdir:
        # Write via an absolute path instead of os.chdir(tmpdir):
        # chdir mutates process-wide state, is unsafe while other
        # coroutines run across the await below, and would leave the
        # process CWD pointing at a deleted directory after cleanup.
        file_path = os.path.join(tmpdir, name)
        with open(file_path, "w") as f:
            json.dump(data, f)

        # Record the on-disk size before the temp dir is removed.
        file_size = os.path.getsize(file_path)

        path = await log_artifact(
            paths=file_path,
            repo_name="dataset",
        )

    # Register the uploaded artifact in the metadata database, tied to
    # the active experiment/run from the runtime context vars.
    runtime.metadb.create_dataset(
        name=name,
        team_id=runtime.team_id,
        user_id=runtime.user_id,
        path=path,
        experiment_id=current_exp_id.get(),
        run_id=current_run_id.get(),
        meta={"size": file_size},
    )
0 commit comments