diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 3a6343c98710..b5760c5afe0e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -31,6 +31,7 @@ import java.util.Deque; import java.util.List; import java.util.Map; +import java.util.Locale; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeoutException; @@ -132,7 +133,7 @@ class ServiceBusSessionManager implements AutoCloseable, IServiceBusSessionManag * @return The name of the link, or {@code null} if there is no open link with that {@code sessionId}. */ public String getLinkName(String sessionId) { - final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId); + final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT)); return receiver != null ? receiver.getLinkName() : null; } @@ -175,7 +176,7 @@ public Flux receive() { private Mono renewSessionLock(String sessionId) { return validateParameter(sessionId, "sessionId", "renewSessionLock") .then(getManagementNode().flatMap(channel -> { - final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId); + final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT)); final String associatedLinkName = receiver != null ? receiver.getLinkName() : null; return tracer @@ -205,7 +206,7 @@ public Mono updateDisposition(String lockToken, String sessionId, Dispo validateParameter(lockToken, "lockToken", operation), validateParameter(sessionId, "'sessionId'", operation)) .then(Mono.defer(() -> { - final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId); + final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT)); if (receiver == null || !receiver.containsLockToken(lockToken)) { return Mono.just(false); } @@ -324,8 +325,8 @@ Mono getActiveLink() { * @return A Mono that completes with an unnamed session receiver. */ private Flux getSession(Scheduler scheduler, boolean disposeOnIdle) { - return getActiveLink().flatMap( - link -> link.getSessionId().map(sessionId -> sessionReceivers.compute(sessionId, (key, existing) -> { + return getActiveLink().flatMap(link -> link.getSessionId() + .map(sessionId -> sessionReceivers.compute(sessionId.toLowerCase(Locale.ROOT), (key, existing) -> { if (existing != null) { return existing; } @@ -340,7 +341,7 @@ private Flux getSession(Scheduler scheduler, boolean d .log("Closing session receiver."); availableSchedulers.push(scheduler); - sessionReceivers.remove(sessionReceiver.getSessionId()); + sessionReceivers.remove(sessionReceiver.getSessionId().toLowerCase(Locale.ROOT)); sessionReceiver.closeAsync().subscribe(); if (receiverOptions.isRollingSessionReceiver()) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSingleSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSingleSessionManager.java index 59013f09bddc..a92e088bc062 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSingleSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSingleSessionManager.java @@ -67,7 +67,7 @@ public String getIdentifier() { @Override public String getLinkName(String sessionId) { - return sessionReceiver.getSessionId().equals(sessionId) ? sessionReceiver.getLinkName() : null; + return sessionReceiver.getSessionId().equalsIgnoreCase(sessionId) ? sessionReceiver.getLinkName() : null; } @Override @@ -82,7 +82,7 @@ public Mono updateDisposition(String lockToken, String sessionId, Dispo ServiceBusTransactionContext transactionContext) { final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason, deadLetterDescription, propertiesToModify, transactionContext); - if (sessionReceiver.getSessionId().equals(sessionId)) { + if (sessionReceiver.getSessionId().equalsIgnoreCase(sessionId)) { return sessionReceiver.updateDisposition(lockToken, deliveryState).thenReturn(true); // Once the side-by-side support for V1 is no longer needed, as part of deleting V1 ServiceBusSessionManager, // Update this method to return Mono and remove the thenReturn(true). diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index a21da9a86324..90bf2db4f0aa 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Locale; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -679,11 +680,11 @@ private SessionReceiversTracker(ClientLogger logger, int size, String fullyQuali } private void track(ServiceBusSessionReactorReceiver receiver) { - receivers.put(receiver.getSessionId(), receiver); + receivers.put(receiver.getSessionId().toLowerCase(Locale.ROOT), receiver); } private void untrack(ServiceBusSessionReactorReceiver receiver) { - receivers.remove(receiver.getSessionId(), receiver); + receivers.remove(receiver.getSessionId().toLowerCase(Locale.ROOT), receiver); } private void clear() { @@ -767,7 +768,7 @@ private Mono updateDisposition(ServiceBusReceivedMessage message, Disposit return Mono.error(new UnsupportedOperationException(m)); } final String sessionId = message.getSessionId(); - final ServiceBusSessionReactorReceiver receiver = receivers.get(sessionId); + final ServiceBusSessionReactorReceiver receiver = receivers.get(sessionId.toLowerCase(Locale.ROOT)); final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason, deadLetterDescription, propertiesToModify, transactionContext); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSingleSessionManagerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSingleSessionManagerTest.java new file mode 100644 index 000000000000..7cfa05d29b45 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSingleSessionManagerTest.java @@ -0,0 +1,98 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.messaging.servicebus; + +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.implementation.DispositionStatus; +import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +class ServiceBusSingleSessionManagerTest { + private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSingleSessionManagerTest.class); + + @Mock + private ServiceBusSessionReactorReceiver sessionReceiver; + @Mock + private MessageSerializer serializer; + @Mock + private ServiceBusReceiverInstrumentation instrumentation; + + private AutoCloseable mocksCloseable; + + @BeforeEach + void setup() { + mocksCloseable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + void teardown() throws Exception { + Mockito.framework().clearInlineMock(this); + if (mocksCloseable != null) { + mocksCloseable.close(); + } + } + + @Test + void getLinkNameShouldMatchSessionIdCaseInsensitively() { + final String acceptedSessionId = "Session-ABC"; + final String linkName = "link-1"; + + when(sessionReceiver.getSessionId()).thenReturn(acceptedSessionId); + when(sessionReceiver.getLinkName()).thenReturn(linkName); + + final ServiceBusSingleSessionManager manager = createManager(); + + // Exact match + assertEquals(linkName, manager.getLinkName("Session-ABC")); + // All lowercase + assertEquals(linkName, manager.getLinkName("session-abc")); + // All uppercase + assertEquals(linkName, manager.getLinkName("SESSION-ABC")); + // Non-matching value + assertNull(manager.getLinkName("other-session")); + } + + @Test + void updateDispositionShouldMatchSessionIdCaseInsensitively() { + final String acceptedSessionId = "Session-ABC"; + final String lockToken = "lock-1"; + + when(sessionReceiver.getSessionId()).thenReturn(acceptedSessionId); + when(sessionReceiver.updateDisposition(any(), any())).thenReturn(Mono.empty()); + + final ServiceBusSingleSessionManager manager = createManager(); + + // Message carries a differently-cased session ID — disposition should still succeed. + StepVerifier + .create(manager.updateDisposition(lockToken, "session-abc", DispositionStatus.COMPLETED, null, null, null, + null)) + .assertNext(result -> assertTrue(result)) + .verifyComplete(); + + // Completely different session ID — disposition should error. + StepVerifier + .create(manager.updateDisposition(lockToken, "other-session", DispositionStatus.COMPLETED, null, null, null, + null)) + .verifyError(); + } + + private ServiceBusSingleSessionManager createManager() { + return new ServiceBusSingleSessionManager(LOGGER, "test-identifier", sessionReceiver, 0, serializer, + new AmqpRetryOptions(), instrumentation); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java index e1044a69b828..1f1cf0927508 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java @@ -456,6 +456,51 @@ public void shouldAbandonMessageOnErroredProcessing() { verify(onTerminate, times(1)).run(); } + @Test + @Execution(ExecutionMode.SAME_THREAD) + public void shouldCompleteMessageWhenSessionIdDiffersInCase() { + // The session ID returned by the broker on the accepted session may differ in case from the session ID + // carried in the received message. The disposition lookup must be case-insensitive. + final String acceptedSessionId = "Session-ABC"; + final String messageSessionId = "session-abc"; + + final HashMap session1Messages = createMockMessages(messageSessionId, 1); + final TestPublisher session1EpStates = TestPublisher.createCold(); + session1EpStates.next(AmqpEndpointState.ACTIVE); + final Session session1 = createMockSession(acceptedSessionId, session1Messages, session1EpStates); + when(session1.getLink().updateDisposition(any(), any())).thenReturn(Mono.empty()); + final MessageSerializer serializer = createMockmessageSerializer(session1Messages); + final ServiceBusSessionAcquirer sessionAcquirer = createMockSessionAcquirer(session1); + final Runnable onTerminate = createMockOnTerminate(); + + final int maxSessions = 1; + final int concurrency = 1; + final boolean autoDispositionDisabled = false; + final Set unseenMessages = values(session1Messages); + Assertions.assertEquals(1, unseenMessages.size()); + final ServiceBusReceivedMessage processedMessage = unseenMessages.iterator().next(); + final Consumer processMessage = context -> { + unseenMessages.remove(context.getMessage()); + }; + final Consumer processError = e -> { + }; + final SessionsMessagePump pump = createSessionsMessagePump(sessionAcquirer, idleTimeoutDisabled, maxSessions, + concurrency, autoDispositionDisabled, serializer, processMessage, processError, onTerminate); + + try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) { + verifier.create(() -> pump.begin()).thenAwait().thenCancel().verify(); + } + + Assertions.assertTrue(unseenMessages.isEmpty()); + verify(session1.getLink()).updateDisposition(lockTokenCaptor.capture(), deliveryStateCaptor.capture()); + final String lockToken = lockTokenCaptor.getValue(); + final DeliveryState deliveryState = deliveryStateCaptor.getValue(); + Assertions.assertEquals(processedMessage.getLockToken(), lockToken); + Assertions.assertEquals(Accepted.getInstance(), deliveryState); + verify(session1.getLink(), times(1)).closeAsync(); + verify(onTerminate, times(1)).run(); + } + @Test @Execution(ExecutionMode.SAME_THREAD) public void shouldEmitErrorIfBeginInvokedMoreThanOnce() {