Skip to content

Commit 965d955

Browse files
committed
The logs for each test should be printed along with the test name itself
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent 27653c7 commit 965d955

18 files changed

Lines changed: 169 additions & 446 deletions

packages/pynumaflow/Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ test-unit:
2323
test-integration:
2424
uv run pytest tests/ -m integration -rA
2525

26+
test-debug:
27+
uv run pytest tests/ -rA --log-cli-level=DEBUG
28+
2629

2730
setup:
2831
uv sync --all-groups

packages/pynumaflow/pyproject.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,6 @@ exclude = '''
8888
strict = true
8989
collect_imported_tests = false
9090
console_output_style = "times"
91-
log_cli = true
92-
log_cli_level = "DEBUG"
9391
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
9492
log_cli_date_format = "%Y-%m-%d %H:%M:%S"
9593
markers = [

packages/pynumaflow/tests/accumulator/test_async_accumulator.py

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import asyncio
21
import logging
3-
import threading
42
from collections.abc import AsyncIterable
53

64
import grpc
@@ -16,6 +14,7 @@
1614
)
1715
from pynumaflow.proto.accumulator import accumulator_pb2, accumulator_pb2_grpc
1816
from pynumaflow.shared.asynciter import NonBlockingIterator
17+
from tests.conftest import create_async_loop, start_async_server, teardown_async_server
1918
from tests.testing_utils import (
2019
mock_message,
2120
mock_interval_window_start,
@@ -140,11 +139,6 @@ def start_request_without_open() -> accumulator_pb2.AccumulatorRequest:
140139
return request
141140

142141

143-
def startup_callable(loop):
144-
asyncio.set_event_loop(loop)
145-
loop.run_forever()
146-
147-
148142
class ExampleClass(Accumulator):
149143
def __init__(self, counter):
150144
self.counter = counter
@@ -176,36 +170,17 @@ async def _start_server(udfs):
176170
server.add_insecure_port(SOCK_PATH)
177171
logging.info("Starting server on %s", SOCK_PATH)
178172
await server.start()
179-
return server
173+
return server, SOCK_PATH
180174

181175

182176
@pytest.fixture(scope="module")
183177
def async_accumulator_server():
184178
"""Module-scoped fixture: starts an async gRPC accumulator server in a background thread."""
185-
loop = asyncio.new_event_loop()
186-
thread = threading.Thread(target=startup_callable, args=(loop,), daemon=True)
187-
thread.start()
188-
179+
loop = create_async_loop()
189180
udfs = NewAsyncAccumulator()
190-
future = asyncio.run_coroutine_threadsafe(_start_server(udfs), loop=loop)
191-
future.result(timeout=10)
192-
193-
# Wait for the server to be ready
194-
while True:
195-
try:
196-
with grpc.insecure_channel(SOCK_PATH) as channel:
197-
f = grpc.channel_ready_future(channel)
198-
f.result(timeout=10)
199-
if f.done():
200-
break
201-
except grpc.FutureTimeoutError as e:
202-
LOGGER.error("error trying to connect to grpc server")
203-
LOGGER.error(e)
204-
181+
server = start_async_server(loop, _start_server(udfs))
205182
yield loop
206-
207-
loop.stop()
208-
LOGGER.info("stopped the event loop")
183+
teardown_async_server(loop, server)
209184

210185

211186
@pytest.fixture()

packages/pynumaflow/tests/accumulator/test_async_accumulator_err.py

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import asyncio
21
import logging
3-
import threading
42
from collections.abc import AsyncIterable
53

64
import grpc
@@ -15,6 +13,7 @@
1513
)
1614
from pynumaflow.proto.accumulator import accumulator_pb2, accumulator_pb2_grpc
1715
from pynumaflow.shared.asynciter import NonBlockingIterator
16+
from tests.conftest import create_async_loop, start_async_server, teardown_async_server
1817
from tests.testing_utils import (
1918
mock_message,
2019
get_time_args,
@@ -59,11 +58,6 @@ def start_request() -> accumulator_pb2.AccumulatorRequest:
5958
return request
6059

6160

62-
def startup_callable(loop):
63-
asyncio.set_event_loop(loop)
64-
loop.run_forever()
65-
66-
6761
class ExampleErrorClass(Accumulator):
6862
def __init__(self, counter):
6963
self.counter = counter
@@ -101,36 +95,17 @@ async def _start_server(udfs):
10195
server.add_insecure_port(SOCK_PATH)
10296
logging.info("Starting server on %s", SOCK_PATH)
10397
await server.start()
104-
return server
98+
return server, SOCK_PATH
10599

106100

107101
@pytest.fixture(scope="module")
108102
def async_accumulator_err_server():
109103
"""Module-scoped fixture: starts an async gRPC accumulator error server."""
110-
loop = asyncio.new_event_loop()
111-
thread = threading.Thread(target=startup_callable, args=(loop,), daemon=True)
112-
thread.start()
113-
104+
loop = create_async_loop()
114105
udfs = NewAsyncAccumulatorError()
115-
future = asyncio.run_coroutine_threadsafe(_start_server(udfs), loop=loop)
116-
future.result(timeout=10)
117-
118-
# Wait for the server to be ready
119-
while True:
120-
try:
121-
with grpc.insecure_channel(SOCK_PATH) as channel:
122-
f = grpc.channel_ready_future(channel)
123-
f.result(timeout=10)
124-
if f.done():
125-
break
126-
except grpc.FutureTimeoutError as e:
127-
LOGGER.error("error trying to connect to grpc server")
128-
LOGGER.error(e)
129-
106+
server = start_async_server(loop, _start_server(udfs))
130107
yield loop
131-
132-
loop.stop()
133-
LOGGER.info("stopped the event loop")
108+
teardown_async_server(loop, server)
134109

135110

136111
@pytest.fixture()

packages/pynumaflow/tests/batchmap/test_async_batch_map.py

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import asyncio
21
import logging
3-
import threading
42
from collections.abc import AsyncIterable
53

64
import grpc
@@ -18,6 +16,7 @@
1816
)
1917
from pynumaflow.proto.mapper import map_pb2_grpc
2018
from tests.batchmap.utils import request_generator
19+
from tests.conftest import create_async_loop, start_async_server, teardown_async_server
2120

2221
pytestmark = pytest.mark.integration
2322

@@ -26,11 +25,6 @@
2625
listen_addr = "unix:///tmp/batch_map.sock"
2726

2827

29-
def startup_callable(loop):
30-
asyncio.set_event_loop(loop)
31-
loop.run_forever()
32-
33-
3428
class ExampleClass(BatchMapper):
3529
async def handler(
3630
self,
@@ -86,34 +80,20 @@ async def start_server(udfs):
8680
server.add_insecure_port(listen_addr)
8781
logging.info("Starting server on %s", listen_addr)
8882
await server.start()
89-
await server.wait_for_termination()
83+
return server, listen_addr
9084

9185

9286
@pytest.fixture(scope="module")
9387
def async_batch_map_server():
9488
"""Module-scoped fixture: starts an async gRPC batch map server in a background thread."""
95-
loop = asyncio.new_event_loop()
96-
thread = threading.Thread(target=startup_callable, args=(loop,), daemon=True)
97-
thread.start()
89+
loop = create_async_loop()
9890

9991
udfs = NewAsyncBatchMapper()
100-
asyncio.run_coroutine_threadsafe(start_server(udfs), loop=loop)
101-
102-
while True:
103-
try:
104-
with grpc.insecure_channel(listen_addr) as channel:
105-
f = grpc.channel_ready_future(channel)
106-
f.result(timeout=10)
107-
if f.done():
108-
break
109-
except grpc.FutureTimeoutError as e:
110-
LOGGER.error("error trying to connect to grpc server")
111-
LOGGER.error(e)
92+
server = start_async_server(loop, start_server(udfs))
11293

11394
yield loop
11495

115-
loop.stop()
116-
LOGGER.info("stopped the event loop")
96+
teardown_async_server(loop, server)
11797

11898

11999
@pytest.fixture()

packages/pynumaflow/tests/batchmap/test_async_batch_map_err.py

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import asyncio
21
import logging
3-
import threading
42

53
import grpc
64
import pytest
@@ -10,6 +8,7 @@
108
from pynumaflow.batchmapper import BatchMapAsyncServer
119
from pynumaflow.proto.mapper import map_pb2_grpc
1210
from tests.batchmap.utils import request_generator
11+
from tests.conftest import create_async_loop, start_async_server, teardown_async_server
1312

1413
pytestmark = pytest.mark.integration
1514

@@ -29,11 +28,6 @@ async def err_handler(datums) -> BatchResponses:
2928
listen_addr = "unix:///tmp/async_batch_map_err.sock"
3029

3130

32-
def startup_callable(loop):
33-
asyncio.set_event_loop(loop)
34-
loop.run_forever()
35-
36-
3731
async def start_server():
3832
server = grpc.aio.server()
3933
server_instance = BatchMapAsyncServer(err_handler)
@@ -42,33 +36,16 @@ async def start_server():
4236
server.add_insecure_port(listen_addr)
4337
logging.info("Starting server on %s", listen_addr)
4438
await server.start()
45-
await server.wait_for_termination()
39+
return server, listen_addr
4640

4741

4842
@pytest.fixture(scope="module")
4943
def async_batch_map_err_server():
5044
"""Module-scoped fixture: starts an async gRPC batch map error server in a background thread."""
51-
loop = asyncio.new_event_loop()
52-
thread = threading.Thread(target=startup_callable, args=(loop,), daemon=True)
53-
thread.start()
54-
55-
asyncio.run_coroutine_threadsafe(start_server(), loop=loop)
56-
57-
while True:
58-
try:
59-
with grpc.insecure_channel(listen_addr) as channel:
60-
f = grpc.channel_ready_future(channel)
61-
f.result(timeout=10)
62-
if f.done():
63-
break
64-
except grpc.FutureTimeoutError as e:
65-
LOGGER.error("error trying to connect to grpc server")
66-
LOGGER.error(e)
67-
45+
loop = create_async_loop()
46+
server = start_async_server(loop, start_server())
6847
yield loop
69-
70-
loop.stop()
71-
LOGGER.info("stopped the event loop")
48+
teardown_async_server(loop, server)
7249

7350

7451
@pytest.fixture()

packages/pynumaflow/tests/conftest.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,93 @@
55
sync, multiproc, and async test files.
66
"""
77

8+
import asyncio
9+
import logging
10+
import threading
11+
12+
import grpc
13+
14+
_logger = logging.getLogger(__name__)
15+
16+
17+
def start_async_server(loop, start_server_coro):
18+
"""Start an async gRPC server on the given event loop and wait until it is ready.
19+
20+
Args:
21+
loop: The asyncio event loop running in a background thread.
22+
start_server_coro: An awaitable that starts the server and returns
23+
a tuple of (grpc.aio.Server, sock_path).
24+
25+
Returns:
26+
The grpc.aio.Server instance.
27+
"""
28+
future = asyncio.run_coroutine_threadsafe(start_server_coro, loop=loop)
29+
server, sock_path = future.result(timeout=10)
30+
31+
# Block until the server is accepting connections
32+
while True:
33+
try:
34+
with grpc.insecure_channel(sock_path) as channel:
35+
f = grpc.channel_ready_future(channel)
36+
f.result(timeout=10)
37+
if f.done():
38+
break
39+
except grpc.FutureTimeoutError as e:
40+
_logger.error("error trying to connect to grpc server")
41+
_logger.error(e)
42+
43+
return server
44+
45+
46+
def create_async_loop():
47+
"""Create a new asyncio event loop running in a daemon thread.
48+
49+
Returns:
50+
The running event loop.
51+
"""
52+
loop = asyncio.new_event_loop()
53+
54+
def _run(lp):
55+
asyncio.set_event_loop(lp)
56+
lp.run_forever()
57+
58+
thread = threading.Thread(target=_run, args=(loop,), daemon=True)
59+
thread.start()
60+
return loop
61+
62+
63+
def teardown_async_server(loop, server):
64+
"""Gracefully shut down an async gRPC server and its event loop.
65+
66+
Stops the gRPC server, cancels any remaining tasks, then stops the loop.
67+
This prevents 'Task was destroyed but it is pending!' warnings.
68+
"""
69+
70+
async def _shutdown():
71+
await server.stop(grace=1)
72+
# Cancel any lingering tasks on this loop, excluding the current
73+
# _shutdown task itself to avoid recursive cancel chains.
74+
current = asyncio.current_task()
75+
tasks = [t for t in asyncio.all_tasks(loop) if not t.done() and t is not current]
76+
for task in tasks:
77+
task.cancel()
78+
# Await each cancelled task individually so a RecursionError in one
79+
# deeply-nested cancel chain does not prevent the others from being
80+
# reaped, and does not propagate up to the caller.
81+
for task in tasks:
82+
try:
83+
await task
84+
except (asyncio.CancelledError, RecursionError, Exception):
85+
pass
86+
87+
try:
88+
future = asyncio.run_coroutine_threadsafe(_shutdown(), loop=loop)
89+
future.result(timeout=10)
90+
except Exception as e:
91+
_logger.error("error during async server teardown: %s", e)
92+
finally:
93+
loop.call_soon_threadsafe(loop.stop)
94+
895

996
def collect_responses(method):
1097
"""Collect all responses from a grpc_testing stream method until exhausted.

0 commit comments

Comments
 (0)