v2.13.0 - Consumer '.consume()' method does not respect the 'num_messages' and 'timeout' parameters #2205

@andriy-buryy

Description

In confluent-kafka 2.13.0, the behavior of the consumer's .consume() method changed.
The num_messages and timeout parameters appear to have no effect.
consume() returns each batch as soon as it is available on the broker, instead of waiting until num_messages have accumulated or timeout has elapsed.
confluent-kafka 2.12.2 works as expected.

Script to reproduce

import json
import logging
import sys
import threading
import time
from itertools import batched

from confluent_kafka import Consumer, Producer, version
from confluent_kafka.admin import AdminClient, NewTopic


BOOTSTRAP_SERVERS = '127.0.0.1:29092'
CONSUMER_CONFIG = {
    'bootstrap.servers': BOOTSTRAP_SERVERS,
    'group.id': 'test-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

KAFKA_TOPIC = 'test-topic'

PRODUCER_BATCHES_COUNT = 10
PRODUCER_BATCH_SIZE = 4
PRODUCER_TIMEOUT = 4

CONSUMER_BATCH_SIZE = 10
CONSUMER_TIMEOUT = 10


def create_topic(topic: str, partitions: int = 1):
    admin_client = AdminClient({'bootstrap.servers': BOOTSTRAP_SERVERS})
    cluster_metadata = admin_client.list_topics()

    if topic not in cluster_metadata.topics:
        topic_instance = NewTopic(topic=topic, num_partitions=partitions)
        create_topic_futures = admin_client.create_topics(new_topics=[topic_instance])
        create_topic_futures[topic].result()


def generate_messages(messages_count: int) -> list[dict[str, str]]:
    messages = [
        {'key': json.dumps({'key': i}), 'value': json.dumps({'value': i * 10})}
        for i in range(messages_count)
    ]
    return messages


def produce_messages():
    producer = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS})
    messages = generate_messages(messages_count=PRODUCER_BATCHES_COUNT * PRODUCER_BATCH_SIZE)

    for messages_batch in batched(messages, PRODUCER_BATCH_SIZE):
        producer.produce_batch(topic=KAFKA_TOPIC, messages=list(messages_batch))
        producer.flush()
        logging.info(f'[Producer] produced messages count: {len(messages_batch)}')
        time.sleep(PRODUCER_TIMEOUT)


def consume_messages():
    consumer = Consumer(CONSUMER_CONFIG)
    try:
        consumer.subscribe(topics=[KAFKA_TOPIC])

        while True:
            start_time = time.perf_counter()
            logging.info('[Consumer] start message batch consuming')
            messages = consumer.consume(num_messages=CONSUMER_BATCH_SIZE, timeout=CONSUMER_TIMEOUT)
            elapsed_time = time.perf_counter() - start_time
            logging.info(f'[Consumer] consumed messages count: {len(messages)}; elapsed time: {elapsed_time:.2f}s')
            if messages:
                consumer.commit(asynchronous=False)

    finally:
        consumer.close()


def main():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', stream=sys.stdout)
    logging.info(f'python version: {sys.version}')
    logging.info(f'confluent-kafka version: {version()}')

    create_topic(topic=KAFKA_TOPIC)

    producer_thread = threading.Thread(target=produce_messages)
    producer_thread.start()

    consumer_thread = threading.Thread(target=consume_messages)
    consumer_thread.start()

    for thread in (producer_thread, consumer_thread):
        thread.join()


if __name__ == '__main__':
    main()

Script logs

v2.12.2 (as expected)

2026-02-26 22:41:45,760 - python version: 3.13.12 (main, Feb  4 2026, 00:00:00) [GCC 15.2.1 20260123 (Red Hat 15.2.1-7)]
2026-02-26 22:41:45,760 - confluent-kafka version: 2.12.2
2026-02-26 22:41:45,837 - [Consumer] start message batch consuming
2026-02-26 22:41:45,838 - [Producer] produced messages count: 4
2026-02-26 22:41:49,840 - [Producer] produced messages count: 4
2026-02-26 22:41:53,842 - [Producer] produced messages count: 4
2026-02-26 22:41:53,842 - [Consumer] consumed messages count: 10; elapsed time: 8.01s
2026-02-26 22:41:53,844 - [Consumer] start message batch consuming
2026-02-26 22:41:57,843 - [Producer] produced messages count: 4
2026-02-26 22:42:01,845 - [Producer] produced messages count: 4
2026-02-26 22:42:01,845 - [Consumer] consumed messages count: 10; elapsed time: 8.00s
2026-02-26 22:42:01,847 - [Consumer] start message batch consuming
2026-02-26 22:42:05,847 - [Producer] produced messages count: 4
2026-02-26 22:42:09,848 - [Producer] produced messages count: 4
2026-02-26 22:42:11,848 - [Consumer] consumed messages count: 8; elapsed time: 10.00s
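
The timings in the 2.12.2 log follow from the constants in the script. As a rough cross-check, here is a small broker-free simulation of the semantics I expect (return once num_messages have arrived or timeout has elapsed, whichever comes first). The function name simulate_expected and the idealized zero-latency timeline are my own assumptions for illustration, not anything taken from the library.

```python
def simulate_expected(batches=10, batch_size=4, interval=4, want=10, timeout=10):
    """Return a list of (message_count, elapsed_seconds), one per consume() call.

    Idealized model: the producer emits `batch_size` messages every `interval`
    seconds starting at t=0, and delivery latency is ignored.
    """
    # Arrival time of every individual message, in order.
    arrivals = [i * interval for i in range(batches) for _ in range(batch_size)]
    results = []
    now = 0  # time at which the current consume() call starts
    pos = 0  # index of the next message not yet consumed
    while pos < len(arrivals):
        deadline = now + timeout
        # Messages this call can return: the next `want` that arrive in time.
        batch = [t for t in arrivals[pos:pos + want] if t <= deadline]
        if len(batch) == want:
            # Full batch: return as soon as the last needed message arrives.
            end = max(now, batch[-1])
        else:
            # Not enough messages: block until the timeout expires.
            end = deadline
        results.append((len(batch), end - now))
        pos += len(batch)
        now = end
    return results


for count, elapsed in simulate_expected()[:3]:
    print(f'consumed messages count: {count}; elapsed time: {elapsed}s')
```

Under these assumptions the first three calls give 10 messages after 8s, 10 after 8s, and 8 after 10s, which is what the 2.12.2 log shows.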

v2.13.0 (changed)

2026-02-26 22:43:42,903 - python version: 3.13.12 (main, Feb  4 2026, 00:00:00) [GCC 15.2.1 20260123 (Red Hat 15.2.1-7)]
2026-02-26 22:43:42,903 - confluent-kafka version: 2.13.0
2026-02-26 22:43:42,982 - [Consumer] start message batch consuming
2026-02-26 22:43:42,984 - [Producer] produced messages count: 4
2026-02-26 22:43:46,185 - [Consumer] consumed messages count: 4; elapsed time: 3.20s
2026-02-26 22:43:46,187 - [Consumer] start message batch consuming
2026-02-26 22:43:46,986 - [Producer] produced messages count: 4
2026-02-26 22:43:46,989 - [Consumer] consumed messages count: 4; elapsed time: 0.80s
2026-02-26 22:43:46,990 - [Consumer] start message batch consuming
2026-02-26 22:43:50,988 - [Producer] produced messages count: 4
2026-02-26 22:43:50,993 - [Consumer] consumed messages count: 4; elapsed time: 4.00s
2026-02-26 22:43:50,994 - [Consumer] start message batch consuming
2026-02-26 22:43:54,990 - [Producer] produced messages count: 4
2026-02-26 22:43:54,996 - [Consumer] consumed messages count: 4; elapsed time: 4.00s
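
A possible stopgap until this is resolved is to accumulate messages on the client side: call consume() repeatedly with the remaining count and the remaining time until either limit is reached. This is only a sketch. The helper name consume_batch is hypothetical, I have not verified it against 2.13.0, and it relies only on consume() accepting the num_messages and timeout keyword arguments as in the script above.

```python
import time


def consume_batch(consumer, num_messages, timeout):
    """Collect up to `num_messages` messages, waiting at most `timeout` seconds overall.

    `consumer` is any object with a consume(num_messages=..., timeout=...) method,
    such as confluent_kafka.Consumer. Returned items are passed through unchanged,
    so callers still need to check msg.error() on each message.
    """
    deadline = time.monotonic() + timeout
    messages = []
    while len(messages) < num_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # overall timeout reached, return what we have
        # Ask only for what is still missing, within the time that is left.
        messages.extend(
            consumer.consume(num_messages=num_messages - len(messages), timeout=remaining)
        )
    return messages
```

In the script above this would replace the direct call, for example messages = consume_batch(consumer, CONSUMER_BATCH_SIZE, CONSUMER_TIMEOUT). The overall wait is bounded by a single deadline, so repeated short returns from consume() do not extend the total time.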
