Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
import logging

from .message import ServiceBusReceivedMessage
from .constants import ServiceBusReceiveMode, MGMT_RESPONSE_MESSAGE_ERROR_CONDITION
from .constants import (
ServiceBusReceiveMode,
MGMT_RESPONSE_MESSAGE_ERROR_CONDITION,
ERROR_CODE_MESSAGE_NOT_FOUND,
)

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -76,6 +80,11 @@ def list_sessions_op( # pylint: disable=inconsistent-return-statements
return parsed
if status_code in [202, 204]:
return []
# The service returns 404 with com.microsoft:message-not-found when there are
# no sessions matching the query. Other 404 conditions (entity not found,
# auth failures, etc.) should propagate as errors.
if status_code == 404 and condition == ERROR_CODE_MESSAGE_NOT_FOUND:
return []

amqp_transport.handle_amqp_mgmt_error(_LOGGER, "List sessions failed.", condition, description, status_code)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint: disable=client-method-missing-tracing-decorator
from typing import Any, Union, Optional, TYPE_CHECKING, Type
from typing import Any, List, Union, Optional, TYPE_CHECKING, Type
from datetime import datetime
import logging
import warnings
from weakref import WeakSet
Expand All @@ -17,6 +18,7 @@
)
from ._servicebus_sender import ServiceBusSender
from ._servicebus_receiver import ServiceBusReceiver
from ._session_browser import _SessionBrowser
from ._common.auto_lock_renewer import AutoLockRenewer
from ._common._configuration import Configuration
from ._common.utils import (
Expand Down Expand Up @@ -699,3 +701,98 @@ def get_subscription_receiver(
)
self._handlers.add(handler)
return handler

def _create_session_browser(self, entity_name, subscription_name=None, **kwargs):
"""Create an internal _SessionBrowser for management-only operations.

:param str entity_name: The queue name (or topic name when ``subscription_name`` is set).
:param str subscription_name: The subscription name when listing sessions on a topic.
:return: A new internal _SessionBrowser bound to this client.
:rtype: ~azure.servicebus._session_browser._SessionBrowser
"""
browser = _SessionBrowser(
fully_qualified_namespace=self.fully_qualified_namespace,
entity_name=entity_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
ssl_context=self._ssl_context,
amqp_transport=self._amqp_transport,
use_tls=self._config.use_tls,
subscription_name=subscription_name,
**kwargs,
)
return browser

def list_queue_sessions(
self,
queue_name: str,
*,
updated_since: Optional[datetime] = None,
timeout: Optional[float] = None,
**kwargs: Any,
) -> List[str]:
"""List session IDs with active messages in a session-enabled queue.

If ``updated_since`` is specified, only sessions whose last update (state change
or message activity) is after that time are returned. If not specified, returns
sessions with active messages in the queue.

:param str queue_name: The name of the session-enabled queue.
:keyword ~datetime.datetime updated_since: If specified, only sessions whose last update
(state change or message activity) is after this time are returned.
:keyword float timeout: The total operation timeout in seconds.
:returns: A list of session ID strings.
:rtype: list[str]
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
browser = self._create_session_browser(queue_name)
try:
return browser.list_sessions(updated_since=updated_since, timeout=timeout)
finally:
browser.close()

def list_subscription_sessions(
self,
topic_name: str,
subscription_name: str,
*,
updated_since: Optional[datetime] = None,
timeout: Optional[float] = None,
**kwargs: Any,
) -> List[str]:
"""List session IDs with active messages in a session-enabled subscription.

If ``updated_since`` is specified, only sessions whose last update (state change
or message activity) is after that time are returned. If not specified, returns
sessions with active messages in the subscription.

:param str topic_name: The name of the topic.
:param str subscription_name: The name of the subscription.
:keyword ~datetime.datetime updated_since: If specified, only sessions whose last update
(state change or message activity) is after this time are returned.
:keyword float timeout: The total operation timeout in seconds.
:returns: A list of session ID strings.
:rtype: list[str]
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
browser = self._create_session_browser(topic_name, subscription_name=subscription_name)
try:
return browser.list_sessions(updated_since=updated_since, timeout=timeout)
finally:
browser.close()
134 changes: 134 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_session_browser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import time
import uuid
from datetime import datetime, timezone
from typing import List, Optional

from ._base_handler import BaseHandler
from ._common.utils import create_authentication
from ._common.constants import (
REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION,
)
from ._common import mgmt_handlers
from ._pyamqp.types import AMQPTypes, TYPE, VALUE

# The service checks for DateTime.MaxValue (C# 9999-12-31T23:59:59.9999999) to switch
# between "active messages" mode and "updated since" mode. On the AMQP wire, timestamps
# have millisecond precision, so DateTime.MaxValue becomes 253402300799999 ms from epoch.
# Python's datetime.timestamp() float math rounds this up by 1ms, so we use the
# pre-computed constant directly for the sentinel.
_MAX_DATETIME_MS = 253402300799999
_PAGE_SIZE = 100


def _amqp_int_value(value):
return {TYPE: AMQPTypes.int, VALUE: value}


class _SessionBrowser(BaseHandler):
"""Internal handler that opens an AMQP connection for management-only operations.

Unlike ServiceBusSender/ServiceBusReceiver, this does NOT create a sender or
receiver link. It only opens a connection and authenticates, then sends
management requests to the $management endpoint.

Used for entity-level management operations like get-message-sessions where
a sender or receiver link is not needed.
"""

def __init__(self, fully_qualified_namespace, entity_name, credential, **kwargs):
super().__init__(
fully_qualified_namespace=fully_qualified_namespace,
entity_name=entity_name,
credential=credential,
**kwargs,
)
self._auth_uri = f"sb://{self.fully_qualified_namespace}/{self._entity_path}"
self._error_policy = self._amqp_transport.create_retry_policy(self._config)
self._name = f"SBSessionBrowser-{uuid.uuid4()}"
self._connection = kwargs.get("connection")

def _create_handler(self, auth):
self._handler = self._amqp_transport.create_mgmt_client(
config=self._config,
auth=auth,
properties=self._properties,
retry_policy=self._error_policy,
client_name=self._name,
)

def _open(self):
if self._running:
return
if self._handler:
self._handler.close()

auth = None if self._connection else create_authentication(self)
self._create_handler(auth)
try:
self._handler.open(connection=self._connection)
while not self._handler.client_ready():
time.sleep(0.05)
self._running = True
except:
self._close_handler()
raise

def list_sessions(
self,
*,
updated_since: Optional[datetime] = None,
timeout: Optional[float] = None,
) -> List[str]:
"""List session IDs for this entity.

:keyword ~datetime.datetime updated_since: If specified, only sessions whose last update
(state change or message activity) is after this time are returned. If not specified,
returns sessions with active messages in the entity.
:keyword float timeout: The total operation timeout in seconds.
:returns: A list of session ID strings.
:rtype: list[str]
"""
# DateTime.MaxValue triggers "active messages" mode on the service side.
# A real timestamp triggers "sessions updated since" mode.
if updated_since is None:
last_updated_time_ms = _MAX_DATETIME_MS
else:
# Normalize naive datetimes to UTC. Python's datetime.timestamp()
# interprets naive values as local time, which would make the wire
# value depend on the host's timezone. Treat naive values as UTC
# (consistent with how naive datetimes are handled elsewhere in
# this SDK) and convert aware values to UTC before serializing.
if updated_since.tzinfo is None:
normalized = updated_since.replace(tzinfo=timezone.utc)
else:
normalized = updated_since.astimezone(timezone.utc)
last_updated_time_ms = int(normalized.timestamp() * 1000)

all_session_ids: List[str] = []
skip = 0

while True:
message = {
"last-updated-time": {TYPE: AMQPTypes.timestamp, VALUE: last_updated_time_ms},
"skip": _amqp_int_value(skip),
"top": _amqp_int_value(_PAGE_SIZE),
}
result = self._mgmt_request_response_with_retry(
REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION,
message,
mgmt_handlers.list_sessions_op,
keep_alive_associated_link=False,
timeout=timeout,
)
if not result:
break
all_session_ids.extend(result)
if len(result) < _PAGE_SIZE:
break
skip += len(result)

return all_session_ids
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from .._pyamqp import (
utils,
AMQPClient,
SendClient,
constants,
ReceiveClient,
Expand Down Expand Up @@ -100,7 +101,6 @@
from .._common.message import ServiceBusReceivedMessage, ServiceBusMessage, ServiceBusMessageBatch
from .._common._configuration import Configuration
from .._pyamqp.performatives import AttachFrame, TransferFrame
from .._pyamqp.client import AMQPClient
from .._pyamqp.message import MessageDict

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -442,6 +442,36 @@ def close_connection(connection: "Connection") -> None:
"""
connection.close()

@staticmethod
def create_mgmt_client(config: "Configuration", **kwargs: Any) -> "AMQPClient": # pylint: disable=docstring-keyword-should-match-keyword-only
"""Creates and returns a pyamqp AMQPClient for management-only operations.

Unlike SendClient/ReceiveClient, this client does not create a sender or
receiver link. It only opens a connection and authenticates, suitable for
management requests that don't need an associated link.

:param ~azure.servicebus._common._configuration.Configuration config: The configuration. Required.
:keyword ~pyamqp.authentication.JWTTokenAuth auth: Required.
:keyword retry_policy: Required.
:keyword str client_name: Required.
:keyword dict properties: Required.
:return: AMQPClient
:rtype: ~pyamqp.AMQPClient
"""
return AMQPClient(
config.hostname,
network_trace=config.logging_enable,
keep_alive_interval=config.keep_alive,
custom_endpoint_address=config.custom_endpoint_address,
connection_verify=config.connection_verify,
ssl_context=config.ssl_context,
transport_type=config.transport_type,
http_proxy=config.http_proxy,
socket_timeout=config.socket_timeout,
use_tls=config.use_tls,
**kwargs,
)

@staticmethod
def create_send_client(config: "Configuration", **kwargs: Any) -> "SendClient": # pylint: disable=docstring-keyword-should-match-keyword-only
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

try:
from uamqp import (
AMQPClient,
BatchMessage,
constants,
MessageBodyType,
Expand Down Expand Up @@ -507,6 +508,28 @@ def close_connection(connection: "Connection") -> None:
"""
connection.destroy()

@staticmethod
def create_mgmt_client(config: "Configuration", **kwargs: Any) -> "AMQPClient": # pylint: disable=docstring-keyword-should-match-keyword-only
"""Creates and returns a uamqp AMQPClient for management-only operations.

:param ~azure.servicebus._common._configuration.Configuration config: The configuration.
:keyword JWTTokenAuth auth: Required.
:keyword retry_policy: Required.
:keyword str client_name: Required.
:keyword dict properties: Required.
:return: AMQPClient
:rtype: ~uamqp.AMQPClient
"""
retry_policy = kwargs.pop("retry_policy")
return AMQPClient(
"amqps://" + config.hostname,
debug=config.logging_enable,
error_policy=retry_policy,
keep_alive_interval=config.keep_alive,
encoding=config.encoding,
**kwargs,
)

@staticmethod
def create_send_client(config: "Configuration", **kwargs: Any) -> "SendClient": # pylint:disable=docstring-keyword-should-match-keyword-only
"""
Expand Down
Loading
Loading