Skip to content

Commit 4435aa5

Browse files
committed
use event-based signaling
1 parent 32fb1b1 commit 4435aa5

12 files changed

Lines changed: 151 additions & 117 deletions

sdk/webpubsub/azure-messaging-webpubsubclient/tests/conftest.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,21 @@
2828
import pytest
2929
from dotenv import load_dotenv
3030
from devtools_testutils import test_proxy, add_general_regex_sanitizer
31+
from testcase import TEST_RESULT
32+
from testcase_async import TEST_RESULT_ASYNC
3133

3234
load_dotenv()
3335

36+
37+
@pytest.fixture(autouse=True)
38+
def clear_test_results():
39+
TEST_RESULT.clear()
40+
TEST_RESULT_ASYNC.clear()
41+
yield
42+
TEST_RESULT.clear()
43+
TEST_RESULT_ASYNC.clear()
44+
45+
3446
@pytest.fixture(scope="session", autouse=True)
3547
def add_sanitizers(test_proxy):
3648
endpoint = os.environ.get("WEBPUBSUBCLIENT_ENDPOINT", "WEBPUBSUBCLIENT_ENDPOINT")

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_auto_connect.py

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
# Licensed under the MIT License. See License.txt in the project root for
55
# license information.
66
# -------------------------------------------------------------------------
7-
import time
87
import pytest
98
from devtools_testutils import recorded_by_proxy
10-
from testcase import WebpubsubClientTest, WebpubsubClientPowerShellPreparer, TEST_RESULT, on_group_message
9+
from testcase import WebpubsubClientTest, WebpubsubClientPowerShellPreparer, TEST_RESULT
1110
from azure.messaging.webpubsubclient.models import WebPubSubProtocolType
1211

1312

@@ -26,28 +25,16 @@ def test_auto_connect(self, webpubsubclient_endpoint):
2625
reconnect_retry_backoff_factor=0.1,
2726
)
2827
name = "test_auto_connect"
28+
connected_event, message_event = self.setup_events(client)
2929
with client:
30-
# wait for connection_id to be updated
31-
for _ in range(30):
32-
if client._connection_id is not None:
33-
break
34-
time.sleep(1)
30+
assert connected_event.wait(timeout=30), "Timed out waiting for initial connection"
3531
conn_id0 = client._connection_id
36-
group_name = name
37-
client.subscribe("group-message", on_group_message)
38-
client.join_group(group_name)
32+
client.join_group(name)
33+
connected_event.clear() # reset for reconnection detection
3934
client._ws.sock.close(1001) # close the connection to trigger auto connect
4035
# wait for reconnect
41-
for _ in range(30):
42-
if client.is_connected() and client._connection_id != conn_id0:
43-
break
44-
time.sleep(1)
45-
# retry send_to_group to allow async group rejoin to complete
46-
for _ in range(10):
47-
client.send_to_group(group_name, name, "text")
48-
time.sleep(1)
49-
if name in TEST_RESULT:
50-
break
36+
assert connected_event.wait(timeout=30), "Timed out waiting for reconnection"
37+
self.retry_send_until_message(client, name, name, message_event, retries=10)
5138
conn_id1 = client._connection_id
5239
assert conn_id0 is not None
5340
assert conn_id1 is not None

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_auto_connect_async.py

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import asyncio
88
import pytest
99
from devtools_testutils.aio import recorded_by_proxy_async
10-
from testcase_async import WebpubsubClientTestAsync, TEST_RESULT_ASYNC, on_group_message
10+
from testcase_async import WebpubsubClientTestAsync, TEST_RESULT_ASYNC
1111
from testcase import WebpubsubClientPowerShellPreparer
1212
from azure.messaging.webpubsubclient.models import WebPubSubProtocolType
1313

@@ -26,30 +26,16 @@ async def test_auto_connect_async(self, webpubsubclient_endpoint):
2626
reconnect_retry_backoff_factor=0.1,
2727
)
2828
name = "test_auto_connect_async"
29+
connected_event, message_event = await self.setup_events(client)
2930
async with client:
30-
# wait for connection_id to be updated
31-
for _ in range(30):
32-
if client._connection_id is not None:
33-
break
34-
await asyncio.sleep(1)
31+
await asyncio.wait_for(connected_event.wait(), timeout=30)
3532
conn_id0 = client._connection_id
36-
group_name = name
37-
await client.subscribe("group-message", on_group_message)
38-
await client.join_group(group_name)
39-
await client._ws.sock.close(
40-
code=1001
41-
) # close the connection to trigger auto connect
33+
await client.join_group(name)
34+
connected_event.clear() # reset for reconnection detection
35+
await client._ws.sock.close(code=1001) # close the connection to trigger auto connect
4236
# wait for reconnect
43-
for _ in range(30):
44-
if client.is_connected() and client._connection_id != conn_id0:
45-
break
46-
await asyncio.sleep(1)
47-
await client.send_to_group(group_name, name, "text")
48-
# wait for on_group_message callback to fire
49-
for _ in range(10):
50-
if name in TEST_RESULT_ASYNC:
51-
break
52-
await asyncio.sleep(1)
37+
await asyncio.wait_for(connected_event.wait(), timeout=30)
38+
await self.retry_send_until_message(client, name, name, message_event, retries=10)
5339
conn_id1 = client._connection_id
5440
assert conn_id0 is not None
5541
assert conn_id1 is not None

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_no_recovery_no_connect.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ def test_disable_recovery_and_autoconnect_send_concurrently(
6464
with client:
6565
group_name = "test_disable_recovery_and_autoconnect_send_concurrently"
6666
client.join_group(group_name)
67+
client._ws.sock.close(1001) # close connection
68+
for _ in range(30):
69+
if not client.is_connected():
70+
break
71+
time.sleep(1)
72+
assert not client.is_connected()
6773

6874
def send(idx):
6975
client.send_to_group(group_name, f"hello_{idx}", "text")
@@ -73,10 +79,7 @@ def send(idx):
7379
t = SafeThread(target=send, args=(i,))
7480
t.start()
7581
all_threads.append(t)
76-
if i == 50:
77-
client._ws.sock.close(1001) # close connection
7882

79-
for i, t in enumerate(all_threads):
80-
if i > 50:
81-
with pytest.raises(Exception):
82-
t.join()
83+
for t in all_threads:
84+
with pytest.raises(SendMessageError):
85+
t.join()

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_no_recovery_no_connect_async.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,15 @@ async def test_disable_recovery_and_autoconnect_send_concurrently_async(
5959
async with client:
6060
group_name = "test_disable_recovery_and_autoconnect_send_concurrently_async"
6161
await client.join_group(group_name)
62-
count = 10
63-
tasks = [client.send_to_group(group_name, "hello", "text") for _ in range(10)]
6462
await client._ws.session.close() # close connection
63+
# wait for client to detect disconnection
64+
for _ in range(30):
65+
if not client.is_connected():
66+
break
67+
await asyncio.sleep(1)
68+
assert not client.is_connected()
69+
70+
tasks = [client.send_to_group(group_name, "hello", "text") for _ in range(10)]
6571
for task in asyncio.as_completed(tasks):
6672
with pytest.raises(SendMessageError):
6773
await task

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_recovery.py

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
# Licensed under the MIT License. See License.txt in the project root for
55
# license information.
66
# -------------------------------------------------------------------------
7-
import time
87
import pytest
98
from devtools_testutils import recorded_by_proxy
10-
from testcase import WebpubsubClientTest, WebpubsubClientPowerShellPreparer, TEST_RESULT, on_group_message
9+
from testcase import WebpubsubClientTest, WebpubsubClientPowerShellPreparer, TEST_RESULT
1110

1211

1312
@pytest.mark.live_test_only
@@ -18,24 +17,14 @@ class TestWebpubsubClientRecovery(WebpubsubClientTest):
1817
def test_recovery(self, webpubsubclient_endpoint):
1918
client = self.create_client(endpoint=webpubsubclient_endpoint, message_retry_total=10)
2019
name = "test_recovery"
20+
connected_event, message_event = self.setup_events(client)
2121
with client:
22-
# wait for connection_id to be updated
23-
for _ in range(30):
24-
if client._connection_id is not None:
25-
break
26-
time.sleep(1)
22+
assert connected_event.wait(timeout=30), "Timed out waiting for connection"
2723
conn_id0 = client._connection_id
28-
group_name = name
29-
client.subscribe("group-message", on_group_message)
30-
client.join_group(group_name)
24+
client.join_group(name)
3125
client._ws.sock.close(1001) # close connection to trigger recovery
32-
client.send_to_group(group_name, name, "text")
26+
self.retry_send_until_message(client, name, name, message_event)
3327
conn_id1 = client._connection_id
34-
# wait for on_group_message callback to fire
35-
for _ in range(10):
36-
if name in TEST_RESULT:
37-
break
38-
time.sleep(1)
3928

4029
assert name in TEST_RESULT
4130
assert conn_id0 is not None

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_recovery_async.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import pytest
99
from devtools_testutils.aio import recorded_by_proxy_async
1010
from testcase import WebpubsubClientPowerShellPreparer
11-
from testcase_async import WebpubsubClientTestAsync, TEST_RESULT_ASYNC, on_group_message
11+
from testcase_async import WebpubsubClientTestAsync, TEST_RESULT_ASYNC
1212

1313

1414
@pytest.mark.live_test_only
@@ -19,24 +19,14 @@ class TestWebpubsubClientRecoveryAsync(WebpubsubClientTestAsync):
1919
async def test_recovery_async(self, webpubsubclient_endpoint):
2020
client = await self.create_client(endpoint=webpubsubclient_endpoint, message_retry_total=10)
2121
name = "test_recovery_async"
22+
connected_event, message_event = await self.setup_events(client)
2223
async with client:
23-
# wait for connection_id to be updated
24-
for _ in range(30):
25-
if client._connection_id is not None:
26-
break
27-
await asyncio.sleep(1)
24+
await asyncio.wait_for(connected_event.wait(), timeout=30)
2825
conn_id0 = client._connection_id
29-
group_name = name
30-
await client.subscribe("group-message", on_group_message)
31-
await client.join_group(group_name)
26+
await client.join_group(name)
3227
await client._ws.session.close() # close connection to trigger recovery
33-
await client.send_to_group(group_name, name, "text")
28+
await self.retry_send_until_message(client, name, name, message_event)
3429
conn_id1 = client._connection_id
35-
# wait for on_group_message callback to fire
36-
for _ in range(10):
37-
if name in TEST_RESULT_ASYNC:
38-
break
39-
await asyncio.sleep(1)
4030

4131
assert name in TEST_RESULT_ASYNC
4232
assert conn_id0 is not None

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_send_concurrently_async.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,8 @@ async def test_send_concurrently_async(self, webpubsubclient_endpoint):
2020
async with client:
2121
group_name = "test_send_concurrently_async"
2222
await client.join_group(group_name)
23+
for _ in range(30):
24+
if client.is_connected():
25+
break
26+
await asyncio.sleep(1)
2327
await asyncio.gather(*[client.send_to_group(group_name, f"hello_{idx}", "text") for idx in range(100)])

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_smoke.py

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from testcase import (
1313
WebpubsubClientTest,
1414
WebpubsubClientPowerShellPreparer,
15-
on_group_message,
1615
TEST_RESULT,
1716
)
1817
from azure.messaging.webpubsubclient import WebPubSubClient, WebPubSubClientCredential
@@ -149,29 +148,20 @@ def _test(enable_auto_rejoin, test_group_name, assert_func):
149148
auto_rejoin_groups=enable_auto_rejoin,
150149
message_retry_total=10,
151150
)
152-
group_name = test_group_name
153-
client.subscribe("group-message", on_group_message)
151+
connected_event, message_event = self.setup_events(client)
154152
with client:
155-
client.join_group(group_name)
153+
client.join_group(test_group_name)
156154

155+
connected_event.clear()
156+
message_event.clear()
157157
with client:
158-
# retry send until connection is ready (open() runs in background thread)
159-
for attempt in range(30):
160-
try:
161-
if not client.is_connected():
162-
time.sleep(1)
163-
continue
164-
client.send_to_group(group_name, group_name, "text")
165-
break
166-
except SendMessageError:
167-
time.sleep(1)
158+
assert connected_event.wait(timeout=30), "Timed out waiting for connection"
159+
if enable_auto_rejoin:
160+
self.retry_send_until_message(client, test_group_name, test_group_name, message_event)
168161
else:
169-
raise RuntimeError("Failed to send after 30 attempts")
162+
client.send_to_group(test_group_name, test_group_name, "text")
170163
# wait for on_group_message callback to fire
171-
for _ in range(10):
172-
if assert_func(test_group_name):
173-
break
174-
time.sleep(1)
164+
message_event.wait(timeout=10)
175165
assert assert_func(test_group_name)
176166

177167
_test(

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_smoke_async.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from testcase import WebpubsubClientPowerShellPreparer
1212
from testcase_async import (
1313
WebpubsubClientTestAsync,
14-
on_group_message,
1514
TEST_RESULT_ASYNC,
1615
)
1716
from azure.messaging.webpubsubclient.aio import WebPubSubClient as AsyncWebPubSubClient, WebPubSubClientCredential as AsyncWebPubSubClientCredential
@@ -146,24 +145,23 @@ async def _test(enable_auto_rejoin, test_group_name, assert_func):
146145
endpoint=webpubsubclient_endpoint,
147146
auto_rejoin_groups=enable_auto_rejoin,
148147
)
149-
group_name = test_group_name
150-
await client.subscribe("group-message", on_group_message)
148+
connected_event, message_event = await self.setup_events(client)
151149
async with client:
152-
await client.join_group(group_name)
150+
await client.join_group(test_group_name)
153151

152+
connected_event.clear()
153+
message_event.clear()
154154
async with client:
155-
# wait for connection and auto-rejoin to complete
156-
for _ in range(30):
157-
if client.is_connected():
158-
break
159-
await asyncio.sleep(1)
160-
await asyncio.sleep(2) # extra time for auto-rejoin
161-
await client.send_to_group(group_name, group_name, "text")
155+
await asyncio.wait_for(connected_event.wait(), timeout=30)
156+
if enable_auto_rejoin:
157+
await self.retry_send_until_message(client, test_group_name, test_group_name, message_event)
158+
else:
159+
await client.send_to_group(test_group_name, test_group_name, "text")
162160
# wait for on_group_message callback to fire
163-
for _ in range(10):
164-
if assert_func(test_group_name):
165-
break
166-
await asyncio.sleep(1)
161+
try:
162+
await asyncio.wait_for(message_event.wait(), timeout=10)
163+
except asyncio.TimeoutError:
164+
pass
167165
assert assert_func(test_group_name)
168166

169167
await _test(

0 commit comments

Comments
 (0)