Skip to content

Commit 2faa175

Browse files
authored
Add support for invoking acknowledgmentSet callback on expiry (#6596)
Add support for invoking acknowledgmentSet callback on expiry. Fixed expiry logic in DefaultAcknowledgementSet. Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 3e90b97 commit 2faa175

20 files changed

Lines changed: 384 additions & 21 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,16 @@ public interface AcknowledgementSetManager {
3030
* @since 2.2
3131
*/
3232
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout);
33+
34+
/**
35+
* Creates an acknowledgement set
36+
*
37+
* @param callback callback function to be invoked
38+
* @param timeout expiry timeout
39+
* @param invokeCallbackOnExpiry flag indicating if the callback function should be invoked on expiry
40+
*
41+
* @return AcknowledgementSet returns a new acknowledgement set
42+
* @since 2.15
43+
*/
44+
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout, final boolean invokeCallbackOnExpiry);
3345
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,30 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet {
4242
private final DefaultAcknowledgementSetMetrics metrics;
4343
private ScheduledFuture<?> progressCheckFuture;
4444
private boolean completed;
45+
private boolean expired;
4546
private AtomicInteger totalEventsAdded;
47+
private final boolean invokeCallbackOnExpiry;
4648

4749
public DefaultAcknowledgementSet(final ScheduledExecutorService scheduledExecutor,
4850
final Consumer<Boolean> callback,
4951
final Duration expiryTime,
5052
final DefaultAcknowledgementSetMetrics metrics) {
53+
this(scheduledExecutor, callback, expiryTime, metrics, false);
54+
}
55+
public DefaultAcknowledgementSet(final ScheduledExecutorService scheduledExecutor,
56+
final Consumer<Boolean> callback,
57+
final Duration expiryTime,
58+
final DefaultAcknowledgementSetMetrics metrics,
59+
final boolean invokeCallbackOnExpiry) {
5160
this.callback = callback;
5261
this.result = true;
5362
this.totalEventsAdded = new AtomicInteger(0);
5463
this.scheduledExecutor = scheduledExecutor;
5564
this.expiryTime = Instant.now().plusMillis(expiryTime.toMillis());
5665
this.callbackFuture = null;
5766
this.metrics = metrics;
67+
this.expired = false;
68+
this.invokeCallbackOnExpiry = invokeCallbackOnExpiry;
5869
this.completed = false;
5970
this.progressCheckCallback = null;
6071
pendingAcknowledgments = new HashMap<>();
@@ -127,19 +138,26 @@ public boolean isDone() {
127138
lock.lock();
128139
try {
129140
if (callbackFuture != null && callbackFuture.isDone()) {
130-
metrics.increment(DefaultAcknowledgementSetMetrics.COMPLETED_METRIC_NAME);
141+
if (!expired) {
142+
metrics.increment(DefaultAcknowledgementSetMetrics.COMPLETED_METRIC_NAME);
143+
}
131144
return true;
132145
}
133-
if (Instant.now().isAfter(expiryTime)) {
146+
if (!expired && Instant.now().isAfter(expiryTime)) {
147+
expired = true;
134148
if (progressCheckFuture != null) {
135149
progressCheckFuture.cancel(false);
136150
}
137-
if (callbackFuture != null) {
151+
metrics.increment(DefaultAcknowledgementSetMetrics.EXPIRED_METRIC_NAME);
152+
if (invokeCallbackOnExpiry) {
153+
result = false;
154+
callbackFuture = scheduledExecutor.submit(() -> callback.accept(result));
155+
return false;
156+
} else if (callbackFuture != null) {
138157
callbackFuture.cancel(true);
139158
callbackFuture = null;
140159
LOG.warn("AcknowledgementSet expired");
141160
}
142-
metrics.increment(DefaultAcknowledgementSetMetrics.EXPIRED_METRIC_NAME);
143161
return true;
144162
}
145163
} finally {

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ public AcknowledgementSet create(final Consumer<Boolean> callback, final Duratio
5151
return acknowledgementSet;
5252
}
5353

54+
public AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout, final boolean invokeCallbackOnExpiry) {
55+
AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutor, callback, timeout, metrics, invokeCallbackOnExpiry);
56+
acknowledgementSetMonitor.add(acknowledgementSet);
57+
metrics.increment(DefaultAcknowledgementSetMetrics.CREATED_METRIC_NAME);
58+
return acknowledgementSet;
59+
}
60+
5461
public void shutdown() {
5562
acknowledgementSetMonitorThread.stop();
5663
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/InactiveAcknowledgementSetManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,7 @@ public AcknowledgementSet create(final Consumer<Boolean> callback, final Duratio
2929
throw new UnsupportedOperationException("create operation not supported");
3030
}
3131

32+
public AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout, final boolean invokeCallbackOnExpiry) {
33+
throw new UnsupportedOperationException("create operation not supported");
34+
}
3235
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetManagerTests.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import org.junit.jupiter.api.BeforeEach;
1313
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.params.ParameterizedTest;
15+
import org.junit.jupiter.params.provider.ValueSource;
1416
import org.junit.jupiter.api.extension.ExtendWith;
1517
import org.mockito.Mock;
1618
import org.mockito.junit.jupiter.MockitoExtension;
@@ -26,14 +28,17 @@
2628
import static org.awaitility.Awaitility.await;
2729
import static org.hamcrest.MatcherAssert.assertThat;
2830
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.notNullValue;
2932
import static org.mockito.ArgumentMatchers.any;
3033
import static org.mockito.Mockito.doAnswer;
3134
import static org.mockito.Mockito.lenient;
35+
import static org.mockito.Mockito.when;
3236
import static org.mockito.Mockito.mock;
3337

3438
@ExtendWith(MockitoExtension.class)
3539
class DefaultAcknowledgementSetManagerTests {
3640
private static final Duration TEST_TIMEOUT = Duration.ofMillis(400);
41+
private static final Duration EXPIRY_TEST_TIMEOUT = Duration.ofMillis(100);
3742
private DefaultAcknowledgementSetManager acknowledgementSetManager;
3843
private ScheduledExecutorService callbackExecutor;
3944

@@ -104,9 +109,9 @@ void testBasic() {
104109
@Test
105110
void testExpirations() throws InterruptedException {
106111
eventHandle2.release(true);
107-
Thread.sleep(TEST_TIMEOUT.multipliedBy(5).toMillis());
112+
Thread.sleep(TEST_TIMEOUT.multipliedBy(2).toMillis());
108113
assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0));
109-
await().atMost(TEST_TIMEOUT.multipliedBy(5))
114+
await().atMost(TEST_TIMEOUT.multipliedBy(3))
110115
.untilAsserted(() -> {
111116
assertThat(result, equalTo(null));
112117
});
@@ -205,7 +210,6 @@ void testWithProgressCheckCallbacks() {
205210
.untilAsserted(() -> {
206211
assertThat(result, equalTo(true));
207212
});
208-
209213
}
210214

211215
@Test
@@ -270,7 +274,30 @@ void testWithProgressCheckCallbacks_AcksExpire() {
270274
.untilAsserted(() -> {
271275
assertThat(result, equalTo(null));
272276
});
273-
277+
}
278+
279+
@ParameterizedTest
280+
@ValueSource(booleans = {true, false})
281+
void testCreateWithInvokeCallbackOnExpiry(final boolean expiryCallback) {
282+
AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create((flag) -> { result = flag; }, EXPIRY_TEST_TIMEOUT, expiryCallback);
283+
assertThat(acknowledgementSet, notNullValue());
284+
}
285+
286+
@Test
287+
void testExpirationWithInvokeCallbackOnExpiryTrue() throws InterruptedException {
288+
DefaultAcknowledgementSet acknowledgementSet = (DefaultAcknowledgementSet) acknowledgementSetManager.create((flag) -> { result = flag; }, EXPIRY_TEST_TIMEOUT, true);
289+
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
290+
JacksonEvent event = mock(JacksonEvent.class);
291+
when(event.getEventHandle()).thenReturn(eventHandle);
292+
acknowledgementSet.add(event);
293+
acknowledgementSet.complete();
294+
295+
Thread.sleep(EXPIRY_TEST_TIMEOUT.multipliedBy(2).toMillis());
296+
await().atMost(EXPIRY_TEST_TIMEOUT.multipliedBy(3))
297+
.untilAsserted(() -> {
298+
acknowledgementSet.isDone();
299+
assertThat(result, equalTo(false));
300+
});
274301
}
275302

276303
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class DefaultAcknowledgementSetTests {
6262
private DefaultAcknowledgementSetMetrics metrics;
6363
private int invalidAcquiresCounter;
6464
private int invalidReleasesCounter;
65-
65+
6666
private void setupMetrics() {
6767
metrics = mock(DefaultAcknowledgementSetMetrics.class);
6868
lenient().doAnswer(a -> {
@@ -146,6 +146,7 @@ void testDefaultAcknowledgementInvalidAcquire() {
146146
assertThat(invalidAcquiresCounter, equalTo(1));
147147
}
148148

149+
@Test
149150
void testDefaultAcknowledgementInvalidRelease() {
150151
defaultAcknowledgementSet.add(event);
151152
defaultAcknowledgementSet.complete();
@@ -168,7 +169,7 @@ void testDefaultAcknowledgementSetWithCustomCallback() throws Exception {
168169
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
169170
(flag) -> {
170171
acknowledgementSetResult = flag;
171-
}
172+
}
172173
);
173174
defaultAcknowledgementSet.add(event);
174175
defaultAcknowledgementSet.complete();
@@ -187,7 +188,7 @@ void testDefaultAcknowledgementSetNegativeAcknowledgements() throws Exception {
187188
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
188189
(flag) -> {
189190
acknowledgementSetResult = flag;
190-
}
191+
}
191192
);
192193
defaultAcknowledgementSet.add(event);
193194
defaultAcknowledgementSet.complete();
@@ -221,7 +222,7 @@ void testDefaultAcknowledgementSetExpirations() throws Exception {
221222
} catch (Exception e) {
222223
callbackInterrupted.set(true);
223224
}
224-
}
225+
}
225226
);
226227
defaultAcknowledgementSet.add(event);
227228
defaultAcknowledgementSet.complete();
@@ -248,7 +249,7 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception {
248249
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
249250
(flag) -> {
250251
acknowledgementSetResult = flag;
251-
}
252+
}
252253
);
253254
defaultAcknowledgementSet.addProgressCheck(
254255
(progressCheck) -> {
@@ -313,4 +314,40 @@ void shutdown_cancels_progress_check_and_callback_future() throws NoSuchFieldExc
313314
verify(callbackFuture).cancel(false);
314315
verify(progressCheck).cancel(true);
315316
}
317+
318+
@Test
319+
void testDefaultAcknowledgementSetWithCallbackOnExpiryTrue() throws Exception {
320+
setupMetrics();
321+
defaultAcknowledgementSet = new DefaultAcknowledgementSet(executor, (flag) -> {
322+
acknowledgementSetResult = flag;
323+
}, TEST_TIMEOUT, metrics, true);
324+
325+
defaultAcknowledgementSet.add(event);
326+
defaultAcknowledgementSet.complete();
327+
328+
Thread.sleep(TEST_TIMEOUT.multipliedBy(2).toMillis());
329+
330+
Awaitility.waitAtMost(Duration.ofSeconds(15))
331+
.pollDelay(Duration.ofMillis(500))
332+
.until(() -> defaultAcknowledgementSet.isDone());
333+
assertThat(acknowledgementSetResult, equalTo(false));
334+
}
335+
336+
@Test
337+
void testDefaultAcknowledgementSetWithCallbackOnExpiryFalse() throws Exception {
338+
setupMetrics();
339+
defaultAcknowledgementSet = new DefaultAcknowledgementSet(executor, (flag) -> {
340+
acknowledgementSetResult = flag;
341+
}, TEST_TIMEOUT, metrics, false);
342+
343+
defaultAcknowledgementSet.add(event);
344+
defaultAcknowledgementSet.complete();
345+
346+
Thread.sleep(TEST_TIMEOUT.multipliedBy(2).toMillis());
347+
348+
Awaitility.waitAtMost(Duration.ofSeconds(10))
349+
.pollDelay(Duration.ofMillis(500))
350+
.until(() -> defaultAcknowledgementSet.isDone());
351+
assertThat(acknowledgementSetResult, equalTo(null));
352+
}
316353
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/InactiveAcknowledgementSetManagerTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import org.junit.jupiter.api.BeforeEach;
1313
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.params.ParameterizedTest;
15+
import org.junit.jupiter.params.provider.ValueSource;
1416

1517
import java.time.Duration;
1618

@@ -32,4 +34,11 @@ void testCreateAPI() {
3234
assertThrows(UnsupportedOperationException.class, () -> acknowledgementSetManager.create((a)->{}, Duration.ofMillis(10)));
3335
}
3436

37+
@ParameterizedTest
38+
@ValueSource(booleans = {true, false})
39+
void testCreateAPIWithInvokeCallbackOnExpiry(boolean invokeCallbackOnExpiryFlag) {
40+
assertThat(acknowledgementSetManager, notNullValue());
41+
assertThrows(UnsupportedOperationException.class, () -> acknowledgementSetManager.create((a)->{}, Duration.ofMillis(10), invokeCallbackOnExpiryFlag));
42+
}
43+
3544
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.kafka.buffer;

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.kafka.buffer;
@@ -102,7 +106,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
102106
this.shutdownInProgress = new AtomicBoolean(false);
103107
final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName());
104108
this.consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
105-
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker, customCompressionOption);
109+
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker, customCompressionOption, true);
106110
this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));
107111
this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
108112
consumers.forEach(this.executorService::submit);

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.kafka.buffer;
@@ -29,6 +33,7 @@
2933

3034
class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig {
3135
private static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(30);
36+
static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofHours(Integer.MAX_VALUE);
3237

3338
@JsonProperty("bootstrap_servers")
3439
private List<String> bootstrapServers;
@@ -55,6 +60,9 @@ class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig {
5560
@JsonProperty("drain_timeout")
5661
private Duration drainTimeout = DEFAULT_DRAIN_TIMEOUT;
5762

63+
@JsonProperty("acknowledgements_timeout")
64+
private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT;
65+
5866
@JsonProperty("custom_metric_prefix")
5967
private String customMetricPrefix;
6068

@@ -144,6 +152,12 @@ public Duration getDrainTimeout() {
144152
return drainTimeout;
145153
}
146154

155+
@Override
156+
@JsonIgnore
157+
public Duration getAcknowledgementsTimeout() {
158+
return acknowledgementsTimeout;
159+
}
160+
147161
@JsonIgnore
148162
public Optional<String> getCustomMetricPrefix() {
149163
return Optional.ofNullable(customMetricPrefix);

0 commit comments

Comments
 (0)