Skip to content

Commit 4432538

Browse files
darjisagar7Sagar DarjiBukhtawar
authored
Adding CompositeMergeHandler and CompositeMergePolicy (#21128)
* Adding CompositeMergeHandler and CompositeMergePolicy Signed-off-by: Sagar Darji <darsaga@amazon.com> # Conflicts: # sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java * Addressing comments Signed-off-by: Sagar Darji <darsaga@amazon.com> * Split the monolithic CompositeMergeHandler into classes with clear responsibilities: Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com> * Fix up tests Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com> * Addressing commits Signed-off-by: Sagar Darji <darsaga@amazon.com> * Integrating the merge flow with the DataFormatAwareEngine Signed-off-by: Sagar Darji <darsaga@amazon.com> # Conflicts: # server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java # server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java # server/src/test/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManagerTests.java # Conflicts: # server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java * Addressed the comments Signed-off-by: Sagar Darji <darsaga@amazon.com> --------- Signed-off-by: Sagar Darji <darsaga@amazon.com> Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com> Co-authored-by: Sagar Darji <darsaga@amazon.com> Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
1 parent 30920ea commit 4432538

35 files changed

Lines changed: 2972 additions & 282 deletions

File tree

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.common.annotation.ExperimentalApi;
2727
import org.opensearch.index.engine.dataformat.DataFormat;
2828
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
29+
import org.opensearch.index.engine.dataformat.MergeResult;
2930
import org.opensearch.index.engine.dataformat.Merger;
3031
import org.opensearch.index.engine.dataformat.RefreshInput;
3132
import org.opensearch.index.engine.dataformat.RefreshResult;
@@ -278,7 +279,8 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
278279
/** Returns {@code null} — merge scheduling is not yet implemented for the Lucene format. */
279280
@Override
280281
public Merger getMerger() {
281-
return null;
282+
// TODO: Implement merge support as ParquetMerger
283+
return mergeInput -> new MergeResult(Map.of());
282284
}
283285

284286
/**
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.composite;

import org.opensearch.action.admin.indices.refresh.RefreshResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.be.lucene.LucenePlugin;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
import org.opensearch.index.engine.dataformat.ReaderManagerConfig;
import org.opensearch.index.engine.dataformat.stub.MockDataFormat;
import org.opensearch.index.engine.dataformat.stub.MockDataFormatPlugin;
import org.opensearch.index.engine.dataformat.stub.MockReaderManager;
import org.opensearch.index.engine.exec.EngineReaderManager;
import org.opensearch.index.engine.exec.coord.DataformatAwareCatalogSnapshot;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.store.PrecomputedChecksumStrategy;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchBackEndPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Integration tests for composite merge operations across single and multiple data format engines.
 *
 * Requires JDK 25 and sandbox enabled. Run with:
 * ./gradlew :sandbox:plugins:composite-engine:internalClusterTest \
 *     --tests "*.CompositeMergeIT" \
 *     -Dsandbox.enabled=true
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1)
public class CompositeMergeIT extends OpenSearchIntegTestCase {

    private static final String INDEX_NAME = "test-composite-merge";
    private static final String MERGE_ENABLED_PROPERTY = "opensearch.pluggable.dataformat.merge.enabled";

    // ── Mock DataFormatPlugin using test framework stubs ──

    /** Mock backend plugin exposing a single "parquet" data format built from test stubs. */
    public static class MockParquetDataFormatPlugin extends MockDataFormatPlugin implements SearchBackEndPlugin<Object> {
        private static final MockDataFormat PARQUET_FORMAT = new MockDataFormat("parquet", 0L, Set.of());

        public MockParquetDataFormatPlugin() {
            super(PARQUET_FORMAT);
        }

        @Override
        public Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry registry) {
            return Map.of("parquet", new DataFormatDescriptor("parquet", new PrecomputedChecksumStrategy()));
        }

        @Override
        public String name() {
            return "mock-parquet-backend";
        }

        @Override
        public List<String> getSupportedFormats() {
            return List.of("parquet");
        }

        @Override
        public EngineReaderManager<?> createReaderManager(ReaderManagerConfig settings) {
            return new MockReaderManager("parquet");
        }
    }

    // ── Test setup ──

    /** Enables the merge feature property before the cluster starts so nodes pick it up. */
    @Override
    public void setUp() throws Exception {
        enableMerge();
        super.setUp();
    }

    /**
     * Deletes the test index and tears down the cluster, always restoring the
     * merge system property afterwards — even if teardown itself throws —
     * so the flag cannot leak into unrelated tests in the same JVM.
     */
    @Override
    public void tearDown() throws Exception {
        try {
            try {
                client().admin().indices().prepareDelete(INDEX_NAME).get();
            } catch (Exception e) {
                // index may not exist if test failed before creation
            }
            super.tearDown();
        } finally {
            // Must run unconditionally: the property was set in setUp().
            disableMerge();
        }
    }

    @SuppressForbidden(reason = "enable pluggable dataformat merge for integration testing")
    private static void enableMerge() {
        System.setProperty(MERGE_ENABLED_PROPERTY, "true");
    }

    @SuppressForbidden(reason = "restore pluggable dataformat merge property after test")
    private static void disableMerge() {
        System.clearProperty(MERGE_ENABLED_PROPERTY);
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(MockParquetDataFormatPlugin.class, CompositeDataFormatPlugin.class, LucenePlugin.class);
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal))
            .put(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG, true)
            .build();
    }

    // ── Tests ──

    /**
     * Verifies that background merges are triggered automatically after refresh
     * when enough segments accumulate to exceed the TieredMergePolicy threshold.
     * <p>
     * Flow: index docs across many refresh cycles → each refresh calls
     * triggerPossibleMerges() → MergeScheduler picks up merge candidates
     * asynchronously → segment count decreases.
     */
    public void testBackgroundMergeSingleEngine() throws Exception {
        createIndex(INDEX_NAME, singleEngineSettings());
        ensureGreen(INDEX_NAME);

        // Create enough segments to exceed TieredMergePolicy's default threshold (~10)
        int totalSegmentsCreated = indexDocsAcrossMultipleRefreshes(15, 5);

        // Wait for async background merges to complete
        assertBusy(() -> {
            flush(INDEX_NAME);
            DataformatAwareCatalogSnapshot snapshot = getCatalogSnapshot();
            assertTrue(
                "Expected merges to reduce segment count below " + totalSegmentsCreated + ", but got: " + snapshot.getSegments().size(),
                snapshot.getSegments().size() < totalSegmentsCreated
            );
        });

        MergeStats mergeStats = getMergeStats();
        assertTrue("Expected at least one merge to have occurred", mergeStats.getTotal() > 0);

        DataformatAwareCatalogSnapshot snapshot = getCatalogSnapshot();
        assertEquals(Set.of("parquet"), snapshot.getDataFormats());
    }

    // ── Helpers ──

    /** Settings for a single-shard index backed solely by the mock "parquet" format. */
    private Settings singleEngineSettings() {
        return Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put("index.pluggable.dataformat.enabled", true)
            .put("index.pluggable.dataformat", "composite")
            .put("index.composite.primary_data_format", "parquet")
            .putList("index.composite.secondary_data_formats")
            .build();
    }

    /**
     * Indexes {@code docsPerCycle} documents then refreshes, repeated
     * {@code refreshCycles} times.
     *
     * @return the number of refresh cycles performed — used by callers as the
     *         expected pre-merge segment count (assumes one segment per refresh
     *         cycle — TODO confirm against the composite engine's refresh behavior)
     */
    private int indexDocsAcrossMultipleRefreshes(int refreshCycles, int docsPerCycle) {
        for (int cycle = 0; cycle < refreshCycles; cycle++) {
            for (int i = 0; i < docsPerCycle; i++) {
                IndexResponse response = client().prepareIndex()
                    .setIndex(INDEX_NAME)
                    .setSource("field_text", randomAlphaOfLength(10), "field_number", randomIntBetween(1, 1000))
                    .get();
                assertEquals(RestStatus.CREATED, response.status());
            }
            RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(INDEX_NAME).get();
            assertEquals(RestStatus.OK, refreshResponse.getStatus());
        }
        return refreshCycles;
    }

    /** Reads the catalog snapshot back out of the shard's commit user data. */
    private DataformatAwareCatalogSnapshot getCatalogSnapshot() throws IOException {
        IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(INDEX_NAME).clear().setStore(true).get();
        ShardStats shardStats = statsResponse.getIndex(INDEX_NAME).getShards()[0];
        CommitStats commitStats = shardStats.getCommitStats();
        assertNotNull(commitStats);
        assertTrue(commitStats.getUserData().containsKey(DataformatAwareCatalogSnapshot.CATALOG_SNAPSHOT_KEY));
        return DataformatAwareCatalogSnapshot.deserializeFromString(
            commitStats.getUserData().get(DataformatAwareCatalogSnapshot.CATALOG_SNAPSHOT_KEY),
            Function.identity()
        );
    }

    /** Fetches merge statistics for the test index's single shard. */
    private MergeStats getMergeStats() {
        IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(INDEX_NAME).clear().setMerge(true).get();
        return statsResponse.getIndex(INDEX_NAME).getShards()[0].getStats().getMerge();
    }
}

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,25 @@
2626
@ExperimentalApi
2727
public class CompositeDataFormat extends DataFormat {
2828

29+
private final DataFormat primaryDataFormat;
2930
private final List<DataFormat> dataFormats;
3031

3132
/**
32-
* Constructs a CompositeDataFormat from the given list of data formats.
33+
* Constructs a CompositeDataFormat with a designated primary format and a list of all constituent formats.
3334
*
34-
* @param dataFormats the constituent data formats
35+
* @param primaryDataFormat the authoritative data format used for merge operations
36+
* @param dataFormats all constituent data formats (including the primary)
3537
*/
36-
public CompositeDataFormat(List<DataFormat> dataFormats) {
38+
public CompositeDataFormat(DataFormat primaryDataFormat, List<DataFormat> dataFormats) {
39+
this.primaryDataFormat = Objects.requireNonNull(primaryDataFormat, "primaryDataFormat must not be null");
3740
this.dataFormats = List.copyOf(Objects.requireNonNull(dataFormats, "dataFormats must not be null"));
3841
}
3942

4043
/**
4144
* Constructs an empty CompositeDataFormat with no constituent formats.
4245
*/
4346
public CompositeDataFormat() {
47+
this.primaryDataFormat = null;
4448
this.dataFormats = List.of();
4549
}
4650

@@ -53,6 +57,15 @@ public List<DataFormat> getDataFormats() {
5357
return dataFormats;
5458
}
5559

60+
/**
61+
* Returns the primary data format used for merge operations.
62+
*
63+
* @return the primary data format
64+
*/
65+
public DataFormat getPrimaryDataFormat() {
66+
return primaryDataFormat;
67+
}
68+
5669
@Override
5770
public String name() {
5871
return "composite";

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.common.annotation.ExperimentalApi;
1414
import org.opensearch.common.settings.Settings;
1515
import org.opensearch.common.util.io.IOUtils;
16+
import org.opensearch.composite.merge.CompositeMerger;
1617
import org.opensearch.index.IndexSettings;
1718
import org.opensearch.index.engine.dataformat.DataFormat;
1819
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
@@ -129,7 +130,7 @@ public CompositeIndexingExecutionEngine(
129130
}
130131
this.secondaryEngines = Set.copyOf(secondaries);
131132

132-
this.compositeDataFormat = new CompositeDataFormat(allFormats);
133+
this.compositeDataFormat = new CompositeDataFormat(primaryFormat, allFormats);
133134
this.committer = committer;
134135
}
135136

@@ -181,7 +182,7 @@ public Writer<CompositeDocumentInput> createWriter(long writerGeneration) {
181182
/** {@inheritDoc} Delegates to the primary engine's merger. */
182183
@Override
183184
public Merger getMerger() {
184-
return primaryEngine.getMerger();
185+
return new CompositeMerger(this, compositeDataFormat);
185186
}
186187

187188
/**
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.composite.merge;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.index.engine.dataformat.DataFormat;
13+
import org.opensearch.index.engine.dataformat.MergeInput;
14+
import org.opensearch.index.engine.dataformat.MergeResult;
15+
import org.opensearch.index.engine.dataformat.Merger;
16+
import org.opensearch.index.engine.dataformat.RowIdMapping;
17+
import org.opensearch.index.engine.exec.Segment;
18+
import org.opensearch.index.engine.exec.WriterFileSet;
19+
20+
import java.io.IOException;
21+
import java.io.UncheckedIOException;
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
27+
/**
28+
* Executes a composite merge: primary format first, then secondaries using the
29+
* row-ID mapping from the primary. Stateless — all state comes from the
30+
* {@link MergePlan} and the merger map.
31+
*
32+
* @opensearch.experimental
33+
*/
34+
@ExperimentalApi
35+
public class CompositeMergeExecutor {
36+
37+
private final Map<DataFormat, Merger> mergers;
38+
39+
public CompositeMergeExecutor(Map<DataFormat, Merger> mergers) {
40+
this.mergers = Map.copyOf(mergers);
41+
}
42+
43+
/**
44+
* Executes the merge described by the plan.
45+
*
46+
* @param plan the pre-validated merge plan
47+
* @return the combined merge result across all formats
48+
*/
49+
public MergeResult execute(MergePlan plan) {
50+
List<FormatMergeResult> completed = new ArrayList<>();
51+
try {
52+
FormatMergeResult primaryResult = mergeFormat(plan, plan.primaryFormat(), null);
53+
completed.add(primaryResult);
54+
55+
RowIdMapping mapping = plan.hasSecondaries()
56+
? primaryResult.rowIdMappingOpt()
57+
.orElseThrow(() -> new IllegalStateException("Primary merge did not produce row-ID mapping required by secondaries"))
58+
: null;
59+
60+
for (DataFormat secondary : plan.secondaryFormats()) {
61+
completed.add(mergeFormat(plan, secondary, mapping));
62+
}
63+
64+
return toMergeResult(completed, mapping);
65+
} catch (Exception e) {
66+
completed.forEach(FormatMergeResult::cleanup);
67+
if (e instanceof RuntimeException re) throw re;
68+
throw new UncheckedIOException((IOException) e);
69+
}
70+
}
71+
72+
private FormatMergeResult mergeFormat(MergePlan plan, DataFormat format, RowIdMapping mapping) throws IOException {
73+
Merger merger = mergers.get(format);
74+
List<WriterFileSet> files = plan.filesFor(format);
75+
List<Segment> segments = new ArrayList<>();
76+
for (WriterFileSet wfs : files) {
77+
segments.add(Segment.builder(wfs.writerGeneration()).addSearchableFiles(format, wfs).build());
78+
}
79+
MergeResult result = merger.merge(new MergeInput(segments, mapping, plan.mergedWriterGeneration()));
80+
return new FormatMergeResult(format, result.getMergedWriterFileSetForDataformat(format), result.rowIdMapping().orElse(null));
81+
}
82+
83+
private static MergeResult toMergeResult(List<FormatMergeResult> results, RowIdMapping mapping) {
84+
Map<DataFormat, WriterFileSet> merged = new HashMap<>();
85+
for (FormatMergeResult r : results) {
86+
merged.put(r.format(), r.mergedFiles());
87+
}
88+
return new MergeResult(merged, mapping);
89+
}
90+
}

0 commit comments

Comments
 (0)