Skip to content

[DFAE] wait force merge for ongoing merges#22261

Open
mgodwan wants to merge 2 commits into
opensearch-project:mainfrom
mgodwan:converge_force_merge
Open

[DFAE] wait force merge for ongoing merges#22261
mgodwan wants to merge 2 commits into
opensearch-project:mainfrom
mgodwan:converge_force_merge

Conversation

@mgodwan

@mgodwan mgodwan commented Jun 21, 2026

Copy link
Copy Markdown
Member

Description

[Describe what this change achieves]

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions

github-actions Bot commented Jun 21, 2026

Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

(Review updated until commit 217e1cf)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Deadlock

The drainCondition.await(1, ...) call at line 221 occurs while holding drainLock, but the corresponding signalAll() at line 447 also requires drainLock. If the wait times out and re-checks hasOverlappingMerges while a merge thread is between decrementing activeMerges and acquiring drainLock to signal, the force merge thread may loop indefinitely without being woken. This can cause force merge to hang if overlapping merges complete during the 1-second wait window but before the signal is sent.

        drainLock.lock();
        try {
            drainCondition.await(1, java.util.concurrent.TimeUnit.SECONDS);
        } finally {
            drainLock.unlock();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.warn("Force merge interrupted while waiting for overlapping merges");
        return;
    }
}
Race Condition

reserveMergeOutputIfNeeded at line 323 checks reservedGenerations.isEmpty() without synchronization, then enters a synchronized block. If releaseReservation() is called concurrently between the check and the synchronized block, the method may incorrectly modify a cleared reservedGenerations set, causing force merge to track stale generations and potentially wait indefinitely for merges that have already completed.

public synchronized void reserveMergeOutputIfNeeded(OneMerge source, MergeResult result) {
    if (reservedGenerations.isEmpty()) {
        return;
    }
    boolean overlaps = source.getSegmentsToMerge().stream().anyMatch(seg -> reservedGenerations.contains(seg.generation()));
    if (overlaps) {
        // Remove consumed source generations
        for (Segment seg : source.getSegmentsToMerge()) {
            reservedGenerations.remove(seg.generation());
        }
        // Add output generation from merge result
        Map<DataFormat, WriterFileSet> mergedFiles = result.getMergedWriterFileSet();
        if (!mergedFiles.isEmpty()) {
            long outputGeneration = mergedFiles.values().iterator().next().writerGeneration();
            reservedGenerations.add(outputGeneration);
        }
    }
}
Missing Cleanup

If findForceMerges at line 233 throws an exception, the reservation is released in the finally block at line 262, but the force merge lock (forceMergeLock) is not released until the outer finally at line 265. If the exception propagates, the scheduler may be left in an inconsistent state where frozen is true but no merges were registered, blocking future operations until an explicit unfreeze.

        Collection<OneMerge> oneMerges = mergeHandler.findForceMerges(maxNumSegment);
        List<OneMerge> pending = new ArrayList<>(oneMerges);

        // 5. Release reservation — background merges on new segments can proceed freely
        mergeHandler.releaseReservation();

        // 6. Execute force merges
        int executed = 0;
        for (int i = 0; i < pending.size(); i++) {
            if (isShutdown.get()) {
                logger.warn("MergeScheduler shutdown during force merge, cleaning up remaining {} merges", pending.size() - i);
                for (int j = i; j < pending.size(); j++) {
                    mergeHandler.onMergeFailure(pending.get(j));
                }
                break;
            }
            try {
                runMerge(pending.get(i));
                executed++;
            } catch (Exception e) {
                for (int j = i + 1; j < pending.size(); j++) {
                    mergeHandler.onMergeFailure(pending.get(j));
                }
                throw e;
            }
        }
        logger.debug("Force merge completed: executed {} of {} merge groups", executed, pending.size());
    } finally {
        // Defensive: ensure reservation is always cleared on any exit path
        mergeHandler.releaseReservation();
    }
} finally {
    forceMergeLock.release();

@github-actions

github-actions Bot commented Jun 21, 2026

Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Latest suggestions up to 217e1cf

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix lock acquisition pattern

The lock acquisition should occur outside the loop to prevent repeated lock/unlock
cycles. Move drainLock.lock() before the while loop and ensure drainLock.unlock() is
in a finally block after the loop completes. This prevents potential deadlocks and
improves performance by holding the lock for the entire wait period.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [213-230]

-while (mergeHandler.hasOverlappingMerges(reservedSegments)) {
-    if (isShutdown.get()) {
-        logger.debug("MergeScheduler shutdown while waiting for overlapping merges");
-        return;
-    }
-    try {
-        drainLock.lock();
+drainLock.lock();
+try {
+    while (mergeHandler.hasOverlappingMerges(reservedSegments)) {
+        if (isShutdown.get()) {
+            logger.debug("MergeScheduler shutdown while waiting for overlapping merges");
+            return;
+        }
         try {
             drainCondition.await(1, java.util.concurrent.TimeUnit.SECONDS);
-        } finally {
-            drainLock.unlock();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("Force merge interrupted while waiting for overlapping merges");
+            return;
         }
-    } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.warn("Force merge interrupted while waiting for overlapping merges");
-        return;
     }
+} finally {
+    drainLock.unlock();
 }
Suggestion importance[1-10]: 9

__

Why: The current implementation acquires and releases the lock on every iteration, which is inefficient and can cause race conditions. The drainCondition.await() requires the lock to be held when called, and the current pattern violates proper condition variable usage. Moving lock acquisition outside the loop is critical for correctness.

High
General
Handle multiple format generations

The method assumes all formats in mergedFiles share the same writer generation, but
only checks the first one. If different formats have different generations, this
could cause incorrect reservation tracking. Verify all formats have the same
generation or handle multiple generations appropriately.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java [323-340]

 public synchronized void reserveMergeOutputIfNeeded(OneMerge source, MergeResult result) {
     if (reservedGenerations.isEmpty()) {
         return;
     }
     boolean overlaps = source.getSegmentsToMerge().stream().anyMatch(seg -> reservedGenerations.contains(seg.generation()));
     if (overlaps) {
         // Remove consumed source generations
         for (Segment seg : source.getSegmentsToMerge()) {
             reservedGenerations.remove(seg.generation());
         }
-        // Add output generation from merge result
+        // Add output generation(s) from merge result
         Map<DataFormat, WriterFileSet> mergedFiles = result.getMergedWriterFileSet();
-        if (!mergedFiles.isEmpty()) {
-            long outputGeneration = mergedFiles.values().iterator().next().writerGeneration();
-            reservedGenerations.add(outputGeneration);
+        for (WriterFileSet fileSet : mergedFiles.values()) {
+            reservedGenerations.add(fileSet.writerGeneration());
         }
     }
 }
Suggestion importance[1-10]: 7

__

Why: The method only reserves the generation from the first WriterFileSet in mergedFiles, which could be incorrect if different formats have different generations. Iterating through all formats ensures all output generations are properly tracked in reservedGenerations.

Medium
Ensure failed merge cleanup

When an exception occurs during force merge execution, the current merge that failed
(pending.get(i)) is not explicitly cleaned up via onMergeFailure. The runMerge
method may handle this, but it's safer to ensure cleanup is explicit for all failed
merges including the current one.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [252-257]

 } catch (Exception e) {
+    // Clean up current failed merge if not already handled
+    mergeHandler.onMergeFailure(pending.get(i));
+    // Clean up remaining unexecuted merges
     for (int j = i + 1; j < pending.size(); j++) {
         mergeHandler.onMergeFailure(pending.get(j));
     }
     throw e;
 }
Suggestion importance[1-10]: 3

__

Why: While runMerge likely handles cleanup for the failed merge through its exception handling, explicitly calling onMergeFailure for the current merge adds defensive redundancy. However, this may result in duplicate cleanup calls, so the impact is limited.

Low

Previous suggestions

Suggestions up to commit 24a5bc0
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix lock acquisition pattern

The lock acquisition should occur outside the while loop to avoid repeated
lock/unlock cycles. Move drainLock.lock() before the while loop and ensure
drainLock.unlock() is called in a finally block after the loop completes. This
prevents potential deadlock and improves efficiency.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [213-230]

-while (mergeHandler.hasOverlappingMerges(reservedSegments)) {
-    if (isShutdown.get()) {
-        logger.debug("MergeScheduler shutdown while waiting for overlapping merges");
-        return;
-    }
-    try {
-        drainLock.lock();
+drainLock.lock();
+try {
+    while (mergeHandler.hasOverlappingMerges(reservedSegments)) {
+        if (isShutdown.get()) {
+            logger.debug("MergeScheduler shutdown while waiting for overlapping merges");
+            return;
+        }
         try {
             drainCondition.await(1, java.util.concurrent.TimeUnit.SECONDS);
-        } finally {
-            drainLock.unlock();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("Force merge interrupted while waiting for overlapping merges");
+            return;
         }
-    } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.warn("Force merge interrupted while waiting for overlapping merges");
-        return;
     }
+} finally {
+    drainLock.unlock();
 }
Suggestion importance[1-10]: 9

__

Why: The current implementation acquires and releases the lock on every iteration of the while loop, which is inefficient and could lead to race conditions. The suggested pattern of acquiring the lock once before the loop and releasing it in a finally block is the correct approach for condition variable usage and prevents potential issues.

High
General
Clean up failed merge state

When an exception occurs during force merge execution, the current merge
(pending.get(i)) that threw the exception is not cleaned up via onMergeFailure. This
could leave it in an inconsistent state in currentlyMergingSegments. Call
onMergeFailure for the current merge as well.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [252-257]

 } catch (Exception e) {
+    mergeHandler.onMergeFailure(pending.get(i));
     for (int j = i + 1; j < pending.size(); j++) {
         mergeHandler.onMergeFailure(pending.get(j));
     }
     throw e;
 }
Suggestion importance[1-10]: 8

__

Why: The current exception handling cleans up remaining merges but not the merge that actually failed (pending.get(i)). This could leave the failed merge in currentlyMergingSegments, causing state inconsistency. The suggestion correctly identifies that onMergeFailure should be called for the current merge as well.

Medium
Handle multiple format generations

The method assumes all formats in mergedFiles share the same writer generation, but
only checks the first one. If different formats have different generations, this
could cause incorrect reservation tracking. Verify all formats have the same
generation or handle multiple generations appropriately.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java [323-340]

 public synchronized void reserveMergeOutputIfNeeded(OneMerge source, MergeResult result) {
     if (reservedGenerations.isEmpty()) {
         return;
     }
     boolean overlaps = source.getSegmentsToMerge().stream().anyMatch(seg -> reservedGenerations.contains(seg.generation()));
     if (overlaps) {
         // Remove consumed source generations
         for (Segment seg : source.getSegmentsToMerge()) {
             reservedGenerations.remove(seg.generation());
         }
-        // Add output generation from merge result
+        // Add output generation(s) from merge result
         Map<DataFormat, WriterFileSet> mergedFiles = result.getMergedWriterFileSet();
-        if (!mergedFiles.isEmpty()) {
-            long outputGeneration = mergedFiles.values().iterator().next().writerGeneration();
-            reservedGenerations.add(outputGeneration);
+        for (WriterFileSet fileSet : mergedFiles.values()) {
+            reservedGenerations.add(fileSet.writerGeneration());
         }
     }
 }
Suggestion importance[1-10]: 7

__

Why: The current code only reserves the generation from the first WriterFileSet in the map, which could be incorrect if different formats have different generations. The suggestion to iterate over all values and add all generations is more robust and ensures correct reservation tracking across all formats.

Medium
Suggestions up to commit 1c10d1f
CategorySuggestion                                                                                                                                    Impact
Possible issue
Check condition inside synchronized block

The wait loop should check hasOverlappingMerges inside the synchronized block to
avoid race conditions. Currently, the condition is checked outside, which could lead
to missed notifications if a merge completes between the check and the wait call.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [218-220]

 synchronized (drainCondition) {
-    drainCondition.wait(1000);
+    while (mergeHandler.hasOverlappingMerges(reservedSegments) && !isShutdown.get()) {
+        drainCondition.wait(1000);
+    }
 }
Suggestion importance[1-10]: 8

__

Why: This is a valid concurrency issue. The hasOverlappingMerges check at line 212 happens outside the synchronized block, creating a race condition where a merge could complete between the check and the wait call, potentially causing missed notifications and indefinite waiting. Moving the condition check inside the synchronized block ensures atomicity.

Medium
General
Handle all format generations consistently

The method assumes all formats in mergedFiles have the same writerGeneration, but
only checks the first one. If different formats produce different generations, this
could cause inconsistent reservation state. Verify all formats have the same
generation or handle multiple generations explicitly.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java [323-340]

 public synchronized void reserveMergeOutputIfNeeded(OneMerge source, MergeResult result) {
     if (reservedGenerations.isEmpty()) {
         return;
     }
     boolean overlaps = source.getSegmentsToMerge().stream().anyMatch(seg -> reservedGenerations.contains(seg.generation()));
     if (overlaps) {
-        // Remove consumed source generations
         for (Segment seg : source.getSegmentsToMerge()) {
             reservedGenerations.remove(seg.generation());
         }
-        // Add output generation from merge result
         Map<DataFormat, WriterFileSet> mergedFiles = result.getMergedWriterFileSet();
-        if (!mergedFiles.isEmpty()) {
-            long outputGeneration = mergedFiles.values().iterator().next().writerGeneration();
-            reservedGenerations.add(outputGeneration);
+        for (WriterFileSet fileSet : mergedFiles.values()) {
+            reservedGenerations.add(fileSet.writerGeneration());
         }
     }
 }
Suggestion importance[1-10]: 7

__

Why: The current implementation at line 336 only adds the first format's writerGeneration to reservedGenerations. If multiple formats exist with different generations, this could lead to incomplete reservation tracking. The suggestion to iterate all formats is more robust, though it's unclear from the PR context if multiple generations are actually possible.

Medium
Ensure cleanup on all exception paths

The initial releaseReservation() call outside the try-finally block could leave the
reservation in an inconsistent state if reserveSegmentsForForceMerge() throws an
exception. Move the initial cleanup inside the try block or ensure it's also covered
by the finally clause.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [205-210]

-// 1. Clear any stale reservation from a previous interrupted call
-mergeHandler.releaseReservation();
-
-// 2. Reserve current segments — background merges won't pick them for new merges
-Set<Segment> reservedSegments = mergeHandler.reserveSegmentsForForceMerge();
-
+Set<Segment> reservedSegments = null;
 try {
+    // 1. Clear any stale reservation from a previous interrupted call
+    mergeHandler.releaseReservation();
+    
+    // 2. Reserve current segments
+    reservedSegments = mergeHandler.reserveSegmentsForForceMerge();
+    
     // 3. Wait only for in-flight merges that overlap reserved segments
     while (mergeHandler.hasOverlappingMerges(reservedSegments)) {
Suggestion importance[1-10]: 6

__

Why: The initial releaseReservation() at line 205 is outside the try-finally block. If reserveSegmentsForForceMerge() at line 208 throws an exception, the finally block at line 257 will still execute releaseReservation(), so cleanup is guaranteed. However, moving it inside the try block improves code clarity and makes the exception handling more explicit.

Low

@github-actions

Copy link
Copy Markdown
Contributor

❌ Gradle check result for 1c10d1f: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Mohit Godwani <mgodwan@amazon.com>
@mgodwan mgodwan force-pushed the converge_force_merge branch from 1c10d1f to 24a5bc0 Compare June 21, 2026 15:44
@github-actions

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 24a5bc0

@github-actions

Copy link
Copy Markdown
Contributor

❌ Gradle check result for 24a5bc0: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

Copy link
Copy Markdown
Contributor

❌ Gradle check result for 24a5bc0: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@mgodwan mgodwan marked this pull request as ready for review June 22, 2026 09:16
@github-actions

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 217e1cf

@github-actions

Copy link
Copy Markdown
Contributor

❌ Gradle check result for 217e1cf: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

Copy link
Copy Markdown
Contributor

✅ Gradle check result for 217e1cf: SUCCESS

@codecov

codecov Bot commented Jun 22, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 66.27907% with 29 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.34%. Comparing base (c73dc2e) to head (217e1cf).
⚠️ Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
...ch/index/engine/dataformat/merge/MergeHandler.java 51.28% 16 Missing and 3 partials ⚠️
.../index/engine/dataformat/merge/MergeScheduler.java 78.72% 8 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #22261      +/-   ##
============================================
+ Coverage     73.32%   73.34%   +0.02%     
- Complexity    75934    76034     +100     
============================================
  Files          6075     6075              
  Lines        345282   345405     +123     
  Branches      49697    49723      +26     
============================================
+ Hits         253177   253343     +166     
- Misses        71786    71862      +76     
+ Partials      20319    20200     -119     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant