Skip to content

Commit 139b9f9

Browse files
darjisagar7 (Sagar Darji)
and authored
Adding Lucene Merge support for pluggable dataformat support (opensearch-project#21422)
Adding Lucene Merge support for pluggable dataformat support Signed-off-by: Sagar Darji <darsaga@amazon.com> Co-authored-by: Sagar Darji <darsaga@amazon.com>
1 parent 29fc518 commit 139b9f9

47 files changed

Lines changed: 2548 additions & 151 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@
99
package org.opensearch.be.lucene;
1010

1111
import org.apache.lucene.index.DirectoryReader;
12+
import org.apache.lucene.index.SegmentCommitInfo;
13+
import org.apache.lucene.index.SegmentReader;
1214
import org.opensearch.common.annotation.ExperimentalApi;
1315
import org.opensearch.index.engine.dataformat.DataFormat;
1416
import org.opensearch.index.engine.exec.EngineReaderManager;
17+
import org.opensearch.index.engine.exec.Segment;
1518
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1619

1720
import java.io.IOException;
@@ -20,6 +23,8 @@
2023
import java.util.Map;
2124
import java.util.Objects;
2225

26+
import static org.opensearch.be.lucene.index.LuceneWriter.WRITER_GENERATION_ATTRIBUTE;
27+
2328
/**
2429
* Lucene implementation of {@link EngineReaderManager}.
2530
* <p>
@@ -72,11 +77,60 @@ public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) th
7277
}
7378
DirectoryReader refreshed = DirectoryReader.openIfChanged(currentReader);
7479
if (refreshed != null) {
80+
// Guard against refresh/merge-apply races: a prior IT regression surfaced when
81+
// overlapping threads produced a refreshed reader whose leaves disagreed with the
82+
// catalog snapshot being registered, effectively pairing the snapshot with a stale
83+
// reader. This assert catches that drift in test builds before the mismatched pair
84+
// is published to readers.
85+
assert readersAreSame(catalogSnapshot, refreshed);
7586
currentReader = refreshed;
7687
}
7788
readers.put(catalogSnapshot, currentReader);
7889
}
7990

91+
/**
92+
* Consistency check: verifies that the refreshed {@link DirectoryReader} reflects exactly
93+
* the set of segments the given {@link CatalogSnapshot} references. Compares the sorted
94+
* list of writer generations drawn from the snapshot's {@link Segment Segments} against
95+
* the sorted list of writer generations read off each leaf of the reader (via the
96+
* {@link org.opensearch.be.lucene.index.LuceneWriter#WRITER_GENERATION_ATTRIBUTE} stamped
97+
* onto every Lucene segment at write time).
98+
*
99+
* <p>Used only in an {@code assert} to catch refresh/catalog drift in test builds — if
100+
* this ever returns {@code false} in production, it means a Lucene reader has been paired
101+
* with the wrong catalog snapshot.
102+
*
103+
* @param catalogSnapshot catalog snapshot whose referenced generations are the expected set
104+
* @param readers DirectoryReader whose leaves' generations are the actual set
105+
* @return {@code true} iff both lists contain the same generations in the same (sorted) order
106+
*/
107+
private boolean readersAreSame(CatalogSnapshot catalogSnapshot, DirectoryReader readers) {
108+
Collection<Long> generationsReferenced = catalogSnapshot.getSegments().stream().map(Segment::generation).sorted().toList();
109+
return generationsReferenced.equals(collectReferencedGenerations(readers));
110+
}
111+
112+
/**
113+
* Extracts the writer generation from each leaf of the given {@link DirectoryReader} and
114+
* returns them as a sorted list. Each leaf's {@link SegmentReader} carries a
115+
* {@link SegmentCommitInfo} whose {@code SegmentInfo} is stamped with the
116+
* {@link org.opensearch.be.lucene.index.LuceneWriter#WRITER_GENERATION_ATTRIBUTE} when the
117+
* segment is written; parsing that attribute yields the generation that produced the leaf.
118+
*
119+
* @param reader the DirectoryReader to inspect
120+
* @return generations of all leaves, sorted ascending
121+
* @throws NumberFormatException if a leaf is missing the writer-generation attribute or
122+
* its value is not parseable as a long (indicates a segment
123+
* not produced by {@link org.opensearch.be.lucene.index.LuceneWriter})
124+
* @throws ClassCastException if any leaf reader is not a {@link SegmentReader}
125+
*/
126+
private Collection<Long> collectReferencedGenerations(DirectoryReader reader) {
127+
return reader.leaves().stream().map(lrc -> {
128+
SegmentReader segmentReader = (SegmentReader) lrc.reader();
129+
SegmentCommitInfo sci = segmentReader.getSegmentInfo();
130+
return Long.parseLong(sci.info.getAttribute(WRITER_GENERATION_ATTRIBUTE));
131+
}).sorted().toList();
132+
}
133+
80134
@Override
81135
public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException {
82136
DirectoryReader reader = readers.remove(catalogSnapshot);

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneCommitter.java

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@
1515
import org.apache.lucene.index.IndexFileNames;
1616
import org.apache.lucene.index.IndexWriter;
1717
import org.apache.lucene.index.IndexWriterConfig;
18+
import org.apache.lucene.index.MergeIndexWriter;
1819
import org.apache.lucene.index.NoMergePolicy;
1920
import org.apache.lucene.index.SegmentInfos;
21+
import org.apache.lucene.index.SerialMergeScheduler;
22+
import org.apache.lucene.search.Sort;
23+
import org.apache.lucene.search.SortField;
24+
import org.apache.lucene.search.SortedNumericSortField;
2025
import org.opensearch.common.annotation.ExperimentalApi;
2126
import org.opensearch.index.engine.CommitStats;
2227
import org.opensearch.index.engine.EngineConfig;
2328
import org.opensearch.index.engine.SafeCommitInfo;
29+
import org.opensearch.index.engine.dataformat.DocumentInput;
2430
import org.opensearch.index.engine.exec.CombinedCatalogSnapshotDeletionPolicy;
2531
import org.opensearch.index.engine.exec.commit.Committer;
2632
import org.opensearch.index.engine.exec.commit.CommitterConfig;
@@ -59,6 +65,19 @@
5965
* The store reference is incremented on construction and decremented on {@link #close()}.
6066
* Closing the committer also closes the underlying IndexWriter.
6167
*
68+
* <h2>Refresh-lock coordination</h2>
69+
*
70+
* <p>The engine passes a {@code preMergeCommitHook} via {@link CommitterConfig}. We wire it
71+
* into Lucene as a {@code MergedSegmentWarmer} on the {@link IndexWriterConfig}. The warmer
72+
* runs between {@code mergeMiddle} and {@code commitMerge} while the {@link IndexWriter}
73+
* monitor is <em>not</em> held, so invoking the hook there establishes the ordering
74+
* {@code refreshLock → IW monitor} on the merge thread — matching the refresh path and
75+
* avoiding the lock inversion that would occur if coordination happened inside
76+
* {@code commitMerge}. Ownership of whatever the hook acquires (currently the engine's
77+
* refresh lock) is transferred to the engine's {@code applyMergeChanges} callback, which
78+
* releases it after the catalog is updated. This committer never touches the refresh lock
79+
* directly.
80+
*
6281
* @opensearch.experimental
6382
*/
6483
@ExperimentalApi
@@ -67,7 +86,7 @@ public class LuceneCommitter extends SafeBootstrapCommitter {
6786
private static final Logger logger = LogManager.getLogger(LuceneCommitter.class);
6887

6988
private final Store store;
70-
private final IndexWriter indexWriter;
89+
private final MergeIndexWriter indexWriter;
7190
private final LuceneCommitDeletionPolicy deletionPolicy;
7291
private final AtomicBoolean isClosed = new AtomicBoolean();
7392

@@ -84,8 +103,8 @@ public LuceneCommitter(CommitterConfig committerConfig) throws IOException {
84103
this.store.incRef();
85104
try {
86105
this.deletionPolicy = new LuceneCommitDeletionPolicy();
87-
IndexWriterConfig iwc = createIndexWriterConfig(committerConfig.engineConfig());
88-
this.indexWriter = new IndexWriter(store.directory(), iwc);
106+
IndexWriterConfig iwc = createIndexWriterConfig(committerConfig);
107+
this.indexWriter = new MergeIndexWriter(store.directory(), iwc);
89108
} catch (Exception e) {
90109
store.decRef();
91110
throw e;
@@ -197,18 +216,20 @@ public boolean isCommitManagedFile(String fileName) {
197216
*
198217
* @return the index writer, or null if closed
199218
*/
200-
IndexWriter getIndexWriter() {
219+
MergeIndexWriter getIndexWriter() {
201220
ensureOpen();
202221
return indexWriter;
203222
}
204223

205224
// --- Internal ---
206225

207-
private IndexWriterConfig createIndexWriterConfig(EngineConfig engineConfig) {
226+
private IndexWriterConfig createIndexWriterConfig(CommitterConfig committerConfig) {
227+
EngineConfig engineConfig = committerConfig.engineConfig();
208228
if (engineConfig == null) {
209229
IndexWriterConfig iwc = new IndexWriterConfig();
210230
iwc.setIndexDeletionPolicy(deletionPolicy);
211231
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
232+
iwc.setMergeScheduler(new SerialMergeScheduler());
212233
return iwc;
213234
}
214235
// TODO:: Merge Config needs to be wired in
@@ -219,13 +240,34 @@ private IndexWriterConfig createIndexWriterConfig(EngineConfig engineConfig) {
219240
}
220241
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
221242
iwc.setUseCompoundFile(engineConfig.useCompoundFile());
222-
if (engineConfig.getIndexSort() != null) {
243+
// Refresh-lock hand-off: the MergedSegmentWarmer fires on the merge thread between
244+
// mergeMiddle and commitMerge, while the IndexWriter monitor is NOT held. Invoking
245+
// the engine-provided preMergeCommitHook here gives the merge path the ordering
246+
// refreshLock → IW monitor, which matches the refresh path (DataFormatAwareEngine#refresh
247+
// takes refreshLock before calling IndexWriter#addIndexes). Ownership of whatever the
248+
// hook acquires is transferred to applyMergeChanges, which releases it after the
249+
// catalog is updated. See the class Javadoc.
250+
iwc.setMergedSegmentWarmer(_ -> committerConfig.preMergeCommitHook().run());
251+
252+
// Determine if Lucene is a secondary format in a composite setup.
253+
// When secondary, use a SortedNumericSortField on the row ID so MultiSorter can reorder
254+
// documents by remapped row ID during merge. When primary (or standalone), use the
255+
// engine config's IndexSort (which may be user-configured).
256+
// TODO Check what is the right way to get this information as the below one is leaky
257+
// https://github.com/opensearch-project/OpenSearch/issues/21506
258+
List<String> secondaryFormats = engineConfig.getIndexSettings().getSettings().getAsList("index.composite.secondary_data_formats");
259+
boolean isSecondary = secondaryFormats.contains("lucene");
260+
261+
if (isSecondary) {
262+
iwc.setIndexSort(new Sort(new SortedNumericSortField(DocumentInput.ROW_ID_FIELD, SortField.Type.LONG)));
263+
} else if (engineConfig.getIndexSort() != null) {
223264
iwc.setIndexSort(engineConfig.getIndexSort());
224265
}
225266
iwc.setCommitOnClose(false);
226267
iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
227268
iwc.setIndexDeletionPolicy(deletionPolicy);
228269
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
270+
iwc.setMergeScheduler(new SerialMergeScheduler());
229271
return iwc;
230272
}
231273

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneDocumentInput.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
package org.opensearch.be.lucene.index;
1010

1111
import org.apache.lucene.document.Document;
12-
import org.apache.lucene.document.NumericDocValuesField;
12+
import org.apache.lucene.document.SortedNumericDocValuesField;
1313
import org.opensearch.be.lucene.LuceneFieldFactory;
1414
import org.opensearch.be.lucene.LuceneFieldFactoryRegistry;
1515
import org.opensearch.common.annotation.ExperimentalApi;
@@ -26,8 +26,9 @@
2626
* Only field types registered in the registry are accepted. Attempting to add a field
2727
* of an unregistered type throws {@link IllegalArgumentException}.
2828
*
29-
* The row ID field is stored as a {@link NumericDocValuesField} for efficient doc-value
30-
* access, maintaining 1:1 correspondence between Lucene doc IDs and Parquet row offsets.
29+
* The row ID field is stored as a {@link SortedNumericDocValuesField} for efficient doc-value
30+
* access and compatibility with the {@code SortedNumericSortField}-based IndexSort,
31+
* maintaining 1:1 correspondence between Lucene doc IDs and Parquet row offsets.
3132
*
3233
* @opensearch.experimental
3334
*/
@@ -95,15 +96,15 @@ public void addField(MappedFieldType fieldType, Object value) {
9596
}
9697

9798
/**
98-
* Stores the row ID as a {@link NumericDocValuesField} to maintain 1:1 correspondence
99+
* Stores the row ID as a {@link SortedNumericDocValuesField} to maintain 1:1 correspondence
99100
* between Lucene doc IDs and Parquet row offsets.
100101
*
101102
* @param rowIdFieldName the name of the row ID field
102103
* @param rowId the row ID value (0-based sequential within the writer)
103104
*/
104105
@Override
105106
public void setRowId(String rowIdFieldName, long rowId) {
106-
document.add(new NumericDocValuesField(rowIdFieldName, rowId));
107+
document.add(new SortedNumericDocValuesField(rowIdFieldName, rowId));
107108
}
108109

109110
/** No-op — this document input holds no closeable resources. */

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.lucene.index.DirectoryReader;
1616
import org.apache.lucene.index.IndexWriter;
1717
import org.apache.lucene.index.LeafReaderContext;
18+
import org.apache.lucene.index.MergeIndexWriter;
1819
import org.apache.lucene.index.NoMergePolicy;
1920
import org.apache.lucene.index.SegmentCommitInfo;
2021
import org.apache.lucene.index.SegmentReader;
@@ -23,10 +24,10 @@
2324
import org.apache.lucene.store.MMapDirectory;
2425
import org.opensearch.be.lucene.LuceneDataFormat;
2526
import org.opensearch.be.lucene.LuceneFieldFactoryRegistry;
27+
import org.opensearch.be.lucene.merge.LuceneMerger;
2628
import org.opensearch.common.annotation.ExperimentalApi;
2729
import org.opensearch.index.engine.dataformat.DataFormat;
2830
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
29-
import org.opensearch.index.engine.dataformat.MergeResult;
3031
import org.opensearch.index.engine.dataformat.Merger;
3132
import org.opensearch.index.engine.dataformat.RefreshInput;
3233
import org.opensearch.index.engine.dataformat.RefreshResult;
@@ -74,11 +75,12 @@ public class LuceneIndexingExecutionEngine implements IndexingExecutionEngine<Lu
7475
private static final Logger logger = LogManager.getLogger(LuceneIndexingExecutionEngine.class);
7576

7677
private final LuceneDataFormat dataFormat;
77-
private final IndexWriter sharedWriter;
78+
private final MergeIndexWriter sharedWriter;
7879
private final Store store;
7980
private final Path baseDirectory;
8081
private final Analyzer analyzer;
8182
private final Codec codec;
83+
private final LuceneMerger luceneMerger;
8284
private final LuceneFieldFactoryRegistry fieldFactoryRegistry;
8385

8486
/**
@@ -105,6 +107,8 @@ public LuceneIndexingExecutionEngine(
105107
this.codec = sharedWriter.getConfig().getCodec();
106108
this.fieldFactoryRegistry = new LuceneFieldFactoryRegistry();
107109

110+
this.luceneMerger = new LuceneMerger(sharedWriter, dataFormat, store.shardPath().resolveIndex());
111+
108112
// Create the lucene subdirectory if it doesn't exist
109113
try {
110114
Files.createDirectories(baseDirectory);
@@ -120,7 +124,7 @@ public LuceneIndexingExecutionEngine(
120124
*
121125
* @return the index writer
122126
*/
123-
public IndexWriter getWriter() {
127+
public MergeIndexWriter getWriter() {
124128
return sharedWriter;
125129
}
126130

@@ -153,7 +157,7 @@ public FormatStore getStore(DataFormat dataFormat) {
153157
public Writer<LuceneDocumentInput> createWriter(long writerGeneration) {
154158
assert sharedWriter.isOpen() : "Cannot create writer — shared IndexWriter is closed";
155159
try {
156-
return new LuceneWriter(writerGeneration, dataFormat, baseDirectory, analyzer, codec);
160+
return new LuceneWriter(writerGeneration, dataFormat, baseDirectory, analyzer, codec, sharedWriter.getConfig().getIndexSort());
157161
} catch (IOException e) {
158162
throw new RuntimeException("Failed to create LuceneWriter for generation " + writerGeneration, e);
159163
}
@@ -279,8 +283,7 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
279283
/** Returns {@code null} — merge scheduling is not yet implemented for the Lucene format. */
280284
@Override
281285
public Merger getMerger() {
282-
// TODO: Implement merge support as ParquetMerger
283-
return mergeInput -> new MergeResult(Map.of());
286+
return this.luceneMerger;
284287
}
285288

286289
/**

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.lucene.index.IndexWriterConfig;
1818
import org.apache.lucene.index.SegmentCommitInfo;
1919
import org.apache.lucene.index.SegmentInfos;
20+
import org.apache.lucene.search.Sort;
2021
import org.apache.lucene.store.Directory;
2122
import org.apache.lucene.store.MMapDirectory;
2223
import org.opensearch.be.lucene.LuceneDataFormat;
@@ -80,10 +81,18 @@ public class LuceneWriter implements Writer<LuceneDocumentInput> {
8081
* @param dataFormat the Lucene data format descriptor
8182
* @param baseDirectory the base directory under which to create the temp directory
8283
* @param analyzer the analyzer to use for tokenized fields, or null for default
84+
* @param codec the codec to use, or null for default
85+
* @param indexSort the index sort to apply to segments, or null for no sort
8386
* @throws IOException if directory creation or IndexWriter opening fails
8487
*/
85-
public LuceneWriter(long writerGeneration, LuceneDataFormat dataFormat, Path baseDirectory, Analyzer analyzer, Codec codec)
86-
throws IOException {
88+
public LuceneWriter(
89+
long writerGeneration,
90+
LuceneDataFormat dataFormat,
91+
Path baseDirectory,
92+
Analyzer analyzer,
93+
Codec codec,
94+
Sort indexSort
95+
) throws IOException {
8796
this.writerGeneration = writerGeneration;
8897
this.dataFormat = dataFormat;
8998
this.docCount = 0;
@@ -97,6 +106,9 @@ public LuceneWriter(long writerGeneration, LuceneDataFormat dataFormat, Path bas
97106
IndexWriterConfig iwc = analyzer != null ? new IndexWriterConfig(analyzer) : new IndexWriterConfig();
98107
iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
99108
iwc.setRAMBufferSizeMB(RAM_BUFFER_SIZE_MB);
109+
if (indexSort != null) {
110+
iwc.setIndexSort(indexSort);
111+
}
100112

101113
iwc.setCodec(new LuceneWriterCodec(codec, writerGeneration));
102114
this.indexWriter = new IndexWriter(directory, iwc);

0 commit comments

Comments
 (0)