
Adding CompositeMergeHandler and CompositeMergePolicy #21128

Merged
mgodwan merged 7 commits into opensearch-project:main from darjisagar7:MergeComposite
Apr 28, 2026

Conversation

@darjisagar7
Contributor

@darjisagar7 darjisagar7 commented Apr 5, 2026

Add Segment Merge Infrastructure for Data-Format-Aware Engine

Description

This PR introduces a complete segment merge pipeline for the pluggable data-format-aware engine (DataFormatAwareEngine), enabling background and forced merge operations across single and composite (multi-format) data formats. Previously, the data-format engine could only index and refresh — segments accumulated indefinitely with no compaction.

The merge infrastructure reuses Lucene's battle-tested TieredMergePolicy for candidate selection while keeping the actual merge execution pluggable per data format. For composite indices (e.g., Parquet + Lucene), a two-phase merge ensures cross-format row-ID consistency.


Architecture Overview

┌──────────────────────────────────────────────────────────────────────────┐
│                        DataFormatAwareEngine                             │
│                                                                          │
│  refresh() ──→ triggerPossibleMerges()                                   │
│                       │                                                  │
│                       ▼                                                  │
│               ┌──────────────┐                                           │
│               │MergeScheduler │  (concurrency control, thread pool)      │
│               └──────┬───────┘                                           │
│                      │ findAndRegisterMerges()                           │
│                      ▼                                                   │
│               ┌──────────────┐                                           │
│               │ MergeHandler  │  (queue, lifecycle, candidate selection)  │
│               └──────┬───────┘                                           │
│                      │ findMergeCandidates()                             │
│                      ▼                                                   │
│  ┌─────────────────────────────────────┐                                 │
│  │ DataFormatAwareMergePolicy          │                                 │
│  │ (adapts Lucene TieredMergePolicy    │                                 │
│  │  to Segment model via               │                                 │
│  │  SegmentWrapper shims)              │                                 │
│  └─────────────────────────────────────┘                                 │
│                      │                                                   │
│                      ▼  doMerge()                                        │
│  ┌──────────────────────────────────────────────────────────────────┐    │
│  │ Merger (per-format)          OR    CompositeMerger (multi-format)│    │
│  │                                         │                        │    │
│  │                              ┌──────────┴──────────┐             │    │
│  │                              │CompositeMergeExecutor│             │    │
│  │                              │  1. merge primary    │             │    │
│  │                              │  2. get rowIdMapping │             │    │
│  │                              │  3. merge secondaries│             │    │
│  │                              └─────────────────────┘             │    │
│  └──────────────────────────────────────────────────────────────────┘    │
│                      │                                                   │
│                      ▼  applyMergeChanges()                              │
│  ┌──────────────────────────────────────┐                                │
│  │ CatalogSnapshotManager              │                                 │
│  │  replaces source segments with       │                                │
│  │  merged segment in catalog snapshot  │                                │
│  └──────────────────────────────────────┘                                │
└──────────────────────────────────────────────────────────────────────────┘

Sequence Diagrams

1. Background Merge Trigger (after refresh)

sequenceDiagram
    participant E as DataFormatAwareEngine
    participant MS as MergeScheduler
    participant MH as MergeHandler
    participant MP as DataFormatAwareMergePolicy
    participant CSM as CatalogSnapshotManager
    participant TP as ThreadPool (merge)

    E->>MS: triggerPossibleMerges()
    MS->>MH: findAndRegisterMerges()
    MH->>CSM: acquireSnapshot()
    CSM-->>MH: CatalogSnapshot (segments)
    MH->>MP: findMergeCandidates(segments)
    Note over MP: Wrap Segments as<br/>SegmentWrapper shims
    MP->>MP: TieredMergePolicy.findMerges()
    MP-->>MH: List<List<Segment>> candidates
    loop For each merge candidate
        MH->>MH: validate (no overlap with<br/>in-progress merges)
        MH->>MH: registerMerge(oneMerge)<br/>add to pending queue,<br/>lock segments
    end
    MH-->>MS: return
    loop While activeMerges < max && hasPending
        MS->>MH: getNextMerge()
        MH-->>MS: OneMerge
        MS->>TP: submitMergeTask(oneMerge)
    end

2. Merge Task Execution (on merge thread pool)

sequenceDiagram
    participant MT as Merge Thread
    participant MH as MergeHandler
    participant M as Merger (format)
    participant E as Engine (applyMergeChanges)
    participant CSM as CatalogSnapshotManager
    participant RM as ReaderManagers

    MT->>MT: mergeStatsTracker.beforeMerge()
    MT->>MH: doMerge(oneMerge)
    MH->>M: merger.merge(MergeInput)
    Note over M: Format-specific merge:<br/>compact segment files
    M-->>MH: MergeResult
    MH-->>MT: MergeResult

    MT->>E: applyMergeChanges(result, oneMerge)
    Note over E: refreshLock.lock()
    E->>CSM: applyMergeResults(result, oneMerge)
    Note over CSM: Replace source segments<br/>with merged segment
    CSM->>CSM: commitNewSnapshot(updatedSegments)
    E->>RM: refreshListeners() — notify readers
    Note over E: refreshLock.unlock()
    E-->>MT: return

    MT->>MH: onMergeFinished(oneMerge)
    Note over MH: removeMergingSegments()<br/>findAndRegisterMerges()<br/>(check for new candidates)
    MH-->>MT: return
    MT->>MT: mergeStatsTracker.afterMerge()
    MT->>MT: activeMerges--
    MT->>MT: executeMerge() (drain more if pending)

3. Composite Merge (multi-format orchestration)

sequenceDiagram
    participant CM as CompositeMerger
    participant CE as CompositeMergeExecutor
    participant PM as Primary Merger<br/>(e.g. Parquet)
    participant SM as Secondary Merger<br/>(e.g. Lucene)

    CM->>CM: extractFilesByFormat()<br/>partition segment files per format
    CM->>CM: build MergePlan
    CM->>CE: execute(plan)

    Note over CE: Phase 1: Primary format
    CE->>PM: merge(primaryFiles, rowIdMapping=null)
    Note over PM: Compact primary segments
    PM-->>CE: FormatMergeResult<br/>(mergedFiles + rowIdMapping)

    Note over CE: Extract RowIdMapping<br/>from primary result

    Note over CE: Phase 2: Secondary formats
    CE->>SM: merge(secondaryFiles, rowIdMapping)
    Note over SM: Merge using rowIdMapping<br/>for cross-format consistency
    SM-->>CE: FormatMergeResult

    CE->>CE: Combine all FormatMergeResults<br/>into single MergeResult
    CE-->>CM: MergeResult (all formats merged)
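The two-phase orchestration above can be condensed into a short sketch. All names here (`FormatMerger`, `FormatResult`, `execute`) are illustrative stand-ins for the PR's `Merger` / `FormatMergeResult` / `CompositeMergeExecutor`, and the row-ID mapping is modeled as a plain map; this is a sketch of the described control flow, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TwoPhaseMergeSketch {

    /** Hypothetical per-format merger: merges files, optionally consuming a row-ID mapping. */
    interface FormatMerger {
        FormatResult merge(List<String> files, Map<Integer, Integer> rowIdMapping);
    }

    /** Hypothetical per-format result; the primary's result carries the row-ID mapping. */
    record FormatResult(List<String> mergedFiles, Map<Integer, Integer> rowIdMapping) {}

    static List<FormatResult> execute(FormatMerger primary, List<String> primaryFiles,
                                      List<FormatMerger> secondaries, List<List<String>> secondaryFiles) {
        List<FormatResult> completed = new ArrayList<>();
        // Phase 1: merge the primary format first; it emits the row-ID mapping.
        FormatResult primaryResult = primary.merge(primaryFiles, null);
        completed.add(primaryResult);
        // Phase 2: each secondary format merges using the primary's mapping,
        // keeping row IDs consistent across formats. On failure here, the real
        // executor cleans up all completed results before re-throwing
        // (see the failure path diagram below).
        for (int i = 0; i < secondaries.size(); i++) {
            completed.add(secondaries.get(i).merge(secondaryFiles.get(i), primaryResult.rowIdMapping()));
        }
        return completed;
    }
}
```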

4. Failure & Cleanup Path

sequenceDiagram
    participant CE as CompositeMergeExecutor
    participant PM as Primary Merger
    participant SM as Secondary Merger
    participant MS as MergeScheduler
    participant MH as MergeHandler

    CE->>PM: merge primary
    PM-->>CE: FormatMergeResult ✓

    CE->>SM: merge secondary
    SM--xCE: IOException ✗

    Note over CE: Catch: cleanup all<br/>completed results
    CE->>CE: primaryResult.cleanup()<br/>(delete merged output files)
    CE->>CE: re-throw exception

    Note over MS: Back in MergeScheduler
    MS->>MS: log error
    MS->>MH: onMergeFailure(oneMerge)
    Note over MH: removeMergingSegments()<br/>(unlock segments for<br/>future merge attempts)
    MH-->>MS: return

Key Changes

Server (core merge framework)

| Class | Description |
| --- | --- |
| `MergeHandler` | Manages the merge queue, candidate selection via a pluggable `MergePolicy` interface, segment locking to prevent double-merging, and lifecycle callbacks (`onMergeFinished` / `onMergeFailure`). |
| `MergeScheduler` | Controls merge concurrency (respects `MergeSchedulerConfig`), submits merge tasks to the new `ThreadPool.Names.MERGE` scaling thread pool, and tracks stats. |
| `DataFormatAwareMergePolicy` | Adapts Lucene's `TieredMergePolicy` to the data-format `Segment` model by wrapping segments as `SegmentCommitInfo` shims (`SegmentWrapper`). Also implements `MergeListener` to track currently-merging segments. |
| `MergeStatsTracker` | Extracted merge metric counters into a reusable class, shared by both `OpenSearchConcurrentMergeScheduler` (Lucene path) and the new `MergeScheduler`. |
| `MergeFailedEngineException` | New serializable exception for merge failures that carries the `ShardId`. |
| `CatalogSnapshotManager` | Added `applyMergeResults()`, which atomically replaces source segments with the merged segment in the catalog snapshot. |
| `DataFormatAwareEngine` | Wired merge into the engine lifecycle: creates `MergeHandler` + `MergeScheduler` at construction, calls `triggerPossibleMerges()` after each refresh, and applies merge results under the refresh lock. Gated behind the `opensearch.pluggable.dataformat.merge.enabled` system property. |
| `MergeInput` | Extended with `rowIdMapping` and `newWriterGeneration` fields for composite merge support. |
| `ThreadPool` | Added `Names.MERGE` scaling thread pool for background merge execution. |

Composite Engine (multi-format merge orchestration)

| Class | Description |
| --- | --- |
| `CompositeMerger` | Top-level `Merger` that extracts per-format file lists from segments and delegates to `CompositeMergeExecutor`. |
| `CompositeMergeExecutor` | Stateless executor: merges the primary format first, extracts the `RowIdMapping`, then merges each secondary format using that mapping. On failure, cleans up all completed format results. |
| `MergePlan` | Immutable record holding the pre-validated merge plan: primary format, secondary formats, per-format file lists, and target generation. |
| `FormatMergeResult` | Per-format merge result with cleanup support for rollback on failure. |

Deleted

| Class | Reason |
| --- | --- |
| `CompositeMergeHandler` | Replaced by `CompositeMerger` + `CompositeMergeExecutor` (separation of concerns). |
| `CompositeMergePolicyTests` | Replaced by `DataFormatAwareMergePolicyTests` (moved to server module). |

Merge Flow Summary

  1. After each refresh(), the engine calls triggerPossibleMerges().
  2. MergeScheduler asks MergeHandler to find and register merge candidates.
  3. MergeHandler acquires a catalog snapshot, delegates to DataFormatAwareMergePolicy which wraps segments as Lucene SegmentCommitInfo and calls TieredMergePolicy.findMerges().
  4. Valid merges (no overlap with in-progress merges) are queued.
  5. MergeScheduler drains the queue up to maxConcurrentMerges, submitting each to the merge thread pool.
  6. Each merge task calls MergeHandler.doMerge() → Merger.merge() (or CompositeMerger for multi-format).
  7. On success, applyMergeChanges() atomically updates the catalog snapshot and notifies reader managers.
  8. On failure, onMergeFailure() releases the merging-segment locks and logs the error.
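Steps 2 through 5 can be sketched as a small register-and-drain loop. The class and method names below (`MergeLoopSketch`, `register`, `drain`) are illustrative, not the PR's actual API; segments are modeled as strings for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

public class MergeLoopSketch {
    private final Queue<List<String>> pending = new ArrayDeque<>();
    private final Set<String> inProgress = new HashSet<>();
    private final int maxConcurrentMerges;
    private int activeMerges = 0;

    public MergeLoopSketch(int maxConcurrentMerges) {
        this.maxConcurrentMerges = maxConcurrentMerges;
    }

    /** Steps 2-4: queue candidate groups that don't overlap an in-progress merge. */
    public synchronized int register(List<List<String>> candidates) {
        int queued = 0;
        for (List<String> group : candidates) {
            if (group.stream().noneMatch(inProgress::contains)) {
                inProgress.addAll(group);   // lock segments against double-merging
                pending.add(group);
                queued++;
            }
        }
        return queued;
    }

    /** Step 5: drain the queue up to the concurrency limit; returns groups to submit. */
    public synchronized List<List<String>> drain() {
        List<List<String>> toSubmit = new ArrayList<>();
        while (activeMerges < maxConcurrentMerges && !pending.isEmpty()) {
            toSubmit.add(pending.poll());
            activeMerges++;
        }
        return toSubmit;
    }
}
```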

Tests

| Test | Type | Description |
| --- | --- | --- |
| `CompositeMergeIT` | Integration | Verifies background merges reduce segment count after repeated index/refresh cycles. |
| `DataFormatAwareMergePolicyTests` | Unit | Candidate selection, force merge, merging-segment exclusion, `SegmentWrapper` identity. |
| `CompositeMergerTests` | Unit | Composite merge orchestration: primary-only, primary+secondary, row-ID mapping propagation, failure cleanup. |
| `MergeFailedEngineExceptionTests` | Unit | Serialization round-trip. |
| `CatalogSnapshotManagerTests` | Unit | `applyMergeResults` segment replacement logic. |
| `MergeTests` | Unit | Refactored for `MergeHandler` / `MergeScheduler` split coverage. |
# Unit tests
./gradlew :server:test --tests "*DataFormatAwareMergePolicyTests*"
./gradlew :server:test --tests "*MergeTests*"
./gradlew :server:test --tests "*MergeFailedEngineExceptionTests*"
./gradlew :server:test --tests "*CatalogSnapshotManagerTests*"
./gradlew :sandbox:plugins:composite-engine:test --tests "*CompositeMergerTests*"

# Integration tests
./gradlew :sandbox:plugins:composite-engine:internalClusterTest --tests "*CompositeMergeIT*" -Dsandbox.enabled=true

Configuration

Merge is gated behind a system property (disabled by default while format-specific merge implementations are being completed):

-Dopensearch.pluggable.dataformat.merge.enabled=true
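For a locally built distribution, one way to pass the flag is via JVM options (a sketch, assuming a dev distribution that honors the standard `OPENSEARCH_JAVA_OPTS` mechanism; the property name is the one gated above):

```shell
# Enable the data-format merge path for a single run.
OPENSEARCH_JAVA_OPTS="-Dopensearch.pluggable.dataformat.merge.enabled=true" ./bin/opensearch
```

Alternatively, the `-D` line can be added to `config/jvm.options` to persist it across restarts.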

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

@github-actions
Contributor

github-actions Bot commented Apr 5, 2026

PR Reviewer Guide 🔍

(Review updated until commit 4790673)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Core server-side merge infrastructure (MergeHandler, MergePolicy, MergeScheduler)

Relevant files:

  • server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java
  • server/src/main/java/org/opensearch/index/engine/dataformat/merge/DataFormatAwareMergePolicy.java
  • server/src/test/java/org/opensearch/index/engine/dataformat/merge/MergeTests.java
  • server/src/test/java/org/opensearch/index/engine/dataformat/merge/DataFormatAwareMergePolicyTests.java

Sub-PR theme: Composite-engine merge plugin (CompositeMerger, CompositeMergeExecutor, integration tests)

Relevant files:

  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/merge/CompositeMerger.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/merge/CompositeMergeExecutor.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/merge/CompositeMergerTests.java
  • sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeMergeIT.java

Sub-PR theme: CatalogSnapshotManager merge result application

Relevant files:

  • server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java
  • server/src/test/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManagerTests.java

⚡ Recommended focus areas for review

Thread Safety

DataFormatMergeContext uses a non-thread-safe HashSet for mergingSegments with synchronized on individual methods. However, addMergingSegment and removeMergingSegment in DataFormatAwareMergePolicy iterate over the input collection and call the synchronized methods one-by-one, meaning the overall add/remove is not atomic. Concurrent callers could interleave adds and removes mid-iteration, leading to inconsistent state. Consider using a ConcurrentHashMap-backed set or synchronizing at the DataFormatAwareMergePolicy level.

public void addMergingSegment(Collection<Segment> segments) {
    for (Segment segment : segments) {
        mergeContext.addMergingSegment(createWrapper(segment));
    }
}

/**
 * Removes segments from the currently-merging set after a merge completes or fails.
 *
 * @param segments the segments to remove
 */
@Override
public void removeMergingSegment(Collection<Segment> segments) {
    for (Segment segment : segments) {
        mergeContext.removeMergingSegment(createWrapper(segment));
    }
}
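One way to get the atomicity the reviewer asks for is to lock the whole merge group in a single synchronized operation. The sketch below is a hypothetical stand-in (names `MergingSegmentTracker`, `tryLock`, `unlock` are not from the PR) that rejects a group outright if any member is already merging, so no partial add can be observed:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

public class MergingSegmentTracker {
    // Guarded by `this`; segments are identified here by a stable String key.
    private final Set<String> merging = new HashSet<>();

    /** Atomically locks all segments in the group, or none if any is already merging. */
    public synchronized boolean tryLock(Collection<String> segmentKeys) {
        for (String key : segmentKeys) {
            if (merging.contains(key)) {
                return false;   // reject the whole group; no partial state
            }
        }
        merging.addAll(segmentKeys);
        return true;
    }

    /** Atomically releases a merge group after the merge completes or fails. */
    public synchronized void unlock(Collection<String> segmentKeys) {
        merging.removeAll(segmentKeys);
    }

    public synchronized boolean isMerging(String key) {
        return merging.contains(key);
    }
}
```

Synchronizing at the group level (rather than per element) is what makes concurrent add/remove interleaving impossible, which is the gap identified in the excerpt above.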
Shared DUMMY_ID

All SegmentWrapper instances share the same DUMMY_ID byte array. Lucene's SegmentInfo may read or write this array, and sharing a mutable byte array across multiple SegmentInfo instances could cause subtle corruption or incorrect equality checks if Lucene ever mutates the ID. Each wrapper should use its own copy of the ID bytes.

private static final byte[] DUMMY_ID = new byte[16];
private static final Map<String, String> EMPTY_DIAGNOSTICS = Map.of();
private static final Map<String, String> EMPTY_ATTRIBUTES = Map.of();

private final long generation;
private final long totalSizeBytes;

public SegmentWrapper(Directory directory, Segment segment, long totalSizeBytes, long totalNumDocs) {
    super(
        new org.apache.lucene.index.SegmentInfo(
            directory,
            Version.LATEST,
            Version.LATEST,
            "segment_" + segment.generation(),
            (int) Math.min(totalNumDocs, Integer.MAX_VALUE),
            false,
            false,
            Codec.getDefault(),
            EMPTY_DIAGNOSTICS,
            DUMMY_ID,
            EMPTY_ATTRIBUTES,
            null
        ),
        0,
        0,
        0,
        -1,
        -1,
        DUMMY_ID
    );
Possible Data Loss

In applyMergeResults, if none of the segments in oneMerge.getSegmentsToMerge() are found in segmentList (e.g., they were concurrently deleted or already removed), the new merged segment is unconditionally inserted at index 0. This could silently add a segment that should have been discarded, potentially leading to data inconsistency. The comment acknowledges this but there is no validation or logging to detect this unexpected case.

if (!inserted) {
    segmentList.add(0, segmentToAdd);
}
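A defensive variant of that fallback would detect the "no source segment found" case and fail loudly instead of silently inserting at index 0. The sketch below is hypothetical (segments modeled as strings, method name `replaceOrFail` invented for illustration), not the PR's code:

```java
import java.util.List;

public class MergeApplySketch {
    /**
     * Replaces the first matching source segment with the merged segment and
     * removes the remaining sources. Throws instead of silently inserting when
     * no source is present, which would indicate the sources were concurrently
     * removed and the merged segment should be discarded.
     */
    static void replaceOrFail(List<String> segmentList, List<String> sources, String merged) {
        boolean inserted = false;
        for (int i = 0; i < segmentList.size() && !inserted; i++) {
            if (sources.contains(segmentList.get(i))) {
                segmentList.set(i, merged);   // preserve ordering at the first source's slot
                inserted = true;
            }
        }
        segmentList.removeAll(sources);
        if (!inserted) {
            throw new IllegalStateException(
                "none of the merge sources " + sources + " are present; refusing to insert " + merged);
        }
    }
}
```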
Exception Wrapping

In findMerges and findForceMerges, all exceptions (including RuntimeException) are caught and re-wrapped in a new RuntimeException. This loses the original exception type and may obscure the root cause. If the original exception is already a RuntimeException, it should be re-thrown directly rather than double-wrapped.

    } catch (Exception e) {
        logger.warn("Failed to acquire snapshots", e);
        throw new RuntimeException(e);
    }
    return oneMerges;
}

/**
 * Finds merges required to reduce the number of segments to at most {@code maxSegmentCount}.
 *
 * @param maxSegmentCount the maximum number of segments allowed after merging
 * @return a collection of merges to execute
 */
public Collection<OneMerge> findForceMerges(int maxSegmentCount) {
    List<OneMerge> oneMerges = new ArrayList<>();
    try (GatedCloseable<CatalogSnapshot> catalogSnapshotRef = snapshotSupplier.get()) {
        List<Segment> segmentList = catalogSnapshotRef.get().getSegments();
        List<List<Segment>> mergeCandidates = mergePolicy.findForceMergeCandidates(segmentList, maxSegmentCount);
        for (List<Segment> mergeGroup : mergeCandidates) {
            oneMerges.add(new OneMerge(mergeGroup));
        }
    } catch (Exception e) {
        logger.warn("Failed to acquire snapshots", e);
        throw new RuntimeException(e);
    }
    return oneMerges;
}
Flaky Test

testTriggerMergesExecutesMergeThread and testForceMergeExecutesMerges use Thread.sleep(200) after a CountDownLatch.await() to wait for the callback to be invoked. This is a timing-dependent pattern that can be flaky under load. A second latch or a polling mechanism should be used to reliably wait for the callback.

    scheduler.triggerMerges();
    assertTrue(latch.await(5, TimeUnit.SECONDS));
    Thread.sleep(200);
    assertNotNull(captured.get());
}
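The suggested fix is a latch per event: one for the merge starting, a second for the callback completing, so the test awaits both instead of sleeping after the first. A minimal self-contained sketch (the callback wiring and names are illustrative, not the PR's test code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchPatternSketch {
    /** Runs a two-latch handshake with a worker thread and returns the captured result. */
    public static String awaitCallback() {
        CountDownLatch mergeStarted = new CountDownLatch(1);
        CountDownLatch callbackDone = new CountDownLatch(1);
        AtomicReference<String> captured = new AtomicReference<>();

        Thread mergeThread = new Thread(() -> {
            mergeStarted.countDown();       // signal: merge has started
            captured.set("mergeResult");    // callback publishes its result...
            callbackDone.countDown();       // ...then signals completion
        });
        mergeThread.start();

        try {
            // Await each event explicitly; no Thread.sleep needed.
            if (!mergeStarted.await(5, TimeUnit.SECONDS)) throw new AssertionError("merge never started");
            if (!callbackDone.await(5, TimeUnit.SECONDS)) throw new AssertionError("callback never ran");
            mergeThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
        return captured.get();
    }
}
```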

@github-actions
Contributor

github-actions Bot commented Apr 5, 2026

PR Code Suggestions ✨

Latest suggestions up to 4790673

Explore these optional code suggestions:

Category: Possible issue
Fix active merge counter leak on shutdown

When isShutdown is true, the task returns early without calling
mergeHandler.onMergeFailure(oneMerge) or decrementing activeMerges in the finally
block; the return exits before finally runs. This causes activeMerges to remain
permanently elevated, blocking all future merges even if the scheduler is restarted.
The shutdown early-return should fall through to the finally block or explicitly
call cleanup.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [217-249]

 private void submitMergeTask(OneMerge oneMerge) {
     activeMerges.incrementAndGet();
     threadPool.executor(ThreadPool.Names.MERGE).execute(() -> {
-        ...
+        long totalSizeInBytes = oneMerge.getTotalSizeInBytes();
+        long totalNumDocs = oneMerge.getTotalNumDocs();
+        long timeNS = System.nanoTime();
+        long tookMS = 0;
         try {
             if (isShutdown.get()) {
                 logger.debug("MergeScheduler is shutdown, skipping merge");
+                mergeHandler.onMergeFailure(oneMerge);
                 return;
             }
-            ...
+
+            mergeStatsTracker.beforeMerge(totalNumDocs, totalSizeInBytes);
+
+            MergeResult mergeResult = mergeHandler.doMerge(oneMerge);
+            applyMergeChanges.accept(mergeResult, oneMerge);
+            mergeHandler.onMergeFinished(oneMerge);
+
+            tookMS = TimeValue.nsecToMSec((System.nanoTime() - timeNS));
+            logger.info("Merge completed in {}ms", tookMS);
+
         } catch (Exception e) {
             logger.error(new ParameterizedMessage("Unexpected error during merge for: {}", oneMerge), e);
             mergeHandler.onMergeFailure(oneMerge);
         } finally {
             mergeStatsTracker.afterMerge(tookMS, totalNumDocs, totalSizeInBytes);
             activeMerges.decrementAndGet();
-            // A completed merge may free up capacity for new merges, so check again.
             executeMerge();
         }
     });
 }
Suggestion importance[1-10]: 8

Why: When isShutdown is true, the early return inside the try block skips the finally block, causing activeMerges to never be decremented and onMergeFailure to never be called. This is a real bug that would permanently block all future merges after shutdown is triggered.

Impact: Medium
Fix unsafe cast in exception handler

The cast (IOException) e will throw a ClassCastException if the exception is not an
IOException (e.g., an Error or other checked exception). You should handle this more
safely by checking the type before casting, or rethrowing as a general
RuntimeException.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/merge/CompositeMergeExecutor.java [65-69]

 } catch (Exception e) {
     completed.forEach(FormatMergeResult::cleanup);
     if (e instanceof RuntimeException re) throw re;
-    throw new UncheckedIOException((IOException) e);
+    if (e instanceof IOException ioe) throw new UncheckedIOException(ioe);
+    throw new RuntimeException(e);
 }
Suggestion importance[1-10]: 7

Why: The cast (IOException) e is unsafe and will throw a ClassCastException if the exception is not an IOException. The fix correctly uses instanceof pattern matching before casting, which is a real correctness issue.

Impact: Medium
Fix force merge to run synchronously as documented

The forceMerge method checks activeMerges.get() > 0 and throws, but the Javadoc says
it "runs synchronously on the calling thread" while the implementation actually
submits tasks asynchronously to the thread pool. This is a contract violation.
Additionally, the check for active merges is a non-atomic read-then-act pattern that
can race with concurrent triggerMerges calls. Either make the method truly
synchronous or update the documentation and use proper synchronization.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [135-139]

 public void forceMerge(int maxNumSegment) throws IOException {
     if (activeMerges.get() > 0) {
         logger.warn("Cannot force merge while background merges are active");
         throw new IllegalStateException("Cannot force merge while background merges are active");
     }
+    Collection<OneMerge> oneMerges = mergeHandler.findForceMerges(maxNumSegment);
+    for (OneMerge oneMerge : oneMerges) {
+        mergeHandler.registerMerge(oneMerge);
+        activeMerges.incrementAndGet();
+        try {
+            MergeResult mergeResult = mergeHandler.doMerge(oneMerge);
+            applyMergeChanges.accept(mergeResult, oneMerge);
+            mergeHandler.onMergeFinished(oneMerge);
+        } catch (Exception e) {
+            logger.error(new ParameterizedMessage("Force merge failed for: {}", oneMerge), e);
+            mergeHandler.onMergeFailure(oneMerge);
+        } finally {
+            activeMerges.decrementAndGet();
+        }
+    }
+}
Suggestion importance[1-10]: 6

Why: The Javadoc says forceMerge "runs synchronously on the calling thread" but the implementation submits tasks asynchronously to the thread pool, which is a contract violation. The race condition between the activeMerges check and concurrent triggerMerges calls is also a real concern, though the improved code doesn't fully address the synchronization issue either.

Impact: Low
Filter merged files by correct data format

dfGroupedSearchableFiles() returns a map keyed by format name, so values() may
return WriterFileSet entries from multiple formats per segment. This could cause the
mock merger to process files from unintended formats, leading to incorrect merge
behavior in tests. The mock should filter by the specific dataFormat it handles.

test/framework/src/main/java/org/opensearch/index/engine/dataformat/stub/MockMerger.java [39-42]

 List<WriterFileSet> fileMetadataList = new ArrayList<>();
 for (Segment segment : mergeInput.segments()) {
-    fileMetadataList.addAll(segment.dfGroupedSearchableFiles().values());
+    WriterFileSet wfs = segment.dfGroupedSearchableFiles().get(dataFormat.name());
+    if (wfs != null) {
+        fileMetadataList.add(wfs);
+    }
 }
Suggestion importance[1-10]: 6

Why: The current implementation collects WriterFileSet values from all formats in a segment, which could cause incorrect behavior in multi-format tests. Filtering by dataFormat.name() ensures the mock only processes files for its specific format.

Impact: Low
Use unique segment IDs instead of shared dummy array

All SegmentWrapper instances share the same static DUMMY_ID byte array for the
Lucene SegmentInfo ID. Lucene uses this ID to uniquely identify segments, and
sharing the same array across all wrappers may cause Lucene's internal deduplication
or identity checks to malfunction. Each wrapper should use a unique ID, for example
derived from the segment generation.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/DataFormatAwareMergePolicy.java [274-307]

-private static class SegmentWrapper extends SegmentCommitInfo {
-    private static final byte[] DUMMY_ID = new byte[16];
-    ...
-    public SegmentWrapper(Directory directory, Segment segment, long totalSizeBytes, long totalNumDocs) {
-        super(
-            new org.apache.lucene.index.SegmentInfo(
-                directory,
-                ...
-                DUMMY_ID,
-                EMPTY_ATTRIBUTES,
-                null
-            ),
-            0,
-            0,
-            0,
-            -1,
-            -1,
-            DUMMY_ID
-        );
+public SegmentWrapper(Directory directory, Segment segment, long totalSizeBytes, long totalNumDocs) {
+    super(
+        new org.apache.lucene.index.SegmentInfo(
+            directory,
+            Version.LATEST,
+            Version.LATEST,
+            "segment_" + segment.generation(),
+            (int) Math.min(totalNumDocs, Integer.MAX_VALUE),
+            false,
+            false,
+            Codec.getDefault(),
+            EMPTY_DIAGNOSTICS,
+            generateId(segment.generation()),
+            EMPTY_ATTRIBUTES,
+            null
+        ),
+        0, 0, 0, -1, -1,
+        generateId(segment.generation())
+    );
+    this.generation = segment.generation();
+    this.totalSizeBytes = totalSizeBytes;
+}
 
+private static byte[] generateId(long generation) {
+    byte[] id = new byte[16];
+    for (int i = 0; i < 8; i++) {
+        id[i] = (byte) (generation >>> (i * 8));
+    }
+    return id;
+}
+
Suggestion importance[1-10]: 5

Why: Sharing the same static DUMMY_ID byte array across all SegmentWrapper instances could cause issues with Lucene's internal identity checks, though the equals/hashCode override based on generation partially mitigates this. The suggestion is valid but the actual impact depends on Lucene internals.

Impact: Low
Fix non-deterministic generation from unordered map

The generation is derived from the first entry of the map, but HashMap (or other
unordered maps) does not guarantee iteration order. If writerFileSetMap is a
HashMap, the generation could be taken from any format's WriterFileSet, not
necessarily the primary format's. This could result in an incorrect generation being
assigned to the merged segment. The generation should be taken from the primary
format's WriterFileSet explicitly.

server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java [329-339]

 private Segment getSegment(Map<DataFormat, WriterFileSet> writerFileSetMap) {
     if (writerFileSetMap.isEmpty()) {
         throw new IllegalArgumentException("writerFileSetMap must not be empty");
     }
-    long generation = writerFileSetMap.values().iterator().next().writerGeneration();
+    // Use the maximum generation across all formats to ensure correctness
+    long generation = writerFileSetMap.values().stream()
+        .mapToLong(WriterFileSet::writerGeneration)
+        .max()
+        .getAsLong();
     Segment.Builder segment = Segment.builder(generation);
     for (Map.Entry<DataFormat, WriterFileSet> entry : writerFileSetMap.entrySet()) {
         segment.addSearchableFiles(entry.getKey(), entry.getValue());
     }
     return segment.build();
 }
Suggestion importance[1-10]: 5

Why: The concern about non-deterministic iteration order of HashMap is valid, but the suggested fix of using max() generation may not be semantically correct either. The generation should ideally come from the primary format's WriterFileSet, but the method doesn't have access to the primary format. Using max() is a reasonable workaround but changes semantics.

Impact: Low
Category: General
Avoid double-wrapping runtime exceptions in merge methods

findMerges() wraps all exceptions (including RuntimeException from the snapshot
supplier) in a new RuntimeException, which double-wraps runtime exceptions and loses
the original type. The catch block should only wrap checked exceptions (like
IOException) and rethrow RuntimeException directly to preserve the original
exception type and avoid unnecessary wrapping.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java [80-93]

 public Collection<OneMerge> findMerges() {
     List<OneMerge> oneMerges = new ArrayList<>();
     try (GatedCloseable<CatalogSnapshot> catalogSnapshotRef = snapshotSupplier.get()) {
         List<Segment> segmentList = catalogSnapshotRef.get().getSegments();
         List<List<Segment>> mergeCandidates = mergePolicy.findMergeCandidates(segmentList);
         for (List<Segment> mergeGroup : mergeCandidates) {
             oneMerges.add(new OneMerge(mergeGroup));
         }
+    } catch (RuntimeException e) {
+        logger.warn("Failed to acquire snapshots", e);
+        throw e;
     } catch (Exception e) {
         logger.warn("Failed to acquire snapshots", e);
         throw new RuntimeException(e);
     }
     return oneMerges;
 }
Suggestion importance[1-10]: 5

Why: Wrapping all exceptions including RuntimeException in a new RuntimeException loses the original exception type and creates unnecessary nesting. The fix to rethrow RuntimeException directly is correct and improves debuggability, as seen in the test testFindMergesThrowsOnSnapshotFailure which checks the message directly.

Impact: Low
Document null primary format risk in no-arg constructor

The no-arg constructor sets primaryDataFormat to null, but getPrimaryDataFormat()
returns it without null-checking. Code that calls getPrimaryDataFormat() on an
instance created via the no-arg constructor (e.g., in CompositeMerger) will get a
NullPointerException. Consider removing the no-arg constructor or throwing an
exception if it is used in contexts that require a primary format.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java [46-49]

+/**
+ * @deprecated Use {@link #CompositeDataFormat(DataFormat, List)} instead.
+ */
+@Deprecated
 public CompositeDataFormat() {
     this.primaryDataFormat = null;
     this.dataFormats = List.of();
 }
Suggestion importance[1-10]: 3


Why: The suggestion only adds a @Deprecated annotation and comment, which is a minor documentation improvement. The actual null-safety concern is valid but the improved_code doesn't address the NPE risk in getPrimaryDataFormat().

Low

Previous suggestions

Suggestions up to commit 4d3b869
Category | Suggestion | Impact
Possible issue
Track active count for force merges

The forceMerge method does not register the merge with
mergeHandler.registerMerge(oneMerge) before executing it, nor does it track the
merge in activeMerges. This means hasPendingMerges() won't reflect force merges, and
the activeMerges counter won't prevent concurrent background merges from running
simultaneously with force merges after the initial check. The activeMerges counter
should be incremented before submitting each force merge task and decremented in the
finally block.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [135-154]

 public void forceMerge(int maxNumSegment) throws IOException {
     if (activeMerges.get() > 0) {
         logger.warn("Cannot force merge while background merges are active");
         throw new IllegalStateException("Cannot force merge while background merges are active");
     }
     Collection<OneMerge> oneMerges = mergeHandler.findForceMerges(maxNumSegment);
 
     for (OneMerge oneMerge : oneMerges) {
+        activeMerges.incrementAndGet();
         threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(() -> {
             try {
                 MergeResult mergeResult = mergeHandler.doMerge(oneMerge);
                 applyMergeChanges.accept(mergeResult, oneMerge);
                 mergeHandler.onMergeFinished(oneMerge);
             } catch (Exception e) {
                 logger.error(new ParameterizedMessage("Force merge failed for: {}", oneMerge), e);
                 mergeHandler.onMergeFailure(oneMerge);
+            } finally {
+                activeMerges.decrementAndGet();
             }
         });
     }
 }
Suggestion importance[1-10]: 7


Why: The forceMerge method doesn't increment/decrement activeMerges, so the guard check activeMerges.get() > 0 at the start won't prevent background merges from running concurrently with force merges after the initial check passes. This is a real concurrency correctness issue.

Medium
Fix stats corruption on shutdown early exit

When isShutdown is true and the method returns early inside the try block, the
finally block still calls mergeStatsTracker.afterMerge and executeMerge() without a
corresponding beforeMerge call, which corrupts stats. Additionally,
mergeHandler.onMergeFailure(oneMerge) is never called on early shutdown exit,
leaving the merge registered as pending. The early-exit path should call
onMergeFailure and skip the stats tracking.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [217-250]

 private void submitMergeTask(OneMerge oneMerge) {
     activeMerges.incrementAndGet();
     threadPool.executor(ThreadPool.Names.MERGE).execute(() -> {
-        ...
+        long totalSizeInBytes = oneMerge.getTotalSizeInBytes();
+        long totalNumDocs = oneMerge.getTotalNumDocs();
+        long timeNS = System.nanoTime();
+        long tookMS = 0;
+        boolean started = false;
         try {
             if (isShutdown.get()) {
                 logger.debug("MergeScheduler is shutdown, skipping merge");
+                mergeHandler.onMergeFailure(oneMerge);
                 return;
             }
-            ...
+
+            mergeStatsTracker.beforeMerge(totalNumDocs, totalSizeInBytes);
+            started = true;
+
+            MergeResult mergeResult = mergeHandler.doMerge(oneMerge);
+            applyMergeChanges.accept(mergeResult, oneMerge);
+            mergeHandler.onMergeFinished(oneMerge);
+
+            tookMS = TimeValue.nsecToMSec((System.nanoTime() - timeNS));
+            logger.info("Merge completed in {}ms", tookMS);
+
         } catch (Exception e) {
-            ...
+            logger.error(new ParameterizedMessage("Unexpected error during merge for: {}", oneMerge), e);
             mergeHandler.onMergeFailure(oneMerge);
         } finally {
-            mergeStatsTracker.afterMerge(tookMS, totalNumDocs, totalSizeInBytes);
+            if (started) {
+                mergeStatsTracker.afterMerge(tookMS, totalNumDocs, totalSizeInBytes);
+            }
             activeMerges.decrementAndGet();
             executeMerge();
         }
     });
 }
Suggestion importance[1-10]: 7


Why: When isShutdown is true, the early return inside the try block still triggers the finally block which calls mergeStatsTracker.afterMerge without a prior beforeMerge, corrupting stats. Additionally, onMergeFailure is never called on early shutdown, leaving the merge in a pending state.

Medium
Fix unsafe cast in exception handler

The cast (IOException) e will throw a ClassCastException if the exception is neither
a RuntimeException nor an IOException (e.g., a checked non-IO exception). Use
instanceof check before casting to avoid this, or wrap with a more general handler.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/merge/CompositeMergeExecutor.java [65-69]

 } catch (Exception e) {
     completed.forEach(FormatMergeResult::cleanup);
     if (e instanceof RuntimeException re) throw re;
-    throw new UncheckedIOException((IOException) e);
+    if (e instanceof IOException ioe) throw new UncheckedIOException(ioe);
+    throw new RuntimeException(e);
 }
Suggestion importance[1-10]: 7


Why: The cast (IOException) e is unsafe and will throw a ClassCastException if the exception is neither a RuntimeException nor an IOException. The fix correctly adds an instanceof check before casting, preventing potential runtime errors.

Medium
Use unique IDs per segment wrapper

All SegmentWrapper instances share the same static DUMMY_ID byte array for both the
SegmentInfo ID and the SegmentCommitInfo ID. Lucene uses the segment ID for identity
and file naming, so sharing the same ID array across all wrappers may cause
collisions or incorrect behavior in the merge policy. Each wrapper should use a
unique ID, e.g., derived from the segment generation.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/DataFormatAwareMergePolicy.java [282-307]

-private static class SegmentWrapper extends SegmentCommitInfo {
-    private static final byte[] DUMMY_ID = new byte[16];
-    ...
-    public SegmentWrapper(Directory directory, Segment segment, long totalSizeBytes, long totalNumDocs) {
-        super(
-            new org.apache.lucene.index.SegmentInfo(
-                directory,
-                ...
-                DUMMY_ID,
-                EMPTY_ATTRIBUTES,
-                null
-            ),
-            0,
-            0,
-            0,
-            -1,
-            -1,
-            DUMMY_ID
-        );
+public SegmentWrapper(Directory directory, Segment segment, long totalSizeBytes, long totalNumDocs) {
+    super(
+        new org.apache.lucene.index.SegmentInfo(
+            directory,
+            Version.LATEST,
+            Version.LATEST,
+            "segment_" + segment.generation(),
+            (int) Math.min(totalNumDocs, Integer.MAX_VALUE),
+            false,
+            false,
+            Codec.getDefault(),
+            EMPTY_DIAGNOSTICS,
+            uniqueId(segment.generation()),
+            EMPTY_ATTRIBUTES,
+            null
+        ),
+        0,
+        0,
+        0,
+        -1,
+        -1,
+        uniqueId(segment.generation())
+    );
+    this.generation = segment.generation();
+    this.totalSizeBytes = totalSizeBytes;
+}
 
+private static byte[] uniqueId(long generation) {
+    byte[] id = new byte[16];
+    for (int i = 0; i < 8; i++) {
+        id[i] = (byte) (generation >>> (i * 8));
+    }
+    return id;
+}
+
Suggestion importance[1-10]: 6


Why: All SegmentWrapper instances share the same static DUMMY_ID byte array, which Lucene uses for segment identity. This could cause collisions or incorrect behavior in the merge policy when multiple segments are evaluated simultaneously.

Low
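The generation-based ID encoding sketched in the suggestion can be verified on its own. This minimal demo (assuming the `uniqueId` helper proposed above, which packs the generation into the low 8 bytes of a 16-byte array) shows that distinct generations yield distinct IDs, unlike the shared `DUMMY_ID` array:

```java
import java.util.Arrays;

public class SegmentIdDemo {
    // Encode the segment generation into a 16-byte ID (low 8 bytes, little-endian),
    // matching the uniqueId(...) helper proposed in the suggestion above.
    static byte[] uniqueId(long generation) {
        byte[] id = new byte[16];
        for (int i = 0; i < 8; i++) {
            id[i] = (byte) (generation >>> (i * 8));
        }
        return id;
    }

    public static void main(String[] args) {
        // Distinct generations yield distinct IDs; equal generations yield equal IDs
        System.out.println(Arrays.equals(uniqueId(1), uniqueId(2)));
        System.out.println(Arrays.equals(uniqueId(1), uniqueId(1)));
    }
}
```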
General
Avoid double-wrapping runtime exceptions

When snapshotSupplier.get() throws (e.g., RuntimeException), the catch block wraps
it in another RuntimeException, causing double-wrapping of the original exception.
The original exception should be re-thrown directly if it is already a
RuntimeException, or only wrapped if it is a checked exception.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java [80-93]

 public Collection<OneMerge> findMerges() {
     List<OneMerge> oneMerges = new ArrayList<>();
     try (GatedCloseable<CatalogSnapshot> catalogSnapshotRef = snapshotSupplier.get()) {
         List<Segment> segmentList = catalogSnapshotRef.get().getSegments();
         List<List<Segment>> mergeCandidates = mergePolicy.findMergeCandidates(segmentList);
         for (List<Segment> mergeGroup : mergeCandidates) {
             oneMerges.add(new OneMerge(mergeGroup));
         }
+    } catch (RuntimeException e) {
+        logger.warn("Failed to acquire snapshots", e);
+        throw e;
     } catch (Exception e) {
         logger.warn("Failed to acquire snapshots", e);
         throw new RuntimeException(e);
     }
     return oneMerges;
 }
Suggestion importance[1-10]: 5


Why: When a RuntimeException is thrown by snapshotSupplier.get(), the catch block wraps it in another RuntimeException, making the stack trace harder to read. Splitting the catch into RuntimeException (rethrow directly) and Exception (wrap) is a clean improvement.

Low
Validate consistent generations across merged format files

The generation is derived from the first WriterFileSet's writerGeneration(), but
different formats in the map may have different generations. This could produce a
Segment with an inconsistent generation. The generation should be validated to be
consistent across all entries, or explicitly passed in from MergeResult rather than
inferred.

server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java [329-339]

 private Segment getSegment(Map<DataFormat, WriterFileSet> writerFileSetMap) {
     if (writerFileSetMap.isEmpty()) {
         throw new IllegalArgumentException("writerFileSetMap must not be empty");
     }
     long generation = writerFileSetMap.values().iterator().next().writerGeneration();
     Segment.Builder segment = Segment.builder(generation);
     for (Map.Entry<DataFormat, WriterFileSet> entry : writerFileSetMap.entrySet()) {
+        if (entry.getValue().writerGeneration() != generation) {
+            throw new IllegalArgumentException(
+                "Inconsistent writer generations in writerFileSetMap: expected " + generation
+                + " but found " + entry.getValue().writerGeneration() + " for format " + entry.getKey().name()
+            );
+        }
         segment.addSearchableFiles(entry.getKey(), entry.getValue());
     }
     return segment.build();
 }
Suggestion importance[1-10]: 5


Why: The suggestion identifies a real potential inconsistency where different WriterFileSet entries in the map could have different generations, leading to a Segment with an incorrect generation. Adding validation improves correctness, though in practice the MergeResult should already guarantee consistent generations.

Low
Fail fast instead of returning empty merge result

Returning an empty MergeResult (with no merged file sets) from getMerger() will
cause CompositeMergeExecutor to call
result.getMergedWriterFileSetForDataformat(format) and get null, then store null as
the mergedFiles in FormatMergeResult. This silently produces an incomplete merge
result. The TODO comment indicates this is intentional, but the stub should at
minimum throw UnsupportedOperationException to fail fast rather than silently
producing corrupt state.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java [282-283]

-return mergeInput -> new MergeResult(Map.of());
+// TODO: Implement merge support as LuceneMerger
+return mergeInput -> { throw new UnsupportedOperationException("Lucene merge not yet implemented"); };
Suggestion importance[1-10]: 4


Why: The suggestion raises a valid concern about silent failures when an empty MergeResult is returned, but the PR's TODO comment indicates this is intentional stub behavior. Throwing UnsupportedOperationException would be a breaking change for existing callers that may handle null mergers differently.

Low
Document null primary format risk in no-arg constructor

The no-arg constructor sets primaryDataFormat to null, but getPrimaryDataFormat()
returns it without null-checking. This can cause NullPointerException when
CompositeMerger calls compositeDataFormat.getPrimaryDataFormat() on an instance
created with the no-arg constructor. Consider removing the no-arg constructor or
throwing an exception if it's used in a merge context.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java [46-49]

+/**
+ * @deprecated Use {@link #CompositeDataFormat(DataFormat, List)} instead.
+ */
+@Deprecated
 public CompositeDataFormat() {
     this.primaryDataFormat = null;
     this.dataFormats = List.of();
 }
Suggestion importance[1-10]: 3


Why: The suggestion correctly identifies a potential NullPointerException risk when getPrimaryDataFormat() is called on an instance created with the no-arg constructor, but the proposed fix only adds a @Deprecated annotation rather than actually preventing the null issue. The improvement is minimal.

Low
Suggestions up to commit 600e264
Category | Suggestion | Impact
Possible issue
Fix active merge counter leak on shutdown

When the merge task returns early due to shutdown, activeMerges is never decremented
and mergeHandler.onMergeFailure is never called. This leaks the active merge counter
and leaves the merge in a pending state, potentially blocking future merges
indefinitely.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [217-249]

 private void submitMergeTask(OneMerge oneMerge) {
     activeMerges.incrementAndGet();
     threadPool.executor(ThreadPool.Names.MERGE).execute(() -> {
-        ...
+        long totalSizeInBytes = oneMerge.getTotalSizeInBytes();
+        long totalNumDocs = oneMerge.getTotalNumDocs();
+        long timeNS = System.nanoTime();
+        long tookMS = 0;
         try {
             if (isShutdown.get()) {
                 logger.debug("MergeScheduler is shutdown, skipping merge");
+                mergeHandler.onMergeFailure(oneMerge);
                 return;
             }
+            // ... rest of merge logic
+        } catch (Exception e) {
+            logger.error(new ParameterizedMessage("Unexpected error during merge for: {}", oneMerge), e);
+            mergeHandler.onMergeFailure(oneMerge);
+        } finally {
+            mergeStatsTracker.afterMerge(tookMS, totalNumDocs, totalSizeInBytes);
+            activeMerges.decrementAndGet();
+            executeMerge();
+        }
+    });
+}
Suggestion importance[1-10]: 8


Why: When isShutdown is true, the task returns early without decrementing activeMerges or calling onMergeFailure, causing a permanent counter leak that blocks all future merges. This is a significant correctness bug.

Medium
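The leak-free pattern the suggestion relies on can be shown in a few lines. In this hedged sketch (a simplified stand-in for the scheduler's merge task, not the real MergeScheduler code), the decrement lives in a `finally` block that also covers the early shutdown `return`, so the counter always returns to zero:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CounterLeakDemo {
    static final AtomicInteger activeMerges = new AtomicInteger();

    static void mergeTask(boolean shutdown) {
        activeMerges.incrementAndGet();
        try {
            if (shutdown) {
                return; // early exit: finally still runs, so the counter cannot leak
            }
            // ... merge work would happen here ...
        } finally {
            activeMerges.decrementAndGet();
        }
    }

    public static void main(String[] args) {
        mergeTask(true);   // shutdown path
        mergeTask(false);  // normal path
        System.out.println(activeMerges.get()); // 0
    }
}
```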
Track active count for force-merge tasks

The forceMerge method checks activeMerges.get() > 0 and throws, but this check is
not atomic with the subsequent merge submission. A background merge could start
between the check and the task submission, violating the guard. Additionally,
activeMerges is never incremented for force-merge tasks, so concurrent force merges
are not tracked and the guard will never block them either.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [135-154]

+public void forceMerge(int maxNumSegment) throws IOException {
+    if (activeMerges.get() > 0) {
+        logger.warn("Cannot force merge while background merges are active");
+        throw new IllegalStateException("Cannot force merge while background merges are active");
+    }
+    Collection<OneMerge> oneMerges = mergeHandler.findForceMerges(maxNumSegment);
 
+    for (OneMerge oneMerge : oneMerges) {
+        activeMerges.incrementAndGet();
+        threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(() -> {
+            try {
+                MergeResult mergeResult = mergeHandler.doMerge(oneMerge);
+                applyMergeChanges.accept(mergeResult, oneMerge);
+                mergeHandler.onMergeFinished(oneMerge);
+            } catch (Exception e) {
+                logger.error(new ParameterizedMessage("Force merge failed for: {}", oneMerge), e);
+                mergeHandler.onMergeFailure(oneMerge);
+            } finally {
+                activeMerges.decrementAndGet();
+            }
+        });
+    }
+}
Suggestion importance[1-10]: 7


Why: The forceMerge method submits tasks to the thread pool but never increments/decrements activeMerges, so the guard check activeMerges.get() > 0 will never block concurrent force merges and stats tracking is incomplete. This is a real correctness issue.

Medium
Fix unsafe cast in exception handler

The cast (IOException) e will throw a ClassCastException if the exception is neither
a RuntimeException nor an IOException (e.g., a checked non-IO exception). You should
handle this case explicitly, or use UncheckedIOException only when the exception is
actually an IOException.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/merge/CompositeMergeExecutor.java [65-69]

 } catch (Exception e) {
     completed.forEach(FormatMergeResult::cleanup);
     if (e instanceof RuntimeException re) throw re;
-    throw new UncheckedIOException((IOException) e);
+    if (e instanceof IOException ioe) throw new UncheckedIOException(ioe);
+    throw new RuntimeException(e);
 }
Suggestion importance[1-10]: 7


Why: The cast (IOException) e will throw a ClassCastException if the exception is neither a RuntimeException nor an IOException. The fix correctly handles this edge case by using instanceof pattern matching before casting.

Medium
Avoid shared mutable byte array across segment wrappers

All SegmentWrapper instances share the same DUMMY_ID byte array. Lucene's
SegmentInfo may store a reference to this array and could mutate it internally,
causing all wrappers to share corrupted or identical IDs. Each wrapper should use
its own copy of the ID array.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/DataFormatAwareMergePolicy.java [274-307]

-private static class SegmentWrapper extends SegmentCommitInfo {
-    private static final byte[] DUMMY_ID = new byte[16];
+public SegmentWrapper(Directory directory, Segment segment, long totalSizeBytes, long totalNumDocs) {
+    super(
+        new org.apache.lucene.index.SegmentInfo(
+            directory,
+            Version.LATEST,
+            Version.LATEST,
+            "segment_" + segment.generation(),
+            (int) Math.min(totalNumDocs, Integer.MAX_VALUE),
+            false,
+            false,
+            Codec.getDefault(),
+            EMPTY_DIAGNOSTICS,
+            new byte[16],  // unique copy per instance
+            EMPTY_ATTRIBUTES,
+            null
+        ),
+        0, 0, 0, -1, -1,
+        new byte[16]  // unique copy per instance
+    );
+    this.generation = segment.generation();
+    this.totalSizeBytes = totalSizeBytes;
+}
Suggestion importance[1-10]: 5


Why: Sharing a single DUMMY_ID byte array across all SegmentWrapper instances could be problematic if Lucene mutates it internally, but in practice Lucene's SegmentInfo stores the reference without mutation. The risk is real but low in practice.

Low
Guard against null primary data format access

The no-arg constructor sets primaryDataFormat to null, but getPrimaryDataFormat()
returns it without null-checking. Any code calling getPrimaryDataFormat() on a
default-constructed CompositeDataFormat (e.g., in CompositeMerger) will get a
NullPointerException. Consider deprecating or removing this constructor, or throwing
an exception if getPrimaryDataFormat() is called when primaryDataFormat is null.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java [46-49]

+/**
+ * @deprecated Use {@link #CompositeDataFormat(DataFormat, List)} instead.
+ */
+@Deprecated
 public CompositeDataFormat() {
     this.primaryDataFormat = null;
     this.dataFormats = List.of();
 }
 
+public DataFormat getPrimaryDataFormat() {
+    if (primaryDataFormat == null) {
+        throw new IllegalStateException("No primary data format configured");
+    }
+    return primaryDataFormat;
+}
+
Suggestion importance[1-10]: 5


Why: The no-arg constructor sets primaryDataFormat to null, which could cause NullPointerException when getPrimaryDataFormat() is called. However, the no-arg constructor appears to be a legacy/empty constructor and the improved_code adds both a deprecation annotation and modifies getPrimaryDataFormat() which is outside the existing_code snippet scope.

Low
General
Validate consistent generation across all formats

The generation is derived from the first entry in the map, but HashMap has no
guaranteed iteration order. If different WriterFileSet entries have different
writerGeneration values, the resulting segment's generation is non-deterministic.
Consider using the generation from a specific (e.g., primary) format's
WriterFileSet, or validate that all generations are consistent.

server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java [329-339]

 private Segment getSegment(Map<DataFormat, WriterFileSet> writerFileSetMap) {
     if (writerFileSetMap.isEmpty()) {
         throw new IllegalArgumentException("writerFileSetMap must not be empty");
     }
-    long generation = writerFileSetMap.values().iterator().next().writerGeneration();
+    long generation = -1;
+    for (WriterFileSet wfs : writerFileSetMap.values()) {
+        if (generation == -1) {
+            generation = wfs.writerGeneration();
+        } else if (generation != wfs.writerGeneration()) {
+            throw new IllegalArgumentException(
+                "Inconsistent writer generations in writerFileSetMap: expected " + generation + " but found " + wfs.writerGeneration()
+            );
+        }
+    }
     Segment.Builder segment = Segment.builder(generation);
     for (Map.Entry<DataFormat, WriterFileSet> entry : writerFileSetMap.entrySet()) {
         segment.addSearchableFiles(entry.getKey(), entry.getValue());
     }
     return segment.build();
 }
Suggestion importance[1-10]: 6


Why: The generation is derived from the first map entry, but HashMap has no guaranteed iteration order. If WriterFileSet entries have different writerGeneration values, the segment generation would be non-deterministic. Validating consistency across all entries prevents subtle bugs.

Low
Filter mock merger files by data format

When a segment contains files for multiple data formats, all WriterFileSet values
are added indiscriminately. This may cause the mock merger to process files from
unrelated formats, leading to incorrect merge results in tests. The mock should
filter by the relevant dataFormat to match the expected behavior.

test/framework/src/main/java/org/opensearch/index/engine/dataformat/stub/MockMerger.java [39-42]

 List<WriterFileSet> fileMetadataList = new ArrayList<>();
 for (Segment segment : mergeInput.segments()) {
-    fileMetadataList.addAll(segment.dfGroupedSearchableFiles().values());
+    WriterFileSet wfs = segment.dfGroupedSearchableFiles().get(dataFormat.name());
+    if (wfs != null) {
+        fileMetadataList.add(wfs);
+    }
 }
Suggestion importance[1-10]: 5


Why: The current implementation adds all WriterFileSet values from all formats in a segment, which could cause incorrect behavior in multi-format tests. Filtering by dataFormat.name() would make the mock more accurate and consistent with expected single-format merger behavior.

Low
Preserve IOException type from merge candidate selection

The IOException thrown by mergePolicy.findMergeCandidates is caught and re-wrapped
in a RuntimeException, losing the original checked exception type. Callers that need
to handle IOException specifically (e.g., for retry logic) cannot do so. The method
signature should declare throws IOException and rethrow it directly, or at minimum
preserve the cause type.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java [80-93]

-public Collection<OneMerge> findMerges() {
+public Collection<OneMerge> findMerges() throws IOException {
     List<OneMerge> oneMerges = new ArrayList<>();
     try (GatedCloseable<CatalogSnapshot> catalogSnapshotRef = snapshotSupplier.get()) {
         List<Segment> segmentList = catalogSnapshotRef.get().getSegments();
         List<List<Segment>> mergeCandidates = mergePolicy.findMergeCandidates(segmentList);
         for (List<Segment> mergeGroup : mergeCandidates) {
             oneMerges.add(new OneMerge(mergeGroup));
         }
+    } catch (IOException e) {
+        logger.warn("Failed to find merge candidates", e);
+        throw e;
     } catch (Exception e) {
         logger.warn("Failed to acquire snapshots", e);
         throw new RuntimeException(e);
     }
     return oneMerges;
 }
Suggestion importance[1-10]: 4


Why: Wrapping IOException in RuntimeException loses the checked exception type, making it harder for callers to handle I/O errors specifically. However, since the callers (triggerMerges, findAndRegisterMerges) don't currently declare throws IOException, changing the signature would require broader refactoring.

Low
Suggestions up to commit 50da490
Category | Suggestion | Impact
Possible issue
Fix activeMerges tracking in force merge

The forceMerge method submits tasks asynchronously to the thread pool but declares
throws IOException, which is misleading since no IOException is actually thrown from
this method — the IOException from mergeHandler.doMerge is caught inside the lambda.
Additionally, the method does not register the force-merge tasks with activeMerges,
so the guard activeMerges.get() > 0 will never prevent concurrent force merges from
being submitted simultaneously. The activeMerges counter should be incremented
before submitting each force-merge task and decremented in the finally block.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [135-154]

-public void forceMerge(int maxNumSegment) throws IOException {
+public void forceMerge(int maxNumSegment) {
     if (activeMerges.get() > 0) {
+        logger.warn("Cannot force merge while background merges are active");
+        throw new IllegalStateException("Cannot force merge while background merges are active");
+    }
+    Collection<OneMerge> oneMerges = mergeHandler.findForceMerges(maxNumSegment);
 
+    for (OneMerge oneMerge : oneMerges) {
+        activeMerges.incrementAndGet();
+        threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(() -> {
+            try {
+                MergeResult mergeResult = mergeHandler.doMerge(oneMerge);
+                applyMergeChanges.accept(mergeResult, oneMerge);
+                mergeHandler.onMergeFinished(oneMerge);
+            } catch (Exception e) {
+                logger.error(new ParameterizedMessage("Force merge failed for: {}", oneMerge), e);
+                mergeHandler.onMergeFailure(oneMerge);
+            } finally {
+                activeMerges.decrementAndGet();
+            }
+        });
+    }
+}
+
Suggestion importance[1-10]: 7


Why: The forceMerge method doesn't track activeMerges for submitted tasks, so the guard activeMerges.get() > 0 won't prevent concurrent force merges. Also, declaring throws IOException is misleading since no IOException propagates from this method. Both are real correctness issues.

Medium
Use unique IDs per segment wrapper instance

All SegmentWrapper instances share the same static DUMMY_ID byte array for both the
SegmentInfo ID and the SegmentCommitInfo ID. Lucene uses the segment ID for identity
and deduplication internally; sharing the same ID array across all wrappers may
cause the merge policy to treat distinct segments as identical, leading to incorrect
merge candidate selection or silent data corruption. Each wrapper should use a
unique ID.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/DataFormatAwareMergePolicy.java [274-307]

 private static class SegmentWrapper extends SegmentCommitInfo {
-    private static final byte[] DUMMY_ID = new byte[16];
-    ...
+    private static final Map<String, String> EMPTY_DIAGNOSTICS = Map.of();
+    private static final Map<String, String> EMPTY_ATTRIBUTES = Map.of();
+
+    private final long generation;
+    private final long totalSizeBytes;
+
     public SegmentWrapper(Directory directory, Segment segment, long totalSizeBytes, long totalNumDocs) {
         super(
             new org.apache.lucene.index.SegmentInfo(
                 directory,
-                ...
-                DUMMY_ID,
+                Version.LATEST,
+                Version.LATEST,
+                "segment_" + segment.generation(),
+                (int) Math.min(totalNumDocs, Integer.MAX_VALUE),
+                false,
+                false,
+                Codec.getDefault(),
+                EMPTY_DIAGNOSTICS,
+                generateUniqueId(segment.generation()),
                 EMPTY_ATTRIBUTES,
                 null
             ),
             0,
             0,
             0,
             -1,
             -1,
-            DUMMY_ID
+            generateUniqueId(segment.generation())
         );
+        this.generation = segment.generation();
+        this.totalSizeBytes = totalSizeBytes;
+    }
 
+    private static byte[] generateUniqueId(long generation) {
+        byte[] id = new byte[16];
+        for (int i = 0; i < 8; i++) {
+            id[i] = (byte) (generation >>> (i * 8));
+        }
+        return id;
+    }
+
Suggestion importance[1-10]: 7


Why: Sharing the same static DUMMY_ID byte array across all SegmentWrapper instances could cause Lucene's merge policy to treat distinct segments as identical, potentially leading to incorrect merge candidate selection. Using generation-based unique IDs is a valid fix for this correctness concern.

Medium
Prevent unsafe cast causing ClassCastException

The cast (IOException) e will throw a ClassCastException if the exception is neither
a RuntimeException nor an IOException (e.g., a checked non-IO exception). Use
instanceof check before casting, or wrap in a more robust handler to avoid masking
the original error.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/merge/CompositeMergeExecutor.java [65-69]

 } catch (Exception e) {
     completed.forEach(FormatMergeResult::cleanup);
     if (e instanceof RuntimeException re) throw re;
-    throw new UncheckedIOException((IOException) e);
+    if (e instanceof IOException ioe) throw new UncheckedIOException(ioe);
+    throw new RuntimeException(e);
 }
Suggestion importance[1-10]: 7


Why: The cast (IOException) e is unsafe if the exception is neither a RuntimeException nor an IOException. The fix correctly adds an instanceof check before casting and falls back to wrapping in RuntimeException, preventing a potential ClassCastException that would mask the original error.

Medium
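The rethrow pattern above can be factored into a small helper. A sketch assuming Java 16+ pattern matching (class and method names hypothetical, not part of the PR):

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class RethrowDemo {
    // Safe rethrow: pass unchecked exceptions through, wrap IOException in
    // UncheckedIOException, and fall back to RuntimeException for any other
    // checked type instead of using a blind (IOException) cast.
    static RuntimeException asUnchecked(Exception e) {
        if (e instanceof RuntimeException re) return re;
        if (e instanceof IOException ioe) return new UncheckedIOException(ioe);
        return new RuntimeException(e);
    }

    public static void main(String[] args) {
        // A checked non-IO exception no longer triggers a ClassCastException
        System.out.println(asUnchecked(new Exception("checked")).getClass().getSimpleName());
        System.out.println(asUnchecked(new IOException("io")).getClass().getSimpleName());
    }
}
```

Returning (rather than throwing) the wrapped exception lets call sites write `throw asUnchecked(e);`, which keeps the compiler's reachability analysis happy.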
Fix race condition in concurrent merge limit enforcement

The executeMerge method checks activeMerges.get() < maxConcurrentMerges and
mergeHandler.hasPendingMerges() without synchronization, while getNextMerge() is
synchronized on mergeHandler. The queue-drain race between hasPendingMerges() and
getNextMerge() is benign, because a null return from getNextMerge() is already
handled. The real issue is that activeMerges is incremented inside submitMergeTask
(after the while-condition check), so multiple threads calling executeMerge
concurrently can each pass the activeMerges < maxConcurrentMerges check and submit
more tasks than maxConcurrentMerges allows. The increment should happen atomically
before submitting to enforce the limit correctly.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [198-210]

 private void executeMerge() {
-    while (activeMerges.get() < maxConcurrentMerges && mergeHandler.hasPendingMerges()) {
+    while (mergeHandler.hasPendingMerges()) {
+        int current = activeMerges.get();
+        if (current >= maxConcurrentMerges) {
+            return;
+        }
+        if (!activeMerges.compareAndSet(current, current + 1)) {
+            continue;
+        }
         OneMerge oneMerge = mergeHandler.getNextMerge();
         if (oneMerge == null) {
+            activeMerges.decrementAndGet();
             return;
         }
         try {
-            submitMergeTask(oneMerge);
+            submitMergeTaskWithPreIncrementedCounter(oneMerge);
         } catch (Exception e) {
+            activeMerges.decrementAndGet();
             mergeHandler.onMergeFailure(oneMerge);
         }
     }
 }
Suggestion importance[1-10]: 6


Why: The TOCTOU race between checking activeMerges < maxConcurrentMerges and incrementing inside submitMergeTask can allow more concurrent merges than maxConcurrentMerges when multiple threads call executeMerge simultaneously. The suggested CAS-based approach is a valid fix, though the improved_code references a non-existent submitMergeTaskWithPreIncrementedCounter method.

Low
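The CAS-based permit idea can be isolated from the scheduler. A minimal sketch (class name hypothetical) where the counter is bumped before any work is submitted, so concurrent callers can never exceed the limit:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class MergePermits {
    // Acquire a merge slot atomically: succeed only if the observed count is
    // below the limit AND the CAS lands, closing the check-then-act window.
    static boolean tryAcquire(AtomicInteger active, int maxConcurrent) {
        while (true) {
            int current = active.get();
            if (current >= maxConcurrent) return false;  // at the limit
            if (active.compareAndSet(current, current + 1)) return true;
            // CAS lost to another thread: re-read and retry
        }
    }

    public static void main(String[] args) {
        AtomicInteger active = new AtomicInteger();
        System.out.println(tryAcquire(active, 2));  // true
        System.out.println(tryAcquire(active, 2));  // true
        System.out.println(tryAcquire(active, 2));  // false: limit enforced
        active.decrementAndGet();                   // a merge finished
        System.out.println(tryAcquire(active, 2));  // true
    }
}
```

The caller must decrement on both merge completion and submission failure, matching the decrementAndGet calls in the suggested fix.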
Guard against null primary data format access

The no-arg constructor sets primaryDataFormat to null, but getPrimaryDataFormat()
returns it without null-checking. Code that calls getPrimaryDataFormat() on an
instance created via the no-arg constructor (e.g., in CompositeMerger) will get a
NullPointerException. Consider either removing the no-arg constructor or throwing an
IllegalStateException from getPrimaryDataFormat() when primaryDataFormat is
null.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java [46-49]

 public CompositeDataFormat() {
     this.primaryDataFormat = null;
     this.dataFormats = List.of();
 }
 
+public DataFormat getPrimaryDataFormat() {
+    if (primaryDataFormat == null) {
+        throw new IllegalStateException("No primary data format configured for this CompositeDataFormat");
+    }
+    return primaryDataFormat;
+}
+
Suggestion importance[1-10]: 5


Why: The no-arg constructor sets primaryDataFormat to null, which could cause a NullPointerException when getPrimaryDataFormat() is called. However, the improved_code shows changes to both the constructor and getPrimaryDataFormat() method, while existing_code only covers the constructor, making the suggestion partially misaligned with the diff structure.

Low
General
Filter segments by format in mock merger

dfGroupedSearchableFiles() returns a map keyed by format name, and adding all values
indiscriminately may include WriterFileSet entries from multiple formats per
segment, leading to incorrect merge behavior in tests that use multiple formats. The
mock should filter by the specific dataFormat it handles.

test/framework/src/main/java/org/opensearch/index/engine/dataformat/stub/MockMerger.java [39-42]

 List<WriterFileSet> fileMetadataList = new ArrayList<>();
 for (Segment segment : mergeInput.segments()) {
-    fileMetadataList.addAll(segment.dfGroupedSearchableFiles().values());
+    WriterFileSet wfs = segment.dfGroupedSearchableFiles().get(dataFormat.name());
+    if (wfs != null) {
+        fileMetadataList.add(wfs);
+    }
 }
Suggestion importance[1-10]: 6


Why: The current implementation adds all WriterFileSet values from all formats in each segment, which could cause incorrect behavior in multi-format tests. Filtering by dataFormat.name() ensures the mock only processes files relevant to its specific format, improving test correctness.

Low
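The per-format filtering the suggestion asks for can be illustrated with plain maps. A standalone sketch (class and method names hypothetical, with strings standing in for WriterFileSet):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FormatFilterDemo {
    // Keep only the file set owned by this merger's format, instead of
    // calling addAll() over every format's values.
    static List<String> filesForFormat(Map<String, String> groupedFiles, String formatName) {
        List<String> out = new ArrayList<>();
        String files = groupedFiles.get(formatName);
        if (files != null) {
            out.add(files);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> segment = Map.of("parquet", "seg_1.parquet", "lucene", "seg_1.cfs");
        System.out.println(filesForFormat(segment, "parquet"));  // [seg_1.parquet]
        System.out.println(filesForFormat(segment, "text"));     // []
    }
}
```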
Guard against null catalog snapshot

When catalogSnapshotRef.get() returns null (as seen in tests where new
GatedCloseable<>(null, () -> {}) is used), calling .getSegments() will throw a
NullPointerException, which is caught and re-wrapped as a RuntimeException. This
masks the real issue. A null check should be added before accessing the snapshot, or
the contract should be documented and enforced that the supplier must never return a
null snapshot.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java [80-93]

 public Collection<OneMerge> findMerges() {
     List<OneMerge> oneMerges = new ArrayList<>();
     try (GatedCloseable<CatalogSnapshot> catalogSnapshotRef = snapshotSupplier.get()) {
-        List<Segment> segmentList = catalogSnapshotRef.get().getSegments();
+        CatalogSnapshot snapshot = catalogSnapshotRef.get();
+        if (snapshot == null) {
+            return oneMerges;
+        }
+        List<Segment> segmentList = snapshot.getSegments();


github-actions Bot commented Apr 5, 2026

❌ Gradle check result for 458a9b7: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@darjisagar7 darjisagar7 closed this Apr 6, 2026
@darjisagar7 darjisagar7 reopened this Apr 6, 2026

github-actions Bot commented Apr 6, 2026

Persistent review updated to latest commit 458a9b7


github-actions Bot commented Apr 6, 2026

❌ Gradle check result for 458a9b7: FAILURE



github-actions Bot commented Apr 7, 2026

Persistent review updated to latest commit 518afc9


✅ Gradle check result for da11981: SUCCESS


Persistent review updated to latest commit 50da490


❌ Gradle check result for 50da490: FAILURE



Persistent review updated to latest commit 600e264

Sagar Darji and others added 6 commits April 27, 2026 15:52

* Adding CompositeMergeHandler and CompositeMergePolicy
  Signed-off-by: Sagar Darji <darsaga@amazon.com>
* Addressing comments
  Signed-off-by: Sagar Darji <darsaga@amazon.com>
* Split the monolithic CompositeMergeHandler into classes with clear responsibilities
  Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
* Fix up tests
  Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
* Addressing commits
  Signed-off-by: Sagar Darji <darsaga@amazon.com>
* Integrating the merge flow with the DataFormatAwareEngine
  Signed-off-by: Sagar Darji <darsaga@amazon.com>

Persistent review updated to latest commit 4d3b869


✅ Gradle check result for 4d3b869: SUCCESS

* Addressed the comments
  Signed-off-by: Sagar Darji <darsaga@amazon.com>

Persistent review updated to latest commit 4790673


❌ Gradle check result for 4790673: FAILURE


@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 4790673: SUCCESS

@mgodwan mgodwan merged commit 4432538 into opensearch-project:main Apr 28, 2026
19 of 21 checks passed
krishna-ggk pushed a commit to krishna-ggk/OpenSearch that referenced this pull request Apr 28, 2026
Adding CompositeMergeHandler and CompositeMergePolicy (opensearch-project#21128)

* Adding CompositeMergeHandler and CompositeMergePolicy

Signed-off-by: Sagar Darji <darsaga@amazon.com>

# Conflicts:
#	sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

* Addressing comments

Signed-off-by: Sagar Darji <darsaga@amazon.com>

* Split the monolithic CompositeMergeHandler into classes with clear responsibilities:

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>

* Fix up tests

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>

* Addressing commits

Signed-off-by: Sagar Darji <darsaga@amazon.com>

* Integrating the merge flow with the DataFormatAwareEngine

Signed-off-by: Sagar Darji <darsaga@amazon.com>

# Conflicts:
#	server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java
#	server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java
#	server/src/test/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManagerTests.java

# Conflicts:
#	server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java

* Addressed the comments

Signed-off-by: Sagar Darji <darsaga@amazon.com>

---------

Signed-off-by: Sagar Darji <darsaga@amazon.com>
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Co-authored-by: Sagar Darji <darsaga@amazon.com>
Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
imRishN pushed a commit to imRishN/OpenSearch that referenced this pull request May 8, 2026