Skip to content

Commit e098ebe

Browse files
committed
Move queryExecutor submission to the end of the initialization of opensearch sink
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent d75ab26 commit e098ebe

1 file changed

Lines changed: 5 additions & 5 deletions

File tree

  • data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,6 @@ private void doInitializeInternal() throws IOException {
249249
openSearchClientRefresher = new OpenSearchClientRefresher(
250250
pluginMetrics, connectionConfiguration, clientFunction);
251251

252-
if (queryExecutorService != null) {
253-
existingDocumentQueryManager = new ExistingDocumentQueryManager(openSearchSinkConfig.getIndexConfiguration(), pluginMetrics, openSearchClient);
254-
queryExecutorService.submit(existingDocumentQueryManager);
255-
}
256-
257252
pluginConfigObservable.addPluginConfigObserver(
258253
newOpenSearchSinkConfig -> openSearchClientRefresher.update((OpenSearchSinkConfig) newOpenSearchSinkConfig));
259254
configuredIndexAlias = openSearchSinkConfig.getIndexConfiguration().getIndexAlias();
@@ -304,6 +299,11 @@ private void doInitializeInternal() throws IOException {
304299
PLUGIN_NAME,
305300
openSearchSinkConfig.getIndexConfiguration().getQueryOnBulkFailures() ? existingDocumentQueryManager : null);
306301

302+
if (queryExecutorService != null) {
303+
existingDocumentQueryManager = new ExistingDocumentQueryManager(openSearchSinkConfig.getIndexConfiguration(), pluginMetrics, openSearchClient);
304+
queryExecutorService.submit(existingDocumentQueryManager);
305+
}
306+
307307
this.initialized = true;
308308
LOG.info("Initialized OpenSearch sink");
309309
}

0 commit comments

Comments
 (0)