[pull] master from DataDog:master#619
Merged
Merged
Conversation
…g is enabled (#24149) * kafka_consumer: always fetch highwater offsets when cluster monitoring is enabled When enable_cluster_monitoring is true the consumer context count can easily exceed the default max_partition_contexts (500), causing the check to skip highwater offset collection entirely. This silently zeros out kafka.topic.message_rate and stops kafka.broker_offset from being emitted, because _collect_topic_metadata receives an empty highwater_offsets dict. Bypass the context limit guard when cluster monitoring is active so that highwater offsets are always fetched; the existing per-metric context caps in report_highwater_offsets and report_consumer_offsets_and_lag still apply. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * kafka_consumer: add changelog entry for PR #24149 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * kafka_consumer: bypass context reporting limit when cluster monitoring is enabled When enable_cluster_monitoring is true, report all consumer lag and highwater offset metrics without capping at max_partition_contexts. Cluster monitoring users need full cluster visibility by design, so capping metric reporting makes no sense in that mode. Uses float('inf') as the reporting limit, which works correctly with the existing int comparisons in report_highwater_offsets and report_consumer_offsets_and_lag (int == inf is False, int >= inf is False, int < inf is True). Also suppresses the misleading "narrow your target" warning in cluster monitoring mode. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…fka_consumer (#23915) * Add consumer-group rebalance, empty-group, and metadata signals Enrich the cluster-monitoring consumer-group collection with signals that cannot be derived from existing tagged metrics: - kafka.consumer_group.rebalancing (1/0): detected via group state (PreparingRebalance/CompletingRebalance) for classic groups and via assignment != target_assignment for KIP-848 consumer-protocol groups. - kafka.consumer_group.empty (1/0): 1 when the group is in the EMPTY state (committed offsets but no active members). Dimensional metadata is added as tags on existing gauges rather than new metrics: partition_assignor, consumer_group_type, and is_simple_consumer_group on consumer_group.members, and group_instance_id (static membership) on consumer_group.member.partitions. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * Add changelog entry for PR #23915 * Address review feedback on consumer-group signals - Extract _build_group_meta_tags helper from the collection loop. - Use `is not None` guards for partition_assignor and group_instance_id so empty-string values are not silently dropped. - Emit consumer_group.rebalancing and consumer_group.empty with the same group_meta_tags as consumer_group.members so the sibling gauges share a tag set and can be correlated in dashboards. - Reduce test mock duplication: _collect_groups now reuses seed_mock_kafka_client and a shared _stub_consumer_groups helper. - Add tests for the dimensional-tag omission path and the no-target-assignment rebalance-skip branch. - Name the new tag keys in the README. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * Address round-2 review feedback on consumer-group signals - Revert partition_assignor guard to `if assignor:` so KIP-848 and EMPTY-state groups (which report an empty assignor) don't emit a blank-value partition_assignor: tag. Parametrize the absent-tags test to cover both None and "". - Type-hint state_name on _is_group_rebalancing. - Add comments documenting the member-level vs group-level tag-set choice and the EMPTY-state basis for consumer_group.empty. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * Add kafka.consumer_group.membership_changes count metric Caches a hash of sorted member IDs per consumer group after each check run. Emits consumer_group.membership_changes (+1 count) whenever the hash differs from the previous run, catching rebalances that complete between two polling intervals and would otherwise be invisible to the rebalancing gauge. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Remove kafka.consumer_group.empty — redundant with consumer_group_state tag The EMPTY state is already visible as consumer_group_state:EMPTY on the rebalancing metric, making a dedicated gauge unnecessary. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix ruff formatting in cluster_metadata.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Address review feedback on consumer-group signals - Extract _load_member_hashes_cache / _save_member_hashes_cache helpers to match the _load_*/_save_* pattern used by every other cache in the class - Use getattr for m.member_id to avoid TypeError on missing/null values - Fix README and changelog: replace "empty-group detection" with accurate description of rebalance detection and membership-change counting; note that empty groups are visible via the consumer_group_state:EMPTY tag - Add three unit tests covering membership_changes: no prior cache (no emit), unchanged members (no emit), changed members (emit once) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Remove consumer-group signals demo compose file Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Update kafka_consumer/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update kafka_consumer/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update kafka_consumer/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update kafka_consumer/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update kafka_consumer/metadata.csv Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update kafka_consumer/metadata.csv Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Address review comments on consumer-group signals PR - Fix _is_group_rebalancing to return True when assignment is None but target_assignment is present (member has target but no current assignment is unambiguous drift); add test covering this case. - Replace comma-joined member-ID hash with json.dumps to prevent delimiter collisions (e.g. ['a,b','c'] and ['a','b,c'] now produce distinct hashes); update expected hash in existing test. - Validate cache decoded from JSON is a dict; discard and continue if not to prevent AttributeError on .get(); add test seeding a list-shaped cache and asserting group gauges still emit. - Update changelog entry to name both new DSM-only metrics explicitly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix ruff formatting in cluster_metadata.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Update changelog entry wording Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com>
) * [kafka_actions] Bound message reads to a start-of-check snapshot read_messages could hang until its global timeout whenever a selective filter matched fewer messages than n_messages_retrieved: once the consumer drained the existing backlog it kept polling the live head, and because a continuously-produced topic almost always delivers a message within the poll window, the "no more messages" (poll == None) exit never fired. Fix consumption to a snapshot of the log taken when the check starts: - Capture each partition's high watermark up front and never yield a message at or beyond it, so messages produced after the check began are excluded and live-tailing is impossible. - Enable enable.partition.eof and stop a partition on its EOF event or when its captured watermark is reached; return as soon as all are drained. - Reduce the default timeout from 20s to 5s (now only a safety net) and surface a hit_timeout stat so a truncated read is distinguishable from a complete one. Verified against a live 10-partition topic: the previously-hanging filtered read now returns in ~0.3s instead of 20s. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com> * [kafka_actions] Add changelog entry Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com> * [kafka_actions] Fix import grouping for ruff isort Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com> --------- Signed-off-by: Piotr Wolski <piotr.wolski@datadoghq.com> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
See Commits and Changes for more details.
Created by
pull[bot] (v2.0.0-alpha.4)
Can you help keep this open source service alive? 💖 Please sponsor : )