Skip to content

Commit 3bd796d

Browse files
authored
Add generic script support for update/upsert operations in OpenSearch sink (#6744)
* Add generic script support for update/upsert operations in OpenSearch sink Adds the ability to configure a script on the OpenSearch sink that gets applied to update and upsert bulk operations. This is a generic mechanism that passes script source and params through to OpenSearch's bulk API. Script language is hardcoded to painless. The event document is automatically passed as params.doc. scripted_upsert is always true so the script runs on every write including the first create. Script param values support ${} expression syntax for dynamic resolution from event fields or metadata. Configuration example: sink: - opensearch: hosts: ["https://localhost:9200"] index: "my-index" action: "upsert" document_id: "${/id}" script: source: "ctx._source.putAll(params.doc); ctx._source.source = params.table" params: table: "${getMetadata(\"table_name\")}" Resolves #3563 Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * Fix checkstyle unused imports and update compressed bulk request size expectations Remove unused imports (ScriptConfiguration, HashMap, Map) from OpenSearchSink. Update expected compressed bulk request sizes in integration tests to account for the additional resolvedScriptParameters field in SerializedJsonImpl. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --------- Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent 681d118 commit 3bd796d

12 files changed

Lines changed: 623 additions & 19 deletions

File tree

data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompression,
529529
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
530530
assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
531531
assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
532-
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 799.0 : 2058.0;
532+
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 830.0 : 2058.0;
533533
assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
534534
assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
535535
}
@@ -615,7 +615,7 @@ void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompression,
615615
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
616616
assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
617617
assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
618-
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1085.0 : 2072.0;
618+
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1114.0 : 2072.0;
619619
assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
620620
assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
621621

@@ -677,7 +677,7 @@ void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompression,
677677
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
678678
assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
679679
assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
680-
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 376.0 : 265.0;
680+
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 410.0 : 265.0;
681681
assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
682682
assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
683683

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
139139
private final String pipeline;
140140
private final String action;
141141
private final List<ActionConfiguration> actions;
142+
private final ScriptManager scriptManager;
142143
private final String documentRootKey;
143144
private String configuredIndexAlias;
144145
private final ReentrantLock lock;
@@ -208,6 +209,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
208209
this.routing = openSearchSinkConfig.getIndexConfiguration().getRouting();
209210
this.action = openSearchSinkConfig.getIndexConfiguration().getAction();
210211
this.actions = openSearchSinkConfig.getIndexConfiguration().getActions();
212+
this.scriptManager = new ScriptManager(openSearchSinkConfig.getIndexConfiguration().getScriptConfiguration(), expressionEvaluator);
211213
this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey();
212214
this.versionType = openSearchSinkConfig.getIndexConfiguration().getVersionType();
213215
this.versionExpression = openSearchSinkConfig.getIndexConfiguration().getVersionExpression();
@@ -342,7 +344,7 @@ public boolean isReady() {
342344
return initialized;
343345
}
344346

345-
private BulkOperation getBulkOperationForAction(final String action,
347+
BulkOperation getBulkOperationForAction(final String action,
346348
final SerializedJson document,
347349
final Long version,
348350
final String indexName,
@@ -380,18 +382,22 @@ private BulkOperation getBulkOperationForAction(final String action,
380382
}
381383

382384

383-
final UpdateOperation.Builder<Object> updateOperationBuilder = (StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString())) ?
384-
new UpdateOperation.Builder<>()
385+
final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString());
386+
final UpdateOperation.Builder<Object> updateOperationBuilder = new UpdateOperation.Builder<>()
385387
.index(indexName)
386-
.document(filteredJsonNode)
387-
.upsert(filteredJsonNode)
388-
.versionType(versionType)
389-
.version(version) :
390-
new UpdateOperation.Builder<>()
391-
.index(indexName)
392-
.document(filteredJsonNode)
393388
.versionType(versionType)
394389
.version(version);
390+
391+
if (scriptManager.isScriptEnabled()) {
392+
updateOperationBuilder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null)));
393+
updateOperationBuilder.upsert(filteredJsonNode);
394+
updateOperationBuilder.scriptedUpsert(true);
395+
} else if (isUpsert) {
396+
updateOperationBuilder.document(filteredJsonNode).upsert(filteredJsonNode);
397+
} else {
398+
updateOperationBuilder.document(filteredJsonNode);
399+
}
400+
395401
docId.ifPresent(updateOperationBuilder::id);
396402
routing.ifPresent(updateOperationBuilder::routing);
397403
bulkOperation = new BulkOperation.Builder()
@@ -596,7 +602,12 @@ SerializedJson getDocument(final Event event) {
596602

597603
final String document = DocumentBuilder.build(event, documentRootKey, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys());
598604

599-
return SerializedJson.fromStringAndOptionals(document, docId, routingValue, null);
605+
return SerializedJson.builder()
606+
.withJsonString(document)
607+
.withDocumentId(docId)
608+
.withRoutingField(routingValue)
609+
.withResolvedScriptParameters(scriptManager.resolveParams(event))
610+
.build();
600611
}
601612

602613
private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.sink.opensearch;
12+
13+
import com.fasterxml.jackson.databind.JsonNode;
14+
import org.opensearch.client.json.JsonData;
15+
import org.opensearch.client.opensearch._types.InlineScript;
16+
import org.opensearch.client.opensearch._types.Script;
17+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
18+
import org.opensearch.dataprepper.model.event.Event;
19+
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
public class ScriptManager {
25+
26+
private static final String PAINLESS = "painless";
27+
private static final String DOC_PARAM = "doc";
28+
29+
private final ScriptConfiguration scriptConfiguration;
30+
private final ExpressionEvaluator expressionEvaluator;
31+
32+
public ScriptManager(final ScriptConfiguration scriptConfiguration, final ExpressionEvaluator expressionEvaluator) {
33+
this.scriptConfiguration = scriptConfiguration;
34+
this.expressionEvaluator = expressionEvaluator;
35+
}
36+
37+
public boolean isScriptEnabled() {
38+
return scriptConfiguration != null;
39+
}
40+
41+
public Map<String, Object> resolveParams(final Event event) {
42+
if (scriptConfiguration == null || scriptConfiguration.getParams() == null) {
43+
return null;
44+
}
45+
final Map<String, Object> resolved = new HashMap<>();
46+
for (final Map.Entry<String, Object> entry : scriptConfiguration.getParams().entrySet()) {
47+
final Object value = entry.getValue();
48+
if (value instanceof String && ((String) value).contains("${")) {
49+
resolved.put(entry.getKey(), event.formatString((String) value, expressionEvaluator));
50+
} else {
51+
resolved.put(entry.getKey(), value);
52+
}
53+
}
54+
return resolved;
55+
}
56+
57+
public Script buildScript(final JsonNode jsonNode, final Map<String, Object> resolvedParams) {
58+
final Map<String, JsonData> scriptParams = new HashMap<>();
59+
scriptParams.put(DOC_PARAM, JsonData.of(jsonNode));
60+
if (resolvedParams != null) {
61+
resolvedParams.forEach((k, v) -> scriptParams.put(k, JsonData.of(v)));
62+
}
63+
final InlineScript inlineScript = new InlineScript.Builder()
64+
.source(scriptConfiguration.getSource())
65+
.lang(PAINLESS)
66+
.params(scriptParams)
67+
.build();
68+
return Script.of(s -> s.inline(inlineScript));
69+
}
70+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.fasterxml.jackson.databind.JsonNode;
99
import java.nio.charset.StandardCharsets;
10+
import java.util.Map;
1011
import java.util.Objects;
1112
import java.util.Optional;
1213

@@ -18,6 +19,7 @@ public interface SerializedJson extends SizedDocument {
1819
Optional<String> getDocumentId();
1920
Optional<String> getRoutingField();
2021
Optional<String> getPipelineField();
22+
Optional<Map<String, Object>> getResolvedScriptParameters();
2123

2224
/**
2325
* Creates a new {@link SerializedJson} from a JSON string and optional documentId and routingField.
@@ -30,11 +32,54 @@ public interface SerializedJson extends SizedDocument {
3032
*/
3133
static SerializedJson fromStringAndOptionals(String jsonString, String docId, String routingField, String pipelineField) {
3234
Objects.requireNonNull(jsonString);
33-
return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField, pipelineField);
35+
return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField, pipelineField, null);
3436
}
3537

3638
static SerializedJson fromJsonNode(final JsonNode jsonNode, SerializedJson document) {
3739
return new SerializedJsonNode(jsonNode, document);
3840
}
39-
}
4041

42+
static Builder builder() {
43+
return new Builder();
44+
}
45+
46+
class Builder {
47+
private String jsonString;
48+
private String documentId;
49+
private String routingField;
50+
private String pipelineField;
51+
private Map<String, Object> resolvedScriptParameters;
52+
53+
public Builder withJsonString(final String jsonString) {
54+
this.jsonString = jsonString;
55+
return this;
56+
}
57+
58+
public Builder withDocumentId(final String documentId) {
59+
this.documentId = documentId;
60+
return this;
61+
}
62+
63+
public Builder withRoutingField(final String routingField) {
64+
this.routingField = routingField;
65+
return this;
66+
}
67+
68+
public Builder withPipelineField(final String pipelineField) {
69+
this.pipelineField = pipelineField;
70+
return this;
71+
}
72+
73+
public Builder withResolvedScriptParameters(final Map<String, Object> resolvedScriptParameters) {
74+
this.resolvedScriptParameters = resolvedScriptParameters;
75+
return this;
76+
}
77+
78+
public SerializedJson build() {
79+
Objects.requireNonNull(jsonString);
80+
return new SerializedJsonImpl(
81+
jsonString.getBytes(StandardCharsets.UTF_8),
82+
documentId, routingField, pipelineField, resolvedScriptParameters);
83+
}
84+
}
85+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,22 @@
66
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;
77

88
import java.io.Serializable;
9+
import java.util.Map;
910
import java.util.Optional;
1011

1112
class SerializedJsonImpl implements SerializedJson, Serializable {
1213
private byte[] document;
1314
private String documentId = null;
1415
private String routingField = null;
1516
private String pipelineField = null;
17+
private Map<String, Object> resolvedScriptParameters = null;
1618

17-
public SerializedJsonImpl(final byte[] document, String docId, String routingField, String pipelineField) {
19+
public SerializedJsonImpl(final byte[] document, String docId, String routingField, String pipelineField, Map<String, Object> resolvedScriptParameters) {
1820
this.document = document;
1921
this.documentId = docId;
2022
this.routingField = routingField;
2123
this.pipelineField = pipelineField;
24+
this.resolvedScriptParameters = resolvedScriptParameters;
2225
}
2326

2427
public SerializedJsonImpl(final byte[] document) {
@@ -47,4 +50,9 @@ public Optional<String> getRoutingField() {
4750

4851
@Override
4952
public Optional<String> getPipelineField() { return Optional.ofNullable(pipelineField); }
53+
54+
@Override
55+
public Optional<Map<String, Object>> getResolvedScriptParameters() {
56+
return Optional.ofNullable(resolvedScriptParameters);
57+
}
5058
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNode.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.fasterxml.jackson.databind.JsonNode;
99
import java.io.Serializable;
10+
import java.util.Map;
1011
import java.util.Optional;
1112

1213
class SerializedJsonNode implements SerializedJson, Serializable {
@@ -15,13 +16,15 @@ class SerializedJsonNode implements SerializedJson, Serializable {
1516
private String documentId = null;
1617
private String routingField = null;
1718
private String pipelineField = null;
19+
private Map<String, Object> resolvedScriptParameters = null;
1820

1921
public SerializedJsonNode(final JsonNode jsonNode, SerializedJson doc) {
2022
this.jsonNode = jsonNode;
2123
this.documentId = doc.getDocumentId().orElse(null);
2224
this.routingField = doc.getRoutingField().orElse(null);
2325
this.document = jsonNode.toString().getBytes();
24-
this.pipelineField = doc.getPipelineField().orElse(null);;
26+
this.pipelineField = doc.getPipelineField().orElse(null);
27+
this.resolvedScriptParameters = doc.getResolvedScriptParameters().orElse(null);
2528
}
2629

2730
public SerializedJsonNode(final JsonNode jsonNode) {
@@ -51,4 +54,9 @@ public Optional<String> getRoutingField() {
5154

5255
@Override
5356
public Optional<String> getPipelineField() { return Optional.ofNullable(pipelineField); }
57+
58+
@Override
59+
public Optional<Map<String, Object>> getResolvedScriptParameters() {
60+
return Optional.ofNullable(resolvedScriptParameters);
61+
}
5462
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,22 @@ public boolean getEnableRequestCompression() {
164164
@JsonProperty("actions")
165165
private List<ActionConfiguration> actions = null;
166166

167+
@Getter
168+
@Valid
169+
@JsonProperty("script")
170+
private ScriptConfiguration scriptConfiguration = null;
171+
172+
@AssertTrue(message = "script can only be used with update or upsert actions.")
173+
public boolean isScriptActionValid() {
174+
if (scriptConfiguration == null) {
175+
return true;
176+
}
177+
final String effectiveAction = action != null ? action : OpenSearchBulkActions.INDEX.toString();
178+
return effectiveAction.contains("${") ||
179+
effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPDATE.toString()) ||
180+
effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPSERT.toString());
181+
}
182+
167183
@Getter
168184
@JsonProperty("document_root_key")
169185
private String documentRootKey = null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.sink.opensearch.configuration;
12+
13+
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import jakarta.validation.constraints.NotEmpty;
15+
import lombok.Getter;
16+
17+
import java.util.Map;
18+
19+
public class ScriptConfiguration {
20+
21+
@Getter
22+
@NotEmpty
23+
@JsonProperty("source")
24+
private String source;
25+
26+
@Getter
27+
@JsonProperty("params")
28+
private Map<String, Object> params;
29+
}

0 commit comments

Comments
 (0)