Skip to content

Commit 8fb1cf7

Browse files
committed
tests for error scenario
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent 13918bc commit 8fb1cf7

1 file changed

Lines changed: 67 additions & 0 deletions

File tree

packages/pynumaflow/tests/reducestreamer/test_async_reduce_err.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
)
1919
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
2020
from pynumaflow.reducestreamer.servicer.async_servicer import AsyncReduceStreamServicer
21+
from pynumaflow.reducestreamer.servicer.task_manager import TaskManager
2122
from pynumaflow.shared.asynciter import NonBlockingIterator
2223
from tests.testing_utils import (
2324
mock_message,
@@ -279,6 +280,72 @@ async def requests():
279280
ctx.set_code.assert_called_once_with(grpc.StatusCode.INTERNAL)
280281

281282

283+
_original_process_input_stream = TaskManager.process_input_stream
284+
285+
286+
def test_cancelled_error_awaiting_producer():
287+
"""CancelledError from the producer task after it finishes its real work."""
288+
servicer = AsyncReduceStreamServicer(_emit_one_handler)
289+
shutdown_event = asyncio.Event()
290+
servicer.set_shutdown_event(shutdown_event)
291+
request, _ = start_request(multiple_window=False)
292+
293+
async def raise_after_real_work(self, request_iterator):
294+
await _original_process_input_stream(self, request_iterator)
295+
raise asyncio.CancelledError()
296+
297+
TaskManager.process_input_stream = raise_after_real_work
298+
try:
299+
300+
async def _run():
301+
async def requests():
302+
yield request
303+
304+
gen = servicer.ReduceFn(requests(), MagicMock())
305+
async for _ in gen:
306+
pass
307+
308+
asyncio.run(_run())
309+
finally:
310+
TaskManager.process_input_stream = _original_process_input_stream
311+
312+
assert shutdown_event.is_set()
313+
assert servicer._error is None
314+
315+
316+
def test_base_exception_awaiting_producer():
317+
"""BaseException from the producer task after it finishes its real work."""
318+
servicer = AsyncReduceStreamServicer(_emit_one_handler)
319+
shutdown_event = asyncio.Event()
320+
servicer.set_shutdown_event(shutdown_event)
321+
request, _ = start_request(multiple_window=False)
322+
323+
async def raise_after_real_work(self, request_iterator):
324+
await _original_process_input_stream(self, request_iterator)
325+
raise RuntimeError("producer boom")
326+
327+
TaskManager.process_input_stream = raise_after_real_work
328+
try:
329+
330+
async def _run():
331+
async def requests():
332+
yield request
333+
334+
ctx = MagicMock()
335+
gen = servicer.ReduceFn(requests(), ctx)
336+
async for _ in gen:
337+
pass
338+
return ctx
339+
340+
ctx = asyncio.run(_run())
341+
finally:
342+
TaskManager.process_input_stream = _original_process_input_stream
343+
344+
assert shutdown_event.is_set()
345+
assert isinstance(servicer._error, RuntimeError)
346+
ctx.set_code.assert_called_once_with(grpc.StatusCode.INTERNAL)
347+
348+
282349
if __name__ == "__main__":
283350
logging.basicConfig(level=logging.DEBUG)
284351
unittest.main()

0 commit comments

Comments
 (0)