Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
53587b2
feat(servicebus): add listSessions to session receiver clients
EldertGrootenboerMS Apr 28, 2026
74c669a
fix: simplify pagination with takeWhile + map for skip tracking
EldertGrootenboerMS Apr 28, 2026
6c01e0d
fix(servicebus): address review feedback on listSessions
EldertGrootenboerMS Apr 28, 2026
c951cf5
fix(servicebus): align listSessions with Track 1 semantics + use Page…
EldertGrootenboerMS Apr 28, 2026
816475c
fix(servicebus): validate listSessions continuation tokens + spelling
EldertGrootenboerMS Apr 28, 2026
325bcec
fix(servicebus): harden listSessions response parsing + cursor encoding
EldertGrootenboerMS Apr 28, 2026
06e4fe9
test(servicebus): cover listSessions cursor + sync delegation paths
EldertGrootenboerMS Apr 28, 2026
c493bdc
fix(servicebus): drop unreachable null check in listSessions cursor l…
EldertGrootenboerMS Apr 28, 2026
90406e3
fix(servicebus): tighten listSessions argument + cursor validation
EldertGrootenboerMS Apr 28, 2026
a3ce677
fix(servicebus): validate listSessions paging args + harden response …
EldertGrootenboerMS Apr 28, 2026
1c216b5
fix(servicebus): preserve empty-string lastSessionId cursor
EldertGrootenboerMS Apr 28, 2026
e7db264
fix(servicebus): accept Iterable session-IDs payload + tighten fallba…
EldertGrootenboerMS Apr 28, 2026
82d7822
fix(servicebus): use FluxUtil.pagedFluxError for listSessions null guard
EldertGrootenboerMS Apr 29, 2026
3f5850b
test(servicebus): isolate getMessageSessionsNotFound from shared state
EldertGrootenboerMS Apr 29, 2026
a1e0b62
fix(servicebus): tighten timestamp assertion + sentinel comparison co…
EldertGrootenboerMS Apr 29, 2026
4dd5d4b
fix(servicebus): strict UTF-8 cursor decoding + clearer async-error test
EldertGrootenboerMS Apr 29, 2026
660136b
fix(servicebus): allocate fresh HttpHeaders per listSessions page
EldertGrootenboerMS Apr 29, 2026
40fa0c2
fix(servicebus): wrap listSessions page fetch in tracing span
EldertGrootenboerMS Apr 29, 2026
3b248d0
docs(servicebus): add CHANGELOG entry for listSessions feature
EldertGrootenboerMS Apr 29, 2026
1e70cc8
test(servicebus): correct getMessageSessionsNotFound rationale comment
EldertGrootenboerMS Apr 29, 2026
91bb76a
docs(servicebus): document active-messages sentinel clamp on listSess…
EldertGrootenboerMS Apr 29, 2026
c79b765
fix(servicebus): address round 21 review feedback on listSessions
EldertGrootenboerMS Apr 29, 2026
af72ac7
test(servicebus): use local copy in getMessageSessionsNoContent
EldertGrootenboerMS Apr 29, 2026
334d2c3
fix(servicebus): treat empty listSessions continuation token as end o…
EldertGrootenboerMS Apr 29, 2026
b4ba8e0
feat(servicebus): propagate caller page size to listSessions broker r…
EldertGrootenboerMS Apr 29, 2026
3bdaba6
docs(servicebus): correct listSessionsInternal byPage comment
EldertGrootenboerMS Apr 29, 2026
beedeff
docs(servicebus): clarify active-messages sentinel literal in javadoc
EldertGrootenboerMS Apr 29, 2026
f572db1
docs(servicebus): drop implementation-package link from listSessions …
EldertGrootenboerMS Apr 29, 2026
924be36
fix(servicebus): surface protocol errors and harden MessageSessionsRe…
EldertGrootenboerMS Apr 29, 2026
0e38dc0
fix(servicebus): protocol-error on unexpected sessions-ids payload type
EldertGrootenboerMS Apr 30, 2026
d5cb86a
fix(servicebus): require strict monotonic forward progress on respons…
EldertGrootenboerMS Apr 30, 2026
38eba31
test(servicebus): use OffsetDateTime.MAX in getMessageSessionsCapsYear
EldertGrootenboerMS Apr 30, 2026
0c525e0
Merge origin/main into fix/servicebus-get-message-sessions
EldertGrootenboerMS May 20, 2026
1358ba1
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer May 21, 2026
9bd4c49
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Objects;

import static com.azure.core.util.FluxUtil.fluxError;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.messaging.servicebus.ReceiverOptions.createNamedSessionOptions;

Expand Down Expand Up @@ -292,6 +296,55 @@ private Mono<ServiceBusReceiverAsyncClient> acquireSpecificOrNextSession(String
return tracer.traceMono("ServiceBus.acceptSession", acquireSessionReceiver);
}

/**
* Lists the IDs of sessions that have active messages in this entity.
*
* <p>Only sessions with active messages in the queue or subscription are returned.
* Sessions on the dead-letter queue or sessions having only a session state (but no messages)
* are not returned.</p>
*
* @return A {@link Flux} of session ID strings.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<String> listSessions() {
// Use year 9999 as sentinel for "active messages" mode.
// Cannot use OffsetDateTime.MAX because its year (999999999) overflows java.util.Date.
return listSessionsInternal(OffsetDateTime.of(9999, 12, 31, 23, 59, 59, 999999999, ZoneOffset.UTC));
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated

/**
* Lists the IDs of sessions whose state was updated after the specified time.
*
* @param updatedAfter Only sessions whose session state was updated after this time are returned.
* @return A {@link Flux} of session ID strings.
* @throws NullPointerException if {@code updatedAfter} is null.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<String> listSessions(OffsetDateTime updatedAfter) {
if (updatedAfter == null) {
return fluxError(LOGGER, new NullPointerException("'updatedAfter' cannot be null."));
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
return listSessionsInternal(updatedAfter);
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.

private Flux<String> listSessionsInternal(OffsetDateTime lastUpdatedTime) {
final int pageSize = 100;
return connectionCacheWrapper.getConnection()
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMapMany(managementNode -> {
final int[] currentSkip = { 0 };
return Flux
.defer(() -> managementNode.getMessageSessions(lastUpdatedTime, currentSkip[0], pageSize, null))
.repeat()
.takeWhile(page -> !page.isEmpty())
.map(page -> {
currentSkip[0] += page.size();
return page;
})
.flatMapIterable(page -> page);
});
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
}

@Override
public void close() {
if (this.unNamedSessionManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -201,6 +203,28 @@ public ServiceBusReceiverClient acceptSession(String sessionId) {
.block();
}

/**
* Lists the IDs of sessions that have active messages in this entity.
*
* @return A list of session ID strings.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public List<String> listSessions() {
return sessionAsyncClient.listSessions().collectList().block(operationTimeout);
}
Comment thread
EldertGrootenboer marked this conversation as resolved.

/**
* Lists the IDs of sessions whose state was updated after the specified time.
*
* @param updatedAfter Only sessions whose session state was updated after this time are returned.
* @return A list of session ID strings.
* @throws NullPointerException if {@code updatedAfter} is null.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public List<String> listSessions(OffsetDateTime updatedAfter) {
return sessionAsyncClient.listSessions(updatedAfter).collectList().block(operationTimeout);
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.

@Override
public void close() {
sessionAsyncClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static com.azure.core.util.FluxUtil.fluxError;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_ADD_RULE;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_MESSAGE_SESSIONS;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_RULES;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_SESSION_STATE;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_PEEK;
Expand Down Expand Up @@ -504,6 +505,77 @@ public Mono<Void> deleteRule(String ruleName) {
})).then();
}

/**
* {@inheritDoc}
*/
@Override
public Mono<List<String>> getMessageSessions(OffsetDateTime lastUpdatedTime, int skip, int top,
String lastSessionId) {
if (lastUpdatedTime == null) {
return monoError(logger, new NullPointerException("'lastUpdatedTime' cannot be null."));
}

// The service checks `lastUpdatedTime == DateTime.MaxValue` (C# 9999-12-31T23:59:59.9999999)
// to switch between "active messages" mode and "updated since" mode. On the AMQP wire,
// timestamps have millisecond precision, so DateTime.MaxValue becomes 253402300799999 ms.
// Year 9999 with .999999999 nanos in UTC produces this same ms value via Date.from(instant),
// ensuring the service-side comparison matches. Cap to the max supported instant in UTC to
// avoid java.util.Date overflow (OffsetDateTime.MAX year 999999999 exceeds Date's range)
// while keeping the serialized sentinel stable even when the source offset is non-UTC.
final OffsetDateTime cappedTime = lastUpdatedTime.getYear() > 9999
? OffsetDateTime.of(9999, 12, 31, 23, 59, 59, 999999999, ZoneOffset.UTC)
: lastUpdatedTime;
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated

return isAuthorized(OPERATION_GET_MESSAGE_SESSIONS).then(channelCache.get().flatMap(channel -> {
// No associated link name for entity-level operations
final Message message = createManagementMessage(OPERATION_GET_MESSAGE_SESSIONS, null);

final Map<String, Object> body = new HashMap<>();
body.put(ManagementConstants.LAST_UPDATED_TIME, Date.from(cappedTime.toInstant()));
body.put(ManagementConstants.SKIP, skip);
body.put(ManagementConstants.TOP, top);
if (!CoreUtils.isNullOrEmpty(lastSessionId)) {
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
body.put(ManagementConstants.LAST_SESSION_ID, lastSessionId);
}

message.setBody(new AmqpValue(body));

return sendWithVerify(channel, message, null);
})).flatMap(response -> {
final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(response);

if (statusCode == AmqpResponseCode.OK) {
final Object value = ((AmqpValue) response.getBody()).getValue();
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
if (value instanceof Map) {
@SuppressWarnings("unchecked")
final Map<String, Object> map = (Map<String, Object>) value;
final Object sessionsObj = map.get(ManagementConstants.SESSIONS_IDS);

if (sessionsObj instanceof Object[]) {
final Object[] sessionArray = (Object[]) sessionsObj;
final List<String> sessionIds = new ArrayList<>(sessionArray.length);
for (Object id : sessionArray) {
sessionIds.add(String.valueOf(id));
}
return Mono.just(sessionIds);
}
}
return Mono.just(Collections.emptyList());
} else if (statusCode == AmqpResponseCode.NO_CONTENT) {
return Mono.just(Collections.emptyList());
} else if (statusCode == AmqpResponseCode.NOT_FOUND) {
// 404 + SessionNotFound means no sessions exist. sendWithVerify already passes
// this through as a successful response rather than an error.
return Mono.just(Collections.emptyList());
} else {
final String statusDescription = RequestResponseUtils.getStatusDescription(response);
throw logger.logExceptionAsWarning(new AmqpException(true,
"Get message sessions failed. Status: " + statusCode + " Description: " + statusDescription,
getErrorContext()));
}
});
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class ManagementConstants {
public static final String SEQUENCE_NUMBERS = "sequence-numbers";
public static final String SESSION_ID = "session-id";
public static final String SESSION_STATE = "session-state";
public static final String SESSIONS_IDS = "sessions-ids";
public static final String LAST_UPDATED_TIME = "last-updated-time";
public static final String LAST_SESSION_ID = "last-session-id";
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
Comment thread
EldertGrootenboer marked this conversation as resolved.
public static final String VIA_PARTITION_KEY = "via-partition-key";
public static final String RULE_NAME = "rule-name";
public static final String RULE_DESCRIPTION = "rule-description";
Expand Down Expand Up @@ -61,6 +64,7 @@ public class ManagementConstants {
static final String OPERATION_ADD_RULE = AmqpConstants.VENDOR + ":add-rule";
static final String OPERATION_REMOVE_RULE = AmqpConstants.VENDOR + ":remove-rule";
static final String OPERATION_GET_RULES = AmqpConstants.VENDOR + ":enumerate-rules";
static final String OPERATION_GET_MESSAGE_SESSIONS = AmqpConstants.VENDOR + ":get-message-sessions";

static final String SERVER_TIMEOUT = AmqpConstants.VENDOR + ":server-timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@ Mono<Void> updateDisposition(String lockToken, DispositionStatus dispositionStat
*/
Flux<RuleProperties> listRules();

/**
* Lists the session IDs for sessions that have active messages or whose state was updated
* since the given time.
*
* @param lastUpdatedTime Filter timestamp. To get sessions with active messages, pass the UTC sentinel
* {@code 9999-12-31T23:59:59.999999999Z}; pass a real timestamp to get sessions updated since that
* time. The implementation caps {@code lastUpdatedTime} values beyond year 9999 (such as
* {@link java.time.OffsetDateTime#MAX}) to that UTC sentinel so the service-side sentinel
* comparison matches.
* @param skip Pagination offset.
* @param top Page size.
* @param lastSessionId Last session ID from the previous page (for cursor-based pagination), or null.
* @return A list of session ID strings for this page.
*/
Mono<List<String>> getMessageSessions(OffsetDateTime lastUpdatedTime, int skip, int top, String lastSessionId);

@Override
void close();
}
Loading
Loading