Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
import logging

from .message import ServiceBusReceivedMessage
from .constants import ServiceBusReceiveMode, MGMT_RESPONSE_MESSAGE_ERROR_CONDITION
from .constants import (
ServiceBusReceiveMode,
MGMT_RESPONSE_MESSAGE_ERROR_CONDITION,
ERROR_CODE_MESSAGE_NOT_FOUND,
)

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -76,6 +80,11 @@ def list_sessions_op( # pylint: disable=inconsistent-return-statements
return parsed
if status_code in [202, 204]:
return []
# The service returns 404 with com.microsoft:message-not-found when there are
# no sessions matching the query. Other 404 conditions (entity not found,
# auth failures, etc.) should propagate as errors.
if status_code == 404 and condition == ERROR_CODE_MESSAGE_NOT_FOUND:
return []

amqp_transport.handle_amqp_mgmt_error(_LOGGER, "List sessions failed.", condition, description, status_code)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint: disable=client-method-missing-tracing-decorator
from typing import Any, Union, Optional, TYPE_CHECKING, Type
from typing import Any, List, Union, Optional, TYPE_CHECKING, Type
from datetime import datetime
Comment thread
EldertGrootenboer marked this conversation as resolved.
import logging
import warnings
from weakref import WeakSet
Expand All @@ -17,6 +18,7 @@
)
from ._servicebus_sender import ServiceBusSender
from ._servicebus_receiver import ServiceBusReceiver
from ._session_browser import _SessionBrowser
from ._common.auto_lock_renewer import AutoLockRenewer
from ._common._configuration import Configuration
from ._common.utils import (
Expand Down Expand Up @@ -699,3 +701,92 @@ def get_subscription_receiver(
)
self._handlers.add(handler)
return handler

def _create_session_browser(self, entity_name, subscription_name=None, **kwargs):
"""Create an internal _SessionBrowser for management-only operations."""
browser = _SessionBrowser(
fully_qualified_namespace=self.fully_qualified_namespace,
entity_name=entity_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
ssl_context=self._ssl_context,
amqp_transport=self._amqp_transport,
use_tls=self._config.use_tls,
subscription_name=subscription_name,
**kwargs,
)
return browser

def list_queue_sessions(
self,
queue_name: str,
*,
updated_since: Optional[datetime] = None,
Comment thread
EldertGrootenboer marked this conversation as resolved.
timeout: Optional[float] = None,
**kwargs: Any,
) -> List[str]:
"""List session IDs with active messages in a session-enabled queue.

If ``updated_since`` is specified, only sessions whose last update (state change
or message activity) is after that time are returned. If not specified, returns
sessions with active messages in the queue.

Comment thread
EldertGrootenboer marked this conversation as resolved.
:param str queue_name: The name of the session-enabled queue.
:keyword ~datetime.datetime updated_since: If specified, only sessions whose last update
(state change or message activity) is after this time are returned.
:keyword float timeout: The total operation timeout in seconds.
:returns: A list of session ID strings.
:rtype: list[str]
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
if timeout is not None and timeout <= 0:
Comment thread
EldertGrootenboer marked this conversation as resolved.
raise ValueError("The timeout must be greater than 0.")
browser = self._create_session_browser(queue_name)
try:
return browser.list_sessions(updated_since=updated_since, timeout=timeout)
finally:
browser.close()

def list_subscription_sessions(
self,
topic_name: str,
subscription_name: str,
*,
Comment thread
EldertGrootenboer marked this conversation as resolved.
updated_since: Optional[datetime] = None,
timeout: Optional[float] = None,
**kwargs: Any,
) -> List[str]:
"""List session IDs with active messages in a session-enabled subscription.

If ``updated_since`` is specified, only sessions whose last update (state change
or message activity) is after that time are returned. If not specified, returns
sessions with active messages in the subscription.

:param str topic_name: The name of the topic.
Comment thread
EldertGrootenboer marked this conversation as resolved.
:param str subscription_name: The name of the subscription.
:keyword ~datetime.datetime updated_since: If specified, only sessions whose last update
(state change or message activity) is after this time are returned.
:keyword float timeout: The total operation timeout in seconds.
:returns: A list of session ID strings.
:rtype: list[str]
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
if timeout is not None and timeout <= 0:
Comment thread
EldertGrootenboer marked this conversation as resolved.
raise ValueError("The timeout must be greater than 0.")
browser = self._create_session_browser(topic_name, subscription_name=subscription_name)
try:
return browser.list_sessions(updated_since=updated_since, timeout=timeout)
finally:
browser.close()
134 changes: 134 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_session_browser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import time
import uuid
from datetime import datetime, timezone
from typing import List, Optional

from ._base_handler import BaseHandler
from ._common.utils import create_authentication
from ._common.constants import (
REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION,
)
from ._common import mgmt_handlers
from ._pyamqp.types import AMQPTypes, TYPE, VALUE

# The service checks for DateTime.MaxValue (C# 9999-12-31T23:59:59.9999999) to switch
# between "active messages" mode and "updated since" mode. On the AMQP wire, timestamps
# have millisecond precision, so DateTime.MaxValue becomes 253402300799999 ms from epoch.
# Python's datetime.timestamp() float math rounds this up by 1ms, so we use the
# pre-computed constant directly for the sentinel.
_MAX_DATETIME_MS = 253402300799999
_PAGE_SIZE = 100


def _amqp_int_value(value):
return {TYPE: AMQPTypes.int, VALUE: value}


class _SessionBrowser(BaseHandler):
"""Internal handler that opens an AMQP connection for management-only operations.

Unlike ServiceBusSender/ServiceBusReceiver, this does NOT create a sender or
receiver link. It only opens a connection and authenticates, then sends
management requests to the $management endpoint.

Used for entity-level management operations like get-message-sessions where
a sender or receiver link is not needed.
"""

def __init__(self, fully_qualified_namespace, entity_name, credential, **kwargs):
super().__init__(
fully_qualified_namespace=fully_qualified_namespace,
entity_name=entity_name,
credential=credential,
**kwargs,
)
self._auth_uri = f"sb://{self.fully_qualified_namespace}/{self._entity_path}"
self._error_policy = self._amqp_transport.create_retry_policy(self._config)
self._name = f"SBSessionBrowser-{uuid.uuid4()}"
self._connection = kwargs.get("connection")

def _create_handler(self, auth):
self._handler = self._amqp_transport.create_mgmt_client(
config=self._config,
auth=auth,
properties=self._properties,
retry_policy=self._error_policy,
client_name=self._name,
)

def _open(self):
if self._running:
return
if self._handler:
self._handler.close()

auth = None if self._connection else create_authentication(self)
self._create_handler(auth)
try:
self._handler.open(connection=self._connection)
while not self._handler.client_ready():
time.sleep(0.05)
self._running = True
except:
self._close_handler()
raise

def list_sessions(
self,
*,
updated_since: Optional[datetime] = None,
timeout: Optional[float] = None,
) -> List[str]:
"""List session IDs for this entity.

:keyword ~datetime.datetime updated_since: If specified, only sessions whose last update
(state change or message activity) is after this time are returned. If not specified,
returns sessions with active messages in the entity.
Comment thread
EldertGrootenboer marked this conversation as resolved.
:keyword float timeout: The total operation timeout in seconds.
:returns: A list of session ID strings.
:rtype: list[str]
"""
# DateTime.MaxValue triggers "active messages" mode on the service side.
# A real timestamp triggers "sessions updated since" mode.
if updated_since is None:
last_updated_time_ms = _MAX_DATETIME_MS
else:
# Normalize naive datetimes to UTC. Python's datetime.timestamp()
# interprets naive values as local time, which would make the wire
# value depend on the host's timezone. Treat naive values as UTC
# (consistent with how naive datetimes are handled elsewhere in
# this SDK) and convert aware values to UTC before serializing.
if updated_since.tzinfo is None:
normalized = updated_since.replace(tzinfo=timezone.utc)
else:
normalized = updated_since.astimezone(timezone.utc)
last_updated_time_ms = int(normalized.timestamp() * 1000)

all_session_ids: List[str] = []
skip = 0

while True:
message = {
"last-updated-time": {TYPE: AMQPTypes.timestamp, VALUE: last_updated_time_ms},
"skip": _amqp_int_value(skip),
"top": _amqp_int_value(_PAGE_SIZE),
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
result = self._mgmt_request_response_with_retry(
REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION,
message,
mgmt_handlers.list_sessions_op,
keep_alive_associated_link=False,
timeout=timeout,
)
if not result:
break
all_session_ids.extend(result)
if len(result) < _PAGE_SIZE:
break
skip += len(result)

return all_session_ids
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from .._pyamqp import (
utils,
AMQPClient,
SendClient,
constants,
ReceiveClient,
Expand Down Expand Up @@ -442,6 +443,36 @@ def close_connection(connection: "Connection") -> None:
"""
connection.close()

@staticmethod
def create_mgmt_client(config: "Configuration", **kwargs: Any) -> "AMQPClient":
"""Creates and returns a pyamqp AMQPClient for management-only operations.

Unlike SendClient/ReceiveClient, this client does not create a sender or
receiver link. It only opens a connection and authenticates, suitable for
management requests that don't need an associated link.

:param ~azure.servicebus._configuration.Configuration config: The configuration. Required.
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
:keyword ~pyamqp.authentication.JWTTokenAuth auth: Required.
:keyword retry_policy: Required.
:keyword str client_name: Required.
:keyword dict properties: Required.
:return: AMQPClient
:rtype: ~pyamqp.AMQPClient
"""
return AMQPClient(
config.hostname,
network_trace=config.logging_enable,
keep_alive_interval=config.keep_alive,
custom_endpoint_address=config.custom_endpoint_address,
connection_verify=config.connection_verify,
ssl_context=config.ssl_context,
transport_type=config.transport_type,
http_proxy=config.http_proxy,
socket_timeout=config.socket_timeout,
use_tls=config.use_tls,
**kwargs,
)

@staticmethod
def create_send_client(config: "Configuration", **kwargs: Any) -> "SendClient": # pylint: disable=docstring-keyword-should-match-keyword-only
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,28 @@ def close_connection(connection: "Connection") -> None:
"""
connection.destroy()

@staticmethod
def create_mgmt_client(config: "Configuration", **kwargs: Any) -> "AMQPClient":
"""Creates and returns a uamqp AMQPClient for management-only operations.

:param ~azure.servicebus._common._configuration.Configuration config: The configuration.
:keyword JWTTokenAuth auth: Required.
:keyword retry_policy: Required.
:keyword str client_name: Required.
:keyword dict properties: Required.
:return: AMQPClient
:rtype: ~uamqp.AMQPClient
"""
retry_policy = kwargs.pop("retry_policy")
return AMQPClient(
"amqps://" + config.hostname,
debug=config.logging_enable,
error_policy=retry_policy,
keep_alive_interval=config.keep_alive,
encoding=config.encoding,
**kwargs,
)

@staticmethod
def create_send_client(config: "Configuration", **kwargs: Any) -> "SendClient": # pylint:disable=docstring-keyword-should-match-keyword-only
"""
Expand Down
Loading
Loading