Skip to content

Commit 4f08dc8

Browse files
committed
feat: add async and sync assert waiter utilities and update tests to remove temporary delays
1 parent 5007cb6 commit 4f08dc8

3 files changed

Lines changed: 81 additions & 21 deletions

File tree

ably/scripts/unasync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ def run():
239239

240240
_TOKEN_REPLACE["AsyncClient"] = "Client"
241241
_TOKEN_REPLACE["aclose"] = "close"
242+
_TOKEN_REPLACE["assert_waiter"] = "assert_waiter_sync"
242243

243244
_IMPORTS_REPLACE["ably"] = "ably.sync"
244245

test/ably/rest/restchannelpublish_test.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import mock
1010
import msgpack
1111
import pytest
12-
import asyncio
1312

1413
from ably import api_version
1514
from ably import AblyException, IncompatibleClientIdException
@@ -20,7 +19,7 @@
2019
from test.ably import utils
2120

2221
from test.ably.testapp import TestApp
23-
from test.ably.utils import VaryByProtocolTestsMetaclass, dont_vary_protocol, BaseAsyncTestCase
22+
from test.ably.utils import VaryByProtocolTestsMetaclass, dont_vary_protocol, BaseAsyncTestCase, assert_waiter
2423

2524
log = logging.getLogger(__name__)
2625

@@ -402,26 +401,31 @@ async def test_interoperability(self):
402401
expected_value = input_msg.get('expectedValue')
403402

404403
# 1)
405-
await channel.publish(data=expected_value)
406-
# temporary added delay, we need to investigate why messages don't appear immediately
407-
await asyncio.sleep(1)
408-
async with httpx.AsyncClient(http2=True) as client:
409-
r = await client.get(url, auth=auth)
410-
item = r.json()[0]
411-
assert item.get('encoding') == encoding
412-
if encoding == 'json':
413-
assert json.loads(item['data']) == json.loads(msg_data)
414-
else:
415-
assert item['data'] == msg_data
404+
response = await channel.publish(data=expected_value)
405+
assert response.status_code == 201
406+
407+
async def check_data():
408+
async with httpx.AsyncClient(http2=True) as client:
409+
r = await client.get(url, auth=auth)
410+
item = r.json()[0]
411+
encoding_is_correct = item.get('encoding') == encoding
412+
if encoding == 'json':
413+
return encoding_is_correct and json.loads(item['data']) == json.loads(msg_data)
414+
else:
415+
return encoding_is_correct and item['data'] == msg_data
416+
417+
await assert_waiter(check_data)
416418

417419
# 2)
418-
await channel.publish(messages=[Message(data=msg_data, encoding=encoding)])
419-
# temporary added delay, we need to investigate why messages don't appear immediately
420-
await asyncio.sleep(1)
421-
history = await channel.history()
422-
message = history.items[0]
423-
assert message.data == expected_value
424-
assert type(message.data) == type_mapping[expected_type]
420+
response = await channel.publish(messages=[Message(data=msg_data, encoding=encoding)])
421+
assert response.status_code == 201
422+
423+
async def check_history():
424+
history = await channel.history()
425+
message = history.items[0]
426+
return message.data == expected_value and type(message.data) == type_mapping[expected_type]
427+
428+
await assert_waiter(check_history)
425429

426430
# https://github.com/ably/ably-python/issues/130
427431
async def test_publish_slash(self):

test/ably/utils.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import asyncio
12
import functools
23
import os
34
import random
45
import string
5-
import unittest
66
import sys
7+
import time
8+
import unittest
9+
from typing import Callable, Awaitable
710

811
if sys.version_info >= (3, 8):
912
from unittest import IsolatedAsyncioTestCase
@@ -178,3 +181,55 @@ def get_submodule_dir(filepath):
178181
if os.path.exists(os.path.join(root_dir, 'submodules')):
179182
return os.path.join(root_dir, 'submodules')
180183
root_dir = os.path.dirname(root_dir)
184+
185+
186+
async def assert_waiter(block: Callable[[], Awaitable[bool]], timeout: float = 10) -> None:
187+
"""
188+
Polls a condition until it succeeds or times out.
189+
Args:
190+
block: A callable that returns a boolean indicating success
191+
timeout: Maximum time to wait in seconds (default: 10)
192+
Raises:
193+
TimeoutError: If condition not met within timeout
194+
"""
195+
try:
196+
await asyncio.wait_for(_poll_until_success(block), timeout=timeout)
197+
except asyncio.TimeoutError:
198+
raise asyncio.TimeoutError(f"Condition not met within {timeout}s")
199+
200+
201+
async def _poll_until_success(block: Callable[[], Awaitable[bool]]) -> None:
202+
while True:
203+
try:
204+
success = await block()
205+
if success:
206+
break
207+
except Exception:
208+
pass
209+
210+
await asyncio.sleep(0.1)
211+
212+
213+
def assert_waiter_sync(block: Callable[[], bool], timeout: float = 10) -> None:
214+
"""
215+
Blocking version of assert_waiter that polls a condition until it succeeds or times out.
216+
Args:
217+
block: A callable that returns a boolean indicating success
218+
timeout: Maximum time to wait in seconds (default: 10)
219+
Raises:
220+
TimeoutError: If condition not met within timeout
221+
"""
222+
start_time = time.time()
223+
224+
while True:
225+
try:
226+
success = block()
227+
if success:
228+
break
229+
except Exception:
230+
pass
231+
232+
if time.time() - start_time >= timeout:
233+
raise TimeoutError(f"Condition not met within {timeout}s")
234+
235+
time.sleep(0.1)

0 commit comments

Comments
 (0)