Skip to content

Commit 5dd9eb1

Browse files
authored
Add common sink framework for DataPrepper sinks (#6183)
* Add common sink framework for DataPrepper sinks Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> * Removed unnecessary file Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent df71eb9 commit 5dd9eb1

19 files changed

Lines changed: 1002 additions & 0 deletions
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
import java.time.Instant;
9+
10+
public class DefaultSinkBuffer implements SinkBuffer {
11+
private final SinkBufferWriter sinkBufferWriter;
12+
private final long maxEvents;
13+
private final long maxRequestSize;
14+
private final long flushIntervalMs;
15+
private long lastFlushedTimeMs;
16+
private long numEvents;
17+
private long currentRequestSize;
18+
19+
public DefaultSinkBuffer(final long maxEvents,
20+
final long maxRequestSize,
21+
final long flushIntervalMs, final SinkBufferWriter sinkBufferWriter) {
22+
this.maxEvents = maxEvents;
23+
this.maxRequestSize = maxRequestSize;
24+
this.flushIntervalMs = flushIntervalMs;
25+
this.sinkBufferWriter = sinkBufferWriter;
26+
lastFlushedTimeMs = Instant.now().toEpochMilli();
27+
numEvents = 0L;
28+
currentRequestSize = 0L;
29+
}
30+
31+
@Override
32+
public boolean addToBuffer(final SinkBufferEntry sinkBufferEntry) throws Exception {
33+
if (sinkBufferWriter.writeToBuffer(sinkBufferEntry)) {
34+
currentRequestSize += sinkBufferEntry.getEstimatedSize();
35+
numEvents++;
36+
return true;
37+
}
38+
return false;
39+
}
40+
41+
@Override
42+
public boolean isMaxEventsLimitReached() {
43+
return numEvents >= maxEvents;
44+
}
45+
46+
@Override
47+
public boolean exceedsFlushTimeInterval() {
48+
long curTime = Instant.now().toEpochMilli();
49+
return (curTime - lastFlushedTimeMs >= flushIntervalMs);
50+
}
51+
52+
@Override
53+
public boolean willExceedMaxRequestSizeBytes(final SinkBufferEntry sinkBufferEntry) {
54+
return (currentRequestSize + sinkBufferEntry.getEstimatedSize() >= maxRequestSize);
55+
}
56+
57+
@Override
58+
public SinkFlushableBuffer getFlushableBuffer(final SinkFlushContext sinkFlushContext) {
59+
return sinkBufferWriter.getBuffer(sinkFlushContext);
60+
}
61+
62+
}
63+
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
import org.opensearch.dataprepper.model.event.Event;
9+
10+
import java.util.List;
11+
12+
public class DefaultSinkFlushResult implements SinkFlushResult {
13+
private final List<Event> events;
14+
private final Throwable exception;
15+
16+
public DefaultSinkFlushResult(final List<Event> events, final Throwable exception) {
17+
this.events = events;
18+
this.exception = exception;
19+
}
20+
21+
public List<Event> getEvents() {
22+
return events;
23+
}
24+
25+
public Throwable getException() {
26+
return exception;
27+
}
28+
}
29+
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
import io.micrometer.core.instrument.Counter;
9+
import io.micrometer.core.instrument.DistributionSummary;
10+
import org.opensearch.dataprepper.metrics.PluginMetrics;
11+
12+
public class DefaultSinkMetrics implements SinkMetrics {
13+
static final String DEFAULT_EVENT_NAME = "Event";
14+
static final String SINK_REQUESTS_SUCCEEDED = "sinkRequestsSucceeded";
15+
static final String SINK_REQUESTS_FAILED = "sinkRequestsFailed";
16+
static final String SINK_REQUEST_LATENCY = "sinkRequestLatency";
17+
static final String SINK_RETRIES = "sinkRetries";
18+
static final String SINK_REQUEST_SIZE = "sinkRequestSize";
19+
private final Counter sinkRequestsSucceeded;
20+
private final Counter sinkRequestsFailed;
21+
private final Counter sinkEventsSucceeded;
22+
private final Counter sinkEventsFailed;
23+
private final Counter sinkEventsDropped;
24+
private final Counter sinkRetries;
25+
private final DistributionSummary sinkRequestLatency;
26+
private final DistributionSummary sinkRequestSize;
27+
private final DistributionSummary sinkEventSize;
28+
29+
public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String eventName) {
30+
this.sinkRequestsSucceeded = pluginMetrics.counter(SINK_REQUESTS_SUCCEEDED);
31+
this.sinkEventsSucceeded = pluginMetrics.counter("sink"+eventName+"sSucceeded");
32+
this.sinkRequestsFailed = pluginMetrics.counter(SINK_REQUESTS_FAILED);
33+
this.sinkEventsFailed = pluginMetrics.counter("sink"+eventName+"sFailed");
34+
this.sinkEventsDropped = pluginMetrics.counter("sink"+eventName+"sDropped");
35+
this.sinkRetries = pluginMetrics.counter(SINK_RETRIES);
36+
this.sinkRequestLatency = pluginMetrics.summary(SINK_REQUEST_LATENCY);
37+
this.sinkRequestSize = pluginMetrics.summary(SINK_REQUEST_SIZE);
38+
this.sinkEventSize = pluginMetrics.summary("sink"+eventName+"Size");
39+
}
40+
41+
public DefaultSinkMetrics(final PluginMetrics pluginMetrics) {
42+
this(pluginMetrics, DEFAULT_EVENT_NAME);
43+
}
44+
45+
public void incrementEventsSuccessCounter(int value){
46+
sinkEventsSucceeded.increment(value);
47+
}
48+
49+
public void incrementRequestsSuccessCounter(int value){
50+
sinkRequestsSucceeded.increment(value);
51+
}
52+
53+
public void incrementEventsFailedCounter(int value) {
54+
sinkEventsFailed.increment(value);
55+
}
56+
57+
public void incrementEventsDroppedCounter(int value) {
58+
sinkEventsDropped.increment(value);
59+
}
60+
61+
public void incrementRequestsFailedCounter(int value) {
62+
sinkRequestsFailed.increment(value);
63+
}
64+
65+
public void incrementRetries(int value) {
66+
sinkRetries.increment(value);
67+
}
68+
69+
public void recordRequestLatency(double value) {
70+
sinkRequestLatency.record(value);
71+
}
72+
73+
public void recordRequestSize(double value){
74+
sinkRequestSize.record(value);
75+
}
76+
77+
public void recordEventSize(double value){
78+
sinkEventSize.record(value);
79+
}
80+
}
81+
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
import org.opensearch.dataprepper.model.event.Event;
9+
import org.opensearch.dataprepper.model.record.Record;
10+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
11+
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import java.util.Collection;
16+
import java.util.List;
17+
18+
public abstract class DefaultSinkOutputStrategy implements SinkBufferEntryProvider, SinkDlqHandler {
19+
private static final Logger LOG = LoggerFactory.getLogger(DefaultSinkOutputStrategy.class);
20+
private final LockStrategy lockStrategy;
21+
private final SinkBuffer sinkBuffer;
22+
private final SinkMetrics sinkMetrics;
23+
private final SinkFlushContext sinkFlushContext;
24+
25+
public DefaultSinkOutputStrategy(final LockStrategy lockStrategy, final SinkBuffer sinkBuffer, final SinkFlushContext sinkFlushContext, final SinkMetrics sinkMetrics) {
26+
this.lockStrategy = lockStrategy;
27+
this.sinkBuffer = sinkBuffer;
28+
this.sinkMetrics = sinkMetrics;
29+
this.sinkFlushContext = sinkFlushContext;
30+
}
31+
32+
public void flushBuffer() {
33+
long startTime = System.nanoTime();
34+
// getBuffer() should return the buffer contents
35+
SinkFlushableBuffer flushableBuffer = sinkBuffer.getFlushableBuffer(sinkFlushContext);
36+
try {
37+
SinkFlushResult flushResult = flushableBuffer.flush();
38+
if (flushResult == null) { // success
39+
sinkMetrics.recordRequestLatency((double)(System.nanoTime() - startTime));
40+
List<Event> events = flushableBuffer.getEvents();
41+
for (final Event event: events) {
42+
event.getEventHandle().release(true);
43+
}
44+
} else {
45+
// flush Result should contain the events that are
46+
// failed to be delivered, so that these events can be forwarded to DLQ
47+
addFailedEventsToDlq(flushResult.getEvents(), flushResult.getException());
48+
}
49+
} catch (Exception e) {
50+
// Add list of events to DLQ
51+
addFailedEventsToDlq(flushableBuffer.getEvents(), e);
52+
}
53+
}
54+
55+
public void execute(Collection<Record<Event>> records) {
56+
lockStrategy.lock();
57+
try {
58+
if (sinkBuffer.exceedsFlushTimeInterval()) {
59+
flushBuffer();
60+
}
61+
if (records == null || records.isEmpty()) {
62+
return;
63+
}
64+
for (Record<Event> record : records) {
65+
final Event event = record.getData();
66+
try {
67+
// getSinkBufferEntry() is a sink method that may use codec to get
68+
// the estimated size of the event
69+
SinkBufferEntry bufferEntry = getSinkBufferEntry(event);
70+
71+
// Check if individual event exceeds sink's max event size
72+
if (bufferEntry.exceedsMaxEventSizeThreshold()) {
73+
throw new RuntimeException("Event size exceeds max allowed event size");
74+
}
75+
76+
// Check if adding this event to the batch buffer, would exceed the batch
77+
// buffer's max bytes threshold, if yes, flush the batch buffer before adding
78+
// new buffer entry
79+
if (sinkBuffer.willExceedMaxRequestSizeBytes(bufferEntry)) {
80+
flushBuffer();
81+
}
82+
83+
if (!sinkBuffer.addToBuffer(bufferEntry)) {
84+
throw new RuntimeException("Failed to add event to sink buffer");
85+
}
86+
87+
// Check if after adding the event, max events in a batch threshold exceeded
88+
// If yes, flush the batch buffer
89+
if (sinkBuffer.isMaxEventsLimitReached()) {
90+
flushBuffer();
91+
}
92+
} catch (Exception ex) {
93+
LOG.warn(NOISY, "Failed process the event ", ex);
94+
addFailedEventsToDlq(List.of(event), ex);
95+
}
96+
}
97+
} finally {
98+
flushDlqList();
99+
lockStrategy.unlock();
100+
}
101+
}
102+
103+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
/**
 * Abstraction over a lock so that callers can bracket a critical section
 * without depending on a concrete locking mechanism.
 */
public interface LockStrategy {

    /** Acquires the lock. */
    void lock();

    /** Releases the lock. */
    void unlock();
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
import java.util.concurrent.locks.ReentrantLock;
9+
10+
public class ReentrantLockStrategy implements LockStrategy {
11+
private final ReentrantLock reentrantLock;
12+
13+
public ReentrantLockStrategy() {
14+
reentrantLock = new ReentrantLock();
15+
}
16+
17+
@Override
18+
public void lock() {
19+
reentrantLock.lock();
20+
}
21+
22+
@Override
23+
public void unlock() {
24+
reentrantLock.unlock();
25+
}
26+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
public interface SinkBuffer {
9+
SinkFlushableBuffer getFlushableBuffer(final SinkFlushContext sinkFlushContext);
10+
boolean addToBuffer(final SinkBufferEntry bufferEntry) throws Exception;
11+
boolean exceedsFlushTimeInterval();
12+
boolean willExceedMaxRequestSizeBytes(final SinkBufferEntry bufferEntry);
13+
boolean isMaxEventsLimitReached();
14+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
import org.opensearch.dataprepper.model.event.Event;
9+
10+
public interface SinkBufferEntry {
11+
public long getEstimatedSize();
12+
public Event getEvent();
13+
public boolean exceedsMaxEventSizeThreshold();
14+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
import org.opensearch.dataprepper.model.event.Event;
9+
10+
public interface SinkBufferEntryProvider {
11+
SinkBufferEntry getSinkBufferEntry(final Event event) throws Exception;
12+
}
13+
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.common.sink;
7+
8+
public interface SinkBufferWriter {
9+
public boolean writeToBuffer(SinkBufferEntry sinkBufferEntry);
10+
public SinkFlushableBuffer getBuffer(final SinkFlushContext sinkFlushContext);
11+
}

0 commit comments

Comments
 (0)