Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet {
private final DefaultAcknowledgementSetMetrics metrics;
private ScheduledFuture<?> progressCheckFuture;
private boolean completed;
private AtomicInteger totalEventsAdded;
private boolean callbackInvoked;
private final AtomicInteger totalEventsAdded;

public DefaultAcknowledgementSet(final ScheduledExecutorService scheduledExecutor,
final Consumer<Boolean> callback,
Expand All @@ -52,6 +53,7 @@ public DefaultAcknowledgementSet(final ScheduledExecutorService scheduledExecuto
this.callbackFuture = null;
this.metrics = metrics;
this.completed = false;
this.callbackInvoked = false;
this.progressCheckCallback = null;
pendingAcknowledgments = new HashMap<>();
lock = new ReentrantLock(true);
Expand Down Expand Up @@ -130,11 +132,18 @@ public boolean isDone() {
if (progressCheckFuture != null) {
progressCheckFuture.cancel(false);
}
if (callbackFuture != null) {

// Invoke callback with false on timeout if not already invoked
if (!callbackInvoked) {
callbackInvoked = true;
callbackFuture = scheduledExecutor.submit(() -> callback.accept(false));
LOG.warn("AcknowledgementSet expired, invoking callback with false");
} else if (callbackFuture != null) {
// Callback already invoked, just cancel the future if it exists
callbackFuture.cancel(true);
callbackFuture = null;
LOG.warn("AcknowledgementSet expired");
}

metrics.increment(DefaultAcknowledgementSetMetrics.EXPIRED_METRIC_NAME);
return true;
}
Expand All @@ -144,19 +153,16 @@ public boolean isDone() {
return false;
}

public Instant getExpiryTime() {
return expiryTime;
}

Comment on lines -147 to -150

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused method

@Override
public void complete() {
lock.lock();
try {
completed = true;
if (pendingAcknowledgments.size() == 0) {
if (pendingAcknowledgments.size() == 0 && !callbackInvoked) {
if (progressCheckFuture != null) {
progressCheckFuture.cancel(false);
}
callbackInvoked = true;
callbackFuture = scheduledExecutor.submit(() -> callback.accept(this.result));
}
} finally {
Expand All @@ -178,10 +184,11 @@ public boolean release(final EventHandle eventHandle, final boolean result) {
}
if (pendingAcknowledgments.get(eventHandle).decrementAndGet() == 0) {
pendingAcknowledgments.remove(eventHandle);
if (completed && pendingAcknowledgments.size() == 0) {
if (completed && pendingAcknowledgments.size() == 0 && !callbackInvoked) {
if (progressCheckFuture != null) {
progressCheckFuture.cancel(false);
}
callbackInvoked = true;
callbackFuture = scheduledExecutor.submit(() -> callback.accept(this.result));
return true;
} else if (pendingAcknowledgments.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ private void setupMetrics() {
metrics = mock(DefaultAcknowledgementSetMetrics.class);
lenient().doAnswer(a -> {
String metricName = a.getArgument(0);
if (metricName == DefaultAcknowledgementSetMetrics.INVALID_ACQUIRES_METRIC_NAME) {
if (DefaultAcknowledgementSetMetrics.INVALID_ACQUIRES_METRIC_NAME.equals(metricName)) {
invalidAcquiresCounter++;
} else if (metricName == DefaultAcknowledgementSetMetrics.INVALID_RELEASES_METRIC_NAME) {
} else if (DefaultAcknowledgementSetMetrics.INVALID_RELEASES_METRIC_NAME.equals(metricName)) {
invalidReleasesCounter++;
}
return null;
Expand Down Expand Up @@ -110,7 +110,7 @@ void setupEvent() {
}

@Test
void testDefaultAcknowledgementSetBasic() throws Exception {
void testDefaultAcknowledgementSetBasic() {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
assertThat(handle, not(equalTo(null)));
Expand All @@ -119,7 +119,7 @@ void testDefaultAcknowledgementSetBasic() throws Exception {
}

@Test
void testDefaultAcknowledgementSetMultipleAcquireAndRelease() throws Exception {
void testDefaultAcknowledgementSetMultipleAcquireAndRelease() {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
assertThat(handle, not(equalTo(null)));
Expand Down Expand Up @@ -150,7 +150,7 @@ void testDefaultAcknowledgementInvalidRelease() {
}

@Test
void testDefaultAcknowledgementDuplicateReleaseError() throws Exception {
void testDefaultAcknowledgementDuplicateReleaseError() {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
assertThat(handle, not(equalTo(null)));
Expand All @@ -160,7 +160,7 @@ void testDefaultAcknowledgementDuplicateReleaseError() throws Exception {
}

@Test
void testDefaultAcknowledgementSetWithCustomCallback() throws Exception {
void testDefaultAcknowledgementSetWithCustomCallback() {
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
(flag) -> {
acknowledgementSetResult = flag;
Expand All @@ -179,7 +179,7 @@ void testDefaultAcknowledgementSetWithCustomCallback() throws Exception {
}

@Test
void testDefaultAcknowledgementSetNegativeAcknowledgements() throws Exception {
void testDefaultAcknowledgementSetNegativeAcknowledgements() {
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
(flag) -> {
acknowledgementSetResult = flag;
Expand Down Expand Up @@ -208,7 +208,7 @@ void testDefaultAcknowledgementSetNegativeAcknowledgements() throws Exception {
}

@Test
void testDefaultAcknowledgementSetExpirations() throws Exception {
void testDefaultAcknowledgementSetExpirations() {
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
(flag) -> {
try {
Expand Down Expand Up @@ -240,7 +240,7 @@ void testDefaultAcknowledgementSetExpirations() throws Exception {
}

@Test
void testDefaultAcknowledgementSetWithProgressCheck() throws Exception {
void testDefaultAcknowledgementSetWithProgressCheck() {
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
(flag) -> {
acknowledgementSetResult = flag;
Expand Down Expand Up @@ -309,4 +309,143 @@ void shutdown_cancels_progress_check_and_callback_future() throws NoSuchFieldExc
verify(callbackFuture).cancel(false);
verify(progressCheck).cancel(true);
}

@Test
void testCallbackInvokedWithFalseOnTimeout() throws InterruptedException {
// Verify callback is invoked with false when acknowledgement set times out
final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
final AtomicBoolean callbackResult = new AtomicBoolean(true);

final Duration shortTimeout = Duration.ofMillis(100);
final DefaultAcknowledgementSet acknowledgementSet =
new DefaultAcknowledgementSet(executor, (result) -> {
callbackInvoked.set(true);
callbackResult.set(result);
}, shortTimeout, metrics);

// Wait for timeout to occur
Thread.sleep(150);

// Trigger timeout check and verify it's done
assertThat(acknowledgementSet.isDone(), equalTo(true));

// Wait for callback to execute
await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> {
assertThat(callbackInvoked.get(), equalTo(true));
assertThat(callbackResult.get(), equalTo(false));
});
}

@Test
void testCallbackInvokedOnlyOnceWhenTimeoutOccurs() throws InterruptedException {
// Verify callback is not invoked twice if isDone() is called multiple times after timeout
final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
final AtomicBoolean callbackResult = new AtomicBoolean(true);

final Duration shortTimeout = Duration.ofMillis(100);
final DefaultAcknowledgementSet acknowledgementSet =
new DefaultAcknowledgementSet(executor, (result) -> {
callbackInvoked.set(true);
callbackResult.set(result);
}, shortTimeout, metrics);

// Wait for timeout
Thread.sleep(150);

// Call isDone multiple times and verify it returns true
assertThat(acknowledgementSet.isDone(), equalTo(true));
assertThat(acknowledgementSet.isDone(), equalTo(true));
assertThat(acknowledgementSet.isDone(), equalTo(true));

// Wait for any callbacks to execute
await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> {
assertThat(callbackInvoked.get(), equalTo(true));
assertThat(callbackResult.get(), equalTo(false));
});
}

@Test
void testCallbackNotInvokedOnTimeoutIfAlreadyCompleted() throws InterruptedException {
// Verify that if acknowledgement completes normally before timeout,
// timeout doesn't invoke callback again
final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
final AtomicBoolean lastCallbackResult = new AtomicBoolean(false);

final Duration timeout = Duration.ofSeconds(5);
final DefaultAcknowledgementSet acknowledgementSet =
new DefaultAcknowledgementSet(executor, (result) -> {
callbackInvoked.set(true);
lastCallbackResult.set(result);
}, timeout, metrics);

// Add and release an event (normal completion)
acknowledgementSet.add(event);
acknowledgementSet.complete();
assertThat(acknowledgementSet.release(handle, true), equalTo(true));

// Wait for callback
await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> {
assertThat(callbackInvoked.get(), equalTo(true));
assertThat(lastCallbackResult.get(), equalTo(true));
});

// Reset flag to detect if callback is invoked again
callbackInvoked.set(false);

// Manually trigger timeout check (simulating late timeout check) and verify it's done
assertThat(acknowledgementSet.isDone(), equalTo(true));
Thread.sleep(100);

// Verify callback was not invoked again
assertThat(callbackInvoked.get(), equalTo(false));
}

@Test
void testExistingPositiveAcknowledgementBehaviorUnchanged() throws InterruptedException {
// Verify normal positive acknowledgement flow still works as before
final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
final AtomicBoolean callbackResult = new AtomicBoolean(false);

final DefaultAcknowledgementSet acknowledgementSet =
new DefaultAcknowledgementSet(executor, (result) -> {
callbackInvoked.set(true);
callbackResult.set(result);
}, Duration.ofMinutes(5), metrics);

acknowledgementSet.add(event);
acknowledgementSet.complete();
assertThat(acknowledgementSet.release(handle, true), equalTo(true));

await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> {
assertThat(callbackInvoked.get(), equalTo(true));
assertThat(callbackResult.get(), equalTo(true));
});
}

@Test
void testExistingNegativeAcknowledgementBehaviorUnchanged() throws InterruptedException {
// Verify normal negative acknowledgement flow still works as before
final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
final AtomicBoolean callbackResult = new AtomicBoolean(true);

final DefaultAcknowledgementSet acknowledgementSet =
new DefaultAcknowledgementSet(executor, (result) -> {
callbackInvoked.set(true);
callbackResult.set(result);
}, Duration.ofMinutes(5), metrics);

acknowledgementSet.add(event);
acknowledgementSet.complete();
assertThat(acknowledgementSet.release(handle, false), equalTo(true)); // Negative acknowledgement

await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> {
assertThat(callbackInvoked.get(), equalTo(true));
assertThat(callbackResult.get(), equalTo(false));
});
}
}
Loading