From f29c3e2ac76e970cedce14ce46db94c6b7671fb0 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 29 Jul 2025 15:44:48 -0500 Subject: [PATCH] Move queryExecutor submission to the end of the initialization of opensearch sink Signed-off-by: Taylor Gray --- .../plugins/sink/opensearch/OpenSearchSink.java | 12 ++++++------ .../sink/opensearch/OpenSearchSinkTest.java | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 5168861a2b..64e409a1fe 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -249,11 +249,6 @@ private void doInitializeInternal() throws IOException { openSearchClientRefresher = new OpenSearchClientRefresher( pluginMetrics, connectionConfiguration, clientFunction); - if (queryExecutorService != null) { - existingDocumentQueryManager = new ExistingDocumentQueryManager(openSearchSinkConfig.getIndexConfiguration(), pluginMetrics, openSearchClient); - queryExecutorService.submit(existingDocumentQueryManager); - } - pluginConfigObservable.addPluginConfigObserver( newOpenSearchSinkConfig -> openSearchClientRefresher.update((OpenSearchSinkConfig) newOpenSearchSinkConfig)); configuredIndexAlias = openSearchSinkConfig.getIndexConfiguration().getIndexAlias(); @@ -304,6 +299,11 @@ private void doInitializeInternal() throws IOException { PLUGIN_NAME, openSearchSinkConfig.getIndexConfiguration().getQueryOnBulkFailures() ? existingDocumentQueryManager : null); + if (queryExecutorService != null) { + existingDocumentQueryManager = new ExistingDocumentQueryManager(openSearchSinkConfig.getIndexConfiguration(), pluginMetrics, openSearchClient); + queryExecutorService.submit(existingDocumentQueryManager); + } + this.initialized = true; LOG.info("Initialized OpenSearch sink"); } @@ -649,7 +649,7 @@ public void shutdown() { super.shutdown(); closeFiles(); openSearchClient.shutdown(); - if (queryExecutorService != null) { + if (queryExecutorService != null && existingDocumentQueryManager != null) { existingDocumentQueryManager.stop(); queryExecutorService.shutdown(); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 191951f0e1..97b374cc94 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -58,6 +58,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.EXTERNAL_LATENCY; @@ -203,6 +204,19 @@ void test_initialization() throws IOException { verify(pluginConfigObservable).addPluginConfigObserver(any()); } + @Test + void test_initialization_with_failure_and_retry_with_query_manager() throws IOException { + when(openSearchSinkConfiguration.getIndexConfiguration().getQueryTerm()).thenReturn(UUID.randomUUID().toString()); + + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenThrow(RuntimeException.class).thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + objectUnderTest.initialize(); + verify(pluginConfigObservable, times(2)).addPluginConfigObserver(any()); + } + @Test void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_creates_DLQObject() throws IOException { when(pluginSetting.getName()).thenReturn("opensearch");