Skip to content

Commit f49bbba

Browse files
committed
Making a fix to compare sessionid on the message and the link filter in a case insensite way.
1 parent 51486db commit f49bbba

5 files changed

Lines changed: 158 additions & 10 deletions

File tree

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Deque;
3232
import java.util.List;
3333
import java.util.Map;
34+
import java.util.Locale;
3435
import java.util.concurrent.ConcurrentHashMap;
3536
import java.util.concurrent.ConcurrentLinkedDeque;
3637
import java.util.concurrent.TimeoutException;
@@ -132,7 +133,7 @@ class ServiceBusSessionManager implements AutoCloseable, IServiceBusSessionManag
132133
* @return The name of the link, or {@code null} if there is no open link with that {@code sessionId}.
133134
*/
134135
public String getLinkName(String sessionId) {
135-
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId);
136+
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT));
136137
return receiver != null ? receiver.getLinkName() : null;
137138
}
138139

@@ -175,7 +176,7 @@ public Flux<ServiceBusMessageContext> receive() {
175176
private Mono<OffsetDateTime> renewSessionLock(String sessionId) {
176177
return validateParameter(sessionId, "sessionId", "renewSessionLock")
177178
.then(getManagementNode().flatMap(channel -> {
178-
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId);
179+
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT));
179180
final String associatedLinkName = receiver != null ? receiver.getLinkName() : null;
180181

181182
return tracer
@@ -205,7 +206,7 @@ public Mono<Boolean> updateDisposition(String lockToken, String sessionId, Dispo
205206
validateParameter(lockToken, "lockToken", operation),
206207
validateParameter(sessionId, "'sessionId'", operation))
207208
.then(Mono.defer(() -> {
208-
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId);
209+
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT));
209210
if (receiver == null || !receiver.containsLockToken(lockToken)) {
210211
return Mono.just(false);
211212
}
@@ -325,7 +326,7 @@ Mono<ServiceBusReceiveLink> getActiveLink() {
325326
*/
326327
private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean disposeOnIdle) {
327328
return getActiveLink().flatMap(
328-
link -> link.getSessionId().map(sessionId -> sessionReceivers.compute(sessionId, (key, existing) -> {
329+
link -> link.getSessionId().map(sessionId -> sessionReceivers.compute(sessionId.toLowerCase(Locale.ROOT), (key, existing) -> {
329330
if (existing != null) {
330331
return existing;
331332
}
@@ -340,7 +341,7 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
340341
.log("Closing session receiver.");
341342

342343
availableSchedulers.push(scheduler);
343-
sessionReceivers.remove(sessionReceiver.getSessionId());
344+
sessionReceivers.remove(sessionReceiver.getSessionId().toLowerCase(Locale.ROOT));
344345
sessionReceiver.closeAsync().subscribe();
345346

346347
if (receiverOptions.isRollingSessionReceiver()) {

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSingleSessionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public String getIdentifier() {
6767

6868
@Override
6969
public String getLinkName(String sessionId) {
70-
return sessionReceiver.getSessionId().equals(sessionId) ? sessionReceiver.getLinkName() : null;
70+
return sessionReceiver.getSessionId().equalsIgnoreCase(sessionId) ? sessionReceiver.getLinkName() : null;
7171
}
7272

7373
@Override
@@ -82,7 +82,7 @@ public Mono<Boolean> updateDisposition(String lockToken, String sessionId, Dispo
8282
ServiceBusTransactionContext transactionContext) {
8383
final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason,
8484
deadLetterDescription, propertiesToModify, transactionContext);
85-
if (sessionReceiver.getSessionId().equals(sessionId)) {
85+
if (sessionReceiver.getSessionId().equalsIgnoreCase(sessionId)) {
8686
return sessionReceiver.updateDisposition(lockToken, deliveryState).thenReturn(true);
8787
// Once the side-by-side support for V1 is no longer needed, as part of deleting V1 ServiceBusSessionManager,
8888
// Update this method to return Mono<Void> and remove the thenReturn(true).

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Objects;
36+
import java.util.Locale;
3637
import java.util.concurrent.ConcurrentHashMap;
3738
import java.util.concurrent.atomic.AtomicBoolean;
3839
import java.util.concurrent.atomic.AtomicLong;
@@ -679,11 +680,11 @@ private SessionReceiversTracker(ClientLogger logger, int size, String fullyQuali
679680
}
680681

681682
private void track(ServiceBusSessionReactorReceiver receiver) {
682-
receivers.put(receiver.getSessionId(), receiver);
683+
receivers.put(receiver.getSessionId().toLowerCase(Locale.ROOT), receiver);
683684
}
684685

685686
private void untrack(ServiceBusSessionReactorReceiver receiver) {
686-
receivers.remove(receiver.getSessionId(), receiver);
687+
receivers.remove(receiver.getSessionId().toLowerCase(Locale.ROOT), receiver);
687688
}
688689

689690
private void clear() {
@@ -767,7 +768,7 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit
767768
return Mono.error(new UnsupportedOperationException(m));
768769
}
769770
final String sessionId = message.getSessionId();
770-
final ServiceBusSessionReactorReceiver receiver = receivers.get(sessionId);
771+
final ServiceBusSessionReactorReceiver receiver = receivers.get(sessionId.toLowerCase(Locale.ROOT));
771772
final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason,
772773
deadLetterDescription, propertiesToModify, transactionContext);
773774

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.messaging.servicebus;
4+
5+
import com.azure.core.amqp.AmqpRetryOptions;
6+
import com.azure.core.amqp.implementation.MessageSerializer;
7+
import com.azure.core.util.logging.ClientLogger;
8+
import com.azure.messaging.servicebus.implementation.DispositionStatus;
9+
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
10+
import org.junit.jupiter.api.AfterEach;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
import org.mockito.Mock;
14+
import org.mockito.Mockito;
15+
import org.mockito.MockitoAnnotations;
16+
import reactor.core.publisher.Mono;
17+
import reactor.test.StepVerifier;
18+
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertNotNull;
21+
import static org.junit.jupiter.api.Assertions.assertNull;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.Mockito.when;
25+
26+
class ServiceBusSingleSessionManagerTest {
27+
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSingleSessionManagerTest.class);
28+
29+
@Mock
30+
private ServiceBusSessionReactorReceiver sessionReceiver;
31+
@Mock
32+
private MessageSerializer serializer;
33+
@Mock
34+
private ServiceBusReceiverInstrumentation instrumentation;
35+
36+
private AutoCloseable mocksCloseable;
37+
38+
@BeforeEach
39+
void setup() {
40+
mocksCloseable = MockitoAnnotations.openMocks(this);
41+
}
42+
43+
@AfterEach
44+
void teardown() throws Exception {
45+
Mockito.framework().clearInlineMock(this);
46+
if (mocksCloseable != null) {
47+
mocksCloseable.close();
48+
}
49+
}
50+
51+
@Test
52+
void getLinkNameShouldMatchSessionIdCaseInsensitively() {
53+
final String acceptedSessionId = "Session-ABC";
54+
final String linkName = "link-1";
55+
56+
when(sessionReceiver.getSessionId()).thenReturn(acceptedSessionId);
57+
when(sessionReceiver.getLinkName()).thenReturn(linkName);
58+
59+
final ServiceBusSingleSessionManager manager = createManager();
60+
61+
// Exact match
62+
assertEquals(linkName, manager.getLinkName("Session-ABC"));
63+
// All lowercase
64+
assertEquals(linkName, manager.getLinkName("session-abc"));
65+
// All uppercase
66+
assertEquals(linkName, manager.getLinkName("SESSION-ABC"));
67+
// Non-matching value
68+
assertNull(manager.getLinkName("other-session"));
69+
}
70+
71+
@Test
72+
void updateDispositionShouldMatchSessionIdCaseInsensitively() {
73+
final String acceptedSessionId = "Session-ABC";
74+
final String lockToken = "lock-1";
75+
76+
when(sessionReceiver.getSessionId()).thenReturn(acceptedSessionId);
77+
when(sessionReceiver.updateDisposition(any(), any())).thenReturn(Mono.empty());
78+
79+
final ServiceBusSingleSessionManager manager = createManager();
80+
81+
// Message carries a differently-cased session ID — disposition should still succeed.
82+
StepVerifier
83+
.create(manager.updateDisposition(lockToken, "session-abc", DispositionStatus.COMPLETED, null, null, null,
84+
null))
85+
.assertNext(result -> assertTrue(result))
86+
.verifyComplete();
87+
88+
// Completely different session ID — disposition should error.
89+
StepVerifier
90+
.create(
91+
manager.updateDisposition(lockToken, "other-session", DispositionStatus.COMPLETED, null, null, null,
92+
null))
93+
.verifyError();
94+
}
95+
96+
private ServiceBusSingleSessionManager createManager() {
97+
return new ServiceBusSingleSessionManager(LOGGER, "test-identifier", sessionReceiver, 0, serializer,
98+
new AmqpRetryOptions(), instrumentation);
99+
}
100+
}

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,52 @@ public void shouldAbandonMessageOnErroredProcessing() {
456456
verify(onTerminate, times(1)).run();
457457
}
458458

459+
@Test
460+
@Execution(ExecutionMode.SAME_THREAD)
461+
public void shouldCompleteMessageWhenSessionIdDiffersInCase() {
462+
// The session ID returned by the broker on the accepted session may differ in case from the session ID
463+
// carried in the received message. The disposition lookup must be case-insensitive.
464+
final String acceptedSessionId = "Session-ABC";
465+
final String messageSessionId = "session-abc";
466+
467+
final HashMap<Message, ServiceBusReceivedMessage> session1Messages
468+
= createMockMessages(messageSessionId, 1);
469+
final TestPublisher<AmqpEndpointState> session1EpStates = TestPublisher.createCold();
470+
session1EpStates.next(AmqpEndpointState.ACTIVE);
471+
final Session session1 = createMockSession(acceptedSessionId, session1Messages, session1EpStates);
472+
when(session1.getLink().updateDisposition(any(), any())).thenReturn(Mono.empty());
473+
final MessageSerializer serializer = createMockmessageSerializer(session1Messages);
474+
final ServiceBusSessionAcquirer sessionAcquirer = createMockSessionAcquirer(session1);
475+
final Runnable onTerminate = createMockOnTerminate();
476+
477+
final int maxSessions = 1;
478+
final int concurrency = 1;
479+
final boolean autoDispositionDisabled = false;
480+
final Set<ServiceBusReceivedMessage> unseenMessages = values(session1Messages);
481+
Assertions.assertEquals(1, unseenMessages.size());
482+
final ServiceBusReceivedMessage processedMessage = unseenMessages.iterator().next();
483+
final Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
484+
unseenMessages.remove(context.getMessage());
485+
};
486+
final Consumer<ServiceBusErrorContext> processError = e -> {
487+
};
488+
final SessionsMessagePump pump = createSessionsMessagePump(sessionAcquirer, idleTimeoutDisabled, maxSessions,
489+
concurrency, autoDispositionDisabled, serializer, processMessage, processError, onTerminate);
490+
491+
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
492+
verifier.create(() -> pump.begin()).thenAwait().thenCancel().verify();
493+
}
494+
495+
Assertions.assertTrue(unseenMessages.isEmpty());
496+
verify(session1.getLink()).updateDisposition(lockTokenCaptor.capture(), deliveryStateCaptor.capture());
497+
final String lockToken = lockTokenCaptor.getValue();
498+
final DeliveryState deliveryState = deliveryStateCaptor.getValue();
499+
Assertions.assertEquals(processedMessage.getLockToken(), lockToken);
500+
Assertions.assertEquals(Accepted.getInstance(), deliveryState);
501+
verify(session1.getLink(), times(1)).closeAsync();
502+
verify(onTerminate, times(1)).run();
503+
}
504+
459505
@Test
460506
@Execution(ExecutionMode.SAME_THREAD)
461507
public void shouldEmitErrorIfBeginInvokedMoreThanOnce() {

0 commit comments

Comments
 (0)