Add common sink framework for DataPrepper sinks #6183

Merged
kkondaka merged 4 commits into opensearch-project:main from kkondaka:sink-strategy on Oct 28, 2025

@kkondaka kkondaka commented Oct 16, 2025

Collaborator

Description

Add common sink framework for DataPrepper sinks. Include main execution loop, flexible flushing, sink level metrics.

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Kondaka <krishkdk@amazon.com>

@KarstenSchnitter KarstenSchnitter left a comment

Collaborator

Thanks for this PR. Looks good. I have some questions on error handling, nothing too major.

Comment on lines +8 to +9
public interface SinkFlushContext {
}

Collaborator

What is the generic idea behind this empty interface?

Member

Yes, shouldn't there be a flush() method?

@kkondaka kkondaka Oct 17, 2025

Collaborator Author

flush() is in the SinkFlushableBuffer class. SinkFlushContext exists to carry sink-specific information such as an s3Client, sqsClient, httpSender, or opensearchClient. The empty interface just gives that context a name, which I felt was better than using Object. It is also possible that it will gain some methods in the future.

Collaborator Author

So, for example, in the case of the SQS sink, it would have something like:

// Carries the SQS client that the sink's flush() needs.
public class SqsSinkFlushContext implements SinkFlushContext {
    private final SqsClient sqsClient;

    public SqsSinkFlushContext(final SqsClient sqsClient) {
        this.sqsClient = sqsClient;
    }

    public SqsClient getSqsClient() {
        return sqsClient;
    }
}

The flush() in the SqsSinkFlushableBuffer class will get the sqsClient from SqsSinkFlushContext to actually push the messages to SQS.

Collaborator Author

The idea is that SinkFlushableBuffer can be flush()ed, not SinkFlushContext. SinkFlushContext just provides the context needed to do the flush.
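Editor's sketch of the split described above, where the buffer is the thing that gets flushed and the context only carries what the flush needs. Only the names SinkFlushContext and flush() come from the discussion. MessageSender, DemoSinkFlushContext, and DemoSinkFlushableBuffer are hypothetical stand-ins (an in-memory sender replaces a real client such as SqsClient), and the actual signatures in the PR may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Marker interface as discussed in the PR: it only names the sink-specific context type.
interface SinkFlushContext {
}

// Hypothetical stand-in for a real client such as SqsClient; not part of the PR.
interface MessageSender {
    void send(List<String> messages);
}

// Hypothetical context for an imaginary sink; it carries what flush() needs.
final class DemoSinkFlushContext implements SinkFlushContext {
    private final MessageSender sender;

    DemoSinkFlushContext(final MessageSender sender) {
        this.sender = sender;
    }

    MessageSender getSender() {
        return sender;
    }
}

// Assumed shape of a flushable buffer: it owns the data and is what gets flushed.
final class DemoSinkFlushableBuffer {
    private final List<String> messages;
    private final DemoSinkFlushContext flushContext;

    DemoSinkFlushableBuffer(final List<String> messages, final DemoSinkFlushContext flushContext) {
        this.messages = new ArrayList<>(messages);
        this.flushContext = flushContext;
    }

    // Sends the buffered messages through the client held by the context
    // and returns how many messages were handed to the sender.
    int flush() {
        flushContext.getSender().send(messages);
        return messages.size();
    }
}
```

With this shape, a new sink would supply its own context type and buffer type, while the code that decides when to call flush() would not need to know about the client.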

Collaborator

I assumed the context to be a placeholder for more specialized interfaces. I was just wondering why this abstraction is introduced when it is currently not used. I am fine with having it available to migrate the existing sinks to this pattern.

Comment on lines +58 to +60
if (sinkBuffer.exceedsFlushTimeInterval()) {
    flushBuffer();
}

Collaborator

You want to flush even when there are no records? Is this reasonable?

Member

Good catch. We should check that there is something present.

Collaborator Author

There may not be records in the current execution, but the buffer may still have records. Of course, if the buffer doesn't have anything, this does nothing.

Collaborator Author

This is very important. Imagine what happens if the sink's output() is called with 1 record every time and it never hits the maxEvents or maxRequestSize limits (or it takes an hour to hit them). That is why we have the third, time-based limit: whatever has accumulated in the sink buffer will be flushed once the time limit is reached.
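Editor's sketch of the three flush triggers described above (event count, request size, elapsed time). ThresholdBufferSketch is a hypothetical class, not the PR's DefaultSinkBuffer. The `>=` comparisons and passing the current time as an argument (to keep the example deterministic) are assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical buffer that tracks the three limits named in the comment above.
final class ThresholdBufferSketch {
    private final int maxEvents;
    private final long maxRequestSize;
    private final Duration flushInterval;
    private int eventCount = 0;
    private long byteCount = 0L;
    private Instant lastFlushTime;

    ThresholdBufferSketch(final int maxEvents, final long maxRequestSize,
                          final Duration flushInterval, final Instant startTime) {
        this.maxEvents = maxEvents;
        this.maxRequestSize = maxRequestSize;
        this.flushInterval = flushInterval;
        this.lastFlushTime = startTime;
    }

    // Records one event of the given estimated size.
    void addToBuffer(final long estimatedSize) {
        eventCount++;
        byteCount += estimatedSize;
    }

    // True once at least flushInterval has passed since the last flush.
    boolean exceedsFlushTimeInterval(final Instant now) {
        return Duration.between(lastFlushTime, now).compareTo(flushInterval) >= 0;
    }

    // Any one of the three limits is enough to trigger a flush.
    boolean shouldFlush(final Instant now) {
        return eventCount >= maxEvents
                || byteCount >= maxRequestSize
                || exceedsFlushTimeInterval(now);
    }

    // Flushes if a limit is reached and returns the number of events flushed.
    // With an empty buffer this only resets the timer and returns 0.
    int flushIfNeeded(final Instant now) {
        if (!shouldFlush(now)) {
            return 0;
        }
        final int flushed = eventCount;
        eventCount = 0;
        byteCount = 0L;
        lastFlushTime = now;
        return flushed;
    }
}
```

In this sketch a single slow-arriving record stays buffered until the interval elapses, which is the case the time-based limit is meant to cover.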

Collaborator

I wanted to make sure this is intended behavior, since it could have been an unintended side effect. Thanks for clarifying.

}

if (!sinkBuffer.addToBuffer(bufferEntry)) {
    throw new RuntimeException("Failed to add event to sink buffer");

Collaborator

Same question on dropped events here.

Collaborator Author

Same answer as above.

import java.time.Instant;

public class DefaultSinkBuffer implements SinkBuffer {
final SinkBufferWriter sinkBufferWriter;

Member

These should all be private.

private final DistributionSummary sinkEventSize;

public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String sinkPrefix, final String eventName) {
this.sinkRequestsSucceeded = pluginMetrics.counter(sinkPrefix + SINK_REQUESTS_SUCCEEDED);

Member

Why do we need a sinkPrefix? The plugin name should be part of the name already.

@kkondaka kkondaka Oct 17, 2025

Collaborator Author

Ok, will delete the prefix.

Comment on lines +8 to +9
public interface SinkFlushContext {
}

Member

Yes, shouldn't there be a flush() method?

public abstract void flushDLQList();
public abstract void addFailedEventsToDLQ(final List<Event> events, final Throwable ex);
public abstract SinkBufferEntry getSinkBufferEntry(final Event event) throws Exception;
public abstract boolean exceedsMaxEventSizeThreshold(final long estimatedSize);

Member

This would be better as long getMaxEventSizeThreshold(). Then the comparison happens in the common code.
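Editor's sketch of the suggestion above: the sink exposes only its limit, and the comparison lives in shared code. The method name getMaxEventSizeThreshold() comes from the comment. MaxEventSizeProvider, CommonSizeCheck, and the strict `>` comparison are hypothetical choices made for illustration.

```java
// Hypothetical provider following the suggestion: expose the limit, not the comparison.
interface MaxEventSizeProvider {
    long getMaxEventSizeThreshold();
}

// Hypothetical common code that owns the comparison for every sink.
final class CommonSizeCheck {
    private CommonSizeCheck() {
    }

    // Returns true when the estimated size is strictly larger than the sink's limit.
    static boolean exceedsMaxEventSizeThreshold(final MaxEventSizeProvider provider,
                                                final long estimatedSize) {
        return estimatedSize > provider.getMaxEventSizeThreshold();
    }
}
```

One consequence of this shape is that every sink applies the same boundary rule, since each implementation supplies only a number.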

Collaborator Author

flush() is in the SinkFlushableBuffer class.

}
}

public abstract void flushDLQList();

Member

I still think these should be handled as injected interfaces. Think about writing a sink. You don't need to mix these into the same class because they are not strongly coupled together.

For example, getting a SinkBufferEntry is about serialization. This is a very different concern from writing to the DLQ.

e.g.

public interface SinkBufferEntryProvider {
  SinkBufferEntry getSinkBufferEntry(final Event event) throws Exception;
}
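Editor's sketch of what injecting these concerns separately could look like. All names here (EntryProvider, DlqHandler, ComposedSinkSketch) are hypothetical, events and entries are simplified to strings, and routing a serialization failure to the DLQ is an assumption for the example rather than behavior confirmed by the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical serialization concern, simplified to strings.
interface EntryProvider {
    String toEntry(String event) throws Exception;
}

// Hypothetical DLQ concern, kept separate from serialization.
interface DlqHandler {
    void addFailedEvents(List<String> events, Throwable ex);
}

// Hypothetical sink core that receives both collaborators through its constructor.
final class ComposedSinkSketch {
    private final EntryProvider entryProvider;
    private final DlqHandler dlqHandler;
    private final List<String> buffer = new ArrayList<>();

    ComposedSinkSketch(final EntryProvider entryProvider, final DlqHandler dlqHandler) {
        this.entryProvider = entryProvider;
        this.dlqHandler = dlqHandler;
    }

    // Serializes each event; events that fail to serialize go to the DLQ handler.
    void output(final List<String> events) {
        for (final String event : events) {
            try {
                buffer.add(entryProvider.toEntry(event));
            } catch (final Exception e) {
                dlqHandler.addFailedEvents(List.of(event), e);
            }
        }
    }

    // Returns an immutable snapshot of the buffered entries.
    List<String> buffered() {
        return List.copyOf(buffer);
    }
}
```

Because each collaborator is a single-method interface, a test can pass lambdas for both, and a sink author can reuse one DLQ handler across sinks with different serialization.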

@kkondaka kkondaka Oct 17, 2025

Collaborator Author

@dlvenable OK, I will modify to add interfaces

Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>

public interface SinkBufferEntryProvider {
SinkBufferEntry getSinkBufferEntry(final Event event) throws Exception;
boolean exceedsMaxEventSizeThreshold(final long estimatedSize);

Member

I'd recommend that this take in the SinkBufferEntry. That way, if max size is somehow dynamic you'd have more context.


import java.util.List;

public interface SinkDLQHandler {

Member

Please rename to use Dlq to match existing naming conventions.

  • SinkDlqHandler

import java.util.List;

public interface SinkDLQHandler {
void flushDLQList();

Member

Same as above: use Dlq.


public interface SinkDLQHandler {
void flushDLQList();
void addFailedEventsToDLQ(final List<Event> events, final Throwable ex);

Member

Same as above: use Dlq.

Signed-off-by: Kondaka <krishkdk@amazon.com>

@dlvenable dlvenable left a comment

Member

Thanks @kkondaka for this improvement to help with consistency!


import java.time.Instant;

public class DefaultSinkBuffer implements SinkBuffer {

Member

Curious how this will be applied to all sinks. Or is it only meant to be applied to some?

Collaborator Author

@graytaylor0 DefaultSinkBuffer is expected to be extended by each sink.

@kkondaka kkondaka merged commit 5dd9eb1 into opensearch-project:main Oct 28, 2025
45 of 47 checks passed

4 participants