Skip to content

Commit ae7c50e

Browse files
committed
more tests
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent 9de02db commit ae7c50e

1 file changed

Lines changed: 41 additions & 0 deletions

File tree

packages/pynumaflow/tests/sink/test_server.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from datetime import datetime, timezone
44
from unittest import mock
55

6+
import grpc
67
import pytest
78
from grpc import StatusCode
89
from grpc_testing import server_from_dictionary, strict_real_time
@@ -415,3 +416,43 @@ def test_shutdown_event_set_on_handshake_error():
415416
assert "SinkFn: expected handshake message" in details
416417
assert servicer.shutdown_event.is_set()
417418
assert servicer.error is not None
419+
420+
421+
def test_shutdown_event_set_on_stream_close_before_handshake():
422+
"""grpc.RpcError on the first read (before handshake): shutdown_event set, req_queue is None so close is skipped."""
423+
servicer = SyncSinkServicer(handler=udsink_handler)
424+
425+
def _cancelled_iter():
426+
raise grpc.RpcError()
427+
yield # make it a generator
428+
429+
responses = list(servicer.SinkFn(_cancelled_iter(), mock.MagicMock()))
430+
431+
assert responses == []
432+
assert servicer.shutdown_event.is_set()
433+
assert servicer.error is None
434+
435+
436+
def test_shutdown_event_set_on_stream_close_mid_batch():
437+
"""grpc.RpcError mid-batch: req_queue is closed (unblocking the handler thread) and shutdown_event is set."""
438+
servicer = SyncSinkServicer(handler=udsink_handler)
439+
event_time_timestamp, watermark_timestamp = _make_timestamps()
440+
441+
def _cancelled_iter():
442+
yield sink_pb2.SinkRequest(handshake=sink_pb2.Handshake(sot=True))
443+
yield sink_pb2.SinkRequest(
444+
request=sink_pb2.SinkRequest.Request(
445+
id="test_id_0",
446+
value=mock_message(),
447+
event_time=event_time_timestamp,
448+
watermark=watermark_timestamp,
449+
metadata=METADATA,
450+
)
451+
)
452+
raise grpc.RpcError()
453+
454+
responses = list(servicer.SinkFn(_cancelled_iter(), mock.MagicMock()))
455+
456+
assert responses[0].handshake.sot
457+
assert servicer.shutdown_event.is_set()
458+
assert servicer.error is None

0 commit comments

Comments
 (0)