Skip to content

Commit 20410c3

Browse files
committed
Minor fixes
1 parent 9b1a164 commit 20410c3

3 files changed

Lines changed: 15 additions & 15 deletions

File tree

src/confluent_kafka/src/Consumer.c

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,12 +1161,11 @@ Consumer_consume(Handle *self, PyObject *args, PyObject *kwargs) {
11611161
rkqu, total_timeout_ms, rkmessages, num_messages);
11621162

11631163
if (msgs_received_count < 0) {
1164-
/* Error - need to restore GIL before setting error */
1165-
PyEval_RestoreThread(cs.thread_state);
1164+
if (CallState_end(self, &cs))
1165+
cfl_PyErr_Format(
1166+
rd_kafka_last_error(), "%s",
1167+
rd_kafka_err2str(rd_kafka_last_error()));
11661168
free(rkmessages);
1167-
cfl_PyErr_Format(
1168-
rd_kafka_last_error(), "%s",
1169-
rd_kafka_err2str(rd_kafka_last_error()));
11701169
return NULL;
11711170
}
11721171
} else {
@@ -1189,15 +1188,14 @@ Consumer_consume(Handle *self, PyObject *args, PyObject *kwargs) {
11891188
(unsigned int)msgs_received_count);
11901189

11911190
if (chunk_msg_count < 0) {
1192-
/* Error - destroy accumulated messages,
1193-
* restore GIL, and raise */
11941191
for (i = 0; i < msgs_received_count; i++)
11951192
rd_kafka_message_destroy(rkmessages[i]);
1196-
PyEval_RestoreThread(cs.thread_state);
1193+
if (CallState_end(self, &cs))
1194+
cfl_PyErr_Format(
1195+
rd_kafka_last_error(), "%s",
1196+
rd_kafka_err2str(
1197+
rd_kafka_last_error()));
11971198
free(rkmessages);
1198-
cfl_PyErr_Format(
1199-
rd_kafka_last_error(), "%s",
1200-
rd_kafka_err2str(rd_kafka_last_error()));
12011199
return NULL;
12021200
}
12031201

tests/integration/consumer/test_consumer_wakeable_poll_consume.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,8 @@ def produce_in_waves():
270270
msglist = consumer.consume(num_messages=10, timeout=10.0)
271271

272272
# Should have accumulated messages across multiple waves
273-
assert len(msglist) >= 10, (
274-
f"Expected at least 10 messages accumulated across waves, got {len(msglist)}. "
273+
assert len(msglist) == 10, (
274+
f"Expected exactly 10 messages accumulated across waves, got {len(msglist)}. "
275275
f"consume() may be returning early on the first wave."
276276
)
277277

tests/test_Wakeable.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1540,7 +1540,9 @@ def test_consume_accumulation_short_timeout_skips_chunking():
15401540

15411541
assert isinstance(msglist, list), "consume() should return a list"
15421542
assert len(msglist) == 0, "Expected empty list"
1543-
assert elapsed < WAKEABLE_POLL_TIMEOUT_MIN, f"Short timeout should not use chunking, took {elapsed:.2f}s"
1543+
assert (
1544+
elapsed <= WAKEABLE_POLL_TIMEOUT_MAX
1545+
), f"Short timeout should not behave like a long chunked wait, took {elapsed:.2f}s"
15441546
consumer.close()
15451547

15461548

@@ -1562,5 +1564,5 @@ def test_consume_accumulation_zero_timeout_nonblocking():
15621564

15631565
assert isinstance(msglist, list), "consume() should return a list"
15641566
assert len(msglist) == 0, "Expected empty list"
1565-
assert elapsed < WAKEABLE_POLL_TIMEOUT_MIN, f"Zero timeout should return immediately, took {elapsed:.2f}s"
1567+
assert elapsed <= WAKEABLE_POLL_TIMEOUT_MAX, f"Zero timeout should return immediately, took {elapsed:.2f}s"
15661568
consumer.close()

0 commit comments

Comments
 (0)