Hi everyone, I am using the new confluent_kafka.aio.AIOProducer and trying to find the most efficient way to batch-produce around 1,800 messages at once.
- OS: Debian 13.3
- Python version: 3.12
- confluent-kafka version: 2.14.2
- Kafka broker: running locally via Docker Compose
Here is the official example provided in the documentation:
import asyncio

async def batch_produce(producer, topic):
    # Create multiple produce futures
    futures = [
        await producer.produce(topic=topic, key=f'key{i}', value=f'value{i}')
        for i in range(100)
    ]
    # Flush to ensure messages are in flight
    await producer.flush()
    # Wait for all deliveries
    messages = await asyncio.gather(*futures)
    print(f'Delivered {len(messages)} messages')

asyncio.run(batch_produce())
I implemented a similar approach in my production code to collect and send financial contracts.
import asyncio
import logging

from confluent_kafka.aio import AIOProducer

logger = logging.getLogger(__name__)


class KafkaProducer:
    def __init__(self, bootstrap_servers: str):
        self._producer = AIOProducer({
            "bootstrap.servers": bootstrap_servers,
            "acks": 1,
            "client.id": "producer-1",
            "linger.ms": 30,
            "batch.num.messages": 2000,
            "compression.type": "lz4"
        })

    async def produce_message(self, topic: str, encoded_payload: bytes, key: str | None = None) -> asyncio.Future:
        # Returns the delivery future; the caller awaits it later.
        encoded_key = key.encode("utf-8") if key else None
        return await self._producer.produce(
            topic=topic,
            key=encoded_key,
            value=encoded_payload
        )

    async def flush(self):
        await self._producer.flush()

    async def close(self):
        # Flush outstanding messages before closing the producer.
        await self._producer.flush()
        await self._producer.close()
        logger.info("[Kafka] AIOProducer closed.")
class ContractCollector:
    def __init__(self, name: str, contract_provider: ContractProvider, kafka_config: KafkaConfig, kafka_producer: KafkaProducer):
        self._name = name
        self._contract_provider = contract_provider
        self._kafka_topic = kafka_config.kafka_contracts_topic
        self._kafka_producer = kafka_producer

    async def collect(self) -> list[Contract]:
        contracts = await self._contract_provider.get_contracts()
        logger.info(f"[{self._name}] Collected {len(contracts)} contracts.")
        delivery_futures = []
        for contract in contracts:
            key = f"{contract.cex_id.id}_{contract.chain_id.id}_{contract.symbol}"
            encoded_payload = contract.model_dump_json().encode('utf-8')
            future = await self._kafka_producer.produce_message(
                topic=self._kafka_topic,
                key=key,
                encoded_payload=encoded_payload,
            )
            delivery_futures.append(future)
        # await self._kafka_producer.flush()
        messages = await asyncio.gather(*delivery_futures)
        logger.info(f'Delivered {len(messages)} messages')
        return contracts
I have two questions regarding performance and best practices here:
1. Is await producer.produce() inside a loop blocking concurrent execution?
Since produce is an async def method, using await inside the list comprehension (or a for loop) sequentially waits for each message registration.
- If we await it inside the loop, doesn't it defeat the purpose of asyncio.gather(*futures) later?
- Should we instead use asyncio.gather(*[producer.produce(...) for ...]) to schedule all internal queue enqueueing concurrently, and then get the delivery futures also using asyncio.gather(*delivery_futures)?
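To make the two variants in question 1 concrete, here is a minimal sketch of what I mean. FakeProducer is a hypothetical stand-in I wrote so the snippet runs without a broker. It only assumes that produce() is a coroutine returning a delivery future, as the documentation example suggests; it is not how AIOProducer is implemented, and it says nothing about which variant is faster against a real broker.

```python
import asyncio


class FakeProducer:
    """Hypothetical stand-in for AIOProducer so this runs without a broker.

    Assumption: produce() is a coroutine that returns a delivery future,
    mirroring how the documentation example uses it.
    """

    def __init__(self) -> None:
        self._pending: list[tuple[asyncio.Future, str]] = []

    async def produce(self, topic: str, key: str, value: str) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._pending.append((future, value))
        return future

    async def flush(self) -> None:
        # Simulated acknowledgement: resolve every pending delivery future.
        for future, value in self._pending:
            if not future.done():
                future.set_result(value)
        self._pending.clear()


async def enqueue_sequentially(producer, topic: str, n: int) -> list[asyncio.Future]:
    # Variant A: one await per message, as in the documentation example.
    return [
        await producer.produce(topic=topic, key=f"key{i}", value=f"value{i}")
        for i in range(n)
    ]


async def enqueue_with_gather(producer, topic: str, n: int) -> list[asyncio.Future]:
    # Variant B: schedule all produce() coroutines at once. gather() returns
    # their results (the delivery futures) in the same order as the inputs.
    return await asyncio.gather(
        *(producer.produce(topic=topic, key=f"key{i}", value=f"value{i}") for i in range(n))
    )


async def run(enqueue, n: int = 100) -> list[str]:
    producer = FakeProducer()
    delivery_futures = await enqueue(producer, "contracts", n)
    await producer.flush()
    # Second gather: wait for the delivery futures themselves.
    return await asyncio.gather(*delivery_futures)


if __name__ == "__main__":
    for variant in (enqueue_sequentially, enqueue_with_gather):
        delivered = asyncio.run(run(variant))
        print(f"{variant.__name__}: delivered {len(delivered)} messages")
```

Both variants end up with the same list of delivery futures in the same order; what I cannot tell from the stub is whether variant B actually buys anything with the real producer.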
2. Is explicit await producer.flush() a good practice before asyncio.gather(*futures)?
I noticed a massive difference in behavior when benchmarking ~1,800 messages:
- Without flush(): Iterating and getting futures takes 15–30 ms, but awaiting asyncio.gather(*delivery_futures) blocks for 2.0–2.5 seconds before resolving.
- With flush(): The total execution time drops significantly.
Is calling flush() explicitly the recommended way to force the underlying C-library to trigger queue polling and network delivery immediately in batch scenarios, or is it considered an anti-pattern for high-throughput async pipelines?
Thank you for your help!