diff --git a/python/ray/data/_internal/planner/checkpoint/__init__.py b/python/ray/data/_internal/planner/checkpoint/__init__.py index 6ee2137c2e9b..51d0c6157c7d 100644 --- a/python/ray/data/_internal/planner/checkpoint/__init__.py +++ b/python/ray/data/_internal/planner/checkpoint/__init__.py @@ -1,3 +1,4 @@ +from .plan_read_files_op import plan_read_files_op_with_checkpoint_filter from .plan_read_op import ( create_checkpoint_filter_op, plan_read_op_with_checkpoint_filter, @@ -5,6 +6,7 @@ from .plan_write_op import plan_write_op_with_checkpoint_writer __all__ = [ + "plan_read_files_op_with_checkpoint_filter", "create_checkpoint_filter_op", "plan_read_op_with_checkpoint_filter", "plan_write_op_with_checkpoint_writer", diff --git a/python/ray/data/_internal/planner/checkpoint/plan_read_files_op.py b/python/ray/data/_internal/planner/checkpoint/plan_read_files_op.py new file mode 100644 index 000000000000..19fcc1bb5089 --- /dev/null +++ b/python/ray/data/_internal/planner/checkpoint/plan_read_files_op.py @@ -0,0 +1,47 @@ +"""Checkpoint-aware planner for the V2 ``ReadFiles`` logical operator. + +Mirrors :func:`plan_read_op_with_checkpoint_filter` (the V1 ``Read`` +variant) so V2 uses the same wrapping ActorPool ``CheckpointFilter`` +``MapOperator`` downstream of the read: same +``_CheckpointFilterFn`` / ``_get_checkpoint_map_transformer``, same +memory reservation formula, and ``supports_fusion=False`` so the +filter stays a distinct op. + +Registered via ``Planner._get_plan_fns_for_checkpointing`` so it only +runs when ``DataContext.checkpoint_config`` is set *and* the logical +plan is a ``Write`` or ``StreamingSplit`` with a ``ReadFiles`` at the +leaf. V2's plain ``plan_read_files_op`` stays checkpoint-unaware; this +file is the only place V2 reads pick up a checkpoint filter. +""" +from typing import List, Optional + +import pyarrow +import pyarrow.fs + +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.logical.operators import ReadFiles +from ray.data._internal.planner.checkpoint.plan_read_op import ( + create_checkpoint_filter_op, +) +from ray.data._internal.planner.plan_read_files_op import plan_read_files_op +from ray.data.context import DataContext + + +def plan_read_files_op_with_checkpoint_filter( + data_file_dir: Optional[str], + data_file_filesystem: Optional["pyarrow.fs.FileSystem"], + op: ReadFiles, + physical_children: List[PhysicalOperator], + data_context: DataContext, +) -> PhysicalOperator: + """Wrap a V2 ``ReadFiles`` physical op with a ``CheckpointFilter``. + + Defers all wrapping behavior to V1's :func:`create_checkpoint_filter_op` + so the not-found short-circuit, ``IdColumnCheckpointManager.load_checkpoint`` + invocation, actor-pool sizing, and ``supports_fusion=False`` placement stay + in one place across the V1 and V2 read paths. + """ + physical_read_op = plan_read_files_op(op, physical_children, data_context) + return create_checkpoint_filter_op( + physical_read_op, data_context, data_file_dir, data_file_filesystem + ) diff --git a/python/ray/data/_internal/planner/planner.py b/python/ray/data/_internal/planner/planner.py index e8b369b3ad5c..c87c22113392 100644 --- a/python/ray/data/_internal/planner/planner.py +++ b/python/ray/data/_internal/planner/planner.py @@ -46,6 +46,7 @@ Zip, ) from ray.data._internal.planner.checkpoint import ( + plan_read_files_op_with_checkpoint_filter, plan_read_op_with_checkpoint_filter, plan_write_op_with_checkpoint_writer, ) @@ -191,7 +192,7 @@ class Planner: Download: plan_download_op, } # Operators that support checkpoint filtering. Subclasses can override. - _CHECKPOINT_FILTER_OPS = (Read,) + _CHECKPOINT_FILTER_OPS = (Read, ReadFiles) def __init__(self): self._supports_checkpointing = False @@ -331,6 +332,11 @@ def _get_plan_fns_for_checkpointing( Read: partial( plan_read_op_with_checkpoint_filter, data_file_dir, data_file_filesystem ), + ReadFiles: partial( + plan_read_files_op_with_checkpoint_filter, + data_file_dir, + data_file_filesystem, + ), Write: plan_write_op_with_checkpoint_writer, } return plan_fns diff --git a/python/ray/data/checkpoint/checkpoint_filter.py b/python/ray/data/checkpoint/checkpoint_filter.py index b2cf5104d559..2a4d447ffdbf 100644 --- a/python/ray/data/checkpoint/checkpoint_filter.py +++ b/python/ray/data/checkpoint/checkpoint_filter.py @@ -215,6 +215,24 @@ def load_checkpoint( if data_file_dir is not None: self._clean_pending_checkpoints(data_file_dir, data_file_filesystem) + # If the checkpoint directory has no remaining data files (e.g., all + # entries were pending checkpoints that were just cleaned up), skip + # the inner ``read_parquet``. V2's ``read_parquet`` raises on empty + # directories while V1 returned a zero-row dataset; this pre-check + # keeps ``load_checkpoint`` behaving the same under both. + # Recurse when a partition filter is configured because committed + # files live under Hive-partitioned subdirectories rather than at + # the top level. + entries = self.filesystem.get_file_info( + FileSelector( + self.checkpoint_path_unwrapped, + recursive=self.checkpoint_path_partition_filter is not None, + allow_not_found=True, + ) + ) + if not any(f.type == FileType.File for f in entries): + return None, 0 + # Load the checkpoint data checkpoint_ds: ray.data.Dataset = ray.data.read_parquet( self.checkpoint_path,