fix: preserve ordered seed dataset position on resume#710
Conversation
Preserve planned row-group start offsets during resume so ordered seed datasets continue from the next seed row instead of replaying already-consumed rows. Fixes #709
- Simplify _run_batch context-var setup so current_row_group is set consistently in fresh and resumed sync runs (matches the (x/X) log prefix the async path already emits) and add a docstring spelling out which ContextVars the function owns. - Document RowGroupResumePlan and build_row_group_resume_plan, and make the plan dataclass frozen+slots since it is a one-shot value. - Comment the modulo cycling logic in _index_range_at_offset. - Add a scheduler test verifying fresh async runs auto-derive the per-row-group offsets from row-group sizes (no caller-supplied offsets) so ordered generators stay parallel-safe across row groups. - Add a wraparound regression test that resumes past a full seed cycle, exercising the relative_offset == 0 branch the original #709 regression test missed.
- update current_row_group ContextVar comment to reflect that both the async scheduler and the sync engine's _run_batch set it - move pre_batch_snapshot capture (and ran_pre_batch flag) inside the try/finally in _run_batch so a failure between ContextVar.set and the snapshot call still resets the tokens - add a direct unit test for the relative_offset == 0 wraparound branch in _index_range_at_offset to lock in the fresh-cycle restart behavior
Code Review: PR #710 — fix: preserve ordered seed dataset position on resumeSummaryFixes #709, where The change is well-scoped: a new FindingsCorrectness
Async fresh-run behavioral changeThe async scheduler now seeks to per-row-group offsets on fresh runs as well as resumed runs. The PR description frames this as removing an implicit shared-reader assumption (parallel-safe).
Style / project conventions
Minor nits (non-blocking)
Test coverageStrong. Coverage spans:
Test refactors ( Security / performanceNo security concerns. No new I/O, no string interpolation against external data, no deserialization. Performance impact negligible: one extra VerdictLooks good. The fix targets the reported bug, the design (per-row-group offsets via ContextVar) is the right shape — order-dependent generators stay decoupled from the engine — and test coverage is thorough on both sync and async paths. Recommend a manual smoke test of fresh async ORDERED seed generation against a real on-disk seed dataset before release to validate the behavioral change called out under "Async fresh-run behavioral change" above. Otherwise no blocking concerns. |
Greptile SummaryThis PR fixes a bug where resuming an
|
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py | Adds offset-based seek for ORDERED strategy: reads current_row_group_start_offset and calls _index_range_at_offset to compute the correct IndexRange, including modulo wraparound on full cycles; _df_remaining is cleared on every seek-based entry so stale leftovers never bleed into a resumed batch. |
| packages/data-designer-engine/src/data_designer/engine/context.py | Adds current_row_group_start_offset ContextVar with default None; comment updated to reflect that both engines now set current_row_group. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Accepts optional row_group_start_offsets dict and initial_completed_records; auto-computes offsets from row-group sizes when not supplied; sets and resets current_row_group_start_offset per task in LIFO order inside the existing finally block. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Introduces RowGroupResumePlan and build_row_group_resume_plan for stable per-group offsets across resume holes; threads row_group_start_offset through _run_batch; refactored _run_batch correctly wraps both ContextVars in a single try/finally with LIFO reset order. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py | Adds initial_completed parameter; pre-advances next_log_at past it; rate computed only over records completed since the current run started. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py | Adds optional scheduled_records to log_start; uses remaining scheduled work rather than the full tracker total when available, giving accurate task counts in resume log messages. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Adds three end-to-end regression tests (basic #709 repro, cycle-boundary extension, PartitionBlock resume) and a build_row_group_resume_plan unit test; replaces heavy mock pyramids with real _make_sampler_only_builder builds for IF_POSSIBLE tests. |
Sequence Diagram
sequenceDiagram
participant Engine as Sync/Async Engine
participant RB as _run_batch / _execute_task
participant CV as current_row_group_start_offset (ContextVar)
participant Gen as SeedDatasetColumnGenerator
participant IR as _index_range_at_offset
Engine->>RB: "row_group_start_offset = sum(sizes[:batch_idx])"
RB->>CV: set(row_group_start_offset)
RB->>Gen: generate_from_scratch(num_records)
Gen->>CV: get() → row_group_start_offset
alt ORDERED and offset is not None
Gen->>Gen: "_df_remaining = None"
Gen->>IR: _index_range_at_offset(offset)
IR-->>Gen: IndexRange(start + relative_offset, end)
Gen->>Gen: _reset_batch_reader(num_records, record_offset)
else batch_reader is None (fresh sync)
Gen->>Gen: _reset_batch_reader(num_records)
end
Gen-->>RB: DataFrame
RB->>CV: reset(token)
Reviews (5): Last reviewed commit: "Merge branch 'main' into nmulepati/fix-7..." | Re-trigger Greptile
Companion to the existing IndexRange resume test. Locks in correct behavior when the seed selection comes from PartitionBlock — its to_index_range produces a contiguous range today, but nothing else asserts that contract. The test crosses a cycle boundary inside the partition (4 records over a 2-row partition) so it exercises both the offset-into-partition branch and the relative_offset == 0 wraparound branch end-to-end.
|
Nice work on this one, @nabinchha; thanks for the careful resume regression coverage. SummaryThis PR preserves planned row-group start offsets across sync and async resume so ordered seed datasets continue at the correct seed row instead of replaying completed rows. The implementation matches the PR description: offsets are computed from the original row-group plan, threaded through scheduler/builder context, and consumed only by ordered seed readers while shuffle behavior remains unchanged. FindingsNo findings. What Looks Good
VerdictShip it. This review was generated by an AI assistant. |
📋 Summary
ResumeMode.ALWAYSreplayed the first seed row(s) for ordered seed datasets after an interrupted run, because the resumed process'sSeedDatasetColumnGeneratoralways started reading at the configuredIndexRange.start. This PR threads each row group's planned start offset through both the sync and async engines so ordered seed readers seek to the correct position when resuming, and adds regression coverage that mirrors the issue's minimal repro.Scope. This fixes resume for
SamplingStrategy.ORDEREDwith or without a selection strategy (IndexRange,PartitionBlock, or none).SamplingStrategy.SHUFFLEis unchanged: its underlyingORDER BY RANDOM()query is unseeded, so a resumed shuffled run already produces a fresh random order with potential row duplication. That's a pre-existing limitation, not introduced or addressed here.🔗 Related Issue
Fixes #709
🔄 Changes
current_row_group_start_offsetContextVar so order-dependent generators can observe each row group's planned start offset without coupling to the engine. Comment oncurrent_row_groupupdated to reflect that both engines now set it.SeedDatasetColumnGenerator._reset_batch_readeraccepts arecord_offsetand computes the seek range via_index_range_at_offset, including modulo wraparound for cycling through the selection.RowGroupResumePlan(frozen + slots) andbuild_row_group_resume_plan. The plan is computed against the original row-group layout so per-group offsets stay stable when resume has holes.AsyncTaskSchedulerauto-derives offsets for fresh runs (parallel-safe; removes the implicit shared-reader assumption).row_group_start_offsetthrough_run_batch. The method also setscurrent_row_groupfor both fresh and resumed runs so the(x/X)log prefix is consistent across engines, owns both ContextVars inside a singletry/finally, and capturespre_batch_snapshotinside thattryso any failure betweensetand the snapshot still resets tokens.🧪 Testing
pytest tests/engine/column_generators/generators/test_seed_dataset.py tests/engine/dataset_builders/test_async_scheduler.py tests/engine/dataset_builders/test_dataset_builder.py→ 251 passedtest_seed_dataset.py:test_seed_dataset_column_generator_reset_batch_reader_applies_record_offset,test_seed_dataset_column_generator_reset_batch_reader_wraps_at_cycle_boundary,test_seed_dataset_column_generator_ordered_generation_uses_row_group_offsettest_async_scheduler.py:test_scheduler_auto_computes_row_group_start_offsets_for_fresh_runs(fresh async runs auto-derive per-row-group offsets across multiple row groups, including a non-aligned last group)test_dataset_builder.py: regression test mirroring the Resume replays ordered seed rows after completed checkpoints #709 minimal repro (sync resume after a simulated interruption);test_build_resume_ordered_seed_dataset_extension_wraps_at_cycle_boundaryexercising the cycle-boundary branch end-to-end;test_build_resume_ordered_seed_dataset_with_partition_block_continues_within_partitioncovering ORDERED + PartitionBlock resume and exercising both the offset-into-partition and wraparound branches✅ Checklist
🔍 Attention Areas
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py—RowGroupResumePlansemantics (offsets snapshot the original plan, not the remaining list, so they stay stable when there are holes) and the_run_batchContextVar lifecycle.packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py—_index_range_at_offsetmodulo wraparound:record_offset % selected_size == 0must return the original full range so the next read restarts a fresh cycle (not a degenerate empty range).