Adding CompositeMergeHandler and CompositeMergePolicy #21128
mgodwan merged 7 commits into opensearch-project:main
Signed-off-by: Sagar Darji <darsaga@amazon.com> # Conflicts: # sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
Signed-off-by: Sagar Darji <darsaga@amazon.com>
…sponsibilities: Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Signed-off-by: Sagar Darji <darsaga@amazon.com>
Signed-off-by: Sagar Darji <darsaga@amazon.com> # Conflicts: # server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java # server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java # server/src/test/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManagerTests.java # Conflicts: # server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java
Signed-off-by: Sagar Darji <darsaga@amazon.com>
…ject#21128)

* Adding CompositeMergeHandler and CompositeMergePolicy
  Signed-off-by: Sagar Darji <darsaga@amazon.com>
  # Conflicts:
  # sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
* Addressing comments
  Signed-off-by: Sagar Darji <darsaga@amazon.com>
* Split the monolithic CompositeMergeHandler into classes with clear responsibilities:
  Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
* Fix up tests
  Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
* Addressing commits
  Signed-off-by: Sagar Darji <darsaga@amazon.com>
* Integrating the merge flow with the DataFormatAwareEngine
  Signed-off-by: Sagar Darji <darsaga@amazon.com>
  # Conflicts:
  # server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java
  # server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java
  # server/src/test/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManagerTests.java
  # Conflicts:
  # server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java
* Addressed the comments
  Signed-off-by: Sagar Darji <darsaga@amazon.com>

---------

Signed-off-by: Sagar Darji <darsaga@amazon.com>
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Co-authored-by: Sagar Darji <darsaga@amazon.com>
Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
Add Segment Merge Infrastructure for Data-Format-Aware Engine
Description
This PR introduces a complete segment merge pipeline for the pluggable data-format-aware engine (`DataFormatAwareEngine`), enabling background and forced merge operations across single and composite (multi-format) data formats. Previously, the data-format engine could only index and refresh; segments accumulated indefinitely with no compaction.

The merge infrastructure reuses Lucene's battle-tested `TieredMergePolicy` for candidate selection while keeping the actual merge execution pluggable per data format. For composite indices (e.g., Parquet + Lucene), a two-phase merge ensures cross-format row-ID consistency.

Architecture Overview
Sequence Diagrams
1. Background Merge Trigger (after refresh)
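As a rough companion to the sequence diagram in this section, the registration and drain steps could be sketched in Java as follows. `PendingMergeQueue` and `MergeDrain` are hypothetical names, segments are reduced to plain strings, and candidate selection by `TieredMergePolicy` is left out entirely; this is an illustration under those assumptions, not the PR's `MergeHandler` or `MergeScheduler` code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Hypothetical stand-in for the handler's pending queue and segment locks. */
final class PendingMergeQueue {
    private final Set<String> mergingSegments = new HashSet<>();
    private final Queue<List<String>> pending = new ArrayDeque<>();

    /** Registers a candidate unless it is empty or overlaps a registered merge. */
    synchronized boolean register(List<String> candidate) {
        if (candidate.isEmpty()) {
            return false;
        }
        for (String segment : candidate) {
            if (mergingSegments.contains(segment)) {
                return false; // segment already locked by another merge
            }
        }
        mergingSegments.addAll(candidate); // lock the segments
        pending.add(List.copyOf(candidate));
        return true;
    }

    /** Returns the next pending merge, or null when the queue is empty. */
    synchronized List<String> next() {
        return pending.poll();
    }

    /** Unlocks the segments of a finished or failed merge. */
    synchronized void release(List<String> merge) {
        mergingSegments.removeAll(merge);
    }
}

/** Hypothetical stand-in for the scheduler's drain loop. */
final class MergeDrain {
    /** Takes pending merges until the concurrency cap is reached or the queue is empty. */
    static List<List<String>> drain(PendingMergeQueue queue, int activeMerges, int maxConcurrentMerges) {
        List<List<String>> submitted = new ArrayList<>();
        while (activeMerges + submitted.size() < maxConcurrentMerges) {
            List<String> next = queue.next();
            if (next == null) {
                break;
            }
            submitted.add(next); // a real scheduler would hand this to the merge thread pool
        }
        return submitted;
    }
}
```

In this sketch a rejected candidate locks nothing, so its non-overlapping segments stay available for a later candidate.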
    sequenceDiagram
        participant E as DataFormatAwareEngine
        participant MS as MergeScheduler
        participant MH as MergeHandler
        participant MP as DataFormatAwareMergePolicy
        participant CSM as CatalogSnapshotManager
        participant TP as ThreadPool (merge)
        E->>MS: triggerPossibleMerges()
        MS->>MH: findAndRegisterMerges()
        MH->>CSM: acquireSnapshot()
        CSM-->>MH: CatalogSnapshot (segments)
        MH->>MP: findMergeCandidates(segments)
        Note over MP: Wrap Segments as<br/>SegmentWrapper shims
        MP->>MP: TieredMergePolicy.findMerges()
        MP-->>MH: List<List<Segment>> candidates
        loop For each merge candidate
            MH->>MH: validate (no overlap with<br/>in-progress merges)
            MH->>MH: registerMerge(oneMerge)<br/>add to pending queue,<br/>lock segments
        end
        MH-->>MS: return
        loop While activeMerges < max && hasPending
            MS->>MH: getNextMerge()
            MH-->>MS: OneMerge
            MS->>TP: submitMergeTask(oneMerge)
        end

2. Merge Task Execution (on merge thread pool)
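The step in this section where source segments are replaced by the merged segment under the refresh lock can be illustrated with a small Java sketch. `CatalogSketch` is a hypothetical class that models the catalog as an immutable list of segment names; the real `CatalogSnapshotManager` and `applyMergeResults()` signatures are not shown in this description, so treat the shape below as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified catalog: an immutable segment list swapped under a lock. */
final class CatalogSketch {
    private final ReentrantLock refreshLock = new ReentrantLock();
    private volatile List<String> segments;

    CatalogSketch(List<String> initial) {
        this.segments = List.copyOf(initial);
    }

    /** Readers get an immutable view that later merges never mutate. */
    List<String> snapshot() {
        return segments;
    }

    /** Replaces the source segments with the merged one in a single published step. */
    void applyMergeResult(List<String> sources, String merged) {
        refreshLock.lock();
        try {
            if (!segments.containsAll(sources)) {
                throw new IllegalStateException("source segment missing from catalog");
            }
            List<String> updated = new ArrayList<>(segments);
            updated.removeAll(sources);
            updated.add(merged);
            segments = List.copyOf(updated); // publish the new snapshot
        } finally {
            refreshLock.unlock();
        }
    }
}
```

Building a new list and publishing it with one assignment means a reader holding an older snapshot keeps a consistent view while the merge result is applied.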
    sequenceDiagram
        participant MT as Merge Thread
        participant MH as MergeHandler
        participant M as Merger (format)
        participant E as Engine (applyMergeChanges)
        participant CSM as CatalogSnapshotManager
        participant RM as ReaderManagers
        MT->>MT: mergeStatsTracker.beforeMerge()
        MT->>MH: doMerge(oneMerge)
        MH->>M: merger.merge(MergeInput)
        Note over M: Format-specific merge:<br/>compact segment files
        M-->>MH: MergeResult
        MH-->>MT: MergeResult
        MT->>E: applyMergeChanges(result, oneMerge)
        Note over E: refreshLock.lock()
        E->>CSM: applyMergeResults(result, oneMerge)
        Note over CSM: Replace source segments<br/>with merged segment
        CSM->>CSM: commitNewSnapshot(updatedSegments)
        E->>RM: refreshListeners() (notify readers)
        Note over E: refreshLock.unlock()
        E-->>MT: return
        MT->>MH: onMergeFinished(oneMerge)
        Note over MH: removeMergingSegments()<br/>findAndRegisterMerges()<br/>(check for new candidates)
        MH-->>MT: return
        MT->>MT: mergeStatsTracker.afterMerge()
        MT->>MT: activeMerges--
        MT->>MT: executeMerge() (drain more if pending)

3. Composite Merge (multi-format orchestration)
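To make the role of the row-ID mapping in this section concrete, here is a toy Java sketch of a two-phase merge. It assumes each format stores one value per row, that the primary merge reorders rows (modelled here as a sort, purely as a stand-in), and that the mapping is an `int[]` from old row ID to new row ID. `TwoPhaseMergeSketch` and both method names are hypothetical; the PR's `RowIdMapping` type and merger signatures may differ.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical two-phase merge: the primary decides row order, secondaries follow it. */
final class TwoPhaseMergeSketch {

    /** Phase 1: merge the primary format and record oldRowId -> newRowId. */
    static int[] mergePrimary(List<String> primaryRows, List<String> mergedOut) {
        int n = primaryRows.size();
        Integer[] order = new Integer[n];
        for (int i = 0; i < n; i++) {
            order[i] = i;
        }
        // Sorting stands in for whatever reordering the primary merge performs.
        Arrays.sort(order, (a, b) -> primaryRows.get(a).compareTo(primaryRows.get(b)));
        int[] rowIdMapping = new int[n];
        for (int newId = 0; newId < n; newId++) {
            int oldId = order[newId];
            rowIdMapping[oldId] = newId;
            mergedOut.add(primaryRows.get(oldId));
        }
        return rowIdMapping;
    }

    /** Phase 2: place each secondary row at the position the primary chose for it. */
    static List<String> mergeSecondary(List<String> secondaryRows, int[] rowIdMapping) {
        if (secondaryRows.size() != rowIdMapping.length) {
            throw new IllegalArgumentException("row counts differ between formats");
        }
        String[] merged = new String[secondaryRows.size()];
        for (int oldId = 0; oldId < merged.length; oldId++) {
            merged[rowIdMapping[oldId]] = secondaryRows.get(oldId);
        }
        return Arrays.asList(merged);
    }
}
```

After both phases, row `i` in the merged primary output and row `i` in the merged secondary output refer to the same original row, which is the consistency property the two-phase design is meant to preserve.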
    sequenceDiagram
        participant CM as CompositeMerger
        participant CE as CompositeMergeExecutor
        participant PM as Primary Merger<br/>(e.g. Parquet)
        participant SM as Secondary Merger<br/>(e.g. Lucene)
        CM->>CM: extractFilesByFormat()<br/>partition segment files per format
        CM->>CM: build MergePlan
        CM->>CE: execute(plan)
        Note over CE: Phase 1: Primary format
        CE->>PM: merge(primaryFiles, rowIdMapping=null)
        Note over PM: Compact primary segments
        PM-->>CE: FormatMergeResult<br/>(mergedFiles + rowIdMapping)
        Note over CE: Extract RowIdMapping<br/>from primary result
        Note over CE: Phase 2: Secondary formats
        CE->>SM: merge(secondaryFiles, rowIdMapping)
        Note over SM: Merge using rowIdMapping<br/>for cross-format consistency
        SM-->>CE: FormatMergeResult
        CE->>CE: Combine all FormatMergeResults<br/>into single MergeResult
        CE-->>CM: MergeResult (all formats merged)

4. Failure & Cleanup Path
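The cleanup behaviour in this section (delete the output of formats that already merged, then re-throw) follows a common pattern that can be sketched in Java as below. `CleanupOnFailureSketch` and its `Step` interface are hypothetical names introduced for illustration; they are not the PR's `CompositeMergeExecutor` or `FormatMergeResult` API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical executor: runs per-format merge steps and undoes completed ones on failure. */
final class CleanupOnFailureSketch {

    /** One format's merge; cleanup() is assumed to delete that format's merged output. */
    interface Step {
        String merge() throws IOException;

        void cleanup();
    }

    static List<String> runAll(List<Step> steps) throws IOException {
        List<Step> completed = new ArrayList<>();
        List<String> outputs = new ArrayList<>();
        try {
            for (Step step : steps) {
                outputs.add(step.merge());
                completed.add(step);
            }
            return outputs;
        } catch (IOException | RuntimeException e) {
            // Only steps that finished have output to delete; the failing step is skipped.
            for (Step done : completed) {
                try {
                    done.cleanup();
                } catch (RuntimeException cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
            }
            throw e; // the caller decides how to log and unlock segments
        }
    }
}
```

Attaching cleanup failures as suppressed exceptions keeps the original merge error as the one the caller sees.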
    sequenceDiagram
        participant CE as CompositeMergeExecutor
        participant PM as Primary Merger
        participant SM as Secondary Merger
        participant MS as MergeScheduler
        participant MH as MergeHandler
        CE->>PM: merge primary
        PM-->>CE: FormatMergeResult ✓
        CE->>SM: merge secondary
        SM--xCE: IOException ✗
        Note over CE: Catch: cleanup all<br/>completed results
        CE->>CE: primaryResult.cleanup()<br/>(delete merged output files)
        CE->>CE: re-throw exception
        Note over MS: Back in MergeScheduler
        MS->>MS: log error
        MS->>MH: onMergeFailure(oneMerge)
        Note over MH: removeMergingSegments()<br/>(unlock segments for<br/>future merge attempts)
        MH-->>MS: return

Key Changes
Server (core merge framework)
| Component | Description |
| --- | --- |
| `MergeHandler` | … `MergePolicy` interface, segment locking to prevent double-merging, and lifecycle callbacks (`onMergeFinished`/`onMergeFailure`). |
| `MergeScheduler` | … (`MergeSchedulerConfig`), submits merge tasks to the new `ThreadPool.Names.MERGE` scaling thread pool, and tracks stats. |
| `DataFormatAwareMergePolicy` | … `TieredMergePolicy` to the data-format `Segment` model by wrapping segments as `SegmentCommitInfo` shims (`SegmentWrapper`). Also implements `MergeListener` to track currently-merging segments. |
| `MergeStatsTracker` | … `OpenSearchConcurrentMergeScheduler` (Lucene path) and the new `MergeScheduler`. |
| `MergeFailedEngineException` | … `ShardId`. |
| `CatalogSnapshotManager` | … `applyMergeResults()`: atomically replaces source segments with the merged segment in the catalog snapshot. |
| `DataFormatAwareEngine` | … `MergeHandler` + `MergeScheduler` at construction, calls `triggerPossibleMerges()` after each refresh, and applies merge results under the refresh lock. Gated behind the `opensearch.pluggable.dataformat.merge.enabled` system property. |
| `MergeInput` | … `rowIdMapping` and `newWriterGeneration` fields for composite merge support. |
| `ThreadPool` | … `Names.MERGE` scaling thread pool for background merge execution. |

Composite Engine (multi-format merge orchestration)
| Component | Description |
| --- | --- |
| `CompositeMerger` | … `Merger` that extracts per-format file lists from segments and delegates to `CompositeMergeExecutor`. |
| `CompositeMergeExecutor` | … `RowIdMapping`, then merges each secondary format using that mapping. On failure, cleans up all completed format results. |
| `MergePlan` | … |
| `FormatMergeResult` | … |

Deleted
| Component | Description |
| --- | --- |
| `CompositeMergeHandler` | … `CompositeMerger` + `CompositeMergeExecutor` (separation of concerns). |
| `CompositeMergePolicyTests` | … `DataFormatAwareMergePolicyTests` (moved to server module). |

Merge Flow Summary
1. After each `refresh()`, the engine calls `triggerPossibleMerges()`.
2. `MergeScheduler` asks `MergeHandler` to find and register merge candidates.
3. `MergeHandler` acquires a catalog snapshot and delegates to `DataFormatAwareMergePolicy`, which wraps segments as Lucene `SegmentCommitInfo` and calls `TieredMergePolicy.findMerges()`.
4. `MergeScheduler` drains the queue up to `maxConcurrentMerges`, submitting each merge to the `merge` thread pool.
5. Each merge task runs `MergeHandler.doMerge()` → `Merger.merge()` (or `CompositeMerger` for multi-format).
6. On success, `applyMergeChanges()` atomically updates the catalog snapshot and notifies reader managers.
7. On failure, `onMergeFailure()` releases the merging-segment locks and logs the error.

Tests
| Test | Description |
| --- | --- |
| `CompositeMergeIT` | … |
| `DataFormatAwareMergePolicyTests` | … `SegmentWrapper` identity. |
| `CompositeMergerTests` | … |
| `MergeFailedEngineExceptionTests` | … |
| `CatalogSnapshotManagerTests` | … `applyMergeResults` segment replacement logic. |
| `MergeTests` | … `MergeHandler`/`MergeScheduler` split coverage. |

Configuration
Merge is gated behind the `opensearch.pluggable.dataformat.merge.enabled` system property and is disabled by default while format-specific merge implementations are being completed.
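As a minimal sketch of how such a gate is commonly read in Java, assuming the property is a plain JVM boolean system property: the property name comes from this PR, while `MergeGateSketch` and `isMergeEnabled()` are hypothetical and the engine's actual check may look different.

```java
/** Hypothetical helper showing one way to read a boolean JVM system property. */
final class MergeGateSketch {
    static final String MERGE_ENABLED_PROPERTY = "opensearch.pluggable.dataformat.merge.enabled";

    /** True only when the property is set to "true" (case-insensitive); unset means disabled. */
    static boolean isMergeEnabled() {
        return Boolean.getBoolean(MERGE_ENABLED_PROPERTY);
    }
}
```

JVM system properties are usually supplied at startup with a `-D` flag, for example `-Dopensearch.pluggable.dataformat.merge.enabled=true`; where that flag goes depends on how the node's JVM options are managed.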
Related Issues
Resolves #[Issue number to be closed when this PR is merged]