Sort the documents according to the indexSort during the refresh time #21468
chaitanya588 wants to merge 11 commits
❌ Gradle check result for d82d0f1: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
❌ Gradle check result for fed81b0: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
❌ Gradle check result for bada5b6: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
…oring Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
… Parallelizing flushes in refresh flow via Flush Thread pool Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
…prepared for that Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
Removes the legacy sort_small_file, sort_large_file, sort_and_rewrite_parquet, and sort_and_rewrite_parquet_from_chunks helpers that were resurrected during a rebase. The active sort path is SortingChunkedWriter + finalize_sorted_chunks, which performs eager sort-during-write plus a k-way merge at finalize. Also adapts the finalize_sorted_chunks call to merge_sorted's 7-arg signature on this branch (the lastminute branch carried the 6-arg pre-opensearch-project#21576 signature). Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
❌ Gradle check result for 0b79df7: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Summary
This PR enables physical document sorting by the index sort (e.g. timestamp) at refresh time across both Parquet (primary) and Lucene (secondary) data formats in the composite indexing engine. The Parquet writer sorts batches as they accumulate and emits a row-ID permutation on flush. That permutation is propagated to the Lucene secondary writer so its segment is reordered to match Parquet's new physical row order, keeping both formats aligned by row_id.
It also restructures the refresh flow around a cooperative flush queue, so write threads share flush work with the refresh thread (which also acts as backpressure), and cleans up the row-ID mapping API by unifying it into a single class that supports both the merge and refresh flows.
Why
Without sort-on-refresh:
Range queries on the index sort field (typically @timestamp) can't take advantage of segment ordering for early-termination or block-skip optimizations.
Parquet row groups can't prune by min/max effectively when rows are inserted in arrival order.
Lucene and Parquet would diverge in physical row order, breaking the row_id 1:1 mapping the composite engine relies on for cross-format reads.
This change closes that gap by sorting eagerly during chunked Parquet writes and applying the same permutation to the Lucene segment via a custom MergePolicy + reorder pass + DocValues row-ID rewrite.
What's Changing
1. Sort-on-flush in the Parquet writer (Rust)
sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs
SortingChunkedWriter replaces the simple Arrow IPC staging path when sort_columns are configured.
When staged data reaches memory_threshold_bytes: read the IPC back, sort in memory, write a sorted Parquet chunk, then reopen a fresh IPC. Oversized single batches are sliced to fit the budget.
finish(): flush trailing IPC, return the list of sorted Parquet chunks. A k-way merge stitches them into the final Parquet file, producing a flat row-ID permutation mapping[original_row_id] = new_row_id.
Each chunk rewrites ___row_id to sequential 0..N so each chunk is self-consistent; original row IDs are tracked separately for the permutation.
2. Permutation handoff Rust → Java
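The permutation handed across this boundary comes out of the writer's k-way merge. The following is a minimal sketch of how a heap-based merge over already-sorted chunks can yield mapping[original_row_id] = new_row_id. It is written in Java for readability and is not the Rust implementation in this PR; the class name, the single long sort key, and the assumption that original row IDs are exactly 0..N-1 are all illustrative.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Hypothetical sketch: not the PR's Rust code. Assumes one ascending long key
// per row and original row ids covering exactly 0..N-1.
final class KWayMergePermutationSketch {

    /** Read position inside one sorted chunk. */
    private static final class Cursor {
        final long[] keys;        // sort keys, ascending within the chunk
        final long[] originalIds; // original row id for each row of the chunk
        int pos = 0;

        Cursor(long[] keys, long[] originalIds) {
            this.keys = keys;
            this.originalIds = originalIds;
        }

        long key() {
            return keys[pos];
        }
    }

    /** Returns mapping where mapping[originalRowId] = row position in the merged output. */
    static long[] mergePermutation(long[][] chunkKeys, long[][] chunkOriginalIds) {
        int total = 0;
        for (long[] ids : chunkOriginalIds) {
            total += ids.length;
        }
        long[] mapping = new long[total];

        // Min-heap ordered by the current key of each chunk cursor.
        // Ties between chunks are broken arbitrarily in this sketch.
        PriorityQueue<Cursor> heap = new PriorityQueue<>((a, b) -> Long.compare(a.key(), b.key()));
        for (int c = 0; c < chunkKeys.length; c++) {
            if (chunkKeys[c].length > 0) {
                heap.add(new Cursor(chunkKeys[c], chunkOriginalIds[c]));
            }
        }

        long newRowId = 0;
        while (!heap.isEmpty()) {
            Cursor cur = heap.poll();
            mapping[(int) cur.originalIds[cur.pos]] = newRowId++;
            cur.pos++;
            if (cur.pos < cur.keys.length) {
                heap.add(cur); // re-insert only after advancing, so heap order stays valid
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        long[][] keys = { { 10, 40 }, { 20, 30 } };
        long[][] ids = { { 2, 0 }, { 3, 1 } };
        System.out.println(Arrays.toString(mergePermutation(keys, ids))); // [3, 2, 0, 1]
    }
}
```

In the example, original row 2 holds the smallest key and therefore lands at new position 0, while original row 0 holds the largest key and lands at position 3.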
sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs, RustBridge.java, NativeParquetWriter.java
parquet_finalize_writer now returns two extra out-params: sort_perm_ptr_out and sort_perm_len_out, pointing to a heap-allocated Box<[i64]>.
A parquet_free_row_id_mapping FFI releases the Rust-owned buffer after the Java side copies it.
RustBridge.finalizeWriter now returns a WriterFinalizeResult(metadata, rowIdMapping) and constructs a PackedRowIdMapping(mappingArray, true) with reverse-lookup support, then frees the native buffer.
3. RowIdMapping API unification (server core)
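As a rough picture of what a single-generation mapping with optional reverse lookup might look like, here is a small sketch. It is not the PackedRowIdMapping class from this PR: it uses a plain long[] instead of a packed representation, and the class name and method shapes are assumptions chosen to mirror the capabilities described in this PR (forward lookup, reverse lookup, capability check, size).

```java
import java.util.Arrays;

// Hypothetical sketch of a flush permutation with optional reverse lookup.
// Uses plain long[] storage; the real class is described as "packed".
final class RowIdPermutationSketch {
    private final long[] oldToNew;
    private final long[] newToOld; // null when reverse lookup was not requested

    RowIdPermutationSketch(long[] oldToNew, boolean buildReverse) {
        this.oldToNew = oldToNew.clone(); // defensive copy, e.g. of a native buffer's contents
        if (buildReverse) {
            long[] inverse = new long[oldToNew.length];
            Arrays.fill(inverse, -1L);
            for (int oldId = 0; oldId < oldToNew.length; oldId++) {
                long newId = oldToNew[oldId];
                // Reject out-of-range or duplicate targets: the input must be a permutation.
                if (newId < 0 || newId >= oldToNew.length || inverse[(int) newId] != -1L) {
                    throw new IllegalArgumentException("not a permutation at old row " + oldId);
                }
                inverse[(int) newId] = oldId;
            }
            this.newToOld = inverse;
        } else {
            this.newToOld = null;
        }
    }

    long getNewRowId(long oldId) {
        return oldToNew[(int) oldId];
    }

    boolean isNewToOldSupported() {
        return newToOld != null;
    }

    long getOldRowId(long newId) {
        if (newToOld == null) {
            throw new UnsupportedOperationException("reverse lookup not built");
        }
        return newToOld[(int) newId];
    }

    int size() {
        return oldToNew.length;
    }

    public static void main(String[] args) {
        RowIdPermutationSketch m = new RowIdPermutationSketch(new long[] { 3, 2, 0, 1 }, true);
        System.out.println(m.getNewRowId(0) + " " + m.getOldRowId(0)); // 3 2
    }
}
```

Building the inverse eagerly costs one extra array but makes both directions O(1), which is the trade-off a consumer needing newToOld lookups during a reorder would probably want.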
server/src/main/java/org/opensearch/index/engine/dataformat/
RowIdMapping interface gains:
SINGLE_GEN = -1L sentinel for single-generation flush-sort cases.
getOldRowId(long newId) reverse lookup.
isNewToOldSupported() capability check.
size().
PackedRowIdMapping now handles both multi-generation merge mappings and single-generation flush permutations through two constructors. The legacy PackedSingleGenRowIdMapping is removed.
FlushInput record carries the optional RowIdMapping into Writer.flush(FlushInput).
Writer.flush() is replaced with flush(FlushInput); FlushInput.EMPTY is the no-op default.
FileInfos carries rowIdMapping so flush results can hand the permutation to the parent writer.
4. Sort propagation in CompositeWriter
sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
Take the RowIdMapping produced by the primary (Parquet) flush → pass it via FlushInput to every secondary writer (Lucene).
5. Sorted flush in the Lucene writer
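Stripped of Lucene specifics, a sorted flush amounts to validating the permutation against the document count, moving each row to its new position, and renumbering row IDs sequentially. The sketch below shows only that logic in plain Java; it does not use the ReorderingMergePolicy or codec hooks from this PR, and the class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical, Lucene-free sketch of "apply permutation, then renumber row ids".
final class SortedFlushSketch {

    /** Moves the row at old position i to position oldToNew[i]. */
    static String[] reorder(String[] rows, long[] oldToNew) {
        if (oldToNew.length != rows.length) {
            // Mirrors the mapping.size() == docCount validation described in this PR.
            throw new IllegalStateException(
                "mapping size " + oldToNew.length + " != doc count " + rows.length);
        }
        String[] out = new String[rows.length];
        for (int oldId = 0; oldId < rows.length; oldId++) {
            out[(int) oldToNew[oldId]] = rows[oldId];
        }
        return out;
    }

    /** After the reorder, row ids are simply 0..N-1 in physical order. */
    static long[] sequentialRowIds(int docCount) {
        long[] ids = new long[docCount];
        for (int i = 0; i < docCount; i++) {
            ids[i] = i;
        }
        return ids;
    }

    public static void main(String[] args) {
        long[] oldToNew = { 3, 2, 0, 1 };
        // Two "formats" holding different fields of the same rows, in arrival order.
        String[] names = { "dave", "carol", "alice", "bob" };
        String[] tags = { "t-dave", "t-carol", "t-alice", "t-bob" };
        System.out.println(Arrays.toString(reorder(names, oldToNew)));
        System.out.println(Arrays.toString(reorder(tags, oldToNew)));
        System.out.println(Arrays.toString(sequentialRowIds(names.length)));
    }
}
```

Because both arrays are reordered with the same permutation, position i in one still describes the same logical row as position i in the other, which is the alignment property the composite engine depends on.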
sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/
LuceneWriter.flush(FlushInput): when a RowIdMapping is present, validates mapping.size() == docCount, configures a custom ReorderingMergePolicy, then forceMerge(1) triggers a single physical reorder of the segment.
Updates the segment .si to declare the IndexSort (SortedNumericSortField on __row_id__) so addIndexes(Directory…) on the shared writer sees matching sort metadata.
ReorderingMergePolicy + ReorderingOneMerge use the mapping as a Lucene Sorter.DocMap (oldToNew/newToOld) to drive the physical reorder.
LuceneWriterCodec gains enableRowIdRewrite(), which wraps the codec's DocValuesFormat with the new LuceneWriterDocValuesFormat. This intercepts writes to the ___row_id field during the reorder merge and replaces values with sequential 0..N, so reorder + ID rewrite happen in a single pass.
6. Cooperative refresh + write-side backpressure
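The cooperative draining idea can be sketched with a concurrent queue of pending flush tasks and a per-cycle CountDownLatch that whichever thread runs a task decrements. This is an illustration only, not the DataFormatAwareEngine code: the class, its method names other than preIndex, and the use of Runnable in place of real writers are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: flush tasks are plain Runnables standing in for writers.
final class CooperativeFlushSketch {
    private final Queue<Runnable> flushQueue = new ConcurrentLinkedQueue<>();
    private volatile CountDownLatch activeFlushLatch = new CountDownLatch(0);

    /** Refresh thread: publish this cycle's flush work. The latch is set before tasks become visible. */
    void beginCycle(List<Runnable> tasks) {
        activeFlushLatch = new CountDownLatch(tasks.size());
        flushQueue.addAll(tasks);
    }

    /** Any thread: run at most one pending flush. Returns true if a task was run. */
    boolean helpFlushOne() {
        Runnable task = flushQueue.poll();
        if (task == null) {
            return false;
        }
        try {
            task.run();
        } finally {
            activeFlushLatch.countDown(); // counted even if the flush throws
        }
        return true;
    }

    /** Write thread hook: help with one flush before indexing, which slows writers under load. */
    void preIndex() {
        helpFlushOne();
    }

    /** Refresh thread: drain what is left, then wait for tasks taken by write threads. */
    boolean drainAndAwait(long timeoutMillis) {
        while (helpFlushOne()) {
            // keep draining until the queue is empty
        }
        try {
            return activeFlushLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger flushed = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            tasks.add(() -> flushed.incrementAndGet());
        }
        CooperativeFlushSketch engine = new CooperativeFlushSketch();
        engine.beginCycle(tasks);
        engine.preIndex(); // a write thread takes one flush
        System.out.println(engine.drainAndAwait(1000) + " " + flushed.get()); // true 3
    }
}
```

Counting down in a finally block keeps the refresh thread from waiting forever on a flush that failed; how failures are actually surfaced in the engine is outside what this sketch covers.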
server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java
flushQueue of writers pending flush, drained cooperatively by the refresh thread and write threads.
CountDownLatch (activeFlushLatch) tracks per-cycle completion; both refresh and write threads decrement it.
preIndex() (gated by index.check_pending_flush.enabled) lets a write thread pull a writer off flushQueue and flush it inline before its own indexing op. This provides backpressure on indexing while helping drain flush work.
Flushed segments are collected in pendingSegments; their writers are deferred-closed via pendingWritersToClose (since temp dirs must survive until refresh runs addIndexes).
7. Tests
LuceneWriterSortedFlushTests: validates sorted-flush with mapping handoff.
LuceneWriterDocValuesFormatTests: verifies ___row_id is rewritten to sequential values.
CompositeWriterSortPropagationTests: verifies the primary→secondary FlushInput handoff.
CompositeRefreshSortedIT: end-to-end IT covering sorted refresh across the composite engine.
End to End Testing Scenarios:
CompositeRefreshSortedIT: A set of correctness tests that ensure both the primary and secondary formats are correct end to end, including the final ordering.
testSortedRefreshProducesSortedParquet: (Parquet-only) Indexes 5 unsorted docs, flushes once. Asserts only parquet format is present, row count is 5, Parquet rows obey the sort order, and row_id in Parquet is sequential starting at 0.
testSortedRefreshWithLuceneSecondary: (Parquet + Lucene) Indexes 5 docs that also have a non-sort-key keyword field tag, flushes once. Asserts both formats are present, Parquet row count and sort order, Lucene doc count, sequential row_id doc values in Lucene, and cross-format alignment: at every physical position, Parquet row i and Lucene doc i agree on name and tag (where tag confirms non-sort fields are also co-located by the row-id rewrite).
testMultipleSortedRefreshesProduceIndependentlySortedSegments: (Parquet-only) Indexes one batch, flushes; indexes a second batch, flushes. Asserts the snapshot has two segments and that each segment is independently sorted (sort is per-segment, not cross-segment).
testSortedRefreshWithNulls: (Parquet + Lucene) Mix of docs with and without age to exercise null handling. Asserts both formats are present, Parquet sort obeys nulls-first on age and nulls-last on name, and Lucene row_id is sequential, confirming that the row-id rewrite driven by the Parquet sort permutation works even when nulls participate in the key.
testSortedRefreshWithLargeBatch: (Parquet + Lucene) Indexes 200 docs to push past the chunked-sort path (default sort_batch_size = 65536 is much larger, so this mainly stresses non-trivial volume across both formats). Asserts row count, sort order, Lucene doc count, and sequential Lucene row_id.
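For readers unfamiliar with mixed null ordering, the expectation in testSortedRefreshWithNulls can be pictured with a plain Java comparator. This is only an illustration of nulls-first on one key combined with nulls-last on another; it assumes ascending order with age compared before name, which the test description does not state, and the Doc class is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: ascending sort on (age nulls-first, name nulls-last).
final class NullOrderingSketch {

    static final class Doc {
        final String name; // may be null
        final Integer age; // may be null

        Doc(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    static final Comparator<Doc> SORT = Comparator
        .comparing((Doc d) -> d.age, Comparator.nullsFirst(Comparator.<Integer>naturalOrder()))
        .thenComparing((Doc d) -> d.name, Comparator.nullsLast(Comparator.<String>naturalOrder()));

    /** Returns the names in sorted order; the input list is not modified. */
    static List<String> sortedNames(List<Doc> docs) {
        List<Doc> copy = new ArrayList<>(docs);
        copy.sort(SORT);
        List<String> names = new ArrayList<>();
        for (Doc d : copy) {
            names.add(d.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Doc> docs = Arrays.asList(
            new Doc("bob", 30),
            new Doc("alice", null),
            new Doc(null, 30),
            new Doc("carol", 25),
            new Doc("dave", null));
        System.out.println(sortedNames(docs)); // [alice, dave, carol, bob, null]
    }
}
```

Docs without an age come first, ordered by name; among docs sharing age 30, the one with a null name sorts after "bob".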
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.