Skip to content

Commit 00dafed

Browse files
test(servicebus): add subscription updated_since coverage and async tests
- tests/test_list_sessions.py: add list_subscription_sessions empty + updated_since coverage so the topic/subscription entity path is exercised in both wire modes. - tests/async_tests/test_list_sessions_async.py: new module covering the _SessionBrowserAsync path (sync parity: queue + subscription, active / empty / updated_since).
1 parent b9314f4 commit 00dafed

2 files changed

Lines changed: 277 additions & 0 deletions

File tree

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
# -------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for
4+
# license information.
5+
# --------------------------------------------------------------------------
6+
7+
import pytest
8+
import uuid
9+
from datetime import datetime, timezone
10+
11+
from azure.servicebus import ServiceBusMessage
12+
from azure.servicebus.aio import ServiceBusClient
13+
14+
from devtools_testutils import AzureMgmtRecordedTestCase, get_credential
15+
from servicebus_preparer import (
16+
SERVICEBUS_ENDPOINT_SUFFIX,
17+
CachedServiceBusNamespacePreparer,
18+
CachedServiceBusResourceGroupPreparer,
19+
ServiceBusQueuePreparer,
20+
ServiceBusTopicPreparer,
21+
ServiceBusSubscriptionPreparer,
22+
)
23+
from utilities import uamqp_transport as get_uamqp_transport, ArgPasserAsync
24+
25+
uamqp_transport_params, uamqp_transport_ids = get_uamqp_transport()
26+
27+
28+
class TestServiceBusListSessionsAsync(AzureMgmtRecordedTestCase):
29+
30+
@pytest.mark.asyncio
31+
@pytest.mark.liveTest
32+
@pytest.mark.live_test_only
33+
@CachedServiceBusResourceGroupPreparer()
34+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
35+
@ServiceBusQueuePreparer(name_prefix="servicebustest", requires_session=True)
36+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
37+
@ArgPasserAsync()
38+
async def test_list_queue_sessions_with_active_messages_async(
39+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
40+
):
41+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
42+
credential = get_credential(is_async=True)
43+
async with ServiceBusClient(
44+
fully_qualified_namespace=fully_qualified_namespace,
45+
credential=credential,
46+
logging_enable=False,
47+
uamqp_transport=uamqp_transport,
48+
) as sb_client:
49+
session_ids = [str(uuid.uuid4()) for _ in range(3)]
50+
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
51+
for sid in session_ids:
52+
await sender.send_messages(ServiceBusMessage(
53+
f"test message for {sid}", session_id=sid))
54+
55+
result = await sb_client.list_queue_sessions(servicebus_queue.name)
56+
57+
assert isinstance(result, list)
58+
for sid in session_ids:
59+
assert sid in result
60+
61+
@pytest.mark.asyncio
62+
@pytest.mark.liveTest
63+
@pytest.mark.live_test_only
64+
@CachedServiceBusResourceGroupPreparer()
65+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
66+
@ServiceBusQueuePreparer(name_prefix="servicebustest", requires_session=True)
67+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
68+
@ArgPasserAsync()
69+
async def test_list_queue_sessions_empty_async(
70+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
71+
):
72+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
73+
credential = get_credential(is_async=True)
74+
async with ServiceBusClient(
75+
fully_qualified_namespace=fully_qualified_namespace,
76+
credential=credential,
77+
logging_enable=False,
78+
uamqp_transport=uamqp_transport,
79+
) as sb_client:
80+
# No messages sent; should return empty list
81+
result = await sb_client.list_queue_sessions(servicebus_queue.name)
82+
83+
assert isinstance(result, list)
84+
assert len(result) == 0
85+
86+
@pytest.mark.asyncio
87+
@pytest.mark.liveTest
88+
@pytest.mark.live_test_only
89+
@CachedServiceBusResourceGroupPreparer()
90+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
91+
@ServiceBusQueuePreparer(name_prefix="servicebustest", requires_session=True)
92+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
93+
@ArgPasserAsync()
94+
async def test_list_queue_sessions_updated_since_async(
95+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
96+
):
97+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
98+
credential = get_credential(is_async=True)
99+
async with ServiceBusClient(
100+
fully_qualified_namespace=fully_qualified_namespace,
101+
credential=credential,
102+
logging_enable=False,
103+
uamqp_transport=uamqp_transport,
104+
) as sb_client:
105+
before_send = datetime.now(timezone.utc)
106+
107+
session_id = str(uuid.uuid4())
108+
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
109+
await sender.send_messages(ServiceBusMessage(
110+
"test updated_since", session_id=session_id))
111+
112+
# updated_since mode: sessions whose state was updated after before_send
113+
result = await sb_client.list_queue_sessions(
114+
servicebus_queue.name, updated_since=before_send)
115+
116+
assert isinstance(result, list)
117+
assert session_id in result
118+
119+
@pytest.mark.asyncio
120+
@pytest.mark.liveTest
121+
@pytest.mark.live_test_only
122+
@CachedServiceBusResourceGroupPreparer()
123+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
124+
@ServiceBusTopicPreparer(name_prefix="servicebustest")
125+
@ServiceBusSubscriptionPreparer(name_prefix="servicebustest", requires_session=True)
126+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
127+
@ArgPasserAsync()
128+
async def test_list_subscription_sessions_with_active_messages_async(
129+
self, uamqp_transport, *, servicebus_namespace=None,
130+
servicebus_topic=None, servicebus_subscription=None, **kwargs
131+
):
132+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
133+
credential = get_credential(is_async=True)
134+
async with ServiceBusClient(
135+
fully_qualified_namespace=fully_qualified_namespace,
136+
credential=credential,
137+
logging_enable=False,
138+
uamqp_transport=uamqp_transport,
139+
) as sb_client:
140+
session_ids = [str(uuid.uuid4()) for _ in range(2)]
141+
async with sb_client.get_topic_sender(servicebus_topic.name) as sender:
142+
for sid in session_ids:
143+
await sender.send_messages(ServiceBusMessage(
144+
f"test message for {sid}", session_id=sid))
145+
146+
result = await sb_client.list_subscription_sessions(
147+
servicebus_topic.name, servicebus_subscription.name)
148+
149+
assert isinstance(result, list)
150+
for sid in session_ids:
151+
assert sid in result
152+
153+
@pytest.mark.asyncio
154+
@pytest.mark.liveTest
155+
@pytest.mark.live_test_only
156+
@CachedServiceBusResourceGroupPreparer()
157+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
158+
@ServiceBusTopicPreparer(name_prefix="servicebustest")
159+
@ServiceBusSubscriptionPreparer(name_prefix="servicebustest", requires_session=True)
160+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
161+
@ArgPasserAsync()
162+
async def test_list_subscription_sessions_empty_async(
163+
self, uamqp_transport, *, servicebus_namespace=None,
164+
servicebus_topic=None, servicebus_subscription=None, **kwargs
165+
):
166+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
167+
credential = get_credential(is_async=True)
168+
async with ServiceBusClient(
169+
fully_qualified_namespace=fully_qualified_namespace,
170+
credential=credential,
171+
logging_enable=False,
172+
uamqp_transport=uamqp_transport,
173+
) as sb_client:
174+
# No messages sent; should return empty list
175+
result = await sb_client.list_subscription_sessions(
176+
servicebus_topic.name, servicebus_subscription.name)
177+
178+
assert isinstance(result, list)
179+
assert len(result) == 0
180+
181+
@pytest.mark.asyncio
182+
@pytest.mark.liveTest
183+
@pytest.mark.live_test_only
184+
@CachedServiceBusResourceGroupPreparer()
185+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
186+
@ServiceBusTopicPreparer(name_prefix="servicebustest")
187+
@ServiceBusSubscriptionPreparer(name_prefix="servicebustest", requires_session=True)
188+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
189+
@ArgPasserAsync()
190+
async def test_list_subscription_sessions_updated_since_async(
191+
self, uamqp_transport, *, servicebus_namespace=None,
192+
servicebus_topic=None, servicebus_subscription=None, **kwargs
193+
):
194+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
195+
credential = get_credential(is_async=True)
196+
async with ServiceBusClient(
197+
fully_qualified_namespace=fully_qualified_namespace,
198+
credential=credential,
199+
logging_enable=False,
200+
uamqp_transport=uamqp_transport,
201+
) as sb_client:
202+
before_send = datetime.now(timezone.utc)
203+
204+
session_id = str(uuid.uuid4())
205+
async with sb_client.get_topic_sender(servicebus_topic.name) as sender:
206+
await sender.send_messages(ServiceBusMessage(
207+
"test updated_since", session_id=session_id))
208+
209+
# updated_since mode: sessions whose state was updated after before_send
210+
result = await sb_client.list_subscription_sessions(
211+
servicebus_topic.name, servicebus_subscription.name,
212+
updated_since=before_send)
213+
214+
assert isinstance(result, list)
215+
assert session_id in result

sdk/servicebus/azure-servicebus/tests/test_list_sessions.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,65 @@ def test_list_queue_sessions_updated_since(
149149

150150
assert isinstance(result, list)
151151
assert session_id in result
152+
153+
@pytest.mark.liveTest
154+
@pytest.mark.live_test_only
155+
@CachedServiceBusResourceGroupPreparer()
156+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
157+
@ServiceBusTopicPreparer(name_prefix="servicebustest")
158+
@ServiceBusSubscriptionPreparer(name_prefix="servicebustest", requires_session=True)
159+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
160+
@ArgPasser()
161+
def test_list_subscription_sessions_empty(
162+
self, uamqp_transport, *, servicebus_namespace=None,
163+
servicebus_topic=None, servicebus_subscription=None, **kwargs
164+
):
165+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
166+
credential = get_credential()
167+
with ServiceBusClient(
168+
fully_qualified_namespace=fully_qualified_namespace,
169+
credential=credential,
170+
logging_enable=False,
171+
uamqp_transport=uamqp_transport,
172+
) as sb_client:
173+
# No messages sent; should return empty list
174+
result = sb_client.list_subscription_sessions(
175+
servicebus_topic.name, servicebus_subscription.name)
176+
177+
assert isinstance(result, list)
178+
assert len(result) == 0
179+
180+
@pytest.mark.liveTest
181+
@pytest.mark.live_test_only
182+
@CachedServiceBusResourceGroupPreparer()
183+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
184+
@ServiceBusTopicPreparer(name_prefix="servicebustest")
185+
@ServiceBusSubscriptionPreparer(name_prefix="servicebustest", requires_session=True)
186+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
187+
@ArgPasser()
188+
def test_list_subscription_sessions_updated_since(
189+
self, uamqp_transport, *, servicebus_namespace=None,
190+
servicebus_topic=None, servicebus_subscription=None, **kwargs
191+
):
192+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
193+
credential = get_credential()
194+
with ServiceBusClient(
195+
fully_qualified_namespace=fully_qualified_namespace,
196+
credential=credential,
197+
logging_enable=False,
198+
uamqp_transport=uamqp_transport,
199+
) as sb_client:
200+
before_send = datetime.now(timezone.utc)
201+
202+
session_id = str(uuid.uuid4())
203+
with sb_client.get_topic_sender(servicebus_topic.name) as sender:
204+
sender.send_messages(ServiceBusMessage(
205+
"test updated_since", session_id=session_id))
206+
207+
# updated_since mode: sessions whose state was updated after before_send
208+
result = sb_client.list_subscription_sessions(
209+
servicebus_topic.name, servicebus_subscription.name,
210+
updated_since=before_send)
211+
212+
assert isinstance(result, list)
213+
assert session_id in result

0 commit comments

Comments
 (0)