Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
Expand All @@ -40,6 +43,7 @@
import java.io.OutputStream;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -79,14 +83,23 @@ public class KafkaCustomProducer<T> {

private final CompressionOption compressionConfig;

private final SinkContext sinkContext;

private final SinkForwardRecordsContext sinkForwardRecordsContext;

private final HeadlessPipeline failurePipeline;

public KafkaCustomProducer(final KafkaProducer producer,
final KafkaProducerConfig kafkaProducerConfig,
final DLQSink dlqSink,
final ExpressionEvaluator expressionEvaluator,
final String tagTargetKey,
final KafkaTopicProducerMetrics topicMetrics,
final SchemaService schemaService,
final CompressionOption compressionConfig
final CompressionOption compressionConfig,
final SinkContext sinkContext,
final SinkForwardRecordsContext sinkForwardRecordsContext,
final HeadlessPipeline failurePipeline
) {
this.producer = producer;
this.kafkaProducerConfig = kafkaProducerConfig;
Expand All @@ -100,6 +113,9 @@ public KafkaCustomProducer(final KafkaProducer producer,
this.topicMetrics = topicMetrics;
this.topicMetrics.register(this.producer);
this.compressionConfig = (compressionConfig == null) ? CompressionOption.NONE: compressionConfig;
this.sinkContext = sinkContext;
this.sinkForwardRecordsContext = sinkForwardRecordsContext;
this.failurePipeline = failurePipeline;
}

public KafkaCustomProducer(final KafkaProducer producer,
Expand All @@ -110,7 +126,7 @@ public KafkaCustomProducer(final KafkaProducer producer,
final KafkaTopicProducerMetrics topicMetrics,
final SchemaService schemaService
) {
this(producer, kafkaProducerConfig, dlqSink, expressionEvaluator, tagTargetKey, topicMetrics, schemaService, null);
this(producer, kafkaProducerConfig, dlqSink, expressionEvaluator, tagTargetKey, topicMetrics, schemaService, null, null, null, null);
}

KafkaTopicProducerMetrics getTopicMetrics() {
Expand Down Expand Up @@ -162,7 +178,14 @@ public void produceRecords(final Record<Event> record) throws Exception {
} catch (Exception e) {
LOG.error("Error occurred while publishing record {}", e.getMessage());
topicMetrics.getNumberOfRecordSendErrors().increment();
if (dlqSink != null) {
if (failurePipeline != null) {
record.getData().updateFailureMetadata()
.withPluginName("kafka")
.withPipelineName(kafkaProducerConfig.getTopic() != null ? topicName : null)
.withErrorMessage(e.getMessage())
.with("topic", topicName);
failurePipeline.sendEvents(List.of(record));
} else if (dlqSink != null) {
JsonNode dataNode = record.getData().getJsonNode();
dlqSink.perform(dataNode, e);
} else {
Expand All @@ -182,7 +205,7 @@ private void publishJsonMessageAsBytes(Record<Event> record, String key) throws
compressedOutputStream.write(bytes);
compressedOutputStream.close();

send(topicName, key, byteArrayOutputStream.toByteArray());
send(topicName, key, byteArrayOutputStream.toByteArray(), record);
}

private Event getEvent(final Record<Event> record) {
Expand All @@ -197,7 +220,7 @@ private Event getEvent(final Record<Event> record) {


private void publishPlaintextMessage(final Record<Event> record, final String key) throws Exception {
send(topicName, key, record.getData().toJsonString());
send(topicName, key, record.getData().toJsonString(), record);
}

private void publishAvroMessage(final Record<Event> record, final String key) throws Exception {
Expand All @@ -206,20 +229,24 @@ private void publishAvroMessage(final Record<Event> record, final String key) th
throw new RuntimeException("Schema definition is mandatory in case of type avro");
}
final GenericRecord genericRecord = getGenericRecord(record.getData(), avroSchema);
send(topicName, key, genericRecord);
send(topicName, key, genericRecord, record);
}

Future send(final String topicName, String key, final Object record) throws Exception {
Future send(final String topicName, String key, final Object record, final Record<Event> originalRecord) throws Exception {
ProducerRecord producerRecord = Objects.isNull(key) ?
new ProducerRecord(topicName, record) :
new ProducerRecord(topicName, key, record);

return producer.send(producerRecord, callBack(record));
return producer.send(producerRecord, callBack(originalRecord));
}

/**
 * Sends a payload that has no originating event record attached.
 *
 * Delegates to the four-argument {@code send}, supplying {@code null} as the
 * original record.
 *
 * @param topicName topic to publish to
 * @param key record key, passed through unchanged
 * @param record payload to publish
 * @return whatever the four-argument {@code send} returns
 * @throws Exception propagated from the four-argument {@code send}
 */
Future send(final String topicName, String key, final Object record) throws Exception {
    // No source event is available on this path.
    final Record<Event> originalRecord = null;
    return send(topicName, key, record, originalRecord);
}

private void publishJsonMessage(final Record<Event> record, final String key) throws IOException, ProcessingException, Exception {
JsonNode dataNode = record.getData().getJsonNode();
send(topicName, key, dataNode);
send(topicName, key, dataNode, record);
}

public boolean validateSchema(final String jsonData, final String schemaJson) throws IOException, ProcessingException {
Expand All @@ -232,13 +259,30 @@ public boolean validateSchema(final String jsonData, final String schemaJson) th
return report != null ? report.isSuccess() : false;
}

private Callback callBack(final Object dataForDlq) {
private Callback callBack(final Record<Event> originalRecord) {
return (metadata, exception) -> {
if (null != exception) {
LOG.error("Error occurred while publishing {}", exception.getMessage());
topicMetrics.getNumberOfRecordProcessingErrors().increment();
if (failurePipeline != null && originalRecord != null) {
originalRecord.getData().updateFailureMetadata()
.withPluginName("kafka")
.withPipelineName(kafkaProducerConfig.getTopic() != null ? topicName : null)
.withErrorMessage(exception.getMessage())
.with("topic", topicName);
failurePipeline.sendEvents(List.of(originalRecord));
} else if (dlqSink != null && originalRecord != null) {
dlqSink.perform(originalRecord.getData().getJsonNode(), exception);
} else {
releaseEventHandles(false);
}
} else {
releaseEventHandles(true);
if (sinkContext != null && sinkForwardRecordsContext != null && sinkContext.getForwardToPipelines().size() > 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

&& sinkContext.getForwardToPipelines().size() > 0

Why would we get a SinkForwardRecordsContext if there is no pipeline to forward to?

sinkForwardRecordsContext.addRecord(originalRecord);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that the OpenSearch sink forwards the OpenSearch document. Do we want to be consistent with that? This would be a little odd in this sink since it might be JSON or binary data. But it would retain some consistency.

sinkContext.forwardRecords(sinkForwardRecordsContext, null, null);
} else {
releaseEventHandles(true);
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter;
Expand Down Expand Up @@ -61,11 +63,30 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce
return createProducer(kafkaProducerConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, topicNameInMetrics, CompressionOption.NONE);
}

/**
 * Creates a producer wired with a forward-records context and a failure pipeline.
 *
 * Delegates to {@code createProducerInternal}, passing {@link CompressionOption#NONE}
 * as the manual compression option.
 *
 * @param kafkaProducerConfig producer configuration
 * @param expressionEvaluator expression evaluator handed to the producer
 * @param sinkContext sink context handed to the producer
 * @param pluginMetrics plugin metrics
 * @param dlqSink DLQ sink handed to the producer
 * @param topicNameInMetrics passed through to {@code createProducerInternal}
 * @param sinkForwardRecordsContext forward-records context handed to the producer
 * @param failurePipeline failure pipeline handed to the producer
 * @return the producer built by {@code createProducerInternal}
 */
public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig,
                                          final ExpressionEvaluator expressionEvaluator,
                                          final SinkContext sinkContext,
                                          final PluginMetrics pluginMetrics,
                                          final DLQSink dlqSink,
                                          final boolean topicNameInMetrics,
                                          final SinkForwardRecordsContext sinkForwardRecordsContext,
                                          final HeadlessPipeline failurePipeline) {
    final CompressionOption manualCompressionConfig = CompressionOption.NONE;
    return createProducerInternal(
            kafkaProducerConfig,
            expressionEvaluator,
            sinkContext,
            pluginMetrics,
            dlqSink,
            topicNameInMetrics,
            manualCompressionConfig,
            sinkForwardRecordsContext,
            failurePipeline);
}

/**
 * Creates a producer with an explicit manual compression option.
 *
 * Delegates to {@code createProducerInternal}, passing {@code null} for both the
 * {@link SinkForwardRecordsContext} and the {@link HeadlessPipeline}.
 *
 * @param kafkaProducerConfig producer configuration
 * @param expressionEvaluator expression evaluator handed to the producer
 * @param sinkContext sink context handed to the producer
 * @param pluginMetrics plugin metrics
 * @param dlqSink DLQ sink handed to the producer
 * @param topicNameInMetrics passed through to {@code createProducerInternal}
 * @param manualCompressionConfig compression option handed to the producer
 * @return the producer built by {@code createProducerInternal}
 */
public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig,
                                          final ExpressionEvaluator expressionEvaluator,
                                          final SinkContext sinkContext,
                                          final PluginMetrics pluginMetrics,
                                          final DLQSink dlqSink,
                                          final boolean topicNameInMetrics,
                                          final CompressionOption manualCompressionConfig) {
    // This overload supplies neither a forward-records context nor a failure pipeline.
    final SinkForwardRecordsContext noForwardRecordsContext = null;
    final HeadlessPipeline noFailurePipeline = null;
    return createProducerInternal(
            kafkaProducerConfig,
            expressionEvaluator,
            sinkContext,
            pluginMetrics,
            dlqSink,
            topicNameInMetrics,
            manualCompressionConfig,
            noForwardRecordsContext,
            noFailurePipeline);
}

private KafkaCustomProducer createProducerInternal(final KafkaProducerConfig kafkaProducerConfig,
final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics,
final DLQSink dlqSink,
final boolean topicNameInMetrics,
final CompressionOption manualCompressionConfig,
final SinkForwardRecordsContext sinkForwardRecordsContext,
final HeadlessPipeline failurePipeline) {
AwsContext awsContext = new AwsContext(kafkaProducerConfig, awsCredentialsSupplier);
KeyFactory keyFactory = new KeyFactory(awsContext);
// If either or both of Producer's max_request_size or
Expand Down Expand Up @@ -95,7 +116,8 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce
final SchemaService schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build();
return new KafkaCustomProducer(producer,
kafkaProducerConfig, dlqSink,
expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService, manualCompressionConfig);
expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService, manualCompressionConfig,
sinkContext, sinkForwardRecordsContext, failurePipeline);
}

private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig, final Integer maxRequestSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
Expand Down Expand Up @@ -73,6 +74,8 @@ public class KafkaSink extends AbstractSink<Record<Event>> {

private final SinkContext sinkContext;

private final SinkForwardRecordsContext sinkForwardRecordsContext;


@DataPrepperPluginConstructor
public KafkaSink(final PluginSetting pluginSetting, final KafkaSinkConfig kafkaSinkConfig, final PluginFactory pluginFactory,
Expand All @@ -86,6 +89,7 @@ public KafkaSink(final PluginSetting pluginSetting, final KafkaSinkConfig kafkaS
this.expressionEvaluator = expressionEvaluator;
reentrantLock = new ReentrantLock();
this.sinkContext = sinkContext;
this.sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);

SerializationFactory serializationFactory = new CommonSerializationFactory();
topicServiceFactory = new TopicServiceFactory();
Expand Down Expand Up @@ -167,7 +171,8 @@ private void checkTopicCreationCriteriaAndCreateTopic() {

private KafkaCustomProducer createProducer() {
final DLQSink dlqSink = new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting);
return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, true);
return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, true,
sinkForwardRecordsContext, getFailurePipeline());
}


Expand Down
Loading
Loading