-
Notifications
You must be signed in to change notification settings - Fork 3.3k
feat(servicebus): add list_queue_sessions and list_subscription_sessions #46575
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
EldertGrootenboer
wants to merge
13
commits into
Azure:main
Choose a base branch
from
EldertGrootenboer:fix/servicebus-get-message-sessions
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 3 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
3fa067a
fix: Forward **kwargs to _build_pipeline in ServiceBusAdministrationC…
EldertGrootenboerMS e1efce8
feat(servicebus): add list_queue_sessions and list_subscription_sessions
EldertGrootenboerMS cb1dce7
fix: add early-exit on partial page to avoid extra pagination round trip
EldertGrootenboerMS 143cd6a
fix(servicebus): address PR review feedback for list sessions
EldertGrootenboerMS 90b04cf
fix(servicebus): validate timeout in list_*_sessions
EldertGrootenboerMS b9314f4
fix(servicebus): satisfy pylint and mypy for new session browser
EldertGrootenboerMS 00dafed
test(servicebus): add subscription updated_since coverage and async t…
EldertGrootenboerMS 17df2fc
test(servicebus): subtract clock skew buffer from updated_since boundary
EldertGrootenboerMS 84d7383
fix(servicebus): use real return-type annotations on list_*_sessions
EldertGrootenboerMS 5c925c1
docs(servicebus): clarify list_*_sessions updated_since semantics
EldertGrootenboerMS ce40401
fix(servicebus): add create_mgmt_client_async to UamqpTransportAsync
EldertGrootenboerMS 043375a
fix(servicebus): docstring path + test import order
EldertGrootenboerMS d5474b1
fix(servicebus): satisfy CI pylint for new session browser code
EldertGrootenboerMS File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
135 changes: 135 additions & 0 deletions
135
sdk/servicebus/azure-servicebus/azure/servicebus/_session_browser.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| # -------------------------------------------------------------------------------------------- | ||
| # Copyright (c) Microsoft Corporation. All rights reserved. | ||
| # Licensed under the MIT License. See License.txt in the project root for license information. | ||
| # -------------------------------------------------------------------------------------------- | ||
| import time | ||
| import logging | ||
| import uuid | ||
| from datetime import datetime, timezone | ||
| from typing import Any, List, Optional, Union | ||
|
|
||
| from ._base_handler import BaseHandler | ||
| from ._common.utils import create_authentication | ||
| from ._common.constants import ( | ||
| REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION, | ||
| ) | ||
| from ._common import mgmt_handlers | ||
| from ._pyamqp.types import AMQPTypes, TYPE, VALUE | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
|
|
||
| # The service checks for DateTime.MaxValue (C# 9999-12-31T23:59:59.9999999) to switch | ||
| # between "active messages" mode and "updated since" mode. On the AMQP wire, timestamps | ||
| # have millisecond precision, so DateTime.MaxValue becomes 253402300799999 ms from epoch. | ||
| # Python's datetime.timestamp() float math rounds this up by 1ms, so we use the | ||
| # pre-computed constant directly for the sentinel. | ||
| _MAX_DATETIME_MS = 253402300799999 | ||
| _MAX_DATETIME = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc) | ||
| _PAGE_SIZE = 100 | ||
|
|
||
|
|
||
| def _amqp_int_value(value): | ||
| return {TYPE: AMQPTypes.int, VALUE: value} | ||
|
|
||
|
|
||
| def _amqp_timestamp_value(value): | ||
| """Convert a datetime to an AMQP timestamp (milliseconds since epoch).""" | ||
| return {TYPE: AMQPTypes.timestamp, VALUE: int(value.timestamp() * 1000)} | ||
|
|
||
|
EldertGrootenboer marked this conversation as resolved.
Outdated
|
||
|
|
||
| class _SessionBrowser(BaseHandler): | ||
| """Internal handler that opens an AMQP connection for management-only operations. | ||
|
|
||
| Unlike ServiceBusSender/ServiceBusReceiver, this does NOT create a sender or | ||
| receiver link. It only opens a connection and authenticates, then sends | ||
| management requests to the $management endpoint. | ||
|
|
||
| Used for entity-level management operations like get-message-sessions where | ||
| a sender or receiver link is not needed. | ||
| """ | ||
|
|
||
| def __init__(self, fully_qualified_namespace, entity_name, credential, **kwargs): | ||
| super().__init__( | ||
| fully_qualified_namespace=fully_qualified_namespace, | ||
| entity_name=entity_name, | ||
| credential=credential, | ||
| **kwargs, | ||
| ) | ||
| self._auth_uri = f"sb://{self.fully_qualified_namespace}/{self._entity_name}" | ||
|
EldertGrootenboer marked this conversation as resolved.
Outdated
|
||
| self._error_policy = self._amqp_transport.create_retry_policy(self._config) | ||
| self._name = f"SBSessionBrowser-{uuid.uuid4()}" | ||
| self._connection = kwargs.get("connection") | ||
|
|
||
| def _create_handler(self, auth): | ||
| self._handler = self._amqp_transport.create_mgmt_client( | ||
| config=self._config, | ||
| auth=auth, | ||
| properties=self._properties, | ||
| retry_policy=self._error_policy, | ||
| client_name=self._name, | ||
| ) | ||
|
|
||
| def _open(self): | ||
| if self._running: | ||
| return | ||
| if self._handler: | ||
| self._handler.close() | ||
|
|
||
| auth = None if self._connection else create_authentication(self) | ||
| self._create_handler(auth) | ||
| try: | ||
| self._handler.open(connection=self._connection) | ||
| while not self._handler.client_ready(): | ||
| time.sleep(0.05) | ||
| self._running = True | ||
| except: | ||
| self._close_handler() | ||
| raise | ||
|
|
||
| def list_sessions( | ||
| self, | ||
| *, | ||
| updated_since: Optional[datetime] = None, | ||
| timeout: Optional[float] = None, | ||
| ) -> List[str]: | ||
| """List session IDs for this entity. | ||
|
|
||
| :keyword ~datetime.datetime updated_since: If specified, only sessions whose state was | ||
| updated after this time are returned. If not specified, returns sessions with | ||
| active messages in the entity. | ||
| :keyword float timeout: The total operation timeout in seconds. | ||
| :returns: A list of session ID strings. | ||
| :rtype: list[str] | ||
| """ | ||
| # DateTime.MaxValue triggers "active messages" mode on the service side. | ||
| # A real timestamp triggers "sessions updated since" mode. | ||
| use_sentinel = updated_since is None | ||
| last_updated_time_ms = ( | ||
| _MAX_DATETIME_MS if use_sentinel | ||
| else int(updated_since.timestamp() * 1000) | ||
|
EldertGrootenboer marked this conversation as resolved.
Outdated
|
||
| ) | ||
|
EldertGrootenboer marked this conversation as resolved.
Outdated
|
||
|
|
||
| all_session_ids: List[str] = [] | ||
| skip = 0 | ||
|
|
||
| while True: | ||
| message = { | ||
| b"last-updated-time": {TYPE: AMQPTypes.timestamp, VALUE: last_updated_time_ms}, | ||
| b"skip": _amqp_int_value(skip), | ||
| b"top": _amqp_int_value(_PAGE_SIZE), | ||
| } | ||
|
EldertGrootenboer marked this conversation as resolved.
|
||
| result = self._mgmt_request_response_with_retry( | ||
| REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION, | ||
| message, | ||
| mgmt_handlers.list_sessions_op, | ||
| keep_alive_associated_link=False, | ||
| timeout=timeout, | ||
| ) | ||
| if not result: | ||
| break | ||
| all_session_ids.extend(result) | ||
| if len(result) < _PAGE_SIZE: | ||
| break | ||
| skip += len(result) | ||
|
|
||
| return all_session_ids | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.