Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompression,
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 799.0 : 2058.0;
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 830.0 : 2058.0;
assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
}
Expand Down Expand Up @@ -615,7 +615,7 @@ void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompression,
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1085.0 : 2072.0;
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1114.0 : 2072.0;
assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));

Expand Down Expand Up @@ -677,7 +677,7 @@ void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompression,
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 376.0 : 265.0;
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 410.0 : 265.0;
assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private final String pipeline;
private final String action;
private final List<ActionConfiguration> actions;
private final ScriptManager scriptManager;
private final String documentRootKey;
private String configuredIndexAlias;
private final ReentrantLock lock;
Expand Down Expand Up @@ -208,6 +209,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.routing = openSearchSinkConfig.getIndexConfiguration().getRouting();
this.action = openSearchSinkConfig.getIndexConfiguration().getAction();
this.actions = openSearchSinkConfig.getIndexConfiguration().getActions();
this.scriptManager = new ScriptManager(openSearchSinkConfig.getIndexConfiguration().getScriptConfiguration(), expressionEvaluator);
this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey();
this.versionType = openSearchSinkConfig.getIndexConfiguration().getVersionType();
this.versionExpression = openSearchSinkConfig.getIndexConfiguration().getVersionExpression();
Expand Down Expand Up @@ -342,7 +344,7 @@ public boolean isReady() {
return initialized;
}

private BulkOperation getBulkOperationForAction(final String action,
BulkOperation getBulkOperationForAction(final String action,
final SerializedJson document,
final Long version,
final String indexName,
Expand Down Expand Up @@ -380,18 +382,22 @@ private BulkOperation getBulkOperationForAction(final String action,
}


final UpdateOperation.Builder<Object> updateOperationBuilder = (StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString())) ?
new UpdateOperation.Builder<>()
final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString());
final UpdateOperation.Builder<Object> updateOperationBuilder = new UpdateOperation.Builder<>()
.index(indexName)
.document(filteredJsonNode)
.upsert(filteredJsonNode)
.versionType(versionType)
.version(version) :
new UpdateOperation.Builder<>()
.index(indexName)
.document(filteredJsonNode)
.versionType(versionType)
.version(version);

if (scriptManager.isScriptEnabled()) {
updateOperationBuilder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null)));
updateOperationBuilder.upsert(filteredJsonNode);
updateOperationBuilder.scriptedUpsert(true);
} else if (isUpsert) {
updateOperationBuilder.document(filteredJsonNode).upsert(filteredJsonNode);
} else {
updateOperationBuilder.document(filteredJsonNode);
}

docId.ifPresent(updateOperationBuilder::id);
routing.ifPresent(updateOperationBuilder::routing);
bulkOperation = new BulkOperation.Builder()
Expand Down Expand Up @@ -596,7 +602,12 @@ SerializedJson getDocument(final Event event) {

final String document = DocumentBuilder.build(event, documentRootKey, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys());

return SerializedJson.fromStringAndOptionals(document, docId, routingValue, null);
return SerializedJson.builder()
.withJsonString(document)
.withDocumentId(docId)
.withRoutingField(routingValue)
.withResolvedScriptParameters(scriptManager.resolveParams(event))
.build();
}

private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.sink.opensearch;

import com.fasterxml.jackson.databind.JsonNode;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch._types.InlineScript;
import org.opensearch.client.opensearch._types.Script;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration;

import java.util.HashMap;
import java.util.Map;

public class ScriptManager {

private static final String PAINLESS = "painless";
private static final String DOC_PARAM = "doc";

private final ScriptConfiguration scriptConfiguration;
private final ExpressionEvaluator expressionEvaluator;

public ScriptManager(final ScriptConfiguration scriptConfiguration, final ExpressionEvaluator expressionEvaluator) {
this.scriptConfiguration = scriptConfiguration;
this.expressionEvaluator = expressionEvaluator;
}

public boolean isScriptEnabled() {
return scriptConfiguration != null;
}

public Map<String, Object> resolveParams(final Event event) {
if (scriptConfiguration == null || scriptConfiguration.getParams() == null) {
return null;
}
final Map<String, Object> resolved = new HashMap<>();
for (final Map.Entry<String, Object> entry : scriptConfiguration.getParams().entrySet()) {
final Object value = entry.getValue();
if (value instanceof String && ((String) value).contains("${")) {
resolved.put(entry.getKey(), event.formatString((String) value, expressionEvaluator));
} else {
resolved.put(entry.getKey(), value);
}
}
return resolved;
}

public Script buildScript(final JsonNode jsonNode, final Map<String, Object> resolvedParams) {
final Map<String, JsonData> scriptParams = new HashMap<>();
scriptParams.put(DOC_PARAM, JsonData.of(jsonNode));
if (resolvedParams != null) {
resolvedParams.forEach((k, v) -> scriptParams.put(k, JsonData.of(v)));
}
final InlineScript inlineScript = new InlineScript.Builder()
.source(scriptConfiguration.getSource())
.lang(PAINLESS)
.params(scriptParams)
.build();
return Script.of(s -> s.inline(inlineScript));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

Expand All @@ -18,6 +19,7 @@ public interface SerializedJson extends SizedDocument {
Optional<String> getDocumentId();
Optional<String> getRoutingField();
Optional<String> getPipelineField();
Optional<Map<String, Object>> getResolvedScriptParameters();

/**
* Creates a new {@link SerializedJson} from a JSON string and optional documentId and routingField.
Expand All @@ -30,11 +32,54 @@ public interface SerializedJson extends SizedDocument {
*/
static SerializedJson fromStringAndOptionals(String jsonString, String docId, String routingField, String pipelineField) {
Objects.requireNonNull(jsonString);
return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField, pipelineField);
return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField, pipelineField, null);
}

static SerializedJson fromJsonNode(final JsonNode jsonNode, SerializedJson document) {
return new SerializedJsonNode(jsonNode, document);
}
}

static Builder builder() {
return new Builder();
}

class Builder {
private String jsonString;
private String documentId;
private String routingField;
private String pipelineField;
private Map<String, Object> resolvedScriptParameters;

public Builder withJsonString(final String jsonString) {
this.jsonString = jsonString;
return this;
}

public Builder withDocumentId(final String documentId) {
this.documentId = documentId;
return this;
}

public Builder withRoutingField(final String routingField) {
this.routingField = routingField;
return this;
}

public Builder withPipelineField(final String pipelineField) {
this.pipelineField = pipelineField;
return this;
}

public Builder withResolvedScriptParameters(final Map<String, Object> resolvedScriptParameters) {
this.resolvedScriptParameters = resolvedScriptParameters;
return this;
}

public SerializedJson build() {
Objects.requireNonNull(jsonString);
return new SerializedJsonImpl(
jsonString.getBytes(StandardCharsets.UTF_8),
documentId, routingField, pipelineField, resolvedScriptParameters);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import java.io.Serializable;
import java.util.Map;
import java.util.Optional;

class SerializedJsonImpl implements SerializedJson, Serializable {
private byte[] document;
private String documentId = null;
private String routingField = null;
private String pipelineField = null;
private Map<String, Object> resolvedScriptParameters = null;

public SerializedJsonImpl(final byte[] document, String docId, String routingField, String pipelineField) {
public SerializedJsonImpl(final byte[] document, String docId, String routingField, String pipelineField, Map<String, Object> resolvedScriptParameters) {
this.document = document;
this.documentId = docId;
this.routingField = routingField;
this.pipelineField = pipelineField;
this.resolvedScriptParameters = resolvedScriptParameters;
}

public SerializedJsonImpl(final byte[] document) {
Expand Down Expand Up @@ -47,4 +50,9 @@ public Optional<String> getRoutingField() {

@Override
public Optional<String> getPipelineField() { return Optional.ofNullable(pipelineField); }

@Override
public Optional<Map<String, Object>> getResolvedScriptParameters() {
return Optional.ofNullable(resolvedScriptParameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;

class SerializedJsonNode implements SerializedJson, Serializable {
Expand All @@ -15,13 +16,15 @@ class SerializedJsonNode implements SerializedJson, Serializable {
private String documentId = null;
private String routingField = null;
private String pipelineField = null;
private Map<String, Object> resolvedScriptParameters = null;

public SerializedJsonNode(final JsonNode jsonNode, SerializedJson doc) {
this.jsonNode = jsonNode;
this.documentId = doc.getDocumentId().orElse(null);
this.routingField = doc.getRoutingField().orElse(null);
this.document = jsonNode.toString().getBytes();
this.pipelineField = doc.getPipelineField().orElse(null);;
this.pipelineField = doc.getPipelineField().orElse(null);
this.resolvedScriptParameters = doc.getResolvedScriptParameters().orElse(null);
}

public SerializedJsonNode(final JsonNode jsonNode) {
Expand Down Expand Up @@ -51,4 +54,9 @@ public Optional<String> getRoutingField() {

@Override
public Optional<String> getPipelineField() { return Optional.ofNullable(pipelineField); }

@Override
public Optional<Map<String, Object>> getResolvedScriptParameters() {
return Optional.ofNullable(resolvedScriptParameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,22 @@ public boolean getEnableRequestCompression() {
@JsonProperty("actions")
private List<ActionConfiguration> actions = null;

@Getter
@Valid
@JsonProperty("script")
private ScriptConfiguration scriptConfiguration = null;

@AssertTrue(message = "script can only be used with update or upsert actions.")
public boolean isScriptActionValid() {
if (scriptConfiguration == null) {
return true;
}
final String effectiveAction = action != null ? action : OpenSearchBulkActions.INDEX.toString();
return effectiveAction.contains("${") ||
effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPDATE.toString()) ||
effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPSERT.toString());
}

@Getter
@JsonProperty("document_root_key")
private String documentRootKey = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.sink.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import lombok.Getter;

import java.util.Map;

public class ScriptConfiguration {

@Getter
@NotEmpty
@JsonProperty("source")
private String source;

@Getter
@JsonProperty("params")
private Map<String, Object> params;
}
Loading
Loading