Skip to content

Commit 8cbdf31

Browse files
author
Sagar Darji
committed
Fixing the race condition between Merge and Refresh flow
Signed-off-by: Sagar Darji <darsaga@amazon.com>
1 parent a00259a commit 8cbdf31

15 files changed

Lines changed: 565 additions & 253 deletions

File tree

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@
99
package org.opensearch.be.lucene;
1010

1111
import org.apache.lucene.index.DirectoryReader;
12+
import org.apache.lucene.index.SegmentCommitInfo;
13+
import org.apache.lucene.index.SegmentReader;
1214
import org.opensearch.common.annotation.ExperimentalApi;
1315
import org.opensearch.index.engine.dataformat.DataFormat;
1416
import org.opensearch.index.engine.exec.EngineReaderManager;
17+
import org.opensearch.index.engine.exec.Segment;
1518
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1619

1720
import java.io.IOException;
@@ -20,6 +23,8 @@
2023
import java.util.Map;
2124
import java.util.Objects;
2225

26+
import static org.opensearch.be.lucene.index.LuceneWriter.WRITER_GENERATION_ATTRIBUTE;
27+
2328
/**
2429
* Lucene implementation of {@link EngineReaderManager}.
2530
* <p>
@@ -72,11 +77,60 @@ public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) th
7277
}
7378
DirectoryReader refreshed = DirectoryReader.openIfChanged(currentReader);
7479
if (refreshed != null) {
80+
// Guard against refresh/merge-apply races: a prior IT regression surfaced when
81+
// overlapping threads produced a refreshed reader whose leaves disagreed with the
82+
// catalog snapshot being registered, effectively pairing the snapshot with a stale
83+
// reader. This assert catches that drift in test builds before the mismatched pair
84+
// is published to readers.
85+
assert readersAreSame(catalogSnapshot, refreshed);
7586
currentReader = refreshed;
7687
}
7788
readers.put(catalogSnapshot, currentReader);
7889
}
7990

91+
/**
92+
* Consistency check: verifies that the refreshed {@link DirectoryReader} reflects exactly
93+
* the set of segments the given {@link CatalogSnapshot} references. Compares the sorted
94+
* list of writer generations drawn from the snapshot's {@link Segment Segments} against
95+
* the sorted list of writer generations read off each leaf of the reader (via the
96+
* {@link org.opensearch.be.lucene.index.LuceneWriter#WRITER_GENERATION_ATTRIBUTE} stamped
97+
* onto every Lucene segment at write time).
98+
*
99+
* <p>Used only in an {@code assert} to catch refresh/catalog drift in test builds — if
100+
* this ever returns {@code false} in production, it means a Lucene reader has been paired
101+
* with the wrong catalog snapshot.
102+
*
103+
* @param catalogSnapshot catalog snapshot whose referenced generations are the expected set
104+
* @param readers DirectoryReader whose leaves' generations are the actual set
105+
* @return {@code true} iff both lists contain the same generations in the same (sorted) order
106+
*/
107+
private boolean readersAreSame(CatalogSnapshot catalogSnapshot, DirectoryReader readers) {
108+
Collection<Long> generationsReferenced = catalogSnapshot.getSegments().stream().map(Segment::generation).sorted().toList();
109+
return generationsReferenced.equals(collectReferencedGenerations(readers));
110+
}
111+
112+
/**
113+
* Extracts the writer generation from each leaf of the given {@link DirectoryReader} and
114+
* returns them as a sorted list. Each leaf's {@link SegmentReader} carries a
115+
* {@link SegmentCommitInfo} whose {@code SegmentInfo} is stamped with the
116+
* {@link org.opensearch.be.lucene.index.LuceneWriter#WRITER_GENERATION_ATTRIBUTE} when the
117+
* segment is written; parsing that attribute yields the generation that produced the leaf.
118+
*
119+
* @param reader the DirectoryReader to inspect
120+
* @return generations of all leaves, sorted ascending
121+
* @throws NumberFormatException if a leaf is missing the writer-generation attribute or
122+
* its value is not parseable as a long (indicates a segment
123+
* not produced by {@link org.opensearch.be.lucene.index.LuceneWriter})
124+
* @throws ClassCastException if any leaf reader is not a {@link SegmentReader}
125+
*/
126+
private Collection<Long> collectReferencedGenerations(DirectoryReader reader) {
127+
return reader.leaves().stream().map(lrc -> {
128+
SegmentReader segmentReader = (SegmentReader) lrc.reader();
129+
SegmentCommitInfo sci = segmentReader.getSegmentInfo();
130+
return Long.parseLong(sci.info.getAttribute(WRITER_GENERATION_ATTRIBUTE));
131+
}).sorted().toList();
132+
}
133+
80134
@Override
81135
public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException {
82136
DirectoryReader reader = readers.remove(catalogSnapshot);

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneCommitter.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,19 @@
6565
* The store reference is incremented on construction and decremented on {@link #close()}.
6666
* Closing the committer also closes the underlying IndexWriter.
6767
*
68+
* <h2>Refresh-lock coordination</h2>
69+
*
70+
* <p>The engine passes a {@code preMergeCommitHook} via {@link CommitterConfig}. We wire it
71+
* into Lucene as a {@code MergedSegmentWarmer} on the {@link IndexWriterConfig}. The warmer
72+
* runs between {@code mergeMiddle} and {@code commitMerge} while the {@link IndexWriter}
73+
* monitor is <em>not</em> held, so invoking the hook there establishes the ordering
74+
* {@code refreshLock → IW monitor} on the merge thread — matching the refresh path and
75+
* avoiding the lock inversion that would occur if coordination happened inside
76+
* {@code commitMerge}. Ownership of whatever the hook acquires (currently the engine's
77+
* refresh lock) is transferred to the engine's {@code applyMergeChanges} callback, which
78+
* releases it after the catalog is updated. This committer never touches the refresh lock
79+
* directly.
80+
*
6881
* @opensearch.experimental
6982
*/
7083
@ExperimentalApi
@@ -73,7 +86,7 @@ public class LuceneCommitter extends SafeBootstrapCommitter {
7386
private static final Logger logger = LogManager.getLogger(LuceneCommitter.class);
7487

7588
private final Store store;
76-
private final IndexWriter indexWriter;
89+
private final MergeIndexWriter indexWriter;
7790
private final LuceneCommitDeletionPolicy deletionPolicy;
7891
private final AtomicBoolean isClosed = new AtomicBoolean();
7992

@@ -90,7 +103,7 @@ public LuceneCommitter(CommitterConfig committerConfig) throws IOException {
90103
this.store.incRef();
91104
try {
92105
this.deletionPolicy = new LuceneCommitDeletionPolicy();
93-
IndexWriterConfig iwc = createIndexWriterConfig(committerConfig.engineConfig());
106+
IndexWriterConfig iwc = createIndexWriterConfig(committerConfig);
94107
this.indexWriter = new MergeIndexWriter(store.directory(), iwc);
95108
} catch (Exception e) {
96109
store.decRef();
@@ -203,14 +216,15 @@ public boolean isCommitManagedFile(String fileName) {
203216
*
204217
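* @return the index writer; never {@code null}, since the field is final and {@code ensureOpen()} runs first (presumably rejecting a closed committer)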
205218
*/
206-
IndexWriter getIndexWriter() {
219+
MergeIndexWriter getIndexWriter() {
207220
ensureOpen();
208221
return indexWriter;
209222
}
210223

211224
// --- Internal ---
212225

213-
private IndexWriterConfig createIndexWriterConfig(EngineConfig engineConfig) {
226+
private IndexWriterConfig createIndexWriterConfig(CommitterConfig committerConfig) {
227+
EngineConfig engineConfig = committerConfig.engineConfig();
214228
if (engineConfig == null) {
215229
IndexWriterConfig iwc = new IndexWriterConfig();
216230
iwc.setIndexDeletionPolicy(deletionPolicy);
@@ -226,6 +240,14 @@ private IndexWriterConfig createIndexWriterConfig(EngineConfig engineConfig) {
226240
}
227241
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
228242
iwc.setUseCompoundFile(engineConfig.useCompoundFile());
243+
// Refresh-lock hand-off: the MergedSegmentWarmer fires on the merge thread between
244+
// mergeMiddle and commitMerge, while the IndexWriter monitor is NOT held. Invoking
245+
// the engine-provided preMergeCommitHook here gives the merge path the ordering
246+
// refreshLock → IW monitor, which matches the refresh path (DataFormatAwareEngine#refresh
247+
// takes refreshLock before calling IndexWriter#addIndexes). Ownership of whatever the
248+
// hook acquires is transferred to applyMergeChanges, which releases it after the
249+
// catalog is updated. See the class Javadoc.
250+
iwc.setMergedSegmentWarmer(_ -> committerConfig.preMergeCommitHook().run());
229251

230252
// Determine if Lucene is a secondary format in a composite setup.
231253
// When secondary, use a SortedNumericSortField on the row ID so MultiSorter can reorder

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.lucene.index.DirectoryReader;
1616
import org.apache.lucene.index.IndexWriter;
1717
import org.apache.lucene.index.LeafReaderContext;
18+
import org.apache.lucene.index.MergeIndexWriter;
1819
import org.apache.lucene.index.NoMergePolicy;
1920
import org.apache.lucene.index.SegmentCommitInfo;
2021
import org.apache.lucene.index.SegmentReader;
@@ -74,7 +75,7 @@ public class LuceneIndexingExecutionEngine implements IndexingExecutionEngine<Lu
7475
private static final Logger logger = LogManager.getLogger(LuceneIndexingExecutionEngine.class);
7576

7677
private final LuceneDataFormat dataFormat;
77-
private final IndexWriter sharedWriter;
78+
private final MergeIndexWriter sharedWriter;
7879
private final Store store;
7980
private final Path baseDirectory;
8081
private final Analyzer analyzer;
@@ -123,7 +124,7 @@ public LuceneIndexingExecutionEngine(
123124
*
124125
* @return the index writer
125126
*/
126-
public IndexWriter getWriter() {
127+
public MergeIndexWriter getWriter() {
127128
return sharedWriter;
128129
}
129130

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/merge/LuceneMerger.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,11 @@ private static Field initSegmentInfosField() {
8080
private final Path storeDirectory;
8181
private final LuceneMergeStrategy strategy;
8282

83-
public LuceneMerger(IndexWriter indexWriter, DataFormat dataFormat, Path storeDirectory) {
83+
public LuceneMerger(MergeIndexWriter indexWriter, DataFormat dataFormat, Path storeDirectory) {
8484
if (indexWriter == null) {
8585
throw new IllegalArgumentException("IndexWriter must not be null");
8686
}
87-
if (indexWriter instanceof MergeIndexWriter == false) {
88-
throw new IllegalArgumentException("IndexWriter must be a MergeIndexWriter, got " + indexWriter.getClass().getName());
89-
}
90-
this.indexWriter = (MergeIndexWriter) indexWriter;
87+
this.indexWriter = indexWriter;
9188
this.dataFormat = dataFormat;
9289
this.storeDirectory = storeDirectory;
9390
// TODO implement primary and integrate the same here
@@ -137,8 +134,21 @@ public MergeResult merge(MergeInput mergeInput) throws IOException {
137134
MergePolicy.OneMerge oneMerge = strategy.createOneMerge(matchingSegments, rowIdMapping);
138135
indexWriter.executeMerge(oneMerge, mergeInput.newWriterGeneration());
139136

137+
// Stamp the merged segment with its writer generation so downstream lookups
138+
// (e.g. findMatchingSegments on a subsequent merge) can correlate it.
139+
//
140+
// This mutation is in-memory only: Lucene writes the .si file exactly once at
141+
// segment creation via SegmentInfoFormat.write(...) and does not rewrite it on
142+
// later commits, so this attribute will not survive a writer reopen. That is
143+
// acceptable here because the attribute is only consumed within the lifetime
144+
// of the live IndexWriter's SegmentInfos.
145+
SegmentCommitInfo mergedInfo = oneMerge.getMergeInfo();
146+
if (mergedInfo != null) {
147+
mergedInfo.info.putAttribute(WRITER_GENERATION_ATTRIBUTE, String.valueOf(mergeInput.newWriterGeneration()));
148+
}
149+
140150
// Build the merged WriterFileSet from the output segment info
141-
WriterFileSet mergedFileSet = buildMergedFileSet(oneMerge.getMergeInfo(), mergeInput.newWriterGeneration());
151+
WriterFileSet mergedFileSet = buildMergedFileSet(mergedInfo, mergeInput.newWriterGeneration());
142152

143153
// Delegate RowIdMapping production to the strategy
144154
RowIdMapping outputMapping = strategy.buildRowIdMapping(oneMerge, mergeInput);

sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneReaderManagerTests.java

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,17 @@
1515
import org.apache.lucene.index.DirectoryReader;
1616
import org.apache.lucene.index.IndexWriter;
1717
import org.apache.lucene.index.IndexWriterConfig;
18+
import org.apache.lucene.index.SegmentCommitInfo;
19+
import org.apache.lucene.index.SegmentInfos;
1820
import org.apache.lucene.search.IndexSearcher;
1921
import org.apache.lucene.search.MatchAllDocsQuery;
2022
import org.apache.lucene.store.Directory;
2123
import org.apache.lucene.store.MMapDirectory;
2224
import org.apache.lucene.store.NIOFSDirectory;
2325
import org.opensearch.be.lucene.index.LuceneCommitter;
2426
import org.opensearch.be.lucene.index.LuceneIndexingExecutionEngine;
27+
import org.opensearch.be.lucene.index.LuceneWriter;
28+
import org.opensearch.common.SuppressForbidden;
2529
import org.opensearch.common.settings.Settings;
2630
import org.opensearch.core.index.shard.ShardId;
2731
import org.opensearch.index.IndexSettings;
@@ -99,6 +103,17 @@ private DirectoryReader openReader() throws IOException {
99103
}
100104

101105
private CatalogSnapshot stubSnapshot(long generation) {
106+
return stubSnapshot(generation, List.of());
107+
}
108+
109+
/**
110+
* Builds a stub snapshot whose segment list contains the given writer generations.
111+
* This is required by {@link LuceneReaderManager#afterRefresh}'s assertion, which
112+
* compares the snapshot's segment generations against the writer-generation attribute
113+
* on each leaf in the refreshed {@link DirectoryReader}.
114+
*/
115+
private CatalogSnapshot stubSnapshot(long generation, List<Long> segmentGenerations) {
116+
List<Segment> segs = segmentGenerations.stream().map(g -> Segment.builder(g).build()).toList();
102117
return new CatalogSnapshot("test", generation, 1) {
103118
@Override
104119
protected void closeInternal() {}
@@ -115,7 +130,7 @@ public long getId() {
115130

116131
@Override
117132
public List<Segment> getSegments() {
118-
return List.of();
133+
return segs;
119134
}
120135

121136
@Override
@@ -168,11 +183,36 @@ public Collection<String> getFiles(boolean includeSegmentsFile) {
168183
};
169184
}
170185

171-
private void addDoc(String id) throws IOException {
186+
private void addDoc(String id, long generation) throws IOException {
172187
Document doc = new Document();
173188
doc.add(new StringField("id", id, Field.Store.YES));
174189
indexWriter.addDocument(doc);
175190
indexWriter.commit();
191+
stampLatestSegmentGeneration(generation);
192+
}
193+
194+
/**
195+
* Stamps the most recently written segment with the {@code writer_generation} attribute
196+
* that {@link LuceneReaderManager#afterRefresh}'s assertion expects. In production this
197+
* is done by {@code LuceneWriterCodec}; tests that write directly through a plain
198+
* {@link IndexWriter} must stamp it themselves.
199+
*/
200+
@SuppressForbidden(reason = "Need reflection to stamp writer_generation on segments for testing")
201+
private void stampLatestSegmentGeneration(long generation) throws IOException {
202+
try {
203+
java.lang.reflect.Field segInfosField = IndexWriter.class.getDeclaredField("segmentInfos");
204+
segInfosField.setAccessible(true);
205+
SegmentInfos segInfos = (SegmentInfos) segInfosField.get(indexWriter);
206+
if (segInfos.size() == 0) {
207+
return;
208+
}
209+
SegmentCommitInfo last = segInfos.asList().get(segInfos.size() - 1);
210+
if (last.info.getAttribute(LuceneWriter.WRITER_GENERATION_ATTRIBUTE) == null) {
211+
last.info.putAttribute(LuceneWriter.WRITER_GENERATION_ATTRIBUTE, String.valueOf(generation));
212+
}
213+
} catch (ReflectiveOperationException e) {
214+
throw new IOException("Failed to stamp writer_generation via reflection", e);
215+
}
176216
}
177217

178218
public void testAfterRefreshCreatesReader() throws IOException {
@@ -195,21 +235,24 @@ public void testAfterRefreshNoOpWhenDidRefreshFalse() throws IOException {
195235
public void testMultipleRefreshesWithIndexing() throws IOException {
196236
LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader());
197237

238+
// Empty initial reader — no segments yet.
198239
CatalogSnapshot snap1 = stubSnapshot(1);
199240
rm.afterRefresh(true, snap1);
200241
DirectoryReader reader1 = rm.getReader(snap1);
201242
assertEquals(0, new IndexSearcher(reader1).count(new MatchAllDocsQuery()));
202243

203-
addDoc("doc1");
204-
CatalogSnapshot snap2 = stubSnapshot(2);
244+
// Add doc1 in generation 10, refresh. Reader now has one leaf stamped with gen=10.
245+
addDoc("doc1", 10L);
246+
CatalogSnapshot snap2 = stubSnapshot(2, List.of(10L));
205247
rm.afterRefresh(true, snap2);
206248
DirectoryReader reader2 = rm.getReader(snap2);
207249
assertEquals(1, new IndexSearcher(reader2).count(new MatchAllDocsQuery()));
208250

209251
assertEquals(0, new IndexSearcher(reader1).count(new MatchAllDocsQuery()));
210252

211-
addDoc("doc2");
212-
CatalogSnapshot snap3 = stubSnapshot(3);
253+
// Add doc2 in generation 20. Reader now has two leaves stamped with gens {10, 20}.
254+
addDoc("doc2", 20L);
255+
CatalogSnapshot snap3 = stubSnapshot(3, List.of(10L, 20L));
213256
rm.afterRefresh(true, snap3);
214257
DirectoryReader reader3 = rm.getReader(snap3);
215258
assertEquals(2, new IndexSearcher(reader3).count(new MatchAllDocsQuery()));
@@ -286,7 +329,7 @@ public void testCreateReaderManagerWithLuceneIndexingEngine() throws IOException
286329
)
287330
.retentionLeasesSupplier(() -> new RetentionLeases(0, 0, java.util.Collections.emptyList()))
288331
.build();
289-
CommitterConfig cs = new CommitterConfig(engineConfig);
332+
CommitterConfig cs = new CommitterConfig(engineConfig, () -> {});
290333
LuceneCommitter committer = new LuceneCommitter(cs);
291334

292335
try {

0 commit comments

Comments
 (0)