Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.sink;

import org.opensearch.dataprepper.model.event.Event;

/**
 * Contract for a buffer that accumulates events for a sink and exposes the
 * checks a caller uses to decide when the accumulated batch must be flushed.
 */
public interface BufferStrategy {

    /**
     * Adds an event to the buffer.
     *
     * @param event         the event to buffer
     * @param estimatedSize size estimate for the event, e.g. the value returned by
     *                      {@link #getEstimatedSize(Event)}
     * @return a count that callers hand to {@link #isMaxEventsLimitReached(long)}
     *         (presumably the number of events now buffered; confirm per implementation)
     * @throws Exception if the event cannot be added
     */
    long addToBuffer(final Event event, final long estimatedSize) throws Exception;

    /**
     * Flushes the buffered contents.
     *
     * @param sinkMetrics metrics sink made available to the flush
     * @return {@code true} when the flush succeeded, {@code false} otherwise
     */
    boolean flushBuffer(final SinkMetrics sinkMetrics);

    /**
     * @return {@code true} when enough time has passed since the last flush that
     *         the buffer should be flushed
     */
    boolean exceedsFlushTimeInterval();

    /**
     * Reports whether adding the event would push the pending request past the
     * maximum request size.
     *
     * @param event         the event about to be added
     * @param estimatedSize size estimate for that event
     * @return {@code true} when the buffer should be flushed before adding the event
     */
    boolean willExceedMaxRequestSizeBytes(final Event event, final long estimatedSize);

    /**
     * @param numEvents event count to test against the maximum
     * @return {@code true} when the maximum number of events has been reached
     */
    boolean isMaxEventsLimitReached(final long numEvents);

    /**
     * @param estimatedSize size estimate for a single event
     * @return {@code true} when a single event of that size is over the per-event limit
     */
    boolean exceedsMaxEventSizeThreshold(final long estimatedSize);

    /**
     * Estimates the size of an event.
     *
     * @param event the event to measure
     * @return the estimated size
     * @throws Exception if the size cannot be estimated
     */
    long getEstimatedSize(final Event event) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.sink;

import org.opensearch.dataprepper.model.event.Event;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;

public abstract class DefaultBufferStrategy implements BufferStrategy {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have unit tests.

// Logger used by flushBuffer to report a flush that threw.
private static final Logger LOG = LoggerFactory.getLogger(DefaultBufferStrategy.class);
// Epoch-millisecond timestamp set at construction and again each time flushBuffer's
// doFlush call returns without throwing; read by exceedsFlushTimeInterval.
long lastFlushedTime;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should all be private.

// Per-event threshold: exceedsMaxEventSizeThreshold is true when an estimate is >= this value.
final long maxEventSize;
// Per-request threshold: compared against the current request size plus the next event's estimate.
final long maxRequestSize;
// Elapsed milliseconds since lastFlushedTime at which exceedsFlushTimeInterval becomes true.
final long flushIntervalMs;
// Event-count threshold: isMaxEventsLimitReached is true when the count is >= this value.
final long maxEvents;

/**
 * Stores the configured thresholds and starts the flush-interval clock at the
 * moment of construction.
 *
 * @param maxEventSize    threshold for a single event's estimated size
 * @param maxEvents       threshold for the number of events in a batch
 * @param maxRequestSize  threshold for the size of a request
 * @param flushIntervalMs flush interval in milliseconds
 */
public DefaultBufferStrategy(final long maxEventSize,
                             final long maxEvents,
                             final long maxRequestSize,
                             final long flushIntervalMs) {
    this.maxEventSize = maxEventSize;
    this.maxEvents = maxEvents;
    this.maxRequestSize = maxRequestSize;
    this.flushIntervalMs = flushIntervalMs;
    // The interval is measured from construction until the first flush happens.
    this.lastFlushedTime = Instant.now().toEpochMilli();
}

/**
 * Performs a single flush attempt via {@link #doFlush(Object)}.
 *
 * @param sinkMetrics not used by this implementation
 * @return {@code true} only when {@code doFlush} returned {@code null};
 *         {@code false} when it returned a status object or threw
 */
@Override
public boolean flushBuffer(final SinkMetrics sinkMetrics) {
    boolean flushed = false;
    try {
        final Object status = doFlush(null);
        // The timestamp moves whenever doFlush returns normally, even with a non-null status.
        lastFlushedTime = Instant.now().toEpochMilli();
        flushed = (status == null);
    } catch (Exception flushError) {
        LOG.error(NOISY, "Failed to flush.", flushError);
    }
    return flushed;
}

/**
 * @param numEvents event count to test
 * @return {@code true} when {@code numEvents} is at or above the configured maximum
 */
@Override
public boolean isMaxEventsLimitReached(long numEvents) {
    return maxEvents <= numEvents;
}

/**
 * @return {@code true} when the milliseconds elapsed since {@code lastFlushedTime}
 *         are at or above the configured flush interval
 */
@Override
public boolean exceedsFlushTimeInterval() {
    final long elapsedMs = Instant.now().toEpochMilli() - lastFlushedTime;
    return elapsedMs >= flushIntervalMs;
}

/**
 * @param event         the event about to be added (not inspected here)
 * @param estimatedSize size estimate for that event
 * @return {@code true} when the current request size plus {@code estimatedSize}
 *         is at or above the configured maximum request size
 */
@Override
public boolean willExceedMaxRequestSizeBytes(final Event event, final long estimatedSize) {
    final long projectedSize = getCurrentRequestSize() + estimatedSize;
    return projectedSize >= maxRequestSize;
}

/**
 * @param estimatedSize size estimate for a single event
 * @return {@code true} when {@code estimatedSize} is at or above the configured
 *         maximum event size
 */
@Override
public boolean exceedsMaxEventSizeThreshold(final long estimatedSize) {
    return maxEventSize <= estimatedSize;
}

/**
 * Performs the actual flush of the buffered contents.
 *
 * @param failedStatus {@code null} when called from {@link #flushBuffer(SinkMetrics)} in
 *                     this class; other callers may pass the status returned by an
 *                     earlier attempt
 * @return {@code null} when the flush succeeded; any non-null value is treated by
 *         {@code flushBuffer} as a failure status
 * @throws Exception if the flush fails; {@code flushBuffer} logs it and reports failure
 */
public abstract Object doFlush(Object failedStatus) throws Exception;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be abstract?


/**
 * Returns the size of the request accumulated so far. The value is added to the
 * next event's estimated size in {@code willExceedMaxRequestSizeBytes} and compared
 * with the maximum request size.
 * NOTE(review): units are presumably bytes, matching the caller's name; confirm.
 *
 * @return the current request size
 */
public abstract long getCurrentRequestSize();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should this be implemented? Why can't the implementation track the current size itself?

It seems the only thing we need most sinks to implement is getEstimatedSize(Event).

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.sink;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;

public class DefaultSinkMetrics implements SinkMetrics {
// Event noun used in the event-level metric names when no eventName is supplied.
public static final String DEFAULT_EVENT_NAME = "Event";
// Name (appended to the sink prefix) of the counter for successful requests.
public static final String SINK_REQUESTS_SUCCEEDED = "SinkRequestsSucceeded";

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using terms like Sink seems to add bloat to the names and also doesn't match our existing conventions.

Most metrics use lowerCamelCase.

// The three SINK_EVENTS_* names are not referenced by the constructor in this class,
// which builds the event-level names from eventName; they equal the names produced
// when eventName is DEFAULT_EVENT_NAME.
public static final String SINK_EVENTS_SUCCEEDED = "SinkEventsSucceeded";
public static final String SINK_EVENTS_FAILED = "SinkEventsFailed";
public static final String SINK_EVENTS_DROPPED = "SinkEventsDropped";
// Names (appended to the sink prefix) of the request-level counters and summaries.
public static final String SINK_REQUESTS_FAILED = "SinkRequestsFailed";
public static final String SINK_REQUEST_LATENCY = "SinkRequestLatency";
public static final String SINK_RETRIES = "SinkRetries";
public static final String SINK_REQUEST_SIZE = "SinkRequestSize";
// Counters obtained from PluginMetrics in the constructor.
private final Counter sinkRequestsSucceeded;
private final Counter sinkEventsSucceeded;
private final Counter sinkRequestsFailed;
private final Counter sinkEventsFailed;
private final Counter sinkEventsDropped;
private final Counter sinkRetries;
// Distribution summaries obtained from PluginMetrics in the constructor.
private final DistributionSummary sinkRequestLatency;
private final DistributionSummary sinkRequestSize;

/**
 * Registers every counter and summary with the given {@code PluginMetrics}.
 * Request-level metrics are named {@code sinkPrefix} + one of the {@code SINK_*}
 * constants. Event-level counters are named
 * {@code sinkPrefix + "Sink" + eventName + "sSucceeded" / "sFailed" / "sDropped"},
 * so an {@code eventName} of {@code "Event"} yields {@code "SinkEventsSucceeded"} etc.
 *
 * @param pluginMetrics factory used to create the counters and summaries
 * @param sinkPrefix    prefix prepended to every metric name
 * @param eventName     singular noun inserted into the event-level counter names
 */
public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String sinkPrefix, final String eventName) {
this.sinkRequestsSucceeded = pluginMetrics.counter(sinkPrefix + SINK_REQUESTS_SUCCEEDED);
this.sinkEventsSucceeded = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sSucceeded");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many metrics might this create? What is eventName?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For AMP sink, it would be "metric", for XRAY sink it would be "span" for opensearch sink it could be "document" and so on

this.sinkRequestsFailed = pluginMetrics.counter(sinkPrefix + SINK_REQUESTS_FAILED);
this.sinkEventsFailed = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sFailed");
this.sinkEventsDropped = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sDropped");
this.sinkRetries = pluginMetrics.counter(sinkPrefix + SINK_RETRIES);
this.sinkRequestLatency = pluginMetrics.summary(sinkPrefix + SINK_REQUEST_LATENCY);
this.sinkRequestSize = pluginMetrics.summary(sinkPrefix + SINK_REQUEST_SIZE);
}

/**
 * Convenience constructor that uses {@link #DEFAULT_EVENT_NAME} as the event noun
 * in the event-level metric names.
 *
 * @param pluginMetrics factory used to create the counters and summaries
 * @param sinkPrefix    prefix prepended to every metric name
 */
public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String sinkPrefix) {
this(pluginMetrics, sinkPrefix, DEFAULT_EVENT_NAME);
}

/**
 * Adds {@code value} to the succeeded-events counter.
 *
 * @param value amount to add
 */
public void incrementEventsSuccessCounter(final int value) {
    this.sinkEventsSucceeded.increment(value);
}

/**
 * Adds {@code value} to the succeeded-requests counter.
 *
 * @param value amount to add
 */
public void incrementRequestsSuccessCounter(final int value) {
    this.sinkRequestsSucceeded.increment(value);
}

/**
 * Adds {@code value} to the failed-events counter.
 *
 * @param value amount to add
 */
public void incrementEventsFailedCounter(final int value) {
    this.sinkEventsFailed.increment(value);
}

/**
 * Adds {@code value} to the dropped-events counter.
 *
 * @param value amount to add
 */
public void incrementEventsDroppedCounter(final int value) {
    this.sinkEventsDropped.increment(value);
}

/**
 * Adds {@code value} to the failed-requests counter.
 *
 * @param value amount to add
 */
public void incrementRequestsFailedCounter(final int value) {
    this.sinkRequestsFailed.increment(value);
}

/**
 * Adds {@code value} to the retries counter.
 *
 * @param value amount to add
 */
public void incrementRetries(final int value) {
    this.sinkRetries.increment(value);
}

/**
 * Records one sample in the request-latency summary.
 *
 * @param value the latency sample to record
 */
public void recordRequestLatency(final double value) {
    this.sinkRequestLatency.record(value);
}

/**
 * Records one sample in the request-size summary.
 *
 * @param value the size sample to record
 */
public void recordRequestSize(final double value) {
    this.sinkRequestSize.record(value);
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.sink;


import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;

public abstract class DefaultSinkOutputStrategy implements SinkOutputStrategy {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more like a template pattern than a strategy.

I'd recommend that we have a solution that favors composition of these different pieces rather than giving the whole thing.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to provide a top-level template so that implementations do not differ much. As we observed, the cloudwatch_logs sink implementation was so different that we might want to re-implement it along the lines of this template.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have unit tests for this.

// Logger used by execute to report events that could not be processed.
private static final Logger LOG = LoggerFactory.getLogger(DefaultSinkOutputStrategy.class);
// Acquired at the start of execute and released when it finishes.
private final LockStrategy lockStrategy;
// Buffer that receives events and answers the size/count/time threshold checks.
private final BufferStrategy bufferStrategy;
// Metrics passed to the buffer on flush and used to record request latency.
private final SinkMetrics sinkMetrics;

/**
 * Wires together the collaborators used by {@code execute}.
 *
 * @param lockStrategy   lock held for the duration of each {@code execute} call
 * @param bufferStrategy buffer that accumulates events and performs flushes
 * @param sinkMetrics    metrics recorder
 */
public DefaultSinkOutputStrategy(final LockStrategy lockStrategy,
                                 final BufferStrategy bufferStrategy,
                                 final SinkMetrics sinkMetrics) {
    this.sinkMetrics = sinkMetrics;
    this.bufferStrategy = bufferStrategy;
    this.lockStrategy = lockStrategy;
}

/**
 * @return the metrics recorder supplied at construction
 */
SinkMetrics getSinkMetrics() {
    return this.sinkMetrics;
}

/**
 * Flushes the buffer and, only when the flush reports success, records the
 * elapsed time in nanoseconds as a request-latency sample.
 */
private void flushBuffer() {
    final long startNanos = System.nanoTime();
    if (!bufferStrategy.flushBuffer(sinkMetrics)) {
        // Failed flushes contribute no latency sample.
        return;
    }
    final long elapsedNanos = System.nanoTime() - startNanos;
    sinkMetrics.recordRequestLatency((double) elapsedNanos);
}

/**
 * Buffers the given records and flushes the buffer whenever a threshold is hit.
 *
 * <p>The whole call runs under {@code lockStrategy}. An empty collection acts as a
 * timer tick: the buffer is flushed only if the flush interval has elapsed. For a
 * non-empty collection each event is sized, rejected if it is over the per-event
 * limit, preceded by a flush if it would overflow the request, added to the buffer,
 * and followed by a flush if the event-count limit is reached. Any event whose
 * processing throws is logged and handed to {@link #addEventToDLQList(Event, Throwable)}.
 * The DLQ list is flushed once at the end of every call.
 *
 * <p>The lock is released even when {@link #flushDLQList()} throws; otherwise a
 * failing DLQ flush would leave the lock held and block every later call.
 *
 * @param records the records to output; may be empty
 */
public void execute(Collection<Record<Event>> records) {
    lockStrategy.lock();
    try {
        // If records are empty, check if batch buffer needs to be flushed
        // based on flush interval
        if (records.isEmpty()) {
            if (bufferStrategy.exceedsFlushTimeInterval()) {
                flushBuffer();
            }
        } else {
            for (Record<Event> record : records) {
                final Event event = record.getData();
                try {
                    long estimatedSize = bufferStrategy.getEstimatedSize(event);
                    // Check if individual event exceeds sink's max event size
                    if (bufferStrategy.exceedsMaxEventSizeThreshold(estimatedSize)) {
                        throw new RuntimeException("Event size exceeds max allowed event size");
                    }
                    // Check if adding this event to the batch buffer, would exceed the batch
                    // buffer's max bytes threshold, if yes, flush the batch buffer
                    if (bufferStrategy.willExceedMaxRequestSizeBytes(event, estimatedSize)) {
                        flushBuffer();
                    }
                    long numEvents = bufferStrategy.addToBuffer(event, estimatedSize);
                    // Check if after adding the event, max events in a batch threshold exceeded
                    // If yes, flush the batch buffer
                    if (bufferStrategy.isMaxEventsLimitReached(numEvents)) {
                        flushBuffer();
                    }
                } catch (Exception ex) {
                    LOG.warn(NOISY, "Failed process the event ", ex);
                    addEventToDLQList(event, ex);
                }
            }
        }
    } finally {
        try {
            flushDLQList();
        } finally {
            // Must run even if the DLQ flush throws, or the lock is leaked.
            lockStrategy.unlock();
        }
    }
}

/**
 * Flushes the events collected through {@link #addEventToDLQList(Event, Throwable)}.
 * Called once at the end of every {@code execute} call, from its {@code finally} block.
 */
public abstract void flushDLQList();
/**
 * Records an event that could not be processed. {@code execute} calls this for every
 * event whose sizing, buffering or flushing threw an exception.
 *
 * @param event the event that failed
 * @param ex    the exception raised while processing the event
 */
public abstract void addEventToDLQList(final Event event, Throwable ex);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.sink;

/**
 * Abstraction over the locking used to guard a sink's output path.
 */
public interface LockStrategy {
/** Acquires the lock. */
void lock();
/** Releases the lock. */
void unlock();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.sink;

import java.util.concurrent.locks.ReentrantLock;

/**
 * {@link LockStrategy} that delegates to a single {@link ReentrantLock} owned by
 * this instance.
 */
public class ReentrantLockStrategy implements LockStrategy {
    private final ReentrantLock delegate = new ReentrantLock();

    /** Creates a strategy with its own newly created lock. */
    public ReentrantLockStrategy() {
    }

    /** Acquires the underlying lock. */
    @Override
    public void lock() {
        delegate.lock();
    }

    /** Releases the underlying lock. */
    @Override
    public void unlock() {
        delegate.unlock();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.sink;

import com.linecorp.armeria.client.retry.Backoff;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public abstract class RetryBufferStrategy extends DefaultBufferStrategy {
// Logger used by flushBuffer to report flush attempts that threw.
private static final Logger LOG = LoggerFactory.getLogger(RetryBufferStrategy.class);
// Initial delay, in milliseconds, given to the exponential backoff in flushBuffer.
private static final long INITIAL_DELAY_MS = 10;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be configurable. The sinks will have different values here.

// Maximum delay (10 minutes, in milliseconds) given to the exponential backoff in flushBuffer.
private static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();
// Bounds the attempt loop in flushBuffer and is also the backoff's max-attempts value.
private final int maxRetries;

/**
 * Passes the thresholds to {@link DefaultBufferStrategy} and stores the retry bound.
 *
 * @param maxEventSize    threshold for a single event's estimated size
 * @param maxEvents       threshold for the number of events in a batch
 * @param maxRequestSize  threshold for the size of a request
 * @param flushIntervalMs flush interval in milliseconds
 * @param maxRetries      upper bound on flush attempts made by {@code flushBuffer}
 */
public RetryBufferStrategy(final long maxEventSize,
                           final long maxEvents,
                           final long maxRequestSize,
                           final long flushIntervalMs,
                           final int maxRetries) {
    super(maxEventSize, maxEvents, maxRequestSize, flushIntervalMs);
    this.maxRetries = maxRetries;
}

/**
 * Flushes the buffer, retrying with exponential backoff until an attempt succeeds,
 * the backoff is exhausted, {@code maxRetries} attempts have been made, or the
 * thread is interrupted.
 *
 * <p>An attempt succeeds only when {@link #doFlush(Object)} returns {@code null}
 * without throwing. The status returned by a failed attempt is passed to the next
 * attempt. An attempt that throws is logged and counted as failed, so an exception
 * is never reported as a successful flush.
 *
 * <p>As in {@link DefaultBufferStrategy#flushBuffer(SinkMetrics)}, the last-flushed
 * timestamp is refreshed whenever {@code doFlush} returns normally, so the
 * flush-interval check is measured from the latest flush rather than from construction.
 *
 * @param sinkMetrics receives the attempt counter via {@code incrementRetries}
 * @return {@code true} when an attempt succeeded; {@code false} otherwise. On failure,
 *         a non-null failure status is handed to {@link #addFailedObjectsToDlqList(Object)}.
 */
@Override
public boolean flushBuffer(final SinkMetrics sinkMetrics) {
    int retryCount = 1;
    Object failedStatus = null;
    boolean flushed = false;
    final Backoff backoff = Backoff.exponential(INITIAL_DELAY_MS, MAXIMUM_DELAY_MS)
            .withMaxAttempts(maxRetries);
    while (retryCount <= maxRetries) {
        try {
            failedStatus = doFlush(failedStatus);
            lastFlushedTime = System.currentTimeMillis();
            flushed = (failedStatus == null);
        } catch (Exception e) {
            // failedStatus keeps its previous value; a throw must not look like success.
            flushed = false;
            LOG.warn(NOISY, "Failed to flush.", e);
        }
        if (flushed) {
            break;
        }
        final long delayMillis = backoff.nextDelayMillis(retryCount);
        if (delayMillis < 0) {
            break;
        }
        try {
            Thread.sleep(delayMillis);
        } catch (final InterruptedException e) {
            // Restore the interrupt flag for callers and stop retrying.
            Thread.currentThread().interrupt();
            break;
        }
        retryCount++;
    }
    sinkMetrics.incrementRetries(retryCount);
    if (flushed) {
        return true;
    }
    if (failedStatus != null) {
        addFailedObjectsToDlqList(failedStatus);
    }
    return false;
}

/**
 * Receives the failure status left over when {@code flushBuffer} gives up retrying.
 *
 * @param failedStatus the non-null status returned by the last {@code doFlush} call
 *                     that reported a failure
 */
public abstract void addFailedObjectsToDlqList(Object failedStatus);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.sink;

/**
 * Recorder for the counters and distribution samples a sink reports.
 */
public interface SinkMetrics {

    /** @param value amount to add to the succeeded-events count */
    void incrementEventsSuccessCounter(int value);

    /** @param value amount to add to the succeeded-requests count */
    void incrementRequestsSuccessCounter(int value);

    /** @param value amount to add to the failed-events count */
    void incrementEventsFailedCounter(int value);

    /** @param value amount to add to the failed-requests count */
    void incrementRequestsFailedCounter(int value);

    /** @param value amount to add to the dropped-events count */
    void incrementEventsDroppedCounter(int value);

    /** @param value amount to add to the retries count */
    void incrementRetries(int value);

    /** @param value request-latency sample to record */
    void recordRequestLatency(double value);

    /** @param value request-size sample to record */
    void recordRequestSize(double value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.sink;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Collection;

/**
 * Contract for writing a batch of records to a sink, with hooks for events that
 * could not be written.
 */
public interface SinkOutputStrategy {

    /**
     * Outputs the given records.
     *
     * @param records the records to output
     */
    void execute(Collection<Record<Event>> records);

    /** Flushes the events collected via {@link #addEventToDLQList(Event, Throwable)}. */
    void flushDLQList();

    /**
     * Records an event that could not be output.
     *
     * @param event the event that failed
     * @param ex    the failure associated with the event
     */
    void addEventToDLQList(final Event event, Throwable ex);
}


Loading