[data] DataSourceV2: V2 checkpoint filter integration#63172
Conversation
There was a problem hiding this comment.
Code Review
This pull request enables checkpoint filtering for the V2 ReadFiles operator by introducing a new planning function and updating the Planner configuration. Additionally, it adds a pre-check in load_checkpoint to handle empty directories. The reviewer identified that the current implementation of this pre-check incorrectly handles partitioned and single-file checkpoints and provided a corrected implementation to ensure robust behavior.
444b119 to
622a016
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 622a016. Configure here.
owenowenisme
left a comment
There was a problem hiding this comment.
BTW why not just use v1 read_parquet for checkpoint loading?
I can't think of the benefits there other than consistency.
This will make the checkpoint less divergence.
We're going to be deprecating v1 read_parquet. The point of this change is to have DsV2 be able to do row based checkpointing |
Wire V2 ReadFiles into the checkpoint-filter dispatch path so checkpointed reads behave the same way under V1 and V2. - planner.py: add ReadFiles to ``_CHECKPOINT_FILTER_OPS`` and register ``plan_read_files_op_with_checkpoint_filter`` in ``_get_plan_fns_for_checkpointing``. Plain ``plan_read_files_op`` stays checkpoint-unaware; attaching the filter is solely the planner's responsibility. - planner/checkpoint/plan_read_files_op.py (new): defers to V1's ``create_checkpoint_filter_op`` for the wrapping ActorPool ``CheckpointFilter`` MapOperator. Same memory contract, ``supports_fusion=False``, and not-found short-circuit V1 uses. - planner/checkpoint/__init__.py: re-export. - checkpoint/checkpoint_filter.py: defensive empty-dir pre-check on ``IdColumnCheckpointManager.load_checkpoint`` plus an ``OSError`` narrowing on the post-clean file-info probe — V2's ``read_parquet`` raises on empty dirs (by design) while V1 returned a zero-row dataset, so this keeps ``load_checkpoint`` uniform under both paths. Signed-off-by: Goutam <goutam@anyscale.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
622a016 to
a2d844d
Compare
|
Please consider the Also, recommend adding a test or two. |
Also these will use the existing tests in |
) Wire V2 ReadFiles into the checkpoint-filter dispatch path so checkpointed reads behave the same way under V1 and V2. - planner.py: add ReadFiles to ``_CHECKPOINT_FILTER_OPS`` and register ``plan_read_files_op_with_checkpoint_filter`` in ``_get_plan_fns_for_checkpointing``. Plain ``plan_read_files_op`` stays checkpoint-unaware; attaching the filter is solely the planner's responsibility. - planner/checkpoint/plan_read_files_op.py (new): defers to V1's ``create_checkpoint_filter_op`` for the wrapping ActorPool ``CheckpointFilter`` MapOperator. Same memory contract, ``supports_fusion=False``, and not-found short-circuit V1 uses. - planner/checkpoint/__init__.py: re-export. - checkpoint/checkpoint_filter.py: defensive empty-dir pre-check on ``IdColumnCheckpointManager.load_checkpoint`` plus an ``OSError`` narrowing on the post-clean file-info probe — V2's ``read_parquet`` raises on empty dirs (by design) while V1 returned a zero-row dataset, so this keeps ``load_checkpoint`` uniform under both paths. Signed-off-by: Goutam <goutam@anyscale.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Goutam <goutam@anyscale.com> Co-authored-by: Goutam V. <> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
) Wire V2 ReadFiles into the checkpoint-filter dispatch path so checkpointed reads behave the same way under V1 and V2. - planner.py: add ReadFiles to ``_CHECKPOINT_FILTER_OPS`` and register ``plan_read_files_op_with_checkpoint_filter`` in ``_get_plan_fns_for_checkpointing``. Plain ``plan_read_files_op`` stays checkpoint-unaware; attaching the filter is solely the planner's responsibility. - planner/checkpoint/plan_read_files_op.py (new): defers to V1's ``create_checkpoint_filter_op`` for the wrapping ActorPool ``CheckpointFilter`` MapOperator. Same memory contract, ``supports_fusion=False``, and not-found short-circuit V1 uses. - planner/checkpoint/__init__.py: re-export. - checkpoint/checkpoint_filter.py: defensive empty-dir pre-check on ``IdColumnCheckpointManager.load_checkpoint`` plus an ``OSError`` narrowing on the post-clean file-info probe — V2's ``read_parquet`` raises on empty dirs (by design) while V1 returned a zero-row dataset, so this keeps ``load_checkpoint`` uniform under both paths. Signed-off-by: Goutam <goutam@anyscale.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Goutam <goutam@anyscale.com> Co-authored-by: Goutam V. <> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: anindyam1969 <amukherjee@kinetica.com>
) Wire V2 ReadFiles into the checkpoint-filter dispatch path so checkpointed reads behave the same way under V1 and V2. - planner.py: add ReadFiles to ``_CHECKPOINT_FILTER_OPS`` and register ``plan_read_files_op_with_checkpoint_filter`` in ``_get_plan_fns_for_checkpointing``. Plain ``plan_read_files_op`` stays checkpoint-unaware; attaching the filter is solely the planner's responsibility. - planner/checkpoint/plan_read_files_op.py (new): defers to V1's ``create_checkpoint_filter_op`` for the wrapping ActorPool ``CheckpointFilter`` MapOperator. Same memory contract, ``supports_fusion=False``, and not-found short-circuit V1 uses. - planner/checkpoint/__init__.py: re-export. - checkpoint/checkpoint_filter.py: defensive empty-dir pre-check on ``IdColumnCheckpointManager.load_checkpoint`` plus an ``OSError`` narrowing on the post-clean file-info probe — V2's ``read_parquet`` raises on empty dirs (by design) while V1 returned a zero-row dataset, so this keeps ``load_checkpoint`` uniform under both paths. Signed-off-by: Goutam <goutam@anyscale.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Goutam <goutam@anyscale.com> Co-authored-by: Goutam V. <> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
) Wire V2 ReadFiles into the checkpoint-filter dispatch path so checkpointed reads behave the same way under V1 and V2. - planner.py: add ReadFiles to ``_CHECKPOINT_FILTER_OPS`` and register ``plan_read_files_op_with_checkpoint_filter`` in ``_get_plan_fns_for_checkpointing``. Plain ``plan_read_files_op`` stays checkpoint-unaware; attaching the filter is solely the planner's responsibility. - planner/checkpoint/plan_read_files_op.py (new): defers to V1's ``create_checkpoint_filter_op`` for the wrapping ActorPool ``CheckpointFilter`` MapOperator. Same memory contract, ``supports_fusion=False``, and not-found short-circuit V1 uses. - planner/checkpoint/__init__.py: re-export. - checkpoint/checkpoint_filter.py: defensive empty-dir pre-check on ``IdColumnCheckpointManager.load_checkpoint`` plus an ``OSError`` narrowing on the post-clean file-info probe — V2's ``read_parquet`` raises on empty dirs (by design) while V1 returned a zero-row dataset, so this keeps ``load_checkpoint`` uniform under both paths. Signed-off-by: Goutam <goutam@anyscale.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Goutam <goutam@anyscale.com> Co-authored-by: Goutam V. <> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>

Wire V2 ReadFiles into the checkpoint-filter dispatch path so
checkpointed reads behave the same way under V1 and V2.
_CHECKPOINT_FILTER_OPSandregister
plan_read_files_op_with_checkpoint_filterin_get_plan_fns_for_checkpointing. Plainplan_read_files_opstays checkpoint-unaware; attaching thefilter is solely the planner's responsibility.
create_checkpoint_filter_opfor the wrapping ActorPoolCheckpointFilterMapOperator. Same memory contract,supports_fusion=False, and not-found short-circuit V1 uses.IdColumnCheckpointManager.load_checkpointplus anOSErrornarrowing on the post-clean file-info probe — V2'sread_parquetraises on empty dirs (by design) while V1returned a zero-row dataset, so this keeps
load_checkpointuniform under both paths.
Signed-off-by: Goutam goutam@anyscale.com
Co-Authored-By: Claude Opus 4.7 (1M context) noreply@anthropic.com