-
Notifications
You must be signed in to change notification settings - Fork 955
Expand file tree
/
Copy pathtest_ShareConsumer.py
More file actions
948 lines (767 loc) · 33.5 KB
/
Copy pathtest_ShareConsumer.py
File metadata and controls
948 lines (767 loc) · 33.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2026 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for ShareConsumer class.
"""
import gc
import sys
import threading
import time
import weakref
import pytest
from confluent_kafka import (
AcknowledgeType,
ConcurrentModificationException,
IllegalStateException,
KafkaError,
KafkaException,
Message,
Messages,
ShareConsumer,
)
from tests.common import (
TestShareConsumer,
TestUtils,
unique_id,
)
@pytest.fixture
def share_consumer():
    """Yield a default-configured ShareConsumer and close it on teardown.

    A fresh group.id is generated for every test so that librdkafka's
    per-group internal state cannot leak from one test into the next.
    """
    config = {
        'group.id': unique_id('test-share-group'),
        'socket.timeout.ms': 100,
    }
    consumer = TestShareConsumer(config)
    yield consumer
    # Teardown: runs once the requesting test has finished.
    consumer.close()
def test_constructor_requires_config():
    """Calling ShareConsumer() without a configuration dict raises TypeError."""
    with pytest.raises(TypeError, match='expected configuration dict'):
        ShareConsumer()
def test_constructor_with_valid_config(share_consumer):
    """A valid configuration produces a ShareConsumer object (not None)."""
    consumer = share_consumer
    assert consumer is not None
def test_constructor_dict_with_kwargs():
    """ShareConsumer accepts a positional config dict plus keyword arguments
    (cimpl.pyi overload form 2).

    Kafka configuration travels in the positional dict, while runtime extras
    such as `logger` are passed as kwargs. If the C extension rejected this
    form, mypy would bless user code that crashed at runtime.
    """
    import logging

    kafka_conf = {
        'group.id': unique_id('test-share-form2'),
        'bootstrap.servers': 'localhost:9092',
        'socket.timeout.ms': 100,
    }
    log = logging.getLogger('test-share-form2')
    consumer = ShareConsumer(kafka_conf, logger=log)
    consumer.close()
def test_constructor_kwargs_only():
    """ShareConsumer accepts its whole configuration as keyword arguments
    (cimpl.pyi overload form 3), spread from a dict at the call site.
    """
    consumer = ShareConsumer(
        **{
            'group.id': unique_id('test-share-form3'),
            'bootstrap.servers': 'localhost:9092',
            'socket.timeout.ms': 100,
        }
    )
    consumer.close()
def test_constructor_rejects_on_commit():
    """on_commit is rejected at construction time, in either config form.

    Share consumers acknowledge records instead of committing offsets, so
    on_commit has nothing to fire on. Passing it in the positional config dict
    OR as a kwarg must raise, so the misconfiguration is visible to callers
    instead of being silently held as a callback that never runs.

    Wired via ShareConsumer_init's pre-filter pass over args[0] + kwargs
    (ShareConsumer.c), which scans for inapplicable keys before handing off to
    common_conf_setup.
    """

    def on_commit_cb(*args, **kwargs):
        """No-op stand-in for an on_commit callback."""
        return None

    base_conf = {
        'group.id': unique_id('test-share-no-commit'),
        'bootstrap.servers': 'localhost:9092',
    }
    expected = 'on_commit is not supported'

    # Form 1: on_commit supplied as a key of the positional config dict.
    with pytest.raises(ValueError, match=expected):
        ShareConsumer({**base_conf, 'on_commit': on_commit_cb})

    # Form 2: on_commit supplied as a keyword argument.
    with pytest.raises(ValueError, match=expected):
        ShareConsumer(base_conf, on_commit=on_commit_cb)
def test_subscription_on_fresh_consumer(share_consumer):
    """A consumer that never called subscribe() reports an empty subscription.

    Pins the no-subscription representation so that a future librdkafka change
    (e.g. None instead of []) is caught immediately.
    """
    current = share_consumer.subscription()
    assert current == []
def test_subscribe_replaces_previous(share_consumer):
    """subscribe() replaces the previous subscription; it does NOT extend it.

    Pins the documented merge-vs-replace contract.
    """
    for topics in (['topic-a'], ['topic-b']):
        share_consumer.subscribe(topics)
    # Only the most recent call's topics remain.
    assert share_consumer.subscription() == ['topic-b']
def test_subscribe(share_consumer):
    """subscribe() makes the topic show up in subscription()."""
    share_consumer.subscribe(['test-topic'])
    topics = share_consumer.subscription()
    assert topics is not None
    assert 'test-topic' in topics
def test_subscribe_multiple_topics(share_consumer):
    """subscribe() with several topics: subscription() reports all of them.

    The result comes back sorted rather than in insertion order, so the
    comparison is order-agnostic: what matters is that every topic survives
    the round trip, not the ordering.
    """
    share_consumer.subscribe(['topic-c', 'topic-a', 'topic-b'])
    reported = sorted(share_consumer.subscription())
    assert reported == ['topic-a', 'topic-b', 'topic-c']
def test_subscribe_idempotent_and_incremental(share_consumer):
    """Re-subscribing to the same set is idempotent, a growing topic list grows
    the subscription, and a repeated unsubscribe() is a harmless no-op.
    """
    # Incremental: every subscribe() fully replaces the prior set, so a growing
    # list yields a growing subscription.
    share_consumer.subscribe(['a'])
    assert share_consumer.subscription() == ['a']
    for wanted in (['a', 'b'], ['a', 'b', 'c']):
        share_consumer.subscribe(wanted)
        assert sorted(share_consumer.subscription()) == wanted

    # Idempotent: subscribing to the same set repeatedly doesn't duplicate it.
    for _ in range(3):
        share_consumer.subscribe(['x', 'y'])
    assert sorted(share_consumer.subscription()) == ['x', 'y']

    # Repeated unsubscribe is a no-op, not an error.
    for _ in range(2):
        share_consumer.unsubscribe()
    assert share_consumer.subscription() == []
def test_unsubscribe(share_consumer):
    """unsubscribe() leaves the consumer with an empty subscription."""
    share_consumer.subscribe(['test-topic'])
    share_consumer.unsubscribe()
    remaining = share_consumer.subscription()
    assert len(remaining) == 0
def test_unsubscribe_without_subscription_is_noop(share_consumer):
    """unsubscribe() before any subscribe() is a no-op.

    It returns None and leaves the subscription empty rather than raising.
    """
    assert share_consumer.subscription() == []
    outcome = share_consumer.unsubscribe()
    assert outcome is None
    assert share_consumer.subscription() == []
def test_poll_no_broker(share_consumer):
    """poll() returns an empty batch when no broker is available."""
    share_consumer.subscribe(['test-topic'])
    assert share_consumer.poll(timeout=0.1).is_empty()
def test_poll_returns_messages(share_consumer):
    """poll() hands back a Messages object, not a bare list."""
    share_consumer.subscribe(['test-topic'])
    batch = share_consumer.poll(timeout=0.1)
    assert isinstance(batch, Messages)
    assert batch.is_empty()
    assert batch.count() == 0
def test_poll_without_subscription_raises_state(share_consumer):
    """poll() before any subscribe() raises IllegalStateException(_STATE).

    The "not subscribed" check fires before any broker I/O, so the call returns
    immediately without a broker. Only the exception type is pinned, not the
    message text: depending on timing it can be either "not subscribed" or
    "consumer group not initialized", both of which are _STATE.
    """
    with pytest.raises(IllegalStateException) as excinfo:
        share_consumer.poll(timeout=0.1)
    message = str(excinfo.value)
    assert message
def test_commit_does_not_hang_on_unreachable_broker():
    """commit_sync() on a fresh, unsubscribed consumer returns promptly.

    The consumer points at an unreachable broker but has no acks pending, so
    the commit returns immediately. The interesting case (pending acks against
    an unreachable broker) needs wire-frame mocking to exercise.
    """
    conf = {
        'bootstrap.servers': '127.0.0.1:1',
        'group.id': unique_id('test-share-commit-unreachable'),
        'share.acknowledgement.mode': 'implicit',
        'socket.timeout.ms': 1000,
    }
    consumer = ShareConsumer(conf)
    try:
        # Deliberately no subscribe(): librdkafka has crashed in the past when
        # commit is called on a subscribed-but-never-connected consumer.
        began = time.monotonic()
        outcome = consumer.commit_sync(timeout=2.0)
        elapsed = time.monotonic() - began
        assert outcome == {}
        assert elapsed < 5.0, f'commit hung for {elapsed:.2f}s'
    finally:
        consumer.close()
def test_context_manager():
    """ShareConsumer works as a context manager and is closed on exit."""
    conf = {
        'group.id': unique_id('test-share-ctx'),
        'bootstrap.servers': 'localhost:9092',
        'socket.timeout.ms': 100,
    }
    with ShareConsumer(conf) as sc:
        assert sc is not None
        sc.subscribe(['test-topic'])
        assert 'test-topic' in sc.subscription()

    # Leaving the `with` block must have closed the consumer.
    with pytest.raises(RuntimeError, match='Share consumer closed'):
        sc.subscribe(['test-topic'])
def test_close_idempotent():
    """close() may be called more than once without raising."""
    conf = {
        'group.id': unique_id('test-share-close-idem'),
        'bootstrap.servers': 'localhost:9092',
        'socket.timeout.ms': 100,
    }
    consumer = ShareConsumer(conf)
    consumer.close()
    # TODO: a second close() on an already-closed share consumer should
    # raise a "Share consumer closed" RuntimeError (consistent with the
    # post-close behavior of the other ShareConsumer methods). Today it
    # silently no-ops, matching Consumer.close(); flip the assertion when
    # the underlying behavior is changed.
    consumer.close()
def test_any_method_after_close_throws_exception():
    """Every operation on a closed consumer raises IllegalStateException.

    IllegalStateException is a RuntimeError subclass whose message (str(exc))
    is the _STATE error.
    """
    closed_msg = 'Share consumer closed'
    consumer = ShareConsumer(
        {
            'group.id': unique_id('test-share-after-close'),
            'bootstrap.servers': 'localhost:9092',
            'socket.timeout.ms': 100,
        }
    )
    consumer.subscribe(['test-topic'])
    consumer.close()

    with pytest.raises(IllegalStateException, match=closed_msg) as excinfo:
        consumer.subscribe(['test'])
    assert str(excinfo.value)

    with pytest.raises(IllegalStateException, match=closed_msg) as excinfo:
        consumer.unsubscribe()
    assert str(excinfo.value)

    with pytest.raises(IllegalStateException, match=closed_msg) as excinfo:
        consumer.subscription()
    assert str(excinfo.value)

    with pytest.raises(IllegalStateException, match=closed_msg) as excinfo:
        consumer.poll(timeout=0.1)
    assert str(excinfo.value)

    # The closed-state check runs before argument parsing, so acknowledge(None)
    # raises the closed-consumer IllegalStateException rather than a TypeError
    # about the non-Message argument.
    with pytest.raises(IllegalStateException, match=closed_msg) as excinfo:
        consumer.acknowledge(None, AcknowledgeType.ACCEPT)
    assert str(excinfo.value)

    with pytest.raises(IllegalStateException, match=closed_msg) as excinfo:
        consumer.acknowledge_offset('test-topic', 0, 0, AcknowledgeType.ACCEPT)
    assert str(excinfo.value)

    with pytest.raises(IllegalStateException, match=closed_msg) as excinfo:
        consumer.commit_sync(timeout=0.1)
    assert str(excinfo.value)

    with pytest.raises(IllegalStateException, match=closed_msg) as excinfo:
        consumer.commit_async()
    assert str(excinfo.value)

    with pytest.raises(IllegalStateException, match=closed_msg) as excinfo:
        consumer.set_sasl_credentials('user', 'pass')
    assert str(excinfo.value)
def test_required_group_id():
    """Constructing a ShareConsumer without group.id raises ValueError."""
    conf_without_group = {'bootstrap.servers': 'localhost:9092'}
    with pytest.raises(ValueError, match='group.id must be set'):
        ShareConsumer(conf_without_group)
def test_subscribe_with_non_list_raises(share_consumer):
    """subscribe() rejects arguments that are not a list."""
    for not_a_list in ('not_a_list', None):
        with pytest.raises(TypeError, match='expected list'):
            share_consumer.subscribe(not_a_list)
def test_subscribe_with_empty_list_unsubscribes(share_consumer):
    """subscribe([]) is equivalent to unsubscribe().

    An empty topic list clears the current subscription instead of raising,
    after which poll() raises _STATE (not subscribed).

    This is about an empty *list*. An empty topic *name* is a different case
    and still raises _INVALID_ARG
    (test_subscribe_rejects_empty_and_duplicate_topic_names).
    """
    share_consumer.subscribe(['test-topic'])
    assert share_consumer.subscription() == ['test-topic']

    cleared = share_consumer.subscribe([])
    assert cleared is None
    assert share_consumer.subscription() == []

    with pytest.raises(IllegalStateException) as excinfo:
        share_consumer.poll(timeout=0.1)
    assert str(excinfo.value)
def test_subscribe_rejects_empty_and_duplicate_topic_names(share_consumer):
    """An empty topic name and duplicate topic names are rejected with
    _INVALID_ARG. (An empty *list* is a different case: it unsubscribes.)
    """
    invalid_topic_lists = (
        [''],
        ['dup-topic', 'dup-topic'],
    )
    for topics in invalid_topic_lists:
        with pytest.raises(ValueError) as excinfo:
            share_consumer.subscribe(topics)
        assert str(excinfo.value)
def test_subscribe_accepts_caret_topic_as_literal_name(share_consumer):
    """A '^'-prefixed name is accepted and stored verbatim.

    It is treated as a literal topic name, not a regex pattern. Whether it
    matches any topic is a broker-side question; this test only confirms that
    the name is accepted and round-trips.
    """
    share_consumer.subscribe(['^literal-name'])
    stored = share_consumer.subscription()
    assert stored == ['^literal-name']
def test_poll_with_non_numeric_timeout_raises(share_consumer):
    """poll(timeout=...) rejects non-numeric values with TypeError."""
    share_consumer.subscribe(['test-topic'])
    for bad_timeout in ('bad', None):
        with pytest.raises(TypeError):
            share_consumer.poll(timeout=bad_timeout)
def test_poll_accepts_int_timeout(share_consumer):
    """poll() takes an int timeout, not just a float; it's coerced the same way.

    The rejection side is covered by test_poll_with_non_numeric_timeout_raises.
    """
    share_consumer.subscribe(['test-topic'])
    batch = share_consumer.poll(timeout=1)
    assert batch.is_empty()
# TODO: subscribe([123, 456]) and subscribe([None]) currently silently
# coerce non-string items to topic names via PyObject_Str (str(123) -> "123",
# str(None) -> "None"). This is inherited from Consumer's cfl_PyObject_Unistr
# helper. Strict isinstance(item, str) checking would catch buggy callers but
# is a backward-incompatible change. Add a negative test for these once the
# behavior is tightened.
def test_acknowledge_rejects_non_message_argument(share_consumer):
    """acknowledge() rejects anything that is not a Message with TypeError."""
    not_messages = (None, 'not-a-message', 42, object(), [])
    for candidate in not_messages:
        with pytest.raises(TypeError):
            share_consumer.acknowledge(candidate, AcknowledgeType.ACCEPT)
def test_acknowledge_none_topic_message_rejected(share_consumer):
    """acknowledge() of a Message whose topic() is None is rejected with
    _INVALID_ARG rather than crashing on the missing topic.

    partition/offset are valid, so the absent topic is the only thing wrong.
    A missing topic is checked before the ack-mode check, which is why the
    default implicit-mode fixture works here: the same call with a real topic
    would instead return _STATE (an explicit ack in implicit mode).
    """
    topicless = Message(partition=0, offset=0)
    assert topicless.topic() is None

    with pytest.raises(ValueError) as excinfo:
        share_consumer.acknowledge(topicless, AcknowledgeType.ACCEPT)
    assert str(excinfo.value)
def test_set_sasl_credentials_accepts_strings(share_consumer):
    """set_sasl_credentials() returns None for string credentials.

    Setting credentials doesn't touch the network, so it works on an
    unconnected consumer.
    """
    # positional form
    positional = share_consumer.set_sasl_credentials('user', 'secret')
    assert positional is None
    # keyword form
    by_keyword = share_consumer.set_sasl_credentials(username='user2', password='s2')
    assert by_keyword is None
def test_set_sasl_credentials_rejects_bad_arguments(share_consumer):
    """set_sasl_credentials() requires exactly two string arguments."""
    bad_argument_lists = (
        (),  # missing both
        ('user',),  # missing password
        (123, 'pw'),  # non-str username
        ('user', None),  # non-str password
    )
    for bad_args in bad_argument_lists:
        with pytest.raises(TypeError):
            share_consumer.set_sasl_credentials(*bad_args)
def test_acknowledge_offset_rejects_non_str_topic(share_consumer):
    """acknowledge_offset() raises TypeError for a topic that is not a str."""
    bad_topics = (None, 42, object(), [])
    for topic in bad_topics:
        with pytest.raises(TypeError):
            share_consumer.acknowledge_offset(topic, 0, 0, AcknowledgeType.ACCEPT)
def test_acknowledge_offset_rejects_non_int_partition(share_consumer):
    """acknowledge_offset() raises TypeError for a partition that is not an int."""
    bad_partitions = ('str', None, object(), [], 1.5)
    for partition in bad_partitions:
        with pytest.raises(TypeError):
            share_consumer.acknowledge_offset('topic', partition, 0, AcknowledgeType.ACCEPT)
def test_acknowledge_offset_rejects_non_int_offset(share_consumer):
    """acknowledge_offset() raises TypeError for an offset that is not an int."""
    bad_offsets = ('str', None, object(), [], 1.5)
    for offset in bad_offsets:
        with pytest.raises(TypeError):
            share_consumer.acknowledge_offset('topic', 0, offset, AcknowledgeType.ACCEPT)
def test_acknowledge_offset_rejects_negative_partition(share_consumer):
    """A negative partition is rejected by librdkafka with _INVALID_ARG."""
    with pytest.raises(ValueError) as excinfo:
        share_consumer.acknowledge_offset('topic', -1, 0, AcknowledgeType.ACCEPT)
    message = str(excinfo.value)
    assert message
def test_acknowledge_offset_rejects_negative_offset(share_consumer):
    """A negative offset is rejected by librdkafka with _INVALID_ARG."""
    with pytest.raises(ValueError) as excinfo:
        share_consumer.acknowledge_offset('topic', 0, -1, AcknowledgeType.ACCEPT)
    message = str(excinfo.value)
    assert message
def test_acknowledge_offset_rejects_out_of_range_ack_type():
    """An out-of-range AcknowledgeType is rejected with _INVALID_ARG.

    Only ACCEPT(1), RELEASE(2) and REJECT(3) are valid. The ack_type is checked
    only after the ack-mode check, and an explicit ack in implicit mode returns
    _STATE first, so this needs an explicit-mode consumer (not the shared
    implicit fixture) to actually reach the type check. topic/partition/offset
    are valid, so the bad ack_type is the only thing wrong. acknowledge() takes
    the same path, so this covers both ack APIs.
    """
    explicit_conf = {
        'group.id': unique_id('test-share-bad-ack-type'),
        'bootstrap.servers': 'localhost:9092',
        'socket.timeout.ms': 100,
        'share.acknowledgement.mode': 'explicit',
    }
    consumer = ShareConsumer(explicit_conf)
    try:
        # 0 sits just below ACCEPT(1), 4 just above REJECT(3); 999 is far out.
        out_of_range = (0, 4, 999)
        for bad_ack_type in out_of_range:
            with pytest.raises(ValueError) as excinfo:
                consumer.acknowledge_offset('test-topic', 0, 0, bad_ack_type)
            assert str(excinfo.value), f'ack_type={bad_ack_type} should be rejected with _INVALID_ARG'
    finally:
        consumer.close()
def test_commit_sync_rejects_non_numeric_timeout(share_consumer):
    """commit_sync(timeout=...) rejects non-numeric values with TypeError."""
    bad_timeouts = ('str', None, object(), [])
    for timeout in bad_timeouts:
        with pytest.raises(TypeError):
            share_consumer.commit_sync(timeout=timeout)
def test_commit_sync_rejects_unknown_kwargs(share_consumer):
    """commit_sync() raises TypeError for an unknown keyword argument."""
    unknown = {'unknown_kwarg': 1.0}
    with pytest.raises(TypeError):
        share_consumer.commit_sync(**unknown)
def test_commit_async_rejects_any_argument(share_consumer):
    """commit_async() takes no arguments, positional or keyword."""
    rejected_calls = (
        ((1.0,), {}),
        ((), {'timeout': 1.0}),
    )
    for args, kwargs in rejected_calls:
        with pytest.raises(TypeError):
            share_consumer.commit_async(*args, **kwargs)
def _poll_is_interrupted_by_sigint(group_prefix, **poll_kwargs):
    """Return True if poll(**poll_kwargs) is cut short by KeyboardInterrupt.

    Builds a TestShareConsumer whose group.id is derived from *group_prefix*,
    subscribes it to 'test-topic', starts a daemon thread running
    TestUtils.send_sigint_after_delay(0.4), and then calls poll(). The
    consumer is closed whether or not poll() was interrupted.
    """
    consumer = TestShareConsumer(
        {
            'group.id': unique_id(group_prefix),
            'socket.timeout.ms': 100,
        }
    )
    consumer.subscribe(['test-topic'])

    interrupter = threading.Thread(
        target=TestUtils.send_sigint_after_delay,
        args=(0.4,),
        daemon=True,
    )
    interrupter.start()

    try:
        consumer.poll(**poll_kwargs)
    except KeyboardInterrupt:
        return True
    finally:
        consumer.close()
    return False


def test_poll_interruptible_by_signal():
    """ShareConsumer.poll uses chunked polling so SIGINT surfaces as
    KeyboardInterrupt instead of being swallowed until the librdkafka timeout
    expires.

    Verifies the chunked loop in ShareConsumer.c::ShareConsumer_poll actually
    checks for signals between chunks, for both a finite and an infinite
    timeout. Mirrors the pattern in test_Wakeable.py for the regular Consumer.
    """
    # Finite timeout: a 5s budget, so the interrupt should fire well before.
    assert _poll_is_interrupted_by_sigint(
        'test-poll-signal-finite', timeout=5.0
    ), "poll(timeout=5.0) should have been interrupted by SIGINT"

    # Infinite timeout: poll() is called with no timeout argument.
    assert _poll_is_interrupted_by_sigint(
        'test-poll-signal-infinite'
    ), "poll() (infinite) should have been interrupted by SIGINT"
def test_concurrent_thread_access_raises_conflict():
    """Touching a ShareConsumer from a second thread while another thread is
    inside poll() raises ConcurrentModificationException(_CONFLICT).

    A ShareConsumer is not safe for concurrent use. Ownership is held by
    whichever thread is currently in a call, for the whole duration of that
    call (including poll()'s blocking wait), so a second thread's call is
    rejected. No broker is needed: the guard is local and stays held across the
    idle poll, so the hammer thread reliably hits it.

    commit_async() makes a good probe: it's guarded but returns immediately.
    """
    consumer = TestShareConsumer(
        {
            'group.id': unique_id('test-share-conflict'),
            'socket.timeout.ms': 100,
        }
    )
    consumer.subscribe(['test-topic'])

    conflicts, other_errors = [], []
    stop = threading.Event()

    def hammer():
        # Probe the consumer from the second thread until told to stop.
        while not stop.is_set():
            try:
                consumer.commit_async()
            except ConcurrentModificationException as conflict:
                # The exception type alone signals _CONFLICT - no KafkaError code to inspect.
                conflicts.append(conflict)
            except Exception as unexpected:  # noqa: BLE001 - record anything unexpected
                other_errors.append(repr(unexpected))

    prober = threading.Thread(target=hammer, daemon=True)
    prober.start()
    try:
        # Keep the consumer busy inside poll() until the hammer thread sees a
        # conflict, or give up after a few seconds. The main thread can lose
        # the race too if the hammer briefly grabs ownership; that's fine,
        # swallow it and keep polling.
        give_up_at = time.monotonic() + 3.0
        while True:
            if conflicts or time.monotonic() >= give_up_at:
                break
            try:
                consumer.poll(timeout=0.2)
            except ConcurrentModificationException:
                continue
    finally:
        stop.set()
        prober.join(timeout=2.0)
        consumer.close()

    assert conflicts, "second-thread access during poll() should have raised _CONFLICT"
    assert all(isinstance(exc, ConcurrentModificationException) for exc in conflicts)
    assert not other_errors, f"unexpected errors from second thread: {[str(e) for e in other_errors]}"
def test_dealloc_without_close_destroys_handle():
    """Dropping a ShareConsumer without close() lets dealloc destroy the handle
    cleanly.

    close() destroys the handle and NULLs it, so a consumer that gets closed
    leaves dealloc nothing to do. Not closing is the only way to reach
    dealloc's destroy path. A regression there (use-after-free / double-free)
    crashes the interpreter; a milder error surfaces as an unraisable exception
    from the destructor, which is captured here and asserted against.
    """
    consumer = ShareConsumer(
        {
            'group.id': unique_id('test-share-dealloc-no-close'),
            'bootstrap.servers': 'localhost:9092',
        }
    )
    consumer.subscribe(['test-topic'])

    unraisables = []
    saved_hook = sys.unraisablehook
    # Collect every unraisable-exception record reported while the hook is
    # swapped in.
    sys.unraisablehook = unraisables.append
    try:
        # Drop the last reference: refcounting runs dealloc right here. The
        # collect() is a safety net in case a callback ever forms a cycle.
        del consumer
        gc.collect()
    finally:
        sys.unraisablehook = saved_hook

    assert unraisables == [], f"dealloc raised: {[u.exc_value for u in unraisables]}"
def test_share_exceptions_exported():
    """Both new exception types are defined in cimpl, re-exported from the
    package, and listed in __all__.
    """
    import confluent_kafka
    from confluent_kafka import cimpl

    exported = (
        ('IllegalStateException', IllegalStateException),
        ('ConcurrentModificationException', ConcurrentModificationException),
    )
    for name, imported in exported:
        # The package attribute, the cimpl attribute and the name imported at
        # the top of this module must all be the same object.
        assert getattr(confluent_kafka, name) is getattr(cimpl, name) is imported
        assert name in confluent_kafka.__all__
@pytest.mark.parametrize('exc_type', [IllegalStateException, ConcurrentModificationException])
def test_share_exception_subclass_contract(exc_type):
    """The share exceptions subclass RuntimeError but NOT KafkaException.

    RuntimeError, so pre-KIP-932 `except RuntimeError` still catches them;
    deliberately not KafkaException, so `except KafkaException` won't silently
    swallow them.
    """
    is_runtime_error = issubclass(exc_type, RuntimeError)
    assert is_runtime_error
    is_kafka_exception = issubclass(exc_type, KafkaException)
    assert not is_kafka_exception
@pytest.mark.parametrize(
    'trigger, exc_type',
    [
        (lambda sc: sc.poll(timeout=0.1), IllegalStateException),
        (lambda sc: sc.subscribe(['']), ValueError),
    ],
    ids=['_STATE->IllegalStateException', '_INVALID_ARG->ValueError'],
)
def test_error_code_maps_to_python_exception(share_consumer, trigger, exc_type):
    """Each reachable error code surfaces as its mapped Python type.

    The type conveys the code; args[0] is the plain message string (not a
    KafkaError), and librdkafka's detail is preserved in it.

    _CONFLICT -> ConcurrentModificationException needs two threads, so it has
    its own test (test_concurrent_thread_access_raises_conflict).
    """
    with pytest.raises(exc_type) as excinfo:
        trigger(share_consumer)
    raised = excinfo.value
    assert not isinstance(raised.args[0], KafkaError)
    assert str(raised)  # original message preserved, not blanked
def test_state_error_is_catchable_as_runtimeerror(share_consumer):
    """Compat bridge: IllegalStateException can be caught as RuntimeError.

    Because IllegalStateException is a RuntimeError, code written against the
    pre-KIP-932 closed-consumer RuntimeError keeps working without having to
    catch KafkaException.
    """
    with pytest.raises(RuntimeError) as excinfo:
        share_consumer.poll(timeout=0.1)
    raised = excinfo.value
    assert isinstance(raised, IllegalStateException)
    assert str(raised)
def test_gc_of_never_closed_consumer_is_clean():
    """Dropping a consumer that was never close()d tears down cleanly.

    Garbage collection has to close it for us, so it is pointed at a dead
    broker with a short socket.timeout.ms to keep teardown from blocking.
    """
    consumer = TestShareConsumer(
        {
            'bootstrap.servers': '127.0.0.1:1',
            'group.id': unique_id('test-share-gc-noclose'),
            'socket.timeout.ms': 100,
        }
    )
    consumer.subscribe(['test-topic'])
    # A weak reference lets us observe collection without keeping it alive.
    tracker = weakref.ref(consumer)

    began = time.monotonic()
    del consumer
    gc.collect()
    elapsed = time.monotonic() - began

    assert tracker() is None, 'consumer was not collected'
    assert elapsed < 5.0, f'GC teardown took {elapsed:.2f}s (possible hang)'
def test_gc_after_close_does_not_double_free():
    """Garbage collection after close() must not double-free.

    close() already tore the consumer down, so collection has to notice it's
    gone and not free it a second time. A regression here usually crashes the
    interpreter rather than raising.
    """
    consumer = TestShareConsumer(
        {
            'bootstrap.servers': '127.0.0.1:1',
            'group.id': unique_id('test-share-gc-afterclose'),
            'socket.timeout.ms': 100,
        }
    )
    consumer.subscribe(['test-topic'])
    consumer.close()

    tracker = weakref.ref(consumer)
    del consumer
    gc.collect()
    assert tracker() is None, 'consumer was not finalized after close()'
def test_gc_after_failed_construction_is_clean():
    """Collecting a consumer whose __init__ raised must not crash.

    Both configs fail before the consumer is fully built, leaving a half-made
    object that cleanup still has to handle.
    """
    # group.id is required.
    missing_group_id = {'bootstrap.servers': 'localhost:9092'}
    # on_commit isn't supported on a share consumer.
    with_on_commit = {
        'group.id': unique_id('test-share-gc-init'),
        'on_commit': lambda *a: None,
    }

    for bad_conf in (missing_group_id, with_on_commit):
        with pytest.raises(ValueError):
            ShareConsumer(bad_conf)

    gc.collect()  # cleaning up the failed objects shouldn't crash
def test_with_block_closes_on_exception_and_propagates():
    """An exception inside a `with` block still closes the consumer, and the
    original exception propagates (__exit__ doesn't swallow it).
    """
    conf = {
        'group.id': unique_id('test-share-ctx-raise'),
        'bootstrap.servers': 'localhost:9092',
        'socket.timeout.ms': 100,
    }
    with pytest.raises(ValueError, match='boom'):
        with ShareConsumer(conf) as sc:
            sc.subscribe(['test-topic'])
            raise ValueError('boom')

    # `sc` is still bound out here, and __exit__ already closed it.
    with pytest.raises(RuntimeError, match='Share consumer closed'):
        sc.subscribe(['test-topic'])
def test_explicit_close_inside_with_block_is_noop_on_exit():
    """Closing inside a `with` block doesn't trip up the close() that runs on
    the way out; the second one is a no-op.
    """
    conf = {
        'group.id': unique_id('test-share-ctx-close'),
        'bootstrap.servers': 'localhost:9092',
        'socket.timeout.ms': 100,
    }
    with ShareConsumer(conf) as sc:
        sc.subscribe(['test-topic'])
        sc.close()  # __exit__ will see it's already closed and skip

    with pytest.raises(RuntimeError, match='Share consumer closed'):
        sc.subscribe(['test-topic'])
def test_close_does_not_hang_on_unreachable_broker():
    """close() against a dead broker returns quickly instead of hanging.

    It blocks for roughly socket.timeout.ms while trying to leave the group, so
    a short timeout keeps the whole teardown bounded.
    """
    conf = {
        'bootstrap.servers': '127.0.0.1:1',
        'group.id': unique_id('test-share-close-unreachable'),
        'socket.timeout.ms': 1000,
    }
    consumer = ShareConsumer(conf)
    consumer.subscribe(['test-topic'])
    # Poll once so the group is actually joined and close() has to leave it.
    consumer.poll(timeout=0.2)

    began = time.monotonic()
    consumer.close()
    elapsed = time.monotonic() - began
    assert elapsed < 10.0, f'close() hung for {elapsed:.2f}s on an unreachable broker'
def test_construction_failure_raises_cleanly():
    """A config that passes the early checks but fails while the consumer is
    being built raises ValueError instead of crashing or leaking.

    A bad ssl.ca.location is accepted up front but fails when the SSL context
    is loaded during construction.
    """
    bad_ssl_conf = {
        'group.id': unique_id('test-share-new-fail'),
        'bootstrap.servers': 'localhost:9092',
        'security.protocol': 'ssl',
        'ssl.ca.location': '/nonexistent/path/ca.pem',
        'socket.timeout.ms': 100,
    }
    with pytest.raises(ValueError, match='Failed to create share consumer'):
        ShareConsumer(bad_ssl_conf)

    gc.collect()  # cleaning up the failed object shouldn't crash
def test_double_init_raises():
    """Calling __init__ a second time raises rather than overwriting (and
    leaking) the existing handle.
    """
    consumer = ShareConsumer(
        {
            'group.id': unique_id('test-share-double-init'),
            'bootstrap.servers': 'localhost:9092',
            'socket.timeout.ms': 100,
        }
    )
    try:
        second_conf = {
            'group.id': unique_id('test-share-double-init-2'),
            'bootstrap.servers': 'localhost:9092',
            'socket.timeout.ms': 100,
        }
        with pytest.raises(RuntimeError, match='already initialized'):
            consumer.__init__(second_conf)
    finally:
        consumer.close()