Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions eng/tools/azure-sdk-tools/azpysdk/samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@
"blob_samples_container_access_policy_async.py",
"blob_samples_client_side_encryption_keyvault.py",
],
"azure-messaging-webpubsubservice": [
"integration_sample.py",
],
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-e ../../../eng/tools/azure-sdk-tools
../../core/azure-core
../../identity/azure-identity
azure-messaging-webpubsubservice==1.1.0b1
../azure-messaging-webpubsubservice
psutil
aiohttp>=3.9.3
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
from azure.messaging.webpubsubclient import WebPubSubClient, WebPubSubClientCredential
from azure.messaging.webpubsubservice import WebPubSubServiceClient
from azure.identity import DefaultAzureCredential
from azure.messaging.webpubsubclient.models import (
OnConnectedArgs,
OnGroupDataMessageArgs,
Expand Down Expand Up @@ -37,8 +38,8 @@ def on_group_message(msg: OnGroupDataMessageArgs):


def main():
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
service_client = WebPubSubServiceClient(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=DefaultAzureCredential()
)
client = WebPubSubClient(
credential=WebPubSubClientCredential(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import asyncio
from azure.messaging.webpubsubclient.aio import WebPubSubClient, WebPubSubClientCredential
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient
from azure.identity.aio import DefaultAzureCredential
from azure.messaging.webpubsubclient.models import (
OnConnectedArgs,
OnGroupDataMessageArgs,
Expand Down Expand Up @@ -38,8 +39,9 @@ async def on_group_message(msg: OnGroupDataMessageArgs):


async def main():
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
credential = DefaultAzureCredential()
service_client = WebPubSubServiceClient(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=credential
)
async def client_access_url_provider():
return (await service_client.get_client_access_token(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
from azure.messaging.webpubsubclient import WebPubSubClient, WebPubSubClientCredential
from azure.messaging.webpubsubservice import WebPubSubServiceClient
from azure.identity import DefaultAzureCredential
from azure.messaging.webpubsubclient.models import OpenClientError, SendMessageError
from dotenv import load_dotenv

Expand All @@ -17,8 +18,8 @@
# The following code is to show how to handle exceptions in WebPubSubClient, and it
# may not run directly
def main():
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
service_client = WebPubSubServiceClient(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=DefaultAzureCredential()
)
client = WebPubSubClient(
credential=WebPubSubClientCredential(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from azure.messaging.webpubsubclient import WebPubSubClient as Client
from azure.messaging.webpubsubclient import WebPubSubClientCredential
from azure.messaging.webpubsubclient.models import WebPubSubDataType
from azure.identity import DefaultAzureCredential
from azure.identity.aio import DefaultAzureCredential as AsyncDefaultAzureCredential
from dotenv import load_dotenv

load_dotenv()
Expand All @@ -23,16 +25,17 @@


def client_access_url_provider():
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
service_client = WebPubSubServiceClient(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=DefaultAzureCredential()
)
return service_client.get_client_access_token(
roles=["webpubsub.joinLeaveGroup", "webpubsub.sendToGroup"]
)["url"]

async def client_access_url_provider_async():
service_client_async = WebPubSubServiceClientAsync.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
credential = AsyncDefaultAzureCredential()
service_client_async = WebPubSubServiceClientAsync(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=credential
)
return (await service_client_async.get_client_access_token(
roles=["webpubsub.joinLeaveGroup", "webpubsub.sendToGroup"]
Expand Down Expand Up @@ -81,7 +84,7 @@ async def send_async() -> None:

if __name__ == "__main__":
send()
asyncio.get_event_loop().run_until_complete(send_async())
asyncio.run(send_async())
print(
f"it takes {TIME_COST} seconds to send {MESSAGE_COUNT} messages with Sync API"
)
Expand Down
19 changes: 16 additions & 3 deletions sdk/webpubsub/azure-messaging-webpubsubclient/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,29 @@
#
# --------------------------------------------------------------------------
import os
import re
import pytest
from dotenv import load_dotenv
from devtools_testutils import test_proxy, add_general_regex_sanitizer
from testcase import TEST_RESULT
from testcase_async import TEST_RESULT_ASYNC

load_dotenv()


@pytest.fixture(autouse=True)
def clear_test_results():
TEST_RESULT.clear()
TEST_RESULT_ASYNC.clear()
yield
TEST_RESULT.clear()
TEST_RESULT_ASYNC.clear()


@pytest.fixture(scope="session", autouse=True)
def add_sanitizers(test_proxy):
connection_string = os.environ.get("WEBPUBSUBCLIENT_CONNECTION_STRING", "WEBPUBSUBCLIENT_CONNECTION_STRING")
endpoint = os.environ.get("WEBPUBSUBCLIENT_ENDPOINT", "WEBPUBSUBCLIENT_ENDPOINT")
add_general_regex_sanitizer(
regex=connection_string,
value="Endpoint=https://myservice.webpubsub.azure.com;AccessKey=aaaaaaaaaaaaa;Version=1.0;",
regex=re.escape(endpoint),
value="https://myservice.webpubsub.azure.com",
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import time
import pytest
from devtools_testutils import recorded_by_proxy
from testcase import WebpubsubClientTest, WebpubsubClientPowerShellPreparer, TEST_RESULT, on_group_message
from testcase import WebpubsubClientTest, WebpubsubClientPowerShellPreparer, TEST_RESULT
from azure.messaging.webpubsubclient.models import WebPubSubProtocolType


Expand All @@ -16,26 +15,26 @@ class TestWebpubsubClientAutoConnect(WebpubsubClientTest):
# auto_connect will be triggered if connection is dropped by accident and we disable recovery
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy
def test_auto_connect(self, webpubsubclient_connection_string):
def test_auto_connect(self, webpubsubclient_endpoint):
client = self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
protocol_type=WebPubSubProtocolType.JSON,
message_retry_total=10,
reconnect_retry_total=10,
reconnect_retry_mode="fixed",
reconnect_retry_backoff_factor=0.1,
)
name = "test_auto_connect"
connected_event, _, message_event = self.setup_events(client)
with client:
time.sleep(0.001) # wait for connection_id to be updated
assert connected_event.wait(timeout=30), "Timed out waiting for initial connection"
conn_id0 = client._connection_id
group_name = name
client.subscribe("group-message", on_group_message)
client.join_group(group_name)
client.join_group(name)
connected_event.clear() # reset for reconnection detection
client._ws.sock.close(1001) # close the connection to trigger auto connect
time.sleep(3) # wait for reconnect
client.send_to_group(group_name, name, "text")
time.sleep(1) # wait for on_group_message to be called
# wait for reconnect
assert connected_event.wait(timeout=30), "Timed out waiting for reconnection"
self.retry_send_until_message(client, name, name, message_event, retries=10)
conn_id1 = client._connection_id
assert conn_id0 is not None
assert conn_id1 is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import asyncio
import pytest
from devtools_testutils.aio import recorded_by_proxy_async
from testcase_async import WebpubsubClientTestAsync, TEST_RESULT_ASYNC, on_group_message
from testcase_async import WebpubsubClientTestAsync, TEST_RESULT_ASYNC
from testcase import WebpubsubClientPowerShellPreparer
from azure.messaging.webpubsubclient.models import WebPubSubProtocolType

Expand All @@ -16,28 +16,26 @@ class TestWebpubsubClientAutoConnectAsync(WebpubsubClientTestAsync):
# auto_connect will be triggered if connection is dropped by accident and we disable recovery
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy_async
async def test_auto_connect_async(self, webpubsubclient_connection_string):
async def test_auto_connect_async(self, webpubsubclient_endpoint):
client = await self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
protocol_type=WebPubSubProtocolType.JSON,
message_retry_total=10,
reconnect_retry_total=10,
reconnect_retry_mode="fixed",
reconnect_retry_backoff_factor=0.1,
)
name = "test_auto_connect_async"
connected_event, _, message_event = await self.setup_events(client)
async with client:
await asyncio.sleep(0.001) # wait for connection_id to be updated
await asyncio.wait_for(connected_event.wait(), timeout=30)
conn_id0 = client._connection_id
group_name = name
await client.subscribe("group-message", on_group_message)
await client.join_group(group_name)
await client._ws.sock.close(
code=1001
) # close the connection to trigger auto connect
await asyncio.sleep(3) # wait for reconnect
await client.send_to_group(group_name, name, "text")
await asyncio.sleep(1) # wait for on_group_message to be called
await client.join_group(name)
connected_event.clear() # reset for reconnection detection
await client._ws.sock.close(code=1001) # close the connection to trigger auto connect
# wait for reconnect
await asyncio.wait_for(connected_event.wait(), timeout=30)
await self.retry_send_until_message(client, name, name, message_event, retries=10)
conn_id1 = client._connection_id
assert conn_id0 is not None
assert conn_id1 is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,45 @@ class TestWebpubsubClientNoRecoveryNoReconnect(WebpubsubClientTest):
# disable recovery and auto reconnect
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy
def test_disable_recovery_and_autoconnect(self, webpubsubclient_connection_string):
def test_disable_recovery_and_autoconnect(self, webpubsubclient_endpoint):
client = self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
reconnect_retry_total=0,
protocol_type=WebPubSubProtocolType.JSON,
)
name = "test_disable_recovery_and_autoconnect"
with client:
group_name = name
client.subscribe("group-message", on_group_message)
_, disconnected_event, _ = self.setup_events(client)
client.join_group(group_name)
client._ws.sock.close(1001) # close connection
assert disconnected_event.wait(timeout=30), "Timed out waiting for disconnection"
with pytest.raises(SendMessageError):
client.send_to_group(group_name, name, "text")
time.sleep(1) # wait for on_group_message to be called
time.sleep(3) # wait to confirm message was NOT received

assert name not in TEST_RESULT

# disable recovery and auto reconnect, then send message concurrently
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy
def test_disable_recovery_and_autoconnect_send_concurrently(
self, webpubsubclient_connection_string
self, webpubsubclient_endpoint
):
client = self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
reconnect_retry_total=0,
message_retry_total=3,
protocol_type=WebPubSubProtocolType.JSON,
)

with client:
group_name = "test_disable_recovery_and_autoconnect_send_concurrently"
_, disconnected_event, _ = self.setup_events(client)
client.join_group(group_name)
client._ws.sock.close(1001) # close connection
assert disconnected_event.wait(timeout=30), "Timed out waiting for disconnection"
assert not client.is_connected()

def send(idx):
client.send_to_group(group_name, f"hello_{idx}", "text")
Expand All @@ -68,10 +73,7 @@ def send(idx):
t = SafeThread(target=send, args=(i,))
t.start()
all_threads.append(t)
if i == 50:
client._ws.sock.close(1001) # close connection

for i, t in enumerate(all_threads):
if i > 50:
with pytest.raises(Exception):
t.join()
for t in all_threads:
with pytest.raises(SendMessageError):
t.join()
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,47 @@ class TestWebpubsubClientNoRecoveryNoReconnectAsync(WebpubsubClientTestAsync):
# disable recovery and auto reconnect
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy_async
async def test_disable_recovery_and_autoconnect_async(self, webpubsubclient_connection_string):
async def test_disable_recovery_and_autoconnect_async(self, webpubsubclient_endpoint):
client = await self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
reconnect_retry_total=0,
protocol_type=WebPubSubProtocolType.JSON,
)
name = "test_disable_recovery_and_autoconnect_async"
async with client:
group_name = name
await client.subscribe("group-message", on_group_message)
_, disconnected_event, _ = await self.setup_events(client)
await client.join_group(group_name)
await client._ws.session.close() # close connection
await asyncio.wait_for(disconnected_event.wait(), timeout=30)
with pytest.raises(SendMessageError):
await client.send_to_group(group_name, name, "text")
await asyncio.sleep(1) # wait for on_group_message to be called
await asyncio.sleep(3) # wait to confirm message was NOT received

assert name not in TEST_RESULT_ASYNC

# disable recovery and auto reconnect, then send message concurrently
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy_async
async def test_disable_recovery_and_autoconnect_send_concurrently_async(
self, webpubsubclient_connection_string
self, webpubsubclient_endpoint
):
client = await self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
reconnect_retry_total=0,
message_retry_total=3,
protocol_type=WebPubSubProtocolType.JSON,
)

async with client:
group_name = "test_disable_recovery_and_autoconnect_send_concurrently_async"
_, disconnected_event, _ = await self.setup_events(client)
await client.join_group(group_name)
count = 10
tasks = [client.send_to_group(group_name, "hello", "text") for _ in range(10)]
await client._ws.session.close() # close connection
await asyncio.wait_for(disconnected_event.wait(), timeout=30)
assert not client.is_connected()

tasks = [client.send_to_group(group_name, "hello", "text") for _ in range(10)]
for task in asyncio.as_completed(tasks):
with pytest.raises(SendMessageError):
await task
Loading
Loading