-
Notifications
You must be signed in to change notification settings - Fork 331
Expand file tree
/
Copy pathDebuggerSink.java
More file actions
234 lines (208 loc) · 8.13 KB
/
DebuggerSink.java
File metadata and controls
234 lines (208 loc) · 8.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package com.datadog.debugger.sink;
import com.datadog.debugger.instrumentation.DiagnosticMessage;
import com.datadog.debugger.uploader.BatchUploader;
import com.datadog.debugger.util.DebuggerMetrics;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.debugger.DebuggerContext.SkipCause;
import datadog.trace.bootstrap.debugger.ProbeId;
import datadog.trace.util.AgentTaskScheduler;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Collects data that needs to be sent to the backend: snapshots, metrics, probe statuses and
 * symbols.
 *
 * <p>Data is buffered by the underlying {@link SnapshotSink}, {@link ProbeStatusSink} and {@link
 * SymbolSink} and flushed by a periodic "low rate" task. The period of that task is either fixed
 * (when the configured dynamic instrumentation upload flush interval is non-zero) or adapted at
 * runtime from the remaining capacity of the snapshot low rate queue (when it is zero).
 */
public class DebuggerSink {
  private static final Logger LOGGER = LoggerFactory.getLogger(DebuggerSink.class);
  // Fractions of SnapshotSink.LOW_RATE_CAPACITY still free that trigger an interval adjustment.
  private static final double FREE_CAPACITY_LOWER_THRESHOLD = 0.25;
  private static final double FREE_CAPACITY_UPPER_THRESHOLD = 0.75;
  // Bounds (in ms) of the adaptive low rate flush interval.
  private static final int LOW_RATE_MIN_FLUSH_INTERVAL = 100;
  private static final int LOW_RATE_MAX_FLUSH_INTERVAL = 2000;
  private static final long LOW_RATE_INITIAL_FLUSH_INTERVAL = 1000;
  // Amount (in ms) added to / removed from the adaptive flush interval per adjustment.
  static final long LOW_RATE_STEP_SIZE = 200;
  // Period (in ms) at which the adaptive flush interval is re-evaluated.
  private static final long RECONSIDER_FLUSH_INTERVAL_PERIOD_MS = 200;
  private static final String PREFIX = "debugger.sink.";

  private final ProbeStatusSink probeStatusSink;
  private final SnapshotSink snapshotSink;
  private final SymbolSink symbolSink;
  private final DebuggerMetrics debuggerMetrics;
  private final String tags;
  private final AtomicLong highRateDropped = new AtomicLong();
  // 0 means "adaptive"; any other value is used as a fixed flush interval in ms.
  private final int uploadFlushInterval;
  private final AgentTaskScheduler lowRateScheduler = AgentTaskScheduler.INSTANCE;
  private volatile AgentTaskScheduler.Scheduled<DebuggerSink> lowRateScheduled;
  private volatile AgentTaskScheduler.Scheduled<DebuggerSink> flushIntervalScheduled;
  private volatile long currentLowRateFlushInterval = LOW_RATE_INITIAL_FLUSH_INTERVAL;

  /**
   * Creates a sink using the default metrics instance, a {@link SnapshotSink} uploading to the
   * configured snapshot URL, and a {@link SymbolSink}. No tags are attached to flushes.
   *
   * @param config tracer configuration
   * @param probeStatusSink sink receiving probe status updates
   */
  public DebuggerSink(Config config, ProbeStatusSink probeStatusSink) {
    this(
        config,
        null,
        DebuggerMetrics.getInstance(config),
        probeStatusSink,
        new SnapshotSink(
            config,
            null,
            new BatchUploader(
                config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)),
        new SymbolSink(config));
  }

  /**
   * Creates a sink from explicit collaborators.
   *
   * @param config tracer configuration; only the upload flush interval is read here
   * @param tags tags passed to the status and snapshot flushes (may be {@code null})
   * @param debuggerMetrics metrics reporter
   * @param probeStatusSink sink receiving probe status updates
   * @param snapshotSink sink buffering and uploading snapshots
   * @param symbolSink sink buffering and uploading symbols
   */
  public DebuggerSink(
      Config config,
      String tags,
      DebuggerMetrics debuggerMetrics,
      ProbeStatusSink probeStatusSink,
      SnapshotSink snapshotSink,
      SymbolSink symbolSink) {
    this.tags = tags;
    this.debuggerMetrics = debuggerMetrics;
    this.probeStatusSink = probeStatusSink;
    this.snapshotSink = snapshotSink;
    this.symbolSink = symbolSink;
    this.uploadFlushInterval = config.getDynamicInstrumentationUploadFlushInterval();
  }

  /**
   * Schedules the periodic low rate flush and starts the snapshot sink. When the configured upload
   * flush interval is 0, it also schedules the task that adapts the flush interval.
   *
   * <p>The low rate flush task is scheduled BEFORE the adaptive re-evaluation task. The latter has
   * an initial delay of 0, so it may run concurrently with this method and call {@link
   * #lowRateReschedule()}, which replaces {@link #lowRateScheduled}. With the opposite order that
   * replacement could be overwritten here, leaking a flush task that {@link #stop()} would never
   * cancel.
   */
  public void start() {
    if (uploadFlushInterval != 0) {
      currentLowRateFlushInterval = uploadFlushInterval;
    }
    LOGGER.debug("Scheduling low rate debugger sink flush to {}ms", currentLowRateFlushInterval);
    lowRateScheduled =
        lowRateScheduler.scheduleAtFixedRate(
            this::lowRateFlush, this, 0, currentLowRateFlushInterval, TimeUnit.MILLISECONDS);
    if (uploadFlushInterval == 0) {
      flushIntervalScheduled =
          lowRateScheduler.scheduleAtFixedRate(
              this::reconsiderLowRateFlushInterval,
              this,
              0,
              RECONSIDER_FLUSH_INTERVAL_PERIOD_MS,
              TimeUnit.MILLISECONDS);
    }
    snapshotSink.start();
  }

  /**
   * Cancels the scheduled tasks (re-evaluation first, then the flush) and stops the underlying
   * sinks.
   */
  public void stop() {
    cancelSchedule(this.flushIntervalScheduled);
    cancelSchedule(this.lowRateScheduled);
    probeStatusSink.stop();
    symbolSink.stop();
    snapshotSink.stop();
  }

  /** Cancels {@code scheduled} if it is non-null. */
  private void cancelSchedule(AgentTaskScheduler.Scheduled<DebuggerSink> scheduled) {
    if (scheduled != null) {
      scheduled.cancel();
    }
  }

  /** Returns the sink buffering snapshots. */
  public SnapshotSink getSnapshotSink() {
    return snapshotSink;
  }

  /** Returns the sink receiving probe status updates. */
  public ProbeStatusSink getProbeStatusSink() {
    return probeStatusSink;
  }

  /** Returns the sink buffering symbols. */
  public SymbolSink getSymbolSink() {
    return symbolSink;
  }

  /**
   * Adds a snapshot to the low rate queue. On success the snapshot's probe is marked as emitting;
   * otherwise the {@code debugger.sink.dropped.requests} counter is incremented by 1.
   *
   * @param snapshot snapshot to enqueue
   */
  public void addSnapshot(Snapshot snapshot) {
    boolean added = snapshotSink.addLowRate(snapshot);
    if (!added) {
      debuggerMetrics.count(PREFIX + "dropped.requests", 1);
    } else {
      probeStatusSink.addEmitting(snapshot.getProbe().getProbeId());
    }
  }

  /**
   * Adds a snapshot to the high rate queue. On success the snapshot's probe is marked as emitting.
   * Drops are reported in batches: every 100th drop increments {@code
   * debugger.sink.dropped.requests} by 100, so up to 99 drops may be unreported at any time.
   *
   * @param snapshot snapshot to enqueue
   */
  public void addHighRateSnapshot(Snapshot snapshot) {
    boolean added = snapshotSink.addHighRate(snapshot);
    if (!added) {
      long dropped = highRateDropped.incrementAndGet();
      if (dropped % 100 == 0) {
        debuggerMetrics.count(PREFIX + "dropped.requests", 100);
      }
    } else {
      probeStatusSink.addEmitting(snapshot.getProbe().getProbeId());
    }
  }

  // Same instance as getProbeStatusSink(); kept for existing package-level callers.
  ProbeStatusSink getProbeDiagnosticsSink() {
    return probeStatusSink;
  }

  /**
   * Replaces the low rate flush task with one running at {@link #currentLowRateFlushInterval}. The
   * new task's first run is delayed by one full interval.
   */
  private void lowRateReschedule() {
    cancelSchedule(this.lowRateScheduled);
    LOGGER.debug("Rescheduling low rate debugger sink flush to {}ms", currentLowRateFlushInterval);
    this.lowRateScheduled =
        lowRateScheduler.scheduleAtFixedRate(
            this::lowRateFlush,
            this,
            currentLowRateFlushInterval,
            currentLowRateFlushInterval,
            TimeUnit.MILLISECONDS);
  }

  /** Flushes symbols, probe statuses and low rate snapshots, in that order. */
  // visible for testing
  void lowRateFlush(DebuggerSink ignored) {
    symbolSink.flush();
    probeStatusSink.flush(tags);
    snapshotSink.lowRateFlush(tags);
  }

  /**
   * Reports the queue's remaining capacity and the current flush interval as histograms, then
   * possibly adjusts the interval.
   */
  private void reconsiderLowRateFlushInterval(DebuggerSink ignored) {
    debuggerMetrics.histogram(
        PREFIX + "upload.queue.remaining.capacity", snapshotSink.remainingCapacity());
    debuggerMetrics.histogram(PREFIX + "current.flush.interval", currentLowRateFlushInterval);
    doReconsiderLowRateFlushInterval();
  }

  // Depending on the remaining capacity in the upload queue, we adjust the flush interval
  // to avoid filling the queue if we are waiting too long between flushes.
  // We are using 2 thresholds to adjust the flush interval:
  // - if the remaining capacity is below the lower threshold, we decrease the flush interval
  // - if the remaining capacity is above the upper threshold, we increase the flush interval
  // The interval moves by LOW_RATE_STEP_SIZE and is clamped to
  // [LOW_RATE_MIN_FLUSH_INTERVAL, LOW_RATE_MAX_FLUSH_INTERVAL]; the flush task is only rescheduled
  // when the interval actually changes.
  void doReconsiderLowRateFlushInterval() {
    double remainingCapacityPercent =
        snapshotSink.remainingCapacity() * 1D / SnapshotSink.LOW_RATE_CAPACITY;
    long currentInterval = currentLowRateFlushInterval;
    long newInterval = currentInterval;
    if (remainingCapacityPercent <= FREE_CAPACITY_LOWER_THRESHOLD) {
      newInterval = Math.max(currentInterval - LOW_RATE_STEP_SIZE, LOW_RATE_MIN_FLUSH_INTERVAL);
    } else if (remainingCapacityPercent >= FREE_CAPACITY_UPPER_THRESHOLD) {
      newInterval = Math.min(currentInterval + LOW_RATE_STEP_SIZE, LOW_RATE_MAX_FLUSH_INTERVAL);
    }
    if (newInterval != currentInterval) {
      currentLowRateFlushInterval = newInterval;
      LOGGER.debug(
          "Changing flush interval. Remaining available capacity in upload queue {}%, new flush interval {}ms",
          remainingCapacityPercent * 100, newInterval);
      lowRateReschedule();
    }
  }

  /** Records that the probe definition was received. */
  public void addReceived(ProbeId probeId) {
    probeStatusSink.addReceived(probeId);
  }

  /** Records that the probe was installed. */
  public void addInstalled(ProbeId probeId) {
    probeStatusSink.addInstalled(probeId);
  }

  /** Records that the probe was blocked. */
  public void addBlocked(ProbeId probeId) {
    probeStatusSink.addBlocked(probeId);
  }

  /** Removes diagnostics previously recorded for the probe. */
  public void removeDiagnostics(ProbeId probeId) {
    probeStatusSink.removeDiagnostics(probeId);
  }

  /**
   * Logs each diagnostic message at the level matching its kind. ERROR messages are additionally
   * reported as probe errors.
   *
   * @param probeId probe the messages relate to
   * @param messages diagnostic messages to process
   */
  public void addDiagnostics(ProbeId probeId, List<DiagnosticMessage> messages) {
    for (DiagnosticMessage msg : messages) {
      switch (msg.getKind()) {
        case INFO:
          LOGGER.info(msg.getMessage());
          break;
        case WARN:
          LOGGER.warn(msg.getMessage());
          break;
        case ERROR:
          LOGGER.error(msg.getMessage());
          reportError(probeId, msg);
          break;
      }
    }
  }

  /**
   * Reports an error for the probe. The message's throwable is used when present; otherwise its
   * text is used.
   */
  private void reportError(ProbeId probeId, DiagnosticMessage msg) {
    Throwable throwable = msg.getThrowable();
    if (throwable != null) {
      probeStatusSink.addError(probeId, throwable);
    } else {
      probeStatusSink.addError(probeId, msg.getMessage());
    }
  }

  /**
   * Notifies that a snapshot was skipped for one of the {@link SkipCause} reasons by incrementing
   * the {@code debugger.sink.skip} counter, tagged with the cause and the probe id.
   *
   * @param probeId id of the probe whose snapshot was skipped
   * @param cause why the snapshot was skipped
   */
  public void skipSnapshot(String probeId, SkipCause cause) {
    debuggerMetrics.incrementCounter(PREFIX + "skip", cause.tag(), "probe_id:" + probeId);
  }

  // visible for testing
  long getCurrentLowRateFlushInterval() {
    return currentLowRateFlushInterval;
  }
}