Capture kafka producer and consumer configs#10697
Capture kafka producer and consumer configs#10697gh-worker-dd-mergequeue-cf854d[bot] merged 12 commits intomasterfrom
Conversation
Kafka / producer-benchmarkParameters
See matching parameters
SummaryFound 0 performance improvements and 0 performance regressions! Performance is the same for 3 metrics, 0 unstable metrics. See unchanged results
|
Kafka / consumer-benchmarkParameters
See matching parameters
SummaryFound 0 performance improvements and 0 performance regressions! Performance is the same for 3 metrics, 0 unstable metrics. See unchanged results
|
BenchmarksStartupParameters
See matching parameters
SummaryFound 0 performance improvements and 0 performance regressions! Performance is the same for 64 metrics, 7 unstable metrics. Startup time reports for insecure-bankgantt
title insecure-bank - global startup overhead: candidate=1.61.0-SNAPSHOT~7e9aff8638, baseline=1.61.0-SNAPSHOT~2604968b95
dateFormat X
axisFormat %s
section tracing
Agent [baseline] (1.06 s) : 0, 1059735
Total [baseline] (8.893 s) : 0, 8892642
Agent [candidate] (1.06 s) : 0, 1059881
Total [candidate] (8.845 s) : 0, 8844844
section iast
Agent [baseline] (1.234 s) : 0, 1234136
Total [baseline] (9.61 s) : 0, 9609739
Agent [candidate] (1.229 s) : 0, 1228888
Total [candidate] (9.551 s) : 0, 9550815
gantt
title insecure-bank - break down per module: candidate=1.61.0-SNAPSHOT~7e9aff8638, baseline=1.61.0-SNAPSHOT~2604968b95
dateFormat X
axisFormat %s
section tracing
crashtracking [baseline] (1.193 ms) : 0, 1193
crashtracking [candidate] (1.188 ms) : 0, 1188
BytebuddyAgent [baseline] (628.854 ms) : 0, 628854
BytebuddyAgent [candidate] (629.244 ms) : 0, 629244
AgentMeter [baseline] (29.449 ms) : 0, 29449
AgentMeter [candidate] (29.268 ms) : 0, 29268
GlobalTracer [baseline] (258.531 ms) : 0, 258531
GlobalTracer [candidate] (258.25 ms) : 0, 258250
AppSec [baseline] (31.963 ms) : 0, 31963
AppSec [candidate] (31.981 ms) : 0, 31981
Debugger [baseline] (59.887 ms) : 0, 59887
Debugger [candidate] (60.12 ms) : 0, 60120
Remote Config [baseline] (587.284 µs) : 0, 587
Remote Config [candidate] (592.561 µs) : 0, 593
Telemetry [baseline] (8.036 ms) : 0, 8036
Telemetry [candidate] (8.925 ms) : 0, 8925
Flare Poller [baseline] (5.062 ms) : 0, 5062
Flare Poller [candidate] (4.294 ms) : 0, 4294
section iast
crashtracking [baseline] (1.2 ms) : 0, 1200
crashtracking [candidate] (1.192 ms) : 0, 1192
BytebuddyAgent [baseline] (801.852 ms) : 0, 801852
BytebuddyAgent [candidate] (798.262 ms) : 0, 798262
AgentMeter [baseline] (11.499 ms) : 0, 11499
AgentMeter [candidate] (11.283 ms) : 0, 11283
GlobalTracer [baseline] (247.829 ms) : 0, 247829
GlobalTracer [candidate] (247.972 ms) : 0, 247972
IAST [baseline] (25.325 ms) : 0, 25325
IAST [candidate] (25.259 ms) : 0, 25259
AppSec [baseline] (26.551 ms) : 0, 26551
AppSec [candidate] (26.457 ms) : 0, 26457
Debugger [baseline] (69.598 ms) : 0, 69598
Debugger [candidate] (67.352 ms) : 0, 67352
Remote Config [baseline] (535.609 µs) : 0, 536
Remote Config [candidate] (520.884 µs) : 0, 521
Telemetry [baseline] (9.833 ms) : 0, 9833
Telemetry [candidate] (10.69 ms) : 0, 10690
Flare Poller [baseline] (3.572 ms) : 0, 3572
Flare Poller [candidate] (3.742 ms) : 0, 3742
Startup time reports for petclinicgantt
title petclinic - global startup overhead: candidate=1.61.0-SNAPSHOT~7e9aff8638, baseline=1.61.0-SNAPSHOT~2604968b95
dateFormat X
axisFormat %s
section tracing
Agent [baseline] (1.058 s) : 0, 1058097
Total [baseline] (11.106 s) : 0, 11105534
Agent [candidate] (1.057 s) : 0, 1057043
Total [candidate] (11.041 s) : 0, 11040947
section appsec
Agent [baseline] (1.246 s) : 0, 1246192
Total [baseline] (11.175 s) : 0, 11174925
Agent [candidate] (1.251 s) : 0, 1251276
Total [candidate] (11.216 s) : 0, 11215809
section iast
Agent [baseline] (1.228 s) : 0, 1228141
Total [baseline] (11.353 s) : 0, 11353174
Agent [candidate] (1.237 s) : 0, 1237439
Total [candidate] (11.3 s) : 0, 11299851
section profiling
Agent [baseline] (1.185 s) : 0, 1185284
Total [baseline] (11.032 s) : 0, 11032358
Agent [candidate] (1.191 s) : 0, 1190642
Total [candidate] (11.103 s) : 0, 11103165
gantt
title petclinic - break down per module: candidate=1.61.0-SNAPSHOT~7e9aff8638, baseline=1.61.0-SNAPSHOT~2604968b95
dateFormat X
axisFormat %s
section tracing
crashtracking [baseline] (1.198 ms) : 0, 1198
crashtracking [candidate] (1.201 ms) : 0, 1201
BytebuddyAgent [baseline] (628.739 ms) : 0, 628739
BytebuddyAgent [candidate] (628.373 ms) : 0, 628373
AgentMeter [baseline] (29.206 ms) : 0, 29206
AgentMeter [candidate] (29.279 ms) : 0, 29279
GlobalTracer [baseline] (257.295 ms) : 0, 257295
GlobalTracer [candidate] (257.506 ms) : 0, 257506
AppSec [baseline] (31.746 ms) : 0, 31746
AppSec [candidate] (31.801 ms) : 0, 31801
Debugger [baseline] (60.267 ms) : 0, 60267
Debugger [candidate] (60.507 ms) : 0, 60507
Remote Config [baseline] (591.735 µs) : 0, 592
Remote Config [candidate] (588.266 µs) : 0, 588
Telemetry [baseline] (7.945 ms) : 0, 7945
Telemetry [candidate] (8.082 ms) : 0, 8082
Flare Poller [baseline] (5.081 ms) : 0, 5081
Flare Poller [candidate] (3.539 ms) : 0, 3539
section appsec
crashtracking [baseline] (1.197 ms) : 0, 1197
crashtracking [candidate] (1.204 ms) : 0, 1204
BytebuddyAgent [baseline] (657.509 ms) : 0, 657509
BytebuddyAgent [candidate] (659.222 ms) : 0, 659222
AgentMeter [baseline] (11.953 ms) : 0, 11953
AgentMeter [candidate] (12.088 ms) : 0, 12088
GlobalTracer [baseline] (258.034 ms) : 0, 258034
GlobalTracer [candidate] (259.811 ms) : 0, 259811
IAST [baseline] (24.154 ms) : 0, 24154
IAST [candidate] (24.392 ms) : 0, 24392
AppSec [baseline] (178.269 ms) : 0, 178269
AppSec [candidate] (178.853 ms) : 0, 178853
Debugger [baseline] (66.212 ms) : 0, 66212
Debugger [candidate] (66.61 ms) : 0, 66610
Remote Config [baseline] (630.692 µs) : 0, 631
Remote Config [candidate] (636.983 µs) : 0, 637
Telemetry [baseline] (8.385 ms) : 0, 8385
Telemetry [candidate] (8.405 ms) : 0, 8405
Flare Poller [baseline] (3.617 ms) : 0, 3617
Flare Poller [candidate] (3.637 ms) : 0, 3637
section iast
crashtracking [baseline] (1.183 ms) : 0, 1183
crashtracking [candidate] (1.193 ms) : 0, 1193
BytebuddyAgent [baseline] (796.604 ms) : 0, 796604
BytebuddyAgent [candidate] (802.544 ms) : 0, 802544
AgentMeter [baseline] (11.302 ms) : 0, 11302
AgentMeter [candidate] (11.448 ms) : 0, 11448
GlobalTracer [baseline] (247.436 ms) : 0, 247436
GlobalTracer [candidate] (249.368 ms) : 0, 249368
IAST [baseline] (25.288 ms) : 0, 25288
IAST [candidate] (25.547 ms) : 0, 25547
AppSec [baseline] (26.547 ms) : 0, 26547
AppSec [candidate] (26.77 ms) : 0, 26770
Debugger [baseline] (70.66 ms) : 0, 70660
Debugger [candidate] (71.297 ms) : 0, 71297
Remote Config [baseline] (530.777 µs) : 0, 531
Remote Config [candidate] (533.245 µs) : 0, 533
Telemetry [baseline] (9.189 ms) : 0, 9189
Telemetry [candidate] (9.205 ms) : 0, 9205
Flare Poller [baseline] (3.333 ms) : 0, 3333
Flare Poller [candidate] (3.391 ms) : 0, 3391
section profiling
crashtracking [baseline] (1.169 ms) : 0, 1169
crashtracking [candidate] (1.165 ms) : 0, 1165
BytebuddyAgent [baseline] (684.394 ms) : 0, 684394
BytebuddyAgent [candidate] (687.634 ms) : 0, 687634
AgentMeter [baseline] (8.649 ms) : 0, 8649
AgentMeter [candidate] (8.636 ms) : 0, 8636
GlobalTracer [baseline] (215.806 ms) : 0, 215806
GlobalTracer [candidate] (216.967 ms) : 0, 216967
AppSec [baseline] (32.312 ms) : 0, 32312
AppSec [candidate] (32.495 ms) : 0, 32495
Debugger [baseline] (65.6 ms) : 0, 65600
Debugger [candidate] (65.521 ms) : 0, 65521
Remote Config [baseline] (563.624 µs) : 0, 564
Remote Config [candidate] (563.523 µs) : 0, 564
Telemetry [baseline] (8.513 ms) : 0, 8513
Telemetry [candidate] (7.759 ms) : 0, 7759
Flare Poller [baseline] (3.432 ms) : 0, 3432
Flare Poller [candidate] (4.302 ms) : 0, 4302
ProfilingAgent [baseline] (93.826 ms) : 0, 93826
ProfilingAgent [candidate] (94.619 ms) : 0, 94619
Profiling [baseline] (94.393 ms) : 0, 94393
Profiling [candidate] (95.186 ms) : 0, 95186
LoadParameters
See matching parameters
SummaryFound 2 performance improvements and 4 performance regressions! Performance is the same for 14 metrics, 16 unstable metrics.
Request duration reports for insecure-bankgantt
title insecure-bank - request duration [CI 0.99] : candidate=1.61.0-SNAPSHOT~7e9aff8638, baseline=1.61.0-SNAPSHOT~2604968b95
dateFormat X
axisFormat %s
section baseline
no_agent (1.184 ms) : 1173, 1196
. : milestone, 1184,
iast (3.219 ms) : 3176, 3262
. : milestone, 3219,
iast_FULL (5.873 ms) : 5814, 5932
. : milestone, 5873,
iast_GLOBAL (3.58 ms) : 3524, 3635
. : milestone, 3580,
profiling (2.4 ms) : 2369, 2430
. : milestone, 2400,
tracing (1.769 ms) : 1754, 1783
. : milestone, 1769,
section candidate
no_agent (1.204 ms) : 1192, 1216
. : milestone, 1204,
iast (3.069 ms) : 3028, 3111
. : milestone, 3069,
iast_FULL (5.859 ms) : 5801, 5917
. : milestone, 5859,
iast_GLOBAL (3.659 ms) : 3598, 3720
. : milestone, 3659,
profiling (2.083 ms) : 2063, 2104
. : milestone, 2083,
tracing (1.786 ms) : 1771, 1800
. : milestone, 1786,
Request duration reports for petclinicgantt
title petclinic - request duration [CI 0.99] : candidate=1.61.0-SNAPSHOT~7e9aff8638, baseline=1.61.0-SNAPSHOT~2604968b95
dateFormat X
axisFormat %s
section baseline
no_agent (17.168 ms) : 16995, 17341
. : milestone, 17168,
appsec (18.38 ms) : 18191, 18570
. : milestone, 18380,
code_origins (17.745 ms) : 17566, 17924
. : milestone, 17745,
iast (17.891 ms) : 17709, 18072
. : milestone, 17891,
profiling (18.513 ms) : 18329, 18697
. : milestone, 18513,
tracing (17.647 ms) : 17471, 17824
. : milestone, 17647,
section candidate
no_agent (19.175 ms) : 18976, 19373
. : milestone, 19175,
appsec (18.386 ms) : 18201, 18572
. : milestone, 18386,
code_origins (18.845 ms) : 18653, 19037
. : milestone, 18845,
iast (17.539 ms) : 17365, 17712
. : milestone, 17539,
profiling (19.437 ms) : 19241, 19633
. : milestone, 19437,
tracing (17.836 ms) : 17657, 18014
. : milestone, 17836,
DacapoParameters
See matching parameters
SummaryFound 0 performance improvements and 0 performance regressions! Performance is the same for 11 metrics, 1 unstable metrics. Execution time for biojavagantt
title biojava - execution time [CI 0.99] : candidate=1.61.0-SNAPSHOT~7e9aff8638, baseline=1.61.0-SNAPSHOT~2604968b95
dateFormat X
axisFormat %s
section baseline
no_agent (15.348 s) : 15348000, 15348000
. : milestone, 15348000,
appsec (14.802 s) : 14802000, 14802000
. : milestone, 14802000,
iast (18.143 s) : 18143000, 18143000
. : milestone, 18143000,
iast_GLOBAL (17.665 s) : 17665000, 17665000
. : milestone, 17665000,
profiling (14.954 s) : 14954000, 14954000
. : milestone, 14954000,
tracing (14.838 s) : 14838000, 14838000
. : milestone, 14838000,
section candidate
no_agent (15.586 s) : 15586000, 15586000
. : milestone, 15586000,
appsec (14.521 s) : 14521000, 14521000
. : milestone, 14521000,
iast (18.416 s) : 18416000, 18416000
. : milestone, 18416000,
iast_GLOBAL (17.833 s) : 17833000, 17833000
. : milestone, 17833000,
profiling (15.031 s) : 15031000, 15031000
. : milestone, 15031000,
tracing (15.13 s) : 15130000, 15130000
. : milestone, 15130000,
Execution time for tomcatgantt
title tomcat - execution time [CI 0.99] : candidate=1.61.0-SNAPSHOT~7e9aff8638, baseline=1.61.0-SNAPSHOT~2604968b95
dateFormat X
axisFormat %s
section baseline
no_agent (1.47 ms) : 1458, 1481
. : milestone, 1470,
appsec (3.724 ms) : 3509, 3939
. : milestone, 3724,
iast (2.247 ms) : 2179, 2316
. : milestone, 2247,
iast_GLOBAL (2.285 ms) : 2216, 2353
. : milestone, 2285,
profiling (2.076 ms) : 2022, 2131
. : milestone, 2076,
tracing (2.065 ms) : 2012, 2118
. : milestone, 2065,
section candidate
no_agent (1.469 ms) : 1457, 1480
. : milestone, 1469,
appsec (3.798 ms) : 3579, 4018
. : milestone, 3798,
iast (2.25 ms) : 2181, 2319
. : milestone, 2250,
iast_GLOBAL (2.293 ms) : 2224, 2363
. : milestone, 2293,
profiling (2.07 ms) : 2015, 2124
. : milestone, 2070,
tracing (2.063 ms) : 2009, 2117
. : milestone, 2063,
|
fd826ee to
1ecfe51
Compare
| @@ -42,7 +43,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() { | |||
| @Override | |||
| public Map<String, String> contextStore() { | |||
| Map<String, String> contextStores = new HashMap<>(); | |||
| contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String"); | |||
| contextStores.put( | |||
There was a problem hiding this comment.
I want to store both cluster ID & pending Kafka config under the same key. The way to do that is to create a container class.
There was a problem hiding this comment.
Done - this is already handled by the MetadataState container class that holds both clusterId and pendingConfig.
|
Hi! 👋 Thanks for your pull request! 🎉 To help us review it, please make sure to:
If you need help, please check our contributing guidelines. |
- Fix MetadataState class name to use fully qualified name in KafkaProducerInstrumentation (helperClassNames and contextStore), which was causing producer spans to break due to context store mismatch - Replace sensitive keys denylist with an allowlist of safe config keys - Mask values of non-allowed keys with "****" instead of dropping them, to enable monitoring of which configs are seen but not yet allowed Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
In Groovy, `map[expr1] = expr2` evaluates expr2 before expr1, which swapped msgpack key/value reads. Use temp variables instead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move test to default package to avoid underscore in package name, matching the convention used by other kafka test files. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ecks - Use "kafka_producer"/"kafka_consumer" consistently in KafkaConfigHelper - Remove log.warn debug logging from MsgPackDatastreamsPayloadWriter - Remove unbounded reportedKafkaConfigs dedup map; configs flow through StatsBucket like other stats points - Make MASKED_VALUE and ALLOWED_KEYS public in KafkaConfigHelper - Remove redundant null checks in writeKafkaConfigs (constructor normalizes) - Update tests to reflect removal of dedup behavior Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
b898dd4 to
fb55014
Compare
The ContextPropagationAdvice classes were still using InstrumentationContext.get(Metadata.class, String.class) after the context store value type was changed from String to MetadataState, causing an instrumentation error at runtime. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata); | ||
| if (state == null) { | ||
| state = new MetadataState(); | ||
| InstrumentationContext.get(Metadata.class, MetadataState.class).put(metadata, state); |
There was a problem hiding this comment.
I think InstrumentationContext.get is relatively expensive, so we should avoid doing that twice, but also it has a method putIfAbsent that does exactly what you want, so you should use that instead (also avoids the double lookup)
.../main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java
Outdated
Show resolved
Hide resolved
.../src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
Outdated
Show resolved
Hide resolved
...-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java
Outdated
Show resolved
Hide resolved
...-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java
Outdated
Show resolved
Hide resolved
...8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java
Outdated
Show resolved
Hide resolved
...ain/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdate22AndAfterAdvice.java
Outdated
Show resolved
Hide resolved
.../main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdateBefore22Advice.java
Outdated
Show resolved
Hide resolved
...src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerConstructorAdvice.java
Outdated
Show resolved
Hide resolved
| public static void storePendingProducerConfig(MetadataState state, Map<String, String> config) { | ||
| state.setPendingConfig(new PendingConfig("kafka_producer", "", config)); | ||
| log.debug("Stored pending producer config (cluster ID not yet known)"); | ||
| } | ||
|
|
||
| /** Store a consumer config to be reported once the cluster ID is known from metadata. */ | ||
| public static void storePendingConsumerConfig( | ||
| MetadataState state, String consumerGroup, Map<String, String> config) { | ||
| state.setPendingConfig( | ||
| new PendingConfig("kafka_consumer", consumerGroup != null ? consumerGroup : "", config)); | ||
| log.debug("Stored pending consumer config (cluster ID not yet known)"); | ||
| } | ||
|
|
||
| /** Called from metadata update advice when the cluster ID becomes available. */ | ||
| public static void reportPendingConfig(MetadataState state, String clusterId) { | ||
| PendingConfig pending = state.takePendingConfig(); | ||
| if (pending != null) { | ||
| log.debug("Received cluster ID, reporting {} config", pending.type); | ||
| if (Config.get().isDataStreamsEnabled()) { | ||
| AgentTracer.get() | ||
| .getDataStreamsMonitoring() | ||
| .reportKafkaConfig(pending.type, clusterId, pending.consumerGroup, pending.config); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
those 3 methods are a bit verbose on debug, how often can we expect to see those ? Is it safe to assume it'd only happen once ?
There was a problem hiding this comment.
Correct, only when consumer / producers are created. I expect that to be once per applications for most apps, maybe a bit more if some applications use dynamic producers / consumers. But shouldn't be too often. Let me know if you think I should remove some here.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| InstrumentationContext.get(Metadata.class, MetadataState.class).put(metadata, state); | ||
| } | ||
| InstrumentationContext.get(Metadata.class, MetadataState.class) | ||
| .putIfAbsent(metadata, new MetadataState()); |
There was a problem hiding this comment.
nit: you could use the factory overload of the method to avoid creating the object if we don't need it
…e allocation Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
/merge |
|
View all feedbacks in Devflow UI.
The expected merge time in
|
|
/merge |
|
View all feedbacks in Devflow UI.
PR already in the queue with status in_progress |
What Does This Do
Captures Kafka producer and consumer client configurations and reports them through the Data Streams Monitoring (DSM) pipeline. Configurations are extracted at client construction time, sensitive values are masked, and the data is sent along with other DSM stats once the Kafka cluster ID is available from metadata.
Motivation
Client configs (producer / consumer configs) are a critical part of a Kafka environment. When issues arise with an application, they can stem from either Kafka broker misconfiguration or client misconfiguration. Having visibility into both sides is essential for diagnosing problems. For example, a wrong buffer size on the client side can put significant pressure on the broker.
Additional Notes
****isDataStreamsEnabled()MetadataStatecomposite object replaces the previous plainStringcontext store on KafkaMetadataobjects, holding both the cluster ID and a pending configkafka-clients-0.11andkafka-clients-3.8instrumentation versions are updatedtag: ai generated
[DSM-IDENT]