4 changes: 2 additions & 2 deletions airflow/README.md
@@ -1,12 +1,12 @@
# Agent Check: Airflow

## Overview

<div class="alert alert-info">
<a href="https://docs.datadoghq.com/data_observability/jobs_monitoring/">Data Observability: Jobs Monitoring</a> provides out-of-the-box tracing for Airflow DAG runs, helping you quickly troubleshoot problematic tasks, correlate DAG runs to logs, and understand complex pipelines with data lineage across DAGs.<br/><br/>
<strong>Note</strong>: This page covers only the documentation for collecting Airflow integration metrics and logs using the Datadog Agent.
</div>

## Overview

The Datadog Agent collects many metrics from Airflow, including those for:

- DAGs (Directed Acyclic Graphs): Number of DAG processes, DAG bag size, etc.
1 change: 1 addition & 0 deletions kafka_actions/changelog.d/23430.fixed
@@ -0,0 +1 @@
[kafka_actions] Remove unused MessageFilter class
172 changes: 0 additions & 172 deletions kafka_actions/datadog_checks/kafka_actions/message_filter.py

This file was deleted.

1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/23409.fixed
@@ -0,0 +1 @@
When a topic's highwater offset decreases (retention wipe, topic recreation, or offset reset), purge cached (offset, timestamp) pairs whose offset is above the new highwater and switch eviction to oldest-timestamp instead of smallest-offset. Previously, stale pre-reset entries poisoned interpolation and pinned `kafka.estimated_consumer_lag` to a wall-clock value equal to how long ago the reset happened.
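The two-step fix described in this changelog entry can be sketched outside the Agent check as a small standalone function. This is a minimal illustration, not the integration's code: the name `add_broker_timestamp`, the `max_timestamps` and `now` parameters, and the sample values are all assumptions made for the example.

```python
import time


def add_broker_timestamp(timestamps, highwater_offset, max_timestamps, now=None):
    """Record a (highwater offset, timestamp) pair in a per-partition cache.

    Hypothetical standalone version of the behavior described above.
    `timestamps` maps offset -> wall-clock time at which it was observed.
    """
    now = time.time() if now is None else now

    # Step 1: purge pairs whose offset is above the new highwater. They can
    # only be present if the highwater went backwards, so they are stale.
    for offset in [o for o in timestamps if o > highwater_offset]:
        del timestamps[offset]

    timestamps[highwater_offset] = now

    # Step 2: if the cache is over capacity, evict the pair with the oldest
    # timestamp (not the pair with the smallest offset).
    if len(timestamps) > max_timestamps:
        del timestamps[min(timestamps, key=timestamps.get)]
    return timestamps


# A cache built before a reset, followed by a much smaller highwater offset.
cache = {999_000: 99.0, 1_000_000: 100.0}
add_broker_timestamp(cache, 170, max_timestamps=3, now=500.0)
print(cache)  # {170: 500.0}
```

Passing `now` explicitly is only a convenience for deterministic examples; the sketch falls back to `time.time()` when it is omitted.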
13 changes: 11 additions & 2 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -259,10 +259,19 @@ def _mark_messages_retrieved(self, config_id):
     def _add_broker_timestamps(self, broker_timestamps, highwater_offsets):
         for (topic, partition), highwater_offset in highwater_offsets.items():
             timestamps = broker_timestamps["{}_{}".format(topic, partition)]
+            # If the highwater offset went backwards (topic recreated,
+            # retention wipe, or offset reset) any cached pair with a larger
+            # offset points to a now-nonexistent message and would poison
+            # interpolation. Drop those entries.
+            stale = [o for o in timestamps if o > highwater_offset]
+            for o in stale:
+                del timestamps[o]
             timestamps[highwater_offset] = time()
-            # If there's too many timestamps, we delete the oldest
+            # If there's too many timestamps, we delete the oldest one (by
+            # timestamp, not by offset; evicting by min offset would discard
+            # the fresh post-reset entries and keep poisonous stale ones).
             if len(timestamps) > self._max_timestamps:
-                del timestamps[min(timestamps)]
+                del timestamps[min(timestamps, key=timestamps.get)]

     def _save_broker_timestamps(self, broker_timestamps, persistent_cache_key):
         """Saves broker timestamps to persistent cache."""
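The one-line eviction change hinges on what `min` compares. Iterating a dict yields its keys, so `min(timestamps)` returns the smallest offset, while `min(timestamps, key=timestamps.get)` returns the offset whose stored timestamp is smallest. A short sketch with invented values, assuming a cache that still holds pre-reset pairs:

```python
# offset -> time the offset was observed; all values are made up for the example.
timestamps = {
    999_000: 99.0,     # pre-reset pair
    1_000_000: 100.0,  # pre-reset pair
    170: 5_000.0,      # fresh pair recorded after the reset
}

by_offset = min(timestamps)                         # old rule: smallest key
by_timestamp = min(timestamps, key=timestamps.get)  # new rule: oldest value

print(by_offset)     # 170
print(by_timestamp)  # 999000
```

Under the old rule the entry chosen for eviction is the fresh post-reset pair, which is the failure mode the code comment describes.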
31 changes: 31 additions & 0 deletions kafka_consumer/tests/test_unit.py
@@ -512,6 +512,37 @@ def test_client_init(kafka_instance, check, dd_run_check):
     assert check.client.open_consumer.mock_calls == [mock.call("datadog-agent")]


+def test_add_broker_timestamps_purges_stale_offsets_on_reset(kafka_instance, check):
+    # When the highwater offset goes backwards (topic recreated / retention
+    # wipe / offset reset), cached (offset, timestamp) pairs with offsets
+    # above the new highwater are stale and must be purged; otherwise they
+    # poison interpolation and pin estimated_consumer_lag to a wall-clock
+    # offset equal to how long ago the reset happened.
+    check = check(kafka_instance)
+    broker_timestamps = {"topic1_0": {1_000_000: 100.0, 999_000: 99.0}}
+    check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 170})
+
+    timestamps = broker_timestamps["topic1_0"]
+    assert 1_000_000 not in timestamps
+    assert 999_000 not in timestamps
+    assert 170 in timestamps
+
+
+def test_add_broker_timestamps_evicts_by_oldest_timestamp(kafka_instance, check):
+    # Eviction must drop the entry with the oldest timestamp, not the smallest
+    # offset. Evicting by min(offset) would discard fresh post-reset entries
+    # and keep poisonous ones.
+    kafka_instance['timestamp_history_size'] = 2
+    check = check(kafka_instance)
+    broker_timestamps = {"topic1_0": {500: 50.0, 400: 999.0}}
+    check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 600})
+
+    timestamps = broker_timestamps["topic1_0"]
+    assert 500 not in timestamps  # oldest by timestamp
+    assert 400 in timestamps
+    assert 600 in timestamps
+
+
 def test_resolve_start_offsets():
     highwater_offsets = {
         ("topic1", 0): 100,