
Add merge support for Parquet data format plugin #21079

Merged
mgodwan merged 15 commits into opensearch-project:main from Shailesh-Kumar-Singh:merge_parquet
Apr 30, 2026

Conversation

@Shailesh-Kumar-Singh (Contributor) commented Apr 2, 2026

Description


Adds a streaming k-way merge to the Parquet data format plugin, along with writer sort-on-finalize support. The merge produces a single globally sorted Parquet file from N input segment files.

Writer Sort-on-Finalize

When a segment file is finalized, accumulated unsorted data must be sorted before the file is ready for queries:

  • Small files (≤ 32 MB): Sorted in-memory using Arrow's built-in lexicographic sort
  • Large files (> 32 MB): External merge sort — each batch is sorted individually and written as a temporary chunk file, then k-way merged into the final sorted output

This bounds memory to a single batch regardless of total file size.
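The large-file path above can be sketched as a plain external merge sort (illustrative only — the plugin operates on Arrow batches and temporary chunk files; `external_sort` and `chunk_rows` are hypothetical names, with `chunk_rows` standing in for the 32 MB threshold):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Sort `rows` with bounded memory: sort fixed-size chunks independently
// (each chunk plays the role of a temporary chunk file), then k-way merge
// them into the final sorted output.
fn external_sort(rows: Vec<i64>, chunk_rows: usize) -> Vec<i64> {
    // Stage 1: sort each chunk in isolation -- memory is bounded to one chunk.
    let chunks: Vec<Vec<i64>> = rows
        .chunks(chunk_rows)
        .map(|c| {
            let mut v = c.to_vec();
            v.sort_unstable();
            v
        })
        .collect();

    // Stage 2: k-way merge via a min-heap of (value, chunk index) cursors.
    let mut heap = BinaryHeap::new();
    let mut cursors = vec![0usize; chunks.len()];
    for (i, chunk) in chunks.iter().enumerate() {
        if let Some(&v) = chunk.first() {
            heap.push(Reverse((v, i)));
        }
    }
    let mut out = Vec::with_capacity(rows.len());
    while let Some(Reverse((v, i))) = heap.pop() {
        out.push(v);
        cursors[i] += 1;
        if let Some(&next) = chunks[i].get(cursors[i]) {
            heap.push(Reverse((next, i)));
        }
    }
    out
}
```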

Index Sort Configuration

The merge respects OpenSearch's index.sort.* settings:

  • Sort columns: Multi-column sort via index.sort.field — supports long, date, integer, double, float, keyword, and their variants. Integer/float types are widened to 64-bit for uniform comparison.
  • Reverse sort: Per-column ascending/descending direction via index.sort.order.
  • Null handling: Explicit null-first / null-last via index.sort.missing (OpenSearch defaults to nulls-last regardless of sort direction). Uses explicit null representations rather than sentinel values to avoid collision with real data.

When no sort columns are configured, files are concatenated sequentially (unsorted merge).
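The null-handling rules above can be sketched as a per-column comparator (a simplified stand-in for the plugin's Arrow comparators; `compare_key` and its flags are hypothetical names):

```rust
use std::cmp::Ordering;

// Compare two nullable sort keys honoring index.sort.order (descending)
// and index.sort.missing (nulls_first). Nulls are represented explicitly
// as None rather than sentinel values, so they cannot collide with data,
// and their position is NOT affected by the descending flag -- matching
// the OpenSearch default of nulls-last regardless of sort direction.
fn compare_key(a: Option<i64>, b: Option<i64>, descending: bool, nulls_first: bool) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
        (Some(_), None) => if nulls_first { Ordering::Greater } else { Ordering::Less },
        (Some(x), Some(y)) => {
            // Only the value comparison is reversed for descending sorts.
            let ord = x.cmp(&y);
            if descending { ord.reverse() } else { ord }
        }
    }
}
```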

K-Way Merge — Three-Tier Cascade

A min-heap tracks the current sort value of each input file. The algorithm pops the smallest entry and decides how much data to emit using three tiers, choosing the coarsest granularity possible:

  1. Tier 1 — Drain: When only one file remains, stream all its batches directly to output. Zero comparisons.
  2. Tier 2 — Whole-Batch Emit: If the last row of the current batch ≤ the heap top (next file's smallest value), emit the entire batch as a zero-copy Arrow slice. Repeat for consecutive batches. This is the dominant tier when sort ranges don't overlap — one comparison per 100K-row batch instead of 100K individual comparisons.
  3. Tier 3 — Binary Search: When a batch partially overlaps with the heap top, binary search finds the exact boundary row (O(log₂ B) ≈ 17 extractions for 100K rows). Emit the prefix, advance the cursor, re-insert into the heap.
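The Tier 2 / Tier 3 decision for a single sorted batch can be sketched as follows (illustrative; `plan_emit` is a hypothetical name, and the real code slices Arrow record batches rather than plain slices):

```rust
// Outcome of inspecting one sorted batch against the heap top.
enum Emit {
    WholeBatch,    // Tier 2: one comparison covers the entire batch
    Prefix(usize), // Tier 3: emit rows [0, n), keep the rest in the heap
}

// `batch` is sorted ascending; `heap_top` is the smallest key held by any
// other input file.
fn plan_emit(batch: &[i64], heap_top: i64) -> Emit {
    match batch.last() {
        // Last row <= heap top: no other file can interleave, so the whole
        // batch is emitted after a single comparison.
        Some(&last) if last <= heap_top => Emit::WholeBatch,
        _ => {
            // partition_point performs O(log B) probes to find the first
            // row strictly greater than the heap top.
            let n = batch.partition_point(|&v| v <= heap_top);
            Emit::Prefix(n)
        }
    }
}
```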

Pipelined Write Path

Three concurrent stages so merge computation, compression, and disk IO overlap:

  1. Merge loop (calling thread) — the k-way merge produces Arrow record batch slices, pads them to the union schema, and buffers them in memory. When ~1M rows accumulate, it triggers a flush to Stage 2.
  2. Column encoding (Rayon work-stealing pool, 4 threads) — on flush, all buffered batches are concatenated into a single large batch, a sequential ___row_id column is appended, and the batch is decomposed into individual leaf columns. Each column is then compressed and encoded in parallel across 4 threads via work-stealing — columns that finish quickly free their threads to pick up remaining work, naturally load-balancing across columns of varying complexity.
  3. Disk write (Tokio async runtime) — encoded row groups are sent over a bounded channel (capacity 2) to a background task, which dispatches each write to a blocking thread through a rate-limited wrapper (default 20 MB/s) to avoid starving concurrent indexing. The background task does not wait for the current write to complete before accepting the next row group, so the disk write for row group N happens concurrently with the encoding of row group N+1.

The bounded channel between Stage 2 and Stage 3 provides natural backpressure — at most 2 encoded row groups can be queued. If the merge loop produces faster than disk can absorb, the channel fills and the merge loop blocks, preventing unbounded memory growth. Peak memory stays at ~300-350 MB regardless of input size.
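The backpressure behavior can be illustrated with a bounded channel of capacity 2 (a minimal sketch — the real code uses a Tokio channel and rate-limited blocking writes; std's `sync_channel` shows the same blocking semantics, and `pipelined_write` is a hypothetical name):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Hand encoded "row groups" to a background writer over a channel that
// holds at most 2 items. Once 2 groups are queued, send() blocks the
// producer -- the backpressure that caps peak memory. Returns the total
// bytes the writer absorbed.
fn pipelined_write(row_groups: Vec<Vec<u8>>) -> usize {
    let (tx, rx) = sync_channel::<Vec<u8>>(2); // at most 2 groups in flight
    let writer = thread::spawn(move || {
        let mut bytes_written = 0;
        // Stage 3: drain row groups as the "disk" absorbs them.
        for group in rx {
            bytes_written += group.len();
        }
        bytes_written
    });
    // Stage 2: send() blocks whenever the channel already holds 2 groups,
    // so the producer can never run unboundedly ahead of the writer.
    for group in row_groups {
        tx.send(group).unwrap();
    }
    drop(tx); // close the channel so the writer thread finishes
    writer.join().unwrap()
}
```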

Schema Evolution & Row ID

Input files may have different column sets due to mapping changes. Union schema is computed once; batches from files missing a column get null-padded. ___row_id is stripped from inputs and rewritten with globally sequential values [0, total_rows) for cross-format joins.
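Union-schema padding and row-id rewriting can be sketched with plain maps standing in for Arrow batches (names and types here are hypothetical simplifications of the real schema handling):

```rust
use std::collections::BTreeMap;

// Each input "batch" maps column name -> nullable values. Columns absent
// from a batch are null-padded to the batch's row count, any incoming
// ___row_id is stripped, and a globally sequential ___row_id is rewritten
// over the merged output.
fn merge_with_union_schema(
    batches: &[BTreeMap<String, Vec<Option<i64>>>],
) -> BTreeMap<String, Vec<Option<i64>>> {
    // Union schema: every column name seen in any input, computed once.
    let mut columns: Vec<String> = batches.iter().flat_map(|b| b.keys().cloned()).collect();
    columns.sort();
    columns.dedup();
    columns.retain(|c| c != "___row_id"); // strip input row ids

    let mut out: BTreeMap<String, Vec<Option<i64>>> = BTreeMap::new();
    let mut total_rows = 0;
    for batch in batches {
        let rows = batch.values().next().map_or(0, |v| v.len());
        for col in &columns {
            let values = out.entry(col.clone()).or_default();
            match batch.get(col) {
                Some(v) => values.extend(v.iter().cloned()),
                // Column missing from this file: null-pad to keep rows aligned.
                None => values.extend(std::iter::repeat(None).take(rows)),
            }
        }
        total_rows += rows;
    }
    // Rewrite ___row_id with globally sequential values [0, total_rows).
    out.insert(
        "___row_id".to_string(),
        (0..total_rows as i64).map(Some).collect(),
    );
    out
}
```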

@github-actions
Contributor

github-actions Bot commented Apr 2, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit b999a66.

Path | Line | Severity | Description
sandbox/libs/dataformat-native/rust/Cargo.toml | 61 | High | New Rust dependency 'rayon = "1.10"' added to workspace Cargo.toml. Supply chain change — verify artifact authenticity and crate origin before merging.
sandbox/plugins/parquet-data-format/src/main/rust/Cargo.toml | 22 | High | New Rust dependency 'rayon' added via workspace reference. Supply chain change — verify crate is the canonical rayon parallel-iteration library and that the workspace pin resolves to an expected version.
sandbox/plugins/parquet-data-format/src/main/rust/Cargo.toml | 23 | High | New Rust dependency 'tokio' added via workspace reference. This introduces an async runtime dependency. Supply chain change — verify workspace-pinned version and that no postinstall/build scripts are introduced.
sandbox/plugins/parquet-data-format/src/main/rust/Cargo.toml | 15 | High | Multiple existing Rust dependencies removed (arrow-array, arrow-schema, arrow-buffer, log, chrono, mimalloc). Dependency removals can mask version downgrade or substitution attacks elsewhere in the workspace. Verify the workspace-level versions for the retained 'arrow' umbrella crate now cover all previously explicit sub-crates.
sandbox/plugins/parquet-data-format/src/main/rust/tests/merge_integration_tests.rs | 69 | Low | Developer-specific absolute path '/Users/shaikumm/Downloads/files' hardcoded as INPUT_DIR in committed test code. While the test guards with an existence check, committing a personal home-directory path leaks contributor identity and suggests this test was not intended for the shared repository.

The table above displays the top 10 most important findings.

Total: 5 | Critical: 0 | High: 4 | Medium: 0 | Low: 1


Pull Request Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/native_settings.rs Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/parquet_merge_stream.rs Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/parquet_merge_stream.rs Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/parquet_merge_stream.rs Outdated
Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
Comment thread gradle.properties Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/merge/io_task.rs Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/merge/io_task.rs Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/merge/heap.rs
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs Outdated
Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/merge/heap.rs
@mgodwan
Member

mgodwan commented Apr 29, 2026

Thanks for the changes @Shailesh-Kumar-Singh
Like the overall documentation, and clean separation of concerns.

@github-actions
Contributor

Persistent review updated to latest commit bb33d2f

@github-actions
Contributor

Persistent review updated to latest commit 40d152e

Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit d97d3ce

@github-actions
Contributor

❌ Gradle check result for d97d3ce: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Comment thread sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs
Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit 37469ca

@github-actions
Contributor

Persistent review updated to latest commit 436ec87

Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit 322aded

@github-actions
Contributor

❌ Gradle check result for 322aded: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

❌ Gradle check result for 322aded: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

✅ Gradle check result for 322aded: SUCCESS

Member

@mgodwan left a comment


Capturing some additional tasks to be covered here:

@mgodwan mgodwan merged commit 81ec182 into opensearch-project:main Apr 30, 2026
21 of 26 checks passed

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.


4 participants