|
| 1 | +import threading |
| 2 | +from unittest.mock import MagicMock |
| 3 | + |
1 | 4 | import grpc |
2 | 5 | import pytest |
3 | 6 | from google.protobuf import empty_pb2 as _empty_pb2 |
4 | 7 | from grpc import StatusCode |
5 | 8 | from grpc_testing import server_from_dictionary, strict_real_time |
6 | 9 |
|
7 | 10 | from pynumaflow.mapper import MapServer |
| 11 | +from pynumaflow.mapper._servicer._sync_servicer import SyncMapServicer |
8 | 12 | from pynumaflow.proto.mapper import map_pb2 |
9 | 13 | from tests.map.utils import map_handler, err_map_handler, ExampleMap, get_test_datums |
10 | 14 |
|
@@ -121,3 +125,36 @@ def test_max_threads(self): |
121 | 125 | # defaults to 4 |
122 | 126 | server = MapServer(mapper_instance=map_handler) |
123 | 127 | assert server.max_threads == 4 |
| 128 | + |
| 129 | + def test_rpc_error_before_handshake(self): |
| 130 | + """RpcError on the very first read triggers the except grpc.RpcError in MapFn.""" |
| 131 | + |
| 132 | + def failing_iterator(): |
| 133 | + raise grpc.RpcError() |
| 134 | + yield # make it a generator |
| 135 | + |
| 136 | + servicer = SyncMapServicer(handler=map_handler) |
| 137 | + context = MagicMock() |
| 138 | + |
| 139 | + responses = list(servicer.MapFn(failing_iterator(), context)) |
| 140 | + |
| 141 | + assert responses == [] |
| 142 | + assert servicer.shutdown_event.is_set() |
| 143 | + |
| 144 | + def test_rpc_error_mid_stream(self): |
| 145 | + """RpcError while reading requests triggers the except grpc.RpcError |
| 146 | + in _process_requests.""" |
| 147 | + |
| 148 | + def interrupted_iterator(): |
| 149 | + yield map_pb2.MapRequest(handshake=map_pb2.Handshake(sot=True)) |
| 150 | + raise grpc.RpcError() |
| 151 | + |
| 152 | + servicer = SyncMapServicer(handler=map_handler) |
| 153 | + context = MagicMock() |
| 154 | + |
| 155 | + responses = list(servicer.MapFn(interrupted_iterator(), context)) |
| 156 | + |
| 157 | + # Should get the handshake response and then stop cleanly |
| 158 | + assert len(responses) == 1 |
| 159 | + assert responses[0].handshake.sot |
| 160 | + assert servicer.shutdown_event.is_set() |
0 commit comments