Skip to content

Commit f8170b6

Browse files
committed
NCBC-4183: Ensure correctness of recorded metrics and improve performance of AppTelemetry
Motivation
----------
Currently the App Service Level Telemetry in the SDK is its own entity and service registered with the DI container. Its lifecycle and metric reporting are separate from our existing metrics and histograms. The goal is to integrate it to reduce duplicate logic, and to allow other code to idiomatically subscribe to the exporter, in order to reduce potential bugs and logic mismatches with the rest of our reporting.

Changes
-------
Fixes:
- Only start the MeterListener when there is at least one AppTelemetry endpoint available
- Always send a response to GET_TELEMETRY, even when no metrics were collected
- Move Query operation tracking (total) after error handling to prevent double-counting
- _sum is now exported as an integer
- Add operation and error tracking in SearchIndexManager
- Ignore internal KV calls (GetCid, etc.) by having GetAppTelemetryKvRequestType return null for unrecognized OpCodes

Rest:
- Add a connection string parameter for enable_app_telemetry
- Add an AppTelemetry MeterListener independent from the other user-overridable meters

Change-Id: If5349c2b2e7ccf8a5575d2f5066a0614c4be215b
Reviewed-on: https://review.couchbase.org/c/couchbase-net-client/+/241927
Reviewed-by: Emilien Bevierre <emilien.bevierre@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
1 parent 2ebe062 commit f8170b6

39 files changed

Lines changed: 2311 additions & 1635 deletions

src/Couchbase/Analytics/AnalyticsClient.cs

Lines changed: 142 additions & 118 deletions
Large diffs are not rendered by default.

src/Couchbase/CStringParams.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ internal static class CStringParams
3131
public const string NetworkResolution = "network";
3232
public const string PreferredServerGroup = "preferred_server_group";
3333

34+
[InterfaceStability(Level.Volatile)]
35+
public const string EnableAppTelemetry = "enable_app_telemetry";
36+
3437
[InterfaceStability(Level.Volatile)]
3538
public const string AppTelemetryEndpoint = "app_telemetry_endpoint";
3639

src/Couchbase/ClusterOptions.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ public string? ConnectionString
162162
{
163163
ConnectionStringValue.RandomizeSeedHosts = randomizeSeedNodes;
164164
}
165+
if (ConnectionStringValue.TryGetParameter(CStringParams.EnableAppTelemetry, out bool enableAppTelemetry))
166+
{
167+
AppTelemetry.Enabled = enableAppTelemetry;
168+
}
165169
if (ConnectionStringValue.TryGetParameter(CStringParams.AppTelemetryEndpoint, out string appTelemetryEndpoint))
166170
{
167171
AppTelemetry.Endpoint = new Uri(appTelemetryEndpoint);

src/Couchbase/Core/ClusterNode.cs

Lines changed: 54 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ internal sealed partial class ClusterNode : IClusterNode, IConnectionInitializer
6363
private string _bucketName = BucketConfig.GlobalBucketName;
6464
private IBucket _owner;
6565
private readonly IOperationConfigurator _operationConfigurator;
66-
private readonly IAppTelemetryCollector _appTelemetryCollector;
6766

6867
public ClusterNode(ClusterContext context, IConnectionPoolFactory connectionPoolFactory, ILogger<ClusterNode> logger,
6968
ObjectPool<OperationBuilder> operationBuilderPool, ICircuitBreaker circuitBreaker, ISaslMechanismFactory saslMechanismFactory,
@@ -79,7 +78,6 @@ public ClusterNode(ClusterContext context, IConnectionPoolFactory connectionPool
7978
_tracer = tracer;
8079
_operationConfigurator = operationConfigurator;
8180
EndPoint = endPoint;
82-
_appTelemetryCollector = _context.ServiceProvider.GetRequiredService<IAppTelemetryCollector>();
8381

8482
try
8583
{
@@ -609,15 +607,16 @@ private Task<ResponseStatus> ExecuteOpImmediatelyAsync(IConnectionPool connectio
609607
private async Task<ResponseStatus> ExecuteOp(Func<IOperation, object, CancellationToken, Task> sender, IOperation op, object state, CancellationTokenPair tokenPair = default)
610608
{
611609
LogKvExecutingOperation(op.OpCode, _redactor.SystemData(EndPoint), _redactor.UserData(op.Key), op.Opaque, op.ConfigVersion);
612-
var operationStopwatch = _appTelemetryCollector?.StartNewLightweightStopwatch();
610+
var operationStopwatch = LightweightStopwatch.StartNew();
613611
TimeSpan? operationLatency;
612+
var appTelemetryRequestType = AppTelemetryUtils.GetAppTelemetryKvRequestType(op);
614613

615614
try
616615
{
617-
operationStopwatch?.Restart();
616+
operationStopwatch.Restart();
618617
// Await the send in case the send throws an exception (i.e. SendQueueFullException)
619618
await sender(op, state, tokenPair).ConfigureAwait(false);
620-
operationLatency = operationStopwatch?.Elapsed;
619+
operationLatency = operationStopwatch.Elapsed;
621620

622621
ResponseStatus status;
623622
using (new OperationCancellationRegistration(op, tokenPair))
@@ -631,15 +630,18 @@ private async Task<ResponseStatus> ExecuteOp(Func<IOperation, object, Cancellati
631630
{
632631
LogKvOperationCompleted(op.OpCode, _redactor.SystemData(EndPoint), _redactor.UserData(op.Key), op.Opaque, op.ConfigVersion);
633632

634-
_appTelemetryCollector?.IncrementMetrics(
635-
operationLatency,
636-
_nodesAdapter?.CanonicalHostname ?? EndPoint.Host,
637-
_nodesAdapter?.AlternateHostname,
638-
NodeUuid,
639-
AppTelemetryServiceType.KeyValue,
640-
AppTelemetryCounterType.Total,
641-
AppTelemetryUtils.GetAppTelemetryKvRequestType(op),
642-
op.BucketName);
633+
if (appTelemetryRequestType.HasValue)
634+
{
635+
MetricTracker.AppTelemetry.TrackOperation(
636+
operationLatency,
637+
_nodesAdapter?.CanonicalHostname ?? EndPoint.Host,
638+
_nodesAdapter?.AlternateHostname,
639+
NodeUuid,
640+
AppTelemetryServiceType.KeyValue,
641+
AppTelemetryCounterType.Total,
642+
appTelemetryRequestType.Value,
643+
op.BucketName);
644+
}
643645

644646
return status;
645647
}
@@ -718,12 +720,25 @@ private async Task<ResponseStatus> ExecuteOp(Func<IOperation, object, Cancellati
718720
op.LogOrphaned();
719721
}
720722

723+
if (appTelemetryRequestType.HasValue)
724+
{
725+
MetricTracker.AppTelemetry.TrackOperation(
726+
operationLatency,
727+
_nodesAdapter?.CanonicalHostname ?? EndPoint.Host,
728+
_nodesAdapter?.AlternateHostname,
729+
NodeUuid,
730+
AppTelemetryServiceType.KeyValue,
731+
AppTelemetryCounterType.Total,
732+
appTelemetryRequestType.Value,
733+
op.BucketName);
734+
}
735+
721736
return status;
722737
}
723738
catch (OperationCanceledException ex)
724739
{
725740
//Recording operation time if it failed
726-
operationLatency = operationStopwatch?.Elapsed;
741+
operationLatency = operationStopwatch.Elapsed;
727742

728743
// Timeout handling logic is also in RetryOrchestrator, however this method can also be reached without
729744
// passing through RetryOrchestrator for cases like diagnostics or bootstrapping. Therefore, we need the logic
@@ -742,17 +757,18 @@ private async Task<ResponseStatus> ExecuteOp(Func<IOperation, object, Cancellati
742757
LogKvOperationTimeout(_redactor.SystemData(EndPoint), op.OpCode, _redactor.UserData(op.Key), op.Opaque, op.ConfigVersion, op.IsSent);
743758
MetricTracker.KeyValue.TrackTimeout(op.OpCode);
744759

745-
_appTelemetryCollector?.IncrementMetrics(
746-
operationLatency,
747-
_nodesAdapter?.CanonicalHostname ?? EndPoint.Host,
748-
_nodesAdapter?.AlternateHostname,
749-
NodeUuid,
750-
AppTelemetryServiceType.KeyValue,
751-
AppTelemetryCounterType.TimedOut,
752-
AppTelemetryUtils.GetAppTelemetryKvRequestType(op),
753-
op.BucketName);
754-
755-
760+
if (appTelemetryRequestType.HasValue)
761+
{
762+
MetricTracker.AppTelemetry.TrackOperation(
763+
operationLatency,
764+
_nodesAdapter?.CanonicalHostname ?? EndPoint.Host,
765+
_nodesAdapter?.AlternateHostname,
766+
NodeUuid,
767+
AppTelemetryServiceType.KeyValue,
768+
AppTelemetryCounterType.TimedOut,
769+
appTelemetryRequestType.Value,
770+
op.BucketName);
771+
}
756772

757773
// If this wasn't an externally requested cancellation, it's a timeout, so convert to a TimeoutException
758774
ThrowHelper.ThrowTimeoutException(op, ex, _redactor, new KeyValueErrorContext
@@ -771,15 +787,18 @@ private async Task<ResponseStatus> ExecuteOp(Func<IOperation, object, Cancellati
771787
});
772788
}
773789

774-
_appTelemetryCollector?.IncrementMetrics(
775-
operationLatency,
776-
_nodesAdapter?.CanonicalHostname ?? EndPoint.Host,
777-
_nodesAdapter?.AlternateHostname,
778-
NodeUuid,
779-
AppTelemetryServiceType.KeyValue,
780-
AppTelemetryCounterType.Canceled,
781-
AppTelemetryUtils.GetAppTelemetryKvRequestType(op),
782-
op.BucketName);
790+
if (appTelemetryRequestType.HasValue)
791+
{
792+
MetricTracker.AppTelemetry.TrackOperation(
793+
operationLatency,
794+
_nodesAdapter?.CanonicalHostname ?? EndPoint.Host,
795+
_nodesAdapter?.AlternateHostname,
796+
NodeUuid,
797+
AppTelemetryServiceType.KeyValue,
798+
AppTelemetryCounterType.Canceled,
799+
appTelemetryRequestType.Value,
800+
op.BucketName);
801+
}
783802

784803
throw;
785804
}

src/Couchbase/Core/Configuration/Server/BucketConfig.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,9 +258,10 @@ internal class BucketConfig : IEquatable<BucketConfig>, IJsonOnDeserialized
258258
internal ClusterLabels ClusterLabels = new();
259259
private List<NodesExt> _nodesExt = new();
260260

261-
private List<NodesExt> NodesWithAppTelemetry => NodesExt
262-
.Where(n => !string.IsNullOrEmpty(n.AppTelemetryPath))
263-
.ToList();
261+
internal List<NodesExt> NodesWithAppTelemetry =>
262+
field ??= NodesExt
263+
.Where(n => !string.IsNullOrEmpty(n.AppTelemetryPath))
264+
.ToList().Shuffle();
264265

265266
internal Uri GetAppTelemetryPath(int attempt, bool? tlsEnabled = false)
266267
{

src/Couchbase/Core/Diagnostics/Metrics/AppTelemetry/AppTelemetryCollector.cs

Lines changed: 35 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,28 @@
11
using System;
22
using System.Collections.Concurrent;
3-
using System.Linq;
43
using System.Text;
54
using System.Threading;
5+
using Couchbase.Core.Compatibility;
66
using Couchbase.Core.DI;
77
using Couchbase.Core.IO.Authentication.Authenticators;
88
using Couchbase.Core.Logging;
9-
using Couchbase.Utils;
109
using Microsoft.Extensions.Logging;
1110

1211
namespace Couchbase.Core.Diagnostics.Metrics.AppTelemetry;
1312
#nullable enable
1413

14+
[InterfaceStability(Level.Volatile)]
1515
internal class AppTelemetryCollector : IAppTelemetryCollector
1616
{
17-
private bool _enabled;
17+
private volatile bool _enabled;
1818
private ILogger<AppTelemetryCollector>? _logger;
1919
private IRedactor? _redactor;
2020
private WebSocketClientHandler? _webSocketClientHandler;
21-
private readonly object _enableLock = new();
22-
private readonly object _metricsLock = new();
2321
private readonly Uri? _endpoint;
2422
private CancellationTokenSource? _webSocketTokenSource;
2523
private ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet> _metricSets = new();
2624

25+
//Shim for unit tests
2726
internal ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet> MetricSets => _metricSets;
2827

2928
public AppTelemetryCollector()
@@ -52,7 +51,7 @@ public void Initialize()
5251
_logger = ClusterContext.ServiceProvider.GetRequiredService<ILogger<AppTelemetryCollector>>();
5352
}
5453

55-
if (_enabled)
54+
if (_enabled && EndpointCount > 0)
5655
{
5756
_webSocketClientHandler = new WebSocketClientHandler(this);
5857
Enable();
@@ -62,25 +61,19 @@ public void Initialize()
6261

6362
public void Enable()
6463
{
65-
lock (_enableLock)
66-
{
67-
_enabled = true;
68-
_webSocketTokenSource = new CancellationTokenSource();
69-
_ = _webSocketClientHandler?.StartAsync(_webSocketTokenSource.Token);
70-
}
64+
_enabled = true;
65+
MetricTracker.AppTelemetry.Register(this);
66+
_webSocketTokenSource = new CancellationTokenSource();
67+
_ = _webSocketClientHandler?.StartAsync(_webSocketTokenSource.Token);
7168
}
7269

7370
public void Disable()
7471
{
75-
lock (_enableLock)
76-
{
77-
_enabled = false;
78-
_webSocketTokenSource?.Cancel();
79-
}
80-
lock (_metricsLock)
81-
{
82-
_metricSets = new ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet>();
83-
}
72+
_enabled = false;
73+
MetricTracker.AppTelemetry.Unregister();
74+
_webSocketTokenSource?.Cancel();
75+
// Writers that already hold a metricSet reference will complete into the old dict, avoid orphaning in-flight writes.
76+
Interlocked.Exchange(ref _metricSets, new ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet>());
8477
}
8578

8679
public ClusterContext? ClusterContext { get; set; }
@@ -98,91 +91,58 @@ public void Disable()
9891
/// </summary>
9992
public Uri? Endpoint(int attempt) => _endpoint ?? ClusterContext?.GlobalConfig?.GetAppTelemetryPath(attempt, TlsEnabled);
10093

101-
public void IncrementMetrics(TimeSpan? operationLatency, string node, string? alternateNode, string nodeUuid,
94+
public int EndpointCount => _endpoint != null ? 1 : ClusterContext?.GlobalConfig?.NodesWithAppTelemetry.Count ?? 0;
95+
96+
public void IncrementMetrics(TimeSpan? operationLatency, string? node, string? alternateNode, string? nodeUuid,
10297
AppTelemetryServiceType serviceType,
10398
AppTelemetryCounterType counterType,
10499
AppTelemetryRequestType? requestType = null,
105100
string? bucket = null)
106101
{
107102
if (!_enabled) return;
108-
if (string.IsNullOrEmpty(nodeUuid)) return; //Do not capture operations before a config is fetched
109-
if (!operationLatency.HasValue) return;
103+
if (string.IsNullOrEmpty(nodeUuid)) return;
110104

111105
requestType ??= AppTelemetryUtils.DetermineAppTelemetryRequestType(serviceType);
112106

113-
//Only incrementing histograms for successful operations.
114-
//Timeouts and Cancellations histograms should only be incremented if an orphan is received, with its true latency.
115-
if (counterType == AppTelemetryCounterType.Total)
116-
{
117-
IncrementHistogram(requestType.Value, operationLatency, node, alternateNode, nodeUuid, bucket);
118-
}
119-
IncrementCounter(serviceType, counterType, node, alternateNode, nodeUuid, bucket);
120-
}
121-
122-
public void IncrementHistogram(AppTelemetryRequestType name, TimeSpan? operationLatency, string node, string? alternateNode, string nodeUuid, string? bucket = null)
123-
{
124-
if (!_enabled) return;
125-
if (!operationLatency.HasValue) return;
126-
127-
var targetKey = new NodeAndBucket(node, alternateNode, nodeUuid, bucket);
107+
var targetKey = new NodeAndBucket(node ?? string.Empty, alternateNode, nodeUuid, bucket);
108+
var dict = Volatile.Read(ref _metricSets);
109+
var metricSet = dict.GetOrAdd(targetKey, _ => new AppTelemetryMetricSet());
128110

129-
AppTelemetryMetricSet metricSet;
130-
lock (_metricsLock)
111+
if (counterType == AppTelemetryCounterType.Total && operationLatency.HasValue)
131112
{
132-
metricSet = _metricSets.GetOrAdd(targetKey, _ => new AppTelemetryMetricSet());
113+
metricSet.IncrementHistogram(requestType.Value, operationLatency.Value);
133114
}
134115

135-
metricSet.IncrementHistogram(name, operationLatency.Value);
136-
}
137-
138-
public void IncrementCounter(AppTelemetryServiceType serviceType, AppTelemetryCounterType counterType, string node, string? alternateNode, string nodeUuid, string? bucket = null)
139-
{
140-
if (!_enabled) return;
141-
if (serviceType is AppTelemetryServiceType.KeyValue && bucket is null) return;
142-
143-
var targetKey = new NodeAndBucket(node, alternateNode, nodeUuid, bucket);
116+
// KV counters require a bucket
117+
if (serviceType == AppTelemetryServiceType.KeyValue && bucket == null) return;
144118

145-
AppTelemetryMetricSet metricSet;
146-
lock (_metricsLock)
147-
{
148-
metricSet = _metricSets.GetOrAdd(targetKey, _ => new AppTelemetryMetricSet());
149-
}
150119
metricSet.IncrementCounter(serviceType, counterType);
151120
}
152121

153122
public bool TryExportMetricsAndReset(out string metricsString)
154123
{
155124
metricsString = string.Empty;
156125

157-
ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet> oldMetrics;
158-
lock (_metricsLock)
159-
{
160-
if (_metricSets.IsEmpty) return false;
161-
oldMetrics = _metricSets;
162-
_metricSets = new ConcurrentDictionary<NodeAndBucket, AppTelemetryMetricSet>();
163-
}
126+
var dict = Volatile.Read(ref _metricSets);
127+
if (dict.IsEmpty) return false;
164128

165129
var sb = new StringBuilder();
166-
foreach (var exported in oldMetrics.Select(entry => entry.Value.ExportAllMetrics(entry.Key)))
130+
foreach (var entry in dict)
167131
{
168-
sb.Append(exported);
132+
var exported = entry.Value.ExportAllMetrics(entry.Key);
133+
if (!string.IsNullOrEmpty(exported))
134+
{
135+
sb.Append(exported);
136+
}
169137
}
170138

171139
metricsString = sb.ToString();
172-
return true;
173-
}
174-
175-
public LightweightStopwatch? StartNewLightweightStopwatch()
176-
{
177-
if (_enabled)
178-
{
179-
return LightweightStopwatch.StartNew();
180-
}
181-
return null;
140+
return metricsString.Length > 0;
182141
}
183142

184143
public void Dispose()
185144
{
145+
MetricTracker.AppTelemetry.Unregister();
186146
_webSocketTokenSource?.Cancel();
187147
_webSocketClientHandler?.Dispose();
188148
}

0 commit comments

Comments
 (0)