Skip to content

Commit 78d49a6

Browse files
committed
Add generic script support for update/upsert operations in OpenSearch sink
Adds the ability to configure a script on the OpenSearch sink that gets applied to update and upsert bulk operations. This is a generic mechanism that passes script source and params through to OpenSearch's bulk API. Script language is hardcoded to painless. The event document is automatically passed as params.doc. scripted_upsert is always true so the script runs on every write including the first create. Script param values support ${} expression syntax for dynamic resolution from event fields or metadata. Configuration example: sink: - opensearch: hosts: ["https://localhost:9200"] index: "my-index" action: "upsert" document_id: "${/id}" script: source: "ctx._source.putAll(params.doc); ctx._source.source = params.table" params: table: "${getMetadata(\"table_name\")}" Resolves #3563 Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent 1ac64aa commit 78d49a6

6 files changed

Lines changed: 505 additions & 12 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
2727
import org.opensearch.client.opensearch.core.bulk.UpdateOperation;
2828
import org.opensearch.client.transport.TransportOptions;
29+
import org.opensearch.client.json.JsonData;
30+
import org.opensearch.client.opensearch._types.InlineScript;
31+
import org.opensearch.client.opensearch._types.Script;
2932
import org.opensearch.common.unit.ByteSizeUnit;
3033
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
3134
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
@@ -61,6 +64,7 @@
6164
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingUncompressedBulkRequest;
6265
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;
6366
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ActionConfiguration;
67+
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration;
6468
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.DlqConfiguration;
6569
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
6670
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation;
@@ -90,8 +94,10 @@
9094
import java.util.ArrayList;
9195
import java.util.Collection;
9296
import java.util.Collections;
97+
import java.util.HashMap;
9398
import java.util.HashSet;
9499
import java.util.List;
100+
import java.util.Map;
95101
import java.util.Objects;
96102
import java.util.Optional;
97103
import java.util.Set;
@@ -115,6 +121,8 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
115121
public static final String DYNAMIC_INDEX_DROPPED_EVENTS = "dynamicIndexDroppedEvents";
116122
public static final String INVALID_VERSION_EXPRESSION_DROPPED_EVENTS = "dynamicDocumentVersionDroppedEvents";
117123
private static final String PLUGIN_NAME = "opensearch";
124+
private static final String PAINLESS = "painless";
125+
private static final String DOC_PARAM = "doc";
118126

119127
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
120128
private static final int INITIALIZE_RETRY_WAIT_TIME_MS = 5000;
@@ -139,6 +147,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
139147
private final String pipeline;
140148
private final String action;
141149
private final List<ActionConfiguration> actions;
150+
private final ScriptConfiguration scriptConfiguration;
142151
private final String documentRootKey;
143152
private String configuredIndexAlias;
144153
private final ReentrantLock lock;
@@ -208,6 +217,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
208217
this.routing = openSearchSinkConfig.getIndexConfiguration().getRouting();
209218
this.action = openSearchSinkConfig.getIndexConfiguration().getAction();
210219
this.actions = openSearchSinkConfig.getIndexConfiguration().getActions();
220+
this.scriptConfiguration = openSearchSinkConfig.getIndexConfiguration().getScriptConfiguration();
211221
this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey();
212222
this.versionType = openSearchSinkConfig.getIndexConfiguration().getVersionType();
213223
this.versionExpression = openSearchSinkConfig.getIndexConfiguration().getVersionExpression();
@@ -342,11 +352,12 @@ public boolean isReady() {
342352
return initialized;
343353
}
344354

345-
private BulkOperation getBulkOperationForAction(final String action,
355+
BulkOperation getBulkOperationForAction(final String action,
346356
final SerializedJson document,
347357
final Long version,
348358
final String indexName,
349-
final JsonNode jsonNode) {
359+
final JsonNode jsonNode,
360+
final Map<String, Object> resolvedScriptParams) {
350361
BulkOperation bulkOperation;
351362
final Optional<String> docId = document.getDocumentId();
352363
final Optional<String> routing = document.getRoutingField();
@@ -380,18 +391,32 @@ private BulkOperation getBulkOperationForAction(final String action,
380391
}
381392

382393

383-
final UpdateOperation.Builder<Object> updateOperationBuilder = (StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString())) ?
384-
new UpdateOperation.Builder<>()
394+
final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString());
395+
final UpdateOperation.Builder<Object> updateOperationBuilder = new UpdateOperation.Builder<>()
385396
.index(indexName)
386-
.document(filteredJsonNode)
387-
.upsert(filteredJsonNode)
388-
.versionType(versionType)
389-
.version(version) :
390-
new UpdateOperation.Builder<>()
391-
.index(indexName)
392-
.document(filteredJsonNode)
393397
.versionType(versionType)
394398
.version(version);
399+
400+
if (scriptConfiguration != null) {
401+
final Map<String, JsonData> scriptParams = new HashMap<>();
402+
scriptParams.put(DOC_PARAM, JsonData.of(jsonNode));
403+
if (resolvedScriptParams != null) {
404+
resolvedScriptParams.forEach((k, v) -> scriptParams.put(k, JsonData.of(v)));
405+
}
406+
final InlineScript inlineScript = new InlineScript.Builder()
407+
.source(scriptConfiguration.getSource())
408+
.lang(PAINLESS)
409+
.params(scriptParams)
410+
.build();
411+
updateOperationBuilder.script(Script.of(s -> s.inline(inlineScript)));
412+
updateOperationBuilder.upsert(filteredJsonNode);
413+
updateOperationBuilder.scriptedUpsert(true);
414+
} else if (isUpsert) {
415+
updateOperationBuilder.document(filteredJsonNode).upsert(filteredJsonNode);
416+
} else {
417+
updateOperationBuilder.document(filteredJsonNode);
418+
}
419+
395420
docId.ifPresent(updateOperationBuilder::id);
396421
routing.ifPresent(updateOperationBuilder::routing);
397422
bulkOperation = new BulkOperation.Builder()
@@ -535,7 +560,19 @@ public void doOutput(final Collection<Record<Event>> records) {
535560
BulkOperation bulkOperation;
536561

537562
try {
538-
bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode());
563+
Map<String, Object> resolvedScriptParams = null;
564+
if (scriptConfiguration != null && scriptConfiguration.getParams() != null) {
565+
resolvedScriptParams = new HashMap<>();
566+
for (final Map.Entry<String, Object> entry : scriptConfiguration.getParams().entrySet()) {
567+
final Object value = entry.getValue();
568+
if (value instanceof String && ((String) value).contains("${")) {
569+
resolvedScriptParams.put(entry.getKey(), event.formatString((String) value, expressionEvaluator));
570+
} else {
571+
resolvedScriptParams.put(entry.getKey(), value);
572+
}
573+
}
574+
}
575+
bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode(), resolvedScriptParams);
539576
} catch (final Exception e) {
540577
LOG.error("An exception occurred while constructing the bulk operation for a document: ", e);
541578
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,22 @@ public boolean getEnableRequestCompression() {
164164
@JsonProperty("actions")
165165
private List<ActionConfiguration> actions = null;
166166

167+
@Getter
168+
@Valid
169+
@JsonProperty("script")
170+
private ScriptConfiguration scriptConfiguration = null;
171+
172+
@AssertTrue(message = "script can only be used with update or upsert actions.")
173+
public boolean isScriptActionValid() {
174+
if (scriptConfiguration == null) {
175+
return true;
176+
}
177+
final String effectiveAction = action != null ? action : OpenSearchBulkActions.INDEX.toString();
178+
return effectiveAction.contains("${") ||
179+
effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPDATE.toString()) ||
180+
effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPSERT.toString());
181+
}
182+
167183
@Getter
168184
@JsonProperty("document_root_key")
169185
private String documentRootKey = null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.sink.opensearch.configuration;
12+
13+
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import jakarta.validation.constraints.NotEmpty;
15+
import lombok.Getter;
16+
17+
import java.util.Map;
18+
19+
public class ScriptConfiguration {
20+
21+
@Getter
22+
@NotEmpty
23+
@JsonProperty("source")
24+
private String source;
25+
26+
@Getter
27+
@JsonProperty("params")
28+
private Map<String, Object> params;
29+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
1717
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ActionConfiguration;
1818
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AwsAuthenticationConfiguration;
19+
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration;
1920
import org.opensearch.dataprepper.plugins.sink.opensearch.index.model.QueryForExistingDocumentConfiguration;
2021
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader;
2122
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider;
@@ -103,6 +104,8 @@ public class IndexConfiguration {
103104
private final VersionType versionType;
104105
private final boolean normalizeIndex;
105106

107+
private final ScriptConfiguration scriptConfiguration;
108+
106109
private final String queryWhen;
107110

108111
private final Duration queryDuration;
@@ -174,6 +177,7 @@ private IndexConfiguration(final Builder builder) {
174177
this.ismPolicyFile = builder.ismPolicyFile;
175178
this.action = builder.action;
176179
this.actions = builder.actions;
180+
this.scriptConfiguration = builder.scriptConfiguration;
177181
this.documentRootKey = builder.documentRootKey;
178182
this.queryWhen = builder.queryWhen;
179183
this.queryTerm = builder.queryTerm;
@@ -293,6 +297,11 @@ public static IndexConfiguration readIndexConfig(final OpenSearchSinkConfig open
293297
builder = builder.withAction(openSearchSinkConfig.getAction(), expressionEvaluator);
294298
}
295299

300+
final ScriptConfiguration scriptConfiguration = openSearchSinkConfig.getScriptConfiguration();
301+
if (scriptConfiguration != null) {
302+
builder = builder.withScriptConfiguration(scriptConfiguration);
303+
}
304+
296305
AwsAuthenticationConfiguration awsAuthenticationConfiguration = openSearchSinkConfig.getAwsAuthenticationOptions();
297306
if (awsAuthenticationConfiguration != null) {
298307
builder = builder.withServerless(awsAuthenticationConfiguration.isServerlessCollection());
@@ -372,6 +381,10 @@ public List<ActionConfiguration> getActions() {
372381
return actions;
373382
}
374383

384+
public ScriptConfiguration getScriptConfiguration() {
385+
return scriptConfiguration;
386+
}
387+
375388
public String getS3AwsRegion() {
376389
return s3AwsRegion;
377390
}
@@ -501,6 +514,7 @@ public static class Builder {
501514
private Optional<String> ismPolicyFile;
502515
private String action;
503516
private List<ActionConfiguration> actions;
517+
private ScriptConfiguration scriptConfiguration;
504518
private String s3AwsRegion;
505519
private String s3AwsStsRoleArn;
506520
private String s3AwsStsExternalId;
@@ -629,6 +643,11 @@ public Builder withActions(final List<ActionConfiguration> actions, final Expres
629643
return this;
630644
}
631645

646+
public Builder withScriptConfiguration(final ScriptConfiguration scriptConfiguration) {
647+
this.scriptConfiguration = scriptConfiguration;
648+
return this;
649+
}
650+
632651
public Builder withS3AwsRegion(final String s3AwsRegion) {
633652
checkNotNull(s3AwsRegion, "s3AwsRegion cannot be null");
634653
this.s3AwsRegion = s3AwsRegion;

0 commit comments

Comments
 (0)