Skip to content

Commit 8324b66

Browse files
[data] DataSourceV2: V2 checkpoint filter integration (#63172)
Wire V2 ReadFiles into the checkpoint-filter dispatch path so checkpointed reads behave the same way under V1 and V2. - planner.py: add ReadFiles to ``_CHECKPOINT_FILTER_OPS`` and register ``plan_read_files_op_with_checkpoint_filter`` in ``_get_plan_fns_for_checkpointing``. Plain ``plan_read_files_op`` stays checkpoint-unaware; attaching the filter is solely the planner's responsibility. - planner/checkpoint/plan_read_files_op.py (new): defers to V1's ``create_checkpoint_filter_op`` for the wrapping ActorPool ``CheckpointFilter`` MapOperator. Same memory contract, ``supports_fusion=False``, and not-found short-circuit V1 uses. - planner/checkpoint/__init__.py: re-export. - checkpoint/checkpoint_filter.py: defensive empty-dir pre-check on ``IdColumnCheckpointManager.load_checkpoint`` plus an ``OSError`` narrowing on the post-clean file-info probe — V2's ``read_parquet`` raises on empty dirs (by design) while V1 returned a zero-row dataset, so this keeps ``load_checkpoint`` uniform under both paths. Signed-off-by: Goutam <goutam@anyscale.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Goutam <goutam@anyscale.com> Co-authored-by: Goutam V. <> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 110e83b commit 8324b66

4 files changed

Lines changed: 74 additions & 1 deletion

File tree

python/ray/data/_internal/planner/checkpoint/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
from .plan_read_files_op import plan_read_files_op_with_checkpoint_filter
12
from .plan_read_op import (
23
create_checkpoint_filter_op,
34
plan_read_op_with_checkpoint_filter,
45
)
56
from .plan_write_op import plan_write_op_with_checkpoint_writer
67

78
__all__ = [
9+
"plan_read_files_op_with_checkpoint_filter",
810
"create_checkpoint_filter_op",
911
"plan_read_op_with_checkpoint_filter",
1012
"plan_write_op_with_checkpoint_writer",
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""Checkpoint-aware planner for the V2 ``ReadFiles`` logical operator.
2+
3+
Mirrors :func:`plan_read_op_with_checkpoint_filter` (the V1 ``Read``
4+
variant) so V2 uses the same wrapping ActorPool ``CheckpointFilter``
5+
``MapOperator`` downstream of the read: same
6+
``_CheckpointFilterFn`` / ``_get_checkpoint_map_transformer``, same
7+
memory reservation formula, and ``supports_fusion=False`` so the
8+
filter stays a distinct op.
9+
10+
Registered via ``Planner._get_plan_fns_for_checkpointing`` so it only
11+
runs when ``DataContext.checkpoint_config`` is set *and* the logical
12+
plan is a ``Write`` or ``StreamingSplit`` with a ``ReadFiles`` at the
13+
leaf. V2's plain ``plan_read_files_op`` stays checkpoint-unaware; this
14+
file is the only place V2 reads pick up a checkpoint filter.
15+
"""
16+
from typing import List, Optional
17+
18+
import pyarrow
19+
import pyarrow.fs
20+
21+
from ray.data._internal.execution.interfaces import PhysicalOperator
22+
from ray.data._internal.logical.operators import ReadFiles
23+
from ray.data._internal.planner.checkpoint.plan_read_op import (
24+
create_checkpoint_filter_op,
25+
)
26+
from ray.data._internal.planner.plan_read_files_op import plan_read_files_op
27+
from ray.data.context import DataContext
28+
29+
30+
def plan_read_files_op_with_checkpoint_filter(
31+
data_file_dir: Optional[str],
32+
data_file_filesystem: Optional["pyarrow.fs.FileSystem"],
33+
op: ReadFiles,
34+
physical_children: List[PhysicalOperator],
35+
data_context: DataContext,
36+
) -> PhysicalOperator:
37+
"""Wrap a V2 ``ReadFiles`` physical op with a ``CheckpointFilter``.
38+
39+
Defers all wrapping behavior to V1's :func:`create_checkpoint_filter_op`
40+
so the not-found short-circuit, ``IdColumnCheckpointManager.load_checkpoint``
41+
invocation, actor-pool sizing, and ``supports_fusion=False`` placement stay
42+
in one place across the V1 and V2 read paths.
43+
"""
44+
physical_read_op = plan_read_files_op(op, physical_children, data_context)
45+
return create_checkpoint_filter_op(
46+
physical_read_op, data_context, data_file_dir, data_file_filesystem
47+
)

python/ray/data/_internal/planner/planner.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
Zip,
4747
)
4848
from ray.data._internal.planner.checkpoint import (
49+
plan_read_files_op_with_checkpoint_filter,
4950
plan_read_op_with_checkpoint_filter,
5051
plan_write_op_with_checkpoint_writer,
5152
)
@@ -191,7 +192,7 @@ class Planner:
191192
Download: plan_download_op,
192193
}
193194
# Operators that support checkpoint filtering. Subclasses can override.
194-
_CHECKPOINT_FILTER_OPS = (Read,)
195+
_CHECKPOINT_FILTER_OPS = (Read, ReadFiles)
195196

196197
def __init__(self):
197198
self._supports_checkpointing = False
@@ -331,6 +332,11 @@ def _get_plan_fns_for_checkpointing(
331332
Read: partial(
332333
plan_read_op_with_checkpoint_filter, data_file_dir, data_file_filesystem
333334
),
335+
ReadFiles: partial(
336+
plan_read_files_op_with_checkpoint_filter,
337+
data_file_dir,
338+
data_file_filesystem,
339+
),
334340
Write: plan_write_op_with_checkpoint_writer,
335341
}
336342
return plan_fns

python/ray/data/checkpoint/checkpoint_filter.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,24 @@ def load_checkpoint(
215215
if data_file_dir is not None:
216216
self._clean_pending_checkpoints(data_file_dir, data_file_filesystem)
217217

218+
# If the checkpoint directory has no remaining data files (e.g., all
219+
# entries were pending checkpoints that were just cleaned up), skip
220+
# the inner ``read_parquet``. V2's ``read_parquet`` raises on empty
221+
# directories while V1 returned a zero-row dataset; this pre-check
222+
# keeps ``load_checkpoint`` behaving the same under both.
223+
# Recurse when a partition filter is configured because committed
224+
# files live under Hive-partitioned subdirectories rather than at
225+
# the top level.
226+
entries = self.filesystem.get_file_info(
227+
FileSelector(
228+
self.checkpoint_path_unwrapped,
229+
recursive=self.checkpoint_path_partition_filter is not None,
230+
allow_not_found=True,
231+
)
232+
)
233+
if not any(f.type == FileType.File for f in entries):
234+
return None, 0
235+
218236
# Load the checkpoint data
219237
checkpoint_ds: ray.data.Dataset = ray.data.read_parquet(
220238
self.checkpoint_path,

0 commit comments

Comments
 (0)