Skip to content

Commit 85e2398

Browse files
authored
Making a fix to compare sessionid on the message and the sessionid on the link filter in a case-insensitive way. (#48759)
* Making a fix to compare sessionid on the message and the link filter in a case insensite way. * Fixing one build error.
1 parent b3ac038 commit 85e2398

File tree

5 files changed

+156
-11
lines changed

5 files changed

+156
-11
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Deque;
3232
import java.util.List;
3333
import java.util.Map;
34+
import java.util.Locale;
3435
import java.util.concurrent.ConcurrentHashMap;
3536
import java.util.concurrent.ConcurrentLinkedDeque;
3637
import java.util.concurrent.TimeoutException;
@@ -132,7 +133,7 @@ class ServiceBusSessionManager implements AutoCloseable, IServiceBusSessionManag
132133
* @return The name of the link, or {@code null} if there is no open link with that {@code sessionId}.
133134
*/
134135
public String getLinkName(String sessionId) {
135-
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId);
136+
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT));
136137
return receiver != null ? receiver.getLinkName() : null;
137138
}
138139

@@ -175,7 +176,7 @@ public Flux<ServiceBusMessageContext> receive() {
175176
private Mono<OffsetDateTime> renewSessionLock(String sessionId) {
176177
return validateParameter(sessionId, "sessionId", "renewSessionLock")
177178
.then(getManagementNode().flatMap(channel -> {
178-
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId);
179+
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT));
179180
final String associatedLinkName = receiver != null ? receiver.getLinkName() : null;
180181

181182
return tracer
@@ -205,7 +206,7 @@ public Mono<Boolean> updateDisposition(String lockToken, String sessionId, Dispo
205206
validateParameter(lockToken, "lockToken", operation),
206207
validateParameter(sessionId, "'sessionId'", operation))
207208
.then(Mono.defer(() -> {
208-
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId);
209+
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT));
209210
if (receiver == null || !receiver.containsLockToken(lockToken)) {
210211
return Mono.just(false);
211212
}
@@ -324,8 +325,8 @@ Mono<ServiceBusReceiveLink> getActiveLink() {
324325
* @return A Mono that completes with an unnamed session receiver.
325326
*/
326327
private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean disposeOnIdle) {
327-
return getActiveLink().flatMap(
328-
link -> link.getSessionId().map(sessionId -> sessionReceivers.compute(sessionId, (key, existing) -> {
328+
return getActiveLink().flatMap(link -> link.getSessionId()
329+
.map(sessionId -> sessionReceivers.compute(sessionId.toLowerCase(Locale.ROOT), (key, existing) -> {
329330
if (existing != null) {
330331
return existing;
331332
}
@@ -340,7 +341,7 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
340341
.log("Closing session receiver.");
341342

342343
availableSchedulers.push(scheduler);
343-
sessionReceivers.remove(sessionReceiver.getSessionId());
344+
sessionReceivers.remove(sessionReceiver.getSessionId().toLowerCase(Locale.ROOT));
344345
sessionReceiver.closeAsync().subscribe();
345346

346347
if (receiverOptions.isRollingSessionReceiver()) {

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSingleSessionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public String getIdentifier() {
6767

6868
@Override
6969
public String getLinkName(String sessionId) {
70-
return sessionReceiver.getSessionId().equals(sessionId) ? sessionReceiver.getLinkName() : null;
70+
return sessionReceiver.getSessionId().equalsIgnoreCase(sessionId) ? sessionReceiver.getLinkName() : null;
7171
}
7272

7373
@Override
@@ -82,7 +82,7 @@ public Mono<Boolean> updateDisposition(String lockToken, String sessionId, Dispo
8282
ServiceBusTransactionContext transactionContext) {
8383
final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason,
8484
deadLetterDescription, propertiesToModify, transactionContext);
85-
if (sessionReceiver.getSessionId().equals(sessionId)) {
85+
if (sessionReceiver.getSessionId().equalsIgnoreCase(sessionId)) {
8686
return sessionReceiver.updateDisposition(lockToken, deliveryState).thenReturn(true);
8787
// Once the side-by-side support for V1 is no longer needed, as part of deleting V1 ServiceBusSessionManager,
8888
// Update this method to return Mono<Void> and remove the thenReturn(true).

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Objects;
36+
import java.util.Locale;
3637
import java.util.concurrent.ConcurrentHashMap;
3738
import java.util.concurrent.atomic.AtomicBoolean;
3839
import java.util.concurrent.atomic.AtomicLong;
@@ -679,11 +680,11 @@ private SessionReceiversTracker(ClientLogger logger, int size, String fullyQuali
679680
}
680681

681682
private void track(ServiceBusSessionReactorReceiver receiver) {
682-
receivers.put(receiver.getSessionId(), receiver);
683+
receivers.put(receiver.getSessionId().toLowerCase(Locale.ROOT), receiver);
683684
}
684685

685686
private void untrack(ServiceBusSessionReactorReceiver receiver) {
686-
receivers.remove(receiver.getSessionId(), receiver);
687+
receivers.remove(receiver.getSessionId().toLowerCase(Locale.ROOT), receiver);
687688
}
688689

689690
private void clear() {
@@ -767,7 +768,7 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit
767768
return Mono.error(new UnsupportedOperationException(m));
768769
}
769770
final String sessionId = message.getSessionId();
770-
final ServiceBusSessionReactorReceiver receiver = receivers.get(sessionId);
771+
final ServiceBusSessionReactorReceiver receiver = receivers.get(sessionId.toLowerCase(Locale.ROOT));
771772
final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason,
772773
deadLetterDescription, propertiesToModify, transactionContext);
773774

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.messaging.servicebus;
4+
5+
import com.azure.core.amqp.AmqpRetryOptions;
6+
import com.azure.core.amqp.implementation.MessageSerializer;
7+
import com.azure.core.util.logging.ClientLogger;
8+
import com.azure.messaging.servicebus.implementation.DispositionStatus;
9+
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
10+
import org.junit.jupiter.api.AfterEach;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
import org.mockito.Mock;
14+
import org.mockito.Mockito;
15+
import org.mockito.MockitoAnnotations;
16+
import reactor.core.publisher.Mono;
17+
import reactor.test.StepVerifier;
18+
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertNull;
21+
import static org.junit.jupiter.api.Assertions.assertTrue;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.when;
24+
25+
class ServiceBusSingleSessionManagerTest {
26+
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSingleSessionManagerTest.class);
27+
28+
@Mock
29+
private ServiceBusSessionReactorReceiver sessionReceiver;
30+
@Mock
31+
private MessageSerializer serializer;
32+
@Mock
33+
private ServiceBusReceiverInstrumentation instrumentation;
34+
35+
private AutoCloseable mocksCloseable;
36+
37+
@BeforeEach
38+
void setup() {
39+
mocksCloseable = MockitoAnnotations.openMocks(this);
40+
}
41+
42+
@AfterEach
43+
void teardown() throws Exception {
44+
Mockito.framework().clearInlineMock(this);
45+
if (mocksCloseable != null) {
46+
mocksCloseable.close();
47+
}
48+
}
49+
50+
@Test
51+
void getLinkNameShouldMatchSessionIdCaseInsensitively() {
52+
final String acceptedSessionId = "Session-ABC";
53+
final String linkName = "link-1";
54+
55+
when(sessionReceiver.getSessionId()).thenReturn(acceptedSessionId);
56+
when(sessionReceiver.getLinkName()).thenReturn(linkName);
57+
58+
final ServiceBusSingleSessionManager manager = createManager();
59+
60+
// Exact match
61+
assertEquals(linkName, manager.getLinkName("Session-ABC"));
62+
// All lowercase
63+
assertEquals(linkName, manager.getLinkName("session-abc"));
64+
// All uppercase
65+
assertEquals(linkName, manager.getLinkName("SESSION-ABC"));
66+
// Non-matching value
67+
assertNull(manager.getLinkName("other-session"));
68+
}
69+
70+
@Test
71+
void updateDispositionShouldMatchSessionIdCaseInsensitively() {
72+
final String acceptedSessionId = "Session-ABC";
73+
final String lockToken = "lock-1";
74+
75+
when(sessionReceiver.getSessionId()).thenReturn(acceptedSessionId);
76+
when(sessionReceiver.updateDisposition(any(), any())).thenReturn(Mono.empty());
77+
78+
final ServiceBusSingleSessionManager manager = createManager();
79+
80+
// Message carries a differently-cased session ID — disposition should still succeed.
81+
StepVerifier
82+
.create(manager.updateDisposition(lockToken, "session-abc", DispositionStatus.COMPLETED, null, null, null,
83+
null))
84+
.assertNext(result -> assertTrue(result))
85+
.verifyComplete();
86+
87+
// Completely different session ID — disposition should error.
88+
StepVerifier
89+
.create(manager.updateDisposition(lockToken, "other-session", DispositionStatus.COMPLETED, null, null, null,
90+
null))
91+
.verifyError();
92+
}
93+
94+
private ServiceBusSingleSessionManager createManager() {
95+
return new ServiceBusSingleSessionManager(LOGGER, "test-identifier", sessionReceiver, 0, serializer,
96+
new AmqpRetryOptions(), instrumentation);
97+
}
98+
}

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,51 @@ public void shouldAbandonMessageOnErroredProcessing() {
456456
verify(onTerminate, times(1)).run();
457457
}
458458

459+
@Test
460+
@Execution(ExecutionMode.SAME_THREAD)
461+
public void shouldCompleteMessageWhenSessionIdDiffersInCase() {
462+
// The session ID returned by the broker on the accepted session may differ in case from the session ID
463+
// carried in the received message. The disposition lookup must be case-insensitive.
464+
final String acceptedSessionId = "Session-ABC";
465+
final String messageSessionId = "session-abc";
466+
467+
final HashMap<Message, ServiceBusReceivedMessage> session1Messages = createMockMessages(messageSessionId, 1);
468+
final TestPublisher<AmqpEndpointState> session1EpStates = TestPublisher.createCold();
469+
session1EpStates.next(AmqpEndpointState.ACTIVE);
470+
final Session session1 = createMockSession(acceptedSessionId, session1Messages, session1EpStates);
471+
when(session1.getLink().updateDisposition(any(), any())).thenReturn(Mono.empty());
472+
final MessageSerializer serializer = createMockmessageSerializer(session1Messages);
473+
final ServiceBusSessionAcquirer sessionAcquirer = createMockSessionAcquirer(session1);
474+
final Runnable onTerminate = createMockOnTerminate();
475+
476+
final int maxSessions = 1;
477+
final int concurrency = 1;
478+
final boolean autoDispositionDisabled = false;
479+
final Set<ServiceBusReceivedMessage> unseenMessages = values(session1Messages);
480+
Assertions.assertEquals(1, unseenMessages.size());
481+
final ServiceBusReceivedMessage processedMessage = unseenMessages.iterator().next();
482+
final Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
483+
unseenMessages.remove(context.getMessage());
484+
};
485+
final Consumer<ServiceBusErrorContext> processError = e -> {
486+
};
487+
final SessionsMessagePump pump = createSessionsMessagePump(sessionAcquirer, idleTimeoutDisabled, maxSessions,
488+
concurrency, autoDispositionDisabled, serializer, processMessage, processError, onTerminate);
489+
490+
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
491+
verifier.create(() -> pump.begin()).thenAwait().thenCancel().verify();
492+
}
493+
494+
Assertions.assertTrue(unseenMessages.isEmpty());
495+
verify(session1.getLink()).updateDisposition(lockTokenCaptor.capture(), deliveryStateCaptor.capture());
496+
final String lockToken = lockTokenCaptor.getValue();
497+
final DeliveryState deliveryState = deliveryStateCaptor.getValue();
498+
Assertions.assertEquals(processedMessage.getLockToken(), lockToken);
499+
Assertions.assertEquals(Accepted.getInstance(), deliveryState);
500+
verify(session1.getLink(), times(1)).closeAsync();
501+
verify(onTerminate, times(1)).run();
502+
}
503+
459504
@Test
460505
@Execution(ExecutionMode.SAME_THREAD)
461506
public void shouldEmitErrorIfBeginInvokedMoreThanOnce() {

0 commit comments

Comments
 (0)