Skip to content

Commit e1f3c5b

Browse files
authored
Merge branch 'main' into unified-allocator-followup
2 parents 53c0e36 + debf6ed commit e1f3c5b

126 files changed

Lines changed: 9047 additions & 644 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

modules/store-subdirectory/src/internalClusterTest/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareRecoveryTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.opensearch.index.engine.EngineException;
3131
import org.opensearch.index.engine.EngineFactory;
3232
import org.opensearch.index.engine.InternalEngine;
33+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
34+
import org.opensearch.index.engine.exec.coord.SegmentInfosCatalogSnapshot;
3335
import org.opensearch.index.shard.IndexShard;
3436
import org.opensearch.indices.IndicesService;
3537
import org.opensearch.plugins.EnginePlugin;
@@ -306,6 +308,42 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
306308
throw new EngineException(shardId, "Failed to acquire safe index commit", e);
307309
}
308310
}
311+
312+
@Override
313+
public GatedCloseable<CatalogSnapshot> acquireSafeCatalogSnapshot() throws EngineException {
314+
final GatedCloseable<CatalogSnapshot> base = super.acquireSafeCatalogSnapshot();
315+
try {
316+
// Collect subdirectory file paths (prefixed with subdirectory name).
317+
final SegmentInfos subInfos = SegmentInfos.readLatestCommit(subdirectoryDirectory);
318+
final Set<String> extra = new HashSet<>();
319+
for (String f : subInfos.files(true)) {
320+
extra.add(Path.of(SUBDIRECTORY_NAME, f).toString());
321+
}
322+
for (String f : subdirectoryDirectory.listAll()) {
323+
if (f.startsWith(NON_SEGMENT_FILE_PREFIX)) {
324+
extra.add(Path.of(SUBDIRECTORY_NAME, f).toString());
325+
}
326+
}
327+
// Reuse the SegmentInfos from the base snapshot — no redundant disk read.
328+
SegmentInfos mainInfos = ((SegmentInfosCatalogSnapshot) base.get()).getSegmentInfos();
329+
CatalogSnapshot wrapped = new SegmentInfosCatalogSnapshot(mainInfos) {
330+
@Override
331+
public Collection<String> getFiles(boolean includeSegmentsFile) throws IOException {
332+
Set<String> files = new HashSet<>(super.getFiles(includeSegmentsFile));
333+
files.addAll(extra);
334+
return files;
335+
}
336+
};
337+
return new GatedCloseable<>(wrapped, base::close);
338+
} catch (Exception e) {
339+
try {
340+
base.close();
341+
} catch (IOException ex) {
342+
e.addSuppressed(ex);
343+
}
344+
throw new EngineException(shardId, "Failed to acquire safe catalog snapshot", e);
345+
}
346+
}
309347
}
310348

311349
/**

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ protected void doStop() {
9494
@Override
9595
protected void doClose() throws IOException {
9696
releaseRuntime();
97+
NativeBridge.shutdownTokioRuntimeManager();
9798
}
9899

99100
/**

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReaderManager.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class DatafusionReaderManager implements EngineReaderManager<DatafusionRe
3737

3838
private static final Logger logger = LogManager.getLogger(DatafusionReaderManager.class);
3939

40-
private final Map<CatalogSnapshot, DatafusionReader> readers = new HashMap<>();
40+
private final Map<Long, DatafusionReader> readers = new HashMap<>();
4141
private final DataFormat dataFormat;
4242
private final String directoryPath;
4343
private final DataFusionService dataFusionService;
@@ -66,15 +66,19 @@ public DatafusionReaderManager(
6666

6767
@Override
6868
public DatafusionReader getReader(CatalogSnapshot catalogSnapshot) throws IOException {
69-
if (readers.containsKey(catalogSnapshot)) {
70-
return readers.get(catalogSnapshot);
69+
if (catalogSnapshot == null) {
70+
throw new IllegalArgumentException("catalogSnapshot must not be null");
7171
}
72-
throw new IOException("No DataFusion reader available");
72+
DatafusionReader reader = readers.get(catalogSnapshot.getId());
73+
if (reader == null) {
74+
throw new IOException("No DataFusion reader available for catalog snapshot [version=" + catalogSnapshot.getId() + "]");
75+
}
76+
return reader;
7377
}
7478

7579
@Override
7680
public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException {
77-
DatafusionReader removed = readers.remove(catalogSnapshot);
81+
DatafusionReader removed = readers.remove(catalogSnapshot.getId());
7882
if (removed != null) {
7983
removed.close();
8084
}
@@ -98,13 +102,13 @@ public void beforeRefresh() throws IOException {}
98102
@Override
99103
public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException {
100104
if (didRefresh == false) return;
101-
if (readers.containsKey(catalogSnapshot)) return;
105+
if (readers.containsKey(catalogSnapshot.getId())) return;
102106
DatafusionReader reader = new DatafusionReader(
103107
directoryPath,
104108
catalogSnapshot.getSearchableFiles(dataFormat.name()),
105109
dataformatAwareStoreHandle
106110
);
107-
readers.put(catalogSnapshot, reader);
111+
readers.put(catalogSnapshot.getId(), reader);
108112
}
109113

110114
private Collection<String> toAbsolutePaths(Collection<String> fileNames) {

sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DatafusionReaderManagerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void testGetReaderWithNoRefreshThrows() throws IOException {
219219
DataFusionService mockService = mock(DataFusionService.class);
220220

221221
DatafusionReaderManager manager = new DatafusionReaderManager(TEST_FORMAT, shardPath, mockService, null);
222-
expectThrows(IOException.class, () -> manager.getReader(null));
222+
expectThrows(IllegalArgumentException.class, () -> manager.getReader(null));
223223
manager.close();
224224
}
225225
}

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LucenePlugin.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,26 @@
1515
import org.opensearch.common.annotation.ExperimentalApi;
1616
import org.opensearch.index.IndexSettings;
1717
import org.opensearch.index.engine.dataformat.DataFormat;
18+
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
1819
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
20+
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
1921
import org.opensearch.index.engine.dataformat.DeleteExecutionEngine;
2022
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
2123
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
2224
import org.opensearch.index.engine.dataformat.ReaderManagerConfig;
2325
import org.opensearch.index.engine.exec.EngineReaderManager;
2426
import org.opensearch.index.engine.exec.commit.Committer;
2527
import org.opensearch.index.engine.exec.commit.CommitterFactory;
28+
import org.opensearch.index.store.checksum.LuceneChecksumHandler;
2629
import org.opensearch.plugins.EnginePlugin;
2730
import org.opensearch.plugins.Plugin;
2831
import org.opensearch.plugins.SearchBackEndPlugin;
2932

3033
import java.io.IOException;
3134
import java.util.List;
35+
import java.util.Map;
3236
import java.util.Optional;
37+
import java.util.function.Supplier;
3338

3439
/**
3540
* Plugin providing Lucene as a data format, search back-end, and committer
@@ -86,6 +91,14 @@ public DataFormat getDataFormat() {
8691
);
8792
}
8893

94+
@Override
95+
public Map<String, Supplier<DataFormatDescriptor>> getFormatDescriptors(
96+
IndexSettings indexSettings,
97+
DataFormatRegistry dataFormatRegistry
98+
) {
99+
return Map.of(DATA_FORMAT.name(), () -> new DataFormatDescriptor(DATA_FORMAT.name(), new LuceneChecksumHandler()));
100+
}
101+
89102
// --- SearchBackEndPlugin ---
90103

91104
/** {@inheritDoc} Returns {@code "lucene"}. */

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
import org.apache.lucene.index.DirectoryReader;
1212
import org.apache.lucene.index.LeafReaderContext;
1313
import org.apache.lucene.index.SegmentCommitInfo;
14+
import org.apache.lucene.index.SegmentInfos;
1415
import org.apache.lucene.index.SegmentReader;
16+
import org.opensearch.be.lucene.index.LuceneReplicaCommitter;
17+
import org.opensearch.common.CheckedBiFunction;
18+
import org.opensearch.common.SuppressForbidden;
1519
import org.opensearch.common.annotation.ExperimentalApi;
1620
import org.opensearch.index.engine.dataformat.DataFormat;
1721
import org.opensearch.index.engine.exec.EngineReaderManager;
@@ -44,30 +48,42 @@
4448
* @opensearch.experimental
4549
*/
4650
@ExperimentalApi
51+
@SuppressForbidden(reason = "reference counting is required here")
4752
public class LuceneReaderManager implements EngineReaderManager<LuceneReader> {
4853

4954
private final DataFormat dataFormat;
50-
private final Map<CatalogSnapshot, LuceneReader> readers = new HashMap<>();
55+
private final Map<Long, LuceneReader> readers;
5156
private volatile DirectoryReader currentReader;
57+
private final CheckedBiFunction<DirectoryReader, SegmentInfos, DirectoryReader, IOException> readerRefresher;
5258

5359
/**
5460
* Creates a new LuceneReaderManager.
5561
*
56-
* @param dataFormat the data format this reader manager serves
57-
* @param initialReader the initial DirectoryReader, must not be null
62+
* @param dataFormat the data format this reader manager serves
63+
* @param initialReader the initial DirectoryReader, must not be null
64+
* @param readers shared map of generation to DirectoryReader for segment-level reader reuse
65+
* @param readerRefresher function that opens a refreshed reader given the current reader and new
66+
* {@link SegmentInfos}; returns {@code null} if no refresh is needed
5867
* @throws NullPointerException if initialReader is null
5968
*/
60-
public LuceneReaderManager(DataFormat dataFormat, DirectoryReader initialReader) {
69+
public LuceneReaderManager(
70+
DataFormat dataFormat,
71+
DirectoryReader initialReader,
72+
Map<Long, LuceneReader> readers,
73+
CheckedBiFunction<DirectoryReader, SegmentInfos, DirectoryReader, IOException> readerRefresher
74+
) {
6175
this.dataFormat = dataFormat;
6276
Objects.requireNonNull(initialReader, "initialReader must not be null");
6377
this.currentReader = initialReader;
78+
this.readers = readers;
79+
this.readerRefresher = readerRefresher;
6480
}
6581

6682
@Override
6783
public LuceneReader getReader(CatalogSnapshot catalogSnapshot) throws IOException {
68-
LuceneReader reader = readers.get(catalogSnapshot);
84+
LuceneReader reader = readers.get(catalogSnapshot.getId());
6985
if (reader == null) {
70-
throw new IllegalStateException("No reader available for catalog snapshot [gen=" + catalogSnapshot.getGeneration() + "]");
86+
throw new IllegalStateException("No reader available for catalog snapshot [version=" + catalogSnapshot.getId() + "]");
7187
}
7288
return reader;
7389
}
@@ -79,17 +95,25 @@ public void beforeRefresh() throws IOException {
7995

8096
@Override
8197
public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException {
82-
if (didRefresh == false || readers.containsKey(catalogSnapshot)) {
98+
if (didRefresh == false || readers.containsKey(catalogSnapshot.getId())) {
8399
return;
84100
}
85-
DirectoryReader refreshed = DirectoryReader.openIfChanged(currentReader);
101+
DirectoryReader refreshed = readerRefresher.apply(currentReader, LuceneReplicaCommitter.getSegmentInfos(catalogSnapshot));
86102
if (refreshed != null) {
87-
assert readersAreSame(catalogSnapshot, refreshed);
103+
// Guard against refresh/merge-apply races: a prior IT regression surfaced when
104+
// overlapping threads produced a refreshed reader whose leaves disagreed with the
105+
// catalog snapshot being registered, effectively pairing the snapshot with a stale
106+
// reader. This assert catches that drift in test builds before the mismatched pair
107+
// is published to readers.
88108
currentReader = refreshed;
109+
} else {
110+
// If same reader is used, assert that calalog snapshot is same.
111+
currentReader.incRef();
89112
}
113+
assert readersAreSame(catalogSnapshot, currentReader);
90114

91115
Map<Long, String> generationToSegmentName = buildGenerationToSegmentName(catalogSnapshot, currentReader.leaves());
92-
readers.put(catalogSnapshot, new LuceneReader(currentReader, generationToSegmentName));
116+
readers.put(catalogSnapshot.getId(), new LuceneReader(currentReader, generationToSegmentName));
93117
}
94118

95119
private static Map<Long, String> buildGenerationToSegmentName(CatalogSnapshot catalogSnapshot, List<LeafReaderContext> leaves) {
@@ -135,12 +159,12 @@ private static Map<Long, String> buildGenerationToSegmentName(CatalogSnapshot ca
135159
* with the wrong catalog snapshot.
136160
*
137161
* @param catalogSnapshot catalog snapshot whose referenced generations are the expected set
138-
* @param readers DirectoryReader whose leaves' generations are the actual set
162+
* @param reader DirectoryReader whose leaves' generations are the actual set
139163
* @return {@code true} iff both lists contain the same generations in the same (sorted) order
140164
*/
141-
private boolean readersAreSame(CatalogSnapshot catalogSnapshot, DirectoryReader readers) {
165+
private boolean readersAreSame(CatalogSnapshot catalogSnapshot, DirectoryReader reader) {
142166
Collection<Long> generationsReferenced = catalogSnapshot.getSegments().stream().map(Segment::generation).sorted().toList();
143-
return generationsReferenced.equals(collectReferencedGenerations(readers));
167+
return generationsReferenced.equals(collectReferencedGenerations(reader));
144168
}
145169

146170
/**
@@ -167,9 +191,9 @@ private Collection<Long> collectReferencedGenerations(DirectoryReader reader) {
167191

168192
@Override
169193
public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException {
170-
LuceneReader reader = readers.remove(catalogSnapshot);
194+
LuceneReader reader = readers.remove(catalogSnapshot.getId());
171195
if (reader != null) {
172-
reader.directoryReader().close();
196+
reader.directoryReader().decRef();
173197
}
174198
}
175199

@@ -186,7 +210,7 @@ public void onFilesAdded(Collection<String> files) throws IOException {
186210
@Override
187211
public void close() throws IOException {
188212
for (LuceneReader reader : readers.values()) {
189-
reader.directoryReader().close();
213+
reader.directoryReader().decRef();
190214
}
191215
readers.clear();
192216
}

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchBackEnd.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,22 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.lucene.index.DirectoryReader;
14+
import org.apache.lucene.index.LeafReader;
15+
import org.apache.lucene.index.LeafReaderContext;
16+
import org.apache.lucene.index.SegmentInfos;
1417
import org.apache.lucene.index.StandardDirectoryReader;
1518
import org.opensearch.be.lucene.index.LuceneIndexingExecutionEngine;
19+
import org.opensearch.common.CheckedBiFunction;
1620
import org.opensearch.common.annotation.ExperimentalApi;
1721
import org.opensearch.index.engine.dataformat.ReaderManagerConfig;
1822
import org.opensearch.index.engine.exec.EngineReaderManager;
1923
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;
2024

2125
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.concurrent.ConcurrentHashMap;
2230

2331
/**
2432
* Static helpers for creating Lucene-based {@link EngineReaderManager} instances.
@@ -51,12 +59,30 @@ static EngineReaderManager<LuceneReader> createReaderManager(ReaderManagerConfig
5159
IndexStoreProvider provider = settings.indexStoreProvider()
5260
.orElseThrow(() -> new IllegalStateException("IndexStoreProvider is required to create LuceneReaderManager"));
5361
DirectoryReader directoryReader;
62+
Map<Long, LuceneReader> readers = new ConcurrentHashMap<>();
63+
CheckedBiFunction<DirectoryReader, SegmentInfos, DirectoryReader, IOException> readerRefresher = null;
5464
if (provider.getStore(settings.format()) instanceof LuceneIndexingExecutionEngine.LuceneFormatStore luceneProvider) {
5565
directoryReader = DirectoryReader.open(luceneProvider.writer());
66+
readers = luceneProvider.readers();
67+
readerRefresher = (dr, sis) -> DirectoryReader.openIfChanged(dr);
5668
} else {
5769
logger.warn("Initialising it with a DirectorReader instead of a writer");
5870
directoryReader = StandardDirectoryReader.open(provider.getStore(settings.format()).store().directory());
71+
readerRefresher = LuceneSearchBackEnd::buildReader;
5972
}
60-
return new LuceneReaderManager(settings.format(), directoryReader);
73+
return new LuceneReaderManager(settings.format(), directoryReader, readers, readerRefresher);
74+
}
75+
76+
private static DirectoryReader buildReader(DirectoryReader oldReader, SegmentInfos newSis) throws IOException {
77+
if (newSis == null || ((StandardDirectoryReader) oldReader).getSegmentInfos().version == newSis.version) {
78+
return null;
79+
}
80+
final List<LeafReader> subs = new ArrayList<>();
81+
for (LeafReaderContext ctx : oldReader.leaves()) {
82+
subs.add(ctx.reader());
83+
}
84+
// Segment_n here is ignored because it is either already committed on disk as part of previous commit point or
85+
// does not yet exist on store (not yet committed)
86+
return StandardDirectoryReader.open(oldReader.directory(), newSis, subs, null, null);
6187
}
6288
}

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneCommitDeletionPolicy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
4949
// which will delete it once a CS commit exists.
5050
for (IndexCommit commit : commits) {
5151
if (commit.getUserData().get(CatalogSnapshot.CATALOG_SNAPSHOT_ID) == null) {
52+
assert nonCatalogSnapshotCommit == null;
5253
nonCatalogSnapshotCommit = commit;
5354
}
5455
}
@@ -74,6 +75,7 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
7475
// since it is no longer needed for recovery.
7576
if (hasCSCommit && nonCatalogSnapshotCommit != null) {
7677
nonCatalogSnapshotCommit.delete();
78+
pendingDeletes.remove(0L);
7779
nonCatalogSnapshotCommit = null;
7880
}
7981
}
@@ -86,7 +88,7 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
8688
* @param snapshotId the CatalogSnapshot ID to purge
8789
*/
8890
void purgeCommit(long snapshotId) {
89-
assert trackedCommits.containsKey(snapshotId);
91+
assert (snapshotId == 0L) || trackedCommits.containsKey(snapshotId);
9092
pendingDeletes.add(snapshotId);
9193
}
9294
}

0 commit comments

Comments
 (0)