Add Common Sink Output Strategies#6100

Closed
kkondaka wants to merge 5 commits into
opensearch-project:mainfrom
kkondaka:sink-strategy

Conversation

@kkondaka

Collaborator

Description

Add common output strategies for Sinks. These classes can be reused by existing and future Sinks.

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.


public class DefaultSinkMetrics implements SinkMetrics {
    public static final String DEFAULT_EVENT_NAME = "Event";
    public static final String SINK_REQUESTS_SUCCEEDED = "SinkRequestsSucceeded";

Member

Using terms like Sink seems to add bloat to the names and also doesn't match our existing conventions.

Most metrics use lowerCamelCase.


    public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String sinkPrefix, final String eventName) {
        this.sinkRequestsSucceeded = pluginMetrics.counter(sinkPrefix + SINK_REQUESTS_SUCCEEDED);
        this.sinkEventsSucceeded = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sSucceeded");

Member

How many metrics might this create? What is eventName?

Collaborator Author

For the AMP sink it would be "metric", for the XRAY sink "span", for the opensearch sink it could be "document", and so on.
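
To make the naming question concrete, here is a minimal sketch of the names that the concatenation in the reviewed constructor would produce for the event names mentioned in this reply. The class `MetricNameSketch`, the prefix values ("amp", "xray", "opensearch"), and the lowerCamelCase alternative are all assumptions for illustration; none of them come from the PR.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical helper, not part of the PR: it reproduces the string concatenation
// from the DefaultSinkMetrics constructor so the resulting names can be inspected.
final class MetricNameSketch {
    private MetricNameSketch() {
    }

    // Same concatenation as in the reviewed constructor.
    static String eventsSucceededName(final String sinkPrefix, final String eventName) {
        return sinkPrefix + "Sink" + eventName + "sSucceeded";
    }

    // One possible lowerCamelCase alternative without the "Sink" infix.
    // This is an assumption, not an agreed project convention.
    static String lowerCamelCaseName(final String eventName) {
        if (eventName.isEmpty()) {
            throw new IllegalArgumentException("eventName must not be empty");
        }
        final String first = eventName.substring(0, 1).toLowerCase(Locale.ROOT);
        return first + eventName.substring(1) + "sSucceeded";
    }

    public static void main(final String[] args) {
        // Prefix and event-name pairs are illustrative assumptions.
        final List<String[]> sinks = List.of(
                new String[] {"amp", "metric"},
                new String[] {"xray", "span"},
                new String[] {"opensearch", "document"});
        for (final String[] sink : sinks) {
            System.out.println(eventsSucceededName(sink[0], sink[1])
                    + " vs " + lowerCamelCaseName(sink[1]));
        }
    }
}
```

If each sink passes a single fixed event name, as this reply suggests, the number of counters per sink stays constant. The casing of the generated name, however, depends on what the caller passes in.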

import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;

public abstract class DefaultSinkOutputStrategy implements SinkOutputStrategy {

Member

This is more like a template pattern than a strategy.

I'd recommend that we have a solution that favors composition of these different pieces rather than giving the whole thing.
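
One way to read the composition suggestion is that a sink holds small collaborators instead of extending a base class. The sketch below is only an illustration under that reading. The names `FlushCondition`, `BatchSender`, and `ComposedSinkOutput` are hypothetical and appear neither in the PR nor in Data Prepper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interfaces for illustration; none of these exist in the PR.

// Decides when the buffered batch should be flushed.
interface FlushCondition<T> {
    boolean shouldFlush(List<T> buffered);
}

// Sends one batch to the destination.
interface BatchSender<T> {
    void send(List<T> batch);
}

// The sink-facing class composes the two pieces instead of inheriting from a template.
final class ComposedSinkOutput<T> {
    private final FlushCondition<T> flushCondition;
    private final BatchSender<T> sender;
    private final List<T> buffer = new ArrayList<>();

    ComposedSinkOutput(final FlushCondition<T> flushCondition, final BatchSender<T> sender) {
        this.flushCondition = flushCondition;
        this.sender = sender;
    }

    // Buffers one item and flushes when the condition says so.
    synchronized void add(final T item) {
        buffer.add(item);
        if (flushCondition.shouldFlush(buffer)) {
            flush();
        }
    }

    // Sends whatever is buffered; a no-op when the buffer is empty.
    // The buffer is cleared before sending, so failure handling is left
    // to the sender in this sketch.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        final List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        sender.send(batch);
    }
}
```

With this shape a sink can swap the flush condition (count, size, or time based) or the sender without touching the other piece, and each piece can be unit tested on its own.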

Collaborator Author

I wanted to provide a top-level template so that implementations do not differ much. As we observed, the cloudwatch_logs sink implementation was so different that we might want to re-implement it along the lines of this template.

Member

We should have unit tests for this.


public abstract class RetrySinkOutputStrategy extends DefaultSinkOutputStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(RetrySinkOutputStrategy.class);
    private static final long INITIAL_DELAY_MS = 10;

Member

These should be configurable. The sinks will have different values here.
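
As a rough sketch of what per-sink configuration could look like, the hard-coded constant might move into a small settings object that each sink constructs with its own values. The class name `RetrySettings`, its fields, and the capped exponential backoff formula are assumptions. Only the 10 ms initial delay appears in the reviewed excerpt.

```java
import java.time.Duration;

// Hypothetical value class; the field names and the backoff formula are assumptions.
final class RetrySettings {
    private final Duration initialDelay;
    private final Duration maxDelay;
    private final int maxRetries;

    RetrySettings(final Duration initialDelay, final Duration maxDelay, final int maxRetries) {
        if (initialDelay.toMillis() <= 0) {
            throw new IllegalArgumentException("initialDelay must be at least 1 ms");
        }
        if (maxDelay.compareTo(initialDelay) < 0) {
            throw new IllegalArgumentException("maxDelay must not be smaller than initialDelay");
        }
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
        this.maxRetries = maxRetries;
    }

    int getMaxRetries() {
        return maxRetries;
    }

    // Delay before the given retry attempt (0-based):
    // initialDelay doubled once per attempt, capped at maxDelay.
    Duration delayFor(final int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must not be negative");
        }
        final long maxMs = maxDelay.toMillis();
        long delayMs = initialDelay.toMillis();
        for (int i = 0; i < attempt && delayMs < maxMs; i++) {
            // Guard against overflow before doubling.
            delayMs = delayMs > maxMs / 2 ? maxMs : delayMs * 2;
        }
        return Duration.ofMillis(Math.min(delayMs, maxMs));
    }
}
```

A sink with a slow or rate-limited destination could then pass a larger initial delay and cap than a sink that writes to a low-latency endpoint.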

}
}

public abstract void flushBuffer();

Member

There is a lot to inherit here. Again, I think using strategies for these is ideal. Basically inverting the approach.

Collaborator Author

I guess you meant that the flush can have two different strategies: one with an explicit retry mechanism and one with an implicit retry mechanism, as in the XRAY sink (the AMP sink would be similar to XRAY in that sense).
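
If that reading is right, the two flush variants could be expressed as interchangeable strategies that a sink picks at construction time. This is only a sketch of that interpretation. All names (`BatchWriter`, `FlushStrategy`, `SingleAttemptFlush`, `RetryingFlush`) are hypothetical, and "implicit retry" is assumed here to mean that the underlying client retries internally, so the strategy calls it once.

```java
import java.util.List;

// Hypothetical names throughout; this is an interpretation of the comment, not PR code.

// Sends one batch; returns true when the destination accepted it.
interface BatchWriter<T> {
    boolean write(List<T> batch);
}

// A flush strategy decides how a batch is pushed through a writer.
interface FlushStrategy<T> {
    boolean flush(List<T> batch);
}

// "Implicit retry": the writer (for example a client that retries internally)
// is called exactly once.
final class SingleAttemptFlush<T> implements FlushStrategy<T> {
    private final BatchWriter<T> writer;

    SingleAttemptFlush(final BatchWriter<T> writer) {
        this.writer = writer;
    }

    @Override
    public boolean flush(final List<T> batch) {
        return writer.write(batch);
    }
}

// "Explicit retry": the strategy itself re-sends up to maxAttempts times.
final class RetryingFlush<T> implements FlushStrategy<T> {
    private final BatchWriter<T> writer;
    private final int maxAttempts;

    RetryingFlush(final BatchWriter<T> writer, final int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.writer = writer;
        this.maxAttempts = maxAttempts;
    }

    @Override
    public boolean flush(final List<T> batch) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (writer.write(batch)) {
                return true;
            }
            // A real implementation would wait between attempts; omitted to keep the sketch short.
        }
        return false;
    }
}
```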

Signed-off-by: Kondaka <krishkdk@amazon.com>
Created custom tools (GradleTestRunner, JavaCodeAnalyzer, PluginValidator)
and comprehensive workspace summary for data-prepper development.
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>

@dlvenable dlvenable left a comment

Member

Thanks! I have a few more comments on this.

return estimatedSize >= maxEventSize;
}

public abstract Object doFlush(Object failedStatus) throws Exception;

Member

Why does this need to be abstract?


public abstract Object doFlush(Object failedStatus) throws Exception;

public abstract long getCurrentRequestSize();

Member

How should this be implemented? Why can't the implementation track the current size itself?

It seems the only thing we need most sinks to implement is getEstimatedSize(Event).
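
A minimal sketch of that idea: a shared class keeps the running request size, and the sink supplies only a per-item size estimator. The class `SizeTrackingBatch` is hypothetical. The type parameter `T` stands in for Data Prepper's `Event`, and the `ToLongFunction` plays the role of `getEstimatedSize(Event)` so that the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical class, not from the PR. Not thread-safe; the reviewed classes
// import ReentrantLock, so real use would presumably need locking.
final class SizeTrackingBatch<T> {
    private final ToLongFunction<T> sizeEstimator;
    private final long maxRequestSize;
    private final List<T> items = new ArrayList<>();
    private long currentRequestSize;

    SizeTrackingBatch(final ToLongFunction<T> sizeEstimator, final long maxRequestSize) {
        if (maxRequestSize <= 0) {
            throw new IllegalArgumentException("maxRequestSize must be positive");
        }
        this.sizeEstimator = sizeEstimator;
        this.maxRequestSize = maxRequestSize;
    }

    // Adds the item and updates the running size.
    // Returns true once the tracked size reaches the configured maximum.
    boolean add(final T item) {
        items.add(item);
        currentRequestSize += sizeEstimator.applyAsLong(item);
        return currentRequestSize >= maxRequestSize;
    }

    long getCurrentRequestSize() {
        return currentRequestSize;
    }

    // Returns the buffered items and resets the tracked size.
    List<T> drain() {
        final List<T> drained = new ArrayList<>(items);
        items.clear();
        currentRequestSize = 0;
        return drained;
    }
}
```

Under this arrangement `getCurrentRequestSize()` no longer needs to be abstract, because the shared class owns the bookkeeping.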

public abstract class DefaultBufferStrategy implements BufferStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultBufferStrategy.class);
    long lastFlushedTime;

Member

These should all be private.


import java.time.Instant;

public abstract class DefaultBufferStrategy implements BufferStrategy {

Member

This should have unit tests.

@kkondaka kkondaka closed this Oct 16, 2025
@kkondaka kkondaka deleted the sink-strategy branch October 16, 2025 22:05
@kkondaka

Collaborator Author

Abandoned in favor of PR #6183.
