Skip to content

[pull] master from DataDog:master#619

Merged
pull[bot] merged 3 commits into
ConnectionMaster:masterfrom
DataDog:master
Jun 24, 2026
Merged

[pull] master from DataDog:master#619
pull[bot] merged 3 commits into
ConnectionMaster:masterfrom
DataDog:master

Conversation

@pull

@pull pull Bot commented Jun 24, 2026

Copy link
Copy Markdown

See Commits and Changes for more details.


Created by pull[bot] (v2.0.0-alpha.4)

Can you help keep this open source service alive? 💖 Please sponsor : )

piochelepiotr and others added 3 commits June 24, 2026 09:03
…g is enabled (#24149)

* kafka_consumer: always fetch highwater offsets when cluster monitoring is enabled

When enable_cluster_monitoring is true the consumer context count can easily
exceed the default max_partition_contexts (500), causing the check to skip
highwater offset collection entirely. This silently zeros out
kafka.topic.message_rate and stops kafka.broker_offset from being emitted,
because _collect_topic_metadata receives an empty highwater_offsets dict.

Bypass the context limit guard when cluster monitoring is active so that
highwater offsets are always fetched; the existing per-metric context caps in
report_highwater_offsets and report_consumer_offsets_and_lag still apply.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: add changelog entry for PR #24149

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: bypass context reporting limit when cluster monitoring is enabled

When enable_cluster_monitoring is true, report all consumer lag and highwater
offset metrics without capping at max_partition_contexts. Cluster monitoring
users need full cluster visibility by design, so capping metric reporting makes
no sense in that mode.

Uses float('inf') as the reporting limit, which works correctly with the
existing int comparisons in report_highwater_offsets and
report_consumer_offsets_and_lag (int == inf is False, int >= inf is False,
int < inf is True). Also suppresses the misleading "narrow your target" warning
in cluster monitoring mode.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…fka_consumer (#23915)

* Add consumer-group rebalance, empty-group, and metadata signals

Enrich the cluster-monitoring consumer-group collection with signals that
cannot be derived from existing tagged metrics:

- kafka.consumer_group.rebalancing (1/0): detected via group state
  (PreparingRebalance/CompletingRebalance) for classic groups and via
  assignment != target_assignment for KIP-848 consumer-protocol groups.
- kafka.consumer_group.empty (1/0): 1 when the group is in the EMPTY state
  (committed offsets but no active members).

Dimensional metadata is added as tags on existing gauges rather than new
metrics: partition_assignor, consumer_group_type, and is_simple_consumer_group
on consumer_group.members, and group_instance_id (static membership) on
consumer_group.member.partitions.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* Add changelog entry for PR #23915

* Address review feedback on consumer-group signals

- Extract _build_group_meta_tags helper from the collection loop.
- Use `is not None` guards for partition_assignor and group_instance_id so
  empty-string values are not silently dropped.
- Emit consumer_group.rebalancing and consumer_group.empty with the same
  group_meta_tags as consumer_group.members so the sibling gauges share a
  tag set and can be correlated in dashboards.
- Reduce test mock duplication: _collect_groups now reuses
  seed_mock_kafka_client and a shared _stub_consumer_groups helper.
- Add tests for the dimensional-tag omission path and the
  no-target-assignment rebalance-skip branch.
- Name the new tag keys in the README.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* Address round-2 review feedback on consumer-group signals

- Revert partition_assignor guard to `if assignor:` so KIP-848 and
  EMPTY-state groups (which report an empty assignor) don't emit a
  blank-value partition_assignor: tag. Parametrize the absent-tags test
  to cover both None and "".
- Type-hint state_name on _is_group_rebalancing.
- Add comments documenting the member-level vs group-level tag-set choice
  and the EMPTY-state basis for consumer_group.empty.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* Add kafka.consumer_group.membership_changes count metric

Caches a hash of sorted member IDs per consumer group after each check
run. Emits consumer_group.membership_changes (+1 count) whenever the
hash differs from the previous run, catching rebalances that complete
between two polling intervals and would otherwise be invisible to the
rebalancing gauge.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Remove kafka.consumer_group.empty — redundant with consumer_group_state tag

The EMPTY state is already visible as consumer_group_state:EMPTY on the
rebalancing metric, making a dedicated gauge unnecessary.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Fix ruff formatting in cluster_metadata.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Address review feedback on consumer-group signals

- Extract _load_member_hashes_cache / _save_member_hashes_cache helpers to
  match the _load_*/_save_* pattern used by every other cache in the class
- Use getattr for m.member_id to avoid TypeError on missing/null values
- Fix README and changelog: replace "empty-group detection" with accurate
  description of rebalance detection and membership-change counting; note
  that empty groups are visible via the consumer_group_state:EMPTY tag
- Add three unit tests covering membership_changes: no prior cache (no
  emit), unchanged members (no emit), changed members (emit once)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Remove consumer-group signals demo compose file

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Update kafka_consumer/README.md

Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com>

* Update kafka_consumer/README.md

Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com>

* Update kafka_consumer/README.md

Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com>

* Update kafka_consumer/README.md

Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com>

* Update kafka_consumer/metadata.csv

Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com>

* Update kafka_consumer/metadata.csv

Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com>

* Address review comments on consumer-group signals PR

- Fix _is_group_rebalancing to return True when assignment is None but
  target_assignment is present (member has target but no current
  assignment is unambiguous drift); add test covering this case.
- Replace comma-joined member-ID hash with json.dumps to prevent
  delimiter collisions (e.g. ['a,b','c'] and ['a','b,c'] now produce
  distinct hashes); update expected hash in existing test.
- Validate cache decoded from JSON is a dict; discard and continue if
  not to prevent AttributeError on .get(); add test seeding a list-shaped
  cache and asserting group gauges still emit.
- Update changelog entry to name both new DSM-only metrics explicitly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Fix ruff formatting in cluster_metadata.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Update changelog entry wording

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com>
)

* [kafka_actions] Bound message reads to a start-of-check snapshot

read_messages could hang until its global timeout whenever a selective
filter matched fewer messages than n_messages_retrieved: once the consumer
drained the existing backlog it kept polling the live head, and because a
continuously-produced topic almost always delivers a message within the
poll window, the "no more messages" (poll == None) exit never fired.

Fix consumption to a snapshot of the log taken when the check starts:

- Capture each partition's high watermark up front and never yield a
  message at or beyond it, so messages produced after the check began are
  excluded and live-tailing is impossible.
- Enable enable.partition.eof and stop a partition on its EOF event or when
  its captured watermark is reached; return as soon as all are drained.
- Reduce the default timeout from 20s to 5s (now only a safety net) and
  surface a hit_timeout stat so a truncated read is distinguishable from a
  complete one.

Verified against a live 10-partition topic: the previously-hanging filtered
read now returns in ~0.3s instead of 20s.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com>

* [kafka_actions] Add changelog entry

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com>

* [kafka_actions] Fix import grouping for ruff isort

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com>

---------

Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@pull pull Bot locked and limited conversation to collaborators Jun 24, 2026
@pull pull Bot added the ⤵️ pull label Jun 24, 2026
@pull pull Bot merged commit bd332d9 into ConnectionMaster:master Jun 24, 2026
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant