Skip to content

feat: Phase 3f — end-to-end integration test + TempDirectory lifetime fix#6356

Open
g-talbot wants to merge 7 commits intogtt/parquet-merge-pipeline-3defrom
gtt/parquet-merge-pipeline-3f
Open

feat: Phase 3f — end-to-end integration test + TempDirectory lifetime fix#6356
g-talbot wants to merge 7 commits intogtt/parquet-merge-pipeline-3defrom
gtt/parquet-merge-pipeline-3f

Conversation

@g-talbot
Copy link
Copy Markdown
Contributor

Summary

Stacked on #6354 (Phase 3d+3e). End-to-end integration test and a bug fix discovered during testing.

Integration test

Exercises the full merge actor chain in-process with RamStorage and mock metastore:

  1. Creates 2 real sorted Parquet files (via ParquetWriter with sorted_series, sort schema KV metadata, window metadata)
  2. Uploads to RamStorage
  3. Seeds ParquetMergePipeline with split metadata (merge_factor=2)
  4. Verifies the pipeline plans a merge, downloads, merges, uploads, and publishes
  5. Asserts publish_metrics_splits called with correct replaced_split_ids = ["split-a", "split-b"]

Bug fix: TempDirectory lifetime

Found and fixed a bug where the merge executor's scratch TempDirectory was dropped before the uploader's async upload task could read the merged files. Added _scratch_directory_opt: Option<TempDirectory> to ParquetSplitBatch so the directory stays alive until the upload task completes.

Test plan

  • Integration test passes (full merge pipeline e2e)
  • 27 existing tests pass (uploader, packager, publisher, planner, scheduler, pipeline)
  • cargo clippy clean
  • Compiles with and without metrics feature

🤖 Generated with Claude Code

@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from e220505 to 0af0572 Compare April 29, 2026 14:02
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from d82d72d to 92cb5ed Compare April 29, 2026 15:31
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 1e9ff6c to 372f4eb Compare April 29, 2026 15:31
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 92cb5ed to b6f8bcc Compare April 29, 2026 18:11
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 372f4eb to 7db8a68 Compare April 29, 2026 18:11
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from b6f8bcc to d296774 Compare April 29, 2026 18:16
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 7db8a68 to 4a4679e Compare April 29, 2026 18:16
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from d296774 to ededb89 Compare April 29, 2026 18:24
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 4a4679e to 59f9690 Compare April 29, 2026 18:26
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from ededb89 to 761b379 Compare April 29, 2026 18:42
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 59f9690 to f8155a6 Compare April 29, 2026 18:43
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 761b379 to 9b3769e Compare April 29, 2026 18:51
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from f8155a6 to 74ebf40 Compare April 29, 2026 18:51
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 9b3769e to 927dc9f Compare April 29, 2026 19:05
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 11766e6 to 2933c6e Compare April 29, 2026 19:06
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 927dc9f to c51a84b Compare April 29, 2026 20:54
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 2933c6e to 9bbfd34 Compare April 29, 2026 20:54
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from c51a84b to 804e384 Compare April 30, 2026 02:30
g-talbot and others added 7 commits April 29, 2026 22:30
…se 3f)

Integration test that exercises the full merge actor chain:
1. Creates 2 real sorted Parquet files (via ParquetWriter with sorted_series,
   sort schema KV metadata, and window metadata)
2. Uploads to RamStorage
3. Seeds ParquetMergePipeline with split metadata (merge_factor=2)
4. Verifies the pipeline plans and executes a merge
5. Asserts publish_metrics_splits called with correct replaced_split_ids

Also fixes TempDirectory lifetime bug: adds _scratch_directory_opt to
ParquetSplitBatch so the merge executor's scratch directory stays alive
until the uploader finishes reading the merged files. Without this, the
temp directory was cleaned up between the executor handler returning and
the uploader's async upload task reading the files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…y comment

Review findings:

1. ParquetWriterConfig was hardcoded to Default in the executor. If ingest
   uses custom compression, merge output would differ. Now threaded from
   ParquetMergePipelineParams through to the executor.

2. Fixed misleading comment claiming "planner will eventually re-plan"
   on merge failure. In reality, input splits are drained by operations()
   and won't be re-planned until the pipeline restarts with metastore
   re-seeding (not yet implemented — TODO added).

3. Added TODO for fetch_immature_parquet_splits() on pipeline respawn,
   matching the Tantivy MergePipeline pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…riter_config

Review findings addressed:

1. fetch_immature_splits(): on pipeline respawn after crash, queries the
   metastore for published Parquet splits so the planner can re-plan
   merges that were in-flight during the crash. On first spawn, uses the
   initial splits from the IndexingService (same as Tantivy pattern).

2. ParquetWriterConfig threaded from pipeline params to executor so merge
   output uses the same compression as ingest.

3. Fixed misleading "planner will eventually re-plan" comment on merge
   failure — honest about the limitation that failed splits wait for
   respawn re-seeding.

4. Added index_uid to ParquetMergePipelineParams for metastore queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Matches the Tantivy MergePipeline pattern:
- ObservableState is now MergeStatistics (was unit type)
- perform_observe() collects counters from uploader + publisher handles
- Tracks generation, num_spawn_attempts, num_ongoing_merges,
  num_uploaded_splits, num_published_splits
- previous_generations_statistics preserved across respawns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Both Tantivy and Parquet merge scheduling used identical score logic
(prefer merges that reduce more splits for less total bytes). Extracted
the core arithmetic into score_merge(num_splits, total_bytes) and have
both score_merge_operation() and score_parquet_merge_operation() call it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per CODE_STYLE.md: comments should convey intent, not implementation.
Added explanations for num_merge_ops lineage, known_split_ids rebuild
heuristic, output dir isolation, empty merge handling, scratch dir
lifetime, permit Drop safety, publisher setter ordering, and feedback
loop guard conditions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…line

Reads parquet_indexing.sort_fields and parquet_indexing.window_duration_secs
from IndexingSettings when constructing the ingest pipeline's TableConfig
(was hardcoded to defaults).

Adds parquet_merge_policy_from_settings() that converts the config-layer
ParquetMergePolicyConfig to an Arc<dyn ParquetMergePolicy> runtime policy,
paralleling merge_policy_from_settings() for Tantivy.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 1396fd4 to dd1f8fc Compare April 30, 2026 02:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant