
fix: preserve ordered seed dataset position on resume #710

Merged
nabinchha merged 8 commits into main from nmulepati/fix-709-seed-resume-offset
Jun 1, 2026

@nabinchha
Contributor

@nabinchha nabinchha commented May 28, 2026

📋 Summary

ResumeMode.ALWAYS replayed the first seed row(s) for ordered seed datasets after an interrupted run, because the resumed process's SeedDatasetColumnGenerator always started reading at the configured IndexRange.start. This PR threads each row group's planned start offset through both the sync and async engines so ordered seed readers seek to the correct position when resuming, and adds regression coverage that mirrors the issue's minimal repro.

Scope. This fixes resume for SamplingStrategy.ORDERED with or without a selection strategy (IndexRange, PartitionBlock, or none). SamplingStrategy.SHUFFLE is unchanged: its underlying ORDER BY RANDOM() query is unseeded, so a resumed shuffled run already produces a fresh random order with potential row duplication. That's a pre-existing limitation, not introduced or addressed here.

🔗 Related Issue

Fixes #709

🔄 Changes

  • Add current_row_group_start_offset ContextVar so order-dependent generators can observe each row group's planned start offset without coupling to the engine. Comment on current_row_group updated to reflect that both engines now set it.
  • SeedDatasetColumnGenerator._reset_batch_reader accepts a record_offset and computes the seek range via _index_range_at_offset, including modulo wraparound for cycling through the selection.
  • Async resume: introduce RowGroupResumePlan (frozen + slots) and build_row_group_resume_plan. The plan is computed against the original row-group layout so per-group offsets stay stable when resume has holes. AsyncTaskScheduler auto-derives offsets for fresh runs (parallel-safe; removes the implicit shared-reader assumption).
  • Sync resume: thread row_group_start_offset through _run_batch. The method also sets current_row_group for both fresh and resumed runs so the (x/X) log prefix is consistent across engines, owns both ContextVars inside a single try/finally, and captures pre_batch_snapshot inside that try so any failure between set and the snapshot still resets tokens.
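The ContextVar lifecycle described for the sync path can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `run_batch`, its parameters, and the tuple shape of `current_row_group` are assumptions; only the two ContextVar names come from the description above.

```python
from __future__ import annotations

from contextvars import ContextVar

# Hypothetical stand-ins for the engine's ContextVars; only the names mirror the PR text.
current_row_group: ContextVar[tuple[int, int] | None] = ContextVar(
    "current_row_group", default=None
)
current_row_group_start_offset: ContextVar[int | None] = ContextVar(
    "current_row_group_start_offset", default=None
)


def run_batch(work, *, row_group=None, start_offset=None):
    """Run `work()` with the ContextVars set, resetting them even if it raises."""
    # Preview-style calls pass neither value, so no token is created for them.
    group_token = current_row_group.set(row_group) if row_group is not None else None
    offset_token = (
        current_row_group_start_offset.set(start_offset)
        if start_offset is not None
        else None
    )
    try:
        # Everything that can fail (snapshots, generation, post-processing)
        # belongs inside the try so the tokens are always reset.
        return work()
    finally:
        # Reset in reverse (LIFO) order of the set calls.
        if offset_token is not None:
            current_row_group_start_offset.reset(offset_token)
        if group_token is not None:
            current_row_group.reset(group_token)
```

A generator called from inside `work()` can read `current_row_group_start_offset.get()` without receiving the offset as an argument, which is what keeps it decoupled from the engine in this sketch.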

🧪 Testing

  • Targeted engine suites pass: pytest tests/engine/column_generators/generators/test_seed_dataset.py tests/engine/dataset_builders/test_async_scheduler.py tests/engine/dataset_builders/test_dataset_builder.py → 251 passed
  • Unit tests added/updated:
    • test_seed_dataset.py: test_seed_dataset_column_generator_reset_batch_reader_applies_record_offset, test_seed_dataset_column_generator_reset_batch_reader_wraps_at_cycle_boundary, test_seed_dataset_column_generator_ordered_generation_uses_row_group_offset
    • test_async_scheduler.py: test_scheduler_auto_computes_row_group_start_offsets_for_fresh_runs (fresh async runs auto-derive per-row-group offsets across multiple row groups, including a non-aligned last group)
    • test_dataset_builder.py: regression test mirroring the #709 minimal repro (sync resume after a simulated interruption); test_build_resume_ordered_seed_dataset_extension_wraps_at_cycle_boundary exercising the cycle-boundary branch end-to-end; test_build_resume_ordered_seed_dataset_with_partition_block_continues_within_partition covering ORDERED + PartitionBlock resume and exercising both the offset-into-partition and wraparound branches
  • E2E tests added/updated — N/A (covered by unit + integration regressions above)

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO) — will sign off via the dco-assistant bot comment after the PR is open
  • Architecture docs updated — N/A (fix preserves the existing public API; no architectural shift)

🔍 Attention Areas

⚠️ Reviewers: Please pay special attention to the following:

nabinchha added 4 commits May 28, 2026 09:52
Preserve planned row-group start offsets during resume so ordered seed datasets continue from the next seed row instead of replaying already-consumed rows.

Fixes #709
- Simplify _run_batch context-var setup so current_row_group is set
  consistently in fresh and resumed sync runs (matches the (x/X) log
  prefix the async path already emits) and add a docstring spelling
  out which ContextVars the function owns.
- Document RowGroupResumePlan and build_row_group_resume_plan, and
  make the plan dataclass frozen+slots since it is a one-shot value.
- Comment the modulo cycling logic in _index_range_at_offset.
- Add a scheduler test verifying fresh async runs auto-derive the
  per-row-group offsets from row-group sizes (no caller-supplied
  offsets) so ordered generators stay parallel-safe across row groups.
- Add a wraparound regression test that resumes past a full seed
  cycle, exercising the relative_offset == 0 branch the original
  #709 regression test missed.
- update current_row_group ContextVar comment to reflect that both the
  async scheduler and the sync engine's _run_batch set it
- move pre_batch_snapshot capture (and ran_pre_batch flag) inside the
  try/finally in _run_batch so a failure between ContextVar.set and the
  snapshot call still resets the tokens
- add a direct unit test for the relative_offset == 0 wraparound branch
  in _index_range_at_offset to lock in the fresh-cycle restart behavior
@nabinchha nabinchha requested a review from a team as a code owner May 28, 2026 19:37
@github-actions
Contributor

Code Review: PR #710 — fix: preserve ordered seed dataset position on resume

Summary

Fixes #709, where ResumeMode.ALWAYS replayed the first seed row(s) for ORDERED seed datasets after an interrupted run. The fix threads each row group's planned start offset through both the sync and async engines so ordered seed readers seek to the correct position on resume.

The change is well-scoped: a new current_row_group_start_offset ContextVar carries the planned offset to order-dependent generators without coupling them to the engine. SeedDatasetColumnGenerator._index_range_at_offset derives the seek range with modulo wraparound for cycling selections. The async path introduces a RowGroupResumePlan dataclass and a pure helper build_row_group_resume_plan; the sync path threads the offset through _run_batch and tightens its ContextVar lifecycle.

Findings

Correctness

  • _index_range_at_offset modulo wraparound is correct. The relative_offset == 0 branch returning self._index_range is the right call — without it, an exactly-cycled offset would yield a start=selected_end+1, end=selected_end empty range. Test test_seed_dataset_column_generator_reset_batch_reader_wraps_at_cycle_boundary covers this directly, and test_build_resume_ordered_seed_dataset_extension_wraps_at_cycle_boundary exercises it end-to-end.
  • Resume plan offsets are stable under holes. build_row_group_resume_plan snapshots offsets from the full original plan and then filters to the remaining groups (dataset_builder.py:222-232). This is the right design — recomputing offsets from remaining_row_groups would shift them when there are gaps. The dedicated unit test test_row_group_resume_plan_keeps_original_offsets_for_remaining_groups (with completed_ids={0, 2}) locks this in.
  • Sync _run_batch ContextVar lifecycle is sound. Both tokens are captured before the try, both reset in finally, and the snapshot/usage-deltas/post-batch logic now lives inside the same try so any failure between set and the rest of the body still resets tokens. The if token is not None guards correctly handle the preview path (no current_batch_number, no offset).
  • Fresh sync runs gain the (x/X) log prefix. Previously the prefix only appeared in async/resume paths; it's now consistent across engines, which the PR description calls out and the code matches (dataset_builder.py:1208-1209).

Async fresh-run behavioral change

The async scheduler now seeks to per-row-group offsets on fresh runs as well as resumed runs. The PR description frames this as removing an implicit shared-reader assumption (parallel-safe).

  • LocalFileSeedReader.create_batch_reader constructs a new query result and DuckDBSeedReaderBatchReader on each call (seed_reader.py:228-243), so creating multiple readers concurrently is safe in principle. Other implementations (DirectorySeedReader, FileContentsSeedReader, HuggingFaceSeedReader, AgentRolloutSeedReader) all derive from this same factory pattern, so the contract holds for them as well.
  • test_scheduler_auto_computes_row_group_start_offsets_for_fresh_runs only uses a synthetic in-memory generator; it locks in the offset propagation but does not exercise concurrent create_batch_reader calls against any real seed reader. Suggest: validate manually (or in a follow-up integration test) that the existing async ordered-seed flow against a real on-disk seed dataset still produces the expected sequence under buffer_size=1 with multiple concurrent row groups. The likelihood of regression is low, but this is the kind of change worth a smoke test before release.

Style / project conventions

  • All new code uses from __future__ import annotations, modern typing, absolute imports, and @dataclass(frozen=True, slots=True) — consistent with the style guide.
  • RowGroupResumePlan and build_row_group_resume_plan extract a non-trivial chunk of logic from _build_async into a pure, testable helper. Net readability win.
  • Docstring on _run_batch is longer than the project's "default to no comments" guidance, but the lifecycle it documents is non-obvious (two ContextVars, asymmetric across call sites — fresh sync sets only current_row_group, resume sets both, preview sets neither). I'd keep it.
  • Inline comment on _index_range_at_offset is similarly load-bearing — the modulo branch behavior is exactly the kind of "would surprise a reader" case the styleguide carves out for.

Minor nits (non-blocking)

  • async_scheduler.py:292: row_group_start_offsets or self._build_row_group_start_offsets(row_groups) treats an empty dict as "missing." In practice the resume path always passes a non-empty dict and fresh runs pass None, so this is fine, but if row_group_start_offsets is None else row_group_start_offsets would be slightly more explicit about intent. Not worth a re-spin.
  • async_scheduler.py:1566 uses .get(task.row_group) returning None for missing keys. Defensive, but if the map is built from row_groups (auto-derived) or from RowGroupResumePlan.row_group_start_offsets (covers all remaining IDs), a missing key would indicate a logic bug. Either [task.row_group] (fail-fast) or a comment explaining why None is acceptable would be slightly clearer. Current behavior — falling back to None which the seed generator interprets as "no offset, use existing reader" — is reasonable.
  • dataset_builder.py:657: row_group_start_offset=sum(self.batch_manager.num_records_list[:batch_idx]) is recomputed in each loop iteration, which is O(n^2) over batches. For typical batch counts this is irrelevant, but a running accumulator would be marginally cleaner.
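Two of the nits above can be sketched in isolation. Both helpers are hypothetical illustrations rather than code from the PR: `resolve_offsets` shows the explicit `is None` check in place of `or`, and `batch_start_offsets` shows a running accumulator that yields the same values as `sum(num_records_list[:batch_idx])` for each batch index.

```python
from __future__ import annotations

from collections.abc import Callable
from itertools import accumulate


def resolve_offsets(
    supplied: dict[int, int] | None,
    derive: Callable[[], dict[int, int]],
) -> dict[int, int]:
    """Derive offsets only when none were supplied; an empty dict is kept as-is."""
    return derive() if supplied is None else supplied


def batch_start_offsets(num_records_list: list[int]) -> list[int]:
    """Start offset of each batch, computed in one pass over the batch sizes."""
    # accumulate yields running totals; prepend 0 and drop the grand total.
    return [0, *accumulate(num_records_list)][:-1]
```

For batch sizes `[3, 3, 2]` the accumulator produces `[0, 3, 6]`, matching the per-iteration `sum` slices.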

Test coverage

Strong. Coverage spans:

  • Direct unit tests on _index_range_at_offset (offset, cycle-boundary wrap).
  • current_row_group_start_offset end-to-end through generate_from_scratch.
  • build_row_group_resume_plan with holes in completed_ids.
  • Async scheduler propagation, both with caller-supplied offsets and auto-derived offsets across non-aligned row groups.
  • Two end-to-end regression tests against the #709 minimal repro: basic resume and the cycle-boundary extension case.

Test refactors (_make_sampler_only_builder, _write_incompatible_config_metadata) replace heavy patch.object stacks with real config-driven setup. Good cleanup, and the unrelated _json/_Path/_ArtifactStorage underscore aliases get folded into normal imports — small but welcome.

Security / performance

No security concerns. No new I/O, no string interpolation against external data, no deserialization. Performance impact negligible: one extra dict.get per task on the async side; one extra IndexRange construction per resumed batch on the sync side.

Verdict

Looks good. The fix targets the reported bug, the design (per-row-group offsets via ContextVar) is the right shape — order-dependent generators stay decoupled from the engine — and test coverage is thorough on both sync and async paths.

Recommend a manual smoke test of fresh async ORDERED seed generation against a real on-disk seed dataset before release to validate the behavioral change called out under "Async fresh-run behavioral change" above. Otherwise no blocking concerns.

@greptile-apps
Contributor

greptile-apps Bot commented May 28, 2026

Greptile Summary

This PR fixes a bug where resuming an ORDERED seed dataset run always replayed the first seed rows, because the generator always started reading at IndexRange.start on initialization. The fix threads each row group's planned start offset through both engines via a new current_row_group_start_offset ContextVar, and computes the correct seek position (with modulo wraparound for cycling) inside _index_range_at_offset.

  • Sync resume: _build_with_resume now passes row_group_start_offset=sum(num_records_list[:batch_idx]) to _run_batch, which sets the ContextVar for the duration of each batch; generate_from_scratch detects the offset and creates a correctly-positioned batch reader.
  • Async resume: build_row_group_resume_plan snapshots per-group offsets from the original plan (not the remaining list), so offsets remain stable when completed groups leave holes; the AsyncTaskScheduler accepts this map and sets the ContextVar per task.
  • Async fresh runs: the scheduler auto-computes offsets from row-group sizes when no map is supplied, replacing the previous implicit shared-reader assumption with an explicit, order-independent seek.
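The point about snapshotting offsets from the original plan rather than the remaining list can be shown with a short sketch. `plan_resume_offsets` is a hypothetical simplification of what build_row_group_resume_plan is described as doing; its signature and return shape are assumptions.

```python
from __future__ import annotations


def plan_resume_offsets(row_group_sizes: list[int], completed_ids: set[int]) -> dict[int, int]:
    """Map each remaining row-group id to its start offset in the ORIGINAL layout."""
    offsets: dict[int, int] = {}
    running = 0
    for group_id, size in enumerate(row_group_sizes):
        if group_id not in completed_ids:
            offsets[group_id] = running
        # Completed groups still advance the running total, so holes do not
        # shift the offsets of the groups that follow them.
        running += size
    return offsets
```

With four row groups of two records each and groups 0 and 2 already completed, this yields `{1: 2, 3: 6}`. Recomputing from only the remaining groups would instead give `{1: 0, 3: 2}`, which would replay seed rows that earlier groups already consumed.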

Confidence Score: 5/5

Safe to merge — the change is narrowly scoped to resume seek logic, preserves fresh-sync behavior, and is backed by regression tests that directly reproduce the reported failure mode.

The offset computation (_index_range_at_offset with modulo wraparound), the ContextVar lifecycle (set before generators run, reset in finally), and the resume plan's use of original-plan offsets (stable across holes) are all correct. Fresh sync runs are untouched. Async fresh runs shift from a shared-reader assumption to explicit per-group seeks, producing equivalent output. The three end-to-end regression tests directly exercise the repro scenario, the cycle-boundary edge case, and the PartitionBlock path.

No files require special attention — the most sensitive logic (_index_range_at_offset, RowGroupResumePlan offset stability, ContextVar cleanup) is all covered by dedicated unit and integration tests.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py | Adds offset-based seek for ORDERED strategy: reads current_row_group_start_offset and calls _index_range_at_offset to compute the correct IndexRange, including modulo wraparound on full cycles; _df_remaining is cleared on every seek-based entry so stale leftovers never bleed into a resumed batch. |
| packages/data-designer-engine/src/data_designer/engine/context.py | Adds current_row_group_start_offset ContextVar with default None; comment updated to reflect that both engines now set current_row_group. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Accepts optional row_group_start_offsets dict and initial_completed_records; auto-computes offsets from row-group sizes when not supplied; sets and resets current_row_group_start_offset per task in LIFO order inside the existing finally block. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Introduces RowGroupResumePlan and build_row_group_resume_plan for stable per-group offsets across resume holes; threads row_group_start_offset through _run_batch; refactored _run_batch correctly wraps both ContextVars in a single try/finally with LIFO reset order. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py | Adds initial_completed parameter; pre-advances next_log_at past it; rate computed only over records completed since the current run started. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py | Adds optional scheduled_records to log_start; uses remaining scheduled work rather than the full tracker total when available, giving accurate task counts in resume log messages. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Adds three end-to-end regression tests (basic #709 repro, cycle-boundary extension, PartitionBlock resume) and a build_row_group_resume_plan unit test; replaces heavy mock pyramids with real _make_sampler_only_builder builds for IF_POSSIBLE tests. |

Sequence Diagram

sequenceDiagram
    participant Engine as Sync/Async Engine
    participant RB as _run_batch / _execute_task
    participant CV as current_row_group_start_offset (ContextVar)
    participant Gen as SeedDatasetColumnGenerator
    participant IR as _index_range_at_offset

    Engine->>RB: "row_group_start_offset = sum(sizes[:batch_idx])"
    RB->>CV: set(row_group_start_offset)
    RB->>Gen: generate_from_scratch(num_records)
    Gen->>CV: get() → row_group_start_offset
    alt ORDERED and offset is not None
        Gen->>Gen: "_df_remaining = None"
        Gen->>IR: _index_range_at_offset(offset)
        IR-->>Gen: IndexRange(start + relative_offset, end)
        Gen->>Gen: _reset_batch_reader(num_records, record_offset)
    else batch_reader is None (fresh sync)
        Gen->>Gen: _reset_batch_reader(num_records)
    end
    Gen-->>RB: DataFrame
    RB->>CV: reset(token)

Reviews (5): Last reviewed commit: "Merge branch 'main' into nmulepati/fix-7..."

nabinchha added 4 commits May 28, 2026 13:46
Companion to the existing IndexRange resume test. Locks in correct
behavior when the seed selection comes from PartitionBlock — its
to_index_range produces a contiguous range today, but nothing else
asserts that contract. The test crosses a cycle boundary inside the
partition (4 records over a 2-row partition) so it exercises both
the offset-into-partition branch and the relative_offset == 0
wraparound branch end-to-end.
@johnnygreco
Contributor

Nice work on this one, @nabinchha; thanks for the careful resume regression coverage.

Summary

This PR preserves planned row-group start offsets across sync and async resume so ordered seed datasets continue at the correct seed row instead of replaying completed rows. The implementation matches the PR description: offsets are computed from the original row-group plan, threaded through scheduler/builder context, and consumed only by ordered seed readers while shuffle behavior remains unchanged.

Findings

No findings.

What Looks Good

  • The RowGroupResumePlan makes the async resume semantics explicit and keeps sparse row-group holes from shifting seed offsets.
  • The seed-reader offset logic handles both offset-into-range and full-cycle wrap cases, and the tests cover IndexRange, PartitionBlock, and extension across a cycle boundary.
  • Good cleanup around the sync _run_batch ContextVar lifecycle: tokens are reset in a single finally, including failures before the model usage snapshot.

Verdict

Ship it.


This review was generated by an AI assistant.

@nabinchha nabinchha merged commit f7c8809 into main Jun 1, 2026
61 checks passed
@nabinchha nabinchha deleted the nmulepati/fix-709-seed-resume-offset branch June 1, 2026 22:22
Development

Successfully merging this pull request may close these issues.

Resume replays ordered seed rows after completed checkpoints

2 participants