Skip to content

Commit 2c2eb90

Browse files
committed
fix: address sync removal review
1 parent 5dad024 commit 2c2eb90

4 files changed

Lines changed: 41 additions & 90 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ Data Designer helps you create synthetic datasets that go beyond simple LLM prom
2222

2323
---
2424

25-
### 📣 Heads-up: async engine is now the default
25+
### 📣 Heads-up: async engine
2626

2727
Data Designer now runs pipelines on a cell-level async engine that overlaps independent columns and adapts concurrency separately for each (provider, model) pair. On most pipelines this is faster with no config changes; on slow self-hosted endpoints, set `inference_parameters.timeout` to match your real per-request latency. See [Architecture & Performance → Async Engine](https://docs.nvidia.com/nemo/datadesigner/concepts/architecture-performance#async-engine) for the behaviors worth knowing about.
2828

29-
If you hit anything unexpected, fall back to the legacy sync engine for one transitional release with `DATA_DESIGNER_ASYNC_ENGINE=0`, and please [open an issue](https://github.com/NVIDIA-NeMo/DataDesigner/issues/new) so we can fix the async path.
29+
If you hit anything unexpected, please [open an issue](https://github.com/NVIDIA-NeMo/DataDesigner/issues/new).
3030

3131
---
3232

packages/data-designer/src/data_designer/interface/data_designer.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,11 @@ def get_models(self, model_aliases: list[str]) -> dict[str, ModelFacade]:
615615
Dict mapping alias to ModelFacade instance.
616616
"""
617617
config_builder = DataDesignerConfigBuilder()
618-
resource_provider = self._create_resource_provider("dev", config_builder)
618+
resource_provider = self._create_resource_provider(
619+
"dev",
620+
config_builder,
621+
client_concurrency_mode=ClientConcurrencyMode.SYNC,
622+
)
619623
return {alias: resource_provider.model_registry.get_model(model_alias=alias) for alias in model_aliases}
620624

621625
def _resolve_model_providers(self, model_providers: list[ModelProvider] | None) -> list[ModelProvider]:
@@ -668,6 +672,7 @@ def _create_resource_provider(
668672
*,
669673
resume: ResumeMode = ResumeMode.NEVER,
670674
artifact_path: Path | None = None,
675+
client_concurrency_mode: ClientConcurrencyMode = ClientConcurrencyMode.ASYNC,
671676
) -> ResourceProvider:
672677
artifact_path = artifact_path or self._artifact_path
673678
ArtifactStorage.mkdir_if_needed(artifact_path)
@@ -687,7 +692,7 @@ def _create_resource_provider(
687692
run_config=self._run_config,
688693
mcp_providers=self._mcp_providers,
689694
tool_configs=config_builder.tool_configs,
690-
client_concurrency_mode=ClientConcurrencyMode.ASYNC,
695+
client_concurrency_mode=client_concurrency_mode,
691696
request_admission=self._request_admission,
692697
)
693698

packages/data-designer/tests/interface/test_data_designer.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,20 @@ def test_run_config_normalizes_error_rate_when_disabled(stub_artifact_path, stub
538538
assert data_designer.run_config.shutdown_error_rate == 1.0
539539

540540

541+
def test_get_models_uses_sync_clients(stub_artifact_path, stub_model_providers):
542+
data_designer = DataDesigner(artifact_path=stub_artifact_path, model_providers=stub_model_providers)
543+
544+
with patch.object(data_designer, "_create_resource_provider") as mock_resource_provider_method:
545+
mock_resource_provider = MagicMock()
546+
mock_resource_provider.model_registry.get_model.return_value = MagicMock()
547+
mock_resource_provider_method.return_value = mock_resource_provider
548+
549+
data_designer.get_models(["stub-model"])
550+
551+
_, kwargs = mock_resource_provider_method.call_args
552+
assert kwargs["client_concurrency_mode"] == ClientConcurrencyMode.SYNC
553+
554+
541555
def test_create_forwards_on_batch_complete_callback(
542556
stub_artifact_path: Path,
543557
stub_model_providers: list[ModelProvider],

scripts/benchmarks/benchmark_engine_v2.py

Lines changed: 18 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import hashlib
1212
import json
1313
import math
14-
import os
1514
import random
1615
import statistics
1716
import subprocess
@@ -277,18 +276,6 @@ def _format_stats(stats: MetricStats, *, unit: str, precision: int = 3) -> str:
277276
return f"{mean}{unit} ± {ci}{unit} (stdev {stdev}{unit}, n={stats.n})"
278277

279278

280-
def _format_speed_stats(stats: MetricStats, *, precision: int = 2) -> str:
281-
fmt = f"{{:.{precision}f}}"
282-
mean = fmt.format(stats.mean)
283-
ci = fmt.format(stats.ci_half_width)
284-
stdev = fmt.format(stats.stdev)
285-
return f"{mean}x ± {ci}x (stdev {stdev}x, n={stats.n})"
286-
287-
288-
def _significant_diff(stats: MetricStats) -> bool:
289-
return stats.n > 1 and abs(stats.mean) > stats.ci_half_width
290-
291-
292279
def _json_default(value: Any) -> Any:
293280
if isinstance(value, np.generic):
294281
return value.item()
@@ -589,13 +576,12 @@ def _dataset_fingerprint(df: pd.DataFrame) -> str:
589576

590577

591578
def _run_single_benchmark(settings: BenchmarkSettings, engine_mode: str) -> BenchmarkResult:
592-
# Imports are deferred so engine selection respects DATA_DESIGNER_ASYNC_ENGINE.
593-
from data_designer.engine.dataset_builders.artifact_storage import ArtifactStorage
594579
from data_designer.engine.dataset_builders.dataset_builder import DatasetBuilder
595580
from data_designer.engine.model_provider import resolve_model_provider_registry
596581
from data_designer.engine.resources.resource_provider import create_resource_provider
597582
from data_designer.engine.resources.seed_reader import SeedReaderRegistry
598583
from data_designer.engine.secret_resolver import CompositeResolver, EnvironmentResolver, PlaintextResolver
584+
from data_designer.engine.storage.artifact_storage import ArtifactStorage
599585

600586
random.seed(settings.seed)
601587
np.random.seed(settings.seed)
@@ -630,7 +616,6 @@ def _run_single_benchmark(settings: BenchmarkSettings, engine_mode: str) -> Benc
630616
secret_resolver=secret_resolver,
631617
model_provider_registry=model_provider_registry,
632618
seed_reader_registry=SeedReaderRegistry(readers=[]),
633-
blob_storage=None,
634619
seed_dataset_source=None,
635620
run_config=run_config,
636621
mcp_providers=[mcp_provider],
@@ -664,15 +649,9 @@ def _run_single_benchmark(settings: BenchmarkSettings, engine_mode: str) -> Benc
664649

665650

666651
def _run_subprocess(settings: BenchmarkSettings, engine_mode: str) -> BenchmarkResult:
667-
env = os.environ.copy()
668-
if engine_mode == "async":
669-
env["DATA_DESIGNER_ASYNC_ENGINE"] = "1"
670-
else:
671-
env.pop("DATA_DESIGNER_ASYNC_ENGINE", None)
672-
673652
script_path = Path(__file__).resolve()
674653
cmd = [sys.executable, str(script_path), "--mode", "run", "--engine", engine_mode, *settings.to_cli_args()]
675-
completed = subprocess.run(cmd, capture_output=True, text=True, env=env, check=False)
654+
completed = subprocess.run(cmd, capture_output=True, text=True, check=False)
676655

677656
if completed.returncode != 0:
678657
raise RuntimeError(f"Benchmark subprocess failed.\nstdout:\n{completed.stdout}\nstderr:\n{completed.stderr}")
@@ -687,12 +666,6 @@ def _run_subprocess(settings: BenchmarkSettings, engine_mode: str) -> BenchmarkR
687666
)
688667

689668

690-
def _format_speedup(sync_time: float, async_time: float) -> str:
691-
if async_time <= 0:
692-
return "n/a"
693-
return f"{(sync_time / async_time):.2f}x"
694-
695-
696669
def _run_with_progress(settings: BenchmarkSettings, engine_mode: str, iteration: int, total: int) -> BenchmarkResult:
697670
print(f"[{iteration}/{total}] Running {engine_mode} benchmark...", end="", flush=True)
698671
result = _run_subprocess(settings, engine_mode)
@@ -701,68 +674,32 @@ def _run_with_progress(settings: BenchmarkSettings, engine_mode: str, iteration:
701674

702675

703676
def _compare_runs(settings: BenchmarkSettings, iterations: int) -> int:
704-
sync_results: list[BenchmarkResult] = []
705-
async_results: list[BenchmarkResult] = []
677+
results: list[BenchmarkResult] = []
706678
expected_hash: str | None = None
707679

708680
for iteration in range(1, iterations + 1):
709-
sync_result = _run_with_progress(settings, "sync", iteration, iterations)
710-
async_result = _run_with_progress(settings, "async", iteration, iterations)
711-
712-
if sync_result.dataset_hash != async_result.dataset_hash:
713-
print(
714-
"Content mismatch detected: "
715-
f"sync hash {sync_result.dataset_hash} vs async hash {async_result.dataset_hash}"
716-
)
717-
return 1
681+
result = _run_with_progress(settings, "async", iteration, iterations)
718682

719683
if expected_hash is None:
720-
expected_hash = sync_result.dataset_hash
721-
elif expected_hash != sync_result.dataset_hash or expected_hash != async_result.dataset_hash:
684+
expected_hash = result.dataset_hash
685+
elif expected_hash != result.dataset_hash:
722686
print("Content mismatch detected across iterations.")
723687
return 1
724688

725-
sync_results.append(sync_result)
726-
async_results.append(async_result)
727-
728-
build_sync = [result.build_time_sec for result in sync_results]
729-
build_async = [result.build_time_sec for result in async_results]
730-
total_sync = [result.total_time_sec for result in sync_results]
731-
total_async = [result.total_time_sec for result in async_results]
689+
results.append(result)
732690

733-
build_speedups = [sync / async_ for sync, async_ in zip(build_sync, build_async)]
734-
total_speedups = [sync / async_ for sync, async_ in zip(total_sync, total_async)]
735-
build_diffs = [sync - async_ for sync, async_ in zip(build_sync, build_async)]
736-
total_diffs = [sync - async_ for sync, async_ in zip(total_sync, total_async)]
691+
build_times = [result.build_time_sec for result in results]
692+
total_times = [result.total_time_sec for result in results]
737693

738-
build_sync_stats = _compute_stats(build_sync)
739-
build_async_stats = _compute_stats(build_async)
740-
total_sync_stats = _compute_stats(total_sync)
741-
total_async_stats = _compute_stats(total_async)
742-
743-
build_speed_stats = _compute_stats(build_speedups)
744-
total_speed_stats = _compute_stats(total_speedups)
745-
build_diff_stats = _compute_stats(build_diffs)
746-
total_diff_stats = _compute_stats(total_diffs)
694+
build_stats = _compute_stats(build_times)
695+
total_stats = _compute_stats(total_times)
747696

748697
latency_label = "on" if settings.simulated_latency else "off"
749-
print("\nEngine benchmark summary (95% CI)")
698+
print("\nAsync engine benchmark summary (95% CI)")
750699
print(f"- runs: {iterations} | content match: yes | hash {expected_hash}")
751700
print(f"- simulated latency: {latency_label}")
752-
print(f"- build time sync: {_format_stats(build_sync_stats, unit='s')}")
753-
print(f"- build time async: {_format_stats(build_async_stats, unit='s')}")
754-
print(
755-
f"- build speedup: {_format_speed_stats(build_speed_stats)} | "
756-
f"paired diff {_format_stats(build_diff_stats, unit='s')} | "
757-
f"significant: {'yes' if _significant_diff(build_diff_stats) else 'no'}"
758-
)
759-
print(f"- total time sync: {_format_stats(total_sync_stats, unit='s')}")
760-
print(f"- total time async: {_format_stats(total_async_stats, unit='s')}")
761-
print(
762-
f"- total speedup: {_format_speed_stats(total_speed_stats)} | "
763-
f"paired diff {_format_stats(total_diff_stats, unit='s')} | "
764-
f"significant: {'yes' if _significant_diff(total_diff_stats) else 'no'}"
765-
)
701+
print(f"- build time: {_format_stats(build_stats, unit='s')}")
702+
print(f"- total time: {_format_stats(total_stats, unit='s')}")
766703

767704
return 0
768705

@@ -776,13 +713,13 @@ def _parse_args() -> argparse.Namespace:
776713
type=str,
777714
choices=("compare", "run"),
778715
default="compare",
779-
help="Run both engines in subprocesses, or run once in the current process.",
716+
help="Run repeated async benchmarks in subprocesses, or run once in the current process.",
780717
)
781718
parser.add_argument(
782719
"--engine",
783720
type=str,
784-
choices=("sync", "async"),
785-
default="sync",
721+
choices=("async",),
722+
default="async",
786723
help="Engine mode for --mode run.",
787724
)
788725
parser.add_argument("--num-records", type=int, default=DEFAULT_NUM_RECORDS, help="Records to generate.")
@@ -792,7 +729,7 @@ def _parse_args() -> argparse.Namespace:
792729
"--iterations",
793730
type=int,
794731
default=DEFAULT_ITERATIONS,
795-
help="Number of sync/async runs to include in the compare mode.",
732+
help="Number of async runs to include in compare mode.",
796733
)
797734
parser.add_argument(
798735
"--max-parallel-requests",
@@ -829,11 +766,6 @@ def main() -> None:
829766
if args.mode == "compare":
830767
sys.exit(_compare_runs(settings, args.iterations))
831768

832-
if args.engine == "async":
833-
os.environ["DATA_DESIGNER_ASYNC_ENGINE"] = "1"
834-
else:
835-
os.environ.pop("DATA_DESIGNER_ASYNC_ENGINE", None)
836-
837769
print(f"Running {args.engine} benchmark...")
838770
result = _run_single_benchmark(settings, args.engine)
839771
print(f"{RESULT_PREFIX}{json.dumps(result.to_dict(), sort_keys=True)}")

0 commit comments

Comments
 (0)