Skip to content

Commit bd332d9

Browse files
[kafka_actions] Bound message reads to a start-of-check snapshot (#24162)
* [kafka_actions] Bound message reads to a start-of-check snapshot read_messages could hang until its global timeout whenever a selective filter matched fewer messages than n_messages_retrieved: once the consumer drained the existing backlog it kept polling the live head, and because a continuously-produced topic almost always delivers a message within the poll window, the "no more messages" (poll == None) exit never fired. Fix consumption to a snapshot of the log taken when the check starts: - Capture each partition's high watermark up front and never yield a message at or beyond it, so messages produced after the check began are excluded and live-tailing is impossible. - Enable enable.partition.eof and stop a partition on its EOF event or when its captured watermark is reached; return as soon as all are drained. - Reduce the default timeout from 20s to 5s (now only a safety net) and surface a hit_timeout stat so a truncated read is distinguishable from a complete one. Verified against a live 10-partition topic: the previously-hanging filtered read now returns in ~0.3s instead of 20s. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com> * [kafka_actions] Add changelog entry Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com> * [kafka_actions] Fix import grouping for ruff isort Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com> --------- Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 534afff commit bd332d9

6 files changed

Lines changed: 237 additions & 113 deletions

File tree

kafka_actions/assets/configuration/spec.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,10 @@ files:
205205
- name: read_messages
206206
description: |
207207
Configuration for reading messages from Kafka topics.
208-
Messages are streamed in real-time and sent to Datadog as they arrive.
209-
The check has a 20-second timeout for the entire operation.
208+
Only messages already present in the log when the check starts are read; the
209+
per-partition high watermark is captured up front and consumption stops once every
210+
partition is drained, so messages produced after the check began are never returned.
211+
A 5-second timeout bounds the entire operation as a safety net.
210212
Supports JSON, BSON, Protobuf, and Avro with optional Schema Registry integration.
211213
Filtering is applied after deserialization using jq-style expressions.
212214
fleet_configurable: true
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix `read_messages` hanging until the global timeout when a filter matched fewer messages than `n_messages_retrieved`. Consumption is now bounded to a snapshot of the log taken when the check starts (per-partition high watermark + `enable.partition.eof`), the default timeout is reduced from 20s to 5s, and a `hit_timeout` stat distinguishes a truncated read from a complete one.

kafka_actions/datadog_checks/kafka_actions/check.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ def _action_read_messages(self):
267267
start_timestamp = config.get('start_timestamp')
268268
n_messages_retrieved = config.get('n_messages_retrieved', 10)
269269
max_scanned_messages = config.get('max_scanned_messages', 1000)
270-
timeout_ms = config.get('timeout_ms', 20000)
270+
timeout_ms = config.get('timeout_ms', 5000)
271271
filter_expression = config.get('filter', '')
272272
consumer_group_id = config.get('consumer_group_id') or f"datadog-agent-{self.remote_config_id}"
273273

@@ -326,6 +326,8 @@ def _action_read_messages(self):
326326
if scanned_count >= max_scanned_messages and sent_count < n_messages_retrieved:
327327
hit_scan_limit = True
328328

329+
hit_timeout = self.kafka_client.hit_timeout and not hit_retrieved_limit and not hit_scan_limit
330+
329331
elapsed_time = time.time() - start_time
330332

331333
stats = {
@@ -337,6 +339,7 @@ def _action_read_messages(self):
337339
'messages_filtered_out': filtered_out_count,
338340
'hit_scan_limit': hit_scan_limit,
339341
'hit_retrieved_limit': hit_retrieved_limit,
342+
'hit_timeout': hit_timeout,
340343
'elapsed_time_seconds': round(elapsed_time, 3),
341344
'n_messages_retrieved': n_messages_retrieved,
342345
'max_scanned_messages': max_scanned_messages,
@@ -358,6 +361,14 @@ def _action_read_messages(self):
358361
sent_count,
359362
)
360363

364+
if hit_timeout:
365+
self.log.warning(
366+
"Hit the %dms timeout after scanning %d messages and retrieving %d. Result may be incomplete.",
367+
timeout_ms,
368+
scanned_count,
369+
sent_count,
370+
)
371+
361372
return stats
362373

363374
def _evaluate_filter(self, filter_expression: str, deserialized_msg: DeserializedMessage) -> bool:

kafka_actions/datadog_checks/kafka_actions/data/conf.yaml.example

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,10 @@ instances:
197197

198198
## @param read_messages - mapping - optional
199199
## Configuration for reading messages from Kafka topics.
200-
## Messages are streamed in real-time and sent to Datadog as they arrive.
201-
## The check has a 20-second timeout for the entire operation.
200+
## Only messages already present in the log when the check starts are read; the
201+
## per-partition high watermark is captured up front and consumption stops once every
202+
## partition is drained, so messages produced after the check began are never returned.
203+
## A 5-second timeout bounds the entire operation as a safety net.
202204
## Supports JSON, BSON, Protobuf, and Avro with optional Schema Registry integration.
203205
## Filtering is applied after deserialization using jq-style expressions.
204206
#

kafka_actions/datadog_checks/kafka_actions/kafka_client.py

Lines changed: 89 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ def __init__(self, config: KafkaActionsConfig, log):
3232
self.consumer = None
3333
self.producer = None
3434
self.admin_client = None
35+
# True when consume_messages stopped on the timeout rather than draining all partitions.
36+
self.hit_timeout = False
3537

3638
def _get_authentication_config(self) -> dict[str, Any]:
3739
"""Build authentication configuration for librdkafka."""
@@ -134,6 +136,8 @@ def get_consumer(self, group_id: str = 'kafka_actions') -> Consumer:
134136
'group.id': group_id,
135137
'auto.offset.reset': 'earliest',
136138
'enable.auto.commit': False,
139+
# Signal end-of-partition via a _PARTITION_EOF event so we stop once drained.
140+
'enable.partition.eof': True,
137141
}
138142
)
139143
self.consumer = Consumer(config)
@@ -193,29 +197,34 @@ def consume_messages(
193197
start_offset: int = -2,
194198
start_timestamp: int | None = None,
195199
max_messages: int = 1000,
196-
timeout_ms: int = 30000,
200+
timeout_ms: int = 5000,
197201
group_id: str = 'kafka_actions',
198202
):
199-
"""Consume messages from a Kafka topic, yielding them as they arrive.
203+
"""Consume the messages already present in a topic, yielding them as they are read.
200204
201-
This is a generator that yields messages in real-time as they're consumed,
202-
allowing for immediate processing and sending to Datadog.
205+
The per-partition high watermark is captured before consumption begins and no message
206+
at or beyond it is yielded, so messages produced after the check starts are never
207+
returned and the generator can't tail a live topic. A partition stops on EOF or when its
208+
captured watermark is reached; the generator returns once all are drained. ``timeout_ms``
209+
is only a safety net.
203210
204211
Args:
205212
topic: Topic name
206213
partition: Partition number (-1 for all partitions)
207214
start_offset: Starting offset (-1 for latest, -2 for earliest)
208215
start_timestamp: Starting timestamp in milliseconds since epoch. When set, start_offset is ignored.
209216
max_messages: Maximum messages to consume
210-
timeout_ms: Global timeout in milliseconds for the entire consumption
217+
timeout_ms: Safety-net timeout in milliseconds for the entire consumption
211218
group_id: Consumer group ID
212219
213220
Yields:
214-
Kafka messages as they arrive
221+
Kafka messages that existed in the log when consumption began
215222
"""
216223
consumer = self.get_consumer(group_id)
224+
admin = self.get_admin_client()
217225
start_time = time.time()
218226
global_timeout_s = timeout_ms / 1000.0
227+
self.hit_timeout = False
219228

220229
try:
221230
if partition == -1:
@@ -227,71 +236,113 @@ def consume_messages(
227236
else:
228237
partition_ids = [partition]
229238

230-
if start_timestamp is not None:
231-
# Resolve timestamp to per-partition offsets using offsets_for_times.
232-
timestamp_partitions = [TopicPartition(topic, p, start_timestamp) for p in partition_ids]
233-
partitions = consumer.offsets_for_times(timestamp_partitions, timeout=10)
234-
for tp in partitions:
235-
if tp.offset != -1:
236-
self.log.debug(
237-
"Partition %d: timestamp %d resolved to offset %d",
238-
tp.partition,
239-
start_timestamp,
240-
tp.offset,
241-
)
242-
elif start_offset == -1:
243-
# For "latest" offset, seek back from the high watermark to read the last N existing messages.
244-
# Use AdminClient.list_offsets to fetch all high watermarks in a single batched call.
245-
admin = self.get_admin_client()
246-
offset_request = {TopicPartition(topic, p): OffsetSpec.latest() for p in partition_ids}
247-
futures = admin.list_offsets(offset_request, request_timeout=10)
248-
249-
partitions = []
250-
for tp, future in futures.items():
251-
result = future.result()
252-
seek_offset = max(0, result.offset - max_messages)
253-
partitions.append(TopicPartition(topic, tp.partition, seek_offset))
254-
self.log.debug("Partition %d: high=%d, seeking to %d", tp.partition, result.offset, seek_offset)
255-
else:
256-
partitions = [TopicPartition(topic, p, start_offset) for p in partition_ids]
239+
# Snapshot each partition's high watermark; we never read at or beyond it.
240+
end_request = {TopicPartition(topic, p): OffsetSpec.latest() for p in partition_ids}
241+
end_futures = admin.list_offsets(end_request, request_timeout=10)
242+
end_offsets = {tp.partition: future.result().offset for tp, future in end_futures.items()}
257243

258-
self.log.debug("Assigning partitions: %s", partitions)
244+
start_offsets = self._resolve_start_offsets(
245+
consumer, admin, topic, partition_ids, start_offset, start_timestamp, max_messages, end_offsets
246+
)
247+
248+
# Assign only partitions that have messages in [start, high_watermark).
249+
partitions = []
250+
active = set()
251+
for p in partition_ids:
252+
start = start_offsets.get(p, 0)
253+
end = end_offsets.get(p, 0)
254+
if start < end:
255+
partitions.append(TopicPartition(topic, p, start))
256+
active.add(p)
257+
else:
258+
self.log.debug("Partition %d: nothing to read (start=%d, high=%d)", p, start, end)
259+
260+
if not partitions:
261+
self.log.debug("No messages to read for topic %s in [start, high-watermark)", topic)
262+
return
263+
264+
self.log.debug("Assigning partitions: %s (high watermarks: %s)", partitions, end_offsets)
259265
consumer.assign(partitions)
260266

261267
consumed = 0
262268

263-
while consumed < max_messages:
269+
while consumed < max_messages and active:
264270
elapsed = time.time() - start_time
265271
remaining_timeout = global_timeout_s - elapsed
266272

267273
if remaining_timeout <= 0:
274+
self.hit_timeout = True
268275
self.log.debug("Global timeout reached after %d messages", consumed)
269276
break
270277

271278
poll_timeout = min(1.0, remaining_timeout)
272279
msg = consumer.poll(timeout=poll_timeout)
273280

274281
if msg is None:
275-
self.log.debug("Poll returned None (no more messages available), stopping consumption")
276-
break
282+
# End-of-data arrives as an EOF event, not None; keep polling until drained.
283+
continue
277284

278285
if msg.error():
279286
if msg.error().code() == KafkaError._PARTITION_EOF:
280-
self.log.debug("Reached end of partition")
287+
active.discard(msg.partition())
281288
continue
282289
else:
283290
raise KafkaException(msg.error())
284291

292+
p = msg.partition()
293+
# Never surface a message at or beyond the captured high watermark.
294+
if p not in active or msg.offset() >= end_offsets.get(p, 0):
295+
active.discard(p)
296+
continue
297+
285298
yield msg
286299
consumed += 1
287300

301+
if msg.offset() >= end_offsets[p] - 1:
302+
active.discard(p)
303+
288304
self.log.debug("Consumed %d messages from topic %s in %.2fs", consumed, topic, time.time() - start_time)
289305

290306
finally:
291307
if consumer:
292308
consumer.close()
293309
self.consumer = None
294310

311+
def _resolve_start_offsets(
312+
self,
313+
consumer,
314+
admin,
315+
topic: str,
316+
partition_ids: list[int],
317+
start_offset: int,
318+
start_timestamp: int | None,
319+
max_messages: int,
320+
end_offsets: dict[int, int],
321+
) -> dict[int, int]:
322+
"""Return a {partition: start_offset} map. A start at or beyond the high watermark
323+
means there is nothing to read for that partition."""
324+
if start_timestamp is not None:
325+
# An offset < 0 means the timestamp is past the end of the log: nothing to read.
326+
timestamp_partitions = [TopicPartition(topic, p, start_timestamp) for p in partition_ids]
327+
resolved = consumer.offsets_for_times(timestamp_partitions, timeout=10)
328+
start_offsets = {}
329+
for tp in resolved:
330+
end = end_offsets.get(tp.partition, 0)
331+
start_offsets[tp.partition] = tp.offset if tp.offset is not None and tp.offset >= 0 else end
332+
return start_offsets
333+
334+
if start_offset == -1:
335+
# "latest": seek back from the high watermark to read the last N existing messages.
336+
return {p: max(0, end_offsets.get(p, 0) - max_messages) for p in partition_ids}
337+
338+
if start_offset == -2:
339+
# "earliest": use the low watermark as the numeric start.
340+
low_request = {TopicPartition(topic, p): OffsetSpec.earliest() for p in partition_ids}
341+
low_futures = admin.list_offsets(low_request, request_timeout=10)
342+
return {tp.partition: future.result().offset for tp, future in low_futures.items()}
343+
344+
return dict.fromkeys(partition_ids, start_offset)
345+
295346
def produce_message(
296347
self,
297348
topic: str,

0 commit comments

Comments
 (0)