Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions eng/tools/azure-sdk-tools/azpysdk/samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@
"blob_samples_container_access_policy_async.py",
"blob_samples_client_side_encryption_keyvault.py",
],
"azure-messaging-webpubsubservice": [
"integration_sample.py",
],
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-e ../../../eng/tools/azure-sdk-tools
../../core/azure-core
../../identity/azure-identity
azure-messaging-webpubsubservice==1.1.0b1
../azure-messaging-webpubsubservice
psutil
aiohttp>=3.9.3
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
from azure.messaging.webpubsubclient import WebPubSubClient, WebPubSubClientCredential
from azure.messaging.webpubsubservice import WebPubSubServiceClient
from azure.identity import DefaultAzureCredential
from azure.messaging.webpubsubclient.models import (
OnConnectedArgs,
OnGroupDataMessageArgs,
Expand Down Expand Up @@ -37,8 +38,8 @@ def on_group_message(msg: OnGroupDataMessageArgs):


def main():
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
service_client = WebPubSubServiceClient(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=DefaultAzureCredential()
)
client = WebPubSubClient(
credential=WebPubSubClientCredential(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import asyncio
from azure.messaging.webpubsubclient.aio import WebPubSubClient, WebPubSubClientCredential
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient
from azure.identity.aio import DefaultAzureCredential
from azure.messaging.webpubsubclient.models import (
OnConnectedArgs,
OnGroupDataMessageArgs,
Expand Down Expand Up @@ -38,8 +39,9 @@ async def on_group_message(msg: OnGroupDataMessageArgs):


async def main():
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
credential = DefaultAzureCredential()
service_client = WebPubSubServiceClient(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=credential
)
async def client_access_url_provider():
return (await service_client.get_client_access_token(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
from azure.messaging.webpubsubclient import WebPubSubClient, WebPubSubClientCredential
from azure.messaging.webpubsubservice import WebPubSubServiceClient
from azure.identity import DefaultAzureCredential
from azure.messaging.webpubsubclient.models import OpenClientError, SendMessageError
from dotenv import load_dotenv

Expand All @@ -17,8 +18,8 @@
# The following code is to show how to handle exceptions in WebPubSubClient, and it
# may not run directly
def main():
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
service_client = WebPubSubServiceClient(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=DefaultAzureCredential()
)
client = WebPubSubClient(
credential=WebPubSubClientCredential(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from azure.messaging.webpubsubclient import WebPubSubClient as Client
from azure.messaging.webpubsubclient import WebPubSubClientCredential
from azure.messaging.webpubsubclient.models import WebPubSubDataType
from azure.identity import DefaultAzureCredential
from azure.identity.aio import DefaultAzureCredential as AsyncDefaultAzureCredential
from dotenv import load_dotenv

load_dotenv()
Expand All @@ -23,16 +25,17 @@


def client_access_url_provider():
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
service_client = WebPubSubServiceClient(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=DefaultAzureCredential()
)
return service_client.get_client_access_token(
roles=["webpubsub.joinLeaveGroup", "webpubsub.sendToGroup"]
)["url"]

async def client_access_url_provider_async():
service_client_async = WebPubSubServiceClientAsync.from_connection_string( # type: ignore
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
credential = AsyncDefaultAzureCredential()
service_client_async = WebPubSubServiceClientAsync(
endpoint=os.getenv("WEBPUBSUB_ENDPOINT", ""), hub="hub", credential=credential
)
return (await service_client_async.get_client_access_token(
roles=["webpubsub.joinLeaveGroup", "webpubsub.sendToGroup"]
Expand Down Expand Up @@ -81,7 +84,7 @@ async def send_async() -> None:

if __name__ == "__main__":
send()
asyncio.get_event_loop().run_until_complete(send_async())
asyncio.run(send_async())
print(
f"it takes {TIME_COST} seconds to send {MESSAGE_COUNT} messages with Sync API"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@

@pytest.fixture(scope="session", autouse=True)
def add_sanitizers(test_proxy):
connection_string = os.environ.get("WEBPUBSUBCLIENT_CONNECTION_STRING", "WEBPUBSUBCLIENT_CONNECTION_STRING")
endpoint = os.environ.get("WEBPUBSUBCLIENT_ENDPOINT", "WEBPUBSUBCLIENT_ENDPOINT")
add_general_regex_sanitizer(
regex=connection_string,
value="Endpoint=https://myservice.webpubsub.azure.com;AccessKey=aaaaaaaaaaaaa;Version=1.0;",
regex=endpoint,
value="https://myservice.webpubsub.azure.com",
)
Comment thread
MoChilia marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ class TestWebpubsubClientAutoConnect(WebpubsubClientTest):
# auto_connect will be triggered if connection is dropped by accident and we disable recovery
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy
def test_auto_connect(self, webpubsubclient_connection_string):
def test_auto_connect(self, webpubsubclient_endpoint):
client = self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
protocol_type=WebPubSubProtocolType.JSON,
message_retry_total=10,
reconnect_retry_total=10,
Expand All @@ -27,15 +27,27 @@ def test_auto_connect(self, webpubsubclient_connection_string):
)
name = "test_auto_connect"
with client:
time.sleep(0.001) # wait for connection_id to be updated
# wait for connection_id to be updated
for _ in range(30):
if client._connection_id is not None:
break
time.sleep(1)
conn_id0 = client._connection_id
group_name = name
client.subscribe("group-message", on_group_message)
client.join_group(group_name)
client._ws.sock.close(1001) # close the connection to trigger auto connect
time.sleep(3) # wait for reconnect
client.send_to_group(group_name, name, "text")
time.sleep(1) # wait for on_group_message to be called
# wait for reconnect
for _ in range(30):
if client.is_connected() and client._connection_id != conn_id0:
break
time.sleep(1)
# retry send_to_group to allow async group rejoin to complete
for _ in range(10):
client.send_to_group(group_name, name, "text")
time.sleep(1)
if name in TEST_RESULT:
break
conn_id1 = client._connection_id
assert conn_id0 is not None
assert conn_id1 is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ class TestWebpubsubClientAutoConnectAsync(WebpubsubClientTestAsync):
# auto_connect will be triggered if connection is dropped by accident and we disable recovery
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy_async
async def test_auto_connect_async(self, webpubsubclient_connection_string):
async def test_auto_connect_async(self, webpubsubclient_endpoint):
client = await self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
protocol_type=WebPubSubProtocolType.JSON,
message_retry_total=10,
reconnect_retry_total=10,
Expand All @@ -27,17 +27,29 @@ async def test_auto_connect_async(self, webpubsubclient_connection_string):
)
name = "test_auto_connect_async"
async with client:
await asyncio.sleep(0.001) # wait for connection_id to be updated
# wait for connection_id to be updated
for _ in range(30):
if client._connection_id is not None:
break
await asyncio.sleep(1)
conn_id0 = client._connection_id
group_name = name
await client.subscribe("group-message", on_group_message)
await client.join_group(group_name)
await client._ws.sock.close(
code=1001
) # close the connection to trigger auto connect
await asyncio.sleep(3) # wait for reconnect
# wait for reconnect
for _ in range(30):
if client.is_connected() and client._connection_id != conn_id0:
break
await asyncio.sleep(1)
await client.send_to_group(group_name, name, "text")
await asyncio.sleep(1) # wait for on_group_message to be called
# wait for on_group_message callback to fire
for _ in range(10):
if name in TEST_RESULT_ASYNC:
break
await asyncio.sleep(1)
conn_id1 = client._connection_id
assert conn_id0 is not None
assert conn_id1 is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ class TestWebpubsubClientNoRecoveryNoReconnect(WebpubsubClientTest):
# disable recovery and auto reconnect
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy
def test_disable_recovery_and_autoconnect(self, webpubsubclient_connection_string):
def test_disable_recovery_and_autoconnect(self, webpubsubclient_endpoint):
client = self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
reconnect_retry_total=0,
protocol_type=WebPubSubProtocolType.JSON,
)
Expand All @@ -37,20 +37,25 @@ def test_disable_recovery_and_autoconnect(self, webpubsubclient_connection_strin
client.subscribe("group-message", on_group_message)
client.join_group(group_name)
client._ws.sock.close(1001) # close connection
# wait for client to detect disconnection so send raises SendMessageError
for _ in range(30):
if not client.is_connected():
break
time.sleep(1)
with pytest.raises(SendMessageError):
client.send_to_group(group_name, name, "text")
time.sleep(1) # wait for on_group_message to be called
time.sleep(3) # wait to confirm message was NOT received

assert name not in TEST_RESULT

# disable recovery and auto reconnect, then send message concurrently
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy
def test_disable_recovery_and_autoconnect_send_concurrently(
self, webpubsubclient_connection_string
self, webpubsubclient_endpoint
):
client = self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
reconnect_retry_total=0,
message_retry_total=3,
protocol_type=WebPubSubProtocolType.JSON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ class TestWebpubsubClientNoRecoveryNoReconnectAsync(WebpubsubClientTestAsync):
# disable recovery and auto reconnect
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy_async
async def test_disable_recovery_and_autoconnect_async(self, webpubsubclient_connection_string):
async def test_disable_recovery_and_autoconnect_async(self, webpubsubclient_endpoint):
client = await self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
reconnect_retry_total=0,
protocol_type=WebPubSubProtocolType.JSON,
)
Expand All @@ -32,20 +32,25 @@ async def test_disable_recovery_and_autoconnect_async(self, webpubsubclient_conn
await client.subscribe("group-message", on_group_message)
await client.join_group(group_name)
await client._ws.session.close() # close connection
# wait for client to detect disconnection so send raises SendMessageError
for _ in range(30):
if not client.is_connected():
break
await asyncio.sleep(1)
with pytest.raises(SendMessageError):
await client.send_to_group(group_name, name, "text")
await asyncio.sleep(1) # wait for on_group_message to be called
await asyncio.sleep(3) # wait to confirm message was NOT received

assert name not in TEST_RESULT_ASYNC

# disable recovery and auto reconnect, then send message concurrently
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy_async
async def test_disable_recovery_and_autoconnect_send_concurrently_async(
self, webpubsubclient_connection_string
self, webpubsubclient_endpoint
):
client = await self.create_client(
connection_string=webpubsubclient_connection_string,
endpoint=webpubsubclient_endpoint,
reconnect_retry_total=0,
message_retry_total=3,
protocol_type=WebPubSubProtocolType.JSON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@ class TestWebpubsubClientRecovery(WebpubsubClientTest):
# recovery will be triggered if connection is dropped by accident
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy
def test_recovery(self, webpubsubclient_connection_string):
client = self.create_client(connection_string=webpubsubclient_connection_string, message_retry_total=10)
def test_recovery(self, webpubsubclient_endpoint):
client = self.create_client(endpoint=webpubsubclient_endpoint, message_retry_total=10)
name = "test_recovery"
with client:
time.sleep(0.001) # wait for connection_id to be updated
# wait for connection_id to be updated
for _ in range(30):
if client._connection_id is not None:
break
time.sleep(1)
conn_id0 = client._connection_id
group_name = name
client.subscribe("group-message", on_group_message)
client.join_group(group_name)
client._ws.sock.close(1001) # close connection to trigger recovery
client.send_to_group(group_name, name, "text")
conn_id1 = client._connection_id
time.sleep(1) # wait for on_group_message to be called
# wait for on_group_message callback to fire
for _ in range(10):
if name in TEST_RESULT:
break
time.sleep(1)

assert name in TEST_RESULT
assert conn_id0 is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,27 @@ class TestWebpubsubClientRecoveryAsync(WebpubsubClientTestAsync):
# recovery will be triggered if connection is dropped by accident
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy_async
async def test_recovery_async(self, webpubsubclient_connection_string):
client = await self.create_client(connection_string=webpubsubclient_connection_string, message_retry_total=10)
async def test_recovery_async(self, webpubsubclient_endpoint):
client = await self.create_client(endpoint=webpubsubclient_endpoint, message_retry_total=10)
name = "test_recovery_async"
async with client:
await asyncio.sleep(0.001) # wait for connection_id to be updated
# wait for connection_id to be updated
for _ in range(30):
if client._connection_id is not None:
break
await asyncio.sleep(1)
conn_id0 = client._connection_id
group_name = name
await client.subscribe("group-message", on_group_message)
await client.join_group(group_name)
await client._ws.session.close() # close connection to trigger recovery
await client.send_to_group(group_name, name, "text")
conn_id1 = client._connection_id
await asyncio.sleep(1) # wait for on_group_message to be called
# wait for on_group_message callback to fire
for _ in range(10):
if name in TEST_RESULT_ASYNC:
break
await asyncio.sleep(1)

assert name in TEST_RESULT_ASYNC
assert conn_id0 is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import time
import pytest
from devtools_testutils import recorded_by_proxy
from testcase import WebpubsubClientTest, WebpubsubClientPowerShellPreparer, SafeThread
Expand All @@ -13,11 +14,16 @@
class TestWebpubsubClientSendConcurrently(WebpubsubClientTest):
@WebpubsubClientPowerShellPreparer()
@recorded_by_proxy
def test_send_concurrently(self, webpubsubclient_connection_string):
client = self.create_client(connection_string=webpubsubclient_connection_string)
def test_send_concurrently(self, webpubsubclient_endpoint):
client = self.create_client(endpoint=webpubsubclient_endpoint)
with client:
group_name = "test_send_concurrently"
client.join_group(group_name)
# wait for connection to stabilize before concurrent sends
for _ in range(30):
if client.is_connected():
break
time.sleep(1)

def send(idx):
client.send_to_group(group_name, f"hello_{idx}", "text")
Expand Down
Loading
Loading