Skip to content

Commit 24c3c1b

Browse files
[kafka_consumer] Purge stale broker_timestamps when offsets go backwards (DataDog#23409)
* kafka_consumer: purge stale broker_timestamps when offsets go backwards When a topic's highwater offset decreases (retention wipe, topic recreation, or offset reset), the cached (offset, timestamp) pairs with offsets above the new highwater no longer correspond to real messages. _get_interpolated_timestamp then extrapolates between those stale pairs and fresh ones, pinning estimated_consumer_lag to a wall-clock value equal to how long ago the reset happened (e.g. a steady ~18h lag reading while kafka.consumer_lag shows only ~170 offsets). Fix _add_broker_timestamps to: - drop cached entries with offsets above the new highwater - evict by oldest timestamp rather than smallest offset, so fresh post-reset entries are kept over poisonous stale ones. * Add changelog fragment
1 parent c97f135 commit 24c3c1b

3 files changed

Lines changed: 43 additions & 2 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
When a topic's highwater offset decreases (retention wipe, topic recreation, or offset reset), purge cached (offset, timestamp) pairs whose offset is above the new highwater and switch eviction to oldest-timestamp instead of smallest-offset. Previously, stale pre-reset entries poisoned interpolation and pinned `kafka.estimated_consumer_lag` to a wall-clock value equal to how long ago the reset happened.

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,19 @@ def _mark_messages_retrieved(self, config_id):
259259
def _add_broker_timestamps(self, broker_timestamps, highwater_offsets):
260260
for (topic, partition), highwater_offset in highwater_offsets.items():
261261
timestamps = broker_timestamps["{}_{}".format(topic, partition)]
262+
# If the highwater offset went backwards (topic recreated,
263+
# retention wipe, or offset reset) any cached pair with a larger
264+
# offset points to a now-nonexistent message and would poison
265+
# interpolation. Drop those entries.
266+
stale = [o for o in timestamps if o > highwater_offset]
267+
for o in stale:
268+
del timestamps[o]
262269
timestamps[highwater_offset] = time()
263-
# If there's too many timestamps, we delete the oldest
270+
# If there's too many timestamps, we delete the oldest one (by
271+
# timestamp, not by offset — evicting by min offset would discard
272+
# the fresh post-reset entries and keep poisonous stale ones).
264273
if len(timestamps) > self._max_timestamps:
265-
del timestamps[min(timestamps)]
274+
del timestamps[min(timestamps, key=timestamps.get)]
266275

267276
def _save_broker_timestamps(self, broker_timestamps, persistent_cache_key):
268277
"""Saves broker timestamps to persistent cache."""

kafka_consumer/tests/test_unit.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,37 @@ def test_client_init(kafka_instance, check, dd_run_check):
512512
assert check.client.open_consumer.mock_calls == [mock.call("datadog-agent")]
513513

514514

515+
def test_add_broker_timestamps_purges_stale_offsets_on_reset(kafka_instance, check):
516+
# When the highwater offset goes backwards (topic recreated / retention
517+
# wipe / offset reset), cached (offset, timestamp) pairs with offsets
518+
# above the new highwater are stale and must be purged — otherwise they
519+
# poison interpolation and pin estimated_consumer_lag to a wall-clock
520+
# offset equal to how long ago the reset happened.
521+
check = check(kafka_instance)
522+
broker_timestamps = {"topic1_0": {1_000_000: 100.0, 999_000: 99.0}}
523+
check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 170})
524+
525+
timestamps = broker_timestamps["topic1_0"]
526+
assert 1_000_000 not in timestamps
527+
assert 999_000 not in timestamps
528+
assert 170 in timestamps
529+
530+
531+
def test_add_broker_timestamps_evicts_by_oldest_timestamp(kafka_instance, check):
532+
# Eviction must drop the entry with the oldest timestamp, not the smallest
533+
# offset. Evicting by min(offset) would discard fresh post-reset entries
534+
# and keep poisonous ones.
535+
kafka_instance['timestamp_history_size'] = 2
536+
check = check(kafka_instance)
537+
broker_timestamps = {"topic1_0": {500: 50.0, 400: 999.0}}
538+
check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 600})
539+
540+
timestamps = broker_timestamps["topic1_0"]
541+
assert 500 not in timestamps # oldest by timestamp
542+
assert 400 in timestamps
543+
assert 600 in timestamps
544+
545+
515546
def test_resolve_start_offsets():
516547
highwater_offsets = {
517548
("topic1", 0): 100,

0 commit comments

Comments
 (0)