Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Runtime.InteropServices;
using Datadog.Trace.Debugger.RateLimiting;
using Datadog.Trace.Logging;

namespace Datadog.Trace.Debugger.Caching;
Expand All @@ -26,15 +27,10 @@ private DefaultMemoryChecker()

public bool IsLowResourceEnvironment { get; }

[return: MarshalAs(UnmanagedType.Bool)]
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
private static extern bool GlobalMemoryStatusEx([In, Out] MEMORYSTATUSEX lpBuffer);

private bool CheckLowResourceEnvironment()
{
try
{
Logger.Debug("Checking if environment is low on resources");
// Check if we're using more than 75% of available memory or there is less than 1GB of RAM available.
return IsLowResourceEnvironmentGc() || IsLowResourceEnvironmentSystem();
}
Expand Down Expand Up @@ -73,7 +69,7 @@ internal bool CheckWindowsMemory()
{
try
{
if (MEMORYSTATUSEX.GetAvailablePhysicalMemory(out var availableMemory))
if (WindowsMemoryInfo.TryGetAvailablePhysicalMemory(out var availableMemory))
{
// If less than 1GB of RAM is available, consider it a low-resource environment
return availableMemory < 1_073_741_824; // 1 GB in bytes
Expand Down Expand Up @@ -148,43 +144,4 @@ private ReadOnlySpan<char> ReadMemInfo()

return value.Slice(0, spaceIndex);
}

// Windows API for memory information
[StructLayout(LayoutKind.Sequential, CharSet = CharSet.Auto)]
private sealed class MEMORYSTATUSEX
{
#pragma warning disable IDE0044 // Add readonly modifier
#pragma warning disable CS0169 // Field is never used
private uint dwLength;
private uint dwMemoryLoad;
private ulong ullTotalPhys;
#pragma warning disable CS0649 // Field is never assigned to, and will always have its default value
private ulong ullAvailPhys;
#pragma warning restore CS0649 // Field is never assigned to, and will always have its default value
private ulong ullTotalPageFile;
private ulong ullAvailPageFile;
private ulong ullTotalVirtual;
private ulong ullAvailVirtual;
private ulong ullAvailExtendedVirtual;
#pragma warning restore CS0169 // Field is never used
#pragma warning restore IDE0044 // Add readonly modifier

private MEMORYSTATUSEX()
{
dwLength = (uint)Marshal.SizeOf(typeof(MEMORYSTATUSEX));
}

internal static bool GetAvailablePhysicalMemory(out ulong availableMemory)
{
availableMemory = 0;
MEMORYSTATUSEX memStatus = new MEMORYSTATUSEX();
if (GlobalMemoryStatusEx(memStatus))
{
availableMemory = memStatus.ullAvailPhys;
return true;
}

return false;
}
}
}
4 changes: 3 additions & 1 deletion tracer/src/Datadog.Trace/Debugger/DebuggerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ internal static DynamicInstrumentation CreateDynamicInstrumentation(IDiscoverySe
var lineProbeResolver = LineProbeResolver.Create(debuggerSettings.ThirdPartyDetectionExcludes, debuggerSettings.ThirdPartyDetectionIncludes);
var probeStatusPoller = ProbeStatusPoller.Create(diagnosticsSink, debuggerSettings);
var configurationUpdater = ConfigurationUpdater.Create(tracerSettings.Manager.InitialMutableSettings.Environment, tracerSettings.Manager.InitialMutableSettings.ServiceVersion, debuggerSettings.MaxProbesPerType, globalRateLimiter);
var memoryPressureMonitor = new MemoryPressureMonitor(MemoryPressureConfig.Default);

var statsd = GetDogStatsd(tracerSettings);

Expand All @@ -56,7 +57,8 @@ internal static DynamicInstrumentation CreateDynamicInstrumentation(IDiscoverySe
probeStatusPoller: probeStatusPoller,
configurationUpdater: configurationUpdater,
dogStats: statsd,
globalRateLimiter: globalRateLimiter);
globalRateLimiter: globalRateLimiter,
memoryPressureMonitor: memoryPressureMonitor);
}

private static IDogStatsd GetDogStatsd(TracerSettings tracerSettings)
Expand Down
97 changes: 75 additions & 22 deletions tracer/src/Datadog.Trace/Debugger/DynamicInstrumentation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ internal sealed class DynamicInstrumentation : IDisposable
{
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(DynamicInstrumentation));

private readonly TaskCompletionSource<bool> _processExit;
// Completed when this DI instance is being disposed (runtime disable via remote config, or process shutdown).
// Used to abort in-flight initialization waits promptly.
private readonly TaskCompletionSource<bool> _disposalSignal;
private readonly IDiscoveryService _discoveryService;
private readonly IRcmSubscriptionManager _subscriptionManager;
private readonly ISubscription _subscription;
Expand All @@ -53,6 +55,7 @@ internal sealed class DynamicInstrumentation : IDisposable
private readonly IProbeStatusPoller _probeStatusPoller;
private readonly ConfigurationUpdater _configurationUpdater;
private readonly IDogStatsd _dogStats;
private readonly MemoryPressureMonitor _memoryPressureMonitor;
private readonly DebuggerSettings _settings;
private readonly NativeProbeInstrumentationRequester _instrumentProbes;
private readonly object _instanceLock = new();
Expand All @@ -72,11 +75,12 @@ internal DynamicInstrumentation(
ConfigurationUpdater configurationUpdater,
IDogStatsd dogStats,
IDebuggerGlobalRateLimiter? globalRateLimiter = null,
MemoryPressureMonitor? memoryPressureMonitor = null,
NativeProbeInstrumentationRequester? instrumentProbes = null)
{
Log.Information("Initializing Dynamic Instrumentation");
_settings = settings;
_processExit = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_disposalSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_discoveryService = discoveryService;
_lineProbeResolver = lineProbeResolver;
_snapshotUploader = snapshotUploader;
Expand All @@ -87,6 +91,7 @@ internal DynamicInstrumentation(
_configurationUpdater = configurationUpdater;
_configurationUpdater.SetProbeInstrumentationHandlers(UpdateAddedProbeInstrumentations, UpdateRemovedProbeInstrumentations);
_dogStats = dogStats;
_memoryPressureMonitor = memoryPressureMonitor ?? new MemoryPressureMonitor(MemoryPressureConfig.Default);
_instrumentProbes = instrumentProbes ?? DebuggerNativeMethods.InstrumentProbes;
_unboundProbes = new List<ProbeDefinition>();
_lastReportedUnboundProbeErrors = new Dictionary<string, LineProbeResolveErrorKey>();
Expand Down Expand Up @@ -145,7 +150,7 @@ private async Task InitializeAsync()
hasFileProbes = _configurationUpdater.HasAnyEffectiveProbeForFile(probeConfiguration);
if (hasFileProbes)
{
StartRuntimeIfNeeded();
StartRuntimeIfNeeded(subscribeToRcm: false);
}

_configurationUpdater.AcceptFile(probeConfiguration);
Expand All @@ -154,8 +159,7 @@ private async Task InitializeAsync()
var isRcmAvailable = await rcmAvailabilityTask.ConfigureAwait(false);
if (isRcmAvailable)
{
StartRuntimeIfNeeded();
_subscriptionManager.SubscribeToChanges(_subscription);
StartRuntimeIfNeeded(subscribeToRcm: true);
}

// Start background processing and register the assembly load callback if either:
Expand Down Expand Up @@ -184,16 +188,55 @@ private async Task InitializeAsync()
}
}

private void StartRuntimeIfNeeded()
/// <summary>
/// Starts the runtime (background processing + assembly-load callback) exactly once, and optionally
/// subscribes to RCM. Idempotent and safe to call from both the file-probe path (<paramref name="subscribeToRcm"/>
/// false) and the RCM-available path (<paramref name="subscribeToRcm"/> true).
/// </summary>
private void StartRuntimeIfNeeded(bool subscribeToRcm)
{
if (IsInitialized || IsDisposed)
if (IsDisposed || (IsInitialized && !subscribeToRcm))
{
return;
}

AppDomain.CurrentDomain.AssemblyLoad += CheckUnboundProbes;
StartBackgroundProcess();
Volatile.Write(ref _initializationState, 2);
lock (_instanceLock)
{
if (IsDisposed)
{
return;
}

// Start the runtime exactly once.
if (!IsInitialized)
{
var assemblyLoadSubscribed = false;
try
{
AppDomain.CurrentDomain.AssemblyLoad += CheckUnboundProbes;
assemblyLoadSubscribed = true;
StartBackgroundProcess();
Volatile.Write(ref _initializationState, 2);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: make it a named constant

}
catch
{
if (assemblyLoadSubscribed)
{
AppDomain.CurrentDomain.AssemblyLoad -= CheckUnboundProbes;
}

throw;
}
}

// Subscribe only on the RCM path, and only if a disable hasn't landed in the meantime.
// Best-effort: correctness against a leaked subscription is guaranteed by Dispose's Unsubscribe
// running under this same lock.
if (subscribeToRcm && !IsDisposed)
{
_subscriptionManager.SubscribeToChanges(_subscription);
}
}
}

private void StartBackgroundProcess()
Expand Down Expand Up @@ -836,6 +879,16 @@ internal void AddLog(ProbeInfo probe, string log)
SetProbeStatusToEmitting(probe);
}

internal void RefreshMemoryPressureIfStale()
{
if (IsDisposed)
{
return;
}

_memoryPressureMonitor.RefreshIfStale();
}

internal void SetProbeStatusToEmitting(ProbeInfo probe)
{
if (IsDisposed)
Expand Down Expand Up @@ -899,7 +952,7 @@ private async Task<bool> WaitForRcmAvailabilityAsync()
var rcmTimeout = TimeSpan.FromMinutes(5);
var timeoutTask = Task.Delay(rcmTimeout);

var completedTask = await Task.WhenAny(rcmAvailabilityTcs.Task, timeoutTask, _processExit.Task).ConfigureAwait(false);
var completedTask = await Task.WhenAny(rcmAvailabilityTcs.Task, timeoutTask, _disposalSignal.Task).ConfigureAwait(false);
if (completedTask == timeoutTask)
{
Log.Warning("Dynamic Instrumentation could not be enabled because Remote Configuration Management is not available after waiting {Timeout} seconds. Please note that Dynamic Instrumentation is not supported in all environments (e.g. AAS). Ensure that you are using datadog-agent version 7.41.1 or higher, and that Remote Configuration Management is enabled in datadog-agent's yaml configuration file.", rcmTimeout.TotalSeconds);
Expand Down Expand Up @@ -930,34 +983,34 @@ void DiscoveryCallback(AgentConfiguration x)

public void Dispose()
{
// Already disposed
if (Interlocked.CompareExchange(ref _disposeState, 1, 0) != 0)
{
return;
}

if (_processExit.Task.IsCompleted)
{
return;
}
_disposalSignal.TrySetResult(true);

_processExit.TrySetResult(true);
AppDomain.CurrentDomain.AssemblyLoad -= CheckUnboundProbes;
lock (_instanceLock)
{
// Must stay under the lock: StartRuntimeIfNeeded subscribes AssemblyLoad under the same
// lock, so unsubscribing here is what prevents a subscribe-after-dispose handler leak.
AppDomain.CurrentDomain.AssemblyLoad -= CheckUnboundProbes;

SafeDisposal.New()
.Execute(() => _subscriptionManager.Unsubscribe(_subscription), "unsubscribing from RCM")
.Add(_snapshotUploader)
.Add(_logUploader)
.Add(_diagnosticsUploader)
.Add(_probeStatusPoller)
.Add(_memoryPressureMonitor)
.DisposeAll();
}

// Cannot await here because Dispose() is synchronous and callers hold locks.
// On master, _dogStats was disposed via SafeDisposal.Add() which called sync
// Dispose() — itself fire-and-forget internally via Task.Run().
_dogStats?.DisposeAsync().ContinueWith(t => Log.Error(t.Exception, "Error waiting for StatsD disposal"), TaskContinuationOptions.OnlyOnFaulted);
_dogStats?.DisposeAsync().ContinueWith(
t => Log.Error(t.Exception, "Error waiting for StatsD disposal"),
CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted,
TaskScheduler.Default);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ private static CaptureLimitInfo ToCaptureLimitInfo(Capture? capture)
return result.Count == 0 ? null : result.ToArray();
}

private static bool ShouldRefreshMemoryPressureBeforeCapture(MethodState methodState)
{
return methodState is MethodState.BeginLine
or MethodState.BeginLineAsync
or MethodState.EntryStart
or MethodState.EntryAsync
or MethodState.ExitStart
or MethodState.ExitStartAsync;
}

public bool TryBeginProcess(in ProbeData probeData, [NotNullWhen(true)] out IDebuggerSnapshotCreator? snapshotCreator)
{
var state = _state;
Expand Down Expand Up @@ -138,8 +148,13 @@ public bool Process<TCapture>(ref CaptureInfo<TCapture> info, IDebuggerSnapshotC
}

var probeInfo = state.ProbeInfo;
var dynamicInstrumentation = DebuggerManager.Instance.DynamicInstrumentation;
if (dynamicInstrumentation is not null && ShouldRefreshMemoryPressureBeforeCapture(info.MethodState))
{
dynamicInstrumentation.RefreshMemoryPressureIfStale();
}
Comment thread
dudikeleti marked this conversation as resolved.

if (DebuggerManager.Instance.DynamicInstrumentation?.IsInitialized == false)
if (dynamicInstrumentation?.IsInitialized == false)
{
Log.Debug("Stop processing probe {ID} because Dynamic Instrumentation has not initialized yet or has been disabled, probably dynamically through Remote Config", probeData.ProbeId);
snapshotCreator.Stop();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// <copyright file="MemoryPressureConfig.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;
using System.Globalization;
using Datadog.Trace.Util;

namespace Datadog.Trace.Debugger.RateLimiting
{
internal readonly struct MemoryPressureConfig
{
Comment thread
dudikeleti marked this conversation as resolved.
public static MemoryPressureConfig Default { get; } = new()
{
HighPressureThresholdRatio = 0.85,
MaxGen2PerSecond = 2,
MemoryExitMargin = 0.05,
Gen2ExitMargin = 1,
ConsecutiveHighToEnter = 1,
ConsecutiveLowToExit = 1,
RefreshInterval = TimeSpan.FromSeconds(1)
};

// Enter high pressure at this fraction (0.0–1.0) of available memory in use. Machine-wide on .NET Framework.
public double HighPressureThresholdRatio { get; init; }

public int MaxGen2PerSecond { get; init; }

public double MemoryExitMargin { get; init; }

public int Gen2ExitMargin { get; init; }

public int ConsecutiveHighToEnter { get; init; }

public int ConsecutiveLowToExit { get; init; }

public TimeSpan RefreshInterval { get; init; }

public override string ToString()
{
var culture = CultureInfo.InvariantCulture;
var sb = StringBuilderCache.Acquire();

sb.Append("Threshold=");
sb.Append(HighPressureThresholdRatio.ToString("F2", culture));
sb.Append(" (");
sb.Append((HighPressureThresholdRatio * 100).ToString("F1", culture));
sb.Append("%), MaxGen2=");
sb.Append(MaxGen2PerSecond.ToString(culture));
sb.Append("/s, ExitMargin=");
sb.Append(MemoryExitMargin.ToString("F2", culture));
sb.Append(", Gen2ExitMargin=");
sb.Append(Gen2ExitMargin.ToString(culture));
sb.Append(", HighToEnter=");
sb.Append(ConsecutiveHighToEnter.ToString(culture));
sb.Append(", LowToExit=");
sb.Append(ConsecutiveLowToExit.ToString(culture));

return StringBuilderCache.GetStringAndRelease(sb);
}
}
}
Loading
Loading