Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Collections.Generic;
using System.Linq;
using Datadog.Trace.Debugger.Configurations.Models;
using Datadog.Trace.Debugger.RateLimiting;
using Datadog.Trace.Logging;
using Datadog.Trace.RemoteConfigurationManagement;

Expand All @@ -20,20 +21,35 @@ internal sealed class ConfigurationUpdater
private readonly string? _env;
private readonly string? _version;
private readonly int _maxProbesPerType;
private readonly IDebuggerGlobalRateLimiter _globalRateLimiter;

private ProbeConfiguration _currentConfiguration;

private ConfigurationUpdater(string? env, string? version, int maxProbesPerType)
private ConfigurationUpdater(string? env, string? version, int maxProbesPerType, IDebuggerGlobalRateLimiter? globalRateLimiter)
{
_env = env;
_version = version;
_maxProbesPerType = maxProbesPerType;
_globalRateLimiter = globalRateLimiter ?? DebuggerGlobalRateLimiter.Instance;
_currentConfiguration = new ProbeConfiguration();
}

public static ConfigurationUpdater Create(string? environment, string? serviceVersion, int maxProbesPerType)
public static ConfigurationUpdater Create(string? environment, string? serviceVersion, int maxProbesPerType, IDebuggerGlobalRateLimiter? globalRateLimiter = null)
{
return new ConfigurationUpdater(environment, serviceVersion, maxProbesPerType);
return new ConfigurationUpdater(environment, serviceVersion, maxProbesPerType, globalRateLimiter);
}

private static bool IsProbeConfigurationPath(RemoteConfigurationPath path)
{
return path.Id.StartsWith(DefinitionPaths.LogProbe)
|| path.Id.StartsWith(DefinitionPaths.MetricProbe)
|| path.Id.StartsWith(DefinitionPaths.SpanProbe)
|| path.Id.StartsWith(DefinitionPaths.SpanDecorationProbe);
}

private static bool IsServiceConfigurationPath(RemoteConfigurationPath path)
{
return path.Id.StartsWith(DefinitionPaths.ServiceConfiguration);
}

public List<UpdateResult> AcceptAdded(ProbeConfiguration configuration)
Expand All @@ -49,7 +65,7 @@ public List<UpdateResult> AcceptAdded(ProbeConfiguration configuration)

if (comparer.HasRateLimitChanged)
{
HandleRateLimitChanged(comparer);
HandleRateLimitChanged(filteredConfiguration);
}

_currentConfiguration = configuration;
Expand All @@ -61,7 +77,17 @@ public void AcceptRemoved(List<RemoteConfigurationPath> paths)
{
try
{
HandleRemovedProbesChanges(paths);
if (paths.Any(IsServiceConfigurationPath))
{
_currentConfiguration.ServiceConfiguration = null;
_globalRateLimiter.ResetRate();
}

var probePaths = paths.Where(IsProbeConfigurationPath).ToList();
if (probePaths.Count > 0)
{
HandleRemovedProbesChanges(probePaths);
}
}
catch (Exception ex)
{
Expand Down Expand Up @@ -127,9 +153,9 @@ private void HandleRemovedProbesChanges(List<RemoteConfigurationPath> paths)
DebuggerManager.Instance.DynamicInstrumentation?.UpdateRemovedProbeInstrumentations(paths);
}

private void HandleRateLimitChanged(ProbeConfigurationComparer comparer)
private void HandleRateLimitChanged(ProbeConfiguration configuration)
{
// todo handle rate limited changes
_globalRateLimiter.SetRate(configuration.ServiceConfiguration?.Sampling?.SnapshotsPerSecond);
}

internal sealed record UpdateResult(string Id, string? Error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public ProbeConfigurationComparer(ProbeConfiguration currentConfiguration, Probe

HasProbeRelatedChanges = AddedDefinitions.Any() || isFilteredListChanged;
HasRateLimitChanged =
(!currentConfiguration.ServiceConfiguration?.Sampling?.Equals(incomingConfiguration.ServiceConfiguration?.Sampling) ?? incomingConfiguration.ServiceConfiguration?.Sampling != null)
|| HasProbeRelatedChanges;
(!currentConfiguration.ServiceConfiguration?.Sampling?.Equals(incomingConfiguration.ServiceConfiguration?.Sampling) ?? incomingConfiguration.ServiceConfiguration?.Sampling != null);
}

public IReadOnlyList<ProbeDefinition> AddedDefinitions { get; }
Expand Down
7 changes: 5 additions & 2 deletions tracer/src/Datadog.Trace/Debugger/DebuggerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Datadog.Trace.Configuration;
using Datadog.Trace.Debugger.Configurations;
using Datadog.Trace.Debugger.ProbeStatuses;
using Datadog.Trace.Debugger.RateLimiting;
using Datadog.Trace.Debugger.Sink;
using Datadog.Trace.Debugger.Snapshots;
using Datadog.Trace.Debugger.Symbols;
Expand All @@ -29,6 +30,7 @@ internal sealed class DebuggerFactory

internal static DynamicInstrumentation CreateDynamicInstrumentation(IDiscoveryService discoveryService, IRcmSubscriptionManager remoteConfigurationManager, TracerSettings tracerSettings, Func<string> serviceNameProvider, DebuggerSettings debuggerSettings, IGitMetadataTagsProvider gitMetadataTagsProvider)
{
var globalRateLimiter = DebuggerGlobalRateLimiter.Instance;
var snapshotSlicer = SnapshotSlicer.Create(debuggerSettings);
var snapshotSink = SnapshotSink.Create(debuggerSettings, snapshotSlicer);
var logSink = SnapshotSink.Create(debuggerSettings, snapshotSlicer);
Expand All @@ -39,7 +41,7 @@ internal static DynamicInstrumentation CreateDynamicInstrumentation(IDiscoverySe
var diagnosticsUploader = CreateDiagnosticsUploader(discoveryService, debuggerSettings, gitMetadataTagsProvider, GetApiFactory(tracerSettings, true), diagnosticsSink);
var lineProbeResolver = LineProbeResolver.Create(debuggerSettings.ThirdPartyDetectionExcludes, debuggerSettings.ThirdPartyDetectionIncludes);
var probeStatusPoller = ProbeStatusPoller.Create(diagnosticsSink, debuggerSettings);
var configurationUpdater = ConfigurationUpdater.Create(tracerSettings.Manager.InitialMutableSettings.Environment, tracerSettings.Manager.InitialMutableSettings.ServiceVersion, debuggerSettings.MaxProbesPerType);
var configurationUpdater = ConfigurationUpdater.Create(tracerSettings.Manager.InitialMutableSettings.Environment, tracerSettings.Manager.InitialMutableSettings.ServiceVersion, debuggerSettings.MaxProbesPerType, globalRateLimiter);

var statsd = GetDogStatsd(tracerSettings);

Expand All @@ -53,7 +55,8 @@ internal static DynamicInstrumentation CreateDynamicInstrumentation(IDiscoverySe
diagnosticsUploader: diagnosticsUploader,
probeStatusPoller: probeStatusPoller,
configurationUpdater: configurationUpdater,
dogStats: statsd);
dogStats: statsd,
globalRateLimiter: globalRateLimiter);
}

private static IDogStatsd GetDogStatsd(TracerSettings tracerSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ internal sealed class DynamicInstrumentation : IDisposable
private readonly ConfigurationUpdater _configurationUpdater;
private readonly IDogStatsd _dogStats;
private readonly DebuggerSettings _settings;
private readonly IDebuggerGlobalRateLimiter _globalRateLimiter;
private readonly object _instanceLock = new();
private int _disposeState;

Expand All @@ -60,7 +61,8 @@ internal DynamicInstrumentation(
IDebuggerUploader diagnosticsUploader,
IProbeStatusPoller probeStatusPoller,
ConfigurationUpdater configurationUpdater,
IDogStatsd dogStats)
IDogStatsd dogStats,
IDebuggerGlobalRateLimiter? globalRateLimiter = null)
{
Log.Information("Initializing Dynamic Instrumentation");
_settings = settings;
Expand All @@ -74,7 +76,9 @@ internal DynamicInstrumentation(
_subscriptionManager = remoteConfigurationManager;
_configurationUpdater = configurationUpdater;
_dogStats = dogStats;
_globalRateLimiter = globalRateLimiter ?? DebuggerGlobalRateLimiter.Instance;
_unboundProbes = new List<ProbeDefinition>();
_globalRateLimiter.ResetRate();
_subscription = new Subscription(
(updates, removals) =>
{
Expand Down Expand Up @@ -744,6 +748,7 @@ public void Dispose()
// On master, _dogStats was disposed via SafeDisposal.Add() which called sync
// Dispose() — itself fire-and-forget internally via Task.Run().
_dogStats?.DisposeAsync().ContinueWith(t => Log.Error(t.Exception, "Error waiting for StatsD disposal"), TaskContinuationOptions.OnlyOnFaulted);
_globalRateLimiter.Dispose();
}
}
}
39 changes: 36 additions & 3 deletions tracer/src/Datadog.Trace/Debugger/Expressions/ProbeProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ internal sealed class ProbeProcessor : IProbeProcessor
private const string DynamicPrefix = "_dd.di.";
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(ProbeProcessor));

private readonly IDebuggerGlobalRateLimiter _globalRateLimiter;
private string _probeId = string.Empty;
private ProbeType _probeType;
private bool _shouldApplyGlobalRateLimit;
private ProbeExpressionEvaluator? _evaluator;
private DebuggerExpression?[]? _templates;
private DebuggerExpression? _condition;
Expand All @@ -41,7 +45,13 @@ internal sealed class ProbeProcessor : IProbeProcessor
/// <exception cref="ArgumentOutOfRangeException">If probe type or probe location is from unsupported type</exception>
/// <remarks>Exceptions should be caught and logged by the caller</remarks>
internal ProbeProcessor(ProbeDefinition probe)
: this(probe, DebuggerGlobalRateLimiter.Instance)
{
}

internal ProbeProcessor(ProbeDefinition probe, IDebuggerGlobalRateLimiter globalRateLimiter)
{
_globalRateLimiter = globalRateLimiter ?? throw new ArgumentNullException(nameof(globalRateLimiter));
InitializeProbeProcessor(probe);
}

Expand Down Expand Up @@ -73,6 +83,9 @@ private void InitializeProbeProcessor(ProbeDefinition probe)
};

SetExpressions(probe);
_probeId = probe.Id;
_probeType = probeType;
_shouldApplyGlobalRateLimit = probeType is ProbeType.Snapshot or ProbeType.Log;

var capture = (probe as LogProbe)?.Capture;
var maxInfo = capture != null
Expand Down Expand Up @@ -164,9 +177,15 @@ private ProbeExpressionEvaluator GetOrCreateEvaluator()
return _evaluator;
}

private bool SamplePayload(IAdaptiveSampler sampler)
{
return (!_shouldApplyGlobalRateLimit || _globalRateLimiter.ShouldSample(_probeType, _probeId))
&& sampler.Sample();
}

public bool ShouldProcess(in ProbeData probeData)
{
return HasCondition() || probeData.Sampler.Sample();
return HasCondition() || SamplePayload(probeData.Sampler);
}

public bool Process<TCapture>(ref CaptureInfo<TCapture> info, IDebuggerSnapshotCreator inSnapshotCreator, in ProbeData probeData)
Expand Down Expand Up @@ -382,14 +401,28 @@ private ExpressionEvaluationResult Evaluate(DebuggerSnapshotCreator snapshotCrea
}

if (evaluationResult.Condition != null && // i.e. not a metric, span probe, or span decoration
(evaluationResult.Condition is false ||
!sampler.Sample()))
evaluationResult.Condition is false)
{
// if the expression evaluated to false, or there is a rate limit, stop capture
shouldStopCapture = true;
return evaluationResult;
}

if (evaluationResult.Condition != null &&
_shouldApplyGlobalRateLimit &&
!_globalRateLimiter.ShouldSample(_probeType, _probeId))
{
shouldStopCapture = true;
return evaluationResult;
}

if (evaluationResult.Condition != null &&
!sampler.Sample())
{
shouldStopCapture = true;
return evaluationResult;
}

return evaluationResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;
using System.Threading;
using Datadog.Trace.Logging;
Expand Down Expand Up @@ -64,15 +66,16 @@ internal sealed class AdaptiveSampler : IAdaptiveSampler
private int _countsSlotIndex;
private Counts[] _countsSlots;

private Timer _timer;
private Action _rollWindowCallback;
private Timer? _timer;
private Action? _rollWindowCallback;
private int _disposeState;

internal AdaptiveSampler(
TimeSpan windowDuration,
int samplesPerWindow,
int averageLookback,
int budgetLookback,
Action rollWindowCallback)
Action? rollWindowCallback)
{
_timer = new Timer(state => RollWindow(), state: null, windowDuration, windowDuration);
_totalCountRunningAverage = 0;
Expand Down Expand Up @@ -134,6 +137,17 @@ public double NextDouble()
return ThreadSafeRandom.Shared.NextDouble();
}

public void Dispose()
{
if (Interlocked.CompareExchange(ref _disposeState, 1, 0) != 0)
{
return;
}

Interlocked.Exchange(ref _timer, null)?.Dispose();
_rollWindowCallback = null;
}

private double ComputeIntervalAlpha(int lookback)
{
return 1 - Math.Pow(lookback, -1.0 / lookback);
Expand Down Expand Up @@ -191,10 +205,7 @@ internal void RollWindow()

counts.Reset();

if (_rollWindowCallback != null)
{
_rollWindowCallback();
}
_rollWindowCallback?.Invoke();
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// <copyright file="AdaptiveSamplerLifetime.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

using System;
using System.Threading;

namespace Datadog.Trace.Debugger.RateLimiting
{
internal static class AdaptiveSamplerLifetime
{
private const int AverageLookback = 180;
private const int BudgetLookback = 16;

private static readonly TimeSpan WindowDuration = TimeSpan.FromSeconds(1);

public static IAdaptiveSampler Create(int samplesPerSecond)
{
return new AdaptiveSampler(WindowDuration, samplesPerSecond, AverageLookback, BudgetLookback, rollWindowCallback: null);
}

public static void Replace(ref IAdaptiveSampler sampler, IAdaptiveSampler replacement)
{
Dispose(Interlocked.Exchange(ref sampler, replacement));
}

public static void Dispose(ref IAdaptiveSampler sampler)
{
Dispose(Interlocked.Exchange(ref sampler, NopAdaptiveSampler.Instance));
}

public static void Dispose(IAdaptiveSampler sampler)
{
if (!ReferenceEquals(sampler, NopAdaptiveSampler.Instance))
{
sampler.Dispose();
}
}
}
}
Loading
Loading