Skip to content

Commit 4b0dc3d

Browse files
alchemist51 and mgodwan authored
Add catalogSnapshotManager for dataformat aware engine (#20982)
* Initial commit for CatalogSnapshot

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* Address comments

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* Address comment part2

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

---------

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Co-authored-by: Mohit Godwani <81609427+mgodwan@users.noreply.github.com>
1 parent bd33edf commit 4b0dc3d

31 files changed

Lines changed: 1812 additions & 114 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,4 @@ testfixtures_shared/
7070
# build files generated
7171
doc-tools/missing-doclet/bin/
7272
/sandbox/plugins/engine-datafusion/target/
73+
**/Cargo.lock

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2727
- Add support for enabling pluggable data formats, starting with phase-1 of decoupling shard from engine, and introducing basic abstractions ([#20675](https://github.com/opensearch-project/OpenSearch/pull/20675))
2828
- Add concurrent queue in libs and composite engine sandbox plugin ([#20909](https://github.com/opensearch-project/OpenSearch/pull/20909))
2929
- Add interface for the Multi format merge flow ([#20908](https://github.com/opensearch-project/OpenSearch/pull/20908))
30+
- Add CatalogSnapshotManager lifecycle management with reference-counted snapshot tracking and serialization support for Segment and WriterFileSet ([#20982](https://github.com/opensearch-project/OpenSearch/pull/20982))
3031

3132
- Add warmup phase to wait for lag to catch up in pull-based ingestion before serving ([#20526](https://github.com/opensearch-project/OpenSearch/pull/20526))
3233
- Add a new static method to IndicesOptions API to expose `STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED` index option ([#20980](https://github.com/opensearch-project/OpenSearch/pull/20980))

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReaderManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
1212
import org.opensearch.index.engine.dataformat.DataFormat;
13-
import org.opensearch.index.engine.exec.CatalogSnapshot;
1413
import org.opensearch.index.engine.exec.EngineReaderManager;
14+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1515
import org.opensearch.index.shard.ShardPath;
1616

1717
import java.io.IOException;

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
import org.apache.lucene.search.ReferenceManager;
1313
import org.opensearch.common.annotation.ExperimentalApi;
1414
import org.opensearch.index.engine.dataformat.DataFormat;
15-
import org.opensearch.index.engine.exec.CatalogSnapshot;
1615
import org.opensearch.index.engine.exec.EngineReaderManager;
16+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1717

1818
import java.io.IOException;
1919
import java.util.Collection;

sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/DefaultPlanExecutorTests.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,17 @@
3030
import org.opensearch.cluster.metadata.IndexMetadata;
3131
import org.opensearch.cluster.metadata.Metadata;
3232
import org.opensearch.cluster.service.ClusterService;
33+
import org.opensearch.common.concurrent.GatedCloseable;
3334
import org.opensearch.core.index.Index;
3435
import org.opensearch.index.IndexService;
3536
import org.opensearch.index.engine.DataFormatAwareEngine;
3637
import org.opensearch.index.engine.dataformat.DataFormat;
3738
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
38-
import org.opensearch.index.engine.exec.CatalogSnapshot;
3939
import org.opensearch.index.engine.exec.EngineReaderManager;
4040
import org.opensearch.index.engine.exec.Segment;
4141
import org.opensearch.index.engine.exec.WriterFileSet;
42+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
43+
import org.opensearch.index.engine.exec.coord.CatalogSnapshotManager;
4244
import org.opensearch.index.shard.IndexShard;
4345
import org.opensearch.indices.IndicesService;
4446
import org.opensearch.test.OpenSearchTestCase;
@@ -104,13 +106,15 @@ public void testEndToEndExecuteWithMockBackend() throws IOException {
104106

105107
Segment seg1 = Segment.builder(0L).addSearchableFiles(format, fs1).build();
106108
Segment seg2 = Segment.builder(1L).addSearchableFiles(format, fs2).build();
107-
MockCatalogSnapshot snapshot = new MockCatalogSnapshot(1L, List.of(seg1, seg2), format);
109+
110+
CatalogSnapshotManager snapshotManager = new CatalogSnapshotManager(1L, 1L, 0L, List.of(seg1, seg2), 2L, Map.of());
108111

109112
MockReaderManager readerManager = new MockReaderManager(format.name());
110-
readerManager.afterRefresh(true, snapshot);
113+
try (GatedCloseable<CatalogSnapshot> ref = snapshotManager.acquireSnapshot()) {
114+
readerManager.afterRefresh(true, ref.get());
115+
}
111116

112-
DataFormatAwareEngine engine = new DataFormatAwareEngine(Map.of(format, readerManager));
113-
engine.setLatestSnapshot(snapshot);
117+
DataFormatAwareEngine engine = new DataFormatAwareEngine(Map.of(format, readerManager), snapshotManager);
114118

115119
// Mock shard + cluster wiring
116120
IndexShard shard = mock(IndexShard.class);
@@ -288,16 +292,18 @@ public String serializeToString() {
288292
}
289293

290294
@Override
291-
public void setCatalogSnapshotMap(Map<Long, ? extends CatalogSnapshot> map) {}
292-
293-
@Override
294-
public void setUserData(Map<String, String> userData, boolean b) {}
295+
public void setUserData(Map<String, String> userData) {}
295296

296297
@Override
297298
public Object getReader(DataFormat dataFormat) {
298299
return null;
299300
}
300301

302+
@Override
303+
public MockCatalogSnapshot clone() {
304+
return new MockCatalogSnapshot(generation, segments, format);
305+
}
306+
301307
@Override
302308
protected void closeInternal() {}
303309
}

server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@
1111
import org.opensearch.common.annotation.ExperimentalApi;
1212
import org.opensearch.common.concurrent.GatedCloseable;
1313
import org.opensearch.index.engine.dataformat.DataFormat;
14-
import org.opensearch.index.engine.exec.CatalogSnapshot;
1514
import org.opensearch.index.engine.exec.DataFormatAwareEngineFactory;
1615
import org.opensearch.index.engine.exec.EngineReaderManager;
1716
import org.opensearch.index.engine.exec.IndexReaderProvider;
17+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
18+
import org.opensearch.index.engine.exec.coord.CatalogSnapshotManager;
1819

1920
import java.io.Closeable;
2021
import java.io.IOException;
@@ -35,62 +36,56 @@
3536
public class DataFormatAwareEngine implements IndexReaderProvider, Closeable {
3637

3738
private final Map<DataFormat, EngineReaderManager<?>> readerManagers;
38-
private volatile CatalogSnapshot latestSnapshot;
39+
private volatile CatalogSnapshotManager catalogSnapshotManager;
3940

4041
/**
41-
* Constructs a new DataFormatAwareEngine with pre-built maps.
42+
* Constructs a new DataFormatAwareEngine.
4243
* Prefer using {@link DataFormatAwareEngineFactory#create()}.
4344
*/
45+
public DataFormatAwareEngine(Map<DataFormat, EngineReaderManager<?>> readerManagers, CatalogSnapshotManager catalogSnapshotManager) {
46+
this.readerManagers = readerManagers;
47+
this.catalogSnapshotManager = catalogSnapshotManager;
48+
}
49+
50+
/**
51+
* Constructs a new DataFormatAwareEngine without a snapshot manager.
52+
* The manager must be set via {@link #setCatalogSnapshotManager} before acquiring readers.
53+
*/
4454
public DataFormatAwareEngine(Map<DataFormat, EngineReaderManager<?>> readerManagers) {
4555
this.readerManagers = readerManagers;
4656
}
4757

48-
public EngineReaderManager<?> getReaderManager(DataFormat format) {
49-
return readerManagers.get(format);
58+
public void setCatalogSnapshotManager(CatalogSnapshotManager catalogSnapshotManager) {
59+
this.catalogSnapshotManager = catalogSnapshotManager;
5060
}
5161

52-
/**
53-
* Called by the catalog snapshot lifecycle listener after a refresh
54-
* to update the latest searchable snapshot.
55-
*/
56-
public void setLatestSnapshot(CatalogSnapshot snapshot) {
57-
CatalogSnapshot prev = this.latestSnapshot;
58-
this.latestSnapshot = snapshot;
59-
if (prev != null) {
60-
prev.decRef();
61-
}
62+
public EngineReaderManager<?> getReaderManager(DataFormat format) {
63+
return readerManagers.get(format);
6264
}
6365

6466
/**
6567
* Acquires a DataFormatAwareReader on the latest catalog snapshot.
66-
* The snapshot is incRef'd; the caller MUST close the returned
67-
* {@link DataFormatAwareReader} when done, which decRef's the snapshot.
68+
* The caller MUST close the returned {@link DataFormatAwareReader} when done,
69+
* which releases the snapshot reference.
6870
*/
6971
public GatedCloseable<Reader> acquireReader() throws IOException {
70-
CatalogSnapshot snapshot = latestSnapshot;
71-
if (snapshot == null) {
72-
throw new IllegalStateException("No catalog snapshot available");
72+
if (catalogSnapshotManager == null) {
73+
throw new IllegalStateException("CatalogSnapshotManager not set");
7374
}
74-
return acquireReader(snapshot);
75-
}
76-
77-
/**
78-
* Acquires a dataFormatAwareReader on a specific catalog snapshot.
79-
*/
80-
private GatedCloseable<Reader> acquireReader(CatalogSnapshot catalogSnapshot) throws IOException {
81-
catalogSnapshot.incRef();
75+
GatedCloseable<CatalogSnapshot> snapshotRef = catalogSnapshotManager.acquireSnapshot();
8276
try {
77+
CatalogSnapshot catalogSnapshot = snapshotRef.get();
8378
Map<DataFormat, Object> readers = new HashMap<>();
8479
for (Map.Entry<DataFormat, EngineReaderManager<?>> entry : readerManagers.entrySet()) {
8580
Object reader = entry.getValue().getReader(catalogSnapshot);
8681
if (reader != null) {
8782
readers.put(entry.getKey(), reader);
8883
}
8984
}
90-
DataFormatAwareReader reader = new DataFormatAwareReader(catalogSnapshot, readers);
85+
DataFormatAwareReader reader = new DataFormatAwareReader(catalogSnapshot, snapshotRef, readers);
9186
return new GatedCloseable<>(reader, reader::close);
9287
} catch (Exception e) {
93-
catalogSnapshot.decRef();
88+
snapshotRef.close();
9489
throw e;
9590
}
9691
}
@@ -102,10 +97,16 @@ private GatedCloseable<Reader> acquireReader(CatalogSnapshot catalogSnapshot) th
10297
@ExperimentalApi
10398
public static class DataFormatAwareReader implements IndexReaderProvider.Reader {
10499
private final CatalogSnapshot catalogSnapshot;
100+
private final GatedCloseable<CatalogSnapshot> snapshotRef;
105101
private final Map<DataFormat, Object> readers;
106102

107-
DataFormatAwareReader(CatalogSnapshot catalogSnapshot, Map<DataFormat, Object> readers) {
103+
DataFormatAwareReader(
104+
CatalogSnapshot catalogSnapshot,
105+
GatedCloseable<CatalogSnapshot> snapshotRef,
106+
Map<DataFormat, Object> readers
107+
) {
108108
this.catalogSnapshot = catalogSnapshot;
109+
this.snapshotRef = snapshotRef;
109110
this.readers = readers;
110111
}
111112

@@ -121,7 +122,11 @@ public CatalogSnapshot catalogSnapshot() {
121122

122123
@Override
123124
public void close() {
124-
catalogSnapshot.decRef();
125+
try {
126+
snapshotRef.close();
127+
} catch (IOException e) {
128+
throw new RuntimeException("Failed to release catalog snapshot reference", e);
129+
}
125130
}
126131
}
127132

server/src/main/java/org/opensearch/index/engine/EngineBackedIndexer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import org.opensearch.common.unit.TimeValue;
1515
import org.opensearch.core.common.unit.ByteSizeValue;
1616
import org.opensearch.index.VersionType;
17-
import org.opensearch.index.engine.exec.CatalogSnapshot;
1817
import org.opensearch.index.engine.exec.Indexer;
18+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1919
import org.opensearch.index.mapper.DocumentMapperForType;
2020
import org.opensearch.index.mapper.SourceToParse;
2121
import org.opensearch.index.merge.MergeStats;

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
import org.opensearch.common.logging.Loggers;
1616
import org.opensearch.core.index.shard.ShardId;
1717
import org.opensearch.index.engine.dataformat.MergeResult;
18-
import org.opensearch.index.engine.exec.CatalogSnapshot;
1918
import org.opensearch.index.engine.exec.Indexer;
2019
import org.opensearch.index.engine.exec.Segment;
20+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
2121

2222
import java.util.ArrayDeque;
2323
import java.util.Collection;

server/src/main/java/org/opensearch/index/engine/exec/CatalogSnapshotLifecycleListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.index.engine.exec;
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1213

1314
import java.io.IOException;
1415

server/src/main/java/org/opensearch/index/engine/exec/DataFormatEngineCatalogSnapshotListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
1212
import org.opensearch.index.engine.dataformat.DataFormat;
13+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1314

1415
import java.io.IOException;
1516
import java.util.Collection;

0 commit comments

Comments
 (0)