Add common sink framework for DataPrepper sinks #6183
Signed-off-by: Kondaka <krishkdk@amazon.com>
KarstenSchnitter left a comment
Thanks for this PR. Looks good. I have some questions on error handling, nothing too major.
public interface SinkFlushContext {
}
What is the generic idea behind this empty interface?
Yes, shouldn't there be a flush() method?
flush() is in the SinkFlushableBuffer class. This interface carries sink-specific information such as the s3Client, sqsClient, httpSender, or opensearchClient. The empty interface just gives that information a named type, which I felt is better than using Object. It is also possible that it will gain some methods in the future.
So, for example, in the case of the SQS sink, it will have something like:
public class SqsSinkFlushContext implements SinkFlushContext {
    final SqsClient sqsClient;
    public SqsSinkFlushContext(final SqsClient sqsClient) {
        this.sqsClient = sqsClient;
    }
    public SqsClient getSqsClient() {
        return sqsClient;
    }
}
The flush() in the SqsSinkFlushableBuffer class will get the sqsClient from SqsSinkFlushContext to actually push the messages to SQS.
The idea is that SinkFlushableBuffer can be flush()ed, not SinkFlushContext. SinkFlushContext just gives some context to do the flush.
I assumed the context to be a placeholder for more specialized interfaces. I was just wondering why this abstraction is introduced when it is currently not used. I am fine with having it available to migrate the existing sinks to this pattern.
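For readers following this thread, here is a minimal sketch of the split described above: the context only carries the client, and the buffer does the flushing. `RecordingQueueClient`, `QueueSinkFlushContext`, `QueueSinkFlushableBuffer`, the cast in the constructor, and the `int flush()` signature are all assumptions made for illustration, not the PR's actual code. A real sink would hold something like an `SqsClient` instead of the stand-in client.

```java
import java.util.ArrayList;
import java.util.List;

// Marker interface as in the PR: it names the sink-specific context and has no methods.
interface SinkFlushContext {
}

// Hypothetical stand-in for a real client such as SqsClient; it only records what was sent.
class RecordingQueueClient {
    final List<String> sent = new ArrayList<>();

    void send(final String message) {
        sent.add(message);
    }
}

// Sink-specific context that carries the client needed during a flush.
class QueueSinkFlushContext implements SinkFlushContext {
    private final RecordingQueueClient client;

    QueueSinkFlushContext(final RecordingQueueClient client) {
        this.client = client;
    }

    RecordingQueueClient getClient() {
        return client;
    }
}

// Assumed shape of the flushable buffer: flush() lives here, not on the context.
interface SinkFlushableBuffer {
    int flush();
}

class QueueSinkFlushableBuffer implements SinkFlushableBuffer {
    private final List<String> messages;
    private final QueueSinkFlushContext context;

    QueueSinkFlushableBuffer(final List<String> messages, final SinkFlushContext context) {
        this.messages = new ArrayList<>(messages);
        // The buffer narrows the generic context to the type it knows how to use.
        this.context = (QueueSinkFlushContext) context;
    }

    @Override
    public int flush() {
        // Push every buffered message through the client taken from the context.
        for (final String message : messages) {
            context.getClient().send(message);
        }
        final int flushed = messages.size();
        messages.clear();
        return flushed;
    }
}

class FlushContextDemo {
    public static void main(final String[] args) {
        final RecordingQueueClient client = new RecordingQueueClient();
        final SinkFlushableBuffer buffer =
                new QueueSinkFlushableBuffer(List.of("a", "b"), new QueueSinkFlushContext(client));
        System.out.println("flushed " + buffer.flush() + " messages: " + client.sent);
    }
}
```

In this sketch the named marker type costs nothing today and leaves room for shared methods later, which is the trade-off discussed above against passing a plain Object.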
if (sinkBuffer.exceedsFlushTimeInterval()) {
    flushBuffer();
}
You want to flush even when there are no records? Is this reasonable?
Good catch. We should check that there is something present.
There may not be records in the current execution, but the buffer may still hold records. Of course, if the buffer doesn't have anything, this does nothing.
This is very important. Imagine that the sink's output() is called with one record every time and never hits the maxEvents or maxRequestSize limits (or takes an hour to hit them). The third, time-based limit covers this case: whatever has accumulated in the sink buffer is flushed once the time limit is reached.
I wanted to make sure this is the intended behavior. It could have been an unintended side effect. Thanks for clarifying this.
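The behavior clarified in this thread can be sketched roughly as follows. This is not the PR's implementation: `TimedSinkBuffer`, its fields, the injected clock, and the use of string length as a stand-in for serialized size are assumptions chosen so that the time-based limit can be exercised deterministically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical buffer illustrating the three flush limits discussed above.
class TimedSinkBuffer {
    private final int maxEvents;
    private final long maxRequestSize;
    private final long flushIntervalMillis;
    private final LongSupplier clock; // injected so tests can control time
    private final List<String> entries = new ArrayList<>();
    private long bufferedSize = 0;
    private long lastFlushMillis;
    int flushCount = 0;

    TimedSinkBuffer(final int maxEvents, final long maxRequestSize,
                    final long flushIntervalMillis, final LongSupplier clock) {
        this.maxEvents = maxEvents;
        this.maxRequestSize = maxRequestSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.clock = clock;
        this.lastFlushMillis = clock.getAsLong();
    }

    int size() {
        return entries.size();
    }

    private boolean exceedsSizeLimits() {
        return entries.size() >= maxEvents || bufferedSize >= maxRequestSize;
    }

    boolean exceedsFlushTimeInterval() {
        return clock.getAsLong() - lastFlushMillis >= flushIntervalMillis;
    }

    void flush() {
        // An empty buffer has nothing to send, so only the timer is reset.
        if (!entries.isEmpty()) {
            flushCount++;
            entries.clear();
            bufferedSize = 0;
        }
        lastFlushMillis = clock.getAsLong();
    }

    // The time check runs on every call, including calls that deliver no records,
    // so entries left over from earlier calls are still flushed on schedule.
    void output(final List<String> records) {
        for (final String record : records) {
            entries.add(record);
            bufferedSize += record.length();
            if (exceedsSizeLimits()) {
                flush();
            }
        }
        if (exceedsFlushTimeInterval()) {
            flush();
        }
    }
}

class TimedFlushDemo {
    public static void main(final String[] args) {
        final long[] now = {0L};
        final TimedSinkBuffer buffer = new TimedSinkBuffer(100, 1_000_000L, 1_000L, () -> now[0]);
        buffer.output(List.of("one record"));
        now[0] = 1_500L; // time passes and the next call carries no records
        buffer.output(List.of());
        System.out.println("flushes so far: " + buffer.flushCount);
    }
}
```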
}

if (!sinkBuffer.addToBuffer(bufferEntry)) {
    throw new RuntimeException("Failed to add event to sink buffer");
Same question on dropped events here.
Same answer as above.
import java.time.Instant;

public class DefaultSinkBuffer implements SinkBuffer {
    final SinkBufferWriter sinkBufferWriter;

private final DistributionSummary sinkEventSize;

public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String sinkPrefix, final String eventName) {
    this.sinkRequestsSucceeded = pluginMetrics.counter(sinkPrefix + SINK_REQUESTS_SUCCEEDED);
Why do we need a sinkPrefix? The plugin name should be part of the name already.
Ok, will delete the prefix.
public interface SinkFlushContext {
}
Yes, shouldn't there be a flush() method?
public abstract void flushDLQList();
public abstract void addFailedEventsToDLQ(final List<Event> events, final Throwable ex);
public abstract SinkBufferEntry getSinkBufferEntry(final Event event) throws Exception;
public abstract boolean exceedsMaxEventSizeThreshold(final long estimatedSize);
This would be better as long getMaxEventSizeThreshold(). Then the comparison happens in the common code.
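A rough sketch of this suggestion, with hypothetical names: the sink only reports its limit, and the shared code owns the comparison. `EventSizeLimitProvider` and `CommonSizeCheck` are not from the PR, and treating "exceeds" as strictly greater than is an assumption.

```java
// Hypothetical sink-specific hook: the sink only reports its limit.
interface EventSizeLimitProvider {
    long getMaxEventSizeThreshold();
}

// Hypothetical common code: it owns the comparison, so every sink applies the same rule.
class CommonSizeCheck {
    private final EventSizeLimitProvider limitProvider;

    CommonSizeCheck(final EventSizeLimitProvider limitProvider) {
        this.limitProvider = limitProvider;
    }

    boolean exceedsMaxEventSizeThreshold(final long estimatedSize) {
        // Assumption: an event exactly at the threshold is still accepted.
        return estimatedSize > limitProvider.getMaxEventSizeThreshold();
    }
}

class SizeCheckDemo {
    public static void main(final String[] args) {
        final CommonSizeCheck check = new CommonSizeCheck(() -> 256L);
        System.out.println(check.exceedsMaxEventSizeThreshold(300L));
    }
}
```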
flush() is in SinkFlushableBuffer class
}
}

public abstract void flushDLQList();
I still think these should be handled as injected interfaces. Think about writing a sink. You don't need to mix these into the same class because they are not strongly coupled together.
For example, getting a SinkBufferEntry is about serialization. This is a very different concern from writing to the DLQ.
e.g.
public interface SinkBufferEntryProvider {
    SinkBufferEntry getSinkBufferEntry(final Event event) throws Exception;
}
@dlvenable OK, I will modify this to add interfaces.
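To illustrate the injected-interface approach requested above, here is a minimal sketch. `Event` and `SinkBufferEntry` are simplified stand-ins for the Data Prepper types, `CommonSink` and `RecordingDlqHandler` are hypothetical, and routing serialization failures to the DLQ is an assumption made for the example rather than the PR's confirmed behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the real types, for illustration only.
class Event {
    final String data;

    Event(final String data) {
        this.data = data;
    }
}

class SinkBufferEntry {
    final Event event;
    final long estimatedSize;

    SinkBufferEntry(final Event event, final long estimatedSize) {
        this.event = event;
        this.estimatedSize = estimatedSize;
    }
}

// Serialization concern, as proposed in the review.
interface SinkBufferEntryProvider {
    SinkBufferEntry getSinkBufferEntry(Event event) throws Exception;
}

// DLQ concern, kept separate from serialization.
interface SinkDlqHandler {
    void addFailedEventsToDlq(List<Event> events, Throwable ex);

    void flushDlqList();
}

// Hypothetical DLQ handler that only records what it receives.
class RecordingDlqHandler implements SinkDlqHandler {
    final List<Event> failed = new ArrayList<>();
    int flushCalls = 0;

    @Override
    public void addFailedEventsToDlq(final List<Event> events, final Throwable ex) {
        failed.addAll(events);
    }

    @Override
    public void flushDlqList() {
        flushCalls++;
    }
}

// Hypothetical common sink: both collaborators arrive through the constructor,
// so a sink author can implement or replace either one independently.
class CommonSink {
    private final SinkBufferEntryProvider entryProvider;
    private final SinkDlqHandler dlqHandler;
    final List<SinkBufferEntry> buffered = new ArrayList<>();

    CommonSink(final SinkBufferEntryProvider entryProvider, final SinkDlqHandler dlqHandler) {
        this.entryProvider = entryProvider;
        this.dlqHandler = dlqHandler;
    }

    void output(final List<Event> events) {
        for (final Event event : events) {
            try {
                buffered.add(entryProvider.getSinkBufferEntry(event));
            } catch (final Exception e) {
                // Assumption: events that cannot be serialized go to the DLQ.
                dlqHandler.addFailedEventsToDlq(List.of(event), e);
            }
        }
        dlqHandler.flushDlqList();
    }
}

class InjectedSinkDemo {
    public static void main(final String[] args) {
        final RecordingDlqHandler dlq = new RecordingDlqHandler();
        final CommonSink sink = new CommonSink(e -> new SinkBufferEntry(e, e.data.length()), dlq);
        sink.output(List.of(new Event("hello")));
        System.out.println("buffered entries: " + sink.buffered.size());
    }
}
```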
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
public interface SinkBufferEntryProvider {
    SinkBufferEntry getSinkBufferEntry(final Event event) throws Exception;
    boolean exceedsMaxEventSizeThreshold(final long estimatedSize);
I'd recommend that this take in the SinkBufferEntry. That way, if max size is somehow dynamic you'd have more context.
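As a sketch of why passing the entry can help, the example below picks the limit from a property of the entry itself. The `destination` field, `PerDestinationSizeLimit`, and the simplified `SinkBufferEntry` and `SinkBufferEntryProvider` shown here are hypothetical and only illustrate a "dynamic" maximum size.

```java
import java.util.Map;

// Simplified stand-in entry; the destination field is an assumption for this example.
class SinkBufferEntry {
    private final String destination;
    private final long estimatedSize;

    SinkBufferEntry(final String destination, final long estimatedSize) {
        this.destination = destination;
        this.estimatedSize = estimatedSize;
    }

    String getDestination() {
        return destination;
    }

    long getEstimatedSize() {
        return estimatedSize;
    }
}

// Variant of the interface method that receives the whole entry instead of only a size.
interface SinkBufferEntryProvider {
    boolean exceedsMaxEventSizeThreshold(SinkBufferEntry entry);
}

// Hypothetical implementation whose limit depends on where the entry is going.
class PerDestinationSizeLimit implements SinkBufferEntryProvider {
    private final Map<String, Long> limits;
    private final long defaultLimit;

    PerDestinationSizeLimit(final Map<String, Long> limits, final long defaultLimit) {
        this.limits = limits;
        this.defaultLimit = defaultLimit;
    }

    @Override
    public boolean exceedsMaxEventSizeThreshold(final SinkBufferEntry entry) {
        // Look up a destination-specific limit and fall back to the default.
        final long limit = limits.getOrDefault(entry.getDestination(), defaultLimit);
        return entry.getEstimatedSize() > limit;
    }
}

class DynamicLimitDemo {
    public static void main(final String[] args) {
        final SinkBufferEntryProvider provider =
                new PerDestinationSizeLimit(Map.of("small-queue", 10L), 100L);
        System.out.println(provider.exceedsMaxEventSizeThreshold(new SinkBufferEntry("small-queue", 50L)));
    }
}
```

With only a long estimatedSize parameter, the implementation above could not tell which limit applies.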
import java.util.List;

public interface SinkDLQHandler {
Please rename to use Dlq to match existing naming conventions.
SinkDlqHandler
import java.util.List;

public interface SinkDLQHandler {
    void flushDLQList();

public interface SinkDLQHandler {
    void flushDLQList();
    void addFailedEventsToDLQ(final List<Event> events, final Throwable ex);
Signed-off-by: Kondaka <krishkdk@amazon.com>
import java.time.Instant;

public class DefaultSinkBuffer implements SinkBuffer {
Curious how this will be applied to all sinks? Or is it only meant to be applied for some?
@graytaylor0 DefaultSinkBuffer is expected to be extended by each sink.
Description
Add a common sink framework for DataPrepper sinks. It includes the main execution loop, flexible flushing, and sink-level metrics.
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.