
AIOProducer.produce execution order and flush() best practices for batching #2276

Description

Hi everyone, I am using the new confluent_kafka.aio.AIOProducer and trying to find the most efficient way to batch-produce around 1,800 messages at once.

  • OS: Debian 13.3
  • Python version: 3.12
  • confluent-kafka version: 2.14.2
  • Kafka broker: running locally via Docker Compose

Looking at the official example provided in the documentation:

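```python
import asyncio

from confluent_kafka.aio import AIOProducer


async def batch_produce(producer, topic):
    # Create multiple produce futures
    futures = [
        await producer.produce(topic=topic, key=f'key{i}', value=f'value{i}')
        for i in range(100)
    ]

    # Flush to ensure messages are in flight
    await producer.flush()

    # Wait for all deliveries
    messages = await asyncio.gather(*futures)
    print(f'Delivered {len(messages)} messages')


async def main():
    # batch_produce() needs a producer and a topic;
    # the broker address and topic name below are placeholders.
    producer = AIOProducer({'bootstrap.servers': 'localhost:9092'})
    try:
        await batch_produce(producer, 'test-topic')
    finally:
        await producer.close()


asyncio.run(main())
```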

I implemented a similar approach in my production code to collect and send financial contracts.

```python
import asyncio
import logging

from confluent_kafka.aio import AIOProducer

logger = logging.getLogger(__name__)

# ContractProvider, KafkaConfig and Contract are application types defined elsewhere.


class KafkaProducer:
    def __init__(self, bootstrap_servers: str):
        self._producer = AIOProducer({
            "bootstrap.servers": bootstrap_servers,
            "acks": 1,
            "client.id": "producer-1",
            "linger.ms": 30,
            "batch.num.messages": 2000,
            "compression.type": "lz4"
        })

    async def produce_message(self, topic: str, encoded_payload: bytes, key: str | None = None) -> asyncio.Future:
        encoded_key = key.encode("utf-8") if key else None

        return await self._producer.produce(
            topic=topic,
            key=encoded_key,
            value=encoded_payload
        )

    async def flush(self):
        await self._producer.flush()

    async def close(self):
        await self._producer.flush()
        await self._producer.close()
        logger.info("[Kafka] AIOProducer closed.")


class ContractCollector:
    def __init__(self, name: str, contract_provider: ContractProvider, kafka_config: KafkaConfig, kafka_producer: KafkaProducer):
        self._name = name
        self._contract_provider = contract_provider
        self._kafka_topic = kafka_config.kafka_contracts_topic
        self._kafka_producer = kafka_producer

    async def collect(self) -> list[Contract]:
        contracts = await self._contract_provider.get_contracts()
        logger.info(f"[{self._name}] Collected {len(contracts)} contracts.")

        delivery_futures = []
        for contract in contracts:
            key = f"{contract.cex_id.id}_{contract.chain_id.id}_{contract.symbol}"
            encoded_payload = contract.model_dump_json().encode('utf-8')

            future = await self._kafka_producer.produce_message(
                topic=self._kafka_topic,
                key=key,
                encoded_payload=encoded_payload,
            )
            delivery_futures.append(future)

        # await self._kafka_producer.flush()
        messages = await asyncio.gather(*delivery_futures)

        logger.info(f'Delivered {len(messages)} messages')
        return contracts
```
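In collect(), len(messages) simply counts the delivery futures, and a plain asyncio.gather() raises on the first failed delivery, so the outcomes of the other messages are not reported to the caller. A minimal sketch of a helper that waits for every future and separates successes from failures, assuming failed deliveries surface as exceptions set on their futures (the exact exception type AIOProducer uses is not shown in this issue). The name await_deliveries is hypothetical, not part of confluent_kafka.

```python
import asyncio
from typing import Any


async def await_deliveries(
    futures: list[asyncio.Future],
) -> tuple[list[Any], list[BaseException]]:
    """Hypothetical helper: wait for every delivery future, return (delivered, failed)."""
    # return_exceptions=True makes gather collect exceptions as results instead
    # of raising the first one, so every future's outcome is inspected.
    results = await asyncio.gather(*futures, return_exceptions=True)
    delivered = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return delivered, failed
```

Inside collect() it would replace the final gather, e.g. `delivered, failed = await await_deliveries(delivery_futures)`, followed by a log line that reports both counts.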

I have two questions regarding performance and best practices here:

1. Is await producer.produce() inside a loop blocking concurrent execution?

Since produce is an async def method, using await inside the list comprehension (or a for loop) waits for each message to be registered, one after another.

  • If we await it inside the loop, doesn't that defeat the purpose of calling asyncio.gather(*futures) later?
  • Should we instead use asyncio.gather(*[producer.produce(...) for ...]) to schedule all enqueue operations concurrently, and then await the resulting delivery futures with a second asyncio.gather(*delivery_futures)?
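A minimal sketch of the two-stage pattern in question 1, using a hypothetical StubProducer in place of AIOProducer: its produce() only appends to a local queue and returns a delivery future, and deliver_all() stands in for the client sending the batch. Under that assumption, awaiting produce() in a loop waits only for enqueueing, not for delivery, so the delivery futures still resolve together, and wrapping the produce() coroutines in asyncio.gather() gives the same result. Whether the real AIOProducer.produce() yields to the event loop before enqueueing (which could affect enqueue order under gather()) is not established here.

```python
import asyncio


class StubProducer:
    """Hypothetical stand-in for AIOProducer, not the real client."""

    def __init__(self) -> None:
        self.queue: list[tuple[str, asyncio.Future]] = []
        self.sent_order: list[str] = []

    async def produce(self, topic: str, value: str) -> asyncio.Future:
        # Stage 1: register the message and hand back a delivery future.
        # Nothing here waits on the network.
        fut = asyncio.get_running_loop().create_future()
        self.queue.append((value, fut))
        return fut

    async def deliver_all(self) -> None:
        # Stage 2 (simulated): send everything queued so far as one batch.
        await asyncio.sleep(0.01)
        batch, self.queue = self.queue, []
        for value, fut in batch:
            self.sent_order.append(value)
            fut.set_result(value)


async def sequential_enqueue(producer: StubProducer, n: int) -> list[str]:
    # Each await returns as soon as the message is queued.
    futures = [await producer.produce("t", f"value{i}") for i in range(n)]
    assert not any(f.done() for f in futures)  # nothing delivered yet
    await producer.deliver_all()
    return await asyncio.gather(*futures)


async def gathered_enqueue(producer: StubProducer, n: int) -> list[str]:
    # The first gather runs the produce() coroutines and collects their futures;
    # the second gather waits for the deliveries themselves.
    futures = await asyncio.gather(
        *(producer.produce("t", f"value{i}") for i in range(n))
    )
    await producer.deliver_all()
    return await asyncio.gather(*futures)


async def main() -> None:
    print(await sequential_enqueue(StubProducer(), 3))
    print(await gathered_enqueue(StubProducer(), 3))


if __name__ == "__main__":
    asyncio.run(main())
```

In this model the extra gather() only adds task-creation overhead; the waiting time is dominated by the delivery stage.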

2. Is an explicit await producer.flush() before asyncio.gather(*futures) good practice?

I noticed a massive difference in behavior when benchmarking ~1,800 messages:

  • Without flush(): the produce loop that collects the futures takes 15–30 ms, but awaiting asyncio.gather(*delivery_futures) blocks for 2.0–2.5 seconds before resolving.
  • With flush(): the total execution time drops significantly.

Is calling flush() explicitly the recommended way to make the underlying C library (librdkafka) poll its queue and send messages immediately in batch scenarios, or is it considered an anti-pattern in high-throughput async pipelines?
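One way to reason about the 2.0–2.5 second wait is a producer that buffers messages on the Python side and hands them on only when a batch fills, a timer fires, or flush() is called. The sketch below models that with a hypothetical BufferingStub; the batch_size and buffer_timeout names and values are illustrative assumptions, not AIOProducer's documented parameters or defaults. In the model, the messages left over after the last full batch wait for the timer unless flush() dispatches them at once.

```python
import asyncio
import time


class BufferingStub:
    """Hypothetical model of a buffering async producer (not AIOProducer)."""

    def __init__(self, batch_size: int, buffer_timeout: float) -> None:
        self.batch_size = batch_size          # assumed: dispatch when this many are queued
        self.buffer_timeout = buffer_timeout  # assumed: ...or when this many seconds pass
        self.buffer: list[tuple[str, asyncio.Future]] = []
        self.timer: asyncio.TimerHandle | None = None

    async def produce(self, value: str) -> asyncio.Future:
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self.buffer.append((value, fut))
        if len(self.buffer) >= self.batch_size:
            self._dispatch()  # full batch: send right away
        elif self.timer is None:
            # The first message in an empty buffer starts the timeout clock.
            self.timer = loop.call_later(self.buffer_timeout, self._dispatch)
        return fut

    def _dispatch(self) -> None:
        # Stand-in for handing the batch to the network layer and receiving acks.
        if self.timer is not None:
            self.timer.cancel()
            self.timer = None
        batch, self.buffer = self.buffer, []
        for value, fut in batch:
            fut.set_result(value)

    async def flush(self) -> None:
        self._dispatch()  # send whatever is buffered without waiting for the timer


async def timed_delivery(n: int, use_flush: bool) -> float:
    """Return seconds spent waiting for delivery after all messages are enqueued."""
    producer = BufferingStub(batch_size=1000, buffer_timeout=0.2)
    futures = [await producer.produce(f"value{i}") for i in range(n)]
    start = time.perf_counter()
    if use_flush:
        await producer.flush()
    await asyncio.gather(*futures)
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"without flush: {asyncio.run(timed_delivery(1800, False)):.3f} s")
    print(f"with flush:    {asyncio.run(timed_delivery(1800, True)):.3f} s")
```

Under this model, flush() does not trigger anything the producer would not eventually do on its own; it only skips the remaining timeout for the final partial batch. Whether AIOProducer actually buffers this way is worth confirming against its documentation.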

Thank you for your help!
