Skip to content

Commit 8a14fd7

Browse files
authored
Refactor getBulkOperationForAction into BulkOperationFactory (#6748)
* Refactor getBulkOperationForAction into BulkOperationFactory Extract bulk operation creation logic from OpenSearchSink into a dedicated BulkOperationFactory class for improved testability and separation of concerns. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * Add BulkOperationFactory unit tests and update script tests Add BulkOperationFactoryTest covering create, index, update, upsert, delete, default action, optional fields, document filters, and pipeline setting. Update OpenSearchSinkScriptTest to test BulkOperationFactory directly instead of going through OpenSearchSink. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * Fix script to use filtered document when document filters are active Pass filteredJsonNode instead of jsonNode to scriptManager.buildScript so that params.doc in the script reflects the post-filter document, consistent with how builder.document() and builder.upsert() use the filtered version. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --------- Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent f110cfe commit 8a14fd7

4 files changed

Lines changed: 325 additions & 268 deletions

File tree

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.sink.opensearch;
12+
13+
import com.fasterxml.jackson.databind.JsonNode;
14+
import com.fasterxml.jackson.databind.ObjectMapper;
15+
import org.apache.commons.lang3.StringUtils;
16+
import org.opensearch.client.opensearch._types.VersionType;
17+
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
18+
import org.opensearch.client.opensearch.core.bulk.CreateOperation;
19+
import org.opensearch.client.opensearch.core.bulk.DeleteOperation;
20+
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
21+
import org.opensearch.client.opensearch.core.bulk.UpdateOperation;
22+
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
23+
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;
24+
25+
import java.io.IOException;
26+
import java.util.Optional;
27+
28+
public class BulkOperationFactory {
29+
30+
private final VersionType versionType;
31+
private final ScriptManager scriptManager;
32+
private final ObjectMapper objectMapper;
33+
private final boolean usingDocumentFilters;
34+
35+
public BulkOperationFactory(final VersionType versionType,
36+
final ScriptManager scriptManager,
37+
final ObjectMapper objectMapper,
38+
final boolean usingDocumentFilters) {
39+
this.versionType = versionType;
40+
this.scriptManager = scriptManager;
41+
this.objectMapper = objectMapper;
42+
this.usingDocumentFilters = usingDocumentFilters;
43+
}
44+
45+
public BulkOperation create(final String action,
46+
final SerializedJson document,
47+
final Long version,
48+
final String indexName,
49+
final JsonNode jsonNode) {
50+
final Optional<String> docId = document.getDocumentId();
51+
final Optional<String> routing = document.getRoutingField();
52+
final Optional<String> pipeline = document.getPipelineField();
53+
54+
if (StringUtils.equals(action, OpenSearchBulkActions.CREATE.toString())) {
55+
final CreateOperation.Builder<Object> builder =
56+
new CreateOperation.Builder<>()
57+
.index(indexName)
58+
.document(document);
59+
docId.ifPresent(builder::id);
60+
routing.ifPresent(builder::routing);
61+
pipeline.ifPresent(builder::pipeline);
62+
return new BulkOperation.Builder().create(builder.build()).build();
63+
}
64+
65+
if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) ||
66+
StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) {
67+
return createUpdateOperation(action, document, version, indexName, jsonNode, docId, routing);
68+
}
69+
70+
if (StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) {
71+
final DeleteOperation.Builder builder = new DeleteOperation.Builder()
72+
.index(indexName)
73+
.versionType(versionType)
74+
.version(version);
75+
docId.ifPresent(builder::id);
76+
routing.ifPresent(builder::routing);
77+
return new BulkOperation.Builder().delete(builder.build()).build();
78+
}
79+
80+
// Default to "index"
81+
final IndexOperation.Builder<Object> builder = new IndexOperation.Builder<>()
82+
.index(indexName)
83+
.document(document)
84+
.version(version)
85+
.versionType(versionType);
86+
docId.ifPresent(builder::id);
87+
routing.ifPresent(builder::routing);
88+
pipeline.ifPresent(builder::pipeline);
89+
return new BulkOperation.Builder().index(builder.build()).build();
90+
}
91+
92+
private BulkOperation createUpdateOperation(final String action,
93+
final SerializedJson document,
94+
final Long version,
95+
final String indexName,
96+
final JsonNode jsonNode,
97+
final Optional<String> docId,
98+
final Optional<String> routing) {
99+
JsonNode filteredJsonNode = jsonNode;
100+
try {
101+
if (usingDocumentFilters) {
102+
filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson());
103+
}
104+
} catch (final IOException e) {
105+
throw new RuntimeException(
106+
String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage()));
107+
}
108+
109+
final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString());
110+
final UpdateOperation.Builder<Object> builder = new UpdateOperation.Builder<>()
111+
.index(indexName)
112+
.versionType(versionType)
113+
.version(version);
114+
115+
if (scriptManager.isScriptEnabled()) {
116+
builder.script(scriptManager.buildScript(filteredJsonNode, document.getResolvedScriptParameters().orElse(null)));
117+
if (isUpsert) {
118+
builder.upsert(filteredJsonNode);
119+
builder.scriptedUpsert(true);
120+
}
121+
} else if (isUpsert) {
122+
builder.document(filteredJsonNode).upsert(filteredJsonNode);
123+
} else {
124+
builder.document(filteredJsonNode);
125+
}
126+
127+
docId.ifPresent(builder::id);
128+
routing.ifPresent(builder::routing);
129+
return new BulkOperation.Builder().update(builder.build()).build();
130+
}
131+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 3 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.opensearch.dataprepper.plugins.sink.opensearch;
1111

1212
import com.google.common.annotations.VisibleForTesting;
13-
import com.fasterxml.jackson.databind.JsonNode;
1413
import com.fasterxml.jackson.databind.ObjectMapper;
1514
import io.micrometer.core.instrument.Counter;
1615
import io.micrometer.core.instrument.DistributionSummary;
@@ -21,10 +20,6 @@
2120
import org.opensearch.client.opensearch._types.VersionType;
2221
import org.opensearch.client.opensearch.core.BulkRequest;
2322
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
24-
import org.opensearch.client.opensearch.core.bulk.CreateOperation;
25-
import org.opensearch.client.opensearch.core.bulk.DeleteOperation;
26-
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
27-
import org.opensearch.client.opensearch.core.bulk.UpdateOperation;
2823
import org.opensearch.client.transport.TransportOptions;
2924
import org.opensearch.common.unit.ByteSizeUnit;
3025
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
@@ -140,6 +135,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
140135
private final String action;
141136
private final List<ActionConfiguration> actions;
142137
private final ScriptManager scriptManager;
138+
private final BulkOperationFactory bulkOperationFactory;
143139
private final String documentRootKey;
144140
private String configuredIndexAlias;
145141
private final ReentrantLock lock;
@@ -221,6 +217,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
221217
this.lastFlushTimeMap = new ConcurrentHashMap<>();
222218
this.pluginConfigObservable = pluginConfigObservable;
223219
this.objectMapper = new ObjectMapper();
220+
this.bulkOperationFactory = new BulkOperationFactory(versionType, scriptManager, objectMapper, isUsingDocumentFilters());
224221
this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ?
225222
Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null;
226223

@@ -344,96 +341,6 @@ public boolean isReady() {
344341
return initialized;
345342
}
346343

347-
BulkOperation getBulkOperationForAction(final String action,
348-
final SerializedJson document,
349-
final Long version,
350-
final String indexName,
351-
final JsonNode jsonNode) {
352-
BulkOperation bulkOperation;
353-
final Optional<String> docId = document.getDocumentId();
354-
final Optional<String> routing = document.getRoutingField();
355-
final Optional<String> pipeline = document.getPipelineField();
356-
357-
if (StringUtils.equals(action, OpenSearchBulkActions.CREATE.toString())) {
358-
final CreateOperation.Builder<Object> createOperationBuilder =
359-
new CreateOperation.Builder<>()
360-
.index(indexName)
361-
.document(document);
362-
docId.ifPresent(createOperationBuilder::id);
363-
routing.ifPresent(createOperationBuilder::routing);
364-
pipeline.ifPresent(createOperationBuilder::pipeline);
365-
366-
bulkOperation = new BulkOperation.Builder()
367-
.create(createOperationBuilder.build())
368-
.build();
369-
return bulkOperation;
370-
}
371-
if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) ||
372-
StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) {
373-
374-
JsonNode filteredJsonNode = jsonNode;
375-
try {
376-
if (isUsingDocumentFilters()) {
377-
filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson());
378-
}
379-
} catch (final IOException e) {
380-
throw new RuntimeException(
381-
String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage()));
382-
}
383-
384-
385-
final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString());
386-
final UpdateOperation.Builder<Object> updateOperationBuilder = new UpdateOperation.Builder<>()
387-
.index(indexName)
388-
.versionType(versionType)
389-
.version(version);
390-
391-
if (scriptManager.isScriptEnabled()) {
392-
updateOperationBuilder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null)));
393-
updateOperationBuilder.upsert(filteredJsonNode);
394-
updateOperationBuilder.scriptedUpsert(true);
395-
} else if (isUpsert) {
396-
updateOperationBuilder.document(filteredJsonNode).upsert(filteredJsonNode);
397-
} else {
398-
updateOperationBuilder.document(filteredJsonNode);
399-
}
400-
401-
docId.ifPresent(updateOperationBuilder::id);
402-
routing.ifPresent(updateOperationBuilder::routing);
403-
bulkOperation = new BulkOperation.Builder()
404-
.update(updateOperationBuilder.build())
405-
.build();
406-
return bulkOperation;
407-
}
408-
if (StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) {
409-
final DeleteOperation.Builder deleteOperationBuilder =
410-
new DeleteOperation.Builder().index(indexName);
411-
docId.ifPresent(deleteOperationBuilder::id);
412-
routing.ifPresent(deleteOperationBuilder::routing);
413-
bulkOperation = new BulkOperation.Builder()
414-
.delete(deleteOperationBuilder
415-
.versionType(versionType)
416-
.version(version)
417-
.build())
418-
.build();
419-
return bulkOperation;
420-
}
421-
// Default to "index"
422-
final IndexOperation.Builder<Object> indexOperationBuilder =
423-
new IndexOperation.Builder<>()
424-
.index(indexName)
425-
.document(document)
426-
.version(version)
427-
.versionType(versionType);
428-
docId.ifPresent(indexOperationBuilder::id);
429-
routing.ifPresent(indexOperationBuilder::routing);
430-
pipeline.ifPresent(indexOperationBuilder::pipeline);
431-
bulkOperation = new BulkOperation.Builder()
432-
.index(indexOperationBuilder.build())
433-
.build();
434-
return bulkOperation;
435-
}
436-
437344
@Override
438345
public void doOutput(final Collection<Record<Event>> records) {
439346
final long threadId = Thread.currentThread().getId();
@@ -541,7 +448,7 @@ public void doOutput(final Collection<Record<Event>> records) {
541448
BulkOperation bulkOperation;
542449

543450
try {
544-
bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode());
451+
bulkOperation = bulkOperationFactory.create(eventAction, document, version, indexName, event.getJsonNode());
545452
} catch (final Exception e) {
546453
LOG.error("An exception occurred while constructing the bulk operation for a document: ", e);
547454
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);

0 commit comments

Comments
 (0)