Skip to content
Open
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
53587b2
feat(servicebus): add listSessions to session receiver clients
EldertGrootenboerMS Apr 28, 2026
74c669a
fix: simplify pagination with takeWhile + map for skip tracking
EldertGrootenboerMS Apr 28, 2026
6c01e0d
fix(servicebus): address review feedback on listSessions
EldertGrootenboerMS Apr 28, 2026
c951cf5
fix(servicebus): align listSessions with Track 1 semantics + use Page…
EldertGrootenboerMS Apr 28, 2026
816475c
fix(servicebus): validate listSessions continuation tokens + spelling
EldertGrootenboerMS Apr 28, 2026
325bcec
fix(servicebus): harden listSessions response parsing + cursor encoding
EldertGrootenboerMS Apr 28, 2026
06e4fe9
test(servicebus): cover listSessions cursor + sync delegation paths
EldertGrootenboerMS Apr 28, 2026
c493bdc
fix(servicebus): drop unreachable null check in listSessions cursor l…
EldertGrootenboerMS Apr 28, 2026
90406e3
fix(servicebus): tighten listSessions argument + cursor validation
EldertGrootenboerMS Apr 28, 2026
a3ce677
fix(servicebus): validate listSessions paging args + harden response …
EldertGrootenboerMS Apr 28, 2026
1c216b5
fix(servicebus): preserve empty-string lastSessionId cursor
EldertGrootenboerMS Apr 28, 2026
e7db264
fix(servicebus): accept Iterable session-IDs payload + tighten fallba…
EldertGrootenboerMS Apr 28, 2026
82d7822
fix(servicebus): use FluxUtil.pagedFluxError for listSessions null guard
EldertGrootenboerMS Apr 29, 2026
3f5850b
test(servicebus): isolate getMessageSessionsNotFound from shared state
EldertGrootenboerMS Apr 29, 2026
a1e0b62
fix(servicebus): tighten timestamp assertion + sentinel comparison co…
EldertGrootenboerMS Apr 29, 2026
4dd5d4b
fix(servicebus): strict UTF-8 cursor decoding + clearer async-error test
EldertGrootenboerMS Apr 29, 2026
660136b
fix(servicebus): allocate fresh HttpHeaders per listSessions page
EldertGrootenboerMS Apr 29, 2026
40fa0c2
fix(servicebus): wrap listSessions page fetch in tracing span
EldertGrootenboerMS Apr 29, 2026
3b248d0
docs(servicebus): add CHANGELOG entry for listSessions feature
EldertGrootenboerMS Apr 29, 2026
1e70cc8
test(servicebus): correct getMessageSessionsNotFound rationale comment
EldertGrootenboerMS Apr 29, 2026
91bb76a
docs(servicebus): document active-messages sentinel clamp on listSess…
EldertGrootenboerMS Apr 29, 2026
c79b765
fix(servicebus): address round 21 review feedback on listSessions
EldertGrootenboerMS Apr 29, 2026
af72ac7
test(servicebus): use local copy in getMessageSessionsNoContent
EldertGrootenboerMS Apr 29, 2026
334d2c3
fix(servicebus): treat empty listSessions continuation token as end o…
EldertGrootenboerMS Apr 29, 2026
b4ba8e0
feat(servicebus): propagate caller page size to listSessions broker r…
EldertGrootenboerMS Apr 29, 2026
3bdaba6
docs(servicebus): correct listSessionsInternal byPage comment
EldertGrootenboerMS Apr 29, 2026
beedeff
docs(servicebus): clarify active-messages sentinel literal in javadoc
EldertGrootenboerMS Apr 29, 2026
f572db1
docs(servicebus): drop implementation-package link from listSessions …
EldertGrootenboerMS Apr 29, 2026
924be36
fix(servicebus): surface protocol errors and harden MessageSessionsRe…
EldertGrootenboerMS Apr 29, 2026
0e38dc0
fix(servicebus): protocol-error on unexpected sessions-ids payload type
EldertGrootenboerMS Apr 30, 2026
d5cb86a
fix(servicebus): require strict monotonic forward progress on respons…
EldertGrootenboerMS Apr 30, 2026
38eba31
test(servicebus): use OffsetDateTime.MAX in getMessageSessionsCapsYear
EldertGrootenboerMS Apr 30, 2026
0c525e0
Merge origin/main into fix/servicebus-get-message-sessions
EldertGrootenboerMS May 20, 2026
1358ba1
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer May 21, 2026
9bd4c49
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Added `listSessions()` and `listSessions(OffsetDateTime updatedAfter)` to `ServiceBusSessionReceiverAsyncClient` (returning `PagedFlux<String>`) and `ServiceBusSessionReceiverClient` (returning `PagedIterable<String>`). The no-arg overload returns sessions with active messages; the `updatedAfter` overload returns sessions whose state was updated after the given timestamp. Implements the `com.microsoft:get-message-sessions` AMQP management operation, exposing parity with Track 1's `IMessageSessionEntity.getMessageSessions()`. ([#48956](https://github.com/Azure/azure-sdk-for-java/pull/48956))

### Breaking Changes

### Bugs Fixed
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,30 @@
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.PagedResponseBase;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.ManagementConstants;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Base64;
import java.util.Objects;

import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.pagedFluxError;
import static com.azure.messaging.servicebus.ReceiverOptions.createNamedSessionOptions;

/**
Expand Down Expand Up @@ -149,6 +161,14 @@
@ServiceClient(builder = ServiceBusClientBuilder.class, isAsync = true)
public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable {
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSessionReceiverAsyncClient.class);
Comment thread
EldertGrootenboer marked this conversation as resolved.
private static final int LIST_SESSIONS_DEFAULT_PAGE_SIZE = 100;
/**
* Continuation-token format is {@code <decimal-skip>|<base64url-utf8(lastSessionId)>}. The
* {@code |} is safe as a separator because the URL-safe Base64 alphabet (A-Z, a-z, 0-9, '-',
* '_') does not contain it, so any byte sequence in {@code lastSessionId} survives a round
* trip without escaping.
*/
private static final char CURSOR_SEPARATOR = '|';

private final String fullyQualifiedNamespace;
private final String entityPath;
Expand Down Expand Up @@ -292,6 +312,157 @@ private Mono<ServiceBusReceiverAsyncClient> acquireSpecificOrNextSession(String
return tracer.traceMono("ServiceBus.acceptSession", acquireSessionReceiver);
}

/**
* Lists the IDs of sessions that have active messages in this entity.
*
* <p>Only sessions with active messages in the queue or subscription are returned.
* Sessions on the dead-letter queue or sessions having only a session state (but no messages)
* are not returned.</p>
*
* <p>The returned {@link PagedFlux} fetches additional pages from the broker on demand using
* cursor-based pagination (server-returned {@code skip} plus {@code lastSessionId} of the
* previous page) and terminates when the broker returns an empty page. The default page size
* is 100; callers can request a different size via
* {@link PagedFlux#byPage(int)}.</p>
*
* @return A {@link PagedFlux} of session ID strings.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedFlux<String> listSessions() {
// Wire value matches Track 1's SessionBrowser.MAXDATE so the broker switches into the
// active-messages mode it has historically been validated against.
return listSessionsInternal(ManagementConstants.ACTIVE_MESSAGES_SENTINEL);
}

/**
* Lists the IDs of sessions whose state was updated after the specified time.
*
* <p>The returned {@link PagedFlux} fetches additional pages from the broker on demand using
* cursor-based pagination (server-returned {@code skip} plus {@code lastSessionId} of the
* previous page) and terminates when the broker returns an empty page. The default page size
* is 100; callers can request a different size via
* {@link PagedFlux#byPage(int)}.</p>
*
* <p>Values at or beyond the active-messages sentinel value
* ({@code new Date(253402300800000L)}, rendered by {@code OffsetDateTime.toString()} as
* {@code +10000-01-01T00:00Z}, matching Track 1's {@code SessionBrowser.MAXDATE}) are clamped
* to that sentinel and behave the same as {@link #listSessions()}, returning sessions that
* have active messages.</p>
*
* @param updatedAfter Only sessions whose session state was updated after this time are returned.
* @return A {@link PagedFlux} of session ID strings.
* @throws NullPointerException if {@code updatedAfter} is null.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedFlux<String> listSessions(OffsetDateTime updatedAfter) {
if (updatedAfter == null) {
return pagedFluxError(LOGGER, new NullPointerException("'updatedAfter' cannot be null."));
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
return listSessionsInternal(updatedAfter);
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.

private PagedFlux<String> listSessionsInternal(OffsetDateTime lastUpdatedTime) {
// Use the page-size-aware PagedFlux constructor so a caller's byPage(int) value flows
// through to the management request's `top` parameter. When the caller doesn't request a
// specific page size, pageSize is null and we fall back to the default. The first lambda
// is the first-page retriever. The next-page retriever may also be invoked directly when
// a caller starts from a continuation token via byPage(token) without going through any
// previous PagedResponse, so it must validate the token it receives. Note: in azure-core,
// byPage(null) returns Flux.empty() rather than routing to the first-page retriever, so a
// null continuation token never reaches this lambda.
return new PagedFlux<>(pageSize -> fetchSessionPage(lastUpdatedTime, 0, null, resolvePageSize(pageSize)),
(continuationToken, pageSize) -> {
// Treat an empty continuation token as "no more pages", matching
// ServiceBusAdministrationAsyncClient.listQueuesNextPage / listRulesNextPage and
// the wider Azure SDK paging convention. This is tolerant of callers that persist
// the token to storage and read back an empty string.
if (continuationToken.isEmpty()) {
return Mono.empty();
}
final int separator = continuationToken.indexOf(CURSOR_SEPARATOR);
if (separator < 0) {
return monoError(LOGGER, new IllegalArgumentException(
"Invalid continuation token. Expected format '<skip>|<base64url(lastSessionId)>'."));
}

final int nextSkip;
try {
nextSkip = Integer.parseInt(continuationToken.substring(0, separator));
} catch (NumberFormatException ex) {
return monoError(LOGGER, new IllegalArgumentException(
"Invalid continuation token. Expected a numeric skip value before the '|' separator.", ex));
}
if (nextSkip < 0) {
return monoError(LOGGER, new IllegalArgumentException(
"Invalid continuation token. Skip value must be non-negative; got " + nextSkip + "."));
}

final String lastSessionId;
try {
final byte[] decoded = Base64.getUrlDecoder().decode(continuationToken.substring(separator + 1));
// Strict UTF-8 decoding: a token whose payload base64-decodes cleanly but isn't valid
// UTF-8 must be rejected, otherwise new String(decoded, UTF_8) silently substitutes
// U+FFFD and we'd send a corrupted session ID to the broker as the cursor.
lastSessionId = StandardCharsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPORT)
.onUnmappableCharacter(CodingErrorAction.REPORT)
.decode(ByteBuffer.wrap(decoded))
.toString();
} catch (IllegalArgumentException ex) {
return monoError(LOGGER, new IllegalArgumentException(
"Invalid continuation token. Expected base64url-encoded UTF-8 bytes after the '|' separator.",
ex));
} catch (CharacterCodingException ex) {
return monoError(LOGGER, new IllegalArgumentException(
"Invalid continuation token. Decoded bytes after the '|' separator are not valid UTF-8.", ex));
}

return fetchSessionPage(lastUpdatedTime, nextSkip, lastSessionId, resolvePageSize(pageSize));
});
}

/**
* Resolves the per-page request size: a positive caller-supplied value (via {@code byPage(int)})
* is honored, anything else (null or non-positive) falls back to the default.
*/
private static int resolvePageSize(Integer requested) {
return requested != null && requested > 0 ? requested : LIST_SESSIONS_DEFAULT_PAGE_SIZE;
}

private Mono<PagedResponse<String>> fetchSessionPage(OffsetDateTime lastUpdatedTime, int skip, String lastSessionId,
int pageSize) {
// Wrap each page fetch in a tracing span so distributed traces show one
// "ServiceBus.listSessions" span per page, matching the tracing pattern used by
// acceptSession/acceptNextSession in this client.
final Mono<PagedResponse<String>> pageMono = connectionCacheWrapper.getConnection()
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(
managementNode -> managementNode.getMessageSessions(lastUpdatedTime, skip, pageSize, lastSessionId))
.map(result -> {
final java.util.List<String> sessionIds = result.getSessionIds();
// Empty page terminates pagination (matches Track 1's SessionBrowser loop and the
// broker contract). Continuation token encodes the server-returned skip and the
// last session ID of the page so the next call uses the same cursor Track 1 does.
// Base64url-encode the session ID so arbitrary byte sequences (including the '|'
// separator) round-trip without escaping.
final String continuationToken;
if (sessionIds.isEmpty()) {
continuationToken = null;
} else {
final String last = sessionIds.get(sessionIds.size() - 1);
final String encoded
= Base64.getUrlEncoder().withoutPadding().encodeToString(last.getBytes(StandardCharsets.UTF_8));
continuationToken = result.getNextSkip() + String.valueOf(CURSOR_SEPARATOR) + encoded;
}
// Allocate a fresh HttpHeaders per page so callers cannot mutate a shared
// instance (HttpHeaders is mutable and PagedResponseBase exposes the reference
// via getHeaders()).
return new PagedResponseBase<Void, String>(null, 200, new HttpHeaders(), sessionIds, continuationToken,
null);
});
return tracer.traceMono("ServiceBus.listSessions", pageMono);
}

@Override
public void close() {
if (this.unNamedSessionManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.rest.PagedIterable;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -201,6 +203,47 @@ public ServiceBusReceiverClient acceptSession(String sessionId) {
.block();
}

/**
* Lists the IDs of sessions that have active messages in this entity.
*
* <p>The returned {@link PagedIterable} fetches additional pages from the broker on demand;
* iterate the {@code PagedIterable} (or call {@link PagedIterable#stream()}) to receive every
* session ID. Pages are fetched lazily as the iterator advances. The default page size is 100;
* callers can request a different size via {@link PagedIterable#iterableByPage(int)} (or the
* equivalent on the underlying {@code PagedFlux}).</p>
*
* @return A {@link PagedIterable} of session ID strings.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedIterable<String> listSessions() {
return new PagedIterable<>(sessionAsyncClient.listSessions());
}
Comment thread
EldertGrootenboer marked this conversation as resolved.

/**
* Lists the IDs of sessions whose state was updated after the specified time.
*
* <p>The returned {@link PagedIterable} fetches additional pages from the broker on demand;
* iterate the {@code PagedIterable} (or call {@link PagedIterable#stream()}) to receive every
* session ID. Pages are fetched lazily as the iterator advances. The default page size is 100;
* callers can request a different size via {@link PagedIterable#iterableByPage(int)} (or the
* equivalent on the underlying {@code PagedFlux}).</p>
*
* <p>Values at or beyond the active-messages sentinel value
* ({@code new Date(253402300800000L)}, rendered by {@code OffsetDateTime.toString()} as
* {@code +10000-01-01T00:00Z}, matching Track 1's {@code SessionBrowser.MAXDATE}) are clamped
* to that sentinel and behave the same as {@link #listSessions()}, returning sessions that
* have active messages.</p>
*
* @param updatedAfter Only sessions whose session state was updated after this time are returned.
* @return A {@link PagedIterable} of session ID strings.
* @throws NullPointerException if {@code updatedAfter} is null.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedIterable<String> listSessions(OffsetDateTime updatedAfter) {
Comment thread
EldertGrootenboer marked this conversation as resolved.
Objects.requireNonNull(updatedAfter, "'updatedAfter' cannot be null.");
return new PagedIterable<>(sessionAsyncClient.listSessions(updatedAfter));
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.

@Override
public void close() {
sessionAsyncClient.close();
Expand Down
Loading
Loading