[data] DataSourceV2: V2 checkpoint filter integration #63172

Merged
goutamvenkat-anyscale merged 1 commit into ray-project:master from goutamvenkat-anyscale:pr63172
May 11, 2026

@goutamvenkat-anyscale
Contributor

Wire V2 ReadFiles into the checkpoint-filter dispatch path so
checkpointed reads behave the same way under V1 and V2.

  • planner.py: add ReadFiles to _CHECKPOINT_FILTER_OPS and
    register plan_read_files_op_with_checkpoint_filter in
    _get_plan_fns_for_checkpointing. Plain
    plan_read_files_op stays checkpoint-unaware; attaching the
    filter is solely the planner's responsibility.
  • planner/checkpoint/plan_read_files_op.py (new): defers to V1's
    create_checkpoint_filter_op for the wrapping ActorPool
    CheckpointFilter MapOperator. Same memory contract,
    supports_fusion=False, and not-found short-circuit V1 uses.
  • planner/checkpoint/__init__.py: re-export.
  • checkpoint/checkpoint_filter.py: defensive empty-dir pre-check on
    IdColumnCheckpointManager.load_checkpoint plus an
    OSError narrowing on the post-clean file-info probe — V2's
    read_parquet raises on empty dirs (by design) while V1
    returned a zero-row dataset, so this keeps load_checkpoint
    uniform under both paths.
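
The dispatch idea in the first two bullets can be sketched in plain Python. This is only an illustration of the pattern, not Ray's planner code: the operator classes, the `with_checkpoint_filter` wrapper, and the string "plans" are hypothetical stand-ins, and the real planner builds physical operators rather than strings.

```python
from typing import Callable, Dict

# Hypothetical stand-ins for logical operators (not Ray's real classes).
class Read:
    """Stand-in for the V1 read operator."""

class ReadFiles:
    """Stand-in for the V2 read operator."""

class Limit:
    """Stand-in for an operator that never gets a checkpoint filter."""

def plan_read_op(op) -> str:
    return "read"

def plan_read_files_op(op) -> str:
    # Checkpoint-unaware: this function knows nothing about filters.
    return "read_files"

def plan_limit_op(op) -> str:
    return "limit"

def with_checkpoint_filter(plan_fn: Callable) -> Callable:
    """Wrap a plan function so its output feeds a checkpoint filter stage."""
    def wrapped(op) -> str:
        return plan_fn(op) + " -> checkpoint_filter"
    return wrapped

DEFAULT_PLAN_FNS: Dict[type, Callable] = {
    Read: plan_read_op,
    ReadFiles: plan_read_files_op,
    Limit: plan_limit_op,
}

# Operators that receive a filter when checkpointing is enabled.
CHECKPOINT_FILTER_OPS = (Read, ReadFiles)

def get_plan_fns(checkpointing: bool) -> Dict[type, Callable]:
    """The planner alone decides whether a filter is attached."""
    plan_fns = dict(DEFAULT_PLAN_FNS)
    if checkpointing:
        for op_type in CHECKPOINT_FILTER_OPS:
            plan_fns[op_type] = with_checkpoint_filter(DEFAULT_PLAN_FNS[op_type])
    return plan_fns

def plan(op, checkpointing: bool = False) -> str:
    return get_plan_fns(checkpointing)[type(op)](op)

print(plan(ReadFiles()))                      # read_files
print(plan(ReadFiles(), checkpointing=True))  # read_files -> checkpoint_filter
```

Keeping the plain plan function unaware of checkpointing means the V1 and V2 read paths can share one wrapper, which is the property the description relies on.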

Signed-off-by: Goutam <goutam@anyscale.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner May 7, 2026 00:33
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request enables checkpoint filtering for the V2 ReadFiles operator by introducing a new planning function and updating the Planner configuration. Additionally, it adds a pre-check in load_checkpoint to handle empty directories. The reviewer identified that the current implementation of this pre-check incorrectly handles partitioned and single-file checkpoints and provided a corrected implementation to ensure robust behavior.
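
To make the partitioned and single-file cases concrete, here is a minimal sketch of an "is there anything to load" pre-check. It is not the reviewer's suggested code and not Ray's implementation: the helper name `checkpoint_has_data` is hypothetical, and it assumes a local filesystem via `pathlib`, whereas a real checkpoint path may live on remote storage.

```python
import tempfile
from pathlib import Path

def checkpoint_has_data(path: str) -> bool:
    """Return True if `path` is a file, or a directory tree with at least one file."""
    p = Path(path)
    if p.is_file():
        # Single-file checkpoint: the path itself is the data.
        return True
    if not p.is_dir():
        # Missing path: nothing to load.
        return False
    # Recurse so partitioned layouts, where files sit only in
    # subdirectories, are not mistaken for an empty checkpoint.
    return any(child.is_file() for child in p.rglob("*"))

with tempfile.TemporaryDirectory() as root:
    partition = Path(root) / "part=0"
    partition.mkdir()
    print(checkpoint_has_data(root))   # False
    (partition / "data.parquet").write_bytes(b"placeholder")
    print(checkpoint_has_data(root))   # True
```

A check that lists only the top level of the directory would report the partitioned layout as empty, which is the kind of mistake the review calls out.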

Comment thread python/ray/data/checkpoint/checkpoint_filter.py Outdated
Comment thread python/ray/data/checkpoint/checkpoint_filter.py Outdated
@ray-gardener ray-gardener Bot added the "data" label (Ray Data-related issues) May 7, 2026

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 622a016.

Comment thread python/ray/data/checkpoint/checkpoint_filter.py
@goutamvenkat-anyscale goutamvenkat-anyscale added the "go" label (add ONLY when ready to merge, run all tests) May 7, 2026
Member

@owenowenisme owenowenisme left a comment

BTW, why not just use V1 read_parquet for checkpoint loading?
I can't think of any benefits there other than consistency.

That would make the checkpoint code diverge less.

Comment thread python/ray/data/checkpoint/checkpoint_filter.py
@goutamvenkat-anyscale
Contributor Author

goutamvenkat-anyscale commented May 8, 2026

> BTW, why not just use V1 read_parquet for checkpoint loading? I can't think of any benefits there other than consistency.
>
> That would make the checkpoint code diverge less.

We're going to deprecate V1 read_parquet. The point of this change is to let DsV2 do row-based checkpointing.
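
As a rough illustration of what row-based checkpointing means here, the sketch below drops rows whose ID already appears in a set of completed IDs. The function name, the `id` column, and the list-of-dicts row format are assumptions made for the example. The PR itself applies the filter through a CheckpointFilter MapOperator, which this sketch does not reproduce.

```python
from typing import Any, Dict, Iterable, List, Set

def filter_checkpointed_rows(
    rows: Iterable[Dict[str, Any]],
    completed_ids: Set[Any],
    id_column: str = "id",
) -> List[Dict[str, Any]]:
    """Keep only rows whose ID has not been recorded in the checkpoint."""
    return [row for row in rows if row[id_column] not in completed_ids]

rows = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}]

# Rows 1 and 3 were written before the job was interrupted.
remaining = filter_checkpointed_rows(rows, completed_ids={1, 3})
print(remaining)  # [{'id': 2, 'v': 'b'}]
```

With an empty set of completed IDs every row passes through, which matches the expectation that an empty checkpoint loads as zero rows rather than raising an error.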

@abhishekverma-ray

Please consider the OSError handling changes suggested by the reviewer bot above.

I'd also recommend adding a test or two.

@goutamvenkat-anyscale
Contributor Author

> Please consider the OSError handling changes suggested by the reviewer bot above.
>
> I'd also recommend adding a test or two.

These changes will also be covered by the existing tests in test_checkpoint and test_parquet once I switch the flag.


@abhishekverma-ray abhishekverma-ray left a comment

Looks good.

@goutamvenkat-anyscale goutamvenkat-anyscale merged commit 8324b66 into ray-project:master May 11, 2026
7 checks passed
@goutamvenkat-anyscale goutamvenkat-anyscale deleted the pr63172 branch May 11, 2026 18:00
dancingactor pushed a commit to dancingactor/ray that referenced this pull request May 13, 2026
am-kinetica pushed a commit to kineticadb/ray that referenced this pull request May 14, 2026
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
alexandrplashchinsky pushed a commit to alexandrplashchinsky/ray-alex that referenced this pull request May 29, 2026