Skip to content

Commit b1b66a1

Browse files
committed
Extract BulkOperationFactory from OpenSearchSink
Moves getBulkOperationForAction and isUsingDocumentFilters out of OpenSearchSink into a dedicated BulkOperationFactory class. This reduces the size of OpenSearchSink and makes the bulk operation logic independently testable. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent 9eef437 commit b1b66a1

4 files changed

Lines changed: 395 additions & 445 deletions

File tree

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.sink.opensearch;
12+
13+
import com.fasterxml.jackson.databind.JsonNode;
14+
import com.fasterxml.jackson.databind.ObjectMapper;
15+
import org.apache.commons.lang3.StringUtils;
16+
import org.opensearch.client.opensearch._types.VersionType;
17+
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
18+
import org.opensearch.client.opensearch.core.bulk.CreateOperation;
19+
import org.opensearch.client.opensearch.core.bulk.DeleteOperation;
20+
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
21+
import org.opensearch.client.opensearch.core.bulk.UpdateOperation;
22+
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
23+
import org.opensearch.dataprepper.model.sink.SinkContext;
24+
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;
25+
26+
import java.io.IOException;
27+
import java.util.Optional;
28+
29+
public class BulkOperationFactory {
30+
31+
private final ObjectMapper objectMapper;
32+
private final ScriptManager scriptManager;
33+
private final VersionType versionType;
34+
private final String documentRootKey;
35+
private final SinkContext sinkContext;
36+
37+
public BulkOperationFactory(final ScriptManager scriptManager,
38+
final VersionType versionType,
39+
final String documentRootKey,
40+
final SinkContext sinkContext) {
41+
this.objectMapper = new ObjectMapper();
42+
this.scriptManager = scriptManager;
43+
this.versionType = versionType;
44+
this.documentRootKey = documentRootKey;
45+
this.sinkContext = sinkContext;
46+
}
47+
48+
public BulkOperation create(final String action,
49+
final SerializedJson document,
50+
final Long version,
51+
final String indexName,
52+
final JsonNode jsonNode) {
53+
final Optional<String> docId = document.getDocumentId();
54+
final Optional<String> routing = document.getRoutingField();
55+
final Optional<String> pipeline = document.getPipelineField();
56+
57+
if (StringUtils.equals(action, OpenSearchBulkActions.CREATE.toString())) {
58+
return createCreateOperation(document, indexName, docId, routing, pipeline);
59+
}
60+
if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) ||
61+
StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) {
62+
return createUpdateOperation(action, document, version, indexName, jsonNode, docId, routing);
63+
}
64+
if (StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) {
65+
return createDeleteOperation(version, indexName, docId, routing);
66+
}
67+
return createIndexOperation(document, version, indexName, docId, routing, pipeline);
68+
}
69+
70+
private BulkOperation createCreateOperation(final SerializedJson document,
71+
final String indexName,
72+
final Optional<String> docId,
73+
final Optional<String> routing,
74+
final Optional<String> pipeline) {
75+
final CreateOperation.Builder<Object> builder = new CreateOperation.Builder<>()
76+
.index(indexName)
77+
.document(document);
78+
docId.ifPresent(builder::id);
79+
routing.ifPresent(builder::routing);
80+
pipeline.ifPresent(builder::pipeline);
81+
return new BulkOperation.Builder().create(builder.build()).build();
82+
}
83+
84+
private BulkOperation createUpdateOperation(final String action,
85+
final SerializedJson document,
86+
final Long version,
87+
final String indexName,
88+
final JsonNode jsonNode,
89+
final Optional<String> docId,
90+
final Optional<String> routing) {
91+
JsonNode filteredJsonNode = jsonNode;
92+
try {
93+
if (isUsingDocumentFilters()) {
94+
filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson());
95+
}
96+
} catch (final IOException e) {
97+
throw new RuntimeException(
98+
String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage()));
99+
}
100+
101+
final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString());
102+
final UpdateOperation.Builder<Object> builder = new UpdateOperation.Builder<>()
103+
.index(indexName)
104+
.versionType(versionType)
105+
.version(version);
106+
107+
if (scriptManager.isScriptEnabled()) {
108+
builder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null)));
109+
builder.upsert(filteredJsonNode);
110+
builder.scriptedUpsert(true);
111+
} else if (isUpsert) {
112+
builder.document(filteredJsonNode).upsert(filteredJsonNode);
113+
} else {
114+
builder.document(filteredJsonNode);
115+
}
116+
117+
docId.ifPresent(builder::id);
118+
routing.ifPresent(builder::routing);
119+
return new BulkOperation.Builder().update(builder.build()).build();
120+
}
121+
122+
private BulkOperation createDeleteOperation(final Long version,
123+
final String indexName,
124+
final Optional<String> docId,
125+
final Optional<String> routing) {
126+
final DeleteOperation.Builder builder = new DeleteOperation.Builder()
127+
.index(indexName)
128+
.versionType(versionType)
129+
.version(version);
130+
docId.ifPresent(builder::id);
131+
routing.ifPresent(builder::routing);
132+
return new BulkOperation.Builder().delete(builder.build()).build();
133+
}
134+
135+
private BulkOperation createIndexOperation(final SerializedJson document,
136+
final Long version,
137+
final String indexName,
138+
final Optional<String> docId,
139+
final Optional<String> routing,
140+
final Optional<String> pipeline) {
141+
final IndexOperation.Builder<Object> builder = new IndexOperation.Builder<>()
142+
.index(indexName)
143+
.document(document)
144+
.version(version)
145+
.versionType(versionType);
146+
docId.ifPresent(builder::id);
147+
routing.ifPresent(builder::routing);
148+
pipeline.ifPresent(builder::pipeline);
149+
return new BulkOperation.Builder().index(builder.build()).build();
150+
}
151+
152+
/**
153+
* This function is used for update and upsert bulk actions to determine whether the original JsonNode needs to be filtered down
154+
* based on the user's sink configuration. If a new parameter manipulates the document before sending to OpenSearch, it needs to be added to
155+
* this list to get applied for update and upsert actions
156+
* @return whether the doc needs filtering
157+
*/
158+
private boolean isUsingDocumentFilters() {
159+
return documentRootKey != null ||
160+
(sinkContext.getIncludeKeys() != null && !sinkContext.getIncludeKeys().isEmpty()) ||
161+
(sinkContext.getExcludeKeys() != null && !sinkContext.getExcludeKeys().isEmpty()) ||
162+
sinkContext.getTagsTargetKey() != null;
163+
}
164+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 3 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@
2121
import org.opensearch.client.opensearch._types.VersionType;
2222
import org.opensearch.client.opensearch.core.BulkRequest;
2323
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
24-
import org.opensearch.client.opensearch.core.bulk.CreateOperation;
25-
import org.opensearch.client.opensearch.core.bulk.DeleteOperation;
26-
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
27-
import org.opensearch.client.opensearch.core.bulk.UpdateOperation;
2824
import org.opensearch.client.transport.TransportOptions;
2925
import org.opensearch.common.unit.ByteSizeUnit;
3026
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
@@ -143,6 +139,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
143139
private final String action;
144140
private final List<ActionConfiguration> actions;
145141
private final ScriptManager scriptManager;
142+
private final BulkOperationFactory bulkOperationFactory;
146143
private final String documentRootKey;
147144
private String configuredIndexAlias;
148145
private final ReentrantLock lock;
@@ -216,6 +213,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
216213
this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey();
217214
this.versionType = openSearchSinkConfig.getIndexConfiguration().getVersionType();
218215
this.versionExpression = openSearchSinkConfig.getIndexConfiguration().getVersionExpression();
216+
this.bulkOperationFactory = new BulkOperationFactory(scriptManager, versionType, documentRootKey, sinkContext);
219217
this.indexManagerFactory = new IndexManagerFactory(new ClusterSettingsParser());
220218
this.failedBulkOperationConverter = new FailedBulkOperationConverter(pipeline, PLUGIN_NAME);
221219
this.initialized = false;
@@ -347,96 +345,6 @@ public boolean isReady() {
347345
return initialized;
348346
}
349347

350-
BulkOperation getBulkOperationForAction(final String action,
351-
final SerializedJson document,
352-
final Long version,
353-
final String indexName,
354-
final JsonNode jsonNode) {
355-
BulkOperation bulkOperation;
356-
final Optional<String> docId = document.getDocumentId();
357-
final Optional<String> routing = document.getRoutingField();
358-
final Optional<String> pipeline = document.getPipelineField();
359-
360-
if (StringUtils.equals(action, OpenSearchBulkActions.CREATE.toString())) {
361-
final CreateOperation.Builder<Object> createOperationBuilder =
362-
new CreateOperation.Builder<>()
363-
.index(indexName)
364-
.document(document);
365-
docId.ifPresent(createOperationBuilder::id);
366-
routing.ifPresent(createOperationBuilder::routing);
367-
pipeline.ifPresent(createOperationBuilder::pipeline);
368-
369-
bulkOperation = new BulkOperation.Builder()
370-
.create(createOperationBuilder.build())
371-
.build();
372-
return bulkOperation;
373-
}
374-
if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) ||
375-
StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) {
376-
377-
JsonNode filteredJsonNode = jsonNode;
378-
try {
379-
if (isUsingDocumentFilters()) {
380-
filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson());
381-
}
382-
} catch (final IOException e) {
383-
throw new RuntimeException(
384-
String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage()));
385-
}
386-
387-
388-
final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString());
389-
final UpdateOperation.Builder<Object> updateOperationBuilder = new UpdateOperation.Builder<>()
390-
.index(indexName)
391-
.versionType(versionType)
392-
.version(version);
393-
394-
if (scriptManager.isScriptEnabled()) {
395-
updateOperationBuilder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null)));
396-
updateOperationBuilder.upsert(filteredJsonNode);
397-
updateOperationBuilder.scriptedUpsert(true);
398-
} else if (isUpsert) {
399-
updateOperationBuilder.document(filteredJsonNode).upsert(filteredJsonNode);
400-
} else {
401-
updateOperationBuilder.document(filteredJsonNode);
402-
}
403-
404-
docId.ifPresent(updateOperationBuilder::id);
405-
routing.ifPresent(updateOperationBuilder::routing);
406-
bulkOperation = new BulkOperation.Builder()
407-
.update(updateOperationBuilder.build())
408-
.build();
409-
return bulkOperation;
410-
}
411-
if (StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) {
412-
final DeleteOperation.Builder deleteOperationBuilder =
413-
new DeleteOperation.Builder().index(indexName);
414-
docId.ifPresent(deleteOperationBuilder::id);
415-
routing.ifPresent(deleteOperationBuilder::routing);
416-
bulkOperation = new BulkOperation.Builder()
417-
.delete(deleteOperationBuilder
418-
.versionType(versionType)
419-
.version(version)
420-
.build())
421-
.build();
422-
return bulkOperation;
423-
}
424-
// Default to "index"
425-
final IndexOperation.Builder<Object> indexOperationBuilder =
426-
new IndexOperation.Builder<>()
427-
.index(indexName)
428-
.document(document)
429-
.version(version)
430-
.versionType(versionType);
431-
docId.ifPresent(indexOperationBuilder::id);
432-
routing.ifPresent(indexOperationBuilder::routing);
433-
pipeline.ifPresent(indexOperationBuilder::pipeline);
434-
bulkOperation = new BulkOperation.Builder()
435-
.index(indexOperationBuilder.build())
436-
.build();
437-
return bulkOperation;
438-
}
439-
440348
@Override
441349
public void doOutput(final Collection<Record<Event>> records) {
442350
final long threadId = Thread.currentThread().getId();
@@ -544,7 +452,7 @@ public void doOutput(final Collection<Record<Event>> records) {
544452
BulkOperation bulkOperation;
545453

546454
try {
547-
bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode());
455+
bulkOperation = bulkOperationFactory.create(eventAction, document, version, indexName, event.getJsonNode());
548456
} catch (final Exception e) {
549457
LOG.error("An exception occurred while constructing the bulk operation for a document: ", e);
550458
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
@@ -790,19 +698,6 @@ private DlqObject createDlqObjectFromEvent(final Event event,
790698
return builder.build();
791699
}
792700

793-
/**
794-
* This function is used for update and upsert bulk actions to determine whether the original JsonNode needs to be filtered down
795-
* based on the user's sink configuration. If a new parameter manipulates the document before sending to OpenSearch, it needs to be added to
796-
* this list to get applied for update and upsert actions
797-
* @return whether the doc
798-
*/
799-
private boolean isUsingDocumentFilters() {
800-
return documentRootKey != null ||
801-
(sinkContext.getIncludeKeys() != null && !sinkContext.getIncludeKeys().isEmpty()) ||
802-
(sinkContext.getExcludeKeys() != null && !sinkContext.getExcludeKeys().isEmpty()) ||
803-
sinkContext.getTagsTargetKey() != null;
804-
}
805-
806701
private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> flushBatch(
807702
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest,
808703
final BulkOperationWrapper bulkOperationWrapper,

0 commit comments

Comments
 (0)