Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
53587b2
feat(servicebus): add listSessions to session receiver clients
EldertGrootenboerMS Apr 28, 2026
74c669a
fix: simplify pagination with takeWhile + map for skip tracking
EldertGrootenboerMS Apr 28, 2026
6c01e0d
fix(servicebus): address review feedback on listSessions
EldertGrootenboerMS Apr 28, 2026
c951cf5
fix(servicebus): align listSessions with Track 1 semantics + use Page…
EldertGrootenboerMS Apr 28, 2026
816475c
fix(servicebus): validate listSessions continuation tokens + spelling
EldertGrootenboerMS Apr 28, 2026
325bcec
fix(servicebus): harden listSessions response parsing + cursor encoding
EldertGrootenboerMS Apr 28, 2026
06e4fe9
test(servicebus): cover listSessions cursor + sync delegation paths
EldertGrootenboerMS Apr 28, 2026
c493bdc
fix(servicebus): drop unreachable null check in listSessions cursor l…
EldertGrootenboerMS Apr 28, 2026
90406e3
fix(servicebus): tighten listSessions argument + cursor validation
EldertGrootenboerMS Apr 28, 2026
a3ce677
fix(servicebus): validate listSessions paging args + harden response …
EldertGrootenboerMS Apr 28, 2026
1c216b5
fix(servicebus): preserve empty-string lastSessionId cursor
EldertGrootenboerMS Apr 28, 2026
e7db264
fix(servicebus): accept Iterable session-IDs payload + tighten fallba…
EldertGrootenboerMS Apr 28, 2026
82d7822
fix(servicebus): use FluxUtil.pagedFluxError for listSessions null guard
EldertGrootenboerMS Apr 29, 2026
3f5850b
test(servicebus): isolate getMessageSessionsNotFound from shared state
EldertGrootenboerMS Apr 29, 2026
a1e0b62
fix(servicebus): tighten timestamp assertion + sentinel comparison co…
EldertGrootenboerMS Apr 29, 2026
4dd5d4b
fix(servicebus): strict UTF-8 cursor decoding + clearer async-error test
EldertGrootenboerMS Apr 29, 2026
660136b
fix(servicebus): allocate fresh HttpHeaders per listSessions page
EldertGrootenboerMS Apr 29, 2026
40fa0c2
fix(servicebus): wrap listSessions page fetch in tracing span
EldertGrootenboerMS Apr 29, 2026
3b248d0
docs(servicebus): add CHANGELOG entry for listSessions feature
EldertGrootenboerMS Apr 29, 2026
1e70cc8
test(servicebus): correct getMessageSessionsNotFound rationale comment
EldertGrootenboerMS Apr 29, 2026
91bb76a
docs(servicebus): document active-messages sentinel clamp on listSess…
EldertGrootenboerMS Apr 29, 2026
c79b765
fix(servicebus): address round 21 review feedback on listSessions
EldertGrootenboerMS Apr 29, 2026
af72ac7
test(servicebus): use local copy in getMessageSessionsNoContent
EldertGrootenboerMS Apr 29, 2026
334d2c3
fix(servicebus): treat empty listSessions continuation token as end o…
EldertGrootenboerMS Apr 29, 2026
b4ba8e0
feat(servicebus): propagate caller page size to listSessions broker r…
EldertGrootenboerMS Apr 29, 2026
3bdaba6
docs(servicebus): correct listSessionsInternal byPage comment
EldertGrootenboerMS Apr 29, 2026
beedeff
docs(servicebus): clarify active-messages sentinel literal in javadoc
EldertGrootenboerMS Apr 29, 2026
f572db1
docs(servicebus): drop implementation-package link from listSessions …
EldertGrootenboerMS Apr 29, 2026
924be36
fix(servicebus): surface protocol errors and harden MessageSessionsRe…
EldertGrootenboerMS Apr 29, 2026
0e38dc0
fix(servicebus): protocol-error on unexpected sessions-ids payload type
EldertGrootenboerMS Apr 30, 2026
d5cb86a
fix(servicebus): require strict monotonic forward progress on respons…
EldertGrootenboerMS Apr 30, 2026
38eba31
test(servicebus): use OffsetDateTime.MAX in getMessageSessionsCapsYear
EldertGrootenboerMS Apr 30, 2026
0c525e0
Merge origin/main into fix/servicebus-get-message-sessions
EldertGrootenboerMS May 20, 2026
1358ba1
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer May 21, 2026
9bd4c49
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,24 @@
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.PagedResponseBase;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.ManagementConstants;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Base64;
import java.util.Collections;
import java.util.Objects;

import static com.azure.core.util.FluxUtil.monoError;
Expand Down Expand Up @@ -149,6 +158,15 @@
@ServiceClient(builder = ServiceBusClientBuilder.class, isAsync = true)
public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable {
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSessionReceiverAsyncClient.class);
Comment thread
EldertGrootenboer marked this conversation as resolved.
private static final int LIST_SESSIONS_PAGE_SIZE = 100;
/**
* Continuation-token format is {@code <decimal-skip>|<base64url-utf8(lastSessionId)>}. The
* {@code |} is safe as a separator because the URL-safe Base64 alphabet (A-Z, a-z, 0-9, '-',
* '_') does not contain it, so any byte sequence in {@code lastSessionId} survives a round
* trip without escaping.
*/
private static final char CURSOR_SEPARATOR = '|';
private static final HttpHeaders EMPTY_HEADERS = new HttpHeaders(Collections.emptyMap());
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated

Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
private final String fullyQualifiedNamespace;
private final String entityPath;
Expand Down Expand Up @@ -292,6 +310,105 @@ private Mono<ServiceBusReceiverAsyncClient> acquireSpecificOrNextSession(String
return tracer.traceMono("ServiceBus.acceptSession", acquireSessionReceiver);
}

/**
* Lists the IDs of sessions that have active messages in this entity.
*
* <p>Only sessions with active messages in the queue or subscription are returned.
* Sessions on the dead-letter queue or sessions having only a session state (but no messages)
* are not returned.</p>
*
* <p>The returned {@link PagedFlux} fetches additional pages from the broker on demand using
* cursor-based pagination (server-returned {@code skip} plus {@code lastSessionId} of the
* previous page) and terminates when the broker returns an empty page.</p>
*
* @return A {@link PagedFlux} of session ID strings.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedFlux<String> listSessions() {
// Wire value matches Track 1's SessionBrowser.MAXDATE so the broker switches into the
// active-messages mode it has historically been validated against.
return listSessionsInternal(ManagementConstants.ACTIVE_MESSAGES_SENTINEL);
}

/**
* Lists the IDs of sessions whose state was updated after the specified time.
*
* <p>The returned {@link PagedFlux} fetches additional pages from the broker on demand using
* cursor-based pagination (server-returned {@code skip} plus {@code lastSessionId} of the
* previous page) and terminates when the broker returns an empty page.</p>
*
* @param updatedAfter Only sessions whose session state was updated after this time are returned.
* @return A {@link PagedFlux} of session ID strings.
* @throws NullPointerException if {@code updatedAfter} is null.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedFlux<String> listSessions(OffsetDateTime updatedAfter) {
if (updatedAfter == null) {
return new PagedFlux<>(() -> monoError(LOGGER, new NullPointerException("'updatedAfter' cannot be null.")));
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
return listSessionsInternal(updatedAfter);
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.

private PagedFlux<String> listSessionsInternal(OffsetDateTime lastUpdatedTime) {
return new PagedFlux<>(() -> fetchSessionPage(lastUpdatedTime, 0, null), continuationToken -> {
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
if (continuationToken == null) {
return monoError(LOGGER, new IllegalArgumentException("'continuationToken' cannot be null."));
}

Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
final int separator = continuationToken.indexOf(CURSOR_SEPARATOR);
if (separator < 0) {
return monoError(LOGGER, new IllegalArgumentException(
"Invalid continuation token. Expected format '<skip>|<base64url(lastSessionId)>'."));
}

final int nextSkip;
try {
nextSkip = Integer.parseInt(continuationToken.substring(0, separator));
} catch (NumberFormatException ex) {
return monoError(LOGGER, new IllegalArgumentException(
"Invalid continuation token. Expected a numeric skip value before the '|' separator.", ex));
}

final String lastSessionId;
try {
final byte[] decoded = Base64.getUrlDecoder().decode(continuationToken.substring(separator + 1));
lastSessionId = new String(decoded, StandardCharsets.UTF_8);
} catch (IllegalArgumentException ex) {
return monoError(LOGGER, new IllegalArgumentException(
"Invalid continuation token. Expected base64url-encoded UTF-8 bytes after the '|' separator.", ex));
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
}

return fetchSessionPage(lastUpdatedTime, nextSkip, lastSessionId);
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
});
}

private Mono<PagedResponse<String>> fetchSessionPage(OffsetDateTime lastUpdatedTime, int skip,
String lastSessionId) {
return connectionCacheWrapper.getConnection()
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(managementNode -> managementNode.getMessageSessions(lastUpdatedTime, skip, LIST_SESSIONS_PAGE_SIZE,
lastSessionId))
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
.map(result -> {
final java.util.List<String> sessionIds = result.getSessionIds();
// Empty page terminates pagination (matches Track 1's SessionBrowser loop and the
// broker contract). Continuation token encodes the server-returned skip and the
// last session ID of the page so the next call uses the same cursor Track 1 does.
// Base64url-encode the session ID so arbitrary byte sequences (including the '|'
// separator) round-trip without escaping.
final String continuationToken;
if (sessionIds.isEmpty()) {
continuationToken = null;
} else {
final String last = sessionIds.get(sessionIds.size() - 1);
final String encoded
= Base64.getUrlEncoder().withoutPadding().encodeToString(last.getBytes(StandardCharsets.UTF_8));
continuationToken = result.getNextSkip() + String.valueOf(CURSOR_SEPARATOR) + encoded;
}
return new PagedResponseBase<Void, String>(null, 200, EMPTY_HEADERS, sessionIds, continuationToken,
null);
});
}

@Override
public void close() {
if (this.unNamedSessionManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.rest.PagedIterable;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -201,6 +203,36 @@ public ServiceBusReceiverClient acceptSession(String sessionId) {
.block();
}

/**
* Lists the IDs of sessions that have active messages in this entity.
*
* <p>The returned {@link PagedIterable} fetches additional pages from the broker on demand;
* iterate the {@code PagedIterable} (or call {@link PagedIterable#stream()}) to receive every
* session ID. Pages are fetched lazily as the iterator advances.</p>
*
* @return A {@link PagedIterable} of session ID strings.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedIterable<String> listSessions() {
return new PagedIterable<>(sessionAsyncClient.listSessions());
}
Comment thread
EldertGrootenboer marked this conversation as resolved.

/**
* Lists the IDs of sessions whose state was updated after the specified time.
*
* <p>The returned {@link PagedIterable} fetches additional pages from the broker on demand;
* iterate the {@code PagedIterable} (or call {@link PagedIterable#stream()}) to receive every
* session ID. Pages are fetched lazily as the iterator advances.</p>
*
* @param updatedAfter Only sessions whose session state was updated after this time are returned.
* @return A {@link PagedIterable} of session ID strings.
* @throws NullPointerException if {@code updatedAfter} is null.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedIterable<String> listSessions(OffsetDateTime updatedAfter) {
Comment thread
EldertGrootenboer marked this conversation as resolved.
return new PagedIterable<>(sessionAsyncClient.listSessions(updatedAfter));
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.

@Override
public void close() {
sessionAsyncClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static com.azure.core.util.FluxUtil.fluxError;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_ADD_RULE;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_MESSAGE_SESSIONS;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_RULES;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_SESSION_STATE;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_PEEK;
Expand Down Expand Up @@ -504,6 +505,103 @@ public Mono<Void> deleteRule(String ruleName) {
})).then();
}

/**
* {@inheritDoc}
*/
@Override
public Mono<MessageSessionsResult> getMessageSessions(OffsetDateTime lastUpdatedTime, int skip, int top,
String lastSessionId) {
if (lastUpdatedTime == null) {
return monoError(logger, new NullPointerException("'lastUpdatedTime' cannot be null."));
}

// Track 1's SessionBrowser uses new Date(253402300800000L) as the active-messages sentinel
// (1ms past 9999-12-31T23:59:59.999Z, i.e. 10000-01-01T00:00:00Z UTC). This is the wire
// value the broker has been validated against for years; align with it here. Any input at
// or beyond that instant (including OffsetDateTime.MAX, whose nanosecond precision and
// year-999_999_999 value would otherwise overflow java.util.Date) is clamped to it so the
// sentinel comparison and Date.from(...) both stay well-defined.
final OffsetDateTime cappedTime = lastUpdatedTime.compareTo(ManagementConstants.ACTIVE_MESSAGES_SENTINEL) > 0
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
? ManagementConstants.ACTIVE_MESSAGES_SENTINEL
: lastUpdatedTime;

return isAuthorized(OPERATION_GET_MESSAGE_SESSIONS).then(channelCache.get().flatMap(channel -> {
// No associated link name for entity-level operations
final Message message = createManagementMessage(OPERATION_GET_MESSAGE_SESSIONS, null);

final Map<String, Object> body = new HashMap<>();
body.put(ManagementConstants.LAST_UPDATED_TIME, Date.from(cappedTime.toInstant()));
body.put(ManagementConstants.SKIP, skip);
body.put(ManagementConstants.TOP, top);
if (!CoreUtils.isNullOrEmpty(lastSessionId)) {
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
body.put(ManagementConstants.LAST_SESSION_ID, lastSessionId);
}

message.setBody(new AmqpValue(body));

return sendWithVerify(channel, message, null);
})).flatMap(response -> {
final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(response);

if (statusCode == AmqpResponseCode.OK) {
// Defensive: a misbehaving broker or proxy could return a null/non-AmqpValue body.
// Treat that as an empty page rather than NPE/CCE'ing pagination.
final Object responseBody = response.getBody();
if (!(responseBody instanceof AmqpValue)) {
return Mono.just(new MessageSessionsResult(Collections.emptyList(), skip));
}

final Object value = ((AmqpValue) responseBody).getValue();
if (!(value instanceof Map)) {
return Mono.just(new MessageSessionsResult(Collections.emptyList(), skip));
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated

@SuppressWarnings("unchecked")
final Map<String, Object> map = (Map<String, Object>) value;
final Object sessionsObj = map.get(ManagementConstants.SESSION_IDS);

final List<String> sessionIds;
if (sessionsObj instanceof Object[]) {
final Object[] sessionArray = (Object[]) sessionsObj;
sessionIds = new ArrayList<>(sessionArray.length);
for (Object id : sessionArray) {
sessionIds.add(String.valueOf(id));
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
} else {
sessionIds = Collections.emptyList();
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
}

return Mono.just(new MessageSessionsResult(sessionIds, readResponseSkip(map, skip, sessionIds.size())));
} else if (statusCode == AmqpResponseCode.NO_CONTENT) {
return Mono.just(new MessageSessionsResult(Collections.emptyList(), skip));
} else if (statusCode == AmqpResponseCode.NOT_FOUND) {
// 404 + SessionNotFound means no sessions exist. sendWithVerify already passes
// this through as a successful response rather than an error.
return Mono.just(new MessageSessionsResult(Collections.emptyList(), skip));
} else {
final String statusDescription = RequestResponseUtils.getStatusDescription(response);
throw logger.logExceptionAsWarning(new AmqpException(true,
"Get message sessions failed. Status: " + statusCode + " Description: " + statusDescription,
getErrorContext()));
}
});
}

/**
* Reads the {@code skip} value the service returns alongside the session-ID page. Track 1 uses
* this server-returned value as the cursor for the next page rather than {@code currentSkip +
* page.size()}, so callers must propagate it verbatim. If the field is missing or non-numeric,
* advance by the size of the page actually returned so a malformed response cannot stall the
* cursor on the same skip value.
*/
private static int readResponseSkip(Map<String, Object> responseBody, int requestSkip, int pageSize) {
final Object value = responseBody.get(ManagementConstants.SKIP);
if (value instanceof Number) {
return ((Number) value).intValue();
}
return requestSkip + pageSize;
}
Comment thread
EldertGrootenboer marked this conversation as resolved.

Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

import com.azure.core.amqp.implementation.AmqpConstants;

import java.time.OffsetDateTime;
import java.time.ZoneOffset;

/**
* Constants which is used for management calls to support operations for example renewlock, schedule, defer etc.
*/
Expand All @@ -25,6 +28,25 @@ public class ManagementConstants {
public static final String SEQUENCE_NUMBERS = "sequence-numbers";
public static final String SESSION_ID = "session-id";
public static final String SESSION_STATE = "session-state";
/**
* Wire key for the session-ID array in the {@code OPERATION_GET_MESSAGE_SESSIONS} response. The
* field name ("sessions-ids") is fixed by the broker contract; the constant name omits the extra
* "s" so it reads consistently with {@link #SESSION_ID} and {@link #SEQUENCE_NUMBERS}.
*/
public static final String SESSION_IDS = "sessions-ids";

/**
* Sentinel timestamp the broker recognizes as "list sessions with active messages" mode for the
* {@code OPERATION_GET_MESSAGE_SESSIONS} operation. Matches Track 1's
* {@code SessionBrowser.MAXDATE = new Date(253402300800000L)} ({@code 10000-01-01T00:00:00Z}
* UTC); using any other value risks the broker not switching into the proven mode. Defined as
* {@link OffsetDateTime} so callers and the implementation can clamp inputs via
* {@link OffsetDateTime#compareTo} without each owning their own copy.
*/
Comment thread
EldertGrootenboer marked this conversation as resolved.
public static final OffsetDateTime ACTIVE_MESSAGES_SENTINEL
= OffsetDateTime.of(10000, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
public static final String LAST_UPDATED_TIME = "last-updated-time";
public static final String LAST_SESSION_ID = "last-session-id";
Comment thread
EldertGrootenboer marked this conversation as resolved.
public static final String VIA_PARTITION_KEY = "via-partition-key";
public static final String RULE_NAME = "rule-name";
public static final String RULE_DESCRIPTION = "rule-description";
Expand Down Expand Up @@ -61,6 +83,7 @@ public class ManagementConstants {
static final String OPERATION_ADD_RULE = AmqpConstants.VENDOR + ":add-rule";
static final String OPERATION_REMOVE_RULE = AmqpConstants.VENDOR + ":remove-rule";
static final String OPERATION_GET_RULES = AmqpConstants.VENDOR + ":enumerate-rules";
static final String OPERATION_GET_MESSAGE_SESSIONS = AmqpConstants.VENDOR + ":get-message-sessions";

static final String SERVER_TIMEOUT = AmqpConstants.VENDOR + ":server-timeout";

Expand Down
Loading
Loading