-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathtest_bidi_async.py
More file actions
316 lines (234 loc) · 9.51 KB
/
test_bidi_async.py
File metadata and controls
316 lines (234 loc) · 9.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# Copyright 2025, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import asyncio
from unittest import mock
try:
from unittest.mock import AsyncMock
except ImportError: # pragma: NO COVER
from mock import AsyncMock # type: ignore
import pytest
try:
from grpc import aio
except ImportError: # pragma: NO COVER
pytest.skip("No GRPC", allow_module_level=True)
from google.api_core import bidi_async
from google.api_core import exceptions
# TODO: remove this when droppping support for "Python 3.10" and below.
if sys.version_info <= (3, 10): # type: ignore[operator]
def aiter(obj):
return obj.__aiter__()
async def anext(obj):
return await obj.__anext__()
@pytest.mark.asyncio
class Test_AsyncRequestQueueGenerator:
async def test_bounded_consume(self):
call = mock.create_autospec(aio.Call, instance=True)
call.done.return_value = False
q = asyncio.Queue()
await q.put(mock.sentinel.A)
await q.put(mock.sentinel.B)
generator = bidi_async._AsyncRequestQueueGenerator(q)
generator.call = call
items = []
gen_aiter = aiter(generator)
items.append(await anext(gen_aiter))
items.append(await anext(gen_aiter))
# At this point, the queue is empty. The next call to anext will sleep.
# We make the call inactive.
call.done.return_value = True
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(anext(gen_aiter), timeout=0.01)
assert items == [mock.sentinel.A, mock.sentinel.B]
async def test_yield_initial_and_exit(self):
q = asyncio.Queue()
call = mock.create_autospec(aio.Call, instance=True)
call.done.return_value = True
generator = bidi_async._AsyncRequestQueueGenerator(
q, initial_request=mock.sentinel.A
)
generator.call = call
assert await anext(aiter(generator)) == mock.sentinel.A
async def test_yield_initial_callable_and_exit(self):
q = asyncio.Queue()
call = mock.create_autospec(aio.Call, instance=True)
call.done.return_value = True
generator = bidi_async._AsyncRequestQueueGenerator(
q, initial_request=lambda: mock.sentinel.A
)
generator.call = call
assert await anext(aiter(generator)) == mock.sentinel.A
async def test_exit_when_inactive_with_item(self):
q = asyncio.Queue()
await q.put(mock.sentinel.A)
call = mock.create_autospec(aio.Call, instance=True)
call.done.return_value = True
generator = bidi_async._AsyncRequestQueueGenerator(q)
generator.call = call
with pytest.raises(
StopAsyncIteration,
):
assert await anext(aiter(generator))
# Make sure it put the item back.
assert not q.empty()
assert await q.get() == mock.sentinel.A
async def test_exit_when_inactive_empty(self):
q = asyncio.Queue()
call = mock.create_autospec(aio.Call, instance=True)
call.done.return_value = True
generator = bidi_async._AsyncRequestQueueGenerator(q)
generator.call = call
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(anext(aiter(generator)), timeout=0.01)
async def test_exit_with_stop(self):
q = asyncio.Queue()
await q.put(None)
call = mock.create_autospec(aio.Call, instance=True)
call.done.return_value = False
generator = bidi_async._AsyncRequestQueueGenerator(q)
generator.call = call
with pytest.raises(StopAsyncIteration):
assert await anext(aiter(generator))
def make_async_rpc():
"""Makes a mock async RPC used to test Bidi classes."""
call = mock.create_autospec(aio.StreamStreamCall, instance=True)
rpc = AsyncMock()
def rpc_side_effect(request, metadata=None):
call.done.return_value = False
return call
rpc.side_effect = rpc_side_effect
def cancel_side_effect():
call.done.return_value = True
return True
call.cancel.side_effect = cancel_side_effect
call.read = AsyncMock()
return rpc, call
class AsyncClosedCall:
def __init__(self, exception):
self.exception = exception
def done(self):
return True
async def read(self):
raise self.exception
class TestAsyncBidiRpc:
def test_initial_state(self):
bidi_rpc = bidi_async.AsyncBidiRpc(None)
assert bidi_rpc.is_active is False
def test_done_callbacks(self):
bidi_rpc = bidi_async.AsyncBidiRpc(None)
callback = mock.Mock(spec=["__call__"])
bidi_rpc.add_done_callback(callback)
bidi_rpc._on_call_done(mock.sentinel.future)
callback.assert_called_once_with(mock.sentinel.future)
@pytest.mark.asyncio
async def test_metadata(self):
rpc, call = make_async_rpc()
bidi_rpc = bidi_async.AsyncBidiRpc(rpc, metadata=mock.sentinel.A)
assert bidi_rpc._rpc_metadata == mock.sentinel.A
await bidi_rpc.open()
assert bidi_rpc.call == call
rpc.assert_awaited_once()
assert rpc.call_args.kwargs["metadata"] == mock.sentinel.A
@pytest.mark.asyncio
async def test_open(self):
rpc, call = make_async_rpc()
bidi_rpc = bidi_async.AsyncBidiRpc(rpc)
await bidi_rpc.open()
assert bidi_rpc.call == call
assert bidi_rpc.is_active
call.add_done_callback.assert_called_once_with(bidi_rpc._on_call_done)
@pytest.mark.asyncio
async def test_open_error_already_open(self):
rpc, _ = make_async_rpc()
bidi_rpc = bidi_async.AsyncBidiRpc(rpc)
await bidi_rpc.open()
with pytest.raises(ValueError):
await bidi_rpc.open()
@pytest.mark.asyncio
async def test_open_error_call_error(self):
rpc, _ = make_async_rpc()
expected_exception = exceptions.GoogleAPICallError(
"test", response=mock.sentinel.response
)
rpc.side_effect = expected_exception
bidi_rpc = bidi_async.AsyncBidiRpc(rpc)
callback = mock.Mock(spec=["__call__"])
bidi_rpc.add_done_callback(callback)
with pytest.raises(exceptions.GoogleAPICallError) as exc_info:
await bidi_rpc.open()
assert exc_info.value == expected_exception
callback.assert_called_once_with(mock.sentinel.response)
@pytest.mark.asyncio
async def test_close(self):
rpc, call = make_async_rpc()
bidi_rpc = bidi_async.AsyncBidiRpc(rpc)
await bidi_rpc.open()
await bidi_rpc.close()
call.cancel.assert_called_once()
assert bidi_rpc.call is call
assert bidi_rpc.is_active is False
# ensure the request queue was signaled to stop.
assert bidi_rpc.pending_requests == 1
assert await bidi_rpc._request_queue.get() is None
# ensure request and callbacks are cleaned up
assert bidi_rpc._initial_request is None
assert not bidi_rpc._callbacks
@pytest.mark.asyncio
async def test_close_with_no_rpc(self):
bidi_rpc = bidi_async.AsyncBidiRpc(None)
await bidi_rpc.close()
assert bidi_rpc.call is None
assert bidi_rpc.is_active is False
# ensure the request queue was signaled to stop.
assert bidi_rpc.pending_requests == 1
assert await bidi_rpc._request_queue.get() is None
# ensure request and callbacks are cleaned up
assert bidi_rpc._initial_request is None
assert not bidi_rpc._callbacks
@pytest.mark.asyncio
async def test_close_no_rpc(self):
bidi_rpc = bidi_async.AsyncBidiRpc(None)
await bidi_rpc.close()
@pytest.mark.asyncio
async def test_send(self):
rpc, call = make_async_rpc()
bidi_rpc = bidi_async.AsyncBidiRpc(rpc)
await bidi_rpc.open()
await bidi_rpc.send(mock.sentinel.request)
assert bidi_rpc.pending_requests == 1
assert await bidi_rpc._request_queue.get() is mock.sentinel.request
@pytest.mark.asyncio
async def test_send_not_open(self):
bidi_rpc = bidi_async.AsyncBidiRpc(None)
with pytest.raises(ValueError):
await bidi_rpc.send(mock.sentinel.request)
@pytest.mark.asyncio
async def test_send_dead_rpc(self):
error = ValueError()
bidi_rpc = bidi_async.AsyncBidiRpc(None)
bidi_rpc.call = AsyncClosedCall(error)
with pytest.raises(ValueError):
await bidi_rpc.send(mock.sentinel.request)
@pytest.mark.asyncio
async def test_recv(self):
bidi_rpc = bidi_async.AsyncBidiRpc(None)
bidi_rpc.call = mock.create_autospec(aio.Call, instance=True)
bidi_rpc.call.read = AsyncMock(return_value=mock.sentinel.response)
response = await bidi_rpc.recv()
assert response == mock.sentinel.response
@pytest.mark.asyncio
async def test_recv_not_open(self):
bidi_rpc = bidi_async.AsyncBidiRpc(None)
with pytest.raises(ValueError):
await bidi_rpc.recv()