Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ def list_sessions_op( # pylint: disable=inconsistent-return-statements
return parsed
if status_code in [202, 204]:
return []
# 404 + SessionNotFound means no sessions exist for this entity.
if status_code == 404:
return []
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated

amqp_transport.handle_amqp_mgmt_error(_LOGGER, "List sessions failed.", condition, description, status_code)
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------
# pylint: disable=client-method-missing-tracing-decorator
from typing import Any, Union, Optional, TYPE_CHECKING, Type
from datetime import datetime
Comment thread
EldertGrootenboer marked this conversation as resolved.
import logging
import warnings
from weakref import WeakSet
Expand All @@ -17,6 +18,7 @@
)
from ._servicebus_sender import ServiceBusSender
from ._servicebus_receiver import ServiceBusReceiver
from ._session_browser import _SessionBrowser
from ._common.auto_lock_renewer import AutoLockRenewer
from ._common._configuration import Configuration
from ._common.utils import (
Expand Down Expand Up @@ -699,3 +701,90 @@ def get_subscription_receiver(
)
self._handlers.add(handler)
return handler

def _create_session_browser(self, entity_name, subscription_name=None, **kwargs):
"""Create an internal _SessionBrowser for management-only operations."""
browser = _SessionBrowser(
fully_qualified_namespace=self.fully_qualified_namespace,
entity_name=entity_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
ssl_context=self._ssl_context,
amqp_transport=self._amqp_transport,
use_tls=self._config.use_tls,
subscription_name=subscription_name,
**kwargs,
)
return browser

def list_queue_sessions(
self,
queue_name: str,
*,
updated_since: Optional["datetime"] = None,
timeout: Optional[float] = None,
**kwargs: Any,
):
# type: (...) -> list[str]
"""List session IDs with active messages in a session-enabled queue.

If ``updated_since`` is specified, only sessions whose state was updated
after that time are returned. If not specified, returns sessions with
active messages in the queue.

Comment thread
EldertGrootenboer marked this conversation as resolved.
:param str queue_name: The name of the session-enabled queue.
:keyword ~datetime.datetime updated_since: If specified, only sessions whose state was
updated after this time are returned.
:keyword float timeout: The total operation timeout in seconds.
:returns: A list of session ID strings.
:rtype: list[str]
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
browser = self._create_session_browser(queue_name)
try:
return browser.list_sessions(updated_since=updated_since, timeout=timeout)
finally:
browser.close()

def list_subscription_sessions(
self,
topic_name: str,
subscription_name: str,
*,
Comment thread
EldertGrootenboer marked this conversation as resolved.
updated_since: Optional["datetime"] = None,
timeout: Optional[float] = None,
**kwargs: Any,
):
# type: (...) -> list[str]
"""List session IDs with active messages in a session-enabled subscription.

If ``updated_since`` is specified, only sessions whose state was updated
after that time are returned. If not specified, returns sessions with
active messages in the subscription.

:param str topic_name: The name of the topic.
Comment thread
EldertGrootenboer marked this conversation as resolved.
:param str subscription_name: The name of the subscription.
:keyword ~datetime.datetime updated_since: If specified, only sessions whose state was
updated after this time are returned.
:keyword float timeout: The total operation timeout in seconds.
:returns: A list of session ID strings.
:rtype: list[str]
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
browser = self._create_session_browser(topic_name, subscription_name=subscription_name)
try:
return browser.list_sessions(updated_since=updated_since, timeout=timeout)
finally:
browser.close()
135 changes: 135 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_session_browser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import time
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, List, Optional, Union

from ._base_handler import BaseHandler
from ._common.utils import create_authentication
from ._common.constants import (
REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION,
)
from ._common import mgmt_handlers
from ._pyamqp.types import AMQPTypes, TYPE, VALUE

_LOGGER = logging.getLogger(__name__)

# The service checks for DateTime.MaxValue (C# 9999-12-31T23:59:59.9999999) to switch
# between "active messages" mode and "updated since" mode. On the AMQP wire, timestamps
# have millisecond precision, so DateTime.MaxValue becomes 253402300799999 ms from epoch.
# Python's datetime.timestamp() float math rounds this up by 1ms, so we use the
# pre-computed constant directly for the sentinel.
_MAX_DATETIME_MS = 253402300799999
_MAX_DATETIME = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)
_PAGE_SIZE = 100


def _amqp_int_value(value):
return {TYPE: AMQPTypes.int, VALUE: value}


def _amqp_timestamp_value(value):
"""Convert a datetime to an AMQP timestamp (milliseconds since epoch)."""
return {TYPE: AMQPTypes.timestamp, VALUE: int(value.timestamp() * 1000)}

Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated

class _SessionBrowser(BaseHandler):
"""Internal handler that opens an AMQP connection for management-only operations.

Unlike ServiceBusSender/ServiceBusReceiver, this does NOT create a sender or
receiver link. It only opens a connection and authenticates, then sends
management requests to the $management endpoint.

Used for entity-level management operations like get-message-sessions where
a sender or receiver link is not needed.
"""

def __init__(self, fully_qualified_namespace, entity_name, credential, **kwargs):
super().__init__(
fully_qualified_namespace=fully_qualified_namespace,
entity_name=entity_name,
credential=credential,
**kwargs,
)
self._auth_uri = f"sb://{self.fully_qualified_namespace}/{self._entity_name}"
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
self._error_policy = self._amqp_transport.create_retry_policy(self._config)
self._name = f"SBSessionBrowser-{uuid.uuid4()}"
self._connection = kwargs.get("connection")

def _create_handler(self, auth):
self._handler = self._amqp_transport.create_mgmt_client(
config=self._config,
auth=auth,
properties=self._properties,
retry_policy=self._error_policy,
client_name=self._name,
)

def _open(self):
if self._running:
return
if self._handler:
self._handler.close()

auth = None if self._connection else create_authentication(self)
self._create_handler(auth)
try:
self._handler.open(connection=self._connection)
while not self._handler.client_ready():
time.sleep(0.05)
self._running = True
except:
self._close_handler()
raise

def list_sessions(
self,
*,
updated_since: Optional[datetime] = None,
timeout: Optional[float] = None,
) -> List[str]:
"""List session IDs for this entity.

:keyword ~datetime.datetime updated_since: If specified, only sessions whose state was
updated after this time are returned. If not specified, returns sessions with
active messages in the entity.
:keyword float timeout: The total operation timeout in seconds.
:returns: A list of session ID strings.
:rtype: list[str]
"""
# DateTime.MaxValue triggers "active messages" mode on the service side.
# A real timestamp triggers "sessions updated since" mode.
use_sentinel = updated_since is None
last_updated_time_ms = (
_MAX_DATETIME_MS if use_sentinel
else int(updated_since.timestamp() * 1000)
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
)
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated

all_session_ids: List[str] = []
skip = 0

while True:
message = {
b"last-updated-time": {TYPE: AMQPTypes.timestamp, VALUE: last_updated_time_ms},
b"skip": _amqp_int_value(skip),
b"top": _amqp_int_value(_PAGE_SIZE),
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
result = self._mgmt_request_response_with_retry(
REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION,
message,
mgmt_handlers.list_sessions_op,
keep_alive_associated_link=False,
timeout=timeout,
)
if not result:
break
all_session_ids.extend(result)
if len(result) < _PAGE_SIZE:
break
skip += len(result)

return all_session_ids
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from .._pyamqp import (
utils,
AMQPClient,
SendClient,
constants,
ReceiveClient,
Expand Down Expand Up @@ -442,6 +443,36 @@ def close_connection(connection: "Connection") -> None:
"""
connection.close()

@staticmethod
def create_mgmt_client(config: "Configuration", **kwargs: Any) -> "AMQPClient":
"""Creates and returns a pyamqp AMQPClient for management-only operations.

Unlike SendClient/ReceiveClient, this client does not create a sender or
receiver link. It only opens a connection and authenticates, suitable for
management requests that don't need an associated link.

:param ~azure.servicebus._configuration.Configuration config: The configuration. Required.
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
:keyword ~pyamqp.authentication.JWTTokenAuth auth: Required.
:keyword retry_policy: Required.
:keyword str client_name: Required.
:keyword dict properties: Required.
:return: AMQPClient
:rtype: ~pyamqp.AMQPClient
"""
return AMQPClient(
config.hostname,
network_trace=config.logging_enable,
keep_alive_interval=config.keep_alive,
custom_endpoint_address=config.custom_endpoint_address,
connection_verify=config.connection_verify,
ssl_context=config.ssl_context,
transport_type=config.transport_type,
http_proxy=config.http_proxy,
socket_timeout=config.socket_timeout,
use_tls=config.use_tls,
**kwargs,
)

@staticmethod
def create_send_client(config: "Configuration", **kwargs: Any) -> "SendClient": # pylint: disable=docstring-keyword-should-match-keyword-only
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,28 @@ def close_connection(connection: "Connection") -> None:
"""
connection.destroy()

@staticmethod
def create_mgmt_client(config: "Configuration", **kwargs: Any) -> "AMQPClient":
"""Creates and returns a uamqp AMQPClient for management-only operations.

:param ~azure.servicebus._common._configuration.Configuration config: The configuration.
:keyword JWTTokenAuth auth: Required.
:keyword retry_policy: Required.
:keyword str client_name: Required.
:keyword dict properties: Required.
:return: AMQPClient
:rtype: ~uamqp.AMQPClient
"""
retry_policy = kwargs.pop("retry_policy")
return AMQPClient(
"amqps://" + config.hostname,
debug=config.logging_enable,
error_policy=retry_policy,
keep_alive_interval=config.keep_alive,
encoding=config.encoding,
**kwargs,
)

@staticmethod
def create_send_client(config: "Configuration", **kwargs: Any) -> "SendClient": # pylint:disable=docstring-keyword-should-match-keyword-only
"""
Expand Down
Loading
Loading