Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -132,7 +133,7 @@ class ServiceBusSessionManager implements AutoCloseable, IServiceBusSessionManag
* @return The name of the link, or {@code null} if there is no open link with that {@code sessionId}.
*/
public String getLinkName(String sessionId) {
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId);
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT));
return receiver != null ? receiver.getLinkName() : null;
Comment thread
yvgopal marked this conversation as resolved.
}

Expand Down Expand Up @@ -175,7 +176,7 @@ public Flux<ServiceBusMessageContext> receive() {
private Mono<OffsetDateTime> renewSessionLock(String sessionId) {
return validateParameter(sessionId, "sessionId", "renewSessionLock")
.then(getManagementNode().flatMap(channel -> {
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId);
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT));
final String associatedLinkName = receiver != null ? receiver.getLinkName() : null;

return tracer
Expand Down Expand Up @@ -205,7 +206,7 @@ public Mono<Boolean> updateDisposition(String lockToken, String sessionId, Dispo
validateParameter(lockToken, "lockToken", operation),
validateParameter(sessionId, "'sessionId'", operation))
.then(Mono.defer(() -> {
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId);
final ServiceBusSessionReceiver receiver = sessionReceivers.get(sessionId.toLowerCase(Locale.ROOT));
if (receiver == null || !receiver.containsLockToken(lockToken)) {
return Mono.just(false);
}
Expand Down Expand Up @@ -324,8 +325,8 @@ Mono<ServiceBusReceiveLink> getActiveLink() {
* @return A Mono that completes with an unnamed session receiver.
*/
private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean disposeOnIdle) {
return getActiveLink().flatMap(
link -> link.getSessionId().map(sessionId -> sessionReceivers.compute(sessionId, (key, existing) -> {
return getActiveLink().flatMap(link -> link.getSessionId()
.map(sessionId -> sessionReceivers.compute(sessionId.toLowerCase(Locale.ROOT), (key, existing) -> {
if (existing != null) {
return existing;
}
Expand All @@ -340,7 +341,7 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
.log("Closing session receiver.");

availableSchedulers.push(scheduler);
sessionReceivers.remove(sessionReceiver.getSessionId());
sessionReceivers.remove(sessionReceiver.getSessionId().toLowerCase(Locale.ROOT));
sessionReceiver.closeAsync().subscribe();

if (receiverOptions.isRollingSessionReceiver()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public String getIdentifier() {

@Override
public String getLinkName(String sessionId) {
return sessionReceiver.getSessionId().equals(sessionId) ? sessionReceiver.getLinkName() : null;
return sessionReceiver.getSessionId().equalsIgnoreCase(sessionId) ? sessionReceiver.getLinkName() : null;
}

@Override
Expand All @@ -82,7 +82,7 @@ public Mono<Boolean> updateDisposition(String lockToken, String sessionId, Dispo
ServiceBusTransactionContext transactionContext) {
final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason,
deadLetterDescription, propertiesToModify, transactionContext);
if (sessionReceiver.getSessionId().equals(sessionId)) {
if (sessionReceiver.getSessionId().equalsIgnoreCase(sessionId)) {
return sessionReceiver.updateDisposition(lockToken, deliveryState).thenReturn(true);
// Once the side-by-side support for V1 is no longer needed, as part of deleting V1 ServiceBusSessionManager,
// Update this method to return Mono<Void> and remove the thenReturn(true).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -679,11 +680,11 @@ private SessionReceiversTracker(ClientLogger logger, int size, String fullyQuali
}

private void track(ServiceBusSessionReactorReceiver receiver) {
receivers.put(receiver.getSessionId(), receiver);
receivers.put(receiver.getSessionId().toLowerCase(Locale.ROOT), receiver);
}

private void untrack(ServiceBusSessionReactorReceiver receiver) {
receivers.remove(receiver.getSessionId(), receiver);
receivers.remove(receiver.getSessionId().toLowerCase(Locale.ROOT), receiver);
}

private void clear() {
Expand Down Expand Up @@ -767,7 +768,7 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit
return Mono.error(new UnsupportedOperationException(m));
}
final String sessionId = message.getSessionId();
final ServiceBusSessionReactorReceiver receiver = receivers.get(sessionId);
final ServiceBusSessionReactorReceiver receiver = receivers.get(sessionId.toLowerCase(Locale.ROOT));
final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason,
deadLetterDescription, propertiesToModify, transactionContext);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

class ServiceBusSingleSessionManagerTest {
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSingleSessionManagerTest.class);

@Mock
private ServiceBusSessionReactorReceiver sessionReceiver;
@Mock
private MessageSerializer serializer;
@Mock
private ServiceBusReceiverInstrumentation instrumentation;

private AutoCloseable mocksCloseable;

@BeforeEach
void setup() {
mocksCloseable = MockitoAnnotations.openMocks(this);
}

@AfterEach
void teardown() throws Exception {
Mockito.framework().clearInlineMock(this);
if (mocksCloseable != null) {
mocksCloseable.close();
}
}

@Test
void getLinkNameShouldMatchSessionIdCaseInsensitively() {
final String acceptedSessionId = "Session-ABC";
final String linkName = "link-1";

when(sessionReceiver.getSessionId()).thenReturn(acceptedSessionId);
when(sessionReceiver.getLinkName()).thenReturn(linkName);

final ServiceBusSingleSessionManager manager = createManager();

// Exact match
assertEquals(linkName, manager.getLinkName("Session-ABC"));
// All lowercase
assertEquals(linkName, manager.getLinkName("session-abc"));
// All uppercase
assertEquals(linkName, manager.getLinkName("SESSION-ABC"));
// Non-matching value
assertNull(manager.getLinkName("other-session"));
}

@Test
void updateDispositionShouldMatchSessionIdCaseInsensitively() {
final String acceptedSessionId = "Session-ABC";
final String lockToken = "lock-1";

when(sessionReceiver.getSessionId()).thenReturn(acceptedSessionId);
when(sessionReceiver.updateDisposition(any(), any())).thenReturn(Mono.empty());

final ServiceBusSingleSessionManager manager = createManager();

// Message carries a differently-cased session ID — disposition should still succeed.
StepVerifier
.create(manager.updateDisposition(lockToken, "session-abc", DispositionStatus.COMPLETED, null, null, null,
null))
.assertNext(result -> assertTrue(result))
.verifyComplete();

// Completely different session ID — disposition should error.
StepVerifier
.create(manager.updateDisposition(lockToken, "other-session", DispositionStatus.COMPLETED, null, null, null,
null))
.verifyError();
}

private ServiceBusSingleSessionManager createManager() {
return new ServiceBusSingleSessionManager(LOGGER, "test-identifier", sessionReceiver, 0, serializer,
new AmqpRetryOptions(), instrumentation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,51 @@ public void shouldAbandonMessageOnErroredProcessing() {
verify(onTerminate, times(1)).run();
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void shouldCompleteMessageWhenSessionIdDiffersInCase() {
// The session ID returned by the broker on the accepted session may differ in case from the session ID
// carried in the received message. The disposition lookup must be case-insensitive.
final String acceptedSessionId = "Session-ABC";
final String messageSessionId = "session-abc";

final HashMap<Message, ServiceBusReceivedMessage> session1Messages = createMockMessages(messageSessionId, 1);
final TestPublisher<AmqpEndpointState> session1EpStates = TestPublisher.createCold();
session1EpStates.next(AmqpEndpointState.ACTIVE);
final Session session1 = createMockSession(acceptedSessionId, session1Messages, session1EpStates);
when(session1.getLink().updateDisposition(any(), any())).thenReturn(Mono.empty());
final MessageSerializer serializer = createMockmessageSerializer(session1Messages);
final ServiceBusSessionAcquirer sessionAcquirer = createMockSessionAcquirer(session1);
final Runnable onTerminate = createMockOnTerminate();

final int maxSessions = 1;
final int concurrency = 1;
final boolean autoDispositionDisabled = false;
final Set<ServiceBusReceivedMessage> unseenMessages = values(session1Messages);
Assertions.assertEquals(1, unseenMessages.size());
final ServiceBusReceivedMessage processedMessage = unseenMessages.iterator().next();
final Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
unseenMessages.remove(context.getMessage());
};
final Consumer<ServiceBusErrorContext> processError = e -> {
};
final SessionsMessagePump pump = createSessionsMessagePump(sessionAcquirer, idleTimeoutDisabled, maxSessions,
concurrency, autoDispositionDisabled, serializer, processMessage, processError, onTerminate);

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
verifier.create(() -> pump.begin()).thenAwait().thenCancel().verify();
}

Assertions.assertTrue(unseenMessages.isEmpty());
verify(session1.getLink()).updateDisposition(lockTokenCaptor.capture(), deliveryStateCaptor.capture());
final String lockToken = lockTokenCaptor.getValue();
final DeliveryState deliveryState = deliveryStateCaptor.getValue();
Assertions.assertEquals(processedMessage.getLockToken(), lockToken);
Assertions.assertEquals(Accepted.getInstance(), deliveryState);
verify(session1.getLink(), times(1)).closeAsync();
verify(onTerminate, times(1)).run();
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void shouldEmitErrorIfBeginInvokedMoreThanOnce() {
Expand Down