Add Common Sink Output Strategies#6100
Conversation
|
|
||
| public class DefaultSinkMetrics implements SinkMetrics { | ||
| public static final String DEFAULT_EVENT_NAME = "Event"; | ||
| public static final String SINK_REQUESTS_SUCCEEDED = "SinkRequestsSucceeded"; |
There was a problem hiding this comment.
Using terms like Sink seems to add bloat to the names and also doesn't match our existing conventions.
Most metrics use lowerCamelCase.
|
|
||
| public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String sinkPrefix, final String eventName) { | ||
| this.sinkRequestsSucceeded = pluginMetrics.counter(sinkPrefix + SINK_REQUESTS_SUCCEEDED); | ||
| this.sinkEventsSucceeded = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sSucceeded"); |
There was a problem hiding this comment.
How many metrics might this create? What is eventName?
There was a problem hiding this comment.
For AMP sink, it would be "metric", for XRAY sink it would be "span" for opensearch sink it could be "document" and so on
| import java.util.Collection; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
|
|
||
| public abstract class DefaultSinkOutputStrategy implements SinkOutputStrategy { |
There was a problem hiding this comment.
This is more like a template pattern than a strategy.
I'd recommend that we have a solution that favors composition of these different pieces rather than giving the whole thing.
There was a problem hiding this comment.
I wanted to provide a top level template so that implementations do not differ a lot. Like we observed, the cloudwatch_logs sink implementation was so different we might want to re-implement it along the lines of this template
There was a problem hiding this comment.
We should have unit tests for this.
|
|
||
| public abstract class RetrySinkOutputStrategy extends DefaultSinkOutputStrategy { | ||
| private static final Logger LOG = LoggerFactory.getLogger(RetrySinkOutputStrategy.class); | ||
| private static final long INITIAL_DELAY_MS = 10; |
There was a problem hiding this comment.
These should be configurable. The sinks will have different values here.
| } | ||
| } | ||
|
|
||
| public abstract void flushBuffer(); |
There was a problem hiding this comment.
There is a lot to inherit here. Again, I think using strategies for these is ideal. Basically inverting the approach.
There was a problem hiding this comment.
I guess you meant, the flush can have two different strategies with explicit retry mechanism or implicit retry mechanism as in XRAY sink (and AMP sink would be similar to XRAY in that sense)
Signed-off-by: Kondaka <krishkdk@amazon.com>
Created custom tools (GradleTestRunner, JavaCodeAnalyzer, PluginValidator) and comprehensive workspace summary for data-prepper development.
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
ae58d34 to
695907d
Compare
dlvenable
left a comment
There was a problem hiding this comment.
Thanks! I have a few more comments on this.
| return estimatedSize >= maxEventSize; | ||
| } | ||
|
|
||
| public abstract Object doFlush(Object failedStatus) throws Exception; |
There was a problem hiding this comment.
Why does this need to be abstract?
|
|
||
| public abstract Object doFlush(Object failedStatus) throws Exception; | ||
|
|
||
| public abstract long getCurrentRequestSize(); |
There was a problem hiding this comment.
How should this be implemented? Why can't the implementation track the current size itself?
It seems the only thing we need most sinks to implement is getEstimatedSize(Event).
| import java.util.Collection; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
|
|
||
| public abstract class DefaultSinkOutputStrategy implements SinkOutputStrategy { |
There was a problem hiding this comment.
We should have unit tests for this.
|
|
||
| public abstract class DefaultBufferStrategy implements BufferStrategy { | ||
| private static final Logger LOG = LoggerFactory.getLogger(DefaultBufferStrategy.class); | ||
| long lastFlushedTime; |
|
|
||
| import java.time.Instant; | ||
|
|
||
| public abstract class DefaultBufferStrategy implements BufferStrategy { |
|
Abandoned in favor of PR 6183. #6183 |
Description
Add common output strategies for Sinks. These classes can be reused by existing and future Sinks.
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.