Skip to content

Commit 6700c78

Browse files
author
Sagar Darji
committed
Integrating the merge flow with the DataFormatAwareEngine
Signed-off-by: Sagar Darji <darsaga@amazon.com> # Conflicts: # server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java # server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManager.java # server/src/test/java/org/opensearch/index/engine/exec/coord/CatalogSnapshotManagerTests.java # Conflicts: # server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java
1 parent 6c31dd3 commit 6700c78

12 files changed

Lines changed: 749 additions & 159 deletions

File tree

Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.composite;
10+
11+
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
12+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
13+
import org.opensearch.action.admin.indices.stats.ShardStats;
14+
import org.opensearch.action.index.IndexResponse;
15+
import org.opensearch.be.lucene.LucenePlugin;
16+
import org.opensearch.cluster.metadata.IndexMetadata;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.util.FeatureFlags;
19+
import org.opensearch.core.rest.RestStatus;
20+
import org.opensearch.index.IndexSettings;
21+
import org.opensearch.index.engine.CommitStats;
22+
import org.opensearch.index.engine.dataformat.DataFormat;
23+
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
24+
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
25+
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
26+
import org.opensearch.index.engine.dataformat.DocumentInput;
27+
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
28+
import org.opensearch.index.engine.dataformat.FileInfos;
29+
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
30+
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
31+
import org.opensearch.index.engine.dataformat.MergeResult;
32+
import org.opensearch.index.engine.dataformat.Merger;
33+
import org.opensearch.index.engine.dataformat.RefreshInput;
34+
import org.opensearch.index.engine.dataformat.RefreshResult;
35+
import org.opensearch.index.engine.dataformat.WriteResult;
36+
import org.opensearch.index.engine.dataformat.Writer;
37+
import org.opensearch.index.engine.exec.Segment;
38+
import org.opensearch.index.engine.exec.WriterFileSet;
39+
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;
40+
import org.opensearch.index.engine.exec.coord.DataformatAwareCatalogSnapshot;
41+
import org.opensearch.index.mapper.MappedFieldType;
42+
import org.opensearch.index.merge.MergeStats;
43+
import org.opensearch.index.store.FormatChecksumStrategy;
44+
import org.opensearch.index.store.PrecomputedChecksumStrategy;
45+
import org.opensearch.plugins.Plugin;
46+
import org.opensearch.test.OpenSearchIntegTestCase;
47+
48+
import java.io.IOException;
49+
import java.util.ArrayList;
50+
import java.util.Arrays;
51+
import java.util.Collection;
52+
import java.util.List;
53+
import java.util.Map;
54+
import java.util.Set;
55+
import java.util.function.Function;
56+
57+
/**
58+
* Integration tests for composite merge operations across single and multiple data format engines.
59+
*
60+
* Requires JDK 25 and sandbox enabled. Run with:
61+
* ./gradlew :sandbox:plugins:composite-engine:internalClusterTest \
62+
* --tests "*.CompositeMergeIT" \
63+
* -Dsandbox.enabled=true
64+
*/
65+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1)
66+
public class CompositeMergeIT extends OpenSearchIntegTestCase {
67+
68+
private static final String INDEX_NAME = "test-composite-merge";
69+
70+
// ── Stub DataFormat for "parquet" ──
71+
72+
private static final DataFormat STUB_PARQUET_FORMAT = new DataFormat() {
73+
@Override
74+
public String name() {
75+
return "parquet";
76+
}
77+
78+
@Override
79+
public long priority() {
80+
return 0;
81+
}
82+
83+
@Override
84+
public Set<FieldTypeCapabilities> supportedFields() {
85+
return Set.of();
86+
}
87+
};
88+
89+
// ── Stub DocumentInput ──
90+
91+
private static class StubDocumentInput implements DocumentInput<Object> {
92+
@Override
93+
public void addField(MappedFieldType fieldType, Object value) {}
94+
95+
@Override
96+
public void setRowId(String rowIdFieldName, long rowId) {}
97+
98+
@Override
99+
public Object getFinalInput() {
100+
return null;
101+
}
102+
103+
@Override
104+
public void close() {}
105+
}
106+
107+
// ── Stub Writer ──
108+
109+
private static class StubWriter implements Writer<StubDocumentInput> {
110+
private final long generation;
111+
private int docCount = 0;
112+
113+
StubWriter(long generation) {
114+
this.generation = generation;
115+
}
116+
117+
@Override
118+
public WriteResult addDoc(StubDocumentInput documentInput) {
119+
docCount++;
120+
return new WriteResult.Success(1L, 1L, docCount);
121+
}
122+
123+
@Override
124+
public FileInfos flush() {
125+
if (docCount == 0) {
126+
return FileInfos.empty();
127+
}
128+
WriterFileSet wfs = new WriterFileSet("/tmp/stub", generation, Set.of("stub_" + generation + ".parquet"), docCount);
129+
return new FileInfos(Map.of(STUB_PARQUET_FORMAT, wfs));
130+
}
131+
132+
@Override
133+
public void sync() {}
134+
135+
@Override
136+
public long generation() {
137+
return generation;
138+
}
139+
140+
@Override
141+
public void lock() {}
142+
143+
@Override
144+
public boolean tryLock() {
145+
return true;
146+
}
147+
148+
@Override
149+
public void unlock() {}
150+
151+
@Override
152+
public void close() throws IOException {}
153+
}
154+
155+
// ── Stub IndexingExecutionEngine ──
156+
157+
@SuppressWarnings({ "unchecked", "rawtypes" })
158+
private static class StubParquetEngine implements IndexingExecutionEngine<DataFormat, StubDocumentInput> {
159+
160+
@Override
161+
public Writer<StubDocumentInput> createWriter(long writerGeneration) {
162+
return new StubWriter(writerGeneration);
163+
}
164+
165+
@Override
166+
public Merger getMerger() {
167+
return mergeInput -> {
168+
long totalRows = mergeInput.writerFiles().stream().mapToLong(WriterFileSet::numRows).sum();
169+
WriterFileSet merged = new WriterFileSet("/tmp/stub", mergeInput.newWriterGeneration(),
170+
Set.of("merged_" + mergeInput.newWriterGeneration() + ".parquet"), totalRows);
171+
return new MergeResult(Map.of(STUB_PARQUET_FORMAT, merged));
172+
};
173+
}
174+
175+
@Override
176+
public RefreshResult refresh(RefreshInput refreshInput) {
177+
if (refreshInput == null) return new RefreshResult(List.of());
178+
List<Segment> segments = new ArrayList<>();
179+
segments.addAll(refreshInput.existingSegments());
180+
segments.addAll(refreshInput.writerFiles());
181+
return new RefreshResult(List.copyOf(segments));
182+
}
183+
184+
@Override
185+
public long getNextWriterGeneration() {
186+
return 0;
187+
}
188+
189+
@Override
190+
public DataFormat getDataFormat() {
191+
return STUB_PARQUET_FORMAT;
192+
}
193+
194+
@Override
195+
public long getNativeBytesUsed() {
196+
return 0;
197+
}
198+
199+
@Override
200+
public void deleteFiles(Map<String, Collection<String>> filesToDelete) {}
201+
202+
@Override
203+
public StubDocumentInput newDocumentInput() {
204+
return new StubDocumentInput();
205+
}
206+
207+
@Override
208+
public IndexStoreProvider getProvider() {
209+
return null;
210+
}
211+
212+
@Override
213+
public void close() {}
214+
}
215+
216+
// ── Stub DataFormatPlugin ──
217+
218+
public static class MockParquetDataFormatPlugin extends Plugin implements DataFormatPlugin {
219+
@Override
220+
public DataFormat getDataFormat() {
221+
return STUB_PARQUET_FORMAT;
222+
}
223+
224+
@Override
225+
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig settings, FormatChecksumStrategy checksumStrategy) {
226+
return new StubParquetEngine();
227+
}
228+
229+
@Override
230+
public Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry registry) {
231+
return Map.of("parquet", new DataFormatDescriptor("parquet", new PrecomputedChecksumStrategy()));
232+
}
233+
}
234+
235+
// ── Test setup ──
236+
237+
@Override
238+
public void tearDown() throws Exception {
239+
try {
240+
client().admin().indices().prepareDelete(INDEX_NAME).get();
241+
} catch (Exception e) {
242+
// index may not exist if test failed before creation
243+
}
244+
super.tearDown();
245+
}
246+
247+
@Override
248+
protected Collection<Class<? extends Plugin>> nodePlugins() {
249+
return Arrays.asList(MockParquetDataFormatPlugin.class, CompositeDataFormatPlugin.class, LucenePlugin.class);
250+
}
251+
252+
@Override
253+
protected Settings nodeSettings(int nodeOrdinal) {
254+
return Settings.builder()
255+
.put(super.nodeSettings(nodeOrdinal))
256+
.put(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG, true)
257+
.build();
258+
}
259+
260+
// ── Tests ──
261+
262+
/**
263+
* Verifies that background merges are triggered automatically after refresh
264+
* when enough segments accumulate to exceed the TieredMergePolicy threshold.
265+
* <p>
266+
* Flow: index docs across many refresh cycles → each refresh calls
267+
* triggerPossibleMerges() → MergeScheduler picks up merge candidates
268+
* asynchronously → segment count decreases.
269+
*/
270+
public void testBackgroundMergeSingleEngine() throws Exception {
271+
createIndex(INDEX_NAME, singleEngineSettings());
272+
ensureGreen(INDEX_NAME);
273+
274+
// Create enough segments to exceed TieredMergePolicy's default threshold (~10)
275+
int totalSegmentsCreated = indexDocsAcrossMultipleRefreshes(15, 5);
276+
277+
// Wait for async background merges to complete
278+
assertBusy(() -> {
279+
flush(INDEX_NAME);
280+
DataformatAwareCatalogSnapshot snapshot = getCatalogSnapshot();
281+
assertTrue(
282+
"Expected merges to reduce segment count below " + totalSegmentsCreated
283+
+ ", but got: " + snapshot.getSegments().size(),
284+
snapshot.getSegments().size() < totalSegmentsCreated
285+
);
286+
});
287+
288+
MergeStats mergeStats = getMergeStats();
289+
assertTrue("Expected at least one merge to have occurred", mergeStats.getTotal() > 0);
290+
291+
DataformatAwareCatalogSnapshot snapshot = getCatalogSnapshot();
292+
assertEquals(Set.of("parquet"), snapshot.getDataFormats());
293+
}
294+
295+
// ── Helpers ──
296+
297+
private Settings singleEngineSettings() {
298+
return Settings.builder()
299+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
300+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
301+
.put("index.pluggable.dataformat.enabled", true)
302+
.put("index.pluggable.dataformat", "composite")
303+
.put("index.composite.primary_data_format", "parquet")
304+
.putList("index.composite.secondary_data_formats")
305+
.build();
306+
}
307+
308+
309+
private int indexDocsAcrossMultipleRefreshes(int refreshCycles, int docsPerCycle) {
310+
int totalDocs = 0;
311+
for (int cycle = 0; cycle < refreshCycles; cycle++) {
312+
for (int i = 0; i < docsPerCycle; i++) {
313+
IndexResponse response = client().prepareIndex()
314+
.setIndex(INDEX_NAME)
315+
.setSource("field_text", randomAlphaOfLength(10), "field_number", randomIntBetween(1, 1000))
316+
.get();
317+
assertEquals(RestStatus.CREATED, response.status());
318+
totalDocs++;
319+
}
320+
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(INDEX_NAME).get();
321+
assertEquals(RestStatus.OK, refreshResponse.getStatus());
322+
}
323+
return totalDocs;
324+
}
325+
326+
private DataformatAwareCatalogSnapshot getCatalogSnapshot() throws IOException {
327+
IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(INDEX_NAME).clear().setStore(true).get();
328+
ShardStats shardStats = statsResponse.getIndex(INDEX_NAME).getShards()[0];
329+
CommitStats commitStats = shardStats.getCommitStats();
330+
assertNotNull(commitStats);
331+
assertTrue(commitStats.getUserData().containsKey(DataformatAwareCatalogSnapshot.CATALOG_SNAPSHOT_KEY));
332+
return DataformatAwareCatalogSnapshot.deserializeFromString(
333+
commitStats.getUserData().get(DataformatAwareCatalogSnapshot.CATALOG_SNAPSHOT_KEY),
334+
Function.identity()
335+
);
336+
}
337+
338+
private MergeStats getMergeStats() {
339+
IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(INDEX_NAME).clear().setMerge(true).get();
340+
return statsResponse.getIndex(INDEX_NAME).getShards()[0].getStats().getMerge();
341+
}
342+
}

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatPlugin.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
1919
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
2020
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
21+
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
2122
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
2223
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
2324
import org.opensearch.index.store.FormatChecksumStrategy;
@@ -28,6 +29,7 @@
2829
import java.util.HashMap;
2930
import java.util.List;
3031
import java.util.Map;
32+
import java.util.Set;
3133

3234
/**
3335
* Sandbox plugin that provides a {@link CompositeIndexingExecutionEngine} for
@@ -88,7 +90,22 @@ public List<Setting<?>> getSettings() {
8890
@Override
8991
public DataFormat getDataFormat() {
9092
// TODO: Dataformat for Composite is per index, while this one talks about cluster level. Switching it off for now
91-
return new CompositeDataFormat();
93+
return new CompositeDataFormat(new DataFormat() {
94+
@Override
95+
public String name() {
96+
return "composite";
97+
}
98+
99+
@Override
100+
public long priority() {
101+
return Long.MIN_VALUE;
102+
}
103+
104+
@Override
105+
public Set<FieldTypeCapabilities> supportedFields() {
106+
return Set.of();
107+
}
108+
}, List.of());
92109
}
93110

94111
@Override

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.opensearch.index.engine.dataformat.Writer;
2828
import org.opensearch.index.engine.exec.EngineReaderManager;
2929
import org.opensearch.index.engine.exec.Segment;
30+
import org.opensearch.composite.merge.CompositeMerger;
3031
import org.opensearch.index.engine.exec.WriterFileSet;
3132
import org.opensearch.index.engine.exec.commit.Committer;
3233
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;
@@ -129,7 +130,7 @@ public CompositeIndexingExecutionEngine(
129130
}
130131
this.secondaryEngines = Set.copyOf(secondaries);
131132

132-
this.compositeDataFormat = new CompositeDataFormat(primaryPlugin.getDataFormat(), allFormats);
133+
this.compositeDataFormat = new CompositeDataFormat(primaryFormat, allFormats);
133134
this.committer = committer;
134135
}
135136

0 commit comments

Comments
 (0)