@@ -295,6 +295,70 @@ def test_max_threads(self):
295295 self .assertEqual (server .max_threads , 4 )
296296
297297
298+ def test_start_shutdown_handler_without_callback (self ):
299+ """Test that _shutdown_handler logs and works when no shutdown_callback is set."""
300+ from unittest .mock import patch , MagicMock
301+
302+ server = ReduceStreamAsyncServer (reduce_stream_instance = ExampleClass )
303+ self .assertIsNone (server .shutdown_callback )
304+
305+ def close_coro (coro , ** kwargs ):
306+ coro .close ()
307+
308+ with patch ("pynumaflow.reducestreamer.async_server.aiorun" ) as mock_aiorun :
309+ mock_aiorun .run .side_effect = close_coro
310+ server .start ()
311+
312+ # Extract the shutdown_callback passed to aiorun.run
313+ call_kwargs = mock_aiorun .run .call_args [1 ]
314+ shutdown_handler = call_kwargs ["shutdown_callback" ]
315+
316+ # Invoke the handler — should not raise even without a callback
317+ mock_loop = MagicMock ()
318+ shutdown_handler (mock_loop )
319+
320+ def test_start_shutdown_handler_with_callback (self ):
321+ """Test that _shutdown_handler invokes the user-provided shutdown_callback."""
322+ from unittest .mock import patch , MagicMock
323+
324+ user_callback = MagicMock ()
325+ server = ReduceStreamAsyncServer (
326+ reduce_stream_instance = ExampleClass , shutdown_callback = user_callback
327+ )
328+
329+ def close_coro (coro , ** kwargs ):
330+ coro .close ()
331+
332+ with patch ("pynumaflow.reducestreamer.async_server.aiorun" ) as mock_aiorun :
333+ mock_aiorun .run .side_effect = close_coro
334+ server .start ()
335+
336+ shutdown_handler = mock_aiorun .run .call_args [1 ]["shutdown_callback" ]
337+ mock_loop = MagicMock ()
338+ shutdown_handler (mock_loop )
339+
340+ user_callback .assert_called_once_with (mock_loop )
341+
342+ def test_start_exits_on_error (self ):
343+ """Test that start() calls sys.exit(1) when servicer reports an error."""
344+ from unittest .mock import patch
345+
346+ server = ReduceStreamAsyncServer (reduce_stream_instance = ExampleClass )
347+
348+ def fake_aiorun_run (coro , ** kwargs ):
349+ # Simulate aiorun completing after a UDF error was recorded
350+ coro .close () # prevent "coroutine never awaited" warning
351+ server ._error = RuntimeError ("UDF failure" )
352+
353+ with patch ("pynumaflow.reducestreamer.async_server.aiorun" ) as mock_aiorun , patch (
354+ "pynumaflow.reducestreamer.async_server.sys"
355+ ) as mock_sys :
356+ mock_aiorun .run .side_effect = fake_aiorun_run
357+ server .start ()
358+
359+ mock_sys .exit .assert_called_once_with (1 )
360+
361+
298362if __name__ == "__main__" :
299363 logging .basicConfig (level = logging .DEBUG )
300364 unittest .main ()
0 commit comments