Skip to content

Commit adfad47

Browse files
committed
conftest.py with shared fixtures
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent e1aa90f commit adfad47

10 files changed

Lines changed: 121 additions & 261 deletions
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
"""
2+
Root conftest.py — shared pytest fixtures and helpers for all test modules.
3+
4+
Provides helpers for common gRPC testing patterns that are duplicated across
5+
sync, multiproc, and async test files.
6+
"""
7+
8+
9+
def collect_responses(method):
10+
"""Collect all responses from a grpc_testing stream method until exhausted.
11+
12+
Replaces the repeated pattern:
13+
responses = []
14+
while True:
15+
try:
16+
resp = method.take_response()
17+
responses.append(resp)
18+
except ValueError as err:
19+
if "No more responses!" in err.__str__():
20+
break
21+
22+
Returns a list of response protos.
23+
"""
24+
responses = []
25+
while True:
26+
try:
27+
resp = method.take_response()
28+
responses.append(resp)
29+
except ValueError as err:
30+
if "No more responses!" in str(err):
31+
break
32+
return responses
33+
34+
35+
def drain_responses(method):
36+
"""Drain all responses from a grpc_testing stream method, discarding them.
37+
38+
Replaces the repeated pattern:
39+
while True:
40+
try:
41+
method.take_response()
42+
except ValueError:
43+
break
44+
45+
Useful in shutdown tests where we only care about termination status.
46+
"""
47+
while True:
48+
try:
49+
method.take_response()
50+
except ValueError:
51+
break
52+
53+
54+
def send_test_requests(method, datums):
55+
"""Send a list of test datums to a grpc_testing stream method and close.
56+
57+
Replaces the repeated pattern:
58+
for d in test_datums:
59+
method.send_request(d)
60+
method.requests_closed()
61+
"""
62+
for d in datums:
63+
method.send_request(d)
64+
method.requests_closed()

packages/pynumaflow/tests/map/test_multiproc_map_shutdown.py

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from pynumaflow.mapper import MapMultiprocServer
1616
from pynumaflow.proto.mapper import map_pb2
17+
from tests.conftest import drain_responses, send_test_requests
1718
from tests.map.utils import map_handler, err_map_handler, get_test_datums
1819

1920

@@ -33,15 +34,8 @@ def test_shutdown_event_set_on_handler_error():
3334
timeout=2,
3435
)
3536

36-
for d in test_datums:
37-
method.send_request(d)
38-
method.requests_closed()
39-
40-
while True:
41-
try:
42-
method.take_response()
43-
except ValueError:
44-
break
37+
send_test_requests(method, test_datums)
38+
drain_responses(method)
4539

4640
_, code, _ = method.termination()
4741
assert code == StatusCode.INTERNAL
@@ -65,15 +59,8 @@ def test_shutdown_event_set_on_handshake_error():
6559
timeout=1,
6660
)
6761

68-
for d in test_datums:
69-
method.send_request(d)
70-
method.requests_closed()
71-
72-
while True:
73-
try:
74-
method.take_response()
75-
except ValueError:
76-
break
62+
send_test_requests(method, test_datums)
63+
drain_responses(method)
7764

7865
_, code, details = method.termination()
7966
assert code == StatusCode.INTERNAL

packages/pynumaflow/tests/map/test_multiproc_mapper.py

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from pynumaflow.mapper import MapMultiprocServer
1111
from pynumaflow.proto.mapper import map_pb2
1212
from tests.map.utils import map_handler, err_map_handler, get_test_datums
13+
from tests.conftest import collect_responses, drain_responses, send_test_requests
1314
from tests.testing_utils import (
1415
mock_terminate_on_stop,
1516
)
@@ -50,18 +51,8 @@ def test_udf_map_err_handshake(self):
5051
invocation_metadata={},
5152
timeout=1,
5253
)
53-
for x in test_datums:
54-
method.send_request(x)
55-
method.requests_closed()
56-
57-
responses = []
58-
while True:
59-
try:
60-
resp = method.take_response()
61-
responses.append(resp)
62-
except ValueError as err:
63-
if "No more responses!" in err.__str__():
64-
break
54+
send_test_requests(method, test_datums)
55+
drain_responses(method)
6556

6657
metadata, code, details = method.termination()
6758
self.assertTrue("MapFn: expected handshake as the first message" in details)
@@ -77,18 +68,8 @@ def test_udf_map_err(self):
7768
invocation_metadata={},
7869
timeout=1,
7970
)
80-
for x in test_datums:
81-
method.send_request(x)
82-
method.requests_closed()
83-
84-
responses = []
85-
while True:
86-
try:
87-
resp = method.take_response()
88-
responses.append(resp)
89-
except ValueError as err:
90-
if "No more responses!" in err.__str__():
91-
break
71+
send_test_requests(method, test_datums)
72+
drain_responses(method)
9273

9374
metadata, code, details = method.termination()
9475
self.assertTrue("Something is fishy!" in details)
@@ -116,18 +97,8 @@ def test_map_forward_message(self):
11697
invocation_metadata={},
11798
timeout=1,
11899
)
119-
for x in test_datums:
120-
method.send_request(x)
121-
method.requests_closed()
122-
123-
responses = []
124-
while True:
125-
try:
126-
resp = method.take_response()
127-
responses.append(resp)
128-
except ValueError as err:
129-
if "No more responses!" in err.__str__():
130-
break
100+
send_test_requests(method, test_datums)
101+
responses = collect_responses(method)
131102

132103
metadata, code, details = method.termination()
133104

packages/pynumaflow/tests/map/test_sync_map_shutdown.py

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from pynumaflow.mapper._servicer._sync_servicer import SyncMapServicer
1717
from pynumaflow.proto.mapper import map_pb2
18+
from tests.conftest import drain_responses, send_test_requests
1819
from tests.map.utils import map_handler, err_map_handler, get_test_datums
1920

2021

@@ -33,15 +34,8 @@ def test_shutdown_event_set_on_handler_error():
3334
timeout=2,
3435
)
3536

36-
for d in test_datums:
37-
method.send_request(d)
38-
method.requests_closed()
39-
40-
while True:
41-
try:
42-
method.take_response()
43-
except ValueError:
44-
break
37+
send_test_requests(method, test_datums)
38+
drain_responses(method)
4539

4640
_, code, _ = method.termination()
4741
assert code == StatusCode.INTERNAL
@@ -65,15 +59,8 @@ def test_shutdown_event_set_on_handshake_error():
6559
timeout=1,
6660
)
6761

68-
for d in test_datums:
69-
method.send_request(d)
70-
method.requests_closed()
71-
72-
while True:
73-
try:
74-
method.take_response()
75-
except ValueError:
76-
break
62+
send_test_requests(method, test_datums)
63+
drain_responses(method)
7764

7865
_, code, details = method.termination()
7966
assert code == StatusCode.INTERNAL

packages/pynumaflow/tests/map/test_sync_mapper.py

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from pynumaflow.mapper import MapServer
1010
from pynumaflow.proto.mapper import map_pb2
1111
from tests.map.utils import map_handler, err_map_handler, ExampleMap, get_test_datums
12+
from tests.conftest import collect_responses, drain_responses, send_test_requests
1213
from tests.testing_utils import (
1314
mock_terminate_on_stop,
1415
)
@@ -44,18 +45,8 @@ def test_udf_map_err_handshake(self):
4445
invocation_metadata={},
4546
timeout=1,
4647
)
47-
for x in test_datums:
48-
method.send_request(x)
49-
method.requests_closed()
50-
51-
responses = []
52-
while True:
53-
try:
54-
resp = method.take_response()
55-
responses.append(resp)
56-
except ValueError as err:
57-
if "No more responses!" in err.__str__():
58-
break
48+
send_test_requests(method, test_datums)
49+
drain_responses(method)
5950

6051
metadata, code, details = method.termination()
6152
self.assertTrue("MapFn: expected handshake as the first message" in details)
@@ -72,18 +63,8 @@ def test_udf_map_error_response(self):
7263
invocation_metadata={},
7364
timeout=1,
7465
)
75-
for x in test_datums:
76-
method.send_request(x)
77-
method.requests_closed()
78-
79-
responses = []
80-
while True:
81-
try:
82-
resp = method.take_response()
83-
responses.append(resp)
84-
except ValueError as err:
85-
if "No more responses!" in err.__str__():
86-
break
66+
send_test_requests(method, test_datums)
67+
drain_responses(method)
8768

8869
metadata, code, details = method.termination()
8970
self.assertTrue("Something is fishy!" in details)
@@ -111,18 +92,8 @@ def test_map_forward_message(self):
11192
invocation_metadata={},
11293
timeout=1,
11394
)
114-
for x in test_datums:
115-
method.send_request(x)
116-
method.requests_closed()
117-
118-
responses = []
119-
while True:
120-
try:
121-
resp = method.take_response()
122-
responses.append(resp)
123-
except ValueError as err:
124-
if "No more responses!" in err.__str__():
125-
break
95+
send_test_requests(method, test_datums)
96+
responses = collect_responses(method)
12697

12798
metadata, code, details = method.termination()
12899
# 1 handshake + 3 data responses

packages/pynumaflow/tests/sink/test_server.py

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from pynumaflow.proto.sinker import sink_pb2
2323
from pynumaflow.sinker import Responses, Datum, Response, SinkServer, Message, UserMetadata
2424
from pynumaflow.sinker.servicer.sync_servicer import SyncSinkServicer
25+
from tests.conftest import collect_responses, drain_responses, send_test_requests
2526

2627

2728
def mockenv(**envvars):
@@ -201,18 +202,8 @@ def test_udsink_err(err_sink_test_server):
201202
timeout=1,
202203
)
203204

204-
for d in test_datums:
205-
method.send_request(d)
206-
method.requests_closed()
207-
208-
responses = []
209-
while True:
210-
try:
211-
resp = method.take_response()
212-
responses.append(resp)
213-
except ValueError as err:
214-
if "No more responses!" in str(err):
215-
break
205+
send_test_requests(method, test_datums)
206+
drain_responses(method)
216207

217208
metadata, code, details = method.termination()
218209
assert code == StatusCode.INTERNAL
@@ -267,18 +258,8 @@ def test_forward_message(sink_test_server):
267258
invocation_metadata={},
268259
timeout=1,
269260
)
270-
for x in test_datums:
271-
method.send_request(x)
272-
method.requests_closed()
273-
274-
responses = []
275-
while True:
276-
try:
277-
resp = method.take_response()
278-
responses.append(resp)
279-
except ValueError as err:
280-
if "No more responses!" in str(err):
281-
break
261+
send_test_requests(method, test_datums)
262+
responses = collect_responses(method)
282263

283264
# 1 handshake + 1 data messages + 1 EOT
284265
assert len(responses) == 3
@@ -370,15 +351,8 @@ def test_shutdown_event_set_on_handler_error():
370351
timeout=2,
371352
)
372353

373-
for d in test_datums:
374-
method.send_request(d)
375-
method.requests_closed()
376-
377-
while True:
378-
try:
379-
method.take_response()
380-
except ValueError:
381-
break
354+
send_test_requests(method, test_datums)
355+
drain_responses(method)
382356

383357
_, code, _ = method.termination()
384358
assert code == StatusCode.INTERNAL

0 commit comments

Comments
 (0)