Skip to content

Commit 8d80a09

Browse files
committed
NCBC-4160: AppTelemetry reports significantly lower metrics than it should
Motivation
----------
- Histogram bins are not thread-safe. Multiple threads can read `currentValue` simultaneously, both compute count + 1, and both write back the same value, so one increment is silently lost.
- Race condition in TryExportMetricsAndReset: An increment thread obtains a reference to metricSetA via GetOrAdd (inside lock), then the export thread takes a snapshot and clears the dictionary. The increment thread then writes to metricSetA, but it's already been snapshotted and will be discarded after export, so the increment is permanently lost. Or, a new entry metricSetB is added to the dictionary AFTER the snapshot ToArray() but BEFORE Clear(). It ends up in neither the snapshot nor the post-clear dictionary.
- WebSocket send failure permanently loses exported metrics: TryExportMetricsAndReset clears the dictionary before the WebSocket send. If SendAsync fails (connection drop, timeout), those metrics are lost.
- Backoff never resets after a successful connection (so it grows to 1h backoff each time).

Changes
-------
- `AppTelemetryHistogramBins.cs`: Added a lock around the read-modify-write in IncrementCountAndSum() to prevent concurrent threads from overwriting each other's updates to bin counts and sums.
- `AppTelemetryCollector.cs`: Replaced the snapshot-then-clear pattern with an atomic dictionary swap under _metricsLock. The old dictionary is swapped out and exported while new increments go to the fresh dictionary, removing the window where metrics could be lost between snapshot and clear. Also fixed Disable() to use the same swap pattern under the metrics lock.
- `WebSocketClientHandler.cs`: Added a _pendingMetrics cache. If SendAsync fails after export, the serialized metrics are saved and retried on the next telemetry request instead of being lost.
- `WebSocketClientHandler.cs`: Added _attempt = 0 after a successful WebSocket connection opens, so the backoff resets to 100ms instead of staying at the max (up to 1h) after a connection terminates.
Change-Id: I1a96ddbd0c908b1ec90a20868aed264ca499b607
Reviewed-on: https://review.couchbase.org/c/couchbase-net-client/+/241592
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: David Kelly <davidmichaelkelly@gmail.com>
1 parent 180f500 commit 8d80a09

3 files changed

Lines changed: 61 additions & 27 deletions

File tree

src/Couchbase/Core/Diagnostics/Metrics/AppTelemetry/AppTelemetryCollector.cs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ internal class AppTelemetryCollector : IAppTelemetryCollector
2222
private readonly object _metricsLock = new();
2323
private readonly Uri? _endpoint;
2424
private CancellationTokenSource? _webSocketTokenSource;
25+
private ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet> _metricSets = new();
2526

26-
public ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet> MetricSets { get; } = new();
27+
internal ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet> MetricSets => _metricSets;
2728

2829
public AppTelemetryCollector()
2930
{
@@ -75,7 +76,10 @@ public void Disable()
7576
{
7677
_enabled = false;
7778
_webSocketTokenSource?.Cancel();
78-
MetricSets.Clear();
79+
}
80+
lock (_metricsLock)
81+
{
82+
_metricSets = new ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet>();
7983
}
8084
}
8185

@@ -125,8 +129,7 @@ public void IncrementHistogram(AppTelemetryRequestType name, TimeSpan? operation
125129
AppTelemetryMetricSet metricSet;
126130
lock (_metricsLock)
127131
{
128-
// Ensures we don't add during a Clear
129-
metricSet = MetricSets.GetOrAdd(targetKey, _ => new AppTelemetryMetricSet());
132+
metricSet = _metricSets.GetOrAdd(targetKey, _ => new AppTelemetryMetricSet());
130133
}
131134

132135
metricSet.IncrementHistogram(name, operationLatency.Value);
@@ -142,27 +145,25 @@ public void IncrementCounter(AppTelemetryServiceType serviceType, AppTelemetryCo
142145
AppTelemetryMetricSet metricSet;
143146
lock (_metricsLock)
144147
{
145-
// Ensures we don't add during a Clear
146-
metricSet = MetricSets.GetOrAdd(targetKey, _ => new AppTelemetryMetricSet());
148+
metricSet = _metricSets.GetOrAdd(targetKey, _ => new AppTelemetryMetricSet());
147149
}
148150
metricSet.IncrementCounter(serviceType, counterType);
149151
}
150152

151153
public bool TryExportMetricsAndReset(out string metricsString)
152154
{
153155
metricsString = string.Empty;
154-
if (MetricSets.IsEmpty) return false;
155-
156-
var sb = new StringBuilder();
157-
158-
var snapshot = MetricSets.ToArray();
159156

157+
ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet> oldMetrics;
160158
lock (_metricsLock)
161159
{
162-
MetricSets.Clear();
160+
if (_metricSets.IsEmpty) return false;
161+
oldMetrics = _metricSets;
162+
_metricSets = new ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet>();
163163
}
164164

165-
foreach (var exported in snapshot.Select(entry => entry.Value.ExportAllMetrics(entry.Key)))
165+
var sb = new StringBuilder();
166+
foreach (var exported in oldMetrics.Select(entry => entry.Value.ExportAllMetrics(entry.Key)))
166167
{
167168
sb.Append(exported);
168169
}

src/Couchbase/Core/Diagnostics/Metrics/AppTelemetry/AppTelemetryHistogramBins.cs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
45
using Couchbase.Core.Compatibility;
56

67
namespace Couchbase.Core.Diagnostics.Metrics.AppTelemetry;
78

89
[InterfaceStability(Level.Volatile)]
910
internal class AppTelemetryHistogramBins
1011
{
12+
#if NET10_0_OR_GREATER
13+
private readonly Lock _binLock = new();
14+
#else
15+
private readonly object _binLock = new();
16+
#endif
1117
/// <summary>
1218
/// Count of all operations. This value is the cumulative sum of all smaller bins.
1319
/// Sum of all operations' durations in seconds.
@@ -32,16 +38,19 @@ public void IncrementCountAndSum(TimeSpan operationLatency)
3238
{
3339
var opLatency = operationLatency.TotalMilliseconds;
3440

35-
// For each bin, if the operation latency is less than or equal to the bin's upper bound,
36-
// increment the count and add to the sum
37-
foreach (var bin in Bins)
41+
lock (_binLock)
3842
{
39-
var le = bin.Keys.First(); // There's only 1 key per bin
43+
// For each bin, if the operation latency is less than or equal to the bin's upper bound,
44+
// increment the count and add to the sum
45+
foreach (var bin in Bins)
46+
{
47+
var le = bin.Keys.First(); // There's only 1 key per bin
4048

41-
if (!(opLatency <= le)) continue;
42-
var currentValue = bin[le];
43-
bin[le] = new KeyValuePair<uint, double>(currentValue.Key + 1, currentValue.Value + opLatency);
44-
break; //Break immediately after incrementing the bin, then we'll sum each bin cumulatively in the export method.
49+
if (!(opLatency <= le)) continue;
50+
var currentValue = bin[le];
51+
bin[le] = new KeyValuePair<uint, double>(currentValue.Key + 1, currentValue.Value + opLatency);
52+
break; //Break immediately after incrementing the bin, then we'll sum each bin cumulatively in the export method.
53+
}
4554
}
4655
}
4756
}

src/Couchbase/Core/Diagnostics/Metrics/AppTelemetry/WebSocketClientHandler.cs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ internal class WebSocketClientHandler : IDisposable
3838
private readonly IRedactor _redactor;
3939
private int _attempt = 0;
4040
private readonly int _clampedExponent = 0;
41+
private string? _pendingMetrics;
4142
private Uri? Endpoint => _appTelemetryCollector.Endpoint(_attempt);
4243

4344
public WebSocketClientHandler(IAppTelemetryCollector appTelemetryCollector)
@@ -69,6 +70,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
6970

7071
if (_webSocket?.State == WebSocketState.Open)
7172
{
73+
_attempt = 0;
7274
await ReceiveAsync(cancellationToken).ConfigureAwait(false);
7375
}
7476
}
@@ -156,20 +158,42 @@ private async Task HandleMessage(byte[] message, CancellationToken cancellationT
156158

157159
private async Task SendTelemetryAsync(CancellationToken cancellationToken)
158160
{
159-
if (_appTelemetryCollector.TryExportMetricsAndReset(out var metrics))
161+
string? metrics;
162+
163+
if (_pendingMetrics != null)
164+
{
165+
// Retry previously unsent metrics before exporting new ones.
166+
metrics = _pendingMetrics;
167+
}
168+
else if (_appTelemetryCollector.TryExportMetricsAndReset(out var newMetrics))
169+
{
170+
metrics = newMetrics;
171+
}
172+
else
160173
{
161-
var metricsBytes = Encoding.UTF8.GetBytes(metrics);
162-
var response = new byte[1 + metricsBytes.Length];
163-
response[0] = SuccessOpcode;
164-
metricsBytes.CopyTo(response, 1);
174+
return;
175+
}
176+
177+
var metricsBytes = Encoding.UTF8.GetBytes(metrics);
178+
var response = new byte[1 + metricsBytes.Length];
179+
response[0] = SuccessOpcode;
180+
metricsBytes.CopyTo(response, 1);
165181

166-
_logger.LogDebug("Sending AppTelemetry metrics {Metrics}", metrics); //TODO: this is for debugging
182+
_logger.LogTrace("Sending AppTelemetry metrics {Metrics}", metrics);
167183

184+
try
185+
{
168186
await _webSocket!.SendAsync(
169187
new ArraySegment<byte>(response),
170188
WebSocketMessageType.Binary,
171189
true,
172190
cancellationToken).ConfigureAwait(false);
191+
_pendingMetrics = null;
192+
}
193+
catch
194+
{
195+
_pendingMetrics = metrics;
196+
throw;
173197
}
174198
}
175199

0 commit comments

Comments
 (0)