Skip to content

Commit da11981

Browse files
author
Sagar Darji
committed
Addressed the comments
Signed-off-by: Sagar Darji <darsaga@amazon.com>
1 parent 5782393 commit da11981

32 files changed

Lines changed: 522 additions & 706 deletions

File tree

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeMergeIT.java

Lines changed: 8 additions & 178 deletions
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,19 @@
1919
import org.opensearch.core.rest.RestStatus;
2020
import org.opensearch.index.IndexSettings;
2121
import org.opensearch.index.engine.CommitStats;
22-
import org.opensearch.index.engine.dataformat.DataFormat;
2322
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
24-
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
2523
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
26-
import org.opensearch.index.engine.dataformat.DocumentInput;
27-
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
28-
import org.opensearch.index.engine.dataformat.FileInfos;
29-
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
30-
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
31-
import org.opensearch.index.engine.dataformat.MergeResult;
32-
import org.opensearch.index.engine.dataformat.Merger;
33-
import org.opensearch.index.engine.dataformat.RefreshInput;
34-
import org.opensearch.index.engine.dataformat.RefreshResult;
35-
import org.opensearch.index.engine.dataformat.WriteResult;
36-
import org.opensearch.index.engine.dataformat.Writer;
37-
import org.opensearch.index.engine.exec.Segment;
38-
import org.opensearch.index.engine.exec.WriterFileSet;
39-
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;
24+
import org.opensearch.index.engine.dataformat.stub.MockDataFormat;
25+
import org.opensearch.index.engine.dataformat.stub.MockDataFormatPlugin;
4026
import org.opensearch.index.engine.exec.coord.DataformatAwareCatalogSnapshot;
41-
import org.opensearch.index.mapper.MappedFieldType;
4227
import org.opensearch.index.merge.MergeStats;
43-
import org.opensearch.index.store.FormatChecksumStrategy;
4428
import org.opensearch.index.store.PrecomputedChecksumStrategy;
4529
import org.opensearch.plugins.Plugin;
4630
import org.opensearch.test.OpenSearchIntegTestCase;
4731

4832
import java.io.IOException;
49-
import java.util.ArrayList;
5033
import java.util.Arrays;
5134
import java.util.Collection;
52-
import java.util.List;
5335
import java.util.Map;
5436
import java.util.Set;
5537
import java.util.function.Function;
@@ -67,163 +49,13 @@ public class CompositeMergeIT extends OpenSearchIntegTestCase {
6749

6850
private static final String INDEX_NAME = "test-composite-merge";
6951

70-
// ── Stub DataFormat for "parquet" ──
52+
// ── Mock DataFormatPlugin using test framework stubs ──
7153

72-
private static final DataFormat STUB_PARQUET_FORMAT = new DataFormat() {
73-
@Override
74-
public String name() {
75-
return "parquet";
76-
}
77-
78-
@Override
79-
public long priority() {
80-
return 0;
81-
}
82-
83-
@Override
84-
public Set<FieldTypeCapabilities> supportedFields() {
85-
return Set.of();
86-
}
87-
};
88-
89-
// ── Stub DocumentInput ──
90-
91-
private static class StubDocumentInput implements DocumentInput<Object> {
92-
@Override
93-
public void addField(MappedFieldType fieldType, Object value) {}
94-
95-
@Override
96-
public void setRowId(String rowIdFieldName, long rowId) {}
97-
98-
@Override
99-
public Object getFinalInput() {
100-
return null;
101-
}
102-
103-
@Override
104-
public void close() {}
105-
}
106-
107-
// ── Stub Writer ──
108-
109-
private static class StubWriter implements Writer<StubDocumentInput> {
110-
private final long generation;
111-
private int docCount = 0;
112-
113-
StubWriter(long generation) {
114-
this.generation = generation;
115-
}
116-
117-
@Override
118-
public WriteResult addDoc(StubDocumentInput documentInput) {
119-
docCount++;
120-
return new WriteResult.Success(1L, 1L, docCount);
121-
}
122-
123-
@Override
124-
public FileInfos flush() {
125-
if (docCount == 0) {
126-
return FileInfos.empty();
127-
}
128-
WriterFileSet wfs = new WriterFileSet("/tmp/stub", generation, Set.of("stub_" + generation + ".parquet"), docCount);
129-
return new FileInfos(Map.of(STUB_PARQUET_FORMAT, wfs));
130-
}
131-
132-
@Override
133-
public void sync() {}
134-
135-
@Override
136-
public long generation() {
137-
return generation;
138-
}
139-
140-
@Override
141-
public void lock() {}
142-
143-
@Override
144-
public boolean tryLock() {
145-
return true;
146-
}
147-
148-
@Override
149-
public void unlock() {}
150-
151-
@Override
152-
public void close() throws IOException {}
153-
}
154-
155-
// ── Stub IndexingExecutionEngine ──
156-
157-
@SuppressWarnings({ "unchecked", "rawtypes" })
158-
private static class StubParquetEngine implements IndexingExecutionEngine<DataFormat, StubDocumentInput> {
54+
public static class MockParquetDataFormatPlugin extends MockDataFormatPlugin {
55+
private static final MockDataFormat PARQUET_FORMAT = new MockDataFormat("parquet", 0L, Set.of());
15956

160-
@Override
161-
public Writer<StubDocumentInput> createWriter(long writerGeneration) {
162-
return new StubWriter(writerGeneration);
163-
}
164-
165-
@Override
166-
public Merger getMerger() {
167-
return mergeInput -> {
168-
long totalRows = mergeInput.writerFiles().stream().mapToLong(WriterFileSet::numRows).sum();
169-
WriterFileSet merged = new WriterFileSet("/tmp/stub", mergeInput.newWriterGeneration(),
170-
Set.of("merged_" + mergeInput.newWriterGeneration() + ".parquet"), totalRows);
171-
return new MergeResult(Map.of(STUB_PARQUET_FORMAT, merged));
172-
};
173-
}
174-
175-
@Override
176-
public RefreshResult refresh(RefreshInput refreshInput) {
177-
if (refreshInput == null) return new RefreshResult(List.of());
178-
List<Segment> segments = new ArrayList<>();
179-
segments.addAll(refreshInput.existingSegments());
180-
segments.addAll(refreshInput.writerFiles());
181-
return new RefreshResult(List.copyOf(segments));
182-
}
183-
184-
@Override
185-
public long getNextWriterGeneration() {
186-
return 0;
187-
}
188-
189-
@Override
190-
public DataFormat getDataFormat() {
191-
return STUB_PARQUET_FORMAT;
192-
}
193-
194-
@Override
195-
public long getNativeBytesUsed() {
196-
return 0;
197-
}
198-
199-
@Override
200-
public void deleteFiles(Map<String, Collection<String>> filesToDelete) {}
201-
202-
@Override
203-
public StubDocumentInput newDocumentInput() {
204-
return new StubDocumentInput();
205-
}
206-
207-
@Override
208-
public IndexStoreProvider getProvider() {
209-
return null;
210-
}
211-
212-
@Override
213-
public void close() {}
214-
}
215-
216-
// ── Stub DataFormatPlugin ──
217-
218-
public static class MockParquetDataFormatPlugin extends Plugin implements DataFormatPlugin {
219-
@Override
220-
public DataFormat getDataFormat() {
221-
return STUB_PARQUET_FORMAT;
222-
}
223-
224-
@Override
225-
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig settings, FormatChecksumStrategy checksumStrategy) {
226-
return new StubParquetEngine();
57+
public MockParquetDataFormatPlugin() {
58+
super(PARQUET_FORMAT);
22759
}
22860

22961
@Override
@@ -279,8 +111,7 @@ public void testBackgroundMergeSingleEngine() throws Exception {
279111
flush(INDEX_NAME);
280112
DataformatAwareCatalogSnapshot snapshot = getCatalogSnapshot();
281113
assertTrue(
282-
"Expected merges to reduce segment count below " + totalSegmentsCreated
283-
+ ", but got: " + snapshot.getSegments().size(),
114+
"Expected merges to reduce segment count below " + totalSegmentsCreated + ", but got: " + snapshot.getSegments().size(),
284115
snapshot.getSegments().size() < totalSegmentsCreated
285116
);
286117
});
@@ -305,7 +136,6 @@ private Settings singleEngineSettings() {
305136
.build();
306137
}
307138

308-
309139
private int indexDocsAcrossMultipleRefreshes(int refreshCycles, int docsPerCycle) {
310140
int totalDocs = 0;
311141
for (int cycle = 0; cycle < refreshCycles; cycle++) {

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ public CompositeDataFormat(DataFormat primaryDataFormat, List<DataFormat> dataFo
4040
this.dataFormats = List.copyOf(Objects.requireNonNull(dataFormats, "dataFormats must not be null"));
4141
}
4242

43+
/**
44+
* Constructs an empty CompositeDataFormat with no constituent formats.
45+
*/
46+
public CompositeDataFormat() {
47+
this.primaryDataFormat = null;
48+
this.dataFormats = List.of();
49+
}
50+
4351
/**
4452
* Returns the list of constituent data formats.
4553
*

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatPlugin.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
1919
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
2020
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
21-
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
2221
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
2322
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
2423
import org.opensearch.index.store.FormatChecksumStrategy;
@@ -29,7 +28,6 @@
2928
import java.util.HashMap;
3029
import java.util.List;
3130
import java.util.Map;
32-
import java.util.Set;
3331

3432
/**
3533
* Sandbox plugin that provides a {@link CompositeIndexingExecutionEngine} for
@@ -90,22 +88,7 @@ public List<Setting<?>> getSettings() {
9088
@Override
9189
public DataFormat getDataFormat() {
9290
// TODO: Dataformat for Composite is per index, while this one talks about cluster level. Switching it off for now
93-
return new CompositeDataFormat(new DataFormat() {
94-
@Override
95-
public String name() {
96-
return "composite";
97-
}
98-
99-
@Override
100-
public long priority() {
101-
return Long.MIN_VALUE;
102-
}
103-
104-
@Override
105-
public Set<FieldTypeCapabilities> supportedFields() {
106-
return Set.of();
107-
}
108-
}, List.of());
91+
return new CompositeDataFormat();
10992
}
11093

11194
@Override

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.common.annotation.ExperimentalApi;
1414
import org.opensearch.common.settings.Settings;
1515
import org.opensearch.common.util.io.IOUtils;
16+
import org.opensearch.composite.merge.CompositeMerger;
1617
import org.opensearch.index.IndexSettings;
1718
import org.opensearch.index.engine.dataformat.DataFormat;
1819
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
@@ -27,7 +28,6 @@
2728
import org.opensearch.index.engine.dataformat.Writer;
2829
import org.opensearch.index.engine.exec.EngineReaderManager;
2930
import org.opensearch.index.engine.exec.Segment;
30-
import org.opensearch.composite.merge.CompositeMerger;
3131
import org.opensearch.index.engine.exec.WriterFileSet;
3232
import org.opensearch.index.engine.exec.commit.Committer;
3333
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/merge/CompositeMergeExecutor.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.opensearch.index.engine.dataformat.MergeResult;
1515
import org.opensearch.index.engine.dataformat.Merger;
1616
import org.opensearch.index.engine.dataformat.RowIdMapping;
17-
import org.opensearch.index.engine.dataformat.merge.OneMerge;
17+
import org.opensearch.index.engine.exec.Segment;
1818
import org.opensearch.index.engine.exec.WriterFileSet;
1919

2020
import java.io.IOException;
@@ -49,14 +49,12 @@ public CompositeMergeExecutor(Map<DataFormat, Merger> mergers) {
4949
public MergeResult execute(MergePlan plan) {
5050
List<FormatMergeResult> completed = new ArrayList<>();
5151
try {
52-
FormatMergeResult primaryResult = mergeFormat(
53-
plan, plan.primaryFormat(), null
54-
);
52+
FormatMergeResult primaryResult = mergeFormat(plan, plan.primaryFormat(), null);
5553
completed.add(primaryResult);
5654

5755
RowIdMapping mapping = plan.hasSecondaries()
58-
? primaryResult.rowIdMappingOpt().orElseThrow(() -> new IllegalStateException(
59-
"Primary merge did not produce row-ID mapping required by secondaries"))
56+
? primaryResult.rowIdMappingOpt()
57+
.orElseThrow(() -> new IllegalStateException("Primary merge did not produce row-ID mapping required by secondaries"))
6058
: null;
6159

6260
for (DataFormat secondary : plan.secondaryFormats()) {
@@ -71,24 +69,18 @@ public MergeResult execute(MergePlan plan) {
7169
}
7270
}
7371

74-
private FormatMergeResult mergeFormat(
75-
MergePlan plan, DataFormat format, RowIdMapping mapping
76-
) throws IOException {
72+
private FormatMergeResult mergeFormat(MergePlan plan, DataFormat format, RowIdMapping mapping) throws IOException {
7773
Merger merger = mergers.get(format);
7874
List<WriterFileSet> files = plan.filesFor(format);
79-
MergeResult result = merger.merge(
80-
new MergeInput(files, mapping, plan.mergedWriterGeneration())
81-
);
82-
return new FormatMergeResult(
83-
format,
84-
result.getMergedWriterFileSetForDataformat(format),
85-
result.rowIdMapping().orElse(null)
86-
);
75+
List<Segment> segments = new ArrayList<>();
76+
for (WriterFileSet wfs : files) {
77+
segments.add(Segment.builder(wfs.writerGeneration()).addSearchableFiles(format, wfs).build());
78+
}
79+
MergeResult result = merger.merge(new MergeInput(segments, mapping, plan.mergedWriterGeneration()));
80+
return new FormatMergeResult(format, result.getMergedWriterFileSetForDataformat(format), result.rowIdMapping().orElse(null));
8781
}
8882

89-
private static MergeResult toMergeResult(
90-
List<FormatMergeResult> results, RowIdMapping mapping
91-
) {
83+
private static MergeResult toMergeResult(List<FormatMergeResult> results, RowIdMapping mapping) {
9284
Map<DataFormat, WriterFileSet> merged = new HashMap<>();
9385
for (FormatMergeResult r : results) {
9486
merged.put(r.format(), r.mergedFiles());

0 commit comments

Comments
 (0)