Skip to content

Commit 4790673

Browse files
author
Sagar Darji
committed
Addressed the comments
Signed-off-by: Sagar Darji <darsaga@amazon.com>
1 parent 6700c78 commit 4790673

33 files changed

Lines changed: 556 additions & 701 deletions

File tree

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.common.annotation.ExperimentalApi;
2727
import org.opensearch.index.engine.dataformat.DataFormat;
2828
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
29+
import org.opensearch.index.engine.dataformat.MergeResult;
2930
import org.opensearch.index.engine.dataformat.Merger;
3031
import org.opensearch.index.engine.dataformat.RefreshInput;
3132
import org.opensearch.index.engine.dataformat.RefreshResult;
@@ -278,7 +279,8 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
278279
/** Returns {@code null} — merge scheduling is not yet implemented for the Lucene format. */
279280
@Override
280281
public Merger getMerger() {
281-
return null;
282+
// TODO: Implement merge support as ParquetMerger
283+
return mergeInput -> new MergeResult(Map.of());
282284
}
283285

284286
/**

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeMergeIT.java

Lines changed: 39 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -14,39 +14,27 @@
1414
import org.opensearch.action.index.IndexResponse;
1515
import org.opensearch.be.lucene.LucenePlugin;
1616
import org.opensearch.cluster.metadata.IndexMetadata;
17+
import org.opensearch.common.SuppressForbidden;
1718
import org.opensearch.common.settings.Settings;
1819
import org.opensearch.common.util.FeatureFlags;
1920
import org.opensearch.core.rest.RestStatus;
2021
import org.opensearch.index.IndexSettings;
2122
import org.opensearch.index.engine.CommitStats;
22-
import org.opensearch.index.engine.dataformat.DataFormat;
2323
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
24-
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
2524
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
26-
import org.opensearch.index.engine.dataformat.DocumentInput;
27-
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
28-
import org.opensearch.index.engine.dataformat.FileInfos;
29-
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
30-
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
31-
import org.opensearch.index.engine.dataformat.MergeResult;
32-
import org.opensearch.index.engine.dataformat.Merger;
33-
import org.opensearch.index.engine.dataformat.RefreshInput;
34-
import org.opensearch.index.engine.dataformat.RefreshResult;
35-
import org.opensearch.index.engine.dataformat.WriteResult;
36-
import org.opensearch.index.engine.dataformat.Writer;
37-
import org.opensearch.index.engine.exec.Segment;
38-
import org.opensearch.index.engine.exec.WriterFileSet;
39-
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;
25+
import org.opensearch.index.engine.dataformat.ReaderManagerConfig;
26+
import org.opensearch.index.engine.dataformat.stub.MockDataFormat;
27+
import org.opensearch.index.engine.dataformat.stub.MockDataFormatPlugin;
28+
import org.opensearch.index.engine.dataformat.stub.MockReaderManager;
29+
import org.opensearch.index.engine.exec.EngineReaderManager;
4030
import org.opensearch.index.engine.exec.coord.DataformatAwareCatalogSnapshot;
41-
import org.opensearch.index.mapper.MappedFieldType;
4231
import org.opensearch.index.merge.MergeStats;
43-
import org.opensearch.index.store.FormatChecksumStrategy;
4432
import org.opensearch.index.store.PrecomputedChecksumStrategy;
4533
import org.opensearch.plugins.Plugin;
34+
import org.opensearch.plugins.SearchBackEndPlugin;
4635
import org.opensearch.test.OpenSearchIntegTestCase;
4736

4837
import java.io.IOException;
49-
import java.util.ArrayList;
5038
import java.util.Arrays;
5139
import java.util.Collection;
5240
import java.util.List;
@@ -66,174 +54,46 @@
6654
public class CompositeMergeIT extends OpenSearchIntegTestCase {
6755

6856
private static final String INDEX_NAME = "test-composite-merge";
57+
private static final String MERGE_ENABLED_PROPERTY = "opensearch.pluggable.dataformat.merge.enabled";
6958

70-
// ── Stub DataFormat for "parquet" ──
59+
// ── Mock DataFormatPlugin using test framework stubs ──
7160

72-
private static final DataFormat STUB_PARQUET_FORMAT = new DataFormat() {
73-
@Override
74-
public String name() {
75-
return "parquet";
76-
}
77-
78-
@Override
79-
public long priority() {
80-
return 0;
81-
}
82-
83-
@Override
84-
public Set<FieldTypeCapabilities> supportedFields() {
85-
return Set.of();
86-
}
87-
};
88-
89-
// ── Stub DocumentInput ──
90-
91-
private static class StubDocumentInput implements DocumentInput<Object> {
92-
@Override
93-
public void addField(MappedFieldType fieldType, Object value) {}
94-
95-
@Override
96-
public void setRowId(String rowIdFieldName, long rowId) {}
97-
98-
@Override
99-
public Object getFinalInput() {
100-
return null;
101-
}
102-
103-
@Override
104-
public void close() {}
105-
}
106-
107-
// ── Stub Writer ──
108-
109-
private static class StubWriter implements Writer<StubDocumentInput> {
110-
private final long generation;
111-
private int docCount = 0;
112-
113-
StubWriter(long generation) {
114-
this.generation = generation;
115-
}
116-
117-
@Override
118-
public WriteResult addDoc(StubDocumentInput documentInput) {
119-
docCount++;
120-
return new WriteResult.Success(1L, 1L, docCount);
121-
}
122-
123-
@Override
124-
public FileInfos flush() {
125-
if (docCount == 0) {
126-
return FileInfos.empty();
127-
}
128-
WriterFileSet wfs = new WriterFileSet("/tmp/stub", generation, Set.of("stub_" + generation + ".parquet"), docCount);
129-
return new FileInfos(Map.of(STUB_PARQUET_FORMAT, wfs));
130-
}
131-
132-
@Override
133-
public void sync() {}
134-
135-
@Override
136-
public long generation() {
137-
return generation;
138-
}
139-
140-
@Override
141-
public void lock() {}
142-
143-
@Override
144-
public boolean tryLock() {
145-
return true;
146-
}
147-
148-
@Override
149-
public void unlock() {}
150-
151-
@Override
152-
public void close() throws IOException {}
153-
}
154-
155-
// ── Stub IndexingExecutionEngine ──
156-
157-
@SuppressWarnings({ "unchecked", "rawtypes" })
158-
private static class StubParquetEngine implements IndexingExecutionEngine<DataFormat, StubDocumentInput> {
61+
public static class MockParquetDataFormatPlugin extends MockDataFormatPlugin implements SearchBackEndPlugin<Object> {
62+
private static final MockDataFormat PARQUET_FORMAT = new MockDataFormat("parquet", 0L, Set.of());
15963

160-
@Override
161-
public Writer<StubDocumentInput> createWriter(long writerGeneration) {
162-
return new StubWriter(writerGeneration);
64+
public MockParquetDataFormatPlugin() {
65+
super(PARQUET_FORMAT);
16366
}
16467

16568
@Override
166-
public Merger getMerger() {
167-
return mergeInput -> {
168-
long totalRows = mergeInput.writerFiles().stream().mapToLong(WriterFileSet::numRows).sum();
169-
WriterFileSet merged = new WriterFileSet("/tmp/stub", mergeInput.newWriterGeneration(),
170-
Set.of("merged_" + mergeInput.newWriterGeneration() + ".parquet"), totalRows);
171-
return new MergeResult(Map.of(STUB_PARQUET_FORMAT, merged));
172-
};
173-
}
174-
175-
@Override
176-
public RefreshResult refresh(RefreshInput refreshInput) {
177-
if (refreshInput == null) return new RefreshResult(List.of());
178-
List<Segment> segments = new ArrayList<>();
179-
segments.addAll(refreshInput.existingSegments());
180-
segments.addAll(refreshInput.writerFiles());
181-
return new RefreshResult(List.copyOf(segments));
182-
}
183-
184-
@Override
185-
public long getNextWriterGeneration() {
186-
return 0;
187-
}
188-
189-
@Override
190-
public DataFormat getDataFormat() {
191-
return STUB_PARQUET_FORMAT;
69+
public Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry registry) {
70+
return Map.of("parquet", new DataFormatDescriptor("parquet", new PrecomputedChecksumStrategy()));
19271
}
19372

19473
@Override
195-
public long getNativeBytesUsed() {
196-
return 0;
74+
public String name() {
75+
return "mock-parquet-backend";
19776
}
19877

19978
@Override
200-
public void deleteFiles(Map<String, Collection<String>> filesToDelete) {}
201-
202-
@Override
203-
public StubDocumentInput newDocumentInput() {
204-
return new StubDocumentInput();
79+
public List<String> getSupportedFormats() {
80+
return List.of("parquet");
20581
}
20682

20783
@Override
208-
public IndexStoreProvider getProvider() {
209-
return null;
84+
public EngineReaderManager<?> createReaderManager(ReaderManagerConfig settings) {
85+
return new MockReaderManager("parquet");
21086
}
211-
212-
@Override
213-
public void close() {}
21487
}
21588

216-
// ── Stub DataFormatPlugin ──
217-
218-
public static class MockParquetDataFormatPlugin extends Plugin implements DataFormatPlugin {
219-
@Override
220-
public DataFormat getDataFormat() {
221-
return STUB_PARQUET_FORMAT;
222-
}
223-
224-
@Override
225-
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig settings, FormatChecksumStrategy checksumStrategy) {
226-
return new StubParquetEngine();
227-
}
89+
// ── Test setup ──
22890

229-
@Override
230-
public Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry registry) {
231-
return Map.of("parquet", new DataFormatDescriptor("parquet", new PrecomputedChecksumStrategy()));
232-
}
91+
@Override
92+
public void setUp() throws Exception {
93+
enableMerge();
94+
super.setUp();
23395
}
23496

235-
// ── Test setup ──
236-
23797
@Override
23898
public void tearDown() throws Exception {
23999
try {
@@ -242,6 +102,17 @@ public void tearDown() throws Exception {
242102
// index may not exist if test failed before creation
243103
}
244104
super.tearDown();
105+
disableMerge();
106+
}
107+
108+
@SuppressForbidden(reason = "enable pluggable dataformat merge for integration testing")
109+
private static void enableMerge() {
110+
System.setProperty(MERGE_ENABLED_PROPERTY, "true");
111+
}
112+
113+
@SuppressForbidden(reason = "restore pluggable dataformat merge property after test")
114+
private static void disableMerge() {
115+
System.clearProperty(MERGE_ENABLED_PROPERTY);
245116
}
246117

247118
@Override
@@ -279,8 +150,7 @@ public void testBackgroundMergeSingleEngine() throws Exception {
279150
flush(INDEX_NAME);
280151
DataformatAwareCatalogSnapshot snapshot = getCatalogSnapshot();
281152
assertTrue(
282-
"Expected merges to reduce segment count below " + totalSegmentsCreated
283-
+ ", but got: " + snapshot.getSegments().size(),
153+
"Expected merges to reduce segment count below " + totalSegmentsCreated + ", but got: " + snapshot.getSegments().size(),
284154
snapshot.getSegments().size() < totalSegmentsCreated
285155
);
286156
});
@@ -305,22 +175,19 @@ private Settings singleEngineSettings() {
305175
.build();
306176
}
307177

308-
309178
private int indexDocsAcrossMultipleRefreshes(int refreshCycles, int docsPerCycle) {
310-
int totalDocs = 0;
311179
for (int cycle = 0; cycle < refreshCycles; cycle++) {
312180
for (int i = 0; i < docsPerCycle; i++) {
313181
IndexResponse response = client().prepareIndex()
314182
.setIndex(INDEX_NAME)
315183
.setSource("field_text", randomAlphaOfLength(10), "field_number", randomIntBetween(1, 1000))
316184
.get();
317185
assertEquals(RestStatus.CREATED, response.status());
318-
totalDocs++;
319186
}
320187
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(INDEX_NAME).get();
321188
assertEquals(RestStatus.OK, refreshResponse.getStatus());
322189
}
323-
return totalDocs;
190+
return refreshCycles;
324191
}
325192

326193
private DataformatAwareCatalogSnapshot getCatalogSnapshot() throws IOException {

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ public CompositeDataFormat(DataFormat primaryDataFormat, List<DataFormat> dataFo
4040
this.dataFormats = List.copyOf(Objects.requireNonNull(dataFormats, "dataFormats must not be null"));
4141
}
4242

43+
/**
44+
* Constructs an empty CompositeDataFormat with no constituent formats.
45+
*/
46+
public CompositeDataFormat() {
47+
this.primaryDataFormat = null;
48+
this.dataFormats = List.of();
49+
}
50+
4351
/**
4452
* Returns the list of constituent data formats.
4553
*

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatPlugin.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
1919
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
2020
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
21-
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
2221
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
2322
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
2423
import org.opensearch.index.store.FormatChecksumStrategy;
@@ -29,7 +28,6 @@
2928
import java.util.HashMap;
3029
import java.util.List;
3130
import java.util.Map;
32-
import java.util.Set;
3331

3432
/**
3533
* Sandbox plugin that provides a {@link CompositeIndexingExecutionEngine} for
@@ -90,22 +88,7 @@ public List<Setting<?>> getSettings() {
9088
@Override
9189
public DataFormat getDataFormat() {
9290
// TODO: Dataformat for Composite is per index, while this one talks about cluster level. Switching it off for now
93-
return new CompositeDataFormat(new DataFormat() {
94-
@Override
95-
public String name() {
96-
return "composite";
97-
}
98-
99-
@Override
100-
public long priority() {
101-
return Long.MIN_VALUE;
102-
}
103-
104-
@Override
105-
public Set<FieldTypeCapabilities> supportedFields() {
106-
return Set.of();
107-
}
108-
}, List.of());
91+
return new CompositeDataFormat();
10992
}
11093

11194
@Override

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.common.annotation.ExperimentalApi;
1414
import org.opensearch.common.settings.Settings;
1515
import org.opensearch.common.util.io.IOUtils;
16+
import org.opensearch.composite.merge.CompositeMerger;
1617
import org.opensearch.index.IndexSettings;
1718
import org.opensearch.index.engine.dataformat.DataFormat;
1819
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
@@ -27,7 +28,6 @@
2728
import org.opensearch.index.engine.dataformat.Writer;
2829
import org.opensearch.index.engine.exec.EngineReaderManager;
2930
import org.opensearch.index.engine.exec.Segment;
30-
import org.opensearch.composite.merge.CompositeMerger;
3131
import org.opensearch.index.engine.exec.WriterFileSet;
3232
import org.opensearch.index.engine.exec.commit.Committer;
3333
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;

0 commit comments

Comments
 (0)