@@ -353,30 +353,54 @@ async def _blocking_handler(keys, datums, output, md):
353353 await output .put (Message (b"done" , keys = keys ))
354354
355355
356+ def _make_reduce_request (operation_event ):
357+ """Create a ReduceRequest DTO (not raw protobuf) matching what datum_generator produces."""
358+ from pynumaflow .reducestreamer ._dtypes import ReduceRequest as ReduceRequestDTO
359+
360+ event_time_timestamp , watermark_timestamp = get_time_args ()
361+ window = reduce_pb2 .Window (
362+ start = mock_interval_window_start (),
363+ end = mock_interval_window_end (),
364+ slot = "slot-0" ,
365+ )
366+ payload = Datum (
367+ keys = ["test_key" ],
368+ value = mock_message (),
369+ event_time = event_time_timestamp .ToDatetime (),
370+ watermark = watermark_timestamp .ToDatetime (),
371+ )
372+ return ReduceRequestDTO (
373+ operation = operation_event ,
374+ windows = [window ],
375+ payload = payload ,
376+ )
377+
378+
356379def test_cancel_and_await_remaining_tasks_on_post_processing_error ():
357380 """
358381 When a BaseException occurs during post-processing (after the input stream
359382 is exhausted), the TaskManager should cancel and await all remaining task
360- futures to suppress 'never retrieved' warnings .
383+ futures that are still running .
361384 """
362385 from unittest .mock import patch
386+ from pynumaflow .reducestreamer ._dtypes import WindowOperation
363387
364388 tm = TaskManager (_blocking_handler )
365-
366- request , _ = start_request (multiple_window = False )
367- # Use OPEN so create_task is called
368- request .operation .event = reduce_pb2 .ReduceRequest .WindowOperation .Event .OPEN
389+ req = _make_reduce_request (int (WindowOperation .OPEN ))
369390
370391 async def _run ():
371392 async def requests ():
372- yield request
393+ yield req
373394
374395 # Patch stream_send_eof to raise after the task is created but before
375396 # it completes, so the task futures are still running when the except
376397 # block executes.
377398 with patch .object (tm , "stream_send_eof" , side_effect = RuntimeError ("send_eof boom" )):
378399 await tm .process_input_stream (requests ())
379400
401+ # Verify tasks were actually created
402+ assert len (tm .get_tasks ()) > 0 , "tasks should have been created"
403+
380404 # After process_input_stream returns, verify the error was placed in
381405 # the global result queue.
382406 reader = tm .global_result_queue .read_iterator ()
@@ -395,38 +419,41 @@ async def requests():
395419def test_cancel_and_await_with_already_done_futures ():
396420 """
397421 When post-processing fails but some futures are already done,
398- the cleanup code should handle them gracefully (skip cancellation ).
422+ the cleanup code should skip cancellation for those (fut.done() is True ).
399423 """
400424 from unittest .mock import patch
425+ from pynumaflow .reducestreamer ._dtypes import WindowOperation
426+ from pynumaflow ._constants import STREAM_EOF
401427
402428 async def _fast_handler (keys , datums , output , md ):
403429 """Handler that finishes immediately without reading datums."""
404430 await output .put (Message (b"fast" , keys = keys ))
405431
406432 tm = TaskManager (_fast_handler )
407- request , _ = start_request (multiple_window = False )
408- request .operation .event = reduce_pb2 .ReduceRequest .WindowOperation .Event .OPEN
433+ req = _make_reduce_request (int (WindowOperation .OPEN ))
409434
410435 async def _run ():
411436 async def requests ():
412- yield request
437+ yield req
413438
414- # Let the real stream_send_eof run (which sends EOF to the handler),
415- # then patch get_unique_windows to raise after all tasks complete.
416439 original_send_eof = tm .stream_send_eof
417440
418441 async def send_eof_then_wait_and_raise ():
442+ # Let the real stream_send_eof run (sends EOF to handler input)
419443 await original_send_eof ()
420- # Wait for the task futures to finish
444+ # Wait for all task futures to complete so they are .done()
421445 for task in tm .get_tasks ():
422446 await task .future
423- await task .result_queue .put ("__STREAM_EOF__" )
447+ await task .result_queue .put (STREAM_EOF )
424448 await task .consumer_future
425449 raise RuntimeError ("late post-processing error" )
426450
427451 with patch .object (tm , "stream_send_eof" , side_effect = send_eof_then_wait_and_raise ):
428452 await tm .process_input_stream (requests ())
429453
454+ # Verify tasks were actually created
455+ assert len (tm .get_tasks ()) > 0 , "tasks should have been created"
456+
430457 # Verify cleanup completed without issues
431458 for task in tm .get_tasks ():
432459 assert task .future .done ()
0 commit comments