Skip to content

Commit 49f9918

Browse files
authored
Moves the creation of OpenSearch clients back into the initalize() method. (#6855)
Data Prepper doesn't have strong lifecycles beyond initialize() and shutdown(). So creating multiple pipelines or attempting to initialize over time resulted in running out of files. This reverts the change to use the constructor until a better way to close plugins is available. Signed-off-by: David Venable <dlv@amazon.com>
1 parent 513dcfb commit 49f9918

2 files changed

Lines changed: 59 additions & 46 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,14 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
6565
private final IndexManagerFactory indexManagerFactory;
6666
private final IndexType indexType;
6767
private final PluginConfigObservable pluginConfigObservable;
68-
private final Ingester ingester;
69-
70-
private final RestHighLevelClient restHighLevelClient;
71-
private final OpenSearchClient openSearchClient;
72-
private final OpenSearchClientRefresher openSearchClientRefresher;
73-
private final ConnectionConfiguration connectionConfiguration;
68+
private final ExpressionEvaluator expressionEvaluator;
69+
private final SinkContext sinkContext;
70+
private final String pipeline;
71+
private Ingester ingester;
72+
73+
private RestHighLevelClient restHighLevelClient;
74+
private OpenSearchClient openSearchClient;
75+
private OpenSearchClientRefresher openSearchClientRefresher;
7476
private IndexManager indexManager;
7577
private volatile boolean initialized;
7678

@@ -85,40 +87,16 @@ public OpenSearchSink(final PluginSetting pluginSetting,
8587
super(pluginSetting, Integer.MAX_VALUE, INITIALIZE_RETRY_WAIT_TIME_MS);
8688
this.awsCredentialsSupplier = awsCredentialsSupplier;
8789
this.pluginConfigObservable = pluginConfigObservable;
90+
this.expressionEvaluator = expressionEvaluator;
8891

89-
final SinkContext resolvedSinkContext = sinkContext != null ? sinkContext :
92+
this.sinkContext = sinkContext != null ? sinkContext :
9093
new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
9194

9295
this.openSearchSinkConfig = OpenSearchSinkConfiguration.readOSConfig(openSearchSinkConfiguration, expressionEvaluator);
9396
this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType();
9497
this.indexManagerFactory = new IndexManagerFactory(new ClusterSettingsParser());
98+
this.pipeline = pipelineDescription.getPipelineName();
9599
this.initialized = false;
96-
97-
connectionConfiguration = openSearchSinkConfig.getConnectionConfiguration();
98-
restHighLevelClient = connectionConfiguration.createClient(awsCredentialsSupplier);
99-
openSearchClient = connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier);
100-
final Function<ConnectionConfiguration, OpenSearchClient> clientFunction =
101-
(connConfig) -> {
102-
final RestHighLevelClient client = connConfig.createClient(awsCredentialsSupplier);
103-
return connConfig.createOpenSearchClient(client, awsCredentialsSupplier).withTransportOptions(
104-
TransportOptions.builder()
105-
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
106-
.build());
107-
};
108-
openSearchClientRefresher = new OpenSearchClientRefresher(
109-
pluginMetrics, connectionConfiguration, clientFunction);
110-
111-
final String pipeline = pipelineDescription.getPipelineName();
112-
final EventActionResolver eventActionResolver = new EventActionResolver(
113-
openSearchSinkConfig.getIndexConfiguration().getAction(),
114-
openSearchSinkConfig.getIndexConfiguration().getActions(),
115-
expressionEvaluator);
116-
117-
this.ingester = new BulkIngester(openSearchSinkConfig, expressionEvaluator, resolvedSinkContext,
118-
pluginMetrics, pipeline, eventActionResolver,
119-
openSearchClient, () -> openSearchClientRefresher.get(),
120-
this::getIndexManager, this::getFailurePipeline,
121-
new CustomDocumentBuilderFactory().create(this.indexType));
122100
}
123101

124102
@Override
@@ -145,6 +123,20 @@ public void doInitialize() {
145123
private void doInitializeInternal() throws IOException {
146124
LOG.info("Initializing OpenSearch sink");
147125

126+
final ConnectionConfiguration connectionConfiguration = openSearchSinkConfig.getConnectionConfiguration();
127+
restHighLevelClient = connectionConfiguration.createClient(awsCredentialsSupplier);
128+
openSearchClient = connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier);
129+
final Function<ConnectionConfiguration, OpenSearchClient> clientFunction =
130+
(connConfig) -> {
131+
final RestHighLevelClient client = connConfig.createClient(awsCredentialsSupplier);
132+
return connConfig.createOpenSearchClient(client, awsCredentialsSupplier).withTransportOptions(
133+
TransportOptions.builder()
134+
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
135+
.build());
136+
};
137+
openSearchClientRefresher = new OpenSearchClientRefresher(
138+
pluginMetrics, connectionConfiguration, clientFunction);
139+
148140
pluginConfigObservable.addPluginConfigObserver(
149141
newOpenSearchSinkConfig -> openSearchClientRefresher.update((OpenSearchSinkConfig) newOpenSearchSinkConfig));
150142

@@ -167,6 +159,17 @@ private void doInitializeInternal() throws IOException {
167159

168160
indexManager.setupIndex();
169161

162+
final EventActionResolver eventActionResolver = new EventActionResolver(
163+
openSearchSinkConfig.getIndexConfiguration().getAction(),
164+
openSearchSinkConfig.getIndexConfiguration().getActions(),
165+
expressionEvaluator);
166+
167+
ingester = new BulkIngester(openSearchSinkConfig, expressionEvaluator, sinkContext,
168+
pluginMetrics, pipeline, eventActionResolver,
169+
openSearchClient, () -> openSearchClientRefresher.get(),
170+
this::getIndexManager, this::getFailurePipeline,
171+
new CustomDocumentBuilderFactory().create(this.indexType));
172+
170173
ingester.initialize();
171174

172175
this.initialized = true;
@@ -190,7 +193,9 @@ public void doOutput(final Collection<Record<Event>> records) {
190193
@Override
191194
public void shutdown() {
192195
super.shutdown();
193-
ingester.shutdown();
196+
if (ingester != null) {
197+
ingester.shutdown();
198+
}
194199
if (restHighLevelClient != null) {
195200
try {
196201
restHighLevelClient.close();

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.INVALID_VERSION_EXPRESSION_DROPPED_EVENTS;
7070
import static org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig.DEFAULT_BULK_SIZE;
7171
import static org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig.DEFAULT_FLUSH_TIMEOUT;
72+
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;
7273

7374
@ExtendWith(MockitoExtension.class)
7475
public class OpenSearchSinkTest {
@@ -165,8 +166,7 @@ private OpenSearchSink createObjectUnderTest() throws IOException {
165166
final MockedStatic<PluginMetrics> pluginMetricsMockedStatic = mockStatic(PluginMetrics.class);
166167
final MockedConstruction<IndexManagerFactory> indexManagerFactoryMockedConstruction = mockConstruction(IndexManagerFactory.class, (mock, context) -> {
167168
indexManagerFactory = mock;
168-
});
169-
final MockedConstruction<BulkIngester> bulkIngesterMockedConstruction = mockConstruction(BulkIngester.class)) {
169+
})) {
170170
pluginMetricsMockedStatic.when(() -> PluginMetrics.fromPluginSetting(pluginSetting)).thenReturn(pluginMetrics);
171171
openSearchSinkConfigurationMockedStatic.when(() -> OpenSearchSinkConfiguration.readOSConfig(openSearchSinkConfig, expressionEvaluator))
172172
.thenReturn(openSearchSinkConfiguration);
@@ -181,7 +181,10 @@ void test_initialization() throws IOException {
181181
when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any()))
182182
.thenReturn(indexManager);
183183
doNothing().when(indexManager).setupIndex();
184-
objectUnderTest.initialize();
184+
185+
try (final MockedConstruction<BulkIngester> ignored = mockConstruction(BulkIngester.class)) {
186+
objectUnderTest.initialize();
187+
}
185188

186189
verify(pluginConfigObservable).addPluginConfigObserver(any());
187190
assertThat(objectUnderTest.isReady(), equalTo(true));
@@ -193,16 +196,20 @@ void test_initialization_with_failure_and_retry() throws IOException {
193196
when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any()))
194197
.thenThrow(RuntimeException.class).thenReturn(indexManager);
195198
doNothing().when(indexManager).setupIndex();
196-
objectUnderTest.initialize();
197-
objectUnderTest.initialize();
199+
200+
try (final MockedConstruction<BulkIngester> ignored = mockConstruction(BulkIngester.class)) {
201+
objectUnderTest.initialize();
202+
objectUnderTest.initialize();
203+
}
198204
verify(pluginConfigObservable, times(2)).addPluginConfigObserver(any());
199205
}
200206

201207
@Test
202208
void doOutput_delegates_to_ingester() throws Exception {
203209
final OpenSearchSink objectUnderTest = createObjectUnderTest();
204210

205-
final Ingester ingester = getField(objectUnderTest, "ingester");
211+
final Ingester ingester = mock(Ingester.class);
212+
setField(OpenSearchSink.class, objectUnderTest, "ingester", ingester);
206213
final List<Record<Event>> records = Collections.emptyList();
207214
objectUnderTest.doOutput(records);
208215

@@ -213,16 +220,17 @@ void doOutput_delegates_to_ingester() throws Exception {
213220
void shutdown_delegates_to_ingester() throws Exception {
214221
final OpenSearchSink objectUnderTest = createObjectUnderTest();
215222

216-
final Ingester ingester = getField(objectUnderTest, "ingester");
223+
final Ingester ingester = mock(Ingester.class);
224+
setField(OpenSearchSink.class, objectUnderTest, "ingester", ingester);
217225
objectUnderTest.shutdown();
218226

219227
verify(ingester).shutdown();
220228
}
221229

222-
@SuppressWarnings("unchecked")
223-
private static <T> T getField(final Object target, final String fieldName) throws Exception {
224-
java.lang.reflect.Field field = target.getClass().getDeclaredField(fieldName);
225-
field.setAccessible(true);
226-
return (T) field.get(target);
230+
@Test
231+
void shutdown_without_initialization_does_not_throw() throws Exception {
232+
final OpenSearchSink objectUnderTest = createObjectUnderTest();
233+
objectUnderTest.shutdown();
227234
}
235+
228236
}

0 commit comments

Comments
 (0)