-
Notifications
You must be signed in to change notification settings - Fork 3.3k
feat(servicebus): add list_queue_sessions and list_subscription_sessions #46575
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
EldertGrootenboer
wants to merge
34
commits into
Azure:main
Choose a base branch
from
EldertGrootenboer:fix/servicebus-get-message-sessions
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 11 commits
Commits
Show all changes
34 commits
Select commit
Hold shift + click to select a range
3fa067a
fix: Forward **kwargs to _build_pipeline in ServiceBusAdministrationC…
EldertGrootenboerMS e1efce8
feat(servicebus): add list_queue_sessions and list_subscription_sessions
EldertGrootenboerMS cb1dce7
fix: add early-exit on partial page to avoid extra pagination round trip
EldertGrootenboerMS 143cd6a
fix(servicebus): address PR review feedback for list sessions
EldertGrootenboerMS 90b04cf
fix(servicebus): validate timeout in list_*_sessions
EldertGrootenboerMS b9314f4
fix(servicebus): satisfy pylint and mypy for new session browser
EldertGrootenboerMS 00dafed
test(servicebus): add subscription updated_since coverage and async t…
EldertGrootenboerMS 17df2fc
test(servicebus): subtract clock skew buffer from updated_since boundary
EldertGrootenboerMS 84d7383
fix(servicebus): use real return-type annotations on list_*_sessions
EldertGrootenboerMS 5c925c1
docs(servicebus): clarify list_*_sessions updated_since semantics
EldertGrootenboerMS ce40401
fix(servicebus): add create_mgmt_client_async to UamqpTransportAsync
EldertGrootenboerMS 043375a
fix(servicebus): docstring path + test import order
EldertGrootenboerMS d5474b1
fix(servicebus): satisfy CI pylint for new session browser code
EldertGrootenboerMS b4fdb86
Merge remote-tracking branch 'origin/main' into fix/servicebus-get-me…
EldertGrootenboerMS 2890c70
docs(servicebus): add list sessions changelog entry
EldertGrootenboerMS 9d7f59d
fix(servicebus): rename updated_since to updated_after, yield session…
EldertGrootenboerMS a1b894c
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer 6d49cb2
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer 8174f64
fix(servicebus): remove unused List import from client files
EldertGrootenboerMS e3c2608
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer 080c4c7
fix(servicebus): fix tests to consume iterators, bump version to 7.15…
EldertGrootenboerMS 800a93d
fix(servicebus): updated_after tests call set_state() per .NET parity
EldertGrootenboerMS 3a2ec56
docs(servicebus): correct error-handling comments per service-side code
EldertGrootenboerMS 5700448
fix: correct active-messages sentinel to 253402300800000 ms
EldertGrootenboerMS 9438829
fix: remove unused imports and update docstring in test_session_browser
EldertGrootenboerMS a8c40fb
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer aa73b13
fix: add missing blank line before 7.14.3 section in CHANGELOG
EldertGrootenboerMS 3dca34a
fix: set Development Status classifier to Beta for 7.15.0b1
EldertGrootenboerMS dbe8dde
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer b2d1432
fix: add EntityPath validation to list_queue_sessions and list_subscr…
EldertGrootenboerMS 7b29c94
fix(servicebus): rename updated_after to session_state_updated_after …
EldertGrootenboerMS 514d34c
fix: wrap long lines in async list_sessions calls (pylint C0301)
EldertGrootenboerMS 75d9965
Merge branch 'main' into fix/servicebus-get-message-sessions
EldertGrootenboer 9c9f56e
fix(servicebus): address PR #46575 review feedback from j7nw4r
EldertGrootenboerMS File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
134 changes: 134 additions & 0 deletions
134
sdk/servicebus/azure-servicebus/azure/servicebus/_session_browser.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,134 @@ | ||
| # -------------------------------------------------------------------------------------------- | ||
| # Copyright (c) Microsoft Corporation. All rights reserved. | ||
| # Licensed under the MIT License. See License.txt in the project root for license information. | ||
| # -------------------------------------------------------------------------------------------- | ||
| import time | ||
| import uuid | ||
| from datetime import datetime, timezone | ||
| from typing import List, Optional | ||
|
|
||
| from ._base_handler import BaseHandler | ||
| from ._common.utils import create_authentication | ||
| from ._common.constants import ( | ||
| REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION, | ||
| ) | ||
| from ._common import mgmt_handlers | ||
| from ._pyamqp.types import AMQPTypes, TYPE, VALUE | ||
|
|
||
| # The service checks for DateTime.MaxValue (C# 9999-12-31T23:59:59.9999999) to switch | ||
| # between "active messages" mode and "updated since" mode. On the AMQP wire, timestamps | ||
| # have millisecond precision, so DateTime.MaxValue becomes 253402300799999 ms from epoch. | ||
| # Python's datetime.timestamp() float math rounds this up by 1ms, so we use the | ||
| # pre-computed constant directly for the sentinel. | ||
| _MAX_DATETIME_MS = 253402300799999 | ||
| _PAGE_SIZE = 100 | ||
|
|
||
|
|
||
| def _amqp_int_value(value): | ||
| return {TYPE: AMQPTypes.int, VALUE: value} | ||
|
|
||
|
|
||
| class _SessionBrowser(BaseHandler): | ||
| """Internal handler that opens an AMQP connection for management-only operations. | ||
|
|
||
| Unlike ServiceBusSender/ServiceBusReceiver, this does NOT create a sender or | ||
| receiver link. It only opens a connection and authenticates, then sends | ||
| management requests to the $management endpoint. | ||
|
|
||
| Used for entity-level management operations like get-message-sessions where | ||
| a sender or receiver link is not needed. | ||
| """ | ||
|
|
||
| def __init__(self, fully_qualified_namespace, entity_name, credential, **kwargs): | ||
| super().__init__( | ||
| fully_qualified_namespace=fully_qualified_namespace, | ||
| entity_name=entity_name, | ||
| credential=credential, | ||
| **kwargs, | ||
| ) | ||
| self._auth_uri = f"sb://{self.fully_qualified_namespace}/{self._entity_path}" | ||
| self._error_policy = self._amqp_transport.create_retry_policy(self._config) | ||
| self._name = f"SBSessionBrowser-{uuid.uuid4()}" | ||
| self._connection = kwargs.get("connection") | ||
|
|
||
| def _create_handler(self, auth): | ||
| self._handler = self._amqp_transport.create_mgmt_client( | ||
| config=self._config, | ||
| auth=auth, | ||
| properties=self._properties, | ||
| retry_policy=self._error_policy, | ||
| client_name=self._name, | ||
| ) | ||
|
EldertGrootenboer marked this conversation as resolved.
EldertGrootenboer marked this conversation as resolved.
|
||
|
|
||
| def _open(self): | ||
| if self._running: | ||
| return | ||
| if self._handler: | ||
| self._handler.close() | ||
|
|
||
| auth = None if self._connection else create_authentication(self) | ||
| self._create_handler(auth) | ||
| try: | ||
| self._handler.open(connection=self._connection) | ||
| while not self._handler.client_ready(): | ||
|
EldertGrootenboer marked this conversation as resolved.
|
||
| time.sleep(0.05) | ||
| self._running = True | ||
| except: | ||
|
EldertGrootenboer marked this conversation as resolved.
|
||
| self._close_handler() | ||
| raise | ||
|
|
||
| def list_sessions( | ||
|
EldertGrootenboer marked this conversation as resolved.
|
||
| self, | ||
| *, | ||
| updated_since: Optional[datetime] = None, | ||
| timeout: Optional[float] = None, | ||
| ) -> List[str]: | ||
| """List session IDs for this entity. | ||
|
|
||
| :keyword ~datetime.datetime updated_since: If specified, only sessions whose last update | ||
| (state change or message activity) is after this time are returned. If not specified, | ||
| returns sessions with active messages in the entity. | ||
|
EldertGrootenboer marked this conversation as resolved.
Outdated
|
||
| :keyword float timeout: The total operation timeout in seconds. | ||
| :returns: A list of session ID strings. | ||
| :rtype: list[str] | ||
| """ | ||
| # DateTime.MaxValue triggers "active messages" mode on the service side. | ||
| # A real timestamp triggers "sessions updated since" mode. | ||
| if updated_since is None: | ||
| last_updated_time_ms = _MAX_DATETIME_MS | ||
| else: | ||
| # Normalize naive datetimes to UTC. Python's datetime.timestamp() | ||
| # interprets naive values as local time, which would make the wire | ||
| # value depend on the host's timezone. Treat naive values as UTC | ||
| # (consistent with how naive datetimes are handled elsewhere in | ||
| # this SDK) and convert aware values to UTC before serializing. | ||
| if updated_since.tzinfo is None: | ||
| normalized = updated_since.replace(tzinfo=timezone.utc) | ||
| else: | ||
| normalized = updated_since.astimezone(timezone.utc) | ||
| last_updated_time_ms = int(normalized.timestamp() * 1000) | ||
|
EldertGrootenboer marked this conversation as resolved.
|
||
|
|
||
| all_session_ids: List[str] = [] | ||
| skip = 0 | ||
|
|
||
| while True: | ||
|
EldertGrootenboer marked this conversation as resolved.
|
||
| message = { | ||
| "last-updated-time": {TYPE: AMQPTypes.timestamp, VALUE: last_updated_time_ms}, | ||
| "skip": _amqp_int_value(skip), | ||
| "top": _amqp_int_value(_PAGE_SIZE), | ||
| } | ||
|
EldertGrootenboer marked this conversation as resolved.
|
||
| result = self._mgmt_request_response_with_retry( | ||
| REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION, | ||
| message, | ||
| mgmt_handlers.list_sessions_op, | ||
| keep_alive_associated_link=False, | ||
| timeout=timeout, | ||
| ) | ||
| if not result: | ||
| break | ||
| all_session_ids.extend(result) | ||
| if len(result) < _PAGE_SIZE: | ||
| break | ||
| skip += len(result) | ||
|
|
||
| return all_session_ids | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.