Skip to content

Commit f44b9ff

Browse files
ksalazar-91CopilotEldertGrootenboer
authored
[Service Bus] Fix spurious DeliveryNotOnLinkException in session processor (Azure#47356) (Azure#49603)
* [Service Bus] Fix spurious DeliveryNotOnLinkException in session processor (Azure#47356) A session-enabled ServiceBusProcessorClient that settles messages manually (e.g. complete()) while auto-complete is left enabled logged a spurious DeliveryNotOnLinkException ("...does not exist in the link's DeliveryMap") at ERROR. The V2 session disposition path (SessionsMessagePump) never called message.setIsSettled() on success, so the isSettled() guard could not absorb the redundant auto-settlement the way the non-session and V1 session paths do. The second disposition then reached the receive-link after the delivery had already been removed from the DeliveryMap, producing the error log. The message was always settled correctly on the broker; only the misleading ERROR log is removed. Now the session path marks the message settled on a successful disposition, mirroring ServiceBusReceiverAsyncClient, so a redundant settle short-circuits at the already-settled guard. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * [Service Bus] Add regression test for session double-settle (Azure#47356) Adds SessionsMessagePumpIsolatedTest#shouldNotReDispositionWhenHandlerSettlesWithAutoCompleteEnabled, covering the V2 session path where a handler settles a message manually while auto-complete is enabled. Verifies the message is marked settled and the redundant auto-settlement short-circuits at the isSettled() guard so the receive-link sees exactly one disposition (no second attempt that would raise DeliveryNotOnLinkException). Addresses the PR review feedback. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * [Service Bus] Use then(Mono.fromRunnable) to match sibling settle paths (Azure#47356) Addresses PR review feedback: switch the session disposition success side-effect from doOnSuccess to .<Void>then(Mono.fromRunnable(() -> message.setIsSettled())), matching the pattern used by the non-session and V1 session paths in ServiceBusReceiverAsyncClient. Behavior is unchanged. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: ksalazar-91 <ksalazar-91@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Eldert Grootenboer <eldert@eldert.net>
1 parent 3e5340e commit f44b9ff

3 files changed

Lines changed: 72 additions & 1 deletion

File tree

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@
2424
Premium large-message entities by reading the `com.microsoft:max-message-batch-size` vendor property
2525
from the AMQP sender link instead of using `max-message-size`. ([#48214](https://github.com/Azure/azure-sdk-for-java/pull/48214))
2626
- Fixed `ServiceBusAdministrationClient.updateSubscription()` silently ignoring `defaultMessageTimeToLive` changes. The property was incorrectly nullified before serialization. ([#48495](https://github.com/Azure/azure-sdk-for-java/issues/48495))
27+
- Fixed session-enabled `ServiceBusProcessorClient` logging a spurious `DeliveryNotOnLinkException`
28+
("...does not exist in the link's DeliveryMap") at ERROR when a message handler settles a message
29+
manually (e.g. `complete()`) while auto-complete is left enabled. The V2 session disposition path now
30+
marks the message settled on success, so the redundant auto-settlement short-circuits at the
31+
already-settled guard instead of attempting a second disposition on the receive-link. The message was
32+
always settled correctly; only the misleading error log is removed. ([#47356](https://github.com/Azure/azure-sdk-for-java/issues/47356))
2733

2834
### Other Changes
2935

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,15 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit
894894

895895
Mono<Void> updateDispositionMono;
896896
if (receiver != null) {
897-
updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState);
897+
// Mark the message settled once the broker acknowledges the disposition, mirroring the non-session
898+
// and V1 session paths (ServiceBusReceiverAsyncClient). This arms the message.isSettled() guard above
899+
// so that a redundant settlement attempt (e.g. when the handler calls complete() manually AND
900+
// auto-complete is left enabled) short-circuits here with a benign "already settled" error instead of
901+
// reaching the receive-link and logging a spurious DeliveryNotOnLinkException once the delivery has
902+
// already been removed from the link's DeliveryMap.
903+
// See https://github.com/Azure/azure-sdk-for-java/issues/47356
904+
updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState)
905+
.<Void>then(Mono.fromRunnable(() -> message.setIsSettled()));
898906
} else {
899907
updateDispositionMono
900908
= Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(message.getLockToken(), deliveryState));

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.function.Supplier;
4949

5050
import static org.mockito.ArgumentMatchers.any;
51+
import static org.mockito.Mockito.doAnswer;
5152
import static org.mockito.Mockito.doNothing;
5253
import static org.mockito.Mockito.mock;
5354
import static org.mockito.Mockito.times;
@@ -501,6 +502,62 @@ public void shouldCompleteMessageWhenSessionIdDiffersInCase() {
501502
verify(onTerminate, times(1)).run();
502503
}
503504

505+
@Test
506+
@Execution(ExecutionMode.SAME_THREAD)
507+
public void shouldNotReDispositionWhenHandlerSettlesWithAutoCompleteEnabled() {
508+
// Regression test for https://github.com/Azure/azure-sdk-for-java/issues/47356
509+
// When a session message handler settles the message manually (e.g. complete()) while auto-complete is
510+
// left enabled, the redundant auto-settlement must short-circuit at the message.isSettled() guard and
511+
// must NOT issue a second disposition on the receive-link (which would log a spurious
512+
// DeliveryNotOnLinkException once the delivery has been removed from the link's DeliveryMap).
513+
final String session1Id = "1";
514+
515+
final HashMap<Message, ServiceBusReceivedMessage> session1Messages = createMockMessages(session1Id, 1);
516+
// Wire the mock message's settled flag so the isSettled() guard in updateDisposition() behaves like the
517+
// real message: setIsSettled() flips it to true and isSettled() reflects the current value.
518+
final ServiceBusReceivedMessage message = session1Messages.values().iterator().next();
519+
final AtomicBoolean settled = new AtomicBoolean(false);
520+
when(message.isSettled()).thenAnswer(__ -> settled.get());
521+
doAnswer(__ -> {
522+
settled.set(true);
523+
return null;
524+
}).when(message).setIsSettled();
525+
526+
final TestPublisher<AmqpEndpointState> session1EpStates = TestPublisher.createCold();
527+
session1EpStates.next(AmqpEndpointState.ACTIVE);
528+
final Session session1 = createMockSession(session1Id, session1Messages, session1EpStates);
529+
when(session1.getLink().updateDisposition(any(), any())).thenReturn(Mono.empty());
530+
final MessageSerializer serializer = createMockmessageSerializer(session1Messages);
531+
final ServiceBusSessionAcquirer sessionAcquirer = createMockSessionAcquirer(session1);
532+
final Runnable onTerminate = createMockOnTerminate();
533+
534+
final int maxSessions = 1;
535+
final int concurrency = 1;
536+
final boolean autoDispositionDisabled = false; // auto-complete enabled.
537+
// The handler settles the message manually, mirroring the customer scenario in the issue.
538+
final Consumer<ServiceBusReceivedMessageContext> processMessage = ServiceBusReceivedMessageContext::complete;
539+
final Consumer<ServiceBusErrorContext> processError = e -> {
540+
};
541+
final SessionsMessagePump pump = createSessionsMessagePump(sessionAcquirer, idleTimeoutDisabled, maxSessions,
542+
concurrency, autoDispositionDisabled, serializer, processMessage, processError, onTerminate);
543+
544+
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
545+
verifier.create(() -> pump.begin()).thenAwait().thenCancel().verify();
546+
}
547+
548+
// The successful manual disposition marks the message settled...
549+
Assertions.assertTrue(message.isSettled());
550+
verify(message, times(1)).setIsSettled();
551+
// ...so the redundant auto-complete short-circuits at the isSettled() guard and the receive-link sees
552+
// exactly ONE disposition (the manual one), never a second that would raise DeliveryNotOnLinkException.
553+
verify(session1.getLink(), times(1)).updateDisposition(lockTokenCaptor.capture(),
554+
deliveryStateCaptor.capture());
555+
Assertions.assertEquals(message.getLockToken(), lockTokenCaptor.getValue());
556+
Assertions.assertEquals(Accepted.getInstance(), deliveryStateCaptor.getValue());
557+
verify(session1.getLink(), times(1)).closeAsync();
558+
verify(onTerminate, times(1)).run();
559+
}
560+
504561
@Test
505562
@Execution(ExecutionMode.SAME_THREAD)
506563
public void shouldEmitErrorIfBeginInvokedMoreThanOnce() {

0 commit comments

Comments
 (0)