Skip to content

Commit 9eef437

Browse files
committed
Add generic script support for update/upsert operations in OpenSearch sink
Adds the ability to configure a script on the OpenSearch sink that gets applied to update and upsert bulk operations. This is a generic mechanism that passes script source and params through to OpenSearch's bulk API. Script language is hardcoded to painless. The event document is automatically passed as params.doc. scripted_upsert is always true so the script runs on every write including the first create. Script param values support ${} expression syntax for dynamic resolution from event fields or metadata. Configuration example: sink: - opensearch: hosts: ["https://localhost:9200"] index: "my-index" action: "upsert" document_id: "${/id}" script: source: "ctx._source.putAll(params.doc); ctx._source.source = params.table" params: table: "${getMetadata(\"table_name\")}" Resolves #3563 Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent 1ac64aa commit 9eef437

11 files changed

Lines changed: 623 additions & 16 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingUncompressedBulkRequest;
6262
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;
6363
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ActionConfiguration;
64+
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration;
6465
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.DlqConfiguration;
6566
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
6667
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation;
@@ -90,8 +91,10 @@
9091
import java.util.ArrayList;
9192
import java.util.Collection;
9293
import java.util.Collections;
94+
import java.util.HashMap;
9395
import java.util.HashSet;
9496
import java.util.List;
97+
import java.util.Map;
9598
import java.util.Objects;
9699
import java.util.Optional;
97100
import java.util.Set;
@@ -139,6 +142,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
139142
private final String pipeline;
140143
private final String action;
141144
private final List<ActionConfiguration> actions;
145+
private final ScriptManager scriptManager;
142146
private final String documentRootKey;
143147
private String configuredIndexAlias;
144148
private final ReentrantLock lock;
@@ -208,6 +212,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
208212
this.routing = openSearchSinkConfig.getIndexConfiguration().getRouting();
209213
this.action = openSearchSinkConfig.getIndexConfiguration().getAction();
210214
this.actions = openSearchSinkConfig.getIndexConfiguration().getActions();
215+
this.scriptManager = new ScriptManager(openSearchSinkConfig.getIndexConfiguration().getScriptConfiguration(), expressionEvaluator);
211216
this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey();
212217
this.versionType = openSearchSinkConfig.getIndexConfiguration().getVersionType();
213218
this.versionExpression = openSearchSinkConfig.getIndexConfiguration().getVersionExpression();
@@ -342,7 +347,7 @@ public boolean isReady() {
342347
return initialized;
343348
}
344349

345-
private BulkOperation getBulkOperationForAction(final String action,
350+
BulkOperation getBulkOperationForAction(final String action,
346351
final SerializedJson document,
347352
final Long version,
348353
final String indexName,
@@ -380,18 +385,22 @@ private BulkOperation getBulkOperationForAction(final String action,
380385
}
381386

382387

383-
final UpdateOperation.Builder<Object> updateOperationBuilder = (StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString())) ?
384-
new UpdateOperation.Builder<>()
388+
final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString());
389+
final UpdateOperation.Builder<Object> updateOperationBuilder = new UpdateOperation.Builder<>()
385390
.index(indexName)
386-
.document(filteredJsonNode)
387-
.upsert(filteredJsonNode)
388-
.versionType(versionType)
389-
.version(version) :
390-
new UpdateOperation.Builder<>()
391-
.index(indexName)
392-
.document(filteredJsonNode)
393391
.versionType(versionType)
394392
.version(version);
393+
394+
if (scriptManager.isScriptEnabled()) {
395+
updateOperationBuilder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null)));
396+
updateOperationBuilder.upsert(filteredJsonNode);
397+
updateOperationBuilder.scriptedUpsert(true);
398+
} else if (isUpsert) {
399+
updateOperationBuilder.document(filteredJsonNode).upsert(filteredJsonNode);
400+
} else {
401+
updateOperationBuilder.document(filteredJsonNode);
402+
}
403+
395404
docId.ifPresent(updateOperationBuilder::id);
396405
routing.ifPresent(updateOperationBuilder::routing);
397406
bulkOperation = new BulkOperation.Builder()
@@ -596,7 +605,12 @@ SerializedJson getDocument(final Event event) {
596605

597606
final String document = DocumentBuilder.build(event, documentRootKey, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys());
598607

599-
return SerializedJson.fromStringAndOptionals(document, docId, routingValue, null);
608+
return SerializedJson.builder()
609+
.withJsonString(document)
610+
.withDocumentId(docId)
611+
.withRoutingField(routingValue)
612+
.withResolvedScriptParameters(scriptManager.resolveParams(event))
613+
.build();
600614
}
601615

602616
private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.sink.opensearch;
12+
13+
import com.fasterxml.jackson.databind.JsonNode;
14+
import org.opensearch.client.json.JsonData;
15+
import org.opensearch.client.opensearch._types.InlineScript;
16+
import org.opensearch.client.opensearch._types.Script;
17+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
18+
import org.opensearch.dataprepper.model.event.Event;
19+
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
public class ScriptManager {
25+
26+
private static final String PAINLESS = "painless";
27+
private static final String DOC_PARAM = "doc";
28+
29+
private final ScriptConfiguration scriptConfiguration;
30+
private final ExpressionEvaluator expressionEvaluator;
31+
32+
public ScriptManager(final ScriptConfiguration scriptConfiguration, final ExpressionEvaluator expressionEvaluator) {
33+
this.scriptConfiguration = scriptConfiguration;
34+
this.expressionEvaluator = expressionEvaluator;
35+
}
36+
37+
public boolean isScriptEnabled() {
38+
return scriptConfiguration != null;
39+
}
40+
41+
public Map<String, Object> resolveParams(final Event event) {
42+
if (scriptConfiguration == null || scriptConfiguration.getParams() == null) {
43+
return null;
44+
}
45+
final Map<String, Object> resolved = new HashMap<>();
46+
for (final Map.Entry<String, Object> entry : scriptConfiguration.getParams().entrySet()) {
47+
final Object value = entry.getValue();
48+
if (value instanceof String && ((String) value).contains("${")) {
49+
resolved.put(entry.getKey(), event.formatString((String) value, expressionEvaluator));
50+
} else {
51+
resolved.put(entry.getKey(), value);
52+
}
53+
}
54+
return resolved;
55+
}
56+
57+
public Script buildScript(final JsonNode jsonNode, final Map<String, Object> resolvedParams) {
58+
final Map<String, JsonData> scriptParams = new HashMap<>();
59+
scriptParams.put(DOC_PARAM, JsonData.of(jsonNode));
60+
if (resolvedParams != null) {
61+
resolvedParams.forEach((k, v) -> scriptParams.put(k, JsonData.of(v)));
62+
}
63+
final InlineScript inlineScript = new InlineScript.Builder()
64+
.source(scriptConfiguration.getSource())
65+
.lang(PAINLESS)
66+
.params(scriptParams)
67+
.build();
68+
return Script.of(s -> s.inline(inlineScript));
69+
}
70+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.fasterxml.jackson.databind.JsonNode;
99
import java.nio.charset.StandardCharsets;
10+
import java.util.Map;
1011
import java.util.Objects;
1112
import java.util.Optional;
1213

@@ -18,6 +19,7 @@ public interface SerializedJson extends SizedDocument {
1819
Optional<String> getDocumentId();
1920
Optional<String> getRoutingField();
2021
Optional<String> getPipelineField();
22+
Optional<Map<String, Object>> getResolvedScriptParameters();
2123

2224
/**
2325
* Creates a new {@link SerializedJson} from a JSON string and optional documentId and routingField.
@@ -30,11 +32,54 @@ public interface SerializedJson extends SizedDocument {
3032
*/
3133
static SerializedJson fromStringAndOptionals(String jsonString, String docId, String routingField, String pipelineField) {
3234
Objects.requireNonNull(jsonString);
33-
return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField, pipelineField);
35+
return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField, pipelineField, null);
3436
}
3537

3638
static SerializedJson fromJsonNode(final JsonNode jsonNode, SerializedJson document) {
3739
return new SerializedJsonNode(jsonNode, document);
3840
}
39-
}
4041

42+
static Builder builder() {
43+
return new Builder();
44+
}
45+
46+
class Builder {
47+
private String jsonString;
48+
private String documentId;
49+
private String routingField;
50+
private String pipelineField;
51+
private Map<String, Object> resolvedScriptParameters;
52+
53+
public Builder withJsonString(final String jsonString) {
54+
this.jsonString = jsonString;
55+
return this;
56+
}
57+
58+
public Builder withDocumentId(final String documentId) {
59+
this.documentId = documentId;
60+
return this;
61+
}
62+
63+
public Builder withRoutingField(final String routingField) {
64+
this.routingField = routingField;
65+
return this;
66+
}
67+
68+
public Builder withPipelineField(final String pipelineField) {
69+
this.pipelineField = pipelineField;
70+
return this;
71+
}
72+
73+
public Builder withResolvedScriptParameters(final Map<String, Object> resolvedScriptParameters) {
74+
this.resolvedScriptParameters = resolvedScriptParameters;
75+
return this;
76+
}
77+
78+
public SerializedJson build() {
79+
Objects.requireNonNull(jsonString);
80+
return new SerializedJsonImpl(
81+
jsonString.getBytes(StandardCharsets.UTF_8),
82+
documentId, routingField, pipelineField, resolvedScriptParameters);
83+
}
84+
}
85+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,22 @@
66
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;
77

88
import java.io.Serializable;
9+
import java.util.Map;
910
import java.util.Optional;
1011

1112
class SerializedJsonImpl implements SerializedJson, Serializable {
1213
private byte[] document;
1314
private String documentId = null;
1415
private String routingField = null;
1516
private String pipelineField = null;
17+
private Map<String, Object> resolvedScriptParameters = null;
1618

17-
public SerializedJsonImpl(final byte[] document, String docId, String routingField, String pipelineField) {
19+
public SerializedJsonImpl(final byte[] document, String docId, String routingField, String pipelineField, Map<String, Object> resolvedScriptParameters) {
1820
this.document = document;
1921
this.documentId = docId;
2022
this.routingField = routingField;
2123
this.pipelineField = pipelineField;
24+
this.resolvedScriptParameters = resolvedScriptParameters;
2225
}
2326

2427
public SerializedJsonImpl(final byte[] document) {
@@ -47,4 +50,9 @@ public Optional<String> getRoutingField() {
4750

4851
@Override
4952
public Optional<String> getPipelineField() { return Optional.ofNullable(pipelineField); }
53+
54+
@Override
55+
public Optional<Map<String, Object>> getResolvedScriptParameters() {
56+
return Optional.ofNullable(resolvedScriptParameters);
57+
}
5058
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNode.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.fasterxml.jackson.databind.JsonNode;
99
import java.io.Serializable;
10+
import java.util.Map;
1011
import java.util.Optional;
1112

1213
class SerializedJsonNode implements SerializedJson, Serializable {
@@ -15,13 +16,15 @@ class SerializedJsonNode implements SerializedJson, Serializable {
1516
private String documentId = null;
1617
private String routingField = null;
1718
private String pipelineField = null;
19+
private Map<String, Object> resolvedScriptParameters = null;
1820

1921
public SerializedJsonNode(final JsonNode jsonNode, SerializedJson doc) {
2022
this.jsonNode = jsonNode;
2123
this.documentId = doc.getDocumentId().orElse(null);
2224
this.routingField = doc.getRoutingField().orElse(null);
2325
this.document = jsonNode.toString().getBytes();
24-
this.pipelineField = doc.getPipelineField().orElse(null);;
26+
this.pipelineField = doc.getPipelineField().orElse(null);
27+
this.resolvedScriptParameters = doc.getResolvedScriptParameters().orElse(null);
2528
}
2629

2730
public SerializedJsonNode(final JsonNode jsonNode) {
@@ -51,4 +54,9 @@ public Optional<String> getRoutingField() {
5154

5255
@Override
5356
public Optional<String> getPipelineField() { return Optional.ofNullable(pipelineField); }
57+
58+
@Override
59+
public Optional<Map<String, Object>> getResolvedScriptParameters() {
60+
return Optional.ofNullable(resolvedScriptParameters);
61+
}
5462
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,22 @@ public boolean getEnableRequestCompression() {
164164
@JsonProperty("actions")
165165
private List<ActionConfiguration> actions = null;
166166

167+
@Getter
168+
@Valid
169+
@JsonProperty("script")
170+
private ScriptConfiguration scriptConfiguration = null;
171+
172+
@AssertTrue(message = "script can only be used with update or upsert actions.")
173+
public boolean isScriptActionValid() {
174+
if (scriptConfiguration == null) {
175+
return true;
176+
}
177+
final String effectiveAction = action != null ? action : OpenSearchBulkActions.INDEX.toString();
178+
return effectiveAction.contains("${") ||
179+
effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPDATE.toString()) ||
180+
effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPSERT.toString());
181+
}
182+
167183
@Getter
168184
@JsonProperty("document_root_key")
169185
private String documentRootKey = null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.sink.opensearch.configuration;
12+
13+
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import jakarta.validation.constraints.NotEmpty;
15+
import lombok.Getter;
16+
17+
import java.util.Map;
18+
19+
public class ScriptConfiguration {
20+
21+
@Getter
22+
@NotEmpty
23+
@JsonProperty("source")
24+
private String source;
25+
26+
@Getter
27+
@JsonProperty("params")
28+
private Map<String, Object> params;
29+
}

0 commit comments

Comments
 (0)