-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Expand file tree
/
Copy pathtest_client_async.py
More file actions
391 lines (300 loc) · 13.3 KB
/
Copy pathtest_client_async.py
File metadata and controls
391 lines (300 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
import selectors
import socket
import time
import pytest
from kafka.client_async import KafkaClient, IdleConnectionManager
from kafka.cluster import ClusterMetadata
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.new.metadata import MetadataRequest
from kafka.protocol.new.producer import ProduceRequest
from kafka.structs import BrokerMetadata
@pytest.fixture
def client_poll_mocked(mocker):
cli = KafkaClient(request_timeout_ms=9999999,
reconnect_backoff_ms=2222,
connections_max_idle_ms=float('inf'),
api_version=(0, 9))
mocker.patch.object(cli, '_poll')
ttl = mocker.patch.object(cli.cluster, 'ttl')
ttl.return_value = 0
try:
yield cli
finally:
cli._close()
@pytest.fixture
def client_selector_mocked(mocker, conn):
client = KafkaClient(api_version=(0, 9))
mocker.patch.object(client, '_selector')
client.poll(future=client.cluster.request_update())
try:
yield client
finally:
client._close()
def test_bootstrap(mocker, conn):
conn.state = ConnectionStates.CONNECTED
cli = KafkaClient(api_version=(2, 1))
mocker.patch.object(cli, '_selector')
future = cli.cluster.request_update()
cli.poll(future=future)
assert future.succeeded()
args, kwargs = conn.call_args
assert args == ('localhost', 9092, socket.AF_UNSPEC)
kwargs.pop('state_change_callback')
kwargs.pop('node_id')
assert kwargs == cli.config
conn.send.assert_called_once_with(MetadataRequest[7]([], True), blocking=False, request_timeout_ms=None)
assert cli._bootstrap_fails == 0
assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None),
BrokerMetadata(1, 'bar', 34, None)])
def test_can_connect(client_selector_mocked, conn):
# Node is not in broker metadata - can't connect
assert not client_selector_mocked._can_connect(2)
# Node is in broker metadata but not in _conns
assert 0 not in client_selector_mocked._conns
assert client_selector_mocked._can_connect(0)
# Node is connected, can't reconnect
assert client_selector_mocked._init_connect(0) is True
assert not client_selector_mocked._can_connect(0)
# Node is disconnected, can connect
client_selector_mocked._conns[0].state = ConnectionStates.DISCONNECTED
assert client_selector_mocked._can_connect(0)
# Node is disconnected, but blacked out
conn.blacked_out.return_value = True
assert not client_selector_mocked._can_connect(0)
def test_init_connect(client_selector_mocked, conn):
# Node not in metadata, return False
assert not client_selector_mocked._init_connect(2)
# New node_id creates a conn object
assert 0 not in client_selector_mocked._conns
conn.state = ConnectionStates.DISCONNECTED
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
assert client_selector_mocked._init_connect(0) is True
assert client_selector_mocked._conns[0] is conn
def test_conn_state_change(client_selector_mocked, conn):
sel = client_selector_mocked._selector
node_id = 0
client_selector_mocked._conns[node_id] = conn
conn.state = ConnectionStates.CONNECTING
sock = conn._sock
client_selector_mocked._conn_state_change(node_id, sock, conn)
assert node_id in client_selector_mocked._connecting
sel.register.assert_called_with(sock, selectors.EVENT_WRITE, conn)
conn.state = ConnectionStates.CONNECTED
client_selector_mocked._conn_state_change(node_id, sock, conn)
assert node_id not in client_selector_mocked._connecting
sel.modify.assert_called_with(sock, selectors.EVENT_READ, conn)
# Failure to connect should trigger metadata update
assert client_selector_mocked.cluster._need_update is False
conn.state = ConnectionStates.DISCONNECTED
client_selector_mocked._conn_state_change(node_id, sock, conn)
assert node_id not in client_selector_mocked._connecting
assert client_selector_mocked.cluster._need_update is True
sel.unregister.assert_called_with(sock)
conn.state = ConnectionStates.CONNECTING
client_selector_mocked._conn_state_change(node_id, sock, conn)
assert node_id in client_selector_mocked._connecting
conn.state = ConnectionStates.DISCONNECTED
client_selector_mocked._conn_state_change(node_id, sock, conn)
assert node_id not in client_selector_mocked._connecting
def test_ready(mocker, client_selector_mocked, conn):
maybe_connect = mocker.patch.object(client_selector_mocked, 'maybe_connect')
node_id = 1
client_selector_mocked.ready(node_id)
maybe_connect.assert_called_with(node_id)
def test_is_ready(client_selector_mocked, conn):
client_selector_mocked._init_connect(0)
client_selector_mocked._init_connect(1)
# metadata refresh blocks ready nodes
assert client_selector_mocked.is_ready(0)
assert client_selector_mocked.is_ready(1)
client_selector_mocked._metadata_refresh_in_progress = True
assert not client_selector_mocked.is_ready(0)
assert not client_selector_mocked.is_ready(1)
# requesting metadata update also blocks ready nodes
client_selector_mocked._metadata_refresh_in_progress = False
assert client_selector_mocked.is_ready(0)
assert client_selector_mocked.is_ready(1)
client_selector_mocked.cluster.request_update()
client_selector_mocked.cluster.config['retry_backoff_ms'] = 0
assert not client_selector_mocked._metadata_refresh_in_progress
assert not client_selector_mocked.is_ready(0)
assert not client_selector_mocked.is_ready(1)
client_selector_mocked.cluster._need_update = False
# if connection can't send more, not ready
assert client_selector_mocked.is_ready(0)
conn.can_send_more.return_value = False
assert not client_selector_mocked.is_ready(0)
conn.can_send_more.return_value = True
# disconnected nodes, not ready
assert client_selector_mocked.is_ready(0)
conn.state = ConnectionStates.DISCONNECTED
assert not client_selector_mocked.is_ready(0)
def test_close(client_selector_mocked, conn):
call_count = conn.close.call_count
# Unknown node - silent
client_selector_mocked.close(2)
call_count += 0
assert conn.close.call_count == call_count
# Single node close
client_selector_mocked._init_connect(0)
assert conn.close.call_count == call_count
client_selector_mocked.close(0)
call_count += 1
assert conn.close.call_count == call_count
# All node close
client_selector_mocked._init_connect(1)
client_selector_mocked.close()
# +2 close: node 1, node bootstrap (node 0 already closed)
call_count += 2
assert conn.close.call_count == call_count
def test_is_disconnected(client_selector_mocked, conn):
# False if not connected yet
conn.state = ConnectionStates.DISCONNECTED
assert not client_selector_mocked.is_disconnected(0)
client_selector_mocked._init_connect(0)
assert client_selector_mocked.is_disconnected(0)
conn.state = ConnectionStates.CONNECTING
assert not client_selector_mocked.is_disconnected(0)
conn.state = ConnectionStates.CONNECTED
assert not client_selector_mocked.is_disconnected(0)
def test_send(client_selector_mocked, conn):
# Send to unknown node => raises AssertionError
try:
client_selector_mocked.send(2, None)
assert False, 'Exception not raised'
except AssertionError:
pass
# Send to disconnected node => NodeNotReady
conn.state = ConnectionStates.DISCONNECTED
f = client_selector_mocked.send(0, None)
assert f.failed()
assert isinstance(f.exception, Errors.NodeNotReadyError)
conn.state = ConnectionStates.CONNECTED
client_selector_mocked._init_connect(0)
# ProduceRequest w/ 0 required_acks -> no response
request = ProduceRequest[0](0, 0, [])
assert request.expect_response() is False
ret = client_selector_mocked.send(0, request)
conn.send.assert_called_with(request, blocking=False, request_timeout_ms=None)
assert isinstance(ret, Future)
request = MetadataRequest[0]([])
client_selector_mocked.send(0, request)
conn.send.assert_called_with(request, blocking=False, request_timeout_ms=None)
def test_poll(mocker, client_poll_mocked):
metadata = mocker.patch.object(client_poll_mocked, '_maybe_refresh_metadata')
ifr_request_timeout = mocker.patch.object(client_poll_mocked, '_next_ifr_request_timeout_ms')
now = time.monotonic()
t = mocker.patch('time.monotonic')
t.return_value = now
# metadata timeout wins
ifr_request_timeout.return_value = float('inf')
metadata.return_value = 1000
client_poll_mocked.poll()
client_poll_mocked._poll.assert_called_with(1.0)
# user timeout wins
client_poll_mocked.poll(timeout_ms=250)
client_poll_mocked._poll.assert_called_with(0.25)
# ifr request timeout wins
ifr_request_timeout.return_value = 30000
metadata.return_value = 1000000
client_poll_mocked.poll()
client_poll_mocked._poll.assert_called_with(30.0)
def test__poll():
pass
def test_in_flight_request_count():
pass
def test_least_loaded_node():
pass
def test_set_topics(mocker):
request_update = mocker.patch.object(ClusterMetadata, 'request_update')
request_update.side_effect = lambda: Future()
cli = KafkaClient(api_version=(0, 10, 0))
# replace 'empty' with 'non empty'
request_update.reset_mock()
fut = cli.set_topics(['t1', 't2'])
assert not fut.is_done
request_update.assert_called_with()
# replace 'non empty' with 'same'
request_update.reset_mock()
fut = cli.set_topics(['t1', 't2'])
assert fut.is_done
assert fut.value == set(['t1', 't2'])
request_update.assert_not_called()
# replace 'non empty' with 'empty'
request_update.reset_mock()
fut = cli.set_topics([])
assert fut.is_done
assert fut.value == set()
request_update.assert_not_called()
def test_maybe_refresh_metadata_ttl(client_poll_mocked):
client_poll_mocked.cluster.ttl.return_value = 1234
client_poll_mocked.poll(timeout_ms=12345678)
client_poll_mocked._poll.assert_called_with(1.234)
def test_maybe_refresh_metadata_backoff(mocker, client_poll_mocked):
mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value=None)
mocker.patch.object(client_poll_mocked, 'least_loaded_node_refresh_ms', return_value=4321)
now = time.monotonic()
t = mocker.patch('time.monotonic')
t.return_value = now
client_poll_mocked.poll(timeout_ms=12345678)
client_poll_mocked._poll.assert_called_with(4.321)
def test_maybe_refresh_metadata_in_progress(client_poll_mocked):
client_poll_mocked._metadata_refresh_in_progress = True
client_poll_mocked.poll(timeout_ms=12345678)
client_poll_mocked._poll.assert_called_with(9999.999) # request_timeout_ms
def test_maybe_refresh_metadata_update(mocker, client_poll_mocked):
mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client_poll_mocked, '_can_send_request', return_value=True)
send = mocker.patch.object(client_poll_mocked, 'send')
client_poll_mocked.cluster.need_all_topic_metadata = True
client_poll_mocked.poll(timeout_ms=12345678)
client_poll_mocked._poll.assert_called_with(9999.999) # request_timeout_ms
assert client_poll_mocked._metadata_refresh_in_progress
request = MetadataRequest[0]([])
send.assert_called_once_with('foobar', request, wakeup=False)
def test_maybe_refresh_metadata_cant_send(mocker, client_poll_mocked):
mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client_poll_mocked, '_can_send_request', return_value=False)
mocker.patch.object(client_poll_mocked, '_can_connect', return_value=True)
mocker.patch.object(client_poll_mocked, '_init_connect', return_value=True)
now = time.monotonic()
t = mocker.patch('time.monotonic')
t.return_value = now
# first poll attempts connection
client_poll_mocked.poll()
client_poll_mocked._poll.assert_called()
client_poll_mocked._init_connect.assert_called_once_with('foobar')
# poll while connecting should not attempt a new connection
client_poll_mocked._connecting.add('foobar')
client_poll_mocked._can_connect.reset_mock()
client_poll_mocked.poll()
client_poll_mocked._poll.assert_called()
assert not client_poll_mocked._can_connect.called
assert not client_poll_mocked._metadata_refresh_in_progress
def test_schedule():
pass
def test_unschedule():
pass
def test_idle_connection_manager(mocker):
t = mocker.patch.object(time, 'monotonic')
t.return_value = 0
idle = IdleConnectionManager(100)
assert idle.next_check_ms() == float('inf')
idle.update('foo')
assert not idle.is_expired('foo')
assert idle.poll_expired_connection() is None
assert idle.next_check_ms() == 100
t.return_value = 90 / 1000
assert not idle.is_expired('foo')
assert idle.poll_expired_connection() is None
assert idle.next_check_ms() == 10
t.return_value = 100 / 1000
assert idle.is_expired('foo')
assert idle.next_check_ms() == 0
conn_id, conn_ts = idle.poll_expired_connection()
assert conn_id == 'foo'
assert conn_ts == 0
idle.remove('foo')
assert idle.next_check_ms() == float('inf')