Skip to content

Commit 29dd257

Browse files
committed
Minor fix
1 parent 20410c3 commit 29dd257

2 files changed

Lines changed: 39 additions & 1 deletion

File tree

src/confluent_kafka/src/Producer.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,10 @@ static int Producer_poll0(Handle *self, int tmout) {
394394
r = chunk_result;
395395
break;
396396
}
397-
r += chunk_result; /* Accumulate events processed */
397+
r += chunk_result;
398+
399+
if (chunk_result > 0)
400+
break;
398401

399402
chunk_count++;
400403

tests/integration/producer/test_producer_wakeable_poll_flush.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,41 @@ def delivery_callback(err, msg):
119119
consumer.close()
120120

121121

122+
def test_poll_returns_early_after_delivery_callback(kafka_cluster):
123+
"""Test that poll() returns early after delivery callback fires."""
124+
topic = kafka_cluster.create_topic_and_wait_propogation('test-poll-early-return')
125+
126+
delivery_called = []
127+
128+
def delivery_callback(err, msg):
129+
delivery_called.append(time.time())
130+
131+
producer_conf = kafka_cluster.client_conf(
132+
{
133+
'socket.timeout.ms': 100,
134+
'message.timeout.ms': 10000,
135+
}
136+
)
137+
producer = kafka_cluster.cimpl_producer(producer_conf)
138+
139+
producer.produce(topic, value=b'early-return-test', on_delivery=delivery_callback)
140+
141+
# Poll with a long timeout — should return early once callback fires
142+
poll_timeout = 5.0
143+
start = time.time()
144+
events = producer.poll(timeout=poll_timeout)
145+
elapsed = time.time() - start
146+
147+
assert len(delivery_called) == 1, "Expected delivery callback to fire"
148+
assert events > 0, "Expected at least 1 event served"
149+
assert elapsed < poll_timeout - 1.0, (
150+
f"poll({poll_timeout}) took {elapsed:.2f}s — should have returned "
151+
f"early after delivery callback, not blocked for full timeout"
152+
)
153+
154+
producer.close()
155+
156+
122157
def test_flush_message_delivery_with_wakeable_pattern(kafka_cluster):
123158
"""Test that flush() correctly delivers messages when using wakeable pattern.
124159

0 commit comments

Comments
 (0)