Skip to content

Commit a00259a

Browse files
author
Sagar Darji
committed
Add the implementation of RowIdMapping to support Parquet as the primary engine and Lucene as the secondary engine
Signed-off-by: Sagar Darji <darsaga@amazon.com>
1 parent 85da2a8 commit a00259a

27 files changed

Lines changed: 740 additions & 183 deletions

File tree

sandbox/plugins/analytics-backend-lucene/src/main/java/org/apache/lucene/index/MergeIndexWriter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.apache.lucene.index;
1010

1111
import org.apache.lucene.store.Directory;
12-
import org.opensearch.common.annotation.ExperimentalApi;
1312

1413
import java.io.IOException;
1514

@@ -31,7 +30,6 @@
3130
*
3231
* @opensearch.experimental
3332
*/
34-
@ExperimentalApi
3533
public class MergeIndexWriter extends IndexWriter {
3634

3735
public MergeIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
@@ -68,4 +66,11 @@ public void executeMerge(MergePolicy.OneMerge oneMerge, long mergeGeneration) th
6866
// merge() must be called without holding the lock — mergeInit asserts !Thread.holdsLock(this)
6967
merge(oneMerge);
7068
}
69+
70+
@Override
71+
protected void mergeSuccess(MergePolicy.OneMerge merge) {
72+
// TODO update this for lucene as a primary engine
73+
// https://github.com/opensearch-project/OpenSearch/issues/21505
74+
super.mergeSuccess(merge);
75+
}
7176
}

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneCommitter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ private IndexWriterConfig createIndexWriterConfig(EngineConfig engineConfig) {
232232
// documents by remapped row ID during merge. When primary (or standalone), use the
233233
// engine config's IndexSort (which may be user-configured).
234234
// TODO: Determine the right way to get this information, as the approach below is leaky
235+
// https://github.com/opensearch-project/OpenSearch/issues/21506
235236
List<String> secondaryFormats = engineConfig.getIndexSettings().getSettings().getAsList("index.composite.secondary_data_formats");
236237
boolean isSecondary = secondaryFormats.contains("lucene");
237238

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/merge/RowIdRemappingDocValuesProducer.java

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,11 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti
6666
if (DocumentInput.ROW_ID_FIELD.equals(field.name)) {
6767
if (rowIdMapping != null) {
6868
return new MappedRowIdDocValues(delegate.getSortedNumeric(field), rowIdMapping, generation);
69+
} else {
70+
// https://github.com/opensearch-project/OpenSearch/issues/21508
71+
// TODO: Check how this will work for the primary engine, where rowIdMapping is null.
72+
throw new UnsupportedOperationException("Lucene as Primary Format is not supported yet");
6973
}
70-
return new SequentialRowIdDocValues(maxDoc, rowIdOffset);
7174
}
7275
return delegate.getSortedNumeric(field);
7376
}
@@ -153,57 +156,4 @@ public long cost() {
153156
return delegate.cost();
154157
}
155158
}
156-
157-
/**
158-
* Assigns sequential {@code ___row_id} = {@code rowIdOffset + docID}.
159-
* Used when no RowIdMapping is provided.
160-
*/
161-
private static class SequentialRowIdDocValues extends SortedNumericDocValues {
162-
163-
private final int maxDoc;
164-
private final int rowIdOffset;
165-
private int docID = -1;
166-
167-
SequentialRowIdDocValues(int maxDoc, int rowIdOffset) {
168-
this.maxDoc = maxDoc;
169-
this.rowIdOffset = rowIdOffset;
170-
}
171-
172-
@Override
173-
public long nextValue() {
174-
return rowIdOffset + docID;
175-
}
176-
177-
@Override
178-
public int docValueCount() {
179-
return 1;
180-
}
181-
182-
@Override
183-
public boolean advanceExact(int target) {
184-
docID = target;
185-
return true;
186-
}
187-
188-
@Override
189-
public int docID() {
190-
return docID;
191-
}
192-
193-
@Override
194-
public int nextDoc() {
195-
return ++docID < maxDoc ? docID : NO_MORE_DOCS;
196-
}
197-
198-
@Override
199-
public int advance(int target) {
200-
docID = target;
201-
return docID < maxDoc ? docID : NO_MORE_DOCS;
202-
}
203-
204-
@Override
205-
public long cost() {
206-
return maxDoc;
207-
}
208-
}
209159
}

sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneMergerTests.java

Lines changed: 10 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,8 @@
4242
import java.nio.file.Path;
4343
import java.util.ArrayList;
4444
import java.util.HashMap;
45-
import java.util.HashSet;
4645
import java.util.List;
4746
import java.util.Map;
48-
import java.util.Set;
4947

5048
import static org.opensearch.be.lucene.index.LuceneWriter.WRITER_GENERATION_ATTRIBUTE;
5149

@@ -117,44 +115,6 @@ public void testMergeWithNoMatchingSegments() throws IOException {
117115
assertTrue(result.getMergedWriterFileSet().isEmpty());
118116
}
119117

120-
/**
121-
* Merge without RowIdMapping (primary merge) preserves all documents and field data.
122-
*/
123-
public void testMergeWithoutRowIdMappingPreservesData() throws IOException {
124-
writeSegment(writer, 1L, 0, 3);
125-
writeSegment(writer, 2L, 3, 2);
126-
writer.commit();
127-
128-
SegmentInfos infos = getSegmentInfos(writer);
129-
assertEquals(2, infos.size());
130-
assertEquals(5, writer.getDocStats().numDocs);
131-
132-
LuceneMerger merger = new LuceneMerger(writer, new LuceneDataFormat(), dataPath);
133-
List<Segment> segments = buildSegments(infos);
134-
135-
MergeInput input = MergeInput.builder().segments(segments).newWriterGeneration(10L).build();
136-
MergeResult result = merger.merge(input);
137-
assertNotNull(result);
138-
139-
writer.commit();
140-
// addIndexes adds a new merged segment; original segments remain.
141-
// Total docs = original 5 + merged 5 = 10
142-
try (DirectoryReader reader = DirectoryReader.open(writer)) {
143-
assertTrue("Should have at least 5 docs after merge", reader.numDocs() >= 5);
144-
Set<String> foundIds = new HashSet<>();
145-
for (LeafReaderContext ctx : reader.leaves()) {
146-
for (int i = 0; i < ctx.reader().maxDoc(); i++) {
147-
Document doc = ctx.reader().storedFields().document(i);
148-
foundIds.add(doc.get("id"));
149-
assertNotNull("data field should be preserved", doc.get("data"));
150-
}
151-
}
152-
for (int i = 0; i < 5; i++) {
153-
assertTrue("Missing doc id: doc_" + i, foundIds.contains("doc_" + i));
154-
}
155-
}
156-
}
157-
158118
/**
159119
* Merge with RowIdMapping remaps ___row_id doc values AND reorders documents.
160120
* Verifies that the merged segment has documents sorted by remapped row IDs
@@ -242,6 +202,11 @@ public void testMergeWithRowIdMappingRemapsRowIds() throws IOException {
242202

243203
/**
244204
* Merge preserves keyword, numeric, and stored field data integrity.
205+
*
206+
* <p>Uses an identity {@link RowIdMapping} so the merge exercises the real
207+
* secondary-format path; the assertions focus on field-data survival rather
208+
* than on row-id remapping (which is covered by
209+
* {@link #testMergeWithRowIdMappingRemapsRowIds()}).
245210
*/
246211
public void testMergePreservesFieldDataIntegrity() throws IOException {
247212
writeSegmentWithRichFields(writer, 1L, 0, 3);
@@ -252,7 +217,11 @@ public void testMergePreservesFieldDataIntegrity() throws IOException {
252217
SegmentInfos infos = getSegmentInfos(writer);
253218
List<Segment> segments = buildSegments(infos);
254219

255-
MergeInput input = MergeInput.builder().segments(segments).newWriterGeneration(10L).build();
220+
// Identity mapping — writeSegmentWithRichFields already writes globally-unique row IDs
221+
// (0,1,2 in gen=1 and 3,4 in gen=2), so returning the original row ID is well-formed.
222+
RowIdMapping identityMapping = (oldId, oldGeneration) -> oldId;
223+
224+
MergeInput input = MergeInput.builder().segments(segments).rowIdMapping(identityMapping).newWriterGeneration(10L).build();
256225
merger.merge(input);
257226
writer.commit();
258227

0 commit comments

Comments
 (0)