Skip to content

Commit 3850cdb

Browse files
committed
DSM overhead optimizations
1 parent 7f212c3 commit 3850cdb

9 files changed

Lines changed: 321 additions & 8 deletions

File tree

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,19 @@ private static long GetMessageSize<T>(T message)
142142
return size;
143143
}
144144

145+
// NOTE: tags must be sorted alphabetically — called only on edge-tag cache miss
146+
private static string[] BuildProduceEdgeTags(string clusterId, string topic)
147+
{
148+
if (!StringUtil.IsNullOrEmpty(clusterId))
149+
{
150+
return StringUtil.IsNullOrEmpty(topic)
151+
? ["direction:out", $"kafka_cluster_id:{clusterId}", "type:kafka"]
152+
: ["direction:out", $"kafka_cluster_id:{clusterId}", $"topic:{topic}", "type:kafka"];
153+
}
154+
155+
return ["direction:out", $"topic:{topic}", "type:kafka"];
156+
}
157+
145158
internal static Scope? CreateConsumerScope(
146159
Tracer tracer,
147160
DataStreamsManager dataStreamsManager,
@@ -398,17 +411,14 @@ internal static void TryInjectHeaders<TTopicPartitionMarker, TMessage>(
398411
ProducerCache.TryGetProducer(producer, out _, out var producerClusterId);
399412

400413
string[] edgeTags;
401-
if (!StringUtil.IsNullOrEmpty(producerClusterId))
414+
if (StringUtil.IsNullOrEmpty(topic) && StringUtil.IsNullOrEmpty(producerClusterId))
402415
{
403-
edgeTags = StringUtil.IsNullOrEmpty(topic)
404-
? ["direction:out", $"kafka_cluster_id:{producerClusterId}", "type:kafka"]
405-
: ["direction:out", $"kafka_cluster_id:{producerClusterId}", $"topic:{topic}", "type:kafka"];
416+
edgeTags = DefaultProduceEdgeTags;
406417
}
407418
else
408419
{
409-
edgeTags = StringUtil.IsNullOrEmpty(topic)
410-
? DefaultProduceEdgeTags
411-
: ["direction:out", $"topic:{topic}", "type:kafka"];
420+
var cacheKey = new ProduceEdgeTagCacheKey(producerClusterId ?? string.Empty, topic ?? string.Empty);
421+
edgeTags = dataStreamsManager.GetOrCreateEdgeTags(cacheKey, static k => BuildProduceEdgeTags(k.ClusterId, k.Topic));
412422
}
413423

414424
var msgSize = dataStreamsManager.IsInDefaultState ? 0 : GetMessageSize(message);
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// <copyright file="ProduceEdgeTagCacheKey.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using System;
9+
10+
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;
11+
12+
/// <summary>
13+
/// Value-type cache key for produce edge tags. Using a named struct avoids boxing and
14+
/// is compatible with all supported target frameworks.
15+
/// </summary>
16+
internal readonly struct ProduceEdgeTagCacheKey : IEquatable<ProduceEdgeTagCacheKey>
17+
{
18+
public readonly string ClusterId;
19+
public readonly string Topic;
20+
21+
public ProduceEdgeTagCacheKey(string clusterId, string topic)
22+
{
23+
ClusterId = clusterId;
24+
Topic = topic;
25+
}
26+
27+
public bool Equals(ProduceEdgeTagCacheKey other)
28+
=> ClusterId == other.ClusterId && Topic == other.Topic;
29+
30+
public override bool Equals(object? obj)
31+
=> obj is ProduceEdgeTagCacheKey other && Equals(other);
32+
33+
public override int GetHashCode()
34+
{
35+
unchecked
36+
{
37+
int hash = 17;
38+
hash = (hash * 31) + (ClusterId?.GetHashCode() ?? 0);
39+
hash = (hash * 31) + (Topic?.GetHashCode() ?? 0);
40+
return hash;
41+
}
42+
}
43+
}

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,29 @@ internal void Inject<TCarrier>(PathwayContext context, TCarrier headers, bool is
3434
{
3535
if (headers is null) { ThrowHelper.ThrowArgumentNullException(nameof(headers)); }
3636

37+
#if NETCOREAPP3_1_OR_GREATER
38+
// Encode directly into stack buffers to avoid heap allocations for the intermediate byte arrays.
39+
// The only unavoidable allocation is the final ToArray() for headers.Add, since Kafka takes ownership.
40+
Span<byte> encodedBytes = stackalloc byte[PathwayContextEncoder.MaxEncodedSize];
41+
var encodedLen = PathwayContextEncoder.EncodeInto(context, encodedBytes);
42+
var encodedSlice = encodedBytes.Slice(0, encodedLen);
43+
44+
Span<byte> base64Bytes = stackalloc byte[PathwayContextEncoder.MaxBase64EncodedSize];
45+
var status = Base64.EncodeToUtf8(encodedSlice, base64Bytes, out _, out int bytesWritten);
46+
47+
if (status != OperationStatus.Done)
48+
{
49+
Log.Error("Failed to encode Data Streams context to Base64. OperationStatus: {Status}", status);
50+
return;
51+
}
52+
53+
headers.Add(DataStreamsPropagationHeaders.PropagationKeyBase64, base64Bytes.Slice(0, bytesWritten).ToArray());
54+
55+
if (isDataStreamsLegacyHeadersEnabled)
56+
{
57+
headers.Add(DataStreamsPropagationHeaders.PropagationKey, encodedSlice.ToArray());
58+
}
59+
#else
3760
var encodedBytes = PathwayContextEncoder.Encode(context);
3861

3962
// Calculate the maximum length of the base64 encoded data
@@ -62,6 +85,7 @@ internal void Inject<TCarrier>(PathwayContext context, TCarrier headers, bool is
6285
{
6386
headers.Add(DataStreamsPropagationHeaders.PropagationKey, encodedBytes);
6487
}
88+
#endif
6589
}
6690

6791
/// <summary>

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System;
88
using System.Collections.Concurrent;
99
using System.Collections.Generic;
10+
using System.Runtime.CompilerServices;
1011
using System.Threading;
1112
using System.Threading.Tasks;
1213
using Datadog.Trace.Agent.DiscoveryService;
@@ -27,6 +28,13 @@ namespace Datadog.Trace.DataStreamsMonitoring;
2728
/// </summary>
2829
internal sealed class DataStreamsManager
2930
{
31+
/// <summary>
32+
/// Maximum number of distinct keys stored in a single per-type edge-tag cache.
33+
/// When the limit is reached, new keys are computed on the fly without caching to
34+
/// prevent unbounded memory growth caused by high-cardinality identifiers.
35+
/// </summary>
36+
internal const int MaxEdgeTagCacheSize = 1000;
37+
3038
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<DataStreamsManager>();
3139
private static readonly AsyncLocal<PathwayContext?> LastConsumePathway = new(); // saves the context on consume checkpointing only
3240
private readonly object _nodeHashUpdateLock = new();
@@ -36,6 +44,7 @@ internal sealed class DataStreamsManager
3644
private readonly IDisposable _updateSubscription;
3745
private readonly bool _isLegacyDsmHeadersEnabled;
3846
private readonly bool _isInDefaultState;
47+
private readonly ConditionalWeakTable<string[], NodeHashCacheEntry> _nodeHashCache = new();
3948
private long _nodeHashBase; // note that this actually represents a `ulong` that we have done an unsafe cast for
4049
private MutableSettings _previousMutableSettings;
4150
private string? _previousContainerTagsHash;
@@ -303,7 +312,24 @@ public void InjectPathwayContextAsBase64String<TCarrier>(PathwayContext? context
303312

304313
// Don't blame me, blame the fact we can't do Volatile.Read with a ulong in .NET FX...
305314
var nodeHashBase = new NodeHashBase(unchecked((ulong)Volatile.Read(ref _nodeHashBase)));
306-
var nodeHash = HashHelper.CalculateNodeHash(nodeHashBase, edgeTags);
315+
var cacheEntry = _nodeHashCache.GetOrCreateValue(edgeTags);
316+
NodeHash nodeHash;
317+
318+
// Fast lock-free path: snapshot is an immutable object published via a volatile field.
319+
// If the base still matches we avoid taking any lock on the hot path.
320+
if (!cacheEntry.TryGetNodeHash(nodeHashBase, out nodeHash))
321+
{
322+
lock (cacheEntry)
323+
{
324+
// Double-check under lock in case another thread raced to update
325+
if (!cacheEntry.TryGetNodeHash(nodeHashBase, out nodeHash))
326+
{
327+
nodeHash = HashHelper.CalculateNodeHash(nodeHashBase, edgeTags);
328+
cacheEntry.Store(nodeHashBase, nodeHash);
329+
}
330+
}
331+
}
332+
307333
var parentHash = previousContext?.Hash ?? default;
308334
var pathwayHash = HashHelper.CalculatePathwayHash(nodeHash, parentHash);
309335

@@ -351,6 +377,34 @@ public void InjectPathwayContextAsBase64String<TCarrier>(PathwayContext? context
351377
}
352378
}
353379

380+
/// <summary>
381+
/// Returns a cached edge-tag array for the given key, creating and caching it on first use.
382+
/// On cache hits, zero heap allocations occur. The factory is only invoked on the first call
383+
/// per unique key, making this safe to use on high-throughput hot paths.
384+
/// Once the cache reaches <see cref="MaxEdgeTagCacheSize"/> entries the result is computed
385+
/// fresh each time (no caching) to bound memory usage for high-cardinality key spaces.
386+
/// </summary>
387+
/// <typeparam name="TKey">A value type (struct) used as the cache key — no boxing.</typeparam>
388+
/// <param name="key">The cache key derived from the caller's natural identifiers.</param>
389+
/// <param name="factory">A static factory that builds the edge-tag array from the key on cache miss.</param>
390+
public string[] GetOrCreateEdgeTags<TKey>(TKey key, Func<TKey, string[]> factory)
391+
where TKey : notnull, IEquatable<TKey>
392+
{
393+
var cache = EdgeTagCache<TKey>.Cache;
394+
if (cache.TryGetValue(key, out var existing))
395+
{
396+
return existing;
397+
}
398+
399+
if (cache.Count >= MaxEdgeTagCacheSize)
400+
{
401+
// High-cardinality key space — bypass cache to prevent unbounded memory growth
402+
return factory(key);
403+
}
404+
405+
return cache.GetOrAdd(key, factory);
406+
}
407+
354408
/// <summary>
355409
/// Make sure we only extract the schema (a costly operation) on select occasions
356410
/// </summary>
@@ -371,4 +425,58 @@ public bool ShouldExtractSchema(Span span, string operation, out int weight)
371425
weight = 0;
372426
return false;
373427
}
428+
429+
/// <summary>
430+
/// Memoized NodeHash associated with a specific edge-tag array instance and nodeHashBase value.
431+
/// The volatile <see cref="_snapshot"/> field enables a lock-free fast path: callers read the
432+
/// snapshot without a lock, and only acquire the lock when the base has changed or is missing.
433+
/// </summary>
434+
private sealed class NodeHashCacheEntry
435+
{
436+
// Immutable snapshot published via volatile write; null until first computation.
437+
private volatile NodeHashSnapshot? _snapshot;
438+
439+
/// <summary>
440+
/// Tries to return the cached <see cref="NodeHash"/> for <paramref name="nodeHashBase"/>
441+
/// without acquiring any lock (lock-free read via volatile field).
442+
/// </summary>
443+
public bool TryGetNodeHash(NodeHashBase nodeHashBase, out NodeHash nodeHash)
444+
{
445+
var snap = _snapshot; // volatile read — acts as a load-acquire barrier
446+
if (snap is not null && snap.Base == nodeHashBase.Value)
447+
{
448+
nodeHash = snap.Hash;
449+
return true;
450+
}
451+
452+
nodeHash = default;
453+
return false;
454+
}
455+
456+
/// <summary>
457+
/// Stores a newly-computed <see cref="NodeHash"/>. Must be called under a lock held by the caller.
458+
/// The volatile write ensures the snapshot is visible to all threads before the lock is released.
459+
/// </summary>
460+
public void Store(NodeHashBase nodeHashBase, NodeHash nodeHash)
461+
{
462+
_snapshot = new NodeHashSnapshot(nodeHashBase.Value, nodeHash); // volatile write
463+
}
464+
465+
/// <summary>Immutable payload published atomically via the volatile <see cref="_snapshot"/> field.</summary>
466+
private sealed class NodeHashSnapshot
467+
{
468+
private readonly ulong _base;
469+
private readonly NodeHash _hash;
470+
471+
internal NodeHashSnapshot(ulong @base, NodeHash hash)
472+
{
473+
_base = @base;
474+
_hash = hash;
475+
}
476+
477+
internal ulong Base => _base;
478+
479+
internal NodeHash Hash => _hash;
480+
}
481+
}
374482
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// <copyright file="EdgeTagCache.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
using System;
7+
using System.Collections.Concurrent;
8+
9+
namespace Datadog.Trace.DataStreamsMonitoring;
10+
11+
/// <summary>
12+
/// Process-wide cache of edge tag arrays, keyed by a caller-supplied value type.
13+
/// One dictionary instance exists per distinct TKey type (static generic class pattern).
14+
/// This avoids boxing and lets each integration use its own natural key shape.
15+
/// </summary>
16+
internal static class EdgeTagCache<TKey>
17+
where TKey : notnull, IEquatable<TKey>
18+
{
19+
internal static readonly ConcurrentDictionary<TKey, string[]> Cache = new();
20+
}

tracer/src/Datadog.Trace/DataStreamsMonitoring/PathwayContextEncoder.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ internal static class PathwayContextEncoder
1515
{
1616
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(PathwayContextEncoder));
1717

18+
/// <summary>Maximum byte length produced by EncodeInto: 8 (hash) + 9 (pathway ms) + 9 (edge ms).</summary>
19+
internal const int MaxEncodedSize = 26;
20+
21+
/// <summary>Maximum byte length of the Base64 encoding of <see cref="MaxEncodedSize"/> bytes: ceil(26/3)*4 = 36.</summary>
22+
internal const int MaxBase64EncodedSize = 36;
23+
1824
/// <summary>
1925
/// Encodes a <see cref="PathwayContext"/> as a series of bytes
2026
/// NOTE: the encoding is lossy, in that we convert <see cref="PathwayContext.PathwayStart"/>
@@ -38,6 +44,25 @@ public static byte[] Encode(PathwayContext pathway)
3844
return bytes;
3945
}
4046

47+
#if NETCOREAPP3_1_OR_GREATER
48+
/// <summary>
49+
/// Zero-allocation alternative to <see cref="Encode"/>: writes the encoded pathway directly into
50+
/// a caller-supplied <paramref name="buffer"/> (must be at least <see cref="MaxEncodedSize"/> bytes).
51+
/// </summary>
52+
/// <returns>Number of bytes written into <paramref name="buffer"/>.</returns>
53+
public static int EncodeInto(PathwayContext pathway, Span<byte> buffer)
54+
{
55+
var pathwayStartMs = ToMilliseconds(pathway.PathwayStart);
56+
var edgeStartMs = ToMilliseconds(pathway.EdgeStart);
57+
58+
BinaryPrimitivesHelper.WriteUInt64LittleEndian(buffer, pathway.Hash.Value);
59+
var pathwayBytes = VarEncodingHelper.WriteVarLongZigZag(buffer.Slice(8), pathwayStartMs);
60+
var edgeBytes = VarEncodingHelper.WriteVarLongZigZag(buffer.Slice(8 + pathwayBytes), edgeStartMs);
61+
62+
return 8 + pathwayBytes + edgeBytes;
63+
}
64+
#endif
65+
4166
/// <summary>
4267
/// Tries to decode a <see cref="PathwayContext"/> from a <c>byte[]</c>.
4368
/// NOTE: the encoding process is lossy, so the decoded <see cref="PathwayContext"/>

tracer/src/Datadog.Trace/DataStreamsMonitoring/Utils/BinaryPrimitivesHelper.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ namespace Datadog.Trace.DataStreamsMonitoring.Utils;
1313

1414
internal static class BinaryPrimitivesHelper
1515
{
16+
#if NETCOREAPP3_1_OR_GREATER
17+
public static void WriteUInt64LittleEndian(Span<byte> bytes, ulong value)
18+
=> System.Buffers.Binary.BinaryPrimitives.WriteUInt64LittleEndian(bytes, value);
19+
#endif
20+
1621
public static void WriteUInt64LittleEndian(byte[] bytes, ulong value)
1722
{
1823
#if NETCOREAPP3_1_OR_GREATER

0 commit comments

Comments
 (0)