Skip to content

Commit e6d708d

Browse files
ask-kamal-nayanKamal Nayan
andauthored
Share single FormatChecksumStrategy instance per shard between engine and store (#21232)
* Share single FormatChecksumStrategy instance per shard, between engine and directory Signed-off-by: Kamal Nayan <askkamal@amazon.com> * Minor refactoring Signed-off-by: Kamal Nayan <askkamal@amazon.com> * Added Tests for checkum strategy sharing Signed-off-by: Kamal Nayan <askkamal@amazon.com> * Add checksum read-back assertion to shared strategy test Signed-off-by: Kamal Nayan <askkamal@amazon.com> * Fix indexingEngine call in CompositeIndexingExecutionEngineTests for updated API Signed-off-by: Kamal Nayan <askkamal@amazon.com> * Fix compilation after rebase with upstream Lucene engine plugin Signed-off-by: Kamal Nayan <askkamal@amazon.com> * Updated DataFormatRegistry tests to increase test coverage Signed-off-by: Kamal Nayan <askkamal@amazon.com> * updated the code to use supplier for DataFormatDescriptors for lazy initialization Signed-off-by: Kamal Nayan <askkamal@amazon.com> --------- Signed-off-by: Kamal Nayan <askkamal@amazon.com> Co-authored-by: Kamal Nayan <askkamal@amazon.com>
1 parent e361637 commit e6d708d

32 files changed

Lines changed: 446 additions & 183 deletions

File tree

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LucenePlugin.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.opensearch.index.engine.exec.EngineReaderManager;
2323
import org.opensearch.index.engine.exec.commit.Committer;
2424
import org.opensearch.index.engine.exec.commit.CommitterFactory;
25-
import org.opensearch.index.store.FormatChecksumStrategy;
2625
import org.opensearch.plugins.EnginePlugin;
2726
import org.opensearch.plugins.Plugin;
2827
import org.opensearch.plugins.SearchBackEndPlugin;
@@ -66,15 +65,11 @@ public DataFormat getDataFormat() {
6665
* Requires the committer to be a {@link LuceneCommitter}.
6766
*
6867
* @param indexingEngineConfig the engine configuration containing committer, mapper service, and store
69-
* @param checksumStrategy the checksum strategy for the format (unused by Lucene)
7068
* @return a new Lucene indexing execution engine
7169
* @throws IllegalStateException if the committer is not a {@link LuceneCommitter}
7270
*/
7371
@Override
74-
public IndexingExecutionEngine<?, ?> indexingEngine(
75-
IndexingEngineConfig indexingEngineConfig,
76-
FormatChecksumStrategy checksumStrategy
77-
) {
72+
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig indexingEngineConfig) {
7873
Committer committer = indexingEngineConfig.committer();
7974
if (committer instanceof LuceneCommitter luceneCommitter) {
8075
return new LuceneIndexingExecutionEngine(

sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/index/LuceneCommitterTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ private CommitterConfig createCommitterConfig() throws IOException {
9494
null,
9595
null,
9696
null,
97+
null,
9798
null
9899
);
99100
return new CommitterConfig(engineConfig);

sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngineTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ private LuceneCommitter createCommitter() throws IOException {
123123
null,
124124
null,
125125
null,
126+
null,
126127
null
127128
);
128129
CommitterConfig settings = new CommitterConfig(engineConfig);

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeMergeIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Map;
4242
import java.util.Set;
4343
import java.util.function.Function;
44+
import java.util.function.Supplier;
4445

4546
/**
4647
* Integration tests for composite merge operations across single and multiple data format engines.
@@ -66,8 +67,8 @@ public MockParquetDataFormatPlugin() {
6667
}
6768

6869
@Override
69-
public Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry registry) {
70-
return Map.of("parquet", new DataFormatDescriptor("parquet", new PrecomputedChecksumStrategy()));
70+
public Map<String, Supplier<DataFormatDescriptor>> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry registry) {
71+
return Map.of("parquet", () -> new DataFormatDescriptor("parquet", new PrecomputedChecksumStrategy()));
7172
}
7273

7374
@Override

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatPlugin.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
2121
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
2222
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
23-
import org.opensearch.index.store.FormatChecksumStrategy;
2423
import org.opensearch.plugins.ExtensiblePlugin;
2524
import org.opensearch.plugins.Plugin;
2625

2726
import java.util.Collections;
2827
import java.util.HashMap;
2928
import java.util.List;
3029
import java.util.Map;
30+
import java.util.function.Supplier;
3131

3232
/**
3333
* Sandbox plugin that provides a {@link CompositeIndexingExecutionEngine} for
@@ -92,35 +92,33 @@ public DataFormat getDataFormat() {
9292
}
9393

9494
@Override
95-
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig settings, FormatChecksumStrategy checksumStrategy) {
96-
Map<String, FormatChecksumStrategy> strategies = new HashMap<>();
97-
for (Map.Entry<String, DataFormatDescriptor> entry : getFormatDescriptors(settings.indexSettings(), settings.registry())
98-
.entrySet()) {
99-
strategies.put(entry.getKey(), entry.getValue().getChecksumStrategy());
100-
}
95+
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig settings) {
10196
return new CompositeIndexingExecutionEngine(
10297
settings.indexSettings(),
10398
settings.mapperService(),
10499
settings.committer(),
105100
settings.registry(),
106101
settings.store(),
107-
strategies
102+
settings.checksumStrategies()
108103
);
109104
}
110105

111106
@Override
112-
public Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
107+
public Map<String, Supplier<DataFormatDescriptor>> getFormatDescriptors(
108+
IndexSettings indexSettings,
109+
DataFormatRegistry dataFormatRegistry
110+
) {
113111
Settings settings = indexSettings.getSettings();
114112
String primaryFormatName = PRIMARY_DATA_FORMAT.get(settings);
115113
List<String> secondaryFormatNames = SECONDARY_DATA_FORMATS.get(settings);
116114

117-
Map<String, DataFormatDescriptor> descriptors = new HashMap<>();
115+
Map<String, Supplier<DataFormatDescriptor>> descriptors = new HashMap<>();
118116
if (primaryFormatName != null) {
119-
descriptors.putAll(dataFormatRegistry.getFormatDescriptors(indexSettings));
117+
descriptors.putAll(dataFormatRegistry.getFormatDescriptors(indexSettings, dataFormatRegistry.format(primaryFormatName)));
120118
}
121119
for (String secondaryName : secondaryFormatNames) {
122120
if (secondaryName != null) {
123-
descriptors.putAll(dataFormatRegistry.getFormatDescriptors(indexSettings));
121+
descriptors.putAll(dataFormatRegistry.getFormatDescriptors(indexSettings, dataFormatRegistry.format(secondaryName)));
124122
}
125123
}
126124
return Map.copyOf(descriptors);

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,14 @@ public CompositeIndexingExecutionEngine(
115115
validateFormatsRegistered(dataFormatRegistry, primaryFormatName, secondaryFormatNames);
116116

117117
Map<String, FormatChecksumStrategy> strategies = checksumStrategies != null ? checksumStrategies : Map.of();
118-
IndexingEngineConfig engineSettings = new IndexingEngineConfig(committer, mapperService, indexSettings, store, dataFormatRegistry);
118+
IndexingEngineConfig engineSettings = new IndexingEngineConfig(
119+
committer,
120+
mapperService,
121+
indexSettings,
122+
store,
123+
dataFormatRegistry,
124+
strategies
125+
);
119126

120127
List<DataFormat> allFormats = new ArrayList<>();
121128
DataFormat primaryFormat = dataFormatRegistry.format(primaryFormatName);

sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatPluginTests.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
import org.opensearch.common.settings.Setting;
1212
import org.opensearch.common.settings.Settings;
1313
import org.opensearch.index.IndexSettings;
14+
import org.opensearch.index.engine.dataformat.DataFormat;
1415
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
1516
import org.opensearch.test.OpenSearchTestCase;
1617

1718
import java.util.List;
1819
import java.util.Map;
20+
import java.util.function.Supplier;
1921

2022
import static org.mockito.Mockito.mock;
2123
import static org.mockito.Mockito.when;
@@ -60,24 +62,27 @@ public void testGetFormatDescriptorsDelegatestoPlugins() {
6062
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
6163

6264
DataFormatRegistry registry = mock(DataFormatRegistry.class);
63-
when(registry.format("parquet")).thenReturn(CompositeTestHelper.stubFormat("parquet", 2, java.util.Set.of()));
64-
when(registry.getFormatDescriptors(indexSettings)).thenReturn(
65+
DataFormat parquetFormat = CompositeTestHelper.stubFormat("parquet", 2, java.util.Set.of());
66+
when(registry.format("parquet")).thenReturn(parquetFormat);
67+
when(registry.format("lucene")).thenReturn(CompositeTestHelper.stubFormat("lucene", 1, java.util.Set.of()));
68+
when(registry.getFormatDescriptors(indexSettings, parquetFormat)).thenReturn(
6569
Map.of(
6670
"parquet",
67-
new org.opensearch.index.engine.dataformat.DataFormatDescriptor(
68-
"parquet",
69-
new org.opensearch.index.store.checksum.GenericCRC32ChecksumHandler()
70-
)
71+
(Supplier<
72+
org.opensearch.index.engine.dataformat.DataFormatDescriptor>) () -> new org.opensearch.index.engine.dataformat.DataFormatDescriptor(
73+
"parquet",
74+
new org.opensearch.index.store.checksum.GenericCRC32ChecksumHandler()
75+
)
7176
)
7277
);
7378

74-
Map<String, org.opensearch.index.engine.dataformat.DataFormatDescriptor> descriptors = plugin.getFormatDescriptors(
79+
Map<String, Supplier<org.opensearch.index.engine.dataformat.DataFormatDescriptor>> descriptors = plugin.getFormatDescriptors(
7580
indexSettings,
7681
registry
7782
);
7883
assertEquals(1, descriptors.size());
7984
assertTrue(descriptors.containsKey("parquet"));
80-
assertEquals("parquet", descriptors.get("parquet").getFormatName());
85+
assertEquals("parquet", descriptors.get("parquet").get().getFormatName());
8186
}
8287

8388
public void testGetFormatDescriptorsEmptyWhenNoPluginsMatch() {
@@ -94,7 +99,7 @@ public void testGetFormatDescriptorsEmptyWhenNoPluginsMatch() {
9499
.build();
95100
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
96101

97-
Map<String, org.opensearch.index.engine.dataformat.DataFormatDescriptor> descriptors = plugin.getFormatDescriptors(
102+
Map<String, Supplier<org.opensearch.index.engine.dataformat.DataFormatDescriptor>> descriptors = plugin.getFormatDescriptors(
98103
indexSettings,
99104
registry
100105
);

sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void testConstructorThrowsWhenSecondaryFormatNotRegistered() {
7676
when(registry.getRegisteredFormats()).thenReturn(Set.of(CompositeTestHelper.stubFormat("lucene", 1, Set.of())));
7777
when(registry.getIndexingEngine(any(), any())).thenAnswer(invocation -> {
7878
DataFormatPlugin plugin = CompositeTestHelper.stubPlugin("lucene", 1);
79-
return plugin.indexingEngine(null, null);
79+
return plugin.indexingEngine(null);
8080
});
8181

8282
Settings settings = Settings.builder()

sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.opensearch.index.engine.exec.commit.Committer;
3232
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;
3333
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
34-
import org.opensearch.index.store.FormatChecksumStrategy;
3534

3635
import java.util.Collection;
3736
import java.util.Collections;
@@ -72,7 +71,7 @@ static CompositeIndexingExecutionEngine createStubEngine(String primaryName, Str
7271
when(registry.getIndexingEngine(any(), any())).thenAnswer(invocation -> {
7372
DataFormat format = invocation.getArgument(1);
7473
DataFormatPlugin plugin = plugins.get(format.name());
75-
return plugin.indexingEngine(null, null);
74+
return plugin.indexingEngine(null);
7675
});
7776

7877
Settings.Builder settingsBuilder = Settings.builder()
@@ -101,7 +100,7 @@ public DataFormat getDataFormat() {
101100
}
102101

103102
@Override
104-
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig settings, FormatChecksumStrategy checksumStrategy) {
103+
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig settings) {
105104
return new StubIndexingExecutionEngine(format);
106105
}
107106
};
@@ -116,7 +115,7 @@ public DataFormat getDataFormat() {
116115
}
117116

118117
@Override
119-
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig settings, FormatChecksumStrategy checksumStrategy) {
118+
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig settings) {
120119
return new StubIndexingExecutionEngine(format);
121120
}
122121
};

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
2525
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
2626
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
27-
import org.opensearch.index.store.FormatChecksumStrategy;
2827
import org.opensearch.index.store.PrecomputedChecksumStrategy;
2928
import org.opensearch.parquet.engine.ParquetDataFormat;
3029
import org.opensearch.parquet.engine.ParquetIndexingEngine;
@@ -52,10 +51,9 @@
5251
* {@link #createComponents} and passes them to the per-shard
5352
* {@link ParquetIndexingEngine} instances created in {@link #indexingEngine}.
5453
*
55-
* <p>The descriptor provides a {@link PrecomputedChecksumStrategy} that the directory
56-
* holds at construction time. The {@link ParquetIndexingEngine} receives the same
57-
* strategy instance from the directory via
58-
* {@link org.opensearch.index.store.DataFormatAwareStoreDirectory#getChecksumStrategy},
54+
* <p>The descriptor provides a {@link PrecomputedChecksumStrategy} that is created once
55+
* per shard during initialization. The same strategy instance is shared between the
56+
* directory and the {@link ParquetIndexingEngine} via the checksum strategies map,
5957
* so pre-computed CRC32 values registered during write are directly visible to the
6058
* upload path — no post-construction wiring needed.
6159
*
@@ -99,23 +97,23 @@ public DataFormat getDataFormat() {
9997
}
10098

10199
@Override
102-
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig engineConfig, FormatChecksumStrategy checksumStrategy) {
100+
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig engineConfig) {
103101
return new ParquetIndexingEngine(
104102
settings,
105103
dataFormat,
106104
engineConfig.store().shardPath(),
107105
() -> ArrowSchemaBuilder.getSchema(engineConfig.mapperService()),
108106
engineConfig.indexSettings(),
109107
threadPool,
110-
checksumStrategy
108+
engineConfig.checksumStrategies().get(ParquetDataFormat.PARQUET_DATA_FORMAT_NAME)
111109
);
112110
}
113111

114112
@Override
115-
public Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry registry) {
113+
public Map<String, Supplier<DataFormatDescriptor>> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry registry) {
116114
return Map.of(
117115
ParquetDataFormat.PARQUET_DATA_FORMAT_NAME,
118-
new DataFormatDescriptor(ParquetDataFormat.PARQUET_DATA_FORMAT_NAME, new PrecomputedChecksumStrategy())
116+
() -> new DataFormatDescriptor(ParquetDataFormat.PARQUET_DATA_FORMAT_NAME, new PrecomputedChecksumStrategy())
119117
);
120118
}
121119

0 commit comments

Comments
 (0)