Sort the documents according to the indexSort during the refresh time #21468

Open
chaitanya588 wants to merge 11 commits into opensearch-project:main from chaitanya588:flush_with_sorting

chaitanya588 (Contributor) commented May 4, 2026

Summary

This PR enables physical document sorting by the index sort (e.g. timestamp) at refresh time across both Parquet (primary) and Lucene (secondary) data formats in the composite indexing engine. The Parquet writer sorts batches as they accumulate and emits a row-ID permutation on flush. That permutation is propagated to the Lucene secondary writer so its segment is reordered to match Parquet's new physical row order, keeping both formats aligned by row_id.

It also restructures the refresh flow with a cooperative flush queue so write threads share flush work with the refresh thread (backpressure), and cleans up the row-ID mapping API by unifying it into a single class that supports both the merge and refresh flows.

Why

Without sort-on-refresh:

  • Range queries on the index sort field (typically @timestamp) can't take advantage of segment ordering for early-termination or block-skip optimizations.
  • Parquet row groups can't prune by min/max effectively when rows are inserted in arrival order.
  • Lucene and Parquet would diverge in physical row order, breaking the row_id 1:1 mapping the composite engine relies on for cross-format reads.

This change closes that gap by sorting eagerly during chunked Parquet writes and applying the same permutation to the Lucene segment via a custom MergePolicy + reorder pass + DocValues row-ID rewrite.

What's Changing

1. Sort-on-flush in the Parquet writer (Rust)

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs

  • New SortingChunkedWriter replaces the simple Arrow IPC staging path when sort_columns are configured.
  • Strategy: write incoming batches to an Arrow IPC staging file; when accumulated bytes plus the next batch would exceed memory_threshold_bytes, read the IPC back, sort in memory, write a sorted Parquet chunk, then reopen a fresh IPC. Oversized single batches are sliced to fit the budget.
  • On finish(): flush trailing IPC, return the list of sorted Parquet chunks. A k-way merge stitches them into the final Parquet file, producing a flat row-ID permutation mapping[original_row_id] = new_row_id.
  • The chunk file rewrites ___row_id to sequential 0..N so each chunk is self-consistent; original row IDs are tracked separately for the permutation.
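
To make the permutation convention above concrete, here is a minimal Java sketch (the writer itself is Rust). The class and method names are hypothetical and not part of this PR. It assumes a single long sort key, an in-memory stable sort, and that row i carries original row ID i; chunking and the k-way merge are left out.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical illustration only; this is not the PR's SortingChunkedWriter.
final class SortPermutationSketch {

    /**
     * Returns a flat mapping where mapping[originalRowId] = newRowId after a
     * stable ascending sort on sortKeys. Row i is assumed to have original row ID i.
     */
    static long[] oldToNew(long[] sortKeys) {
        Integer[] order = new Integer[sortKeys.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        // After sorting, order[newPos] is the original row ID that lands at newPos.
        Arrays.sort(order, Comparator.comparingLong(i -> sortKeys[i]));

        long[] mapping = new long[sortKeys.length];
        for (int newPos = 0; newPos < order.length; newPos++) {
            mapping[order[newPos]] = newPos;
        }
        return mapping;
    }

    public static void main(String[] args) {
        long[] timestamps = {30L, 10L, 20L};
        // prints [2, 0, 1]
        System.out.println(Arrays.toString(oldToNew(timestamps)));
    }
}
```

Because the sort is stable, rows with equal keys keep their arrival order, which keeps the mapping deterministic in this sketch.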

2. Permutation handoff Rust → Java

sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs, RustBridge.java, NativeParquetWriter.java

  • parquet_finalize_writer now returns two extra out-params: sort_perm_ptr_out and sort_perm_len_out, pointing to a heap-allocated Box<[i64]>.
  • New parquet_free_row_id_mapping FFI function releases the Rust-owned buffer after the Java side copies it.
  • RustBridge.finalizeWriter now returns a WriterFinalizeResult(metadata, rowIdMapping) and constructs a PackedRowIdMapping(mappingArray, true) with reverse-lookup support, then frees the native buffer.

3. RowIdMapping API unification (server core)

server/src/main/java/org/opensearch/index/engine/dataformat/

  • RowIdMapping interface gains:
    • SINGLE_GEN = -1L sentinel for single-generation flush-sort cases.
    • getOldRowId(long newId) reverse lookup.
    • isNewToOldSupported() capability check.
    • size().
  • PackedRowIdMapping now handles both multi-generation merge mappings and single-generation flush permutations through two constructors. The legacy PackedSingleGenRowIdMapping is removed.
  • New FlushInput record carries the optional RowIdMapping into Writer.flush(FlushInput). Writer.flush() is replaced with flush(FlushInput); FlushInput.EMPTY is the no-op default.
  • FileInfos carries rowIdMapping so flush results can hand the permutation to the parent writer.
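
The shape of the unified mapping can be sketched as follows. This is a simplified, hypothetical stand-in: it uses plain long[] arrays rather than packed storage, and only the method names (getNewRowId, getOldRowId, isNewToOldSupported, size), the SINGLE_GEN sentinel, and the "-1 means not found" convention come from this PR page. Everything else is an assumption for illustration.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for RowIdMapping / PackedRowIdMapping.
final class RowIdMappingSketch {

    static final long SINGLE_GEN = -1L; // sentinel named in the PR description
    static final long NOT_FOUND = -1L;  // assumed "not found" return value

    private final long[] oldToNew;
    private final long[] newToOld; // null when reverse lookup was not requested

    RowIdMappingSketch(long[] oldToNew, boolean buildReverse) {
        this.oldToNew = oldToNew.clone();
        if (buildReverse) {
            long[] reverse = new long[oldToNew.length];
            Arrays.fill(reverse, NOT_FOUND);
            for (int oldId = 0; oldId < oldToNew.length; oldId++) {
                long newId = oldToNew[oldId];
                // Reject out-of-range or duplicate targets instead of skipping them.
                if (newId < 0 || newId >= reverse.length || reverse[(int) newId] != NOT_FOUND) {
                    throw new IllegalArgumentException("not a permutation at old row " + oldId);
                }
                reverse[(int) newId] = oldId;
            }
            this.newToOld = reverse;
        } else {
            this.newToOld = null;
        }
    }

    long getNewRowId(long oldId, long oldGeneration) {
        // A single-generation flush permutation ignores the generation argument.
        return inRange(oldId) ? oldToNew[(int) oldId] : NOT_FOUND;
    }

    long getOldRowId(long newId) {
        if (newToOld == null) {
            throw new UnsupportedOperationException("reverse lookup not built");
        }
        return inRange(newId) ? newToOld[(int) newId] : NOT_FOUND;
    }

    boolean isNewToOldSupported() {
        return newToOld != null;
    }

    int size() {
        return oldToNew.length;
    }

    private boolean inRange(long id) {
        return id >= 0 && id < oldToNew.length;
    }

    public static void main(String[] args) {
        RowIdMappingSketch mapping = new RowIdMappingSketch(new long[] {2L, 0L, 1L}, true);
        System.out.println("old 0 -> new " + mapping.getNewRowId(0L, SINGLE_GEN));
        System.out.println("new 0 -> old " + mapping.getOldRowId(0L));
    }
}
```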

4. Sort propagation in CompositeWriter

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java

  • Flush order is now: primary (Parquet) first → capture its RowIdMapping → pass it via FlushInput to every secondary writer (Lucene).
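
A rough sketch of that ordering, using hypothetical stand-ins for FlushInput and the per-format writers (the real types carry file sets and more state, and their signatures are not shown on this page):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of primary-first flush with permutation handoff.
final class CompositeFlushSketch {

    /** Stand-in for FlushInput: carries an optional row-ID permutation. */
    static final class Input {
        static final Input EMPTY = new Input(null);
        final long[] rowIdMapping; // null when no mapping is present

        Input(long[] rowIdMapping) {
            this.rowIdMapping = rowIdMapping;
        }

        boolean hasRowIdMapping() {
            return rowIdMapping != null;
        }
    }

    /** Stand-in for a per-format writer; returns the permutation it produced, or null. */
    interface FormatWriter {
        long[] flush(Input input);
    }

    static long[] flushAll(FormatWriter primary, List<FormatWriter> secondaries) {
        // 1. The primary flushes first and may produce a sort permutation.
        long[] mapping = primary.flush(Input.EMPTY);
        // 2. Every secondary receives the same permutation (or EMPTY if none).
        Input forSecondaries = mapping == null ? Input.EMPTY : new Input(mapping);
        for (FormatWriter secondary : secondaries) {
            secondary.flush(forSecondaries);
        }
        return mapping;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        FormatWriter parquet = in -> {
            log.add("parquet sees mapping: " + in.hasRowIdMapping());
            return new long[] {1L, 0L};
        };
        FormatWriter lucene = in -> {
            log.add("lucene sees mapping: " + in.hasRowIdMapping());
            return null;
        };
        flushAll(parquet, List.of(lucene));
        log.forEach(System.out::println);
    }
}
```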

5. Sorted flush in the Lucene writer

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/

  • LuceneWriter.flush(FlushInput):
    • When a RowIdMapping is present, validates mapping.size() == docCount, configures a custom ReorderingMergePolicy, then forceMerge(1) triggers a single physical reorder of the segment.
    • After commit + close, rewrites the segment's .si to declare the IndexSort (SortedNumericSortField on __row_id__) so addIndexes(Directory…) on the shared writer sees matching sort metadata.
  • ReorderingMergePolicy + ReorderingOneMerge use the mapping as a Lucene Sorter.DocMap (oldToNew / newToOld) to drive the physical reorder.
  • LuceneWriterCodec gains enableRowIdRewrite(), which wraps the codec's DocValuesFormat with the new LuceneWriterDocValuesFormat. This intercepts writes to the ___row_id field during the reorder merge and replaces values with sequential 0..N, so reorder + ID rewrite happen in a single pass.
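
The "reorder + ID rewrite in a single pass" idea can be illustrated without Lucene. The sketch below is an assumption-laden toy: Doc is a hypothetical record of a row-ID doc value plus a payload, and newToOld plays the role the mapping plays as a doc map. It is not the ReorderingMergePolicy or LuceneWriterDocValuesFormat code.

```java
// Hypothetical toy model of the reorder pass; no Lucene APIs are used here.
final class ReorderPassSketch {

    /** A document reduced to its row-ID doc value and an opaque payload. */
    static final class Doc {
        final long rowId;
        final String payload;

        Doc(long rowId, String payload) {
            this.rowId = rowId;
            this.payload = payload;
        }
    }

    /** newToOld[newDoc] is the old position of the doc that moves to newDoc. */
    static Doc[] reorder(Doc[] oldOrder, long[] newToOld) {
        if (newToOld.length != oldOrder.length) {
            throw new IllegalStateException("mapping size does not match document count");
        }
        Doc[] out = new Doc[oldOrder.length];
        for (int newDoc = 0; newDoc < out.length; newDoc++) {
            Doc source = oldOrder[(int) newToOld[newDoc]];
            // Copy the payload and rewrite the row ID to the new position in one step.
            out[newDoc] = new Doc(newDoc, source.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        Doc[] arrivalOrder = {new Doc(0L, "c"), new Doc(1L, "a"), new Doc(2L, "b")};
        for (Doc doc : reorder(arrivalOrder, new long[] {1L, 2L, 0L})) {
            System.out.println(doc.rowId + " " + doc.payload);
        }
    }
}
```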

6. Cooperative refresh + write-side backpressure

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java

  • New shared flushQueue of writers pending flush, drained cooperatively by the refresh thread and write threads.
  • A CountDownLatch (activeFlushLatch) tracks per-cycle completion; both refresh and write threads decrement it.
  • New preIndex() (gated by index.check_pending_flush.enabled) lets a write thread pull a writer off flushQueue and flush it inline before its own indexing op. This provides backpressure on indexing while helping drain flush work.
  • Inline-flushed segments are queued in pendingSegments; their writers are deferred-closed via pendingWritersToClose (since temp dirs must survive until refresh runs addIndexes).
  • Refresh thread waits on the latch, drains both pending queues, then runs catalog commit. Added timing logs around flushAll, engine refresh, and catalog commit.
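
The cooperative drain can be sketched roughly as below. This is a simplified model under several assumptions: writers are reduced to Runnable tasks, the class and field names are hypothetical apart from flushQueue, activeFlushLatch, and preIndex, and engine-failure handling, pendingSegments, and deferred writer close are omitted.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the shared flush queue; not the DataFormatAwareEngine code.
final class CooperativeFlushSketch {

    private final Queue<Runnable> flushQueue = new ConcurrentLinkedQueue<>();
    private volatile CountDownLatch activeFlushLatch;
    final AtomicInteger flushed = new AtomicInteger();

    /** Refresh thread: publish the work, help drain it, then wait for helpers. */
    boolean refresh(List<Runnable> pendingFlushes, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(pendingFlushes.size());
        activeFlushLatch = latch; // publish the latch before the work becomes visible
        flushQueue.addAll(pendingFlushes);
        while (flushOne()) {
            // keep draining until the queue is empty
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Write thread hook: flush at most one pending writer before indexing. */
    boolean preIndex() {
        return flushOne();
    }

    private boolean flushOne() {
        Runnable flushTask = flushQueue.poll();
        if (flushTask == null) {
            return false;
        }
        try {
            flushTask.run();
            flushed.incrementAndGet();
        } finally {
            // Count down even if the flush throws, so the waiter cannot hang.
            CountDownLatch latch = activeFlushLatch;
            if (latch != null) {
                latch.countDown();
            }
        }
        return true;
    }

    public static void main(String[] args) {
        CooperativeFlushSketch engine = new CooperativeFlushSketch();
        Runnable fakeFlush = () -> { };
        boolean completed = engine.refresh(List.of(fakeFlush, fakeFlush, fakeFlush), 1_000L);
        System.out.println("completed=" + completed + " flushed=" + engine.flushed.get());
    }
}
```

In this model, any thread that calls preIndex() while a refresh is in progress takes one task off the queue, which is where the backpressure on indexing comes from.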

7. Tests

  • LuceneWriterSortedFlushTests — validates sorted-flush with mapping handoff.
  • LuceneWriterDocValuesFormatTests — verifies ___row_id is rewritten to sequential values.
  • CompositeWriterSortPropagationTests — verifies the primary→secondary FlushInput handoff.
  • CompositeRefreshSortedIT — end-to-end IT covering sorted refresh across the composite engine.

End to End Testing Scenarios:

CompositeRefreshSortedIT: a set of correctness tests that verify both the primary and secondary formats end to end, including the final ordering.

  1. testSortedRefreshProducesSortedParquet: (Parquet-only) Indexes 5 unsorted docs, flushes once. Asserts only parquet format is present, row count is 5, Parquet rows obey the sort order, and row_id in Parquet is sequential starting at 0.

  2. testSortedRefreshWithLuceneSecondary: (Parquet + Lucene) Indexes 5 docs that also have a non-sort-key keyword field tag, flushes once. Asserts both formats are present, Parquet row count and sort order, Lucene doc count, sequential row_id doc values in Lucene, and cross-format alignment: at every physical position, Parquet row i and Lucene doc i agree on name and tag (where tag confirms non-sort fields are also co-located by the row-id rewrite).

  3. testMultipleSortedRefreshesProduceIndependentlySortedSegments: (Parquet-only) Indexes one batch, flushes; indexes a second batch, flushes. Asserts the snapshot has two segments and that each segment is independently sorted (sort is per-segment, not cross-segment).

  4. testSortedRefreshWithNulls: (Parquet + Lucene) Mix of docs with and without age to exercise null handling. Asserts both formats are present, Parquet sort obeys nulls-first on age and nulls-last on name, and Lucene row_id is sequential, confirming the row-id rewrite driven by the Parquet sort permutation works even when nulls participate in the key.

  5. testSortedRefreshWithLargeBatch: (Parquet + Lucene) Indexes 200 docs. The default sort_batch_size = 65536 is much larger than 200, so this test does not push past the chunked-sort path; it mainly stresses a non-trivial volume across both formats. Asserts row count, sort order, Lucene doc count, and sequential Lucene row_id.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

github-actions Bot (Contributor) commented May 4, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit bada5b6.

Path | Line | Severity | Description
sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/RustBridge.java | 272 | medium | MemorySegment.ofAddress(permAddr).reinterpret(permLen * ValueLayout.JAVA_LONG.byteSize()) constructs an unbounded native memory segment from a raw address and length returned by the Rust library. No bounds validation is performed in Java before the reinterpret; the entire trust chain rests on the native library. If the native code were tampered with or the library swapped (supply chain), this could be used to read arbitrary process memory. This follows existing FFI patterns in the codebase but represents an inherent risk surface expansion.
server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java | 1664 | low | The new preIndex() method allows write threads to invoke failEngine() via the flush failure path during cooperative flushing. This expands the engine-failure attack surface beyond the refresh thread: a writer whose flush() throws (e.g., due to a crafted exception from a compromised data format plugin) can now cause engine failure and shard unavailability from any indexing thread, not just during explicit refresh.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 1 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@chaitanya588 chaitanya588 reopened this May 7, 2026
@chaitanya588 chaitanya588 force-pushed the flush_with_sorting branch 4 times, most recently from 29ca47a to 6a19587 Compare May 8, 2026 14:12
Comment thread server/src/main/java/org/opensearch/index/engine/dataformat/RowIdMapping.java Outdated
@chaitanya588 chaitanya588 force-pushed the flush_with_sorting branch from 6a19587 to 384f605 Compare May 9, 2026 12:18
Comment thread server/src/main/java/org/opensearch/index/engine/dataformat/RowIdMapping.java Outdated
github-actions Bot (Contributor) commented May 10, 2026

PR Code Suggestions ✨

Latest suggestions up to 299486d

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Ensure latch countdown on flush failure

If writerToFlush.flush() throws an exception, the latch is never decremented,
causing the await loop below to hang indefinitely. Wrap the flush call in a
try-finally block to ensure flushLatch.countDown() is always called, even on
failure.

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java [805-836]

 Writer<?> writerToFlush;
 while ((writerToFlush = flushQueue.poll()) != null) {
     ensureOpen();
-    final long writerFlushStartNanos = System.nanoTime();
-    FileInfos fileInfos = writerToFlush.flush(FlushInput.EMPTY);
-    ...
-    flushLatch.countDown();
+    try {
+        final long writerFlushStartNanos = System.nanoTime();
+        FileInfos fileInfos = writerToFlush.flush(FlushInput.EMPTY);
+        ...
+    } finally {
+        flushLatch.countDown();
+    }
 }
Suggestion importance[1-10]: 9

Why: Critical correctness issue. If writerToFlush.flush() throws an exception, the flushLatch.countDown() at line 836 is never reached, causing the await loop at lines 842-854 to hang indefinitely. The suggestion to wrap the flush in try-finally ensures the latch is always decremented, preventing deadlock. This is a high-impact bug fix.

High
Prevent division by zero for empty batches

Division by zero can occur if num_rows is zero, causing a panic. Although the code
checks num_rows later, the division happens before that check. Add an early return
or guard clause to handle empty batches before performing the division.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [179-185]

+if num_rows == 0 {
+    return Ok(());
+}
 let bytes_per_row = incoming_batch_bytes / num_rows as u64;
 let rows_per_slice = std::cmp::max(
     1,
     (self.memory_threshold_bytes / bytes_per_row) as usize,
 );
Suggestion importance[1-10]: 8

Why: Critical safety issue. Division by zero at line 180 (incoming_batch_bytes / num_rows as u64) will panic if num_rows is 0. The early return guard is necessary and should be added before the division.

Medium
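
For readers following along in Java, the slice-sizing arithmetic with the suggested guard might look like the sketch below. It is a hypothetical port, not the Rust code under review. It also clamps bytes-per-row to at least 1, because integer division can yield 0 when the batch has fewer bytes than rows, which would make the second division fail too; that clamp and the cap at numRows are additions made for this sketch.

```java
// Hypothetical Java port of the slice-sizing arithmetic; not the PR's Rust code.
final class SliceSizingSketch {

    /** Returns how many rows fit in one slice, or 0 for an empty batch. */
    static int rowsPerSlice(long batchBytes, int numRows, long memoryThresholdBytes) {
        if (numRows <= 0) {
            return 0; // nothing to slice; guard before dividing by numRows
        }
        // batchBytes / numRows can round down to 0, so clamp before dividing again.
        long bytesPerRow = Math.max(1L, batchBytes / numRows);
        long rows = Math.max(1L, memoryThresholdBytes / bytesPerRow);
        // A slice never needs more rows than the batch holds (assumption of this sketch).
        return (int) Math.min(rows, numRows);
    }

    public static void main(String[] args) {
        System.out.println(rowsPerSlice(1_000L, 10, 250L));
        System.out.println(rowsPerSlice(0L, 0, 250L));
    }
}
```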
Prevent writer resource leaks on failure

The active polling loop checks engine state every second but doesn't drain the
flushQueue or handle orphaned writers if the engine fails mid-refresh. If a writer
flush fails on another thread, this loop could wait indefinitely. Add explicit queue
cleanup on engine failure to prevent resource leaks.

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java [842-854]

 while (flushLatch.getCount() > 0) {
     if (isClosed.get() || failedEngine.get() != null) {
+        // Drain remaining writers from queue to prevent leaks
+        Writer<?> orphan;
+        while ((orphan = flushQueue.poll()) != null) {
+            IOUtils.closeWhileHandlingException(orphan);
+        }
         throw new AlreadyClosedException("engine closed during refresh flush");
     }
     try {
         if (flushLatch.await(1, TimeUnit.SECONDS) == false) {
-            continue; // re-check isClosed / failedEngine
+            continue;
         }
     } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IOException("Refresh interrupted waiting for flush completion", e);
     }
 }
Suggestion importance[1-10]: 7

Why: Valid concern about orphaned writers in flushQueue when the engine fails mid-refresh. The suggestion to drain and close remaining writers on failure prevents resource leaks. However, the existing failEngine path already handles cleanup via IOUtils.closeWhileHandlingException in the outer catch block, so this is a defensive improvement rather than a critical fix.

Medium
Add bounds validation for permutation mapping

The permutation mapping construction silently skips invalid indices without logging
or error handling. If orig_idx >= total or pos >= total, data corruption could occur
silently. Add validation to ensure all indices are within bounds and log warnings
for out-of-bounds cases.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [670-680]

 let mut pos = 0usize;
 for chunk_ids in chunk_row_ids {
     for &original_row_id in chunk_ids {
         let orig_idx = original_row_id as usize;
-        if orig_idx < total && pos < total {
-            flat_mapping[orig_idx] = merge_output.mapping[pos];
+        if orig_idx >= total {
+            log_error!("Invalid original_row_id {} exceeds total {}", original_row_id, total);
+            continue;
         }
+        if pos >= total {
+            log_error!("Position {} exceeds total {} during permutation build", pos, total);
+            break;
+        }
+        flat_mapping[orig_idx] = merge_output.mapping[pos];
         pos += 1;
     }
 }
Suggestion importance[1-10]: 6

Why: Valid concern about silent data corruption. The bounds checks exist but adding logging for out-of-bounds cases would improve debuggability. However, the continue and break logic in the improved code differs from the original's silent skip behavior.

Low
Add null check for mapping

The validation logic checks flushInput.hasRowIdMapping() twice in nested conditions.
Extract flushInput.rowIdMapping() once at the start to avoid redundant calls and
improve readability. Also, validate mapping.size() before passing to
configureSortedMerge() to ensure the mapping is valid.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [250-273]

 if (flushInput.hasRowIdMapping()) {
-    // RowIdMapping shouldn't be available if index has sort configurations.
     Sort configuredIndexSort = indexWriter.getConfig().getIndexSort();
     if (configuredIndexSort != null) {
         throw new IllegalStateException(
             "RowIdMapping should not be available when child IndexWriter is configured with IndexSort ["
                 + configuredIndexSort
                 + "] for writer generation ["
                 + writerGeneration
                 + "]"
         );
     }
     RowIdMapping mapping = flushInput.rowIdMapping();
-    if (mapping.size() != docCount) {
+    if (mapping == null || mapping.size() != docCount) {
         throw new IllegalStateException(
             "RowIdMapping size ["
-                + mapping.size()
+                + (mapping != null ? mapping.size() : "null")
                 + "] does not match document count ["
                 + docCount
                 + "] for writer generation ["
                 + writerGeneration
                 + "]"
         );
     }
     configureSortedMerge(mapping);
 }
Suggestion importance[1-10]: 3

Why: The suggestion adds a null check for mapping, but flushInput.rowIdMapping() is guaranteed to return non-null when hasRowIdMapping() is true (per the FlushInput contract). The added null check is redundant and the error message change to include "null" is unnecessary.

Low
General
Optimize memory threshold check logic

The memory threshold check may cause premature flushes when current_chunk_bytes is
small but incoming_batch_bytes is large. Consider checking if the incoming batch
alone exceeds the threshold before flushing the current chunk, to avoid unnecessary
flush operations when the current chunk could still accommodate more data.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [162-166]

 if self.current_chunk_bytes > 0
+    && incoming_batch_bytes <= self.memory_threshold_bytes
     && self.current_chunk_bytes + incoming_batch_bytes > self.memory_threshold_bytes
 {
     self.flush_and_sort_chunk()?;
 }
Suggestion importance[1-10]: 4

Why: The suggestion identifies a potential optimization but the logic is already handled by the subsequent batch slicing code path (lines 177-206). The current check is correct for the common case where batches fit within the threshold.

Low
Use Optional for safer not-found handling

The interface methods return -1 to indicate "not found", but this sentinel value
could be confused with valid negative row IDs in edge cases. Consider using Optional
or throwing a checked exception to make the "not found" case explicit and type-safe.

server/src/main/java/org/opensearch/index/engine/dataformat/RowIdMapping.java [31-39]

-long getNewRowId(long oldId, long oldGeneration);
+Optional<Long> getNewRowId(long oldId, long oldGeneration);
 ...
-long getOldRowId(long newId);
+Optional<Long> getOldRowId(long newId);
Suggestion importance[1-10]: 3

Why: While Optional<Long> is more idiomatic Java, changing the return type would break all existing implementations and callers. The -1 sentinel is documented and consistent with the codebase. This is a style preference rather than a correctness issue.

Low

Previous suggestions

Suggestions up to commit 5d4b4ae
Category | Suggestion | Impact
Possible issue
Use atomic file operations for metadata

Deleting the .si file before rewriting creates a window where the segment metadata
is missing. If the write fails after deletion, the segment becomes unreadable. Use
atomic file operations: write to a temporary file first, then atomically rename it
to replace the original, ensuring the segment remains valid even if the operation is
interrupted.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [467-474]

 private void rewriteSegmentInfoWithSort(SegmentInfos segmentInfos, SegmentCommitInfo segmentCommitInfo) throws IOException {
     ...
-    // Delete the existing .si file before rewriting
+    // Write to temporary file first for atomicity
     String siFileName = originalInfo.name + ".si";
+    String tempFileName = originalInfo.name + ".si.tmp";
+    
+    originalInfo.getCodec().segmentInfoFormat().write(directory, sortedInfo, IOContext.DEFAULT);
+    
+    // Atomically replace the original file
     directory.deleteFile(siFileName);
-    
-    // Rewrite the .si file with sort metadata
-    originalInfo.getCodec().segmentInfoFormat().write(directory, sortedInfo, IOContext.DEFAULT);
+    directory.rename(tempFileName, siFileName);
     ...
 }
Suggestion importance[1-10]: 9

Why: Deleting the .si file before rewriting creates a window where segment metadata is missing. If the write fails, the segment becomes unreadable. Using atomic operations (write-then-rename) prevents data corruption, a critical correctness issue.

High
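
The write-then-rename pattern this suggestion refers to can be sketched with java.nio.file. This is a generic illustration rather than Lucene code: the file name and helper names are hypothetical, fsync is omitted, and whether ATOMIC_MOVE replaces an existing target is implementation specific according to the Files.move documentation, so the sketch falls back to a plain replace if an atomic move is not supported.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical illustration of write-to-temp then rename; not the LuceneWriter code.
final class AtomicRewriteSketch {

    static void replaceAtomically(Path target, byte[] newContent) {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try {
            // The original file stays intact until the move below succeeds.
            Files.write(tmp, newContent);
            try {
                Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Fallback is not atomic; a real implementation may prefer to fail here.
                Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            try {
                Files.deleteIfExists(tmp);
            } catch (IOException ignored) {
                // best-effort cleanup of the temporary file
            }
            throw new UncheckedIOException(e);
        }
    }

    /** Rewrites a throwaway file in a temp directory and returns its final content. */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("si-rewrite");
            Path file = dir.resolve("_0.si");
            Files.write(file, "unsorted".getBytes(StandardCharsets.UTF_8));
            replaceAtomically(file, "sorted".getBytes(StandardCharsets.UTF_8));
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```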
Prevent data loss after chunk flush

After flushing a chunk, current_chunk_bytes is reset to 0, but the incoming batch is
not written to the new IPC writer. This means the batch that triggered the flush is
silently dropped. After calling flush_and_sort_chunk(), the incoming batch should be
written to the newly opened IPC writer.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [162-175]

 if self.current_chunk_bytes > 0
     && self.current_chunk_bytes + incoming_batch_bytes > self.memory_threshold_bytes
 {
     self.flush_and_sort_chunk()?;
+    // After flush, write the incoming batch to the new IPC writer
+    if incoming_batch_bytes <= self.memory_threshold_bytes {
+        if let Some(ref mut w) = self.current_ipc_writer {
+            w.write(batch)?;
+        }
+        self.current_chunk_bytes += incoming_batch_bytes;
+        self.current_rows += batch.num_rows();
+        self.total_rows += batch.num_rows();
+        return Ok(());
+    }
 }
 
 // If the batch itself fits within the threshold, write it directly.
 if incoming_batch_bytes <= self.memory_threshold_bytes {
     if let Some(ref mut w) = self.current_ipc_writer {
         w.write(batch)?;
     }
     self.current_chunk_bytes += incoming_batch_bytes;
     self.current_rows += batch.num_rows();
     self.total_rows += batch.num_rows();
 } else {
     // The batch alone exceeds the memory budget — slice it into pieces
     ...
 }
Suggestion importance[1-10]: 9

Why: This is a critical bug. After flushing a chunk, the incoming batch that triggered the flush is not written to the new IPC writer, causing data loss. The suggestion correctly identifies this issue and provides a fix to write the batch after the flush.

High
Clean up leaked writers on failure

The active polling loop waits for flush completion but doesn't drain the flushQueue
on failure. If the engine closes or fails while writers are still in the queue,
those writers leak and their resources aren't cleaned up. Add cleanup logic to drain
and close any remaining writers in the queue when exiting due to engine closure or
failure.

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java [842-854]

 while (flushLatch.getCount() > 0) {
     if (isClosed.get() || failedEngine.get() != null) {
+        // Drain and close any remaining writers in the queue
+        Writer<?> remaining;
+        while ((remaining = flushQueue.poll()) != null) {
+            IOUtils.closeWhileHandlingException(remaining);
+        }
         throw new AlreadyClosedException("engine closed during refresh flush");
     }
     try {
         if (flushLatch.await(1, TimeUnit.SECONDS) == false) {
-            continue; // re-check isClosed / failedEngine
+            continue;
         }
     } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IOException("Refresh interrupted waiting for flush completion", e);
     }
 }
Suggestion importance[1-10]: 8

Why: The polling loop doesn't drain flushQueue on engine closure/failure, leaking writer resources. Adding cleanup prevents resource leaks and ensures proper shutdown, addressing a critical resource management issue.

Medium
Prevent double-close of flushed writers

The preIndex() method adds the flushed writer to pendingWritersToClose before the
flush completes successfully. If the flush throws an exception after this point, the
writer is both in the pending queue and closed in the catch block, leading to
potential double-close. Move the pendingWritersToClose.add() call to after the flush
succeeds to ensure writers are only queued for deferred close when flush completes
without error.

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java [1685-1703]

-private void preIndex() {
-    if (engineConfig.getIndexSettings().isCheckPendingFlushEnabled() == false) {
-        logger.trace("preIndex: check pending flush disabled, skipping");
-        return;
+try {
+    final long flushStartNanos = System.nanoTime();
+    FileInfos fileInfos = writerToFlush.flush(FlushInput.EMPTY);
+    final long flushElapsedMs = TimeValue.nsecToMSec(System.nanoTime() - flushStartNanos);
+    
+    if (fileInfos.writerFilesMap().isEmpty() == false) {
+        Segment.Builder segmentBuilder = Segment.builder(writerToFlush.generation());
+        for (Map.Entry<DataFormat, WriterFileSet> entry : fileInfos.writerFilesMap().entrySet()) {
+            ...
+            segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue());
+        }
+        pendingSegments.add(segmentBuilder.build());
     }
-    Writer<?> writerToFlush = flushQueue.poll();
-    if (writerToFlush == null) {
-        return;
-    }
-    try {
-        ...
-        pendingWritersToClose.add(writerToFlush);
-        ...
-    } catch (Exception e) {
-        ...
-        IOUtils.closeWhileHandlingException(writerToFlush);
-        failEngine("flush failed during preIndex for writer gen=" + writerToFlush.generation(), e);
-    } finally {
-        CountDownLatch latch = activeFlushLatch;
-        if (latch != null) {
-            latch.countDown();
-        }
-    }
+    
+    // Only queue for deferred close after successful flush
+    pendingWritersToClose.add(writerToFlush);
+    
+    logger.debug("preIndex: write thread flushed writer gen={} in [{}ms]", writerToFlush.generation(), flushElapsedMs);
+} catch (Exception e) {
+    ...
 }
Suggestion importance[1-10]: 8

Why: Adding the writer to pendingWritersToClose before flush completes can cause double-close if flush throws. Moving the add after successful flush prevents resource management errors and potential crashes.

Medium
Validate sort configuration before flush

The validation logic checks for conflicting sort configurations but occurs after
indexWriter.flush() has already been called. If the validation fails, the flush has
already modified the index state. Move this validation to the beginning of the
flush() method, before any state-changing operations, to fail fast and avoid partial
writes.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [250-261]

-if (flushInput.hasRowIdMapping()) {
-    // RowIdMapping shouldn't be available if index has sort configurations.
-    Sort configuredIndexSort = indexWriter.getConfig().getIndexSort();
-    if (configuredIndexSort != null) {
-        throw new IllegalStateException(
-            "RowIdMapping should not be available when child IndexWriter is configured with IndexSort ["
-                + configuredIndexSort
-                + "] for writer generation ["
-                + writerGeneration
-                + "]"
-        );
+public FileInfos flush(FlushInput flushInput) throws IOException {
+    if (docCount == 0) {
+        return FileInfos.empty();
     }
+    
+    // Validate sort configuration before any state changes
+    if (flushInput.hasRowIdMapping()) {
+        Sort configuredIndexSort = indexWriter.getConfig().getIndexSort();
+        if (configuredIndexSort != null) {
+            throw new IllegalStateException(
+                "RowIdMapping should not be available when child IndexWriter is configured with IndexSort ["
+                    + configuredIndexSort
+                    + "] for writer generation ["
+                    + writerGeneration
+                    + "]"
+            );
+        }
+    }
+    
+    long flushStartNanos = System.nanoTime();
+    logger.info(...);
+    indexWriter.flush();
     ...
 }
Suggestion importance[1-10]: 7

Why: The validation occurs after indexWriter.flush() has already modified state. Moving validation before state changes prevents partial writes on configuration errors, improving robustness.

Medium
General
Replace panic with error handling

The code uses expect() which will panic if the row_id column is not Int64. This
could crash the process if the schema is malformed or corrupted. Replace with proper
error handling that returns a descriptive error instead of panicking.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [259-277]

 let row_id_col_idx = self.schema.fields().iter().position(|f| f.name() == ROW_ID_COLUMN_NAME);
 let final_batch = if let Some(idx) = row_id_col_idx {
     let row_id_array = sorted_batch.column(idx)
         .as_any()
         .downcast_ref::<Int64Array>()
-        .expect("___row_id column must be Int64");
+        .ok_or_else(|| format!("__row_id__ column at index {} is not Int64", idx))?;
     let ids: Vec<i64> = (0..row_id_array.len())
         .map(|i| row_id_array.value(i))
         .collect();
     self.chunk_row_ids.push(ids);
 
-    // Rewrite ___row_id to sequential 0..N so the chunk file is self-consistent
+    // Rewrite __row_id__ to sequential 0..N so the chunk file is self-consistent
     let sequential_ids = Int64Array::from_iter_values(
         (0..sorted_batch.num_rows() as i64).map(|x| x)
     );
     let mut columns = sorted_batch.columns().to_vec();
     columns[idx] = Arc::new(sequential_ids);
     RecordBatch::try_new(self.schema.clone(), columns)?
 } else {
     sorted_batch
 };
Suggestion importance[1-10]: 8

Why: The suggestion correctly identifies that expect() will panic if the __row_id__ column is not Int64, which could crash the process. Replacing it with ok_or_else() provides proper error handling and returns a descriptive error, improving robustness.

Medium
Ensure file handle closure before deletion

The handle opened to read the IPC file back should be released before the file is
deleted. A handle that is still open at deletion time can cause failures on Windows
or on certain filesystems. Scope the read in a block so that the reader, which owns
the file, is dropped before the deletion.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [234-248]

 fn flush_and_sort_chunk(&mut self) -> Result<(), Box<dyn std::error::Error>> {
     use arrow::array::Int64Array;
 
     log_debug!(
         "flush_and_sort_chunk: chunk_idx={}, current_chunk_bytes={}, current_rows={}, row_id_memory_size={}",
         self.chunk_idx, self.current_chunk_bytes, self.current_rows, self.memory_size()
     );
 
     // Close the IPC writer
     if let Some(mut writer) = self.current_ipc_writer.take() {
         writer.finish()?;
     }
 
     let ipc_path = self.ipc_staging_path();
 
     // Read back the IPC file (still hot in page cache since we just wrote it)
-    let file = File::open(&ipc_path)?;
-    let reader = IpcFileReader::try_new(file, None)?;
-    let mut batches: Vec<RecordBatch> = Vec::new();
-    for batch_result in reader {
-        let batch = batch_result?;
-        if batch.num_rows() > 0 {
-            batches.push(batch);
+    let batches = {
+        let file = File::open(&ipc_path)?;
+        let reader = IpcFileReader::try_new(file, None)?;
+        let mut batches: Vec<RecordBatch> = Vec::new();
+        for batch_result in reader {
+            let batch = batch_result?;
+            if batch.num_rows() > 0 {
+                batches.push(batch);
+            }
         }
-    }
+        batches
+    };
 
     if batches.is_empty() {
         // Nothing to sort, just reopen
         let _ = std::fs::remove_file(&ipc_path);
         self.open_new_ipc()?;
         return Ok(());
     }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential file handle leak on Windows or certain filesystems. Wrapping the file reading in a block scope ensures the file handle is dropped before deletion, which is a good practice for cross-platform compatibility.

Medium
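
The scoping idea in this suggestion can be shown with the standard library alone. The sketch below is not the PR's code: `read_then_delete` is a hypothetical helper that reads raw bytes rather than Arrow record batches. It only illustrates that a handle declared inside a block is closed when the block ends, before the deletion runs.

```rust
use std::fs::{self, File};
use std::io::{self, Read};
use std::path::Path;

/// Hypothetical helper: reads a staging file fully, then deletes it.
fn read_then_delete(path: &Path) -> io::Result<Vec<u8>> {
    let bytes = {
        let mut file = File::open(path)?;
        let mut buf = Vec::new();
        file.read_to_end(&mut buf)?;
        buf
    }; // `file` goes out of scope here, so the OS handle is closed.

    // No handle from this function is open any more when the file is removed.
    fs::remove_file(path)?;
    Ok(bytes)
}
```

The same effect follows whenever the value that owns the file is moved or dropped before the delete call; the block simply makes the lifetime visible at a glance.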
Suggestions up to commit 0b79df7
Category | Suggestion | Impact
Possible issue
Ensure latch countdown on failure

If writerToFlush.flush() throws an exception, the writer is not added to toClose and
the latch is not counted down, potentially causing the refresh thread to wait
indefinitely. Wrap the flush logic in a try-catch-finally block to ensure the latch is
always decremented and the writer is closed even on failure.

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java [805-836]

 Writer<?> writerToFlush;
 while ((writerToFlush = flushQueue.poll()) != null) {
     ensureOpen();
-    final long writerFlushStartNanos = System.nanoTime();
-    FileInfos fileInfos = writerToFlush.flush(FlushInput.EMPTY);
-    ...
-    toClose.add(writerToFlush);
-    ...
-    flushLatch.countDown();
+    try {
+        final long writerFlushStartNanos = System.nanoTime();
+        FileInfos fileInfos = writerToFlush.flush(FlushInput.EMPTY);
+        ...
+        toClose.add(writerToFlush);
+        ...
+    } catch (Exception e) {
+        IOUtils.closeWhileHandlingException(writerToFlush);
+        failEngine("flush failed during refresh for writer gen=" + writerToFlush.generation(), e);
+        throw e;
+    } finally {
+        flushLatch.countDown();
+    }
 }
Suggestion importance[1-10]: 9

__

Why: This is a critical concurrency bug. If flush() throws an exception, the latch is never decremented, causing the refresh thread to wait indefinitely. The suggestion correctly wraps the flush logic in try-finally to guarantee latch countdown, preventing deadlocks.

High
Drain flush queue on failure

The active polling loop waits for the flush latch but doesn't drain the flushQueue
on failure. If the engine closes or fails while writers are still in the queue,
those writers may leak resources. Ensure the queue is drained and writers are closed
when exiting due to engine closure or failure.

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java [842-854]

 while (flushLatch.getCount() > 0) {
     if (isClosed.get() || failedEngine.get() != null) {
+        // Drain remaining writers from queue and close them
+        Writer<?> remainingWriter;
+        while ((remainingWriter = flushQueue.poll()) != null) {
+            IOUtils.closeWhileHandlingException(remainingWriter);
+        }
         throw new AlreadyClosedException("engine closed during refresh flush");
     }
     try {
         if (flushLatch.await(1, TimeUnit.SECONDS) == false) {
-            continue; // re-check isClosed / failedEngine
+            continue;
         }
     } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IOException("Refresh interrupted waiting for flush completion", e);
     }
 }
Suggestion importance[1-10]: 8

__

Why: This is a critical resource leak fix. If the engine closes or fails while writers remain in flushQueue, those writers won't be closed, leading to resource leaks. The suggestion ensures proper cleanup on failure paths.

Medium
Validate row ID bounds during mapping

The permutation mapping construction silently skips invalid row IDs when orig_idx >=
total or pos >= total, which could mask data corruption or logic errors. Add
validation to detect and report these conditions, as they indicate a serious
inconsistency between the chunk row IDs and the merge output.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [664-679]

 let mut flat_mapping = vec![0i64; total];
 for i in 0..total {
     flat_mapping[i] = i as i64;
 }
 let mut pos = 0usize;
 for chunk_ids in chunk_row_ids {
     for &original_row_id in chunk_ids {
         let orig_idx = original_row_id as usize;
-        if orig_idx < total && pos < total {
-            flat_mapping[orig_idx] = merge_output.mapping[pos];
+        if orig_idx >= total {
+            return Err(format!("Invalid original_row_id {} exceeds total rows {}", original_row_id, total).into());
         }
+        if pos >= total {
+            return Err(format!("Position {} exceeds total rows {} during permutation build", pos, total).into());
+        }
+        flat_mapping[orig_idx] = merge_output.mapping[pos];
         pos += 1;
     }
 }
Suggestion importance[1-10]: 8

__

Why: This suggestion addresses a potential data corruption issue where invalid row IDs are silently skipped. Adding explicit validation with error returns would catch serious inconsistencies between chunk row IDs and merge output, making the code more robust and easier to debug.

Medium
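
A self-contained sketch of the validated permutation build is given here, assuming the shapes implied by the quoted diff: `chunk_row_ids` holds the original row IDs of each sorted chunk in chunk order, and `merge_mapping[pos]` gives the final position of the row at concatenated position `pos`. The function name `build_flat_mapping` and the `String` error type are illustrative choices, not taken from the PR.

```rust
/// Hypothetical helper: composes per-chunk row IDs with the merge mapping.
/// Returns `flat[original_row_id] = final position`, or an error on any
/// out-of-range value instead of silently skipping it.
fn build_flat_mapping(
    chunk_row_ids: &[Vec<i64>],
    merge_mapping: &[i64],
    total: usize,
) -> Result<Vec<i64>, String> {
    // Start from the identity permutation, as the quoted code does.
    let mut flat: Vec<i64> = (0..total as i64).collect();
    let mut pos = 0usize;
    for chunk_ids in chunk_row_ids {
        for &original_row_id in chunk_ids {
            if original_row_id < 0 || original_row_id as usize >= total {
                return Err(format!("row ID {} is outside 0..{}", original_row_id, total));
            }
            // `get` covers a merge mapping that is shorter than expected.
            let new_pos = *merge_mapping.get(pos).ok_or_else(|| {
                format!("position {} is outside the merge mapping (len {})", pos, merge_mapping.len())
            })?;
            flat[original_row_id as usize] = new_pos;
            pos += 1;
        }
    }
    Ok(flat)
}
```

For two chunks `[2, 0]` and `[1, 3]` with merge mapping `[1, 3, 0, 2]`, the result is `[3, 0, 1, 2]`: original row 0 ends up at position 3, row 1 at position 0, and so on.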
Add null check for mapping

The validation logic checks mapping.size() != docCount after retrieving the mapping
from flushInput.rowIdMapping(). However, if mapping is null (which could happen if
hasRowIdMapping() returns true but rowIdMapping() returns null), this will cause a
NullPointerException. Add a null check for mapping before accessing mapping.size().

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [250-274]

 if (flushInput.hasRowIdMapping()) {
-    // RowIdMapping shouldn't be available if index has sort configurations.
     Sort configuredIndexSort = indexWriter.getConfig().getIndexSort();
     if (configuredIndexSort != null) {
         throw new IllegalStateException(
             "RowIdMapping should not be available when child IndexWriter is configured with IndexSort ["
                 + configuredIndexSort
                 + "] for writer generation ["
                 + writerGeneration
                 + "]"
         );
     }
     RowIdMapping mapping = flushInput.rowIdMapping();
+    if (mapping == null) {
+        throw new IllegalStateException(
+            "FlushInput indicates RowIdMapping is present but returned null for writer generation ["
+                + writerGeneration
+                + "]"
+        );
+    }
     if (mapping.size() != docCount) {
         throw new IllegalStateException(
             "RowIdMapping size ["
                 + mapping.size()
                 + "] does not match document count ["
                 + docCount
                 + "] for writer generation ["
                 + writerGeneration
                 + "]"
         );
     }
     configureSortedMerge(mapping);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential NullPointerException if rowIdMapping() returns null despite hasRowIdMapping() being true. Adding a null check improves robustness and prevents runtime failures.

Medium
General
Replace panic with error handling

The code uses expect() when downcasting the row ID column to Int64Array, which will
panic if the column type is incorrect. This could happen if the schema is modified
or corrupted. Replace with proper error handling that returns a descriptive error
instead of panicking.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [259-277]

 let row_id_col_idx = self.schema.fields().iter().position(|f| f.name() == ROW_ID_COLUMN_NAME);
 let final_batch = if let Some(idx) = row_id_col_idx {
     let row_id_array = sorted_batch.column(idx)
         .as_any()
         .downcast_ref::<Int64Array>()
-        .expect("___row_id column must be Int64");
+        .ok_or_else(|| format!("__row_id__ column at index {} is not Int64 type", idx))?;
     let ids: Vec<i64> = (0..row_id_array.len())
         .map(|i| row_id_array.value(i))
         .collect();
     self.chunk_row_ids.push(ids);
 
-    // Rewrite ___row_id to sequential 0..N so the chunk file is self-consistent
+    // Rewrite __row_id__ to sequential 0..N so the chunk file is self-consistent
     let sequential_ids = Int64Array::from_iter_values(
         (0..sorted_batch.num_rows() as i64).map(|x| x)
     );
     let mut columns = sorted_batch.columns().to_vec();
     columns[idx] = Arc::new(sequential_ids);
     RecordBatch::try_new(self.schema.clone(), columns)?
 } else {
     sorted_batch
 };
Suggestion importance[1-10]: 7

__

Why: Replacing expect() with proper error handling using ok_or_else() is a good practice that prevents panics and provides better error messages. This is a meaningful improvement for production code robustness, though the schema corruption scenario is unlikely.

Medium
Ensure reader is closed properly

The method assertRowIdsSequential is called within an assert statement, meaning it
only runs when assertions are enabled. However, it opens a DirectoryReader which
allocates resources. If an exception occurs during the assertion check, the reader
may not be properly closed, leading to resource leaks. Ensure the reader is closed
in a finally block or use try-with-resources properly.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [386-429]

 private boolean assertRowIdsSequential(Directory directory) {
-    try (DirectoryReader reader = DirectoryReader.open(directory)) {
+    DirectoryReader reader = null;
+    try {
+        reader = DirectoryReader.open(directory);
         assert reader.leaves().size() == 1 : "Expected exactly 1 leaf reader, got " + reader.leaves().size();
         LeafReader leaf = reader.leaves().get(0).reader();
         SortedNumericDocValues rowIdDV = leaf.getSortedNumericDocValues(LuceneDocumentInput.ROW_ID_FIELD);
         if (rowIdDV == null) {
             throw new AssertionError(
                 "Field [" + LuceneDocumentInput.ROW_ID_FIELD + "] missing from segment for writer generation [" + writerGeneration + "]"
             );
         }
         ...
+        return true;
     } catch (IOException e) {
         throw new AssertionError("Failed to verify ___row_id__ invariant for writer generation [" + writerGeneration + "]", e);
+    } finally {
+        IOUtils.closeWhileHandlingException(reader);
     }
 }
Suggestion importance[1-10]: 6

__

Why: The existing code already wraps DirectoryReader in try-with-resources, which closes the reader on both normal and exceptional exit, so the leak described above should not occur. Moving the cleanup into an explicit finally block is at most a stylistic alternative, and the method runs only when assertions are enabled.

Low
Add file existence check before reading

The IPC file is opened and read immediately after closing the writer without
verifying the file exists or handling potential I/O errors from concurrent
operations. Add explicit error handling for the file open operation and consider
checking file existence before attempting to read, especially since the file is
deleted shortly after in the empty case.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [217-240]

 fn flush_and_sort_chunk(&mut self) -> Result<(), Box<dyn std::error::Error>> {
     use arrow::array::Int64Array;
 
     log_debug!(
         "flush_and_sort_chunk: chunk_idx={}, current_chunk_bytes={}, current_rows={}, row_id_memory_size={}",
         self.chunk_idx, self.current_chunk_bytes, self.current_rows, self.memory_size()
     );
 
     // Close the IPC writer
     if let Some(mut writer) = self.current_ipc_writer.take() {
         writer.finish()?;
     }
 
     let ipc_path = self.ipc_staging_path();
 
+    // Verify file exists before attempting to read
+    if !std::path::Path::new(&ipc_path).exists() {
+        self.open_new_ipc()?;
+        return Ok(());
+    }
+
     // Read back the IPC file (still hot in page cache since we just wrote it)
-    let file = File::open(&ipc_path)?;
+    let file = File::open(&ipc_path)
+        .map_err(|e| format!("Failed to open IPC staging file {}: {}", ipc_path, e))?;
     let reader = IpcFileReader::try_new(file, None)?;
     let mut batches: Vec<RecordBatch> = Vec::new();
     for batch_result in reader {
         let batch = batch_result?;
         if batch.num_rows() > 0 {
             batches.push(batch);
         }
     }
 
     if batches.is_empty() {
         // Nothing to sort, just reopen
         let _ = std::fs::remove_file(&ipc_path);
         self.open_new_ipc()?;
         return Ok(());
     }
Suggestion importance[1-10]: 5

__

Why: The suggestion adds a file existence check before reading the IPC file, which improves robustness. However, the existing code already handles the empty batches case after reading, and the file should exist since it was just closed. The added error context in File::open is useful but not critical.

Low
Validate null values before emptiness

The null check for WriterFileSet values is performed after the emptiness check, but
the error message suggests this is a critical merge failure. This check should be
performed before any other validation to catch incomplete merges early and provide
clearer diagnostics about which format failed.

server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java [558-563]

-// Check for null values (format participated but returned no result)
+// Check for null values first (format participated but returned no result)
 for (Map.Entry<DataFormat, WriterFileSet> entry : writerFileSetMap.entrySet()) {
     if (entry.getValue() == null) {
         throw new IllegalStateException("WriterFileSet is null for format [" + entry.getKey().name() + "] — merge was incomplete");
     }
 }
+if (writerFileSetMap.isEmpty()) {
+    throw new IllegalArgumentException("writerFileSetMap must not be empty");
+}
Suggestion importance[1-10]: 4

__

Why: The suggestion to check for null values before emptiness is a minor ordering improvement. Both checks are necessary, but checking nulls first provides slightly better diagnostics. The impact is minimal since both conditions are error cases.

Low
Suggestions up to commit 34c5114
Category | Suggestion | Impact
General
Prevent double-close of failed writers

When an exception occurs during preIndex(), the writer is closed via
closeWhileHandlingException() but also added to pendingWritersToClose before the
exception. This means the refresh thread will attempt to close an already-closed
writer, potentially logging spurious errors. Move the pendingWritersToClose.add()
call after the flush succeeds to ensure only successfully flushed writers are queued
for deferred close.

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java [1675-1720]

 private void preIndex() {
     if (engineConfig.getIndexSettings().isCheckPendingFlushEnabled() == false) {
         logger.trace("preIndex: check pending flush disabled, skipping");
         return;
     }
     Writer<?> writerToFlush = flushQueue.poll();
     if (writerToFlush == null) {
         return;
     }
     try {
-        ...
+        final long flushStartNanos = System.nanoTime();
+        FileInfos fileInfos = writerToFlush.flush(FlushInput.EMPTY);
+        final long flushElapsedMs = TimeValue.nsecToMSec(System.nanoTime() - flushStartNanos);
+
+        if (fileInfos.writerFilesMap().isEmpty() == false) {
+            Segment.Builder segmentBuilder = Segment.builder(writerToFlush.generation());
+            for (Map.Entry<DataFormat, WriterFileSet> entry : fileInfos.writerFilesMap().entrySet()) {
+                ...
+                segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue());
+            }
+            pendingSegments.add(segmentBuilder.build());
+        }
         pendingWritersToClose.add(writerToFlush);
-        ...
+        logger.debug("preIndex: write thread flushed writer gen={} in [{}ms]", writerToFlush.generation(), flushElapsedMs);
     } catch (Exception e) {
         ...
         IOUtils.closeWhileHandlingException(writerToFlush);
         failEngine("flush failed during preIndex for writer gen=" + writerToFlush.generation(), e);
     } finally {
-        CountDownLatch latch = activeFlushLatch;
-        if (latch != null) {
-            latch.countDown();
-        }
+        ...
     }
 }
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a bug where pendingWritersToClose.add(writerToFlush) at line 1703 happens before the flush completes. If the flush throws an exception, the writer is closed via closeWhileHandlingException() at line 1712, but it's already in pendingWritersToClose, causing the refresh thread to attempt closing it again. Moving the add() call after the flush succeeds (line 1703 should move to after line 1701) prevents this double-close scenario, which is a correctness issue.

Medium
Fix race condition in latch polling

The active polling loop checks engine state every second but doesn't handle the case
where the latch countdown completes between the getCount() > 0 check and the await()
call. This creates a race condition where the thread could wait unnecessarily. Use
await() with timeout directly and check the return value to avoid the race.

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java [842-854]

-while (flushLatch.getCount() > 0) {
+while (true) {
     if (isClosed.get() || failedEngine.get() != null) {
         throw new AlreadyClosedException("engine closed during refresh flush");
     }
     try {
-        if (flushLatch.await(1, TimeUnit.SECONDS) == false) {
-            continue; // re-check isClosed / failedEngine
+        if (flushLatch.await(1, TimeUnit.SECONDS)) {
+            break; // all flushes completed
         }
+        // timeout expired, loop to re-check engine state
     } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IOException("Refresh interrupted waiting for flush completion", e);
     }
 }
Suggestion importance[1-10]: 7

__

Why: The improved code drops the getCount() > 0 check and relies solely on await() with a timeout, which is cleaner and leaves a single place where completion is detected. The impact is small: CountDownLatch.await(timeout, unit) returns true immediately when the count is already zero, so in the original loop a countdown that lands between the check and the await() call should not add a delay.

Medium
Handle missing segment file gracefully

The method deletes the .si file before rewriting but doesn't verify the file exists
first. If the file is missing (corrupted state or concurrent deletion), deleteFile()
will throw NoSuchFileException, masking the root cause. Check file existence before
deletion or catch and handle the exception explicitly to provide better diagnostics.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [446-490]

 private void rewriteSegmentInfoWithSort(SegmentInfos segmentInfos, SegmentCommitInfo segmentCommitInfo) throws IOException {
     SegmentInfo originalInfo = segmentCommitInfo.info;
     Sort sort = new Sort(new SortedNumericSortField(LuceneDocumentInput.ROW_ID_FIELD, SortField.Type.LONG));
     ...
     String siFileName = originalInfo.name + ".si";
-    directory.deleteFile(siFileName);
+    try {
+        directory.deleteFile(siFileName);
+    } catch (java.nio.file.NoSuchFileException e) {
+        logger.warn("Segment info file {} not found during rewrite for generation {}", siFileName, writerGeneration);
+    }
     originalInfo.getCodec().segmentInfoFormat().write(directory, sortedInfo, IOContext.DEFAULT);
     ...
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that directory.deleteFile(siFileName) at line 469 could throw NoSuchFileException if the file is missing. Adding a try-catch to handle this case gracefully with a warning log improves robustness. However, the impact is moderate since this scenario (missing .si file) would indicate a corrupted state that might require broader handling beyond just logging a warning.

Low
Possible issue
Validate row ID bounds during mapping

The silent skip when orig_idx >= total could hide data corruption issues where row
IDs are out of bounds. Add logging or return an error when this condition occurs to
detect and diagnose invalid row ID mappings during sort operations.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [622-627]

 for (new_pos, &old_row_id) in ids.iter().enumerate() {
     let orig_idx = old_row_id as usize;
-    if orig_idx < total {
-        mapping[orig_idx] = new_pos as i64;
+    if orig_idx >= total {
+        return Err(format!("Invalid row ID {} at position {} exceeds total rows {}", old_row_id, new_pos, total).into());
     }
+    mapping[orig_idx] = new_pos as i64;
 }
Suggestion importance[1-10]: 8

__

Why: Excellent catch. Silently skipping out-of-bounds row IDs (line 625) could hide serious data corruption issues. The suggestion to return an error when orig_idx >= total would help detect invalid row ID mappings early, preventing silent data loss or corruption in the permutation mapping.

Medium
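
The single-chunk case in this suggestion amounts to inverting a permutation. The sketch below assumes that `ids[new_pos]` is the original row ID of the row now at `new_pos`. The function name `invert_row_ids`, the `-1` sentinel, and the duplicate check are additions made for illustration; the quoted diff only adds the bounds check.

```rust
/// Hypothetical helper: inverts a sorted chunk's row-ID list so that
/// `mapping[old_row_id] = new_pos`, rejecting invalid input.
fn invert_row_ids(ids: &[i64]) -> Result<Vec<i64>, String> {
    let total = ids.len();
    // -1 marks a slot that has not been assigned yet (illustrative sentinel).
    let mut mapping = vec![-1i64; total];
    for (new_pos, &old_row_id) in ids.iter().enumerate() {
        if old_row_id < 0 || old_row_id as usize >= total {
            return Err(format!(
                "row ID {} at position {} is outside 0..{}",
                old_row_id, new_pos, total
            ));
        }
        let slot = &mut mapping[old_row_id as usize];
        if *slot != -1 {
            // Extra check beyond the suggestion: a repeated ID means the
            // input is not a permutation.
            return Err(format!("row ID {} appears more than once", old_row_id));
        }
        *slot = new_pos as i64;
    }
    Ok(mapping)
}
```

With `ids = [2, 0, 1]` the result is `[1, 2, 0]`. An out-of-range or repeated ID produces an error rather than a mapping with stale entries.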
Prevent integer overflow in memory calculation

The memory threshold check should account for potential integer overflow when adding
current_chunk_bytes and incoming_batch_bytes. Use checked_add or saturating_add to
prevent overflow, which could cause the condition to incorrectly evaluate and skip
necessary chunk flushes.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [162-166]

 if self.current_chunk_bytes > 0
-    && self.current_chunk_bytes + incoming_batch_bytes > self.memory_threshold_bytes
+    && self.current_chunk_bytes.saturating_add(incoming_batch_bytes) > self.memory_threshold_bytes
 {
     self.flush_and_sort_chunk()?;
 }
Suggestion importance[1-10]: 7

__

Why: Valid concern about potential integer overflow when adding current_chunk_bytes and incoming_batch_bytes. Using saturating_add is a good defensive practice to prevent overflow from causing incorrect threshold checks, which could lead to memory issues or incorrect chunking behavior.

Medium
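
The threshold check can be isolated into a small function to show how an overflow-safe addition behaves. The sketch assumes the byte counters are `usize`, which the quoted diff does not confirm, and `should_flush` is a hypothetical name. It uses `checked_add` and treats overflow as exceeding the threshold. `saturating_add`, as in the diff, differs only when the threshold itself equals `usize::MAX`.

```rust
/// Hypothetical helper: decides whether the current chunk must be flushed
/// before accepting `incoming_batch_bytes` more bytes.
fn should_flush(
    current_chunk_bytes: usize,
    incoming_batch_bytes: usize,
    memory_threshold_bytes: usize,
) -> bool {
    if current_chunk_bytes == 0 {
        // An empty chunk is never flushed, matching the original guard.
        return false;
    }
    match current_chunk_bytes.checked_add(incoming_batch_bytes) {
        Some(sum) => sum > memory_threshold_bytes,
        // The sum does not fit in usize, so it exceeds any threshold.
        None => true,
    }
}
```

A plain `+` on the same inputs would panic in debug builds and wrap around in release builds, where the wrapped sum could fall below the threshold and skip a flush.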
Replace panic with error handling

The expect call will panic if the row_id column is not an Int64Array, which
could occur due to schema inconsistencies. Replace with proper error handling that
returns a Result to allow graceful failure and better error reporting.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [259-265]

 let row_id_col_idx = self.schema.fields().iter().position(|f| f.name() == ROW_ID_COLUMN_NAME);
 let final_batch = if let Some(idx) = row_id_col_idx {
     let row_id_array = sorted_batch.column(idx)
         .as_any()
         .downcast_ref::<Int64Array>()
-        .expect("___row_id column must be Int64");
+        .ok_or_else(|| format!("__row_id__ column at index {} is not Int64Array", idx))?;
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that using expect can cause panics. Replacing with ok_or_else for proper error propagation is a better practice, allowing graceful failure instead of crashing. However, this is a schema consistency issue that should be caught earlier, so the impact is moderate.

Low
Validate null values before accessing entries

The null check for WriterFileSet should occur before attempting to extract the
generation from the map. Move this validation to the beginning of the method to fail
fast and avoid potential NPE when calling
writerFileSetMap.values().iterator().next() on a map containing null values.

server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java [558-564]

+if (writerFileSetMap.isEmpty()) {
+    throw new IllegalArgumentException("writerFileSetMap must not be empty");
+}
+// Check for null values before accessing any entries
 for (Map.Entry<DataFormat, WriterFileSet> entry : writerFileSetMap.entrySet()) {
     if (entry.getValue() == null) {
         throw new IllegalStateException("WriterFileSet is null for format [" + entry.getKey().name() + "] — merge was incomplete");
     }
 }
+long generation = writerFileSetMap.values().iterator().next().writerGeneration();
Suggestion importance[1-10]: 3

__

Why: The suggestion correctly identifies that null checks should occur before accessing map entries. However, the current code already checks for empty map first (line 555-557), and the null check (lines 559-563) occurs before the writerGeneration() call (line 564), so the ordering is already safe. The suggestion's reordering doesn't add significant value.

Low
Suggestions up to commit bada5b6
Category | Suggestion | Impact
Possible issue
Guarantee latch decrement on flush failure

If writerToFlush.flush() throws an exception, the writer is never added to toClose
and the latch is never decremented. This leaves the latch in an inconsistent state,
causing the subsequent await loop to hang indefinitely. Wrap the flush logic in
try-finally to ensure the latch is always decremented and the writer is queued for
cleanup even on failure.

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java [805-836]

 Writer<?> writerToFlush;
 while ((writerToFlush = flushQueue.poll()) != null) {
-    ensureOpen(); // short-circuit if engine has failed/closed concurrently
+    ensureOpen();
     final long writerFlushStartNanos = System.nanoTime();
-    FileInfos fileInfos = writerToFlush.flush(FlushInput.EMPTY);
-    ...
-    toClose.add(writerToFlush);
-    ...
-    flushLatch.countDown();
+    try {
+        FileInfos fileInfos = writerToFlush.flush(FlushInput.EMPTY);
+        ...
+        if (hasFiles) {
+            newSegments.add(segmentBuilder.build());
+        }
+        refreshed |= hasFiles;
+    } finally {
+        toClose.add(writerToFlush);
+        flushLatch.countDown();
+    }
 }
Suggestion importance[1-10]: 9

__

Why: This is a critical bug fix. If writerToFlush.flush() throws an exception at line 808, the writer is never added to toClose (line 831) and the latch is never decremented (line 836). This causes the subsequent flushLatch.await() loop at lines 842-854 to hang indefinitely, blocking the refresh thread. The suggested try-finally block ensures both the latch decrement and writer cleanup happen even on failure, preventing a deadlock scenario.

High
Validate null values before accessing map

The null check for WriterFileSet should occur before attempting to extract the
generation from the map values. Move this validation before the generation
assignment to prevent potential NullPointerException when calling iterator().next()
on a map that may contain null values.

server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java [558-564]

+if (writerFileSetMap.isEmpty()) {
+    throw new IllegalArgumentException("writerFileSetMap must not be empty");
+}
+// Check for null values (format participated but returned no result)
 for (Map.Entry<DataFormat, WriterFileSet> entry : writerFileSetMap.entrySet()) {
     if (entry.getValue() == null) {
         throw new IllegalStateException("WriterFileSet is null for format [" + entry.getKey().name() + "] — merge was incomplete");
     }
 }
+long generation = writerFileSetMap.values().iterator().next().writerGeneration();
Suggestion importance[1-10]: 9

__

Why: The null check is correctly placed after the empty check but before accessing writerFileSetMap.values().iterator().next(). However, the suggestion to move it before the generation assignment is valid because if the first entry is null, calling .writerGeneration() on it would throw NullPointerException. The current code checks for nulls but only after already extracting generation from the first value. This is a critical bug that could cause runtime failures.

High
Prevent out-of-bounds access in mapping

The bounds check pos < total may allow silent data corruption if pos exceeds
merge_output.mapping.len(). Add an explicit check against merge_output.mapping.len()
to prevent out-of-bounds access and ensure the permutation mapping is constructed
correctly.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [665-679]

 let mut flat_mapping = vec![0i64; total];
 for i in 0..total {
     flat_mapping[i] = i as i64;
 }
 let mut pos = 0usize;
 for chunk_ids in chunk_row_ids {
     for &original_row_id in chunk_ids {
         let orig_idx = original_row_id as usize;
-        if orig_idx < total && pos < total {
+        if orig_idx < total && pos < merge_output.mapping.len() {
             flat_mapping[orig_idx] = merge_output.mapping[pos];
         }
         pos += 1;
     }
 }
Suggestion importance[1-10]: 8

__

Why: The bounds check pos < total does not guard against pos exceeding merge_output.mapping.len(), which could cause an out-of-bounds panic when accessing merge_output.mapping[pos]. Changing the condition to pos < merge_output.mapping.len() prevents potential crashes and ensures correct permutation construction.

Medium
General
Replace...

@github-actions
Contributor

❌ Gradle check result for d82d0f1: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

❌ Gradle check result for fed81b0: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@chaitanya588 force-pushed the flush_with_sorting branch 2 times, most recently from 2d16f26 to 28115b1 on May 20, 2026 12:40
@github-actions
Contributor

✅ Gradle check result for 28115b1: SUCCESS

@github-actions
Contributor

❌ Gradle check result for bada5b6: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@chaitanya588 reopened this on May 20, 2026
@mgodwan added the skip-diff-analyzer label (Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.) on May 20, 2026
@github-actions
Contributor

✅ Gradle check result for 34c5114: SUCCESS

Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
…oring

Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
… Parallelizing flushes in refresh flow via Flush Thread pool

Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
…prepared for that

Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
Removes the legacy sort_small_file, sort_large_file,
sort_and_rewrite_parquet, and sort_and_rewrite_parquet_from_chunks
helpers that were resurrected during a rebase. The active sort path
is SortingChunkedWriter + finalize_sorted_chunks, which performs
eager sort-during-write plus a k-way merge at finalize.

Also adapts the finalize_sorted_chunks call to merge_sorted's
7-arg signature on this branch (the lastminute branch carried the
6-arg pre-opensearch-project#21576 signature).

Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
@github-actions
Contributor

❌ Gradle check result for 0b79df7: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
@github-actions
Contributor

✅ Gradle check result for 5d4b4ae: SUCCESS

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.

2 participants