Add merge support for Parquet data format plugin #21079
mgodwan merged 15 commits into opensearch-project:main
Conversation
Thanks for the changes @Shailesh-Kumar-Singh
Capturing some additional tasks to be covered here:
- Tests with randomization for sorting across different data types.
- #21079 (comment)
- #21079 (comment)
- https://github.com/opensearch-project/OpenSearch/pull/21079/changes#r3169979457
- Add back streaming crc writer tests
- The tokio runtime is still doing both IO and CPU work; we need to look into decoupling the two.
Description
Adds streaming k-way merge for the Parquet data format plugin, along with writer sort-on-finalize support. Produces a single globally-sorted Parquet file from N input segment files.
Writer Sort-on-Finalize
When a segment file is finalized, accumulated unsorted data must be sorted before the file is ready for queries. This bounds memory to a single batch regardless of total file size.
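As a rough illustration of the finalize step, here is a minimal sketch (hypothetical names; the real writer operates on Arrow record batches) of the sort-permutation-plus-gather pattern: compute a permutation over the key column once, then gather every column through it.

```rust
/// Compute the permutation that sorts the key column ascending.
/// (Sketch only; the real code sorts Arrow batches, not plain slices.)
fn sort_permutation(keys: &[i64]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..keys.len()).collect();
    idx.sort_by_key(|&i| keys[i]);
    idx
}

/// Gather any column through the permutation (the "take" half of the pattern).
fn take<T: Clone>(col: &[T], idx: &[usize]) -> Vec<T> {
    idx.iter().map(|&i| col[i].clone()).collect()
}
```

Because only one batch's columns are materialized while sorting, peak memory stays proportional to the batch rather than the whole file.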
Index Sort Configuration
The merge respects OpenSearch's `index.sort.*` settings:
- `index.sort.field` — supports `long`, `date`, `integer`, `double`, `float`, `keyword`, and their variants. Integer/float types are widened to 64-bit for uniform comparison.
- `index.sort.order` — ascending or descending per field.
- `index.sort.missing` — OpenSearch defaults to nulls-last regardless of sort direction. The merge uses explicit null representations rather than sentinel values, to avoid collision with real data.

When no sort columns are configured, files are concatenated sequentially (unsorted merge).
K-Way Merge — Three-Tier Cascade
A min-heap tracks the current sort value of each input file. The algorithm pops the smallest entry and decides how much data to emit using three tiers, choosing the coarsest granularity possible: when a unit's key range doesn't overlap the other inputs', it is emitted wholesale — one comparison per 100K-row batch instead of 100K individual comparisons.
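The finest tier, row-by-row merging, is a classic heap merge. This toy version merges sorted `i64` runs, whereas the real code tracks (value, file, offset) over Arrow batches:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Minimal k-way merge over sorted runs, keyed on each run's current head.
/// Sketch only; batch- and range-level short-circuits are omitted.
fn kway_merge(runs: Vec<Vec<i64>>) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (run, r) in runs.iter().enumerate() {
        if let Some(&v) = r.first() {
            heap.push(Reverse((v, run, 0usize))); // (value, run index, offset)
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, run, idx))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[run].get(idx + 1) {
            heap.push(Reverse((next, run, idx + 1))); // advance that run
        }
    }
    out
}
```

The heap holds one entry per input file, so the per-row cost is O(log N) in the number of files, independent of total row count.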
Pipelined Write Path
Three concurrent stages so merge computation, compression, and disk IO overlap:
- Stage 1 (merge): the `___row_id` column is appended to each merged batch, and the batch is decomposed into individual leaf columns.
- Stage 2 (encode): each column is compressed and encoded in parallel across 4 threads via work-stealing — columns that finish quickly free their threads to pick up remaining work, naturally load-balancing across columns of varying complexity.
- Stage 3 (write): encoded row groups are written to disk on a background task so IO does not starve concurrent indexing. The background task does not wait for the current write to complete before accepting the next row group — so the disk write for row group N happens concurrently with encoding of row group N+1.
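The Stage-2 work-stealing pattern can be sketched with std threads and an atomic claim counter (a simplification: the "encoding" here is a stand-in, and the worker count is an assumption):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Each worker claims the next unencoded column via an atomic counter, so a
/// thread that finishes a cheap column immediately picks up remaining work.
fn encode_columns(columns: &[Vec<u8>], workers: usize) -> Vec<usize> {
    let next = AtomicUsize::new(0);
    let sizes = Mutex::new(vec![0usize; columns.len()]);
    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                let i = next.fetch_add(1, Ordering::Relaxed);
                if i >= columns.len() {
                    break; // no columns left to claim
                }
                // Stand-in for compression/encoding: record the byte length.
                let encoded_len = columns[i].len();
                sizes.lock().unwrap()[i] = encoded_len;
            });
        }
    });
    sizes.into_inner().unwrap()
}
```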
The bounded channel between Stage 2 and Stage 3 provides natural backpressure — at most 2 encoded row groups can be queued. If the merge loop produces faster than disk can absorb, the channel fills and the merge loop blocks, preventing unbounded memory growth. Peak memory is ~300-350 MB regardless of input size.
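A capacity-2 `sync_channel` reproduces this backpressure behavior in miniature (a sketch; the plugin uses tokio channels, not std's):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Bounded queue between the encode stage and the writer: send() blocks once
/// two row groups are in flight, so the producer can never outrun the disk
/// by more than two groups.
fn pipeline(row_groups: Vec<Vec<u8>>) -> usize {
    let (tx, rx) = sync_channel::<Vec<u8>>(2);
    let writer = thread::spawn(move || {
        let mut written = 0;
        while let Ok(group) = rx.recv() {
            written += group.len(); // stand-in for the disk write
        }
        written
    });
    for g in row_groups {
        tx.send(g).unwrap(); // blocks when 2 groups are already queued
    }
    drop(tx); // close the channel so the writer drains and exits
    writer.join().unwrap()
}
```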
Schema Evolution & Row ID
Input files may have different column sets due to mapping changes. Union schema is computed once; batches from files missing a column get null-padded.
`___row_id` is stripped from inputs and rewritten with globally sequential values `[0, total_rows)` for cross-format joins.
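The sequential rewrite itself is simple to sketch, with per-batch sizes standing in for merged output batches (function name is illustrative, not the plugin's):

```rust
/// Replace per-file row ids with globally sequential ids over the merged
/// output: batch k gets the next `batch_sizes[k]` values of a running counter.
fn rewrite_row_ids(batch_sizes: &[usize]) -> Vec<Vec<u64>> {
    let mut next = 0u64;
    batch_sizes
        .iter()
        .map(|&n| {
            let ids: Vec<u64> = (next..next + n as u64).collect();
            next += n as u64; // counter carries across batches
            ids
        })
        .collect()
}
```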