Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/ray/data/_internal/planner/checkpoint/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from .plan_read_files_op import plan_read_files_op_with_checkpoint_filter
from .plan_read_op import (
create_checkpoint_filter_op,
plan_read_op_with_checkpoint_filter,
)
from .plan_write_op import plan_write_op_with_checkpoint_writer

__all__ = [
"plan_read_files_op_with_checkpoint_filter",
"create_checkpoint_filter_op",
"plan_read_op_with_checkpoint_filter",
"plan_write_op_with_checkpoint_writer",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Checkpoint-aware planner for the V2 ``ReadFiles`` logical operator.

Mirrors :func:`plan_read_op_with_checkpoint_filter` (the V1 ``Read``
variant): the V2 read is wrapped by the same ActorPool-based
``CheckpointFilter`` ``MapOperator``, placed downstream of the read.
It uses the same ``_CheckpointFilterFn`` /
``_get_checkpoint_map_transformer``, the same memory reservation
formula, and ``supports_fusion=False`` so the filter stays a distinct op.

Registered via ``Planner._get_plan_fns_for_checkpointing`` so it only
runs when ``DataContext.checkpoint_config`` is set *and* the logical
plan is a ``Write`` or ``StreamingSplit`` with a ``ReadFiles`` at the
leaf. V2's plain ``plan_read_files_op`` stays checkpoint-unaware; this
file is the only place V2 reads pick up a checkpoint filter.
"""
from typing import List, Optional

import pyarrow
import pyarrow.fs

from ray.data._internal.execution.interfaces import PhysicalOperator
from ray.data._internal.logical.operators import ReadFiles
from ray.data._internal.planner.checkpoint.plan_read_op import (
create_checkpoint_filter_op,
)
from ray.data._internal.planner.plan_read_files_op import plan_read_files_op
from ray.data.context import DataContext


def plan_read_files_op_with_checkpoint_filter(
    data_file_dir: Optional[str],
    data_file_filesystem: Optional["pyarrow.fs.FileSystem"],
    op: ReadFiles,
    physical_children: List[PhysicalOperator],
    data_context: DataContext,
) -> PhysicalOperator:
    """Plan a V2 ``ReadFiles`` op and place a ``CheckpointFilter`` after it.

    The read itself is planned by the regular ``plan_read_files_op``; the
    resulting physical operator is then handed to V1's
    :func:`create_checkpoint_filter_op`. All wrapping behavior is delegated
    to that helper so the not-found short-circuit, the
    ``IdColumnCheckpointManager.load_checkpoint`` invocation, actor-pool
    sizing, and ``supports_fusion=False`` placement live in a single place
    shared by the V1 and V2 read paths.

    Args:
        data_file_dir: Forwarded unchanged to
            ``create_checkpoint_filter_op``; may be ``None``.
        data_file_filesystem: Forwarded unchanged to
            ``create_checkpoint_filter_op``; may be ``None``.
        op: The logical ``ReadFiles`` operator to plan.
        physical_children: Already-planned physical children, passed through
            to ``plan_read_files_op``.
        data_context: The ``DataContext`` used both for planning the read
            and for building the filter.

    Returns:
        Whatever ``create_checkpoint_filter_op`` returns for the planned
        read operator.
    """
    # The first argument is evaluated before the call, so the read is
    # planned before the filter is built.
    return create_checkpoint_filter_op(
        plan_read_files_op(op, physical_children, data_context),
        data_context,
        data_file_dir,
        data_file_filesystem,
    )
8 changes: 7 additions & 1 deletion python/ray/data/_internal/planner/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
Zip,
)
from ray.data._internal.planner.checkpoint import (
plan_read_files_op_with_checkpoint_filter,
plan_read_op_with_checkpoint_filter,
plan_write_op_with_checkpoint_writer,
)
Expand Down Expand Up @@ -191,7 +192,7 @@ class Planner:
Download: plan_download_op,
}
# Operators that support checkpoint filtering. Subclasses can override.
_CHECKPOINT_FILTER_OPS = (Read,)
_CHECKPOINT_FILTER_OPS = (Read, ReadFiles)

def __init__(self):
self._supports_checkpointing = False
Expand Down Expand Up @@ -331,6 +332,11 @@ def _get_plan_fns_for_checkpointing(
Read: partial(
plan_read_op_with_checkpoint_filter, data_file_dir, data_file_filesystem
),
ReadFiles: partial(
plan_read_files_op_with_checkpoint_filter,
data_file_dir,
data_file_filesystem,
),
Write: plan_write_op_with_checkpoint_writer,
}
return plan_fns
Expand Down
18 changes: 18 additions & 0 deletions python/ray/data/checkpoint/checkpoint_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,24 @@ def load_checkpoint(
if data_file_dir is not None:
self._clean_pending_checkpoints(data_file_dir, data_file_filesystem)

# If the checkpoint directory has no remaining data files (e.g., all
# entries were pending checkpoints that were just cleaned up), skip
# the inner ``read_parquet``. V2's ``read_parquet`` raises on empty
# directories while V1 returned a zero-row dataset; this pre-check
# keeps ``load_checkpoint`` behaving the same under both.
# Recurse when a partition filter is configured because committed
# files live under Hive-partitioned subdirectories rather than at
# the top level.
entries = self.filesystem.get_file_info(
FileSelector(
self.checkpoint_path_unwrapped,
recursive=self.checkpoint_path_partition_filter is not None,
allow_not_found=True,
)
)
if not any(f.type == FileType.File for f in entries):
return None, 0
Comment thread
cursor[bot] marked this conversation as resolved.

# Load the checkpoint data
checkpoint_ds: ray.data.Dataset = ray.data.read_parquet(
self.checkpoint_path,
Expand Down
Loading