Skip to content

Commit 10e3972

Browse files
author
Kamal Nayan
committed
Fixing test failures
Signed-off-by: Kamal Nayan <askkamal@amazon.com>
1 parent c2d0a6c commit 10e3972

6 files changed

Lines changed: 59 additions & 17 deletions

File tree

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class LuceneReaderManager implements EngineReaderManager<LuceneReader> {
5353

5454
private final DataFormat dataFormat;
5555
private final Map<Long, LuceneReader> readers;
56+
private volatile DirectoryReader initialReader;
5657
private volatile DirectoryReader currentReader;
5758
private final CheckedBiFunction<DirectoryReader, SegmentInfos, DirectoryReader, IOException> readerRefresher;
5859

@@ -74,6 +75,7 @@ public LuceneReaderManager(
7475
) {
7576
this.dataFormat = dataFormat;
7677
Objects.requireNonNull(initialReader, "initialReader must not be null");
78+
this.initialReader = initialReader;
7779
this.currentReader = initialReader;
7880
this.readers = readers;
7981
this.readerRefresher = readerRefresher;
@@ -195,6 +197,7 @@ public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException {
195197
if (reader != null) {
196198
reader.directoryReader().decRef();
197199
}
200+
releaseInitialReader();
198201
}
199202

200203
@Override
@@ -213,5 +216,24 @@ public void close() throws IOException {
213216
reader.directoryReader().decRef();
214217
}
215218
readers.clear();
219+
releaseInitialReader();
220+
}
221+
222+
/**
223+
* Releases the initial {@link DirectoryReader} opened by
224+
* {@link LuceneSearchBackEnd#createReaderManager} and nulls out the field. The check
225+
* makes the operation idempotent so it is safe to invoke from both {@link #close()}
226+
* and {@link #onDeleted(CatalogSnapshot)} without double-freeing.
227+
* <p>
228+
* The initial reader is owned by the manager (not the {@link #readers} map), so it is
229+
* not covered by the per-snapshot decRef loop in {@code close()}. Without this release,
230+
* the {@link java.nio.channels.FileChannel}s held by its {@link SegmentReader}s would
231+
* leak until JVM exit (detected by Lucene's {@code LeakFS} at suite teardown).
232+
*/
233+
private void releaseInitialReader() throws IOException {
234+
if (initialReader != null) {
235+
initialReader.decRef();
236+
initialReader = null;
237+
}
216238
}
217239
}

sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneReaderManagerTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -512,15 +512,16 @@ public void testMultipleSnapshotsShareReaderWhenRefresherReturnsNull() throws IO
512512
// RefCount should be initial + 3 (one incRef per afterRefresh)
513513
assertEquals(initialRefCount + 3, initialReader.getRefCount());
514514

515-
// Deleting each snapshot should decRef once
515+
// First onDeleted: -1 for map entry, -1 for releaseInitialReader (idempotent thereafter).
516516
rm.onDeleted(snap1);
517-
assertEquals(initialRefCount + 2, initialReader.getRefCount());
517+
assertEquals(initialRefCount + 1, initialReader.getRefCount());
518518

519+
// Subsequent onDeleted: -1 for map entry only.
519520
rm.onDeleted(snap2);
520-
assertEquals(initialRefCount + 1, initialReader.getRefCount());
521+
assertEquals(initialRefCount, initialReader.getRefCount());
521522

522523
rm.onDeleted(snap3);
523-
assertEquals(initialRefCount, initialReader.getRefCount());
524+
assertEquals(initialRefCount - 1, initialReader.getRefCount());
524525
}
525526

526527
public void testCloseDecRefsAllAccumulatedReaders() throws IOException {
@@ -540,9 +541,9 @@ public void testCloseDecRefsAllAccumulatedReaders() throws IOException {
540541
rm.afterRefresh(true, stubSnapshot(3));
541542
assertEquals(initialRefCount + 3, initialReader.getRefCount());
542543

543-
// close() should decRef all 3
544+
// close() decRefs the 3 map entries and the initial open(writer) reference: refCount → 0.
544545
rm.close();
545-
assertEquals(initialRefCount, initialReader.getRefCount());
546+
assertEquals(initialRefCount - 1, initialReader.getRefCount());
546547
}
547548

548549
public void testMixedRefreshSomeNullSomeNew() throws IOException {

server/src/main/java/org/opensearch/index/engine/Engine.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,6 +1292,24 @@ public GatedCloseable<CatalogSnapshot> acquireSafeCatalogSnapshot() throws Engin
12921292
}
12931293
}
12941294

1295+
/**
1296+
* Acquires a {@link CatalogSnapshot} of the engine's current in-memory state — i.e. the
1297+
* latest visible-to-readers segments, including any uncommitted operations that have been
1298+
* refreshed. Used by replication-checkpoint listeners and any caller that wants the live
1299+
* search-visible state rather than the on-disk safe commit.
1300+
*
1301+
* <p>The default implementation bridges via {@link #getSegmentInfosSnapshot()} and wraps the
1302+
* resulting {@link SegmentInfos} as a {@link SegmentInfosCatalogSnapshot}. Engines whose
1303+
* native state isn't a single {@link SegmentInfos} (e.g. multi-format engines that aggregate
1304+
* Lucene + other formats) should override this to return their native catalog snapshot.
1305+
*/
1306+
@ExperimentalApi
1307+
public GatedCloseable<CatalogSnapshot> acquireSnapshot() {
1308+
final GatedCloseable<SegmentInfos> segmentInfosRef = getSegmentInfosSnapshot();
1309+
final CatalogSnapshot snapshot = new SegmentInfosCatalogSnapshot(segmentInfosRef.get());
1310+
return new GatedCloseable<>(snapshot, segmentInfosRef::close);
1311+
}
1312+
12951313
/**
12961314
* Acquires a {@link CatalogSnapshot} pinned to the most recent commit on disk,
12971315
* regardless of retention policy. Default wraps {@link #acquireLastIndexCommit(boolean)}.

server/src/main/java/org/opensearch/index/engine/EngineBackedIndexer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.index.engine;
1010

1111
import org.apache.lucene.index.IndexCommit;
12-
import org.apache.lucene.index.SegmentInfos;
1312
import org.apache.lucene.store.ByteBuffersDataOutput;
1413
import org.apache.lucene.store.ByteBuffersIndexOutput;
1514
import org.opensearch.common.annotation.ExperimentalApi;
@@ -418,15 +417,16 @@ public void finalizeReplication(CatalogSnapshot catalogSnapshot) throws IOExcept
418417
}
419418

420419
/**
421-
* Returns a snapshot of the catalog of segments in this engine. This snapshot is
422-
* guaranteed to be consistent and can be used for recovery purposes.
420+
* Returns a snapshot of the catalog of segments in this engine. Delegates to
421+
* {@link Engine#acquireSnapshot()} so subclasses (e.g. the read-only wrapper used during
422+
* engine reset) can route to a different snapshot source without going through the
423+
* {@link Engine#getSegmentInfosSnapshot()} bridge — which is required when the underlying
424+
* source is a non-Lucene indexer (e.g. {@link DataFormatAwareEngine}).
423425
*/
424426
@ExperimentalApi
425427
@Override
426428
public GatedCloseable<CatalogSnapshot> acquireSnapshot() {
427-
GatedCloseable<SegmentInfos> segmentInfosRef = engine.getSegmentInfosSnapshot();
428-
SegmentInfosCatalogSnapshot snapshot = new SegmentInfosCatalogSnapshot(segmentInfosRef.get());
429-
return new GatedCloseable<>(snapshot, segmentInfosRef::close);
429+
return engine.acquireSnapshot();
430430
}
431431

432432
@Override

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5581,11 +5581,12 @@ public GatedCloseable<CatalogSnapshot> acquireLastCommittedSnapshot(boolean flus
55815581
}
55825582

55835583
@Override
5584-
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
5585-
if (newEngineReference.get() == null) {
5584+
public GatedCloseable<CatalogSnapshot> acquireSnapshot() {
5585+
final Indexer ref = newEngineReference.get();
5586+
if (ref == null) {
55865587
throw new AlreadyClosedException("engine was closed");
55875588
}
5588-
return applyOnEngine(newEngineReference.get(), Engine::getSegmentInfosSnapshot);
5589+
return ref.acquireSnapshot();
55895590
}
55905591

55915592
@Override

server/src/test/java/org/opensearch/index/shard/IndexShardTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4861,7 +4861,7 @@ public void onBeginTranslogRecovery() {
48614861
* {@code newEngineReference} is still null -- the window between ReadOnlyEngine installation
48624862
* and {@code newEngineReference.set(newEngine)} inside {@code resetEngineToGlobalCheckpoint}.
48634863
* Covers the defensive null-check branches in {@code acquireLastIndexCommit},
4864-
* {@code acquireSafeIndexCommit}, and {@code getSegmentInfosSnapshot}.
4864+
* {@code acquireSafeIndexCommit}, and {@code acquireSnapshot}.
48654865
*/
48664866
public void testDelegateThrowsAlreadyClosedBeforeNewEngineSet() throws Exception {
48674867
CountDownLatch creatingEngineLatch = new CountDownLatch(1);
@@ -4906,7 +4906,7 @@ public void testDelegateThrowsAlreadyClosedBeforeNewEngineSet() throws Exception
49064906
// The ReadOnlyEngine is now the current engine, but newEngineReference is still null.
49074907
expectThrows(AlreadyClosedException.class, () -> shard.acquireLastIndexCommit(false));
49084908
expectThrows(AlreadyClosedException.class, shard::acquireSafeIndexCommit);
4909-
expectThrows(AlreadyClosedException.class, shard::getSegmentInfosSnapshot);
4909+
expectThrows(AlreadyClosedException.class, shard::getCatalogSnapshot);
49104910

49114911
proceedWithCreationLatch.countDown();
49124912
assertTrue("engine reset should complete", engineResetLatch.await(30, TimeUnit.SECONDS));

0 commit comments

Comments
 (0)