-
Notifications
You must be signed in to change notification settings - Fork 332
Add Common Sink Output Strategies #6100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
53c6ce4
6c93dda
32479b8
99b2e58
695907d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.sink; | ||
|
|
||
| import org.opensearch.dataprepper.model.event.Event; | ||
|
|
||
| public interface BufferStrategy { | ||
| long addToBuffer(final Event event, final long estimatedSize) throws Exception; | ||
| boolean flushBuffer(SinkMetrics sinkMetrics); | ||
| boolean exceedsFlushTimeInterval(); | ||
| boolean willExceedMaxRequestSizeBytes(final Event event, final long estimatedSize); | ||
| boolean isMaxEventsLimitReached(long numEvents); | ||
| boolean exceedsMaxEventSizeThreshold(final long estimatedSize); | ||
| long getEstimatedSize(final Event event) throws Exception; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.sink; | ||
|
|
||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.time.Instant; | ||
|
|
||
| public abstract class DefaultBufferStrategy implements BufferStrategy { | ||
| private static final Logger LOG = LoggerFactory.getLogger(DefaultBufferStrategy.class); | ||
| long lastFlushedTime; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These should all be private. |
||
| final long maxEventSize; | ||
| final long maxRequestSize; | ||
| final long flushIntervalMs; | ||
| final long maxEvents; | ||
|
|
||
/**
 * Stores the batching limits and starts the flush-interval clock at the
 * moment of construction.
 *
 * @param maxEventSize    threshold compared against a single event's estimated size
 * @param maxEvents       threshold compared against the number of buffered events
 * @param maxRequestSize  threshold compared against the size of the pending request
 * @param flushIntervalMs flush interval in milliseconds
 */
public DefaultBufferStrategy(final long maxEventSize, final long maxEvents, final long maxRequestSize, final long flushIntervalMs) {
    this.maxEventSize = maxEventSize;
    this.maxRequestSize = maxRequestSize;
    this.flushIntervalMs = flushIntervalMs;
    this.maxEvents = maxEvents;
    // The buffer counts as "just flushed" when it is created.
    this.lastFlushedTime = Instant.now().toEpochMilli();
}
|
|
||
| @Override | ||
| public boolean flushBuffer(final SinkMetrics sinkMetrics) { | ||
| try { | ||
| Object failedStatus = doFlush(null); | ||
| lastFlushedTime = Instant.now().toEpochMilli(); | ||
| return failedStatus == null; | ||
| } catch (Exception e) { | ||
| LOG.error(NOISY, "Failed to flush.", e); | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isMaxEventsLimitReached(long numEvents) { | ||
| return numEvents >= maxEvents; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean exceedsFlushTimeInterval() { | ||
| long curTime = Instant.now().toEpochMilli(); | ||
| return (curTime - lastFlushedTime >= flushIntervalMs); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean willExceedMaxRequestSizeBytes(final Event event, final long estimatedSize) { | ||
| return (getCurrentRequestSize() + estimatedSize >= maxRequestSize); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean exceedsMaxEventSizeThreshold(final long estimatedSize) { | ||
| return estimatedSize >= maxEventSize; | ||
| } | ||
|
|
||
| public abstract Object doFlush(Object failedStatus) throws Exception; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why does this need to be abstract? |
||
|
|
||
| public abstract long getCurrentRequestSize(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How should this be implemented? Why can't the implementation track the current size itself? It seems the only thing we need most sinks to implement is |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.sink; | ||
|
|
||
| import io.micrometer.core.instrument.Counter; | ||
| import io.micrometer.core.instrument.DistributionSummary; | ||
| import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
|
|
||
| public class DefaultSinkMetrics implements SinkMetrics { | ||
| public static final String DEFAULT_EVENT_NAME = "Event"; | ||
| public static final String SINK_REQUESTS_SUCCEEDED = "SinkRequestsSucceeded"; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Using terms like Most metrics use |
||
| public static final String SINK_EVENTS_SUCCEEDED = "SinkEventsSucceeded"; | ||
| public static final String SINK_EVENTS_FAILED = "SinkEventsFailed"; | ||
| public static final String SINK_EVENTS_DROPPED = "SinkEventsDropped"; | ||
| public static final String SINK_REQUESTS_FAILED = "SinkRequestsFailed"; | ||
| public static final String SINK_REQUEST_LATENCY = "SinkRequestLatency"; | ||
| public static final String SINK_RETRIES = "SinkRetries"; | ||
| public static final String SINK_REQUEST_SIZE = "SinkRequestSize"; | ||
| private final Counter sinkRequestsSucceeded; | ||
| private final Counter sinkEventsSucceeded; | ||
| private final Counter sinkRequestsFailed; | ||
| private final Counter sinkEventsFailed; | ||
| private final Counter sinkEventsDropped; | ||
| private final Counter sinkRetries; | ||
| private final DistributionSummary sinkRequestLatency; | ||
| private final DistributionSummary sinkRequestSize; | ||
|
|
||
| public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String sinkPrefix, final String eventName) { | ||
| this.sinkRequestsSucceeded = pluginMetrics.counter(sinkPrefix + SINK_REQUESTS_SUCCEEDED); | ||
| this.sinkEventsSucceeded = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sSucceeded"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How many metrics might this create? What is
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For the AMP sink it would be "metric", for the XRAY sink it would be "span", for the opensearch sink it could be "document", and so on. |
||
| this.sinkRequestsFailed = pluginMetrics.counter(sinkPrefix + SINK_REQUESTS_FAILED); | ||
| this.sinkEventsFailed = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sFailed"); | ||
| this.sinkEventsDropped = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sDropped"); | ||
| this.sinkRetries = pluginMetrics.counter(sinkPrefix + SINK_RETRIES); | ||
| this.sinkRequestLatency = pluginMetrics.summary(sinkPrefix + SINK_REQUEST_LATENCY); | ||
| this.sinkRequestSize = pluginMetrics.summary(sinkPrefix + SINK_REQUEST_SIZE); | ||
| } | ||
|
|
||
/**
 * Creates the metrics with {@link #DEFAULT_EVENT_NAME} as the event name used
 * in the event-level metric names.
 *
 * @param pluginMetrics registry the counters and summaries are created from
 * @param sinkPrefix    prefix prepended to every metric name
 */
public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String sinkPrefix) {
    this(pluginMetrics, sinkPrefix, DEFAULT_EVENT_NAME);
}

/**
 * Adds {@code value} to the succeeded-events counter.
 *
 * @param value amount to add
 */
@Override
public void incrementEventsSuccessCounter(int value) {
    sinkEventsSucceeded.increment(value);
}

/**
 * Adds {@code value} to the succeeded-requests counter.
 *
 * @param value amount to add
 */
@Override
public void incrementRequestsSuccessCounter(int value) {
    sinkRequestsSucceeded.increment(value);
}

/**
 * Adds {@code value} to the failed-events counter.
 *
 * @param value amount to add
 */
@Override
public void incrementEventsFailedCounter(int value) {
    sinkEventsFailed.increment(value);
}

/**
 * Adds {@code value} to the dropped-events counter.
 *
 * @param value amount to add
 */
@Override
public void incrementEventsDroppedCounter(int value) {
    sinkEventsDropped.increment(value);
}

/**
 * Adds {@code value} to the failed-requests counter.
 *
 * @param value amount to add
 */
@Override
public void incrementRequestsFailedCounter(int value) {
    sinkRequestsFailed.increment(value);
}

/**
 * Adds {@code value} to the retries counter.
 *
 * @param value amount to add
 */
@Override
public void incrementRetries(int value) {
    sinkRetries.increment(value);
}

/**
 * Records one request-latency sample.
 *
 * @param value latency sample; the unit is whatever the caller supplies
 */
@Override
public void recordRequestLatency(double value) {
    sinkRequestLatency.record(value);
}

/**
 * Records one request-size sample.
 *
 * @param value size sample; the unit is whatever the caller supplies
 */
@Override
public void recordRequestSize(double value) {
    sinkRequestSize.record(value);
}
|
|
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.sink; | ||
|
|
||
|
|
||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
| import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.util.Collection; | ||
|
|
||
| public abstract class DefaultSinkOutputStrategy implements SinkOutputStrategy { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is more like a template pattern than a strategy. I'd recommend that we have a solution that favors composition of these different pieces rather than giving the whole thing.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wanted to provide a top-level template so that implementations do not differ a lot. As we observed, the cloudwatch_logs sink implementation was so different that we might want to re-implement it along the lines of this template.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should have unit tests for this. |
||
| private static final Logger LOG = LoggerFactory.getLogger(DefaultSinkOutputStrategy.class); | ||
| private final LockStrategy lockStrategy; | ||
| private final BufferStrategy bufferStrategy; | ||
| private final SinkMetrics sinkMetrics; | ||
|
|
||
| public DefaultSinkOutputStrategy(final LockStrategy lockStrategy, final BufferStrategy bufferStrategy, final SinkMetrics sinkMetrics) { | ||
| this.lockStrategy = lockStrategy; | ||
| this.bufferStrategy = bufferStrategy; | ||
| this.sinkMetrics = sinkMetrics; | ||
| } | ||
|
|
||
| SinkMetrics getSinkMetrics() { | ||
| return sinkMetrics; | ||
| } | ||
|
|
||
| private void flushBuffer() { | ||
| long startTime = System.nanoTime(); | ||
| boolean flushSucceeded = bufferStrategy.flushBuffer(sinkMetrics); | ||
| if (flushSucceeded) { | ||
| sinkMetrics.recordRequestLatency((double)(System.nanoTime() - startTime)); | ||
| } | ||
| } | ||
|
|
||
| public void execute(Collection<Record<Event>> records) { | ||
| lockStrategy.lock(); | ||
| try { | ||
| // If records are empty, check if batch buffer needs to be flushed | ||
| // based on flush interval | ||
| if (records.isEmpty()) { | ||
| if (bufferStrategy.exceedsFlushTimeInterval()) { | ||
| flushBuffer(); | ||
| } | ||
| } else { | ||
| for (Record<Event> record : records) { | ||
| final Event event = record.getData(); | ||
| try { | ||
| long estimatedSize = bufferStrategy.getEstimatedSize(event); | ||
| // Check if individual event exceeds sink's max event size | ||
| if (bufferStrategy.exceedsMaxEventSizeThreshold(estimatedSize)) { | ||
| throw new RuntimeException("Event size exceeds max allowed event size"); | ||
| } | ||
| // Check if adding this event to the batch buffer, would exceed the batch | ||
| // buffer's max bytes threshold, if yes, flush the batch buffer | ||
| if (bufferStrategy.willExceedMaxRequestSizeBytes(event, estimatedSize)) { | ||
| flushBuffer(); | ||
| } | ||
| long numEvents = bufferStrategy.addToBuffer(event, estimatedSize); | ||
| // Check if after adding the event, max events in a batch threshold exceeded | ||
| // If yes, flush the batch buffer | ||
| if (bufferStrategy.isMaxEventsLimitReached(numEvents)) { | ||
| flushBuffer(); | ||
| } | ||
| } catch (Exception ex) { | ||
| LOG.warn(NOISY, "Failed process the event ", ex); | ||
| addEventToDLQList(event, ex); | ||
| } | ||
| } | ||
| } | ||
| } finally { | ||
| flushDLQList(); | ||
| lockStrategy.unlock(); | ||
| } | ||
| } | ||
|
|
||
| public abstract void flushDLQList(); | ||
| public abstract void addEventToDLQList(final Event event, Throwable ex); | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.sink; | ||
|
|
||
/**
 * Minimal locking abstraction: a {@link #lock()} call is expected to be paired
 * with a later {@link #unlock()} call.
 */
public interface LockStrategy {

    /** Acquires the lock. */
    void lock();

    /** Releases the lock. */
    void unlock();
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.sink; | ||
|
|
||
| import java.util.concurrent.locks.ReentrantLock; | ||
|
|
||
| public class ReentrantLockStrategy implements LockStrategy { | ||
| private final ReentrantLock reentrantLock; | ||
|
|
||
| public ReentrantLockStrategy() { | ||
| reentrantLock = new ReentrantLock(); | ||
| } | ||
|
|
||
| @Override | ||
| public void lock() { | ||
| reentrantLock.lock(); | ||
| } | ||
|
|
||
| @Override | ||
| public void unlock() { | ||
| reentrantLock.unlock(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.sink; | ||
|
|
||
| import com.linecorp.armeria.client.retry.Backoff; | ||
| import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.time.Duration; | ||
|
|
||
| public abstract class RetryBufferStrategy extends DefaultBufferStrategy { | ||
| private static final Logger LOG = LoggerFactory.getLogger(RetryBufferStrategy.class); | ||
| private static final long INITIAL_DELAY_MS = 10; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These should be configurable. The sinks will have different values here. |
||
| private static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis(); | ||
| private final int maxRetries; | ||
|
|
||
/**
 * @param maxEventSize    threshold for a single event's estimated size
 * @param maxEvents       threshold for the number of buffered events
 * @param maxRequestSize  threshold for the size of the pending request
 * @param flushIntervalMs flush interval in milliseconds
 * @param maxRetries      maximum number of flush attempts made by one {@link #flushBuffer(SinkMetrics)} call
 */
public RetryBufferStrategy(final long maxEventSize, final long maxEvents, final long maxRequestSize, final long flushIntervalMs, final int maxRetries) {
    super(maxEventSize, maxEvents, maxRequestSize, flushIntervalMs);
    this.maxRetries = maxRetries;
}

/**
 * Flushes the buffer, retrying with exponential backoff for up to
 * {@code maxRetries} attempts.
 *
 * <p>Each attempt calls {@link #doFlush(Object)} with the failure status of the
 * previous attempt ({@code null} on the first one). An attempt succeeds only
 * when {@code doFlush} returns {@code null} without throwing; an attempt that
 * throws is treated as failed and keeps the last known failure status.
 * When the attempts are exhausted, a non-null failure status is passed to
 * {@link #addFailedObjectsToDlqList(Object)}.
 *
 * <p>If the thread is interrupted while waiting between attempts, the
 * interrupt flag is restored and no further attempts are made.
 *
 * @param sinkMetrics receives the number of retries, i.e. attempts beyond the first
 * @return {@code true} if an attempt succeeded, {@code false} otherwise
 */
@Override
public boolean flushBuffer(final SinkMetrics sinkMetrics) {
    final Backoff backoff = Backoff.exponential(INITIAL_DELAY_MS, MAXIMUM_DELAY_MS)
            .withMaxAttempts(maxRetries);
    Object failedStatus = null;
    boolean succeeded = false;
    int attempts = 0;
    while (attempts < maxRetries) {
        attempts++;
        try {
            failedStatus = doFlush(failedStatus);
            succeeded = (failedStatus == null);
            // Same rule as the base class: a flush that returned counts as "flushed" for the interval check.
            lastFlushedTime = System.currentTimeMillis();
        } catch (Exception e) {
            // A throwing attempt is a failed attempt; it must not be mistaken for success.
            LOG.warn(NOISY, "Failed to flush.", e);
        }
        if (succeeded) {
            break;
        }
        final long delayMillis = backoff.nextDelayMillis(attempts);
        if (delayMillis < 0) {
            // The backoff signals that no further attempt should be made.
            break;
        }
        try {
            Thread.sleep(delayMillis);
        } catch (final InterruptedException e) {
            // Preserve the interrupt for the caller and stop retrying.
            Thread.currentThread().interrupt();
            break;
        }
    }
    // Only attempts after the first one are retries.
    sinkMetrics.incrementRetries(Math.max(0, attempts - 1));
    if (succeeded) {
        return true;
    }
    if (failedStatus != null) {
        addFailedObjectsToDlqList(failedStatus);
    }
    return false;
}
|
|
||
| public abstract void addFailedObjectsToDlqList(Object failedStatus); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.sink; | ||
|
|
||
/**
 * Metrics a sink reports while sending requests: counters for events,
 * requests and retries, plus samples for request latency and request size.
 */
public interface SinkMetrics {

    /** @param value amount to add to the succeeded-events count */
    void incrementEventsSuccessCounter(int value);

    /** @param value amount to add to the succeeded-requests count */
    void incrementRequestsSuccessCounter(int value);

    /** @param value amount to add to the failed-events count */
    void incrementEventsFailedCounter(int value);

    /** @param value amount to add to the failed-requests count */
    void incrementRequestsFailedCounter(int value);

    /** @param value amount to add to the dropped-events count */
    void incrementEventsDroppedCounter(int value);

    /** @param value amount to add to the retries count */
    void incrementRetries(int value);

    /** @param value one request-latency sample */
    void recordRequestLatency(double value);

    /** @param value one request-size sample */
    void recordRequestSize(double value);
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.sink; | ||
|
|
||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
|
|
||
| import java.util.Collection; | ||
|
|
||
| public interface SinkOutputStrategy { | ||
| public void execute(Collection<Record<Event>> records); | ||
| public void flushDLQList(); | ||
| public void addEventToDLQList(final Event event, Throwable ex); | ||
| } | ||
|
|
||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should have unit tests.