Skip to content

Commit c715dd6

Browse files
committed
Add telemetry to the forwarder.
1 parent 597f615 commit c715dd6

8 files changed

Lines changed: 555 additions & 42 deletions

File tree

dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,29 @@
1212
import java.util.concurrent.locks.Condition;
1313
import java.util.concurrent.locks.Lock;
1414
import java.util.concurrent.locks.ReentrantLock;
15+
import java.util.function.LongSupplier;
1516

1617
class BoundedQueue {
17-
// Key represents a tuple of integers (tries, clock).
18+
// Key represents a tuple of integers (tries, clock). enqueuedAtNanos records when the
19+
// payload first entered the queue (in System.nanoTime() units) and is preserved across
20+
// requeues so that "age of oldest item" remains accurate after retries.
1821
static class Key implements Comparable<Key> {
1922
final long tries;
2023
final long clock;
24+
final long enqueuedAtNanos;
2125

2226
Key(long clock) {
23-
this.tries = 0;
24-
this.clock = clock;
27+
this(0, clock, System.nanoTime());
2528
}
2629

27-
private Key(long tries, long clock) {
30+
private Key(long tries, long clock, long enqueuedAtNanos) {
2831
this.tries = tries;
2932
this.clock = clock;
33+
this.enqueuedAtNanos = enqueuedAtNanos;
3034
}
3135

3236
Key next() {
33-
return new Key(tries + 1, clock);
37+
return new Key(tries + 1, clock, enqueuedAtNanos);
3438
}
3539

3640
@Override
@@ -52,17 +56,40 @@ public int compareTo(Key o) {
5256

5357
final TreeMap<Key, byte[]> items = new TreeMap<>();
5458

55-
long droppedItems;
56-
long droppedBytes;
59+
final Telemetry telemetry;
60+
final LongSupplier nanos;
5761

5862
Lock lock = new ReentrantLock();
5963
Condition notEmpty = lock.newCondition();
6064
Condition notFull = lock.newCondition();
6165

6266
/**
 * Creates a queue without a caller-supplied telemetry sink.
 *
 * @param maxBytes byte limit stored for the queue (reported as {@code queueMaxBytes})
 * @param maxTries number of tries after which {@code requeue} drops a payload
 * @param whenFull policy applied when a new payload does not fit
 */
BoundedQueue(long maxBytes, long maxTries, WhenFull whenFull) {
    this(maxBytes, maxTries, whenFull, null);
}

/**
 * Creates a queue that reports drops to {@code telemetry} and timestamps new payloads with
 * {@link System#nanoTime()}.
 *
 * @param maxBytes byte limit stored for the queue (reported as {@code queueMaxBytes})
 * @param maxTries number of tries after which {@code requeue} drops a payload
 * @param whenFull policy applied when a new payload does not fit
 * @param telemetry sink for drop counts; {@code null} means drops are not observable
 */
BoundedQueue(long maxBytes, long maxTries, WhenFull whenFull, Telemetry telemetry) {
    this(maxBytes, maxTries, whenFull, telemetry, System::nanoTime);
}

/**
 * Canonical constructor; the extra {@code nanos} source lets callers substitute the clock used
 * to stamp {@code Key.enqueuedAtNanos}.
 *
 * @param maxBytes byte limit stored for the queue (reported as {@code queueMaxBytes})
 * @param maxTries number of tries after which {@code requeue} drops a payload
 * @param whenFull policy applied when a new payload does not fit
 * @param telemetry sink for drop counts; {@code null} is replaced by a private instance
 * @param nanos monotonic time source used when creating keys for new payloads
 */
BoundedQueue(
        long maxBytes,
        long maxTries,
        WhenFull whenFull,
        Telemetry telemetry,
        LongSupplier nanos) {
    this.maxBytes = maxBytes;
    this.maxTries = maxTries;
    this.whenFull = whenFull;
    // requeue() and put() call telemetry.onDrop(...) unconditionally, so a null sink (as passed
    // by the three-argument constructor) would make every add fail with a
    // NullPointerException. Fall back to a private, unobserved Telemetry instead.
    this.telemetry = telemetry != null ? telemetry : new Telemetry();
    this.nanos = nanos;
}
86+
87+
long droppedPayloads;
88+
long droppedBytes;
89+
90+
private void recordDrop(long bytes) {
91+
droppedPayloads++;
92+
droppedBytes += bytes;
6693
}
6794

6895
void add(byte[] item) throws InterruptedException {
@@ -72,8 +99,7 @@ void add(byte[] item) throws InterruptedException {
7299
void requeue(Map.Entry<Key, byte[]> item) throws InterruptedException {
73100
Key nextKey = item.getKey().next();
74101
if (nextKey.tries > maxTries) {
75-
droppedItems++;
76-
droppedBytes += item.getValue().length;
102+
telemetry.onDrop(1, item.getValue().length);
77103
return;
78104
}
79105
put(nextKey, item.getValue(), WhenFull.DROP);
@@ -82,7 +108,7 @@ void requeue(Map.Entry<Key, byte[]> item) throws InterruptedException {
82108
// Must be called when lock is held.
83109
private Key newKey() {
84110
clock++;
85-
return new Key(clock);
111+
return new Key(0, clock, nanos.getAsLong());
86112
}
87113

88114
private void put(Key key, byte[] item, WhenFull whenFull) throws InterruptedException {
@@ -96,7 +122,13 @@ private void put(Key key, byte[] item, WhenFull whenFull) throws InterruptedExce
96122
bytes += item.length;
97123
notEmpty.signal();
98124
} finally {
125+
long droppedPayloads = this.droppedPayloads;
126+
long droppedBytes = this.droppedBytes;
127+
this.droppedPayloads = 0;
128+
this.droppedBytes = 0;
99129
lock.unlock();
130+
// Avoid potential lock ordering issues.
131+
telemetry.onDrop(droppedPayloads, droppedBytes);
100132
}
101133
}
102134

@@ -108,9 +140,8 @@ private void ensureSpace(int length, WhenFull whenFull) throws InterruptedExcept
108140
switch (whenFull) {
109141
case DROP:
110142
Map.Entry<Key, byte[]> last = items.pollLastEntry();
111-
droppedItems++;
112-
droppedBytes += last.getValue().length;
113143
bytes -= last.getValue().length;
144+
recordDrop(last.getValue().length);
114145
break;
115146
case BLOCK:
116147
notFull.await();
@@ -133,4 +164,23 @@ Map.Entry<Key, byte[]> next() throws InterruptedException {
133164
lock.unlock();
134165
}
135166
}
167+
168+
void snapshot(long now, Telemetry.Snapshot s) {
169+
lock.lock();
170+
try {
171+
long oldestAge = 0L;
172+
for (Key k : items.keySet()) {
173+
long age = now - k.enqueuedAtNanos;
174+
if (age > oldestAge) {
175+
oldestAge = age;
176+
}
177+
}
178+
s.queuePayloads = items.size();
179+
s.queueBytes = bytes;
180+
s.queueMaxBytes = maxBytes;
181+
s.oldestEnqueuedAgeNanos = oldestAge;
182+
} finally {
183+
lock.unlock();
184+
}
185+
}
136186
}

dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class Forwarder extends Thread {
3939
String localData;
4040
String externalData;
4141

42-
int responseOk, responseBadRequest, responseOther;
42+
final Telemetry telemetry;
4343

4444
/**
4545
* Creates a new forwarder targeting the given URL.
@@ -60,7 +60,8 @@ public Forwarder(
6060
Duration connectTimeout,
6161
Duration requestTimeout) {
6262
this.url = url;
63-
this.queue = new BoundedQueue(maxRequestsBytes, maxTries, whenFull);
63+
this.telemetry = new Telemetry();
64+
this.queue = new BoundedQueue(maxRequestsBytes, maxTries, whenFull, this.telemetry);
6465
this.requestTimeout = requestTimeout;
6566
this.client =
6667
HttpClient.newBuilder()
@@ -69,6 +70,14 @@ public Forwarder(
6970
.build();
7071
}
7172

73+
/**
74+
* Captures a snapshot of the forwarder's telemetry counters and queue state, clearing delta
75+
* counters so subsequent snapshots report activity since this call.
76+
*/
77+
public Telemetry.Snapshot snapshot() {
78+
return telemetry.snapshot(queue);
79+
}
80+
7281
/** Runs the forwarding loop, delivering queued payloads until the thread is interrupted. */
7382
@Override
7483
public void run() {
@@ -92,6 +101,7 @@ public void run() {
92101
* ({@link WhenFull#BLOCK} mode only)
93102
*/
94103
public void send(byte[] payload) throws InterruptedException {
104+
telemetry.onEnqueue(payload.length);
95105
queue.add(payload);
96106
}
97107

@@ -119,23 +129,24 @@ void runOnce(Map.Entry<BoundedQueue.Key, byte[]> item) throws InterruptedExcepti
119129
logger.log(
120130
Level.INFO, "response {0}: {1}", new Object[] {res.statusCode(), res.body()});
121131

122-
switch (res.statusCode()) {
132+
int code = res.statusCode();
133+
switch (code) {
123134
case 400:
124-
responseBadRequest++;
135+
telemetry.onResponse(code, payload.length, false);
125136
decreaseBackoff();
126137
break;
127138
case 200:
128-
responseOk++;
139+
telemetry.onResponse(code, payload.length, true);
129140
decreaseBackoff();
130141
break;
131142
default:
132-
responseOther++;
143+
telemetry.onResponse(code, payload.length, false);
133144
increaseBackoff();
134145
queue.requeue(item);
135146
}
136147
} catch (IOException ex) {
137148
logger.log(Level.WARNING, "error sending request: {0}", ex.toString());
138-
responseOther++;
149+
telemetry.onTransportError(payload.length);
139150
increaseBackoff();
140151
queue.requeue(item);
141152
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/* Unless explicitly stated otherwise all files in this repository are
2+
* licensed under the Apache 2.0 License.
3+
*
4+
* This product includes software developed at Datadog
5+
* (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc.
6+
*/
7+
8+
package com.datadoghq.dogstatsd.http.forwarder;
9+
10+
/** Canonical telemetry labels for HTTP response codes. */
11+
final class HttpCode {
12+
13+
private static final String[] CATEGORIES = {"0xx", "1xx", "2xx", "3xx", "4xx", "5xx"};
14+
15+
private static final String[] NAMES = new String[506];
16+
17+
static {
18+
for (int i = 0; i < NAMES.length; i++) {
19+
NAMES[i] = CATEGORIES[i / 100];
20+
}
21+
NAMES[0] = "0";
22+
NAMES[200] = "200";
23+
NAMES[400] = "400";
24+
NAMES[403] = "403";
25+
NAMES[404] = "404";
26+
NAMES[429] = "429";
27+
NAMES[500] = "500";
28+
NAMES[501] = "501";
29+
NAMES[502] = "502";
30+
NAMES[503] = "503";
31+
NAMES[504] = "504";
32+
NAMES[505] = "505";
33+
}
34+
35+
private HttpCode() {}
36+
37+
static String name(int code) {
38+
if (code >= 0 && code < NAMES.length) {
39+
return NAMES[code];
40+
}
41+
int category = code / 100;
42+
if (category >= 0 && category < CATEGORIES.length) {
43+
return CATEGORIES[category];
44+
}
45+
return "xxx";
46+
}
47+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/* Unless explicitly stated otherwise all files in this repository are
2+
* licensed under the Apache 2.0 License.
3+
*
4+
* This product includes software developed at Datadog
5+
* (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc.
6+
*/
7+
8+
package com.datadoghq.dogstatsd.http.forwarder;
9+
10+
import java.time.Clock;
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
import java.util.function.LongSupplier;
14+
15+
/** Thread-safe counter store for {@link Forwarder} telemetry. */
16+
public class Telemetry {
17+
18+
/** HTTP status code used to record transport-level (no-response) errors. */
19+
public static final int TRANSPORT_ERROR_CODE = 0;
20+
21+
/** Point-in-time view of cumulative counters and queue state. */
22+
public static final class Snapshot {
23+
/**
24+
* Wall-clock time (Unix epoch milliseconds) at the start of the interval covered by this
25+
* snapshot — i.e., the moment of the previous snapshot, or telemetry construction if this
26+
* is the first snapshot.
27+
*/
28+
public long intervalStartMillis;
29+
30+
public long enqueuedPayloads;
31+
public long deliveredPayloads;
32+
public long enqueuedBytes;
33+
public long deliveredBytes;
34+
public long queuePayloads;
35+
public long queueBytes;
36+
public long queueMaxBytes;
37+
public long droppedPayloads;
38+
public long droppedBytes;
39+
40+
/** Nanos elapsed since the oldest queued item was enqueued; {@code 0} if queue is empty. */
41+
public long oldestEnqueuedAgeNanos;
42+
43+
/** Nanos elapsed since the last successful submission; {@code 0} if none yet. */
44+
public long lastSuccessAgeNanos;
45+
46+
/** Totals keyed by HTTP code. */
47+
public Map<String, CodeCounters> byCode = new HashMap<>();
48+
49+
Snapshot(long intervalStartMillis) {
50+
this.intervalStartMillis = intervalStartMillis;
51+
}
52+
53+
/** Per-code totals within a snapshot's window. */
54+
public static final class CodeCounters {
55+
public long payloads;
56+
public long bytes;
57+
}
58+
}
59+
60+
private final Clock clock;
61+
private final LongSupplier nanos;
62+
63+
private Snapshot current;
64+
65+
private long lastSuccessNanos;
66+
private boolean everDelivered;
67+
68+
public Telemetry() {
69+
this(Clock.systemUTC(), System::nanoTime);
70+
}
71+
72+
Telemetry(Clock clock, LongSupplier nanos) {
73+
this.clock = clock;
74+
this.nanos = nanos;
75+
this.current = new Snapshot(clock.millis());
76+
}
77+
78+
synchronized void onEnqueue(int len) {
79+
current.enqueuedPayloads++;
80+
current.enqueuedBytes += len;
81+
}
82+
83+
synchronized void onResponse(int code, int len, boolean delivered) {
84+
Snapshot.CodeCounters c =
85+
current.byCode.computeIfAbsent(
86+
HttpCode.name(code), k -> new Snapshot.CodeCounters());
87+
c.payloads++;
88+
c.bytes += len;
89+
if (delivered) {
90+
current.deliveredPayloads++;
91+
current.deliveredBytes += len;
92+
lastSuccessNanos = nanos.getAsLong();
93+
everDelivered = true;
94+
}
95+
}
96+
97+
synchronized void onTransportError(int len) {
98+
onResponse(TRANSPORT_ERROR_CODE, len, false);
99+
}
100+
101+
/** Records a dropped payload. */
102+
synchronized void onDrop(long payloads, long bytes) {
103+
current.droppedPayloads += payloads;
104+
current.droppedBytes += bytes;
105+
}
106+
107+
/**
108+
* Captures a snapshot using the supplied queue stats, then swaps in a fresh accumulator so
109+
* subsequent snapshots report deltas since this call.
110+
*/
111+
public synchronized Snapshot snapshot(BoundedQueue q) {
112+
long now = nanos.getAsLong();
113+
Snapshot s = current;
114+
current = new Snapshot(clock.millis());
115+
s.lastSuccessAgeNanos = everDelivered ? now - lastSuccessNanos : 0L;
116+
if (q != null) {
117+
q.snapshot(now, s);
118+
}
119+
return s;
120+
}
121+
}

0 commit comments

Comments
 (0)