Skip to content

Commit 71aa16f

Browse files
authored
Merge pull request #313 from DataDog/vickenty/fup
AGTMETRICS-489 Refactor forwarder to handle multiple URLs
2 parents db70d84 + 8dbbe44 commit 71aa16f

6 files changed

Lines changed: 113 additions & 86 deletions

File tree

dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public int compareTo(Key o) {
5454
final long maxTries;
5555
final WhenFull whenFull;
5656

57-
final TreeMap<Key, byte[]> items = new TreeMap<>();
57+
final TreeMap<Key, Payload> items = new TreeMap<>();
5858

5959
final Telemetry telemetry;
6060
final LongSupplier nanos;
@@ -88,14 +88,14 @@ private void recordDrop(long bytes) {
8888
droppedBytes += bytes;
8989
}
9090

91-
void add(byte[] item) throws InterruptedException {
91+
void add(Payload item) throws InterruptedException {
9292
put(null, item, whenFull);
9393
}
9494

95-
void requeue(Map.Entry<Key, byte[]> item) throws InterruptedException {
95+
void requeue(Map.Entry<Key, Payload> item) throws InterruptedException {
9696
Key nextKey = item.getKey().next();
9797
if (nextKey.tries > maxTries) {
98-
telemetry.onDrop(1, item.getValue().length);
98+
telemetry.onDrop(1, item.getValue().bytes.length);
9999
return;
100100
}
101101
put(nextKey, item.getValue(), WhenFull.DROP);
@@ -107,15 +107,15 @@ private Key newKey() {
107107
return new Key(0, clock, nanos.getAsLong());
108108
}
109109

110-
private void put(Key key, byte[] item, WhenFull whenFull) throws InterruptedException {
110+
private void put(Key key, Payload item, WhenFull whenFull) throws InterruptedException {
111111
lock.lock();
112112
try {
113113
if (key == null) {
114114
key = newKey();
115115
}
116-
ensureSpace(item.length, whenFull);
116+
ensureSpace(item.bytes.length, whenFull);
117117
items.put(key, item);
118-
bytes += item.length;
118+
bytes += item.bytes.length;
119119
notEmpty.signal();
120120
} finally {
121121
long droppedPayloads = this.droppedPayloads;
@@ -135,9 +135,9 @@ private void ensureSpace(int length, WhenFull whenFull) throws InterruptedExcept
135135
while (bytes + length > maxBytes) {
136136
switch (whenFull) {
137137
case DROP:
138-
Map.Entry<Key, byte[]> last = items.pollLastEntry();
139-
bytes -= last.getValue().length;
140-
recordDrop(last.getValue().length);
138+
Map.Entry<Key, Payload> last = items.pollLastEntry();
139+
recordDrop(last.getValue().bytes.length);
140+
bytes -= last.getValue().bytes.length;
141141
break;
142142
case BLOCK:
143143
notFull.await();
@@ -146,14 +146,14 @@ private void ensureSpace(int length, WhenFull whenFull) throws InterruptedExcept
146146
}
147147
}
148148

149-
Map.Entry<Key, byte[]> next() throws InterruptedException {
149+
Map.Entry<Key, Payload> next() throws InterruptedException {
150150
lock.lock();
151151
try {
152152
while (items.size() == 0) {
153153
notEmpty.await();
154154
}
155-
Map.Entry<Key, byte[]> item = items.pollFirstEntry();
156-
bytes -= item.getValue().length;
155+
Map.Entry<Key, Payload> item = items.pollFirstEntry();
156+
bytes -= item.getValue().bytes.length;
157157
notFull.signalAll();
158158
return item;
159159
} finally {

dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,13 @@
2424
/**
2525
* An HTTP forwarder that delivers DogStatsD HTTP payloads to a remote endpoint.
2626
*
27-
* <p>Payloads are enqueued via {@link #send(byte[])} and delivered asynchronously by a background
28-
* thread. Failed requests are retried with exponential back-off up to {@code maxTries} attempts
29-
* before being discarded.
27+
* <p>Payloads are enqueued via {@link #send(URI, byte[])} and delivered asynchronously by a
28+
* background thread. Failed requests are retried with exponential back-off up to {@code maxTries}
29+
* attempts before being discarded.
3030
*/
3131
public class Forwarder extends Thread {
3232
static final Logger logger = Logger.getLogger(Forwarder.class.getName());
3333
final BoundedQueue queue;
34-
final URI url;
3534
final HttpClient client;
3635
final Duration requestTimeout;
3736
final Random rng = new Random();
@@ -42,9 +41,8 @@ public class Forwarder extends Thread {
4241
final Telemetry telemetry;
4342

4443
/**
45-
* Creates a new forwarder targeting the given URL.
44+
* Creates a new forwarder.
4645
*
47-
* @param url the remote HTTP endpoint to POST payloads to
4846
* @param maxRequestsBytes maximum total size of buffered payloads, in bytes
4947
* @param maxTries maximum number of delivery attempts per payload
5048
* @param whenFull action to take when the queue is at capacity
@@ -53,13 +51,11 @@ public class Forwarder extends Thread {
5351
* {@code null} disables the request timeout
5452
*/
5553
public Forwarder(
56-
URI url,
5754
long maxRequestsBytes,
5855
long maxTries,
5956
WhenFull whenFull,
6057
Duration connectTimeout,
6158
Duration requestTimeout) {
62-
this.url = url;
6359
this.telemetry = new Telemetry();
6460
this.queue = new BoundedQueue(maxRequestsBytes, maxTries, whenFull, this.telemetry);
6561
this.requestTimeout = requestTimeout;
@@ -91,26 +87,30 @@ public void run() {
9187
}
9288

9389
/**
94-
* Enqueues a payload for delivery to the remote endpoint.
90+
* Enqueues a payload for delivery to the given endpoint.
9591
*
9692
* <p>If the queue is full, behaviour is determined by the {@link WhenFull} policy supplied at
9793
* construction time.
9894
*
95+
* @param url the remote HTTP endpoint to POST the payload to
9996
* @param payload the raw bytes to deliver
10097
* @throws InterruptedException if the calling thread is interrupted while waiting for space
10198
* ({@link WhenFull#BLOCK} mode only)
10299
*/
103-
public void send(byte[] payload) throws InterruptedException {
104-
queue.add(payload);
100+
public void send(URI url, byte[] payload) throws InterruptedException {
101+
queue.add(new Payload(url, payload));
105102
telemetry.onEnqueue(payload.length);
106103
}
107104

108-
void runOnce(Map.Entry<BoundedQueue.Key, byte[]> item) throws InterruptedException {
109-
byte[] payload = item.getValue();
110-
logger.log(Level.INFO, "sending {0} bytes", payload.length);
105+
void runOnce(Map.Entry<BoundedQueue.Key, Payload> item) throws InterruptedException {
106+
Payload payload = item.getValue();
107+
logger.log(
108+
Level.INFO,
109+
"sending {0} bytes to {1}",
110+
new Object[] {payload.bytes.length, payload.url});
111111

112112
HttpRequest.Builder builder =
113-
HttpRequest.newBuilder(url).POST(BodyPublishers.ofByteArray(payload));
113+
HttpRequest.newBuilder(payload.url).POST(BodyPublishers.ofByteArray(payload.bytes));
114114
if (requestTimeout != null) {
115115
builder.timeout(requestTimeout);
116116
}
@@ -138,9 +138,9 @@ void runOnce(Map.Entry<BoundedQueue.Key, byte[]> item) throws InterruptedExcepti
138138
backoff();
139139
}
140140

141-
void handleResponse(int code, Map.Entry<BoundedQueue.Key, byte[]> item)
141+
void handleResponse(int code, Map.Entry<BoundedQueue.Key, Payload> item)
142142
throws InterruptedException {
143-
int len = item.getValue().length;
143+
int len = item.getValue().bytes.length;
144144
switch (code) {
145145
case 400:
146146
telemetry.onResponse(code, len, false);
@@ -158,9 +158,9 @@ void handleResponse(int code, Map.Entry<BoundedQueue.Key, byte[]> item)
158158
}
159159
}
160160

161-
void handleTransportError(Map.Entry<BoundedQueue.Key, byte[]> item)
161+
void handleTransportError(Map.Entry<BoundedQueue.Key, Payload> item)
162162
throws InterruptedException {
163-
telemetry.onTransportError(item.getValue().length);
163+
telemetry.onTransportError(item.getValue().bytes.length);
164164
increaseBackoff();
165165
queue.requeue(item);
166166
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/* Unless explicitly stated otherwise all files in this repository are
2+
* licensed under the Apache 2.0 License.
3+
*
4+
* This product includes software developed at Datadog
5+
* (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc.
6+
*/
7+
8+
package com.datadoghq.dogstatsd.http.forwarder;
9+
10+
import java.net.URI;
11+
12+
class Payload {
13+
final URI url;
14+
final byte[] bytes;
15+
16+
Payload(URI url, byte[] bytes) {
17+
this.url = url;
18+
this.bytes = bytes;
19+
}
20+
}

0 commit comments

Comments
 (0)