Skip to content

Commit 13b47c9

Browse files
committed
[DSM] Reduce per-message overhead in Kafka produce and consume hot paths
1 parent adeec8f commit 13b47c9

4 files changed

Lines changed: 87 additions & 3 deletions

File tree

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,15 @@ private static string[] BuildProduceEdgeTags(string clusterId, string topic)
299299
tags.MessageQueueTimeMs == null ? 0 : (long)tags.MessageQueueTimeMs,
300300
pathwayContext);
301301

302-
message?.Headers?.Remove(DataStreamsPropagationHeaders.TemporaryBase64PathwayContext); // remove eventual junk
302+
// TemporaryBase64PathwayContext is only written by our consumer code when
303+
// KafkaCreateConsumerScopeEnabled=false. When it is true (the default), the
304+
// header is never present, so an unconditional Remove would perform a wasted O(n)
305+
// linear header scan on every message. Skip it when we know it can't be there.
306+
if (!tracer.CurrentTraceSettings.Settings.KafkaCreateConsumerScopeEnabled)
307+
{
308+
message?.Headers?.Remove(DataStreamsPropagationHeaders.TemporaryBase64PathwayContext);
309+
}
310+
303311
if (!tracer.CurrentTraceSettings.Settings.KafkaCreateConsumerScopeEnabled && message?.Headers is not null && span.Context.PathwayContext != null)
304312
{
305313
// write the _new_ pathway (the "consume" checkpoint that we just set above) to the headers as a way to pass its value to an eventual

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,19 @@ internal void Inject<TCarrier>(PathwayContext context, TCarrier headers, bool is
110110
{
111111
try
112112
{
113+
#if NETCOREAPP3_1_OR_GREATER
114+
// Decode directly into a stack buffer to avoid a heap allocation for every consumed message.
115+
Span<byte> decodedBytes = stackalloc byte[PathwayContextEncoder.MaxEncodedSize];
116+
var status = Base64.DecodeFromUtf8(base64Bytes, decodedBytes, out _, out int bytesWritten);
117+
118+
if (status != OperationStatus.Done)
119+
{
120+
Log.Error("Failed to decode Base64 data streams context. OperationStatus: {Status}", status);
121+
return null;
122+
}
123+
124+
return PathwayContextEncoder.Decode(decodedBytes.Slice(0, bytesWritten));
125+
#else
113126
// Calculate the maximum decoded length
114127
// Base64 encoding encodes 3 bytes of data into 4 bytes of encoded data
115128
// So the maximum decoded length is (base64Bytes.Length * 3) / 4
@@ -134,6 +147,7 @@ internal void Inject<TCarrier>(PathwayContext context, TCarrier headers, bool is
134147
return PathwayContextEncoder.Decode(decodedBytes.AsSpan(0, bytesWritten).ToArray());
135148
}
136149
}
150+
#endif
137151
}
138152
catch (Exception ex)
139153
{

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ internal sealed class DataStreamsManager
4444
private readonly IDisposable _updateSubscription;
4545
private readonly bool _isLegacyDsmHeadersEnabled;
4646
private readonly bool _isInDefaultState;
47-
private readonly ConditionalWeakTable<string[], NodeHashCacheEntry> _nodeHashCache = new();
47+
// Keyed by string[] identity (reference equality) — safe because EdgeTagCache holds strong
48+
// references to the cached arrays (bounded by MaxEdgeTagCacheSize).
49+
private readonly ConcurrentDictionary<string[], NodeHashCacheEntry> _nodeHashCache =
50+
new(NodeHashCacheKeyComparer.Instance);
51+
4852
private long _nodeHashBase; // note that this actually represents a `ulong` that we have done an unsafe cast for
4953
private MutableSettings _previousMutableSettings;
5054
private string? _previousContainerTagsHash;
@@ -312,7 +316,7 @@ public void InjectPathwayContextAsBase64String<TCarrier>(PathwayContext? context
312316

313317
// Don't blame me, blame the fact we can't do Volatile.Read with a ulong in .NET FX...
314318
var nodeHashBase = new NodeHashBase(unchecked((ulong)Volatile.Read(ref _nodeHashBase)));
315-
var cacheEntry = _nodeHashCache.GetOrCreateValue(edgeTags);
319+
var cacheEntry = _nodeHashCache.GetOrAdd(edgeTags, static _ => new NodeHashCacheEntry());
316320
NodeHash nodeHash;
317321

318322
// Fast lock-free path: snapshot is an immutable object published via a volatile field.
@@ -426,6 +430,20 @@ public bool ShouldExtractSchema(Span span, string operation, out int weight)
426430
return false;
427431
}
428432

433+
/// <summary>
434+
/// Reference-equality comparer for string[] keys in <see cref="_nodeHashCache"/>.
435+
/// Two string[] objects are considered equal only when they are the same instance,
436+
/// which is always true for the cached arrays held by <see cref="EdgeTagCache{TKey}"/>.
437+
/// </summary>
438+
private sealed class NodeHashCacheKeyComparer : IEqualityComparer<string[]>
439+
{
440+
internal static readonly NodeHashCacheKeyComparer Instance = new();
441+
442+
public bool Equals(string[]? x, string[]? y) => ReferenceEquals(x, y);
443+
444+
public int GetHashCode(string[] obj) => System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode(obj);
445+
}
446+
429447
/// <summary>
430448
/// Memoized NodeHash associated with a specific edge-tag array instance and nodeHashBase value.
431449
/// The volatile <see cref="_snapshot"/> field enables a lock-free fast path: callers read the

tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,50 @@ public static int EncodeInto(PathwayContext pathway, Span<byte> buffer)
112112
return new PathwayContext(new PathwayHash(hash), pathwayStartNs, edgeStartNs);
113113
}
114114

115+
#if NETCOREAPP3_1_OR_GREATER
116+
/// <summary>
117+
/// Zero-allocation alternative to <see cref="Decode(byte[])"/>: decodes directly from a
118+
/// caller-supplied <paramref name="bytes"/> span (e.g. a stackalloc buffer).
119+
/// </summary>
120+
public static PathwayContext? Decode(Span<byte> bytes)
121+
{
122+
if (bytes.Length < 10)
123+
{
124+
Log.Warning<int>("Error decoding Data Stream PathwayContext from bytes: insufficient bytes ({ByteCount})", bytes.Length);
125+
return null;
126+
}
127+
128+
var hash = System.Buffers.Binary.BinaryPrimitives.ReadUInt64LittleEndian(bytes);
129+
130+
var pathwayStartMs = VarEncodingHelper.ReadVarLongZigZag(bytes.Slice(8), out var bytesRead);
131+
if (pathwayStartMs is null)
132+
{
133+
Log.Warning("Error decoding Data Stream PathwayContext from bytes: invalid pathway start");
134+
return null;
135+
}
136+
137+
var edgeStartMs = VarEncodingHelper.ReadVarLongZigZag(bytes.Slice(8 + bytesRead), out _);
138+
if (edgeStartMs is null)
139+
{
140+
Log.Warning("Error decoding Data Stream PathwayContext from bytes: invalid edge start");
141+
return null;
142+
}
143+
144+
var pathwayStartNs = ToNanoseconds(pathwayStartMs.Value);
145+
var edgeStartNs = ToNanoseconds(edgeStartMs.Value);
146+
if (pathwayStartMs > pathwayStartNs || edgeStartMs.Value > edgeStartNs)
147+
{
148+
Log.Warning(
149+
"Overflow detected in Data Stream PathwayContext from bytes: invalid pathway {PathwayMs}ms or edge {EdgeMs}ms",
150+
pathwayStartMs,
151+
edgeStartMs);
152+
return null;
153+
}
154+
155+
return new PathwayContext(new PathwayHash(hash), pathwayStartNs, edgeStartNs);
156+
}
157+
#endif
158+
115159
private static long ToNanoseconds(long milliseconds)
116160
=> milliseconds * 1_000_000;
117161

0 commit comments

Comments
 (0)