diff --git a/dataflow/pipeline/Pipeline.py b/dataflow/pipeline/Pipeline.py index 9748c4c8..df2e76cf 100644 --- a/dataflow/pipeline/Pipeline.py +++ b/dataflow/pipeline/Pipeline.py @@ -526,6 +526,13 @@ def _compiled_forward(self, resume_step: int=0): storage=op_node.storage, **op_node.kwargs ) + # Auto-release actor resources (e.g. GPU memory) held by + # RayAcceleratedOperator after each compiled stage. Data has + # already been persisted to storage inside run(), so it is safe + # to tear down the actors before the next stage starts. + if hasattr(op_node.op_obj, "shutdown"): + self.logger.debug(f"Auto-shutting down {op_node.op_name} to release actor resources.") + op_node.op_obj.shutdown() if op_node.llm_serving != None: self.llm_serving_counter[self.active_llm_serving] -= 1 if self.llm_serving_counter[self.active_llm_serving] == 0: @@ -599,6 +606,10 @@ def _compiled_forward(self, resume_step: int=0, batch_size: int|None=None, resum resume_batch = op_node.storage.batch_step if batch_size is not None else 0 with open(cache_path, "w") as f: f.write(f"{idx-1},{resume_batch}\n") + # Auto-release actor resources after all batches complete + if hasattr(op_node.op_obj, "shutdown"): + self.logger.debug(f"Auto-shutting down {op_node.op_name} to release actor resources.") + op_node.op_obj.shutdown() if resume_from_last: resume_batch = 0 # reset for next op_node with open(cache_path, "w") as f: @@ -685,6 +696,10 @@ def _compiled_forward(self, resume_step: int=0, batch_size: int|None=None, resum resume_batch = op_node.storage.batch_step if batch_size is not None else 0 with open(cache_path, "w") as f: f.write(f"{idx-1},{resume_batch}\n") + # Auto-release actor resources after all stream batches complete + if hasattr(op_node.op_obj, "shutdown"): + self.logger.debug(f"Auto-shutting down {op_node.op_name} to release actor resources.") + op_node.op_obj.shutdown() if resume_from_last: resume_batch = 0 # reset for next op_node with open(cache_path, "w") as f: diff --git 
a/dataflow/rayorch/__init__.py b/dataflow/rayorch/__init__.py new file mode 100644 index 00000000..028d1d0e --- /dev/null +++ b/dataflow/rayorch/__init__.py @@ -0,0 +1,20 @@ +"""RayOrch integration for DataFlow — transparent data-parallel acceleration. + +Usage:: + + from dataflow.rayorch import RayAcceleratedOperator + + scorer = RayAcceleratedOperator( + FineWebEduSampleEvaluator, + replicas=4, + num_gpus_per_replica=0.25, + ).op_cls_init(device="cuda") + scorer.run(storage, input_key="text", output_key="edu_score") +""" + +from .accelerated_op import RayAcceleratedOperator +from .memory_storage import InMemoryStorage as _InMemoryStorage + +__all__ = [ + "RayAcceleratedOperator", +] diff --git a/dataflow/rayorch/_test_ops.py b/dataflow/rayorch/_test_ops.py new file mode 100644 index 00000000..e6e4bb5c --- /dev/null +++ b/dataflow/rayorch/_test_ops.py @@ -0,0 +1,44 @@ +"""Dummy CPU-only operators for testing RayAcceleratedOperator. + +These are intentionally trivial, deterministic, and row-independent so +they can be used in CI without GPU resources. Prefixed with underscore +to signal internal/test-only usage. 
+""" +from __future__ import annotations + +from dataflow.core.operator import OperatorABC +from dataflow.utils.storage import DataFlowStorage + + +class DummyDoubleOp(OperatorABC): + """Multiplies a numeric column by 2.""" + + def __init__(self): + super().__init__() + + def run( + self, + storage: DataFlowStorage, + input_key: str = "value", + output_key: str = "doubled", + ): + df = storage.read("dataframe") + df[output_key] = df[input_key] * 2 + storage.write(df) + + +class DummyIncrementOp(OperatorABC): + """Adds 1 to a numeric column.""" + + def __init__(self): + super().__init__() + + def run( + self, + storage: DataFlowStorage, + input_key: str = "doubled", + output_key: str = "incremented", + ): + df = storage.read("dataframe") + df[output_key] = df[input_key] + 1 + storage.write(df) diff --git a/dataflow/rayorch/accelerated_op.py b/dataflow/rayorch/accelerated_op.py new file mode 100644 index 00000000..d42d5e69 --- /dev/null +++ b/dataflow/rayorch/accelerated_op.py @@ -0,0 +1,241 @@ +from __future__ import annotations + +import inspect +from typing import Any, Generic, Optional, Protocol, Type, ParamSpec + +import pandas as pd + +from dataflow.core.operator import OperatorABC +from dataflow.utils.storage import DataFlowStorage + +from .memory_storage import InMemoryStorage + + +_INITP = ParamSpec("_INITP") +_RUNP = ParamSpec("_RUNP") + + +class _OperatorProto(Protocol[_INITP, _RUNP]): + """Structural type that captures both ``__init__`` and ``run`` signatures. + + Pyright / Pylance infers ``_INITP`` and ``_RUNP`` from the concrete + operator so that :meth:`op_cls_init` and :meth:`run` expose the + original parameter lists for IDE auto-complete. + """ + + def __init__(self, *args: _INITP.args, **kwargs: _INITP.kwargs) -> None: ... + + def run( + self, + storage: DataFlowStorage, + *args: _RUNP.args, + **kwargs: _RUNP.kwargs, + ) -> Any: ... + + +class _OpRunner: + """Actor-side worker: each replica holds an independent operator instance. 
+ + Receives a chunk of records (``list[dict]``), wraps it in + :class:`InMemoryStorage`, delegates to the DataFlow operator's ``run``, + and returns the result as ``list[dict]``. + """ + + def __init__(self, op_cls: type, op_init_args: tuple, op_init_kwargs: dict): + self.op = op_cls(*op_init_args, **op_init_kwargs) + + def run(self, records: list[dict], run_params: dict) -> list[dict]: + if not records: + return [] + df = pd.DataFrame(records) + storage = InMemoryStorage(df) + self.op.run(storage, *run_params.get("args", ()), **run_params.get("kwargs", {})) + return storage.result.to_dict("records") + + +class RayAcceleratedOperator(OperatorABC, Generic[_INITP, _RUNP]): + """DataFlow operator backed by RayOrch for transparent data-parallel execution. + + From the pipeline's perspective this is a normal :class:`OperatorABC`: + it reads from and writes to :class:`DataFlowStorage` sequentially. + Internally it fans the DataFrame out to *replicas* Ray actors, + each holding an independent copy of the wrapped operator (and its model). + + Actors are created **lazily** on the first ``run()`` call so that + pipeline ``compile()`` does not trigger heavyweight model loading. + + Only suitable for **row-independent (map-style)** operators. Operators + that need cross-row global state (e.g. semantic dedup with a full + similarity matrix) should *not* use this wrapper. + + Both ``op_cls_init`` and ``run`` have their signatures inferred from + ``op_cls`` via ``ParamSpec``, giving full IDE auto-complete. + + Parameters + ---------- + op_cls: + The DataFlow operator class to parallelize. + replicas: + Number of parallel actor replicas. + num_gpus_per_replica: + Fractional GPU allocation per replica (e.g. ``0.25`` to share one + GPU across four replicas). + env: + Optional RayOrch ``EnvRegistry`` key for a custom ``runtime_env``. 
+ + Example + ------- + :: + + from dataflow.rayorch import RayAcceleratedOperator + from dataflow.operators.text_pt.eval import FineWebEduSampleEvaluator + + scorer = RayAcceleratedOperator( + FineWebEduSampleEvaluator, + replicas=4, + num_gpus_per_replica=0.25, + ).op_cls_init(device="cuda") # ← IDE shows __init__ params + + scorer.run(storage, input_key="text") # ← IDE shows run params + """ + + def __init__( + self, + op_cls: Type[_OperatorProto[_INITP, _RUNP]], + *, + replicas: int = 1, + num_gpus_per_replica: float = 0.0, + env: Optional[str] = None, + ): + super().__init__() + self._op_cls = op_cls + self._op_init_args: tuple = () + self._op_init_kwargs: dict = {} + self._replicas = replicas + self._num_gpus_per_replica = num_gpus_per_replica + self._env = env + self._module = None # created lazily + + # PipelineABC.compile() compatibility: + # compile() → AutoOP uses inspect.signature(operator.run) to bind() + # call arguments. Our class-level run(storage, *args, **kwargs) would + # cause bind() to dump extra params into *args, which later gets + # serialised as an "args" key and leaks into the inner operator on + # _compiled_forward replay. Installing the inner operator's named + # signature on the instance avoids this entirely. + self._install_inner_run_signature(op_cls) + + def op_cls_init( + self, + *args: _INITP.args, + **kwargs: _INITP.kwargs, + ) -> RayAcceleratedOperator[_INITP, _RUNP]: + """Configure how the wrapped operator is constructed inside each actor. + + Parameters match ``op_cls.__init__``, so IDE auto-complete works. + May be omitted if the operator's defaults are sufficient. 
+ """ + self._op_init_args = args + self._op_init_kwargs = kwargs + return self + + def _ensure_initialized(self) -> None: + if self._module is not None: + return + from rayorch import Dispatch, RayModule + + self._module = RayModule( + _OpRunner, + replicas=self._replicas, + num_gpus_per_replica=self._num_gpus_per_replica, + dispatch_mode=Dispatch.SHARD_CONTIGUOUS, + env=self._env, + ) + self._module.pre_init( + op_cls=self._op_cls, + op_init_args=self._op_init_args, + op_init_kwargs=self._op_init_kwargs, + ) + + # --- inner signature propagation --- + + def _install_inner_run_signature(self, op_cls: type) -> None: + """Replace ``self.run`` with a thin proxy carrying ``op_cls.run``'s + ``__signature__``. + + Why: ``PipelineABC.compile()`` → ``AutoOP`` uses + ``inspect.signature(operator.run)`` to ``bind()`` the call arguments. + If the signature is the generic ``(storage, *args, **kwargs)`` from + this wrapper, positional-overflow values land in ``*args`` and get + serialised as an ``"args"`` key in the kwargs dict. On replay via + ``_compiled_forward(**kwargs)``, that ``"args"`` key leaks into the + inner operator as an unexpected keyword argument. + + By exposing the inner operator's **named** parameters here, + ``bind()`` resolves every argument to a keyword — no ``*args`` + residue, no downstream pollution. Only this file changes; DataFlow + core is untouched. + """ + inner_sig = inspect.signature(op_cls.run) + params = [p for p in inner_sig.parameters.values() if p.name != "self"] + + impl = self._run_impl + + def run(*args: Any, **kwargs: Any) -> None: + return impl(*args, **kwargs) + + run.__signature__ = inspect.Signature(params) # type: ignore[attr-defined] + run.__doc__ = getattr(op_cls.run, "__doc__", None) + run.__name__ = "run" + run.__qualname__ = f"{type(self).__qualname__}.run" + self.run = run # type: ignore[assignment] + + # --- DataFlow OperatorABC interface --- + # Two-level design for compile() compatibility: + # 1. 
Class-level `run` — satisfies OperatorABC's abstract method so the + # class can be instantiated. Delegates to `_run_impl`. + # 2. Instance-level `run` (proxy) — installed by + # `_install_inner_run_signature` in __init__, carries the inner + # operator's __signature__ so AutoOP.bind() resolves args to keywords. + # Python attribute lookup checks instance __dict__ before the class, + # so the proxy always wins at runtime. + + def run( # type: ignore[override] + self, + storage: DataFlowStorage, + *args: _RUNP.args, + **kwargs: _RUNP.kwargs, + ) -> None: + return self._run_impl(storage, *args, **kwargs) + + def _run_impl( + self, + storage: DataFlowStorage, + *args: _RUNP.args, + **kwargs: _RUNP.kwargs, + ) -> None: + self._ensure_initialized() + df = storage.read("dataframe") + records: list[dict] = df.to_dict("records") + run_params: dict = {"args": args, "kwargs": kwargs} + result_records = self._module(records, run_params) + storage.write(pd.DataFrame(result_records)) + + # --- lifecycle helpers --- + + def shutdown(self) -> None: + """Terminate all Ray actors held by this operator.""" + if self._module is None: + return + import ray + + for actor in self._module.actors: + ray.kill(actor) + self._module = None + + def __repr__(self) -> str: + state = "initialized" if self._module is not None else "lazy" + return ( + f"RayAcceleratedOperator({self._op_cls.__name__}, " + f"replicas={self._replicas}, state={state})" + ) diff --git a/dataflow/rayorch/memory_storage.py b/dataflow/rayorch/memory_storage.py new file mode 100644 index 00000000..924dda55 --- /dev/null +++ b/dataflow/rayorch/memory_storage.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from typing import Any, Literal + +import pandas as pd + +from dataflow.utils.storage import DataFlowStorage + + +class InMemoryStorage(DataFlowStorage): + """Lightweight in-memory ``DataFlowStorage`` for use inside Ray actors. 
+ + Avoids filesystem I/O and step-file coupling so that each actor + replica can independently read a DataFrame chunk and write results back. + + Typical lifecycle inside ``_OpRunner.run``:: + + storage = InMemoryStorage(df_chunk) + some_dataflow_op.run(storage, input_key="text", output_key="score") + result = storage.result # DataFrame written by the operator + + This storage does **not** participate in ``PipelineABC.compile()``, + so ``step()`` simply returns ``self`` (no copy needed). + A single ``_df`` is mutated in-place throughout the lifecycle: + ``write()`` replaces it, ``read()`` returns it. + """ + + def __init__(self, df: pd.DataFrame): + self._df = df + self.operator_step = 0 + + # --- DataFlowStorage ABC --- + + def read(self, output_type: Literal["dataframe", "dict"] = "dataframe") -> Any: + if output_type == "dataframe": + return self._df + if output_type == "dict": + return self._df.to_dict("records") + raise ValueError(f"Unsupported output_type: {output_type}") + + def write(self, data: Any) -> Any: + if isinstance(data, pd.DataFrame): + self._df = data + elif isinstance(data, list): + self._df = pd.DataFrame(data) + else: + raise ValueError(f"Unsupported data type for write: {type(data)}") + return None + + def get_keys_from_dataframe(self) -> list[str]: + return self._df.columns.tolist() + + def step(self): + self.operator_step += 1 + return self + + # --- helpers --- + + @property + def result(self) -> pd.DataFrame: + """Return the current DataFrame (after any writes).""" + return self._df diff --git a/pyproject.toml b/pyproject.toml index d1f125e3..70ecfe6c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ dependencies = {file = "requirements.txt"} [project.optional-dependencies] -test = ["flask", "setuptools<=81.0.0"] # https://setuptools.pypa.io/en/latest/pkg_resources.html & https://github.com/dgtlmoon/changedetection.io/pull/2424 +test = ["flask", "setuptools<=81.0.0", "rayorch"] # 
https://setuptools.pypa.io/en/latest/pkg_resources.html & https://github.com/dgtlmoon/changedetection.io/pull/2424 vllm =["vllm>=0.7.0,<=0.9.2", "numpy<2.0.0"] vllm07 = ["vllm<0.8", "numpy<2.0.0"] vllm08 = ["vllm<0.9"] @@ -80,3 +80,4 @@ rag = ["lightrag-hku", "asyncio"] pdf2vqa=["mineru[vlm]>=2.5.0,<2.7.0"] flash-mineru = ["flash-mineru"] +ray = ["rayorch"] diff --git a/requirements.txt b/requirements.txt index 8e482431..fc7c0f8e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -88,3 +88,4 @@ db-dtypes google-cloud-bigquery-storage distflow + diff --git a/test/rayorch/.gitignore b/test/rayorch/.gitignore new file mode 100644 index 00000000..1019f4e5 --- /dev/null +++ b/test/rayorch/.gitignore @@ -0,0 +1 @@ +bench_results.json \ No newline at end of file diff --git a/test/rayorch/README-zh.md b/test/rayorch/README-zh.md new file mode 100644 index 00000000..6374476e --- /dev/null +++ b/test/rayorch/README-zh.md @@ -0,0 +1,145 @@ +# RayOrch 加速 DataFlow 算子 + +`RayAcceleratedOperator` 为 DataFlow 的行独立(map-style)算子提供透明的数据并行加速。 +只需包装已有算子即可将推理分发到多张 GPU——无需修改算子本身或 pipeline 的 `FileStorage`。 + +## 最简使用示例 + +```python +from dataflow.rayorch import RayAcceleratedOperator +from dataflow.operators.text_sft.eval.superfiltering_sample_evaluator import ( + SuperfilteringSampleEvaluator, +) +from dataflow.utils.storage import FileStorage + +# 1. 包装算子 — replicas × GPU +scorer = RayAcceleratedOperator( + SuperfilteringSampleEvaluator, # 任意 DataFlow OperatorABC + replicas=4, # 4 个并行 actor + num_gpus_per_replica=1.0, # 每个 actor 1 张 GPU +).op_cls_init(device="cuda", max_length=512) # 原始 __init__ 参数 + +# 2. 和原始算子完全一样使用 +storage = FileStorage( + first_entry_file_name="data/input.jsonl", + cache_path="./cache", + file_name_prefix="step", + cache_type="jsonl", +) +scorer.run( + storage=storage.step(), + input_instruction_key="instruction", + input_output_key="output", +) + +# 3. 
用完释放资源(见下方 shutdown 说明) +scorer.shutdown() +``` + +Actor 在首次 `run()` 时**懒加载**创建,pipeline compile 阶段不会触发模型加载。 + +### 关于 `shutdown()` + +| 场景 | 是否必须调用 | +|------|-------------| +| 单算子 / 脚本结束退出 | **可选**。进程退出时 Ray 自动清理所有 actor。 | +| 多个 `RayAcceleratedOperator` 串联且占 GPU | **必须**。前一个 stage 的 actor 虽然空闲但仍持有 GPU 资源预留,后续 stage 创建新 actor 时会因资源不足而**永久阻塞**。 | +| `num_gpus_per_replica=0`(纯 CPU) | **可选**。CPU 资源充裕不会阻塞,但 actor 仍占用内存(已加载的模型权重等)。 | + +> **Note**: 使用 `PipelineABC.compile()` 时,`_compiled_forward` 会在每个 stage 结束后**自动调用** `shutdown()`,无需手动释放。 + +## 环境要求 + +```bash +# 以下二选一: +# 在 DataFlow 目录下重新安装(已包含 rayorch 依赖) +pip install -e . + +# 或仅额外安装 RayOrch +pip install rayorch==0.0.1 +``` + +## 测试文件 + +| 文件 | 说明 | +|------|------| +| `test_compile_cpu.py` | **CI 用例**(pytest, CPU):三种 Pipeline 类型 × compile × 多算子链 × 顺序/内容校验 | +| `test_accelerated_op.py` | Dummy sleep 算子——验证正确性和调度逻辑(仅需 CPU) | +| `test_pipeline_compile.py` | 真实算子(Superfiltering)compile 路径集成测试(需 GPU) | +| `test_real_operators.py` | 真实算子 benchmark,支持 argparse 传参,外层使用 `FileStorage` | + +## 快速开始 + +```bash +cd /path/to/DataFlow +export HF_ENDPOINT=https://hf-mirror.com + +# Dummy 算子测试(CPU 即可,约 30s) +python test/rayorch/test_accelerated_op.py + +# Superfiltering benchmark — 4096 行,2/4/8 卡并行 +python test/rayorch/test_real_operators.py --op superfiltering --rows 4096 --replicas 2 4 8 + +# Deita Quality — 256 行,2/4 卡并行 +python test/rayorch/test_real_operators.py --op deita --rows 256 --replicas 2 4 + +# 跑所有算子 +python test/rayorch/test_real_operators.py --op all --rows 1024 + +# 自定义 GPU 分配(如每个 replica 0.5 卡) +python test/rayorch/test_real_operators.py --op superfiltering --rows 2048 --replicas 4 8 16 --gpus-per-replica 0.5 + +# 结果保存到指定路径 +python test/rayorch/test_real_operators.py --op superfiltering --rows 4096 --save-json my_results.json +``` + +## CLI 参数 + +``` +--op {superfiltering,deita,all} 算子选择 (default: superfiltering) +--rows N Alpaca 数据行数 (default: 4096) +--replicas R [R ...] 
并行副本数列表 (default: 2 4 8) +--gpus-per-replica G 每个副本的 GPU 数 (default: 1.0) +--save-json PATH JSON 结果保存路径 (default: bench_results.json) +``` + +串行 baseline(1 GPU)始终自动包含,无需手动指定。 + +## 输出说明 + +每个算子输出: +1. **Serial** — 单 GPU 串行时间(baseline) +2. **Parallel xN** — N 副本并行时间(分 cold start 和 warm compute) +3. **Correctness** — 并行结果与串行逐行对比(rtol=1e-3) +4. **Speedup** — warm compute 相对串行的加速倍数 + +结果同时保存为 JSON (`bench_results.json`),方便后续分析。 + +## Benchmark 结果 + +> 以下结果在 8× NVIDIA A800-SXM4-80GB 上测得。 + +### SuperfilteringSampleEvaluator (gpt2, 124 M) + +数据集:`tatsu-lab/alpaca`,4096 行,外层 storage = `FileStorage` + +| 配置 | Warm 耗时 | 加速比 | Cold Start | 正确性 | +|------|-----------|--------|------------|--------| +| Serial (1 GPU) | 66.41s | 1.0x | — | baseline | +| 2 GPUs | 36.48s | 1.8x | 12.1s | ✓ match | +| 4 GPUs | 18.45s | 3.6x | 13.0s | ✓ match | +| 8 GPUs | 10.80s | 6.1x | 18.4s | ✓ match | + +Superfiltering 基于 GPT-2 (124M),单行推理极快(~60 it/s),Ray 调度/序列化开销 +占比相对较高。更重的模型(如 Deita 7B)加速比会更接近线性。 + +### DeitaQualitySampleEvaluator (Llama-based, 7 B) + +_待补充:需要先缓存 `hkust-nlp/deita-quality-scorer` 模型(~14 GB)_ + +## 设计说明 + +- **Cold start**:首次 `run()` 调用创建 Ray Actor 并加载模型(约 12-20s)。后续调用复用 warm actor。 +- **数据量与加速比**:数据量越大,Ray 通信开销占比越低,加速比越接近线性。推荐 ≥1024 行做 benchmark。 +- **Storage 分层**:外层 pipeline 使用 `FileStorage`(标准 DataFlow)。`InMemoryStorage` 仅在 Ray actor 内部的 `_OpRunner` 中使用——对调用方完全透明。 +- **仅支持行独立算子**:需要跨行全局状态的算子(如基于完整相似度矩阵的语义去重)**不应**使用此 wrapper。 diff --git a/test/rayorch/README.md b/test/rayorch/README.md new file mode 100644 index 00000000..22a8bd0d --- /dev/null +++ b/test/rayorch/README.md @@ -0,0 +1,150 @@ +# RayOrch Acceleration for DataFlow + +`RayAcceleratedOperator` provides transparent data-parallel acceleration for +DataFlow's row-independent (map-style) operators. Wrap any existing operator +to distribute inference across multiple GPUs — no changes to the operator +itself or the pipeline's `FileStorage` are required. 
+ +## Minimal Usage Example + +```python +from dataflow.rayorch import RayAcceleratedOperator +from dataflow.operators.text_sft.eval.superfiltering_sample_evaluator import ( + SuperfilteringSampleEvaluator, +) +from dataflow.utils.storage import FileStorage + +# 1. Wrap the operator — replicas × GPUs +scorer = RayAcceleratedOperator( + SuperfilteringSampleEvaluator, # any DataFlow OperatorABC + replicas=4, # 4 parallel actors + num_gpus_per_replica=1.0, # 1 GPU each +).op_cls_init(device="cuda", max_length=512) # original __init__ args + +# 2. Use it exactly like the original operator +storage = FileStorage( + first_entry_file_name="data/input.jsonl", + cache_path="./cache", + file_name_prefix="step", + cache_type="jsonl", +) +scorer.run( + storage=storage.step(), + input_instruction_key="instruction", + input_output_key="output", +) + +# 3. Clean up when done (see shutdown notes below) +scorer.shutdown() +``` + +Actors are created **lazily** on the first `run()` call, so pipeline +compilation does not trigger model loading. + +### About `shutdown()` + +| Scenario | Must call? | +|----------|-----------| +| Single operator / script exits normally | **Optional**. Ray cleans up all actors on process exit. | +| Multiple `RayAcceleratedOperator` stages with GPUs | **Required**. Idle actors from earlier stages still hold GPU resource reservations — later stages will **hang indefinitely** waiting for resources. | +| `num_gpus_per_replica=0` (CPU-only) | **Optional**. CPU resources are abundant so no hang, but actors still consume memory (loaded model weights, etc.). | + +> **Note**: When using `PipelineABC.compile()`, `_compiled_forward` **automatically** calls `shutdown()` after each stage completes — no manual cleanup is needed. + +## Prerequisites + +```bash +# Pick one: +# Re-install DataFlow (includes rayorch as a dependency) +pip install -e . 
+ +# Or install RayOrch separately +pip install rayorch==0.0.1 +``` + +## Test Files + +| File | Description | +|------|-------------| +| `test_compile_cpu.py` | **CI suite** (pytest, CPU): all 3 Pipeline types × compile × multi-op chains × ordering/content checks | +| `test_accelerated_op.py` | Dummy sleep operator — validates correctness & scheduling (CPU only) | +| `test_pipeline_compile.py` | Real operator (Superfiltering) compile-path integration test (GPU required) | +| `test_real_operators.py` | Real operator benchmark with argparse, using `FileStorage` externally | + +## Quick Start + +```bash +cd /path/to/DataFlow +export HF_ENDPOINT=https://hf-mirror.com + +# Dummy operator test (CPU, ~30s) +python test/rayorch/test_accelerated_op.py + +# Superfiltering benchmark — 4096 rows, 2/4/8 GPU parallel +python test/rayorch/test_real_operators.py --op superfiltering --rows 4096 --replicas 2 4 8 + +# Deita Quality — 256 rows, 2/4 GPU parallel +python test/rayorch/test_real_operators.py --op deita --rows 256 --replicas 2 4 + +# All operators +python test/rayorch/test_real_operators.py --op all --rows 1024 + +# Fractional GPU allocation (e.g. 0.5 GPU per replica) +python test/rayorch/test_real_operators.py --op superfiltering --rows 2048 --replicas 4 8 16 --gpus-per-replica 0.5 + +# Save results to a custom path +python test/rayorch/test_real_operators.py --op superfiltering --rows 4096 --save-json my_results.json +``` + +## CLI Arguments + +``` +--op {superfiltering,deita,all} Operator to benchmark (default: superfiltering) +--rows N Number of Alpaca rows (default: 4096) +--replicas R [R ...] Parallel replica counts (default: 2 4 8) +--gpus-per-replica G GPUs per replica (default: 1.0) +--save-json PATH JSON output path (default: bench_results.json) +``` + +A serial baseline (1 GPU) is always included automatically. + +## Output + +For each operator the benchmark reports: +1. **Serial** — single-GPU wall time (baseline) +2. 
**Parallel xN** — N-replica wall time, split into cold start and warm compute +3. **Correctness** — row-by-row comparison against serial (rtol=1e-3) +4. **Speedup** — warm compute time relative to serial + +Results are also saved as JSON (`bench_results.json`). + +## Benchmark Results + +> Measured on 8× NVIDIA A800-SXM4-80GB. + +### SuperfilteringSampleEvaluator (gpt2, 124 M) + +Dataset: `tatsu-lab/alpaca`, 4096 rows, outer storage = `FileStorage` + +| Config | Warm Time | Speedup | Cold Start | Correct | +|--------|-----------|---------|------------|---------| +| Serial (1 GPU) | 66.41s | 1.0x | — | baseline | +| 2 GPUs | 36.48s | 1.8x | 12.1s | ✓ | +| 4 GPUs | 18.45s | 3.6x | 13.0s | ✓ | +| 8 GPUs | 10.80s | 6.1x | 18.4s | ✓ | + +Superfiltering is based on GPT-2 (124 M) which runs at ~60 it/s per row. +Because per-row compute is light, Ray scheduling/serialization overhead is +relatively significant. Heavier models (e.g. Deita 7 B) will show speedup +closer to linear. + +### DeitaQualitySampleEvaluator (Llama-based, 7 B) + +_Pending: requires `hkust-nlp/deita-quality-scorer` model (~14 GB) to be cached locally._ + +## Design Notes + +- **Cold start**: The first `run()` call creates Ray actors and loads models (~12-20 s). Subsequent calls reuse warm actors. +- **Data volume vs speedup**: Larger datasets amortize Ray communication overhead, pushing speedup closer to linear. Recommend ≥1024 rows for benchmarking. +- **Storage separation**: The outer pipeline uses `FileStorage` (standard DataFlow). `InMemoryStorage` is only used internally by `_OpRunner` inside Ray actors — fully transparent to the caller. +- **Row-independent only**: Operators that require cross-row global state (e.g. semantic dedup with a full similarity matrix) should **not** use this wrapper. 
diff --git a/test/rayorch/test_accelerated_op.py b/test/rayorch/test_accelerated_op.py new file mode 100644 index 00000000..8f52d08f --- /dev/null +++ b/test/rayorch/test_accelerated_op.py @@ -0,0 +1,159 @@ +""" +Serial vs Parallel timing comparison for RayAcceleratedOperator. + +A dummy operator that sleeps per row simulates per-sample model inference. +Pipeline-style test following DataFlow conventions. + +Usage: + python test/rayorch/test_accelerated_op.py +""" +from __future__ import annotations + +import time + +import pandas as pd + +from dataflow.core.operator import OperatorABC +from dataflow.rayorch import RayAcceleratedOperator +from dataflow.rayorch.memory_storage import InMemoryStorage +from dataflow.utils.storage import DataFlowStorage + + +# --------------------------------------------------------------------------- +# Dummy operator +# --------------------------------------------------------------------------- +class DummySleepScorer(OperatorABC): + def __init__(self, sleep_per_row: float = 0.1): + super().__init__() + self.sleep_per_row = sleep_per_row + + def run( + self, + storage: DataFlowStorage, + input_key: str = "text", + output_key: str = "score", + ) -> None: + df = storage.read("dataframe") + scores = [] + for text in df[input_key]: + time.sleep(self.sleep_per_row) + scores.append(len(str(text))) + df[output_key] = scores + storage.write(df) + + +# --------------------------------------------------------------------------- +# Pipeline: serial baseline +# --------------------------------------------------------------------------- +class DummySleepSerialPipeline: + def __init__(self, n_rows: int = 40, sleep_per_row: float = 0.1): + self.storage = InMemoryStorage( + pd.DataFrame({"text": [f"sample text {i}" for i in range(n_rows)]}) + ) + self.scorer = DummySleepScorer(sleep_per_row=sleep_per_row) + + def forward(self): + self.scorer.run( + storage=self.storage.step(), + input_key="text", + output_key="score", + ) + + +# 
--------------------------------------------------------------------------- +# Pipeline: Ray-accelerated parallel +# --------------------------------------------------------------------------- +class DummySleepRayPipeline: + def __init__(self, n_rows: int = 40, sleep_per_row: float = 0.1, replicas: int = 4): + self.storage = InMemoryStorage( + pd.DataFrame({"text": [f"sample text {i}" for i in range(n_rows)]}) + ) + self.scorer = RayAcceleratedOperator( + DummySleepScorer, + replicas=replicas, + ).op_cls_init(sleep_per_row=sleep_per_row) + + def forward(self): + self.scorer.run( + storage=self.storage.step(), + input_key="text", + output_key="score", + ) + + def shutdown(self): + self.scorer.shutdown() + + +# --------------------------------------------------------------------------- +# Verification +# --------------------------------------------------------------------------- +def verify_result(storage: InMemoryStorage, n_rows: int, label: str) -> None: + result = storage.result + assert len(result) == n_rows, f"[{label}] expected {n_rows} rows, got {len(result)}" + assert "score" in result.columns, f"[{label}] missing 'score' column" + expected = [len(f"sample text {i}") for i in range(n_rows)] + actual = result["score"].tolist() + assert actual == expected, ( + f"[{label}] score mismatch (row order may be wrong)\n" + f" expected[:5] = {expected[:5]}\n" + f" actual[:5] = {actual[:5]}" + ) + print(f" ✓ correctness check passed ({n_rows} rows, scores match)") + + +# --------------------------------------------------------------------------- +# main +# --------------------------------------------------------------------------- +if __name__ == "__main__": + import ray + + N_ROWS = 40 + SLEEP_PER_ROW = 0.1 + REPLICAS = 4 + + ray.init(ignore_reinit_error=True) + + # --- Serial --- + print(f"\n{'=' * 60}") + print(f"[Serial] {N_ROWS} rows, sleep={SLEEP_PER_ROW}s/row") + print(f"{'=' * 60}") + serial_pipe = DummySleepSerialPipeline(N_ROWS, SLEEP_PER_ROW) + t0 = 
time.perf_counter() + serial_pipe.forward() + serial_time = time.perf_counter() - t0 + verify_result(serial_pipe.storage, N_ROWS, "Serial") + print(f" Time: {serial_time:.2f}s (expected ≈{N_ROWS * SLEEP_PER_ROW:.2f}s)") + + # --- Parallel (cold) --- + print(f"\n{'=' * 60}") + print(f"[Parallel] {N_ROWS} rows, sleep={SLEEP_PER_ROW}s/row, replicas={REPLICAS}") + print(f"{'=' * 60}") + ray_pipe = DummySleepRayPipeline(N_ROWS, SLEEP_PER_ROW, REPLICAS) + t_cold = time.perf_counter() + ray_pipe.forward() + cold_time = time.perf_counter() - t_cold + print(f" Cold run (includes actor init): {cold_time:.2f}s") + + # --- Parallel (warm) --- + ray_pipe.storage = InMemoryStorage( + pd.DataFrame({"text": [f"sample text {i}" for i in range(N_ROWS)]}) + ) + t0 = time.perf_counter() + ray_pipe.forward() + parallel_time = time.perf_counter() - t0 + verify_result(ray_pipe.storage, N_ROWS, "Parallel-warm") + ideal = N_ROWS * SLEEP_PER_ROW / REPLICAS + print(f" Warm compute: {parallel_time:.2f}s (ideal ≈{ideal:.2f}s)") + + ray_pipe.shutdown() + + # --- Summary --- + print(f"\n{'=' * 60}") + print("[Summary] (parallel = warm compute only)") + print(f" Serial: {serial_time:.2f}s") + print(f" Parallel: {parallel_time:.2f}s") + speedup = serial_time / parallel_time if parallel_time > 0 else float("inf") + print(f" Speedup: {speedup:.1f}x (ideal {REPLICAS}x)") + print(f"{'=' * 60}") + + ray.shutdown() diff --git a/test/rayorch/test_compile_cpu.py b/test/rayorch/test_compile_cpu.py new file mode 100644 index 00000000..c043aa68 --- /dev/null +++ b/test/rayorch/test_compile_cpu.py @@ -0,0 +1,584 @@ +"""CPU-only pytest suite: RayAcceleratedOperator with PipelineABC.compile(). + +Verifies that RayAcceleratedOperator works correctly through compile() for +all three pipeline types, with multi-operator chains, deterministic content, +and preserved row ordering. 

Run:
    pytest test/rayorch/test_compile_cpu.py -v -m cpu
"""
from __future__ import annotations

import os

import numpy as np
import pandas as pd
import pytest

from dataflow.pipeline.Pipeline import (
    BatchedPipelineABC,
    PipelineABC,
    StreamBatchedPipelineABC,
)
from dataflow.rayorch import RayAcceleratedOperator
from dataflow.rayorch._test_ops import DummyDoubleOp, DummyIncrementOp

# Deterministic row count for every test input; small enough for CI.
N_ROWS = 100
# Default number of Ray replicas for the accelerated operator.
REPLICAS = 4


# =====================================================================
# Pipeline definitions for each ABC
# =====================================================================
def _make_storage(cls, input_file: str, cache_path: str):
    """Instantiate the correct FileStorage subclass."""
    return cls(
        first_entry_file_name=input_file,
        cache_path=cache_path,
        file_name_prefix="step",
        cache_type="jsonl",
    )


class _SerialPipeline(PipelineABC):
    """Serial baseline: DummyDouble → DummyIncrement, no Ray involved."""

    def __init__(self, input_file, cache_path, storage_cls):
        super().__init__()
        self.storage = _make_storage(storage_cls, input_file, cache_path)
        self.doubler = DummyDoubleOp()
        self.incrementer = DummyIncrementOp()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


class _RayPipeline(PipelineABC):
    """Ray(DummyDouble) → serial DummyIncrement, same contract as _SerialPipeline."""

    def __init__(self, input_file, cache_path, storage_cls, replicas=REPLICAS):
        super().__init__()
        self.storage = _make_storage(storage_cls, input_file, cache_path)
        # CPU-only replicas so the suite runs without GPUs.
        self.doubler = RayAcceleratedOperator(
            DummyDoubleOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()
        self.incrementer = DummyIncrementOp()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


class _SerialBatched(BatchedPipelineABC):
    """Serial baseline for the batched pipeline ABC."""

    def __init__(self, input_file, cache_path, storage_cls):
        super().__init__()
        self.storage = _make_storage(storage_cls, input_file, cache_path)
        self.doubler = DummyDoubleOp()
        self.incrementer = DummyIncrementOp()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


class _RayBatched(BatchedPipelineABC):
    """Ray(DummyDouble) → serial DummyIncrement, batched variant."""

    def __init__(self, input_file, cache_path, storage_cls, replicas=REPLICAS):
        super().__init__()
        self.storage = _make_storage(storage_cls, input_file, cache_path)
        self.doubler = RayAcceleratedOperator(
            DummyDoubleOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()
        self.incrementer = DummyIncrementOp()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


class _SerialStreamBatched(StreamBatchedPipelineABC):
    """Serial baseline for the stream-batched pipeline ABC."""

    def __init__(self, input_file, cache_path, storage_cls):
        super().__init__()
        self.storage = _make_storage(storage_cls, input_file, cache_path)
        self.doubler = DummyDoubleOp()
        self.incrementer = DummyIncrementOp()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


class _RayStreamBatched(StreamBatchedPipelineABC):
    """Ray(DummyDouble) → serial DummyIncrement, stream-batched variant."""

    def __init__(self, input_file, cache_path, storage_cls, replicas=REPLICAS):
        super().__init__()
        self.storage = _make_storage(storage_cls, input_file, cache_path)
        self.doubler = RayAcceleratedOperator(
            DummyDoubleOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()
        self.incrementer = DummyIncrementOp()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


# =====================================================================
# Fixtures
# =====================================================================
@pytest.fixture(scope="module")
def ray_env():
    """Module-scoped local Ray runtime (8 CPUs) shared by every test here."""
    import ray

    ray.init(ignore_reinit_error=True, num_cpus=8)
    yield
    ray.shutdown()


@pytest.fixture()
def test_data(tmp_path):
    """Create a deterministic input JSONL with a 'value' column [0..N_ROWS)."""
    df = pd.DataFrame({"value": list(range(N_ROWS))})
    input_file = str(tmp_path / "input.jsonl")
    df.to_json(input_file, orient="records", lines=True, force_ascii=False)
    return input_file, tmp_path


# =====================================================================
# Helpers
# =====================================================================
def _read_final_output(cache_path: str) -> pd.DataFrame:
    """Read the step-2 output produced by a two-operator pipeline."""
    return pd.read_json(
        os.path.join(cache_path, "step_step2.jsonl"), lines=True,
    )


def _shutdown_ray_actors(pipe):
    # Best-effort teardown of any Ray-backed operators left on the
    # compiled pipeline (serial operators expose no shutdown()).
    for op_node in getattr(pipe, "op_nodes_list", []):
        if hasattr(op_node.op_obj, "shutdown"):
            op_node.op_obj.shutdown()


def _assert_content(df: pd.DataFrame):
    """Verify deterministic content: doubled == value*2, incremented == doubled+1."""
    np.testing.assert_array_equal(df["doubled"].values, df["value"].values * 2)
    np.testing.assert_array_equal(df["incremented"].values, df["doubled"].values + 1)


def _assert_ordering(serial_df: pd.DataFrame, ray_df: pd.DataFrame):
    """Verify row ordering: Ray output must match serial output exactly."""
    np.testing.assert_array_equal(ray_df["value"].values, serial_df["value"].values)
    np.testing.assert_array_equal(ray_df["doubled"].values, serial_df["doubled"].values)
    np.testing.assert_array_equal(
        ray_df["incremented"].values, serial_df["incremented"].values,
    )


# =====================================================================
# Tests — PipelineABC
# =====================================================================
@pytest.mark.cpu
def test_pipeline_abc(ray_env, test_data):
    """PipelineABC: compile → forward with Ray(DummyDouble) → DummyIncrement."""
    from dataflow.utils.storage import FileStorage

    input_file, tmp_path = test_data

    serial_cache = str(tmp_path / "pipe_serial")
    os.makedirs(serial_cache, exist_ok=True)
    serial_pipe = _SerialPipeline(input_file, serial_cache, FileStorage)
    serial_pipe.compile()
    serial_pipe.forward()
    serial_df = _read_final_output(serial_cache)

    ray_cache = str(tmp_path / "pipe_ray")
    os.makedirs(ray_cache, exist_ok=True)
    ray_pipe = _RayPipeline(input_file, ray_cache, FileStorage)
    ray_pipe.compile()
    ray_pipe.forward()
    ray_df = _read_final_output(ray_cache)

    _assert_content(ray_df)
    _assert_ordering(serial_df, ray_df)
    _shutdown_ray_actors(ray_pipe)


# =====================================================================
# Tests — BatchedPipelineABC (no batching)
# =====================================================================
@pytest.mark.cpu
def test_batched_pipeline_abc(ray_env, test_data):
    """BatchedPipelineABC: compile → forward (batch_size=None)."""
    from dataflow.utils.storage import BatchedFileStorage

    input_file, tmp_path = test_data

    serial_cache = str(tmp_path / "batched_serial")
    os.makedirs(serial_cache, exist_ok=True)
    serial_pipe = _SerialBatched(input_file, serial_cache, BatchedFileStorage)
    serial_pipe.compile()
    serial_pipe.forward(resume_from_last=False)
    serial_df = _read_final_output(serial_cache)

    ray_cache = str(tmp_path / "batched_ray")
    os.makedirs(ray_cache, exist_ok=True)
    ray_pipe = _RayBatched(input_file, ray_cache, BatchedFileStorage)
    ray_pipe.compile()
    ray_pipe.forward(resume_from_last=False)
    ray_df = _read_final_output(ray_cache)

    _assert_content(ray_df)
    _assert_ordering(serial_df, ray_df)
    _shutdown_ray_actors(ray_pipe)


# =====================================================================
# Tests — BatchedPipelineABC WITH batch_size
# =====================================================================
@pytest.mark.cpu
def test_batched_pipeline_abc_with_batch_size(ray_env, test_data):
    """BatchedPipelineABC: compile → forward(batch_size=30).

    RayAcceleratedOperator is called multiple times (once per batch),
    and the output must still be content-correct and order-preserving.
    """
    from dataflow.utils.storage import BatchedFileStorage

    input_file, tmp_path = test_data
    # 100 rows / 30 per batch → 4 batches, last one partial.
    batch_size = 30

    serial_cache = str(tmp_path / "batched_bs_serial")
    os.makedirs(serial_cache, exist_ok=True)
    serial_pipe = _SerialBatched(input_file, serial_cache, BatchedFileStorage)
    serial_pipe.compile()
    serial_pipe.forward(batch_size=batch_size, resume_from_last=False)
    serial_df = _read_final_output(serial_cache)

    ray_cache = str(tmp_path / "batched_bs_ray")
    os.makedirs(ray_cache, exist_ok=True)
    ray_pipe = _RayBatched(input_file, ray_cache, BatchedFileStorage)
    ray_pipe.compile()
    ray_pipe.forward(batch_size=batch_size, resume_from_last=False)
    ray_df = _read_final_output(ray_cache)

    assert len(ray_df) == N_ROWS, f"Expected {N_ROWS} rows, got {len(ray_df)}"
    _assert_content(ray_df)
    _assert_ordering(serial_df, ray_df)
    _shutdown_ray_actors(ray_pipe)


# =====================================================================
# Tests — StreamBatchedPipelineABC (no batching)
# =====================================================================
@pytest.mark.cpu
def test_stream_batched_pipeline_abc(ray_env, test_data):
    """StreamBatchedPipelineABC: compile → forward (batch_size=None)."""
    from dataflow.utils.storage import StreamBatchedFileStorage

    input_file, tmp_path = test_data

    serial_cache = str(tmp_path / "stream_serial")
    os.makedirs(serial_cache, exist_ok=True)
    serial_pipe = _SerialStreamBatched(input_file, serial_cache, StreamBatchedFileStorage)
    serial_pipe.compile()
    serial_pipe.forward(resume_from_last=False)
    serial_df = _read_final_output(serial_cache)

    ray_cache = str(tmp_path / "stream_ray")
    os.makedirs(ray_cache, exist_ok=True)
    ray_pipe = _RayStreamBatched(input_file, ray_cache, StreamBatchedFileStorage)
    ray_pipe.compile()
    ray_pipe.forward(resume_from_last=False)
    ray_df = _read_final_output(ray_cache)

    _assert_content(ray_df)
    _assert_ordering(serial_df, ray_df)
    _shutdown_ray_actors(ray_pipe)


# =====================================================================
# Tests — ordering with more replicas
# =====================================================================
@pytest.mark.cpu
def test_ordering_many_replicas(ray_env, test_data):
    """Verify SHARD_CONTIGUOUS preserves order even with many replicas."""
    from dataflow.utils.storage import FileStorage

    input_file, tmp_path = test_data

    # Replica counts that do not divide N_ROWS evenly (3, 7) stress the
    # shard-boundary logic.
    for replicas in (2, 3, 7):
        cache = str(tmp_path / f"order_{replicas}r")
        os.makedirs(cache, exist_ok=True)
        pipe = _RayPipeline(input_file, cache, FileStorage, replicas=replicas)
        pipe.compile()
        pipe.forward()
        df = _read_final_output(cache)

        expected_values = list(range(N_ROWS))
        assert list(df["value"]) == expected_values, (
            f"Ordering broken with {replicas} replicas"
        )
        _assert_content(df)
        _shutdown_ray_actors(pipe)


# =====================================================================
# Pipeline: both operators are RayAcceleratedOperator (for auto-shutdown test)
# =====================================================================
class _AllRayPipeline(PipelineABC):
    """Both stages are RayAcceleratedOperators."""

    def __init__(self, input_file, cache_path, storage_cls, replicas=REPLICAS):
        super().__init__()
        self.storage = _make_storage(storage_cls, input_file, cache_path)
        self.doubler = RayAcceleratedOperator(
            DummyDoubleOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()
        self.incrementer = RayAcceleratedOperator(
            DummyIncrementOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


class _AllRayBatched(BatchedPipelineABC):
    """Both stages are RayAcceleratedOperators (batched)."""

    def __init__(self, input_file, cache_path, storage_cls, replicas=REPLICAS):
        super().__init__()
        self.storage = _make_storage(storage_cls, input_file, cache_path)
        self.doubler = RayAcceleratedOperator(
            DummyDoubleOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()
        self.incrementer = RayAcceleratedOperator(
            DummyIncrementOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


class _AllRayStreamBatched(StreamBatchedPipelineABC):
    """Both stages are RayAcceleratedOperators (stream batched)."""

    def __init__(self, input_file, cache_path, storage_cls, replicas=REPLICAS):
        super().__init__()
        self.storage = _make_storage(storage_cls, input_file, cache_path)
        self.doubler = RayAcceleratedOperator(
            DummyDoubleOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()
        self.incrementer = RayAcceleratedOperator(
            DummyIncrementOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


# =====================================================================
# Tests — serial-then-ray ordering
# =====================================================================
class _SerialThenRayPipeline(PipelineABC):
    """Chain: serial DummyDouble → Ray(DummyIncrement)."""

    def __init__(self, input_file, cache_path, replicas=REPLICAS):
        super().__init__()
        from dataflow.utils.storage import FileStorage

        # Positional args: (first_entry_file_name, cache_path,
        # file_name_prefix, cache_type) — mirrors _make_storage.
        self.storage = FileStorage(input_file, cache_path, "step", "jsonl")
        self.doubler = DummyDoubleOp()
        self.incrementer = RayAcceleratedOperator(
            DummyIncrementOp, replicas=replicas, num_gpus_per_replica=0.0,
        ).op_cls_init()

    def forward(self):
        self.doubler.run(
            storage=self.storage.step(),
            input_key="value",
            output_key="doubled",
        )
        self.incrementer.run(
            storage=self.storage.step(),
            input_key="doubled",
            output_key="incremented",
        )


@pytest.mark.cpu
def test_serial_then_ray(ray_env, test_data):
    """DummyDouble(serial) → DummyIncrement(Ray): verify content and order."""
    from dataflow.utils.storage import FileStorage

    input_file, tmp_path = test_data

    serial_cache = str(tmp_path / "sr_serial")
    os.makedirs(serial_cache, exist_ok=True)
    serial_pipe = _SerialPipeline(input_file, serial_cache, FileStorage)
    serial_pipe.compile()
    serial_pipe.forward()
    serial_df = _read_final_output(serial_cache)

    ray_cache = str(tmp_path / "sr_ray")
    os.makedirs(ray_cache, exist_ok=True)
    ray_pipe = _SerialThenRayPipeline(input_file, ray_cache)
    ray_pipe.compile()
    ray_pipe.forward()
    ray_df = _read_final_output(ray_cache)

    _assert_content(ray_df)
    _assert_ordering(serial_df, ray_df)
    _shutdown_ray_actors(ray_pipe)


# =====================================================================
# Tests — auto-shutdown of RayAcceleratedOperator in _compiled_forward
# =====================================================================
def _get_ray_op_nodes(pipe) -> list:
    """Return OperatorNode entries whose op_obj has a shutdown method
    (i.e. is a RayAcceleratedOperator)."""
    return [
        n for n in getattr(pipe, "op_nodes_list", [])
        if hasattr(getattr(n, "op_obj", None), "shutdown")
    ]


@pytest.mark.cpu
def test_auto_shutdown_pipeline_abc(ray_env, test_data):
    """PipelineABC with two RayAcceleratedOperators: verify actors are
    automatically shut down after each stage and output is still correct."""
    from dataflow.utils.storage import FileStorage

    input_file, tmp_path = test_data

    cache = str(tmp_path / "auto_sd_pipe")
    os.makedirs(cache, exist_ok=True)
    pipe = _AllRayPipeline(input_file, cache, FileStorage)
    pipe.compile()
    pipe.forward()

    df = _read_final_output(cache)
    _assert_content(df)
    assert len(df) == N_ROWS

    for node in _get_ray_op_nodes(pipe):
        # NOTE(review): relies on shutdown() clearing the private _module
        # handle of RayAcceleratedOperator — confirm against its impl.
        assert node.op_obj._module is None, (
            f"{node.op_name} was not auto-shutdown after compiled forward"
        )


@pytest.mark.cpu
def test_auto_shutdown_batched(ray_env, test_data):
    """BatchedPipelineABC with two Ray stages: auto-shutdown after batches."""
    from dataflow.utils.storage import BatchedFileStorage

    input_file, tmp_path = test_data

    cache = str(tmp_path / "auto_sd_batched")
    os.makedirs(cache, exist_ok=True)
    pipe = _AllRayBatched(input_file, cache, BatchedFileStorage)
    pipe.compile()
    pipe.forward(batch_size=30, resume_from_last=False)

    df = _read_final_output(cache)
    _assert_content(df)
    assert len(df) == N_ROWS

    for node in _get_ray_op_nodes(pipe):
        # NOTE(review): private-attribute check, see note above in the
        # PipelineABC variant of this test.
        assert node.op_obj._module is None, (
            f"{node.op_name} was not auto-shutdown after compiled forward"
        )


@pytest.mark.cpu
def test_auto_shutdown_stream_batched(ray_env, test_data):
    """StreamBatchedPipelineABC with two Ray stages: auto-shutdown after stream batches."""
    from dataflow.utils.storage import StreamBatchedFileStorage

    input_file, tmp_path = test_data

    cache = str(tmp_path / "auto_sd_stream")
    os.makedirs(cache, exist_ok=True)
    pipe = _AllRayStreamBatched(input_file, cache, StreamBatchedFileStorage)
    pipe.compile()
    pipe.forward(resume_from_last=False)

    df = _read_final_output(cache)
    _assert_content(df)
    assert len(df) == N_ROWS

    for node in _get_ray_op_nodes(pipe):
        assert node.op_obj._module is None, (
            f"{node.op_name} was not auto-shutdown after compiled forward"
        )
diff --git a/test/rayorch/test_pipeline_compile.py b/test/rayorch/test_pipeline_compile.py
new file mode 100644
index 00000000..341d1d4f
--- /dev/null
+++ b/test/rayorch/test_pipeline_compile.py
@@ -0,0 +1,232 @@
"""
Test RayAcceleratedOperator under PipelineABC.compile() workflow.

Verifies that:
  1. compile() correctly discovers and wraps the operator via AutoOP
  2. Key validation passes (input_* kwargs match dataset columns)
  3. _compiled_forward() triggers lazy actor init and produces correct results
  4. Results match a serial (non-Ray) compiled pipeline

Uses SuperfilteringSampleEvaluator + Alpaca dataset + FileStorage.
+ +Usage: + python test/rayorch/test_pipeline_compile.py + python test/rayorch/test_pipeline_compile.py --rows 256 --replicas 4 +""" +from __future__ import annotations + +import argparse +import os +import shutil +import tempfile +import time + +import numpy as np +import pandas as pd + +os.environ.setdefault("HF_ENDPOINT", "https://hf-mirror.com") + +from dataflow.pipeline.Pipeline import PipelineABC +from dataflow.rayorch import RayAcceleratedOperator +from dataflow.utils.storage import FileStorage + + +# =================================================================== +# Pipelines (extend PipelineABC — participate in compile()) +# =================================================================== +class SerialCompiledPipeline(PipelineABC): + """Standard single-GPU pipeline that goes through compile().""" + + def __init__(self, input_file: str, cache_path: str, op_cls, init_kwargs: dict): + super().__init__() + self.storage = FileStorage( + first_entry_file_name=input_file, + cache_path=cache_path, + file_name_prefix="bench_step", + cache_type="jsonl", + ) + self.scorer = op_cls(**init_kwargs) + + def forward(self, **run_kwargs): + self.scorer.run( + storage=self.storage.step(), + input_instruction_key="instruction", + input_output_key="output", + ) + + +class RayCompiledPipeline(PipelineABC): + """Ray-accelerated pipeline that goes through compile().""" + + def __init__( + self, + input_file: str, + cache_path: str, + op_cls, + init_kwargs: dict, + replicas: int = 4, + num_gpus_per_replica: float = 1.0, + ): + super().__init__() + self.storage = FileStorage( + first_entry_file_name=input_file, + cache_path=cache_path, + file_name_prefix="bench_step", + cache_type="jsonl", + ) + self.scorer = RayAcceleratedOperator( + op_cls, + replicas=replicas, + num_gpus_per_replica=num_gpus_per_replica, + ).op_cls_init(**init_kwargs) + + def forward(self, **run_kwargs): + self.scorer.run( + storage=self.storage.step(), + input_instruction_key="instruction", + 
input_output_key="output", + ) + + def shutdown_actors(self): + """Shut down Ray actors after compiled pipeline is done.""" + for op_node in self.op_nodes_list: + if hasattr(op_node.op_obj, "shutdown"): + op_node.op_obj.shutdown() + + +# =================================================================== +# Helpers +# =================================================================== +def load_alpaca_subset(n: int) -> pd.DataFrame: + from datasets import load_dataset + + ds = load_dataset("tatsu-lab/alpaca", split=f"train[:{n}]") + df = ds.to_pandas() + df["input"] = df["input"].fillna("") + return df + + +def save_to_jsonl(df: pd.DataFrame, path: str) -> str: + os.makedirs(os.path.dirname(path) or ".", exist_ok=True) + df.to_json(path, orient="records", lines=True, force_ascii=False) + return path + + +def read_result(cache_path: str, output_key: str) -> list: + result_file = os.path.join(cache_path, "bench_step_step1.jsonl") + result_df = pd.read_json(result_file, lines=True) + return result_df[output_key].tolist() + + +# =================================================================== +# Main +# =================================================================== +def parse_args(): + p = argparse.ArgumentParser(description="Test RayAcceleratedOperator with pipeline.compile()") + p.add_argument("--rows", type=int, default=128, help="Alpaca rows (default: 128)") + p.add_argument("--replicas", type=int, default=4, help="Ray replicas (default: 4)") + return p.parse_args() + + +if __name__ == "__main__": + import ray + from dataflow.operators.text_sft.eval.superfiltering_sample_evaluator import ( + SuperfilteringSampleEvaluator, + ) + + args = parse_args() + N_ROWS = args.rows + REPLICAS = args.replicas + OUTPUT_KEY = "SuperfilteringScore" + INIT_KWARGS = {"device": "cuda", "max_length": 512} + + ray.init(ignore_reinit_error=True) + + df = load_alpaca_subset(N_ROWS) + print(f"Loaded {len(df)} rows from tatsu-lab/alpaca") + + tmp_root = 
tempfile.mkdtemp(prefix="rayorch_compile_test_") + input_file = save_to_jsonl(df, os.path.join(tmp_root, "input.jsonl")) + + # --------------------------------------------------------------- + # 1. Serial compiled pipeline + # --------------------------------------------------------------- + print(f"\n{'=' * 60}") + print(f"[Serial] compile() + forward() ({N_ROWS} rows)") + print(f"{'=' * 60}") + + serial_cache = os.path.join(tmp_root, "serial") + os.makedirs(serial_cache, exist_ok=True) + + serial_pipe = SerialCompiledPipeline(input_file, serial_cache, SuperfilteringSampleEvaluator, INIT_KWARGS) + serial_pipe.compile() + print(f" compile() OK — {len(serial_pipe.op_runtimes)} op(s) registered") + + t0 = time.perf_counter() + serial_pipe.forward() + serial_time = time.perf_counter() - t0 + + serial_scores = read_result(serial_cache, OUTPUT_KEY) + print(f" forward() OK — {serial_time:.2f}s") + print(f" Scores[:5]: {[round(s, 4) for s in serial_scores[:5]]}") + + # --------------------------------------------------------------- + # 2. Ray compiled pipeline + # --------------------------------------------------------------- + print(f"\n{'=' * 60}") + print(f"[Ray x{REPLICAS}] compile() + forward() ({N_ROWS} rows)") + print(f"{'=' * 60}") + + ray_cache = os.path.join(tmp_root, "ray") + os.makedirs(ray_cache, exist_ok=True) + + ray_pipe = RayCompiledPipeline( + input_file, ray_cache, + SuperfilteringSampleEvaluator, INIT_KWARGS, + replicas=REPLICAS, num_gpus_per_replica=1.0, + ) + ray_pipe.compile() + print(f" compile() OK — {len(ray_pipe.op_runtimes)} op(s) registered") + + t0 = time.perf_counter() + ray_pipe.forward() + ray_time = time.perf_counter() - t0 + + ray_scores = read_result(ray_cache, OUTPUT_KEY) + print(f" forward() OK — {ray_time:.2f}s (includes cold start)") + print(f" Scores[:5]: {[round(s, 4) for s in ray_scores[:5]]}") + + ray_pipe.shutdown_actors() + + # --------------------------------------------------------------- + # 3. 
Correctness check + # --------------------------------------------------------------- + print(f"\n{'=' * 60}") + print("[Correctness]") + print(f"{'=' * 60}") + + assert len(ray_scores) == len(serial_scores), ( + f"Row count mismatch: serial={len(serial_scores)}, ray={len(ray_scores)}" + ) + mismatches = sum( + 1 for s, p in zip(serial_scores, ray_scores) + if not (s is None and p is None) + and (s is None or p is None or not np.isclose(s, p, rtol=1e-3, equal_nan=True)) + ) + if mismatches: + print(f" ⚠ {mismatches}/{len(serial_scores)} mismatches") + else: + print(f" ✓ All {len(serial_scores)} scores match (serial vs Ray x{REPLICAS})") + + print(f"\n Serial: {serial_time:.2f}s") + print(f" Ray x{REPLICAS}: {ray_time:.2f}s") + if ray_time > 0: + print(f" Speedup: {serial_time / ray_time:.1f}x (includes cold start)") + + # --------------------------------------------------------------- + # Cleanup + # --------------------------------------------------------------- + ray.shutdown() + shutil.rmtree(tmp_root, ignore_errors=True) + print(f"\nCleaned up {tmp_root}") + print("PASSED" if mismatches == 0 else "FAILED") diff --git a/test/rayorch/test_real_operators.py b/test/rayorch/test_real_operators.py new file mode 100644 index 00000000..bebb7955 --- /dev/null +++ b/test/rayorch/test_real_operators.py @@ -0,0 +1,374 @@ +""" +Real-operator benchmark for RayAcceleratedOperator on the Alpaca dataset. + +Outer pipeline uses FileStorage (read file → process → write file), +matching real DataFlow deployment. InMemoryStorage is only used +internally by _OpRunner inside Ray actors — fully transparent here. + +Supports SuperfilteringSampleEvaluator and DeitaQualitySampleEvaluator. +Compares serial (1 GPU) vs Ray-parallel (multi-GPU) execution, verifying: + 1. Correctness — parallel results match serial baseline + 2. 
 Speedup — approaches linear scaling as data volume grows

Usage:
    python test/rayorch/test_real_operators.py --help
    python test/rayorch/test_real_operators.py --op superfiltering --rows 4096
    python test/rayorch/test_real_operators.py --op superfiltering --rows 4096 --replicas 2 4 8
    python test/rayorch/test_real_operators.py --op deita --rows 256 --replicas 2 4
    python test/rayorch/test_real_operators.py --op all --rows 1024
"""
from __future__ import annotations

import argparse
import json
import os
import shutil
import tempfile
import time
from pathlib import Path

import numpy as np
import pandas as pd

# Must be set before any HF download is attempted by the operators.
os.environ.setdefault("HF_ENDPOINT", "https://hf-mirror.com")

from dataflow.rayorch import RayAcceleratedOperator
from dataflow.utils.storage import FileStorage


# ===================================================================
# Dataset helper
# ===================================================================
def load_alpaca_subset(n: int = 4096) -> pd.DataFrame:
    """Load the first *n* Alpaca rows; blank out missing 'input' fields."""
    from datasets import load_dataset

    ds = load_dataset("tatsu-lab/alpaca", split=f"train[:{n}]")
    df = ds.to_pandas()
    df["input"] = df["input"].fillna("")
    return df


def save_to_jsonl(df: pd.DataFrame, path: str) -> str:
    """Write *df* as JSON-lines to *path*, creating parent dirs; return path."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    df.to_json(path, orient="records", lines=True, force_ascii=False)
    return path


# ===================================================================
# Operator registry
# ===================================================================
# name -> {cls, init_kwargs, run_kwargs, output_key, label}; populated
# lazily by the _register_* functions so heavy imports only happen for
# the operators actually benchmarked.
OPERATORS: dict[str, dict] = {}


def _register_superfiltering():
    from dataflow.operators.text_sft.eval.superfiltering_sample_evaluator import (
        SuperfilteringSampleEvaluator,
    )

    OPERATORS["superfiltering"] = {
        "cls": SuperfilteringSampleEvaluator,
        "init_kwargs": {"device": "cuda", "max_length": 512},
        "run_kwargs": {"input_instruction_key": "instruction", "input_output_key": "output"},
        "output_key": "SuperfilteringScore",
        "label": "SuperfilteringSampleEvaluator (gpt2, 124 M)",
    }


def _register_deita():
    from dataflow.operators.text_sft.eval.deita_quality_sample_evaluator import (
        DeitaQualitySampleEvaluator,
    )

    OPERATORS["deita"] = {
        "cls": DeitaQualitySampleEvaluator,
        "init_kwargs": {"device": "cuda", "max_length": 512},
        "run_kwargs": {"input_instruction_key": "instruction", "input_output_key": "output"},
        "output_key": "DeitaQualityScore",
        "label": "DeitaQualitySampleEvaluator (Llama-based, 7 B)",
    }


_REGISTER_FNS = {
    "superfiltering": _register_superfiltering,
    "deita": _register_deita,
}


# ===================================================================
# Pipeline wrappers (DataFlow convention: __init__ + forward)
# Outer storage = FileStorage (real file I/O)
# Inner storage = InMemoryStorage (inside Ray actors, transparent)
# ===================================================================
class SerialPipeline:
    """Single-GPU serial execution — standard DataFlow pattern."""

    def __init__(self, input_file: str, cache_path: str, op_cls, init_kwargs: dict):
        self.storage = FileStorage(
            first_entry_file_name=input_file,
            cache_path=cache_path,
            file_name_prefix="bench_step",
            cache_type="jsonl",
        )
        self.scorer = op_cls(**init_kwargs)

    def forward(self, **run_kwargs):
        self.scorer.run(storage=self.storage.step(), **run_kwargs)


class RayPipeline:
    """Multi-GPU Ray-accelerated — drop-in replacement of the operator."""

    def __init__(
        self,
        input_file: str,
        cache_path: str,
        op_cls,
        init_kwargs: dict,
        replicas: int = 8,
        num_gpus_per_replica: float = 1.0,
    ):
        self.storage = FileStorage(
            first_entry_file_name=input_file,
            cache_path=cache_path,
            file_name_prefix="bench_step",
            cache_type="jsonl",
        )
        self.scorer = RayAcceleratedOperator(
            op_cls,
            replicas=replicas,
            num_gpus_per_replica=num_gpus_per_replica,
        ).op_cls_init(**init_kwargs)

    def forward(self, **run_kwargs):
        self.scorer.run(storage=self.storage.step(), **run_kwargs)

    def shutdown(self):
        # Release the Ray actors (and any GPU memory they hold).
        self.scorer.shutdown()


# ===================================================================
# Helpers
# ===================================================================
def _read_result(cache_path: str, output_key: str) -> list:
    """Read the operator output from FileStorage step-1 cache."""
    result_file = os.path.join(cache_path, "bench_step_step1.jsonl")
    result_df = pd.read_json(result_file, lines=True)
    return result_df[output_key].tolist()


# ===================================================================
# Benchmark harness
# ===================================================================
def bench(
    label: str,
    op_cls,
    init_kwargs: dict,
    run_kwargs: dict,
    input_file: str,
    warmup_file: str,
    output_key: str,
    n_rows: int,
    tmp_root: str,
    replicas_list: list[int],
    num_gpus_per_replica: float = 1.0,
) -> dict:
    """Run serial + parallel benchmarks and return structured results.

    NOTE(review): the serial baseline is taken from results[replicas_list[0]]
    — this assumes the caller puts 1 first in replicas_list; confirm the
    caller prepends 1 before invoking bench().
    """
    print(f"\n{'#' * 70}")
    print(f"# {label} ({n_rows} rows, FileStorage)")
    print(f"{'#' * 70}")

    # replica count -> {"time": warm seconds, "scores": list, "cold": seconds}
    results: dict[int, dict] = {}

    for replicas in replicas_list:
        tag = "Serial" if replicas == 1 else f"Parallel x{replicas}"
        print(f"\n--- [{tag}] ---")

        if replicas == 1:
            cache_dir = os.path.join(tmp_root, "serial")
            os.makedirs(cache_dir, exist_ok=True)

            pipe = SerialPipeline(input_file, cache_dir, op_cls, init_kwargs)
            t0 = time.perf_counter()
            pipe.forward(**run_kwargs)
            elapsed = time.perf_counter() - t0
            scores = _read_result(cache_dir, output_key)
            results[replicas] = {"time": elapsed, "scores": scores, "cold": 0.0}
        else:
            cold_cache = os.path.join(tmp_root, f"ray_x{replicas}_cold")
            warm_cache = os.path.join(tmp_root, f"ray_x{replicas}_warm")
            os.makedirs(cold_cache, exist_ok=True)
            os.makedirs(warm_cache, exist_ok=True)

            # Cold run — creates actors + loads model
            pipe = RayPipeline(
                warmup_file, cold_cache, op_cls, init_kwargs,
                replicas=replicas, num_gpus_per_replica=num_gpus_per_replica,
            )
            t_cold = time.perf_counter()
            pipe.forward(**run_kwargs)
            cold_time = time.perf_counter() - t_cold
            print(f" Cold start: {cold_time:.1f}s")

            # Warm run — reuse actors, fresh FileStorage with full data
            pipe.storage = FileStorage(
                first_entry_file_name=input_file,
                cache_path=warm_cache,
                file_name_prefix="bench_step",
                cache_type="jsonl",
            )
            t0 = time.perf_counter()
            pipe.forward(**run_kwargs)
            elapsed = time.perf_counter() - t0
            scores = _read_result(warm_cache, output_key)
            results[replicas] = {"time": elapsed, "scores": scores, "cold": cold_time}
            pipe.shutdown()

        print(f" Time: {elapsed:.2f}s")
        print(f" Scores[:5]: {[round(s, 4) if s is not None else None for s in scores[:5]]}")

    # --- correctness ---
    serial_time = results[replicas_list[0]]["time"]
    serial_scores = results[replicas_list[0]]["scores"]

    for r in replicas_list[1:]:
        par_scores = results[r]["scores"]
        assert len(par_scores) == len(serial_scores), "Row count mismatch"
        # A pair matches when both are None, or both within rtol=1e-3
        # (NaN == NaN allowed via equal_nan).
        mismatches = sum(
            1 for s, p in zip(serial_scores, par_scores)
            if not (s is None and p is None)
            and (s is None or p is None or not np.isclose(s, p, rtol=1e-3, equal_nan=True))
        )
        tag = f"serial vs x{r}"
        print(f" {'⚠ ' + str(mismatches) + ' mismatches' if mismatches else '✓ scores match'} ({tag})")

    # --- speedup summary ---
    print(f"\n {'Speedup summary':=^40}")
    summary_rows = []
    for r in replicas_list:
        t = results[r]["time"]
        speedup = serial_time / t if t > 0 else 0
        tag = "serial" if r == 1 else f"x{r}"
        cold = results[r]["cold"]
        cold_str = f" (cold {cold:.1f}s)" if cold > 0 else ""
        print(f" {tag:>10s}: {t:7.2f}s ({speedup:.1f}x){cold_str}")
        match_ok = True
        if r > 1:
            # Recomputed per replica count (the loop above only printed it).
            par_scores = results[r]["scores"]
            mismatch_cnt = sum(
                1 for s, p in zip(serial_scores, par_scores)
                if not (s is None and p is None)
                and (s is None or p is None or not np.isclose(s, p, rtol=1e-3, equal_nan=True))
            )
            match_ok = mismatch_cnt == 0
        summary_rows.append({
            "replicas": r,
            "time_s": round(t, 2),
            "speedup": round(speedup, 1),
            "correct": match_ok,
        })

    return {
        "label": label,
        "rows": n_rows,
        "serial_time_s": round(serial_time, 2),
        "details": summary_rows,
    }


# ===================================================================
# Argparse
# ===================================================================
def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="RayAcceleratedOperator benchmark on Alpaca dataset (FileStorage)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python test/rayorch/test_real_operators.py --op superfiltering --rows 4096
  python test/rayorch/test_real_operators.py --op superfiltering --rows 4096 --replicas 2 4 8
  python test/rayorch/test_real_operators.py --op deita --rows 256 --replicas 2 4
  python test/rayorch/test_real_operators.py --op all --rows 1024
    """,
    )
    p.add_argument(
        "--op", type=str, default="superfiltering",
        choices=["superfiltering", "deita", "all"],
        help="Operator to benchmark (default: superfiltering)",
    )
    p.add_argument(
        "--rows", type=int, default=4096,
        help="Number of Alpaca rows to use (default: 4096)",
    )
    p.add_argument(
        "--replicas", type=int, nargs="+", default=[2, 4, 8],
        help="Parallel replica counts to test (default: 2 4 8)",
    )
    p.add_argument(
        "--gpus-per-replica", type=float, default=1.0,
        help="GPUs per replica (default: 1.0)",
    )
    p.add_argument(
        "--save-json", type=str, default=None,
        help="Path to save JSON results (default: test/rayorch/bench_results.json)",
    )
    return p.parse_args()


# ===================================================================
# main
# ===================================================================
if __name__ == "__main__":
    import ray

    args = parse_args()

    ops_to_run = list(_REGISTER_FNS.keys()) if
args.op == "all" else [args.op] + for name in ops_to_run: + _REGISTER_FNS[name]() + + ray.init(ignore_reinit_error=True) + + # Load data and write to temp JSONL files (input for FileStorage) + df = load_alpaca_subset(args.rows) + print(f"Loaded {len(df)} rows from tatsu-lab/alpaca") + print(f"Columns: {list(df.columns)}") + + tmp_root = tempfile.mkdtemp(prefix="rayorch_bench_") + input_file = save_to_jsonl(df, os.path.join(tmp_root, "alpaca_input.jsonl")) + warmup_file = save_to_jsonl(df.iloc[:2], os.path.join(tmp_root, "alpaca_warmup.jsonl")) + print(f"Temp dir: {tmp_root}") + + replicas_list = sorted(set([1] + args.replicas)) + all_results = [] + + for name in ops_to_run: + cfg = OPERATORS[name] + op_tmp = os.path.join(tmp_root, name) + os.makedirs(op_tmp, exist_ok=True) + + result = bench( + label=cfg["label"], + op_cls=cfg["cls"], + init_kwargs=cfg["init_kwargs"], + run_kwargs=cfg["run_kwargs"], + input_file=input_file, + warmup_file=warmup_file, + output_key=cfg["output_key"], + n_rows=len(df), + tmp_root=op_tmp, + replicas_list=replicas_list, + num_gpus_per_replica=args.gpus_per_replica, + ) + all_results.append(result) + + ray.shutdown() + + # Clean up temp files + shutil.rmtree(tmp_root, ignore_errors=True) + print(f"Cleaned up {tmp_root}") + + default_json = str(Path(__file__).parent / "bench_results.json") + out_path = args.save_json or default_json + with open(out_path, "w") as f: + json.dump(all_results, f, indent=2, ensure_ascii=False) + print(f"\nResults saved to {out_path}")