Skip to content

Commit d9e5f5b

Browse files
committed
Refactor forwarder to handle multiple URLs.
Forwarder still logically represents a single destination, urls are expected to point to the same agent, differing only in the path for different payload types.
1 parent 7ac5c1a commit d9e5f5b

4 files changed

Lines changed: 90 additions & 60 deletions

File tree

dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public int compareTo(Key o) {
5050
final long maxTries;
5151
final WhenFull whenFull;
5252

53-
final TreeMap<Key, byte[]> items = new TreeMap<>();
53+
final TreeMap<Key, Payload> items = new TreeMap<>();
5454

5555
long droppedItems;
5656
long droppedBytes;
@@ -65,15 +65,15 @@ public int compareTo(Key o) {
6565
this.whenFull = whenFull;
6666
}
6767

68-
void add(byte[] item) throws InterruptedException {
68+
void add(Payload item) throws InterruptedException {
6969
put(null, item, whenFull);
7070
}
7171

72-
void requeue(Map.Entry<Key, byte[]> item) throws InterruptedException {
72+
void requeue(Map.Entry<Key, Payload> item) throws InterruptedException {
7373
Key nextKey = item.getKey().next();
7474
if (nextKey.tries > maxTries) {
7575
droppedItems++;
76-
droppedBytes += item.getValue().length;
76+
droppedBytes += item.getValue().bytes.length;
7777
return;
7878
}
7979
put(nextKey, item.getValue(), WhenFull.DROP);
@@ -85,15 +85,15 @@ private Key newKey() {
8585
return new Key(clock);
8686
}
8787

88-
private void put(Key key, byte[] item, WhenFull whenFull) throws InterruptedException {
88+
private void put(Key key, Payload item, WhenFull whenFull) throws InterruptedException {
8989
lock.lock();
9090
try {
9191
if (key == null) {
9292
key = newKey();
9393
}
94-
ensureSpace(item.length, whenFull);
94+
ensureSpace(item.bytes.length, whenFull);
9595
items.put(key, item);
96-
bytes += item.length;
96+
bytes += item.bytes.length;
9797
notEmpty.signal();
9898
} finally {
9999
lock.unlock();
@@ -107,10 +107,10 @@ private void ensureSpace(int length, WhenFull whenFull) throws InterruptedExcept
107107
while (bytes + length > maxBytes) {
108108
switch (whenFull) {
109109
case DROP:
110-
Map.Entry<Key, byte[]> last = items.pollLastEntry();
110+
Map.Entry<Key, Payload> last = items.pollLastEntry();
111111
droppedItems++;
112-
droppedBytes += last.getValue().length;
113-
bytes -= last.getValue().length;
112+
droppedBytes += last.getValue().bytes.length;
113+
bytes -= last.getValue().bytes.length;
114114
break;
115115
case BLOCK:
116116
notFull.await();
@@ -119,14 +119,14 @@ private void ensureSpace(int length, WhenFull whenFull) throws InterruptedExcept
119119
}
120120
}
121121

122-
Map.Entry<Key, byte[]> next() throws InterruptedException {
122+
Map.Entry<Key, Payload> next() throws InterruptedException {
123123
lock.lock();
124124
try {
125125
while (items.size() == 0) {
126126
notEmpty.await();
127127
}
128-
Map.Entry<Key, byte[]> item = items.pollFirstEntry();
129-
bytes -= item.getValue().length;
128+
Map.Entry<Key, Payload> item = items.pollFirstEntry();
129+
bytes -= item.getValue().bytes.length;
130130
notFull.signalAll();
131131
return item;
132132
} finally {

dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,13 @@
2424
/**
2525
* An HTTP forwarder that delivers DogStatsD HTTP payloads to a remote endpoint.
2626
*
27-
* <p>Payloads are enqueued via {@link #send(byte[])} and delivered asynchronously by a background
28-
* thread. Failed requests are retried with exponential back-off up to {@code maxTries} attempts
29-
* before being discarded.
27+
* <p>Payloads are enqueued via {@link #send(URI, byte[])} and delivered asynchronously by a
28+
* background thread. Failed requests are retried with exponential back-off up to {@code maxTries}
29+
* attempts before being discarded.
3030
*/
3131
public class Forwarder extends Thread {
3232
static final Logger logger = Logger.getLogger(Forwarder.class.getName());
3333
final BoundedQueue queue;
34-
final URI url;
3534
final HttpClient client;
3635
final Duration requestTimeout;
3736
final Random rng = new Random();
@@ -42,9 +41,8 @@ public class Forwarder extends Thread {
4241
int responseOk, responseBadRequest, responseOther;
4342

4443
/**
45-
* Creates a new forwarder targeting the given URL.
44+
* Creates a new forwarder.
4645
*
47-
* @param url the remote HTTP endpoint to POST payloads to
4846
* @param maxRequestsBytes maximum total size of buffered payloads, in bytes
4947
* @param maxTries maximum number of delivery attempts per payload
5048
* @param whenFull action to take when the queue is at capacity
@@ -53,13 +51,11 @@ public class Forwarder extends Thread {
5351
* {@code null} disables the request timeout
5452
*/
5553
public Forwarder(
56-
URI url,
5754
long maxRequestsBytes,
5855
long maxTries,
5956
WhenFull whenFull,
6057
Duration connectTimeout,
6158
Duration requestTimeout) {
62-
this.url = url;
6359
this.queue = new BoundedQueue(maxRequestsBytes, maxTries, whenFull);
6460
this.requestTimeout = requestTimeout;
6561
this.client =
@@ -82,25 +78,29 @@ public void run() {
8278
}
8379

8480
/**
85-
* Enqueues a payload for delivery to the remote endpoint.
81+
* Enqueues a payload for delivery to the given endpoint.
8682
*
8783
* <p>If the queue is full, behaviour is determined by the {@link WhenFull} policy supplied at
8884
* construction time.
8985
*
86+
* @param url the remote HTTP endpoint to POST the payload to
9087
* @param payload the raw bytes to deliver
9188
* @throws InterruptedException if the calling thread is interrupted while waiting for space
9289
* ({@link WhenFull#BLOCK} mode only)
9390
*/
94-
public void send(byte[] payload) throws InterruptedException {
95-
queue.add(payload);
91+
public void send(URI url, byte[] payload) throws InterruptedException {
92+
queue.add(new Payload(url, payload));
9693
}
9794

98-
void runOnce(Map.Entry<BoundedQueue.Key, byte[]> item) throws InterruptedException {
99-
byte[] payload = item.getValue();
100-
logger.log(Level.INFO, "sending {0} bytes", payload.length);
95+
void runOnce(Map.Entry<BoundedQueue.Key, Payload> item) throws InterruptedException {
96+
Payload payload = item.getValue();
97+
logger.log(
98+
Level.INFO,
99+
"sending {0} bytes to {1}",
100+
new Object[] {payload.bytes.length, payload.url});
101101

102102
HttpRequest.Builder builder =
103-
HttpRequest.newBuilder(url).POST(BodyPublishers.ofByteArray(payload));
103+
HttpRequest.newBuilder(payload.url).POST(BodyPublishers.ofByteArray(payload.bytes));
104104
if (requestTimeout != null) {
105105
builder.timeout(requestTimeout);
106106
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/* Unless explicitly stated otherwise all files in this repository are
2+
* licensed under the Apache 2.0 License.
3+
*
4+
* This product includes software developed at Datadog
5+
* (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc.
6+
*/
7+
8+
package com.datadoghq.dogstatsd.http.forwarder;
9+
10+
import java.net.URI;
11+
12+
class Payload {
13+
final URI url;
14+
final byte[] bytes;
15+
16+
Payload(URI url, byte[] bytes) {
17+
this.url = url;
18+
this.bytes = bytes;
19+
}
20+
}

dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99

1010
import static org.junit.Assert.*;
1111

12+
import java.net.URI;
1213
import java.util.Arrays;
1314
import java.util.Map;
1415
import org.junit.Test;
1516

1617
public class BoundedQueueTest {
18+
private static final URI URL = URI.create("http://example.invalid/");
19+
1720
private static byte[] bytes(int n) {
1821
return bytes(n, (byte) 0);
1922
}
@@ -24,24 +27,32 @@ private static byte[] bytes(int n, byte v) {
2427
return b;
2528
}
2629

30+
private static Payload payload(int n) {
31+
return new Payload(URL, bytes(n));
32+
}
33+
34+
private static Payload payload(byte[] b) {
35+
return new Payload(URL, b);
36+
}
37+
2738
// --- Round-trip / bytes tracking ---
2839

2940
@Test
3041
public void addThenNextReturnsItem() throws InterruptedException {
3142
BoundedQueue q = new BoundedQueue(10, 1, WhenFull.DROP);
32-
byte[] item = bytes(4);
43+
Payload item = payload(4);
3344
q.add(item);
34-
Map.Entry<BoundedQueue.Key, byte[]> entry = q.next();
45+
Map.Entry<BoundedQueue.Key, Payload> entry = q.next();
3546
assertSame(item, entry.getValue());
3647
assertEquals(0, q.bytes);
3748
}
3849

3950
@Test
4051
public void bytesDecrementedOnNext() throws InterruptedException {
4152
BoundedQueue q = new BoundedQueue(30, 1, WhenFull.DROP);
42-
q.add(bytes(3));
43-
q.add(bytes(3));
44-
q.add(bytes(3));
53+
q.add(payload(3));
54+
q.add(payload(3));
55+
q.add(payload(3));
4556
assertEquals(9, q.bytes);
4657
q.next();
4758
assertEquals(6, q.bytes);
@@ -54,9 +65,9 @@ public void bytesDecrementedOnNext() throws InterruptedException {
5465
@Test
5566
public void newestItemDequeuesFirstWithinSameTries() throws InterruptedException {
5667
BoundedQueue q = new BoundedQueue(30, 1, WhenFull.DROP);
57-
byte[] a = {1};
58-
byte[] b = {2};
59-
byte[] c = {3};
68+
Payload a = payload(new byte[] {1});
69+
Payload b = payload(new byte[] {2});
70+
Payload c = payload(new byte[] {3});
6071
q.add(a);
6172
q.add(b);
6273
q.add(c);
@@ -71,11 +82,11 @@ public void newestItemDequeuesFirstWithinSameTries() throws InterruptedException
7182
@Test
7283
public void dropWhenFullDropsOldestItem() throws InterruptedException {
7384
BoundedQueue q = new BoundedQueue(10, 1, WhenFull.DROP);
74-
byte[] a = bytes(5); // added first → smallest clock → last in TreeMap
75-
byte[] b = bytes(4);
85+
Payload a = payload(5); // added first → smallest clock → last in TreeMap
86+
Payload b = payload(4);
7687
q.add(a); // queue: a(clock=MIN+1)
7788
q.add(b); // queue full: a, b
78-
byte[] c = bytes(3);
89+
Payload c = payload(3);
7990
q.add(c); // a (oldest, last entry) evicted
8091
assertEquals(1, q.droppedItems);
8192
assertEquals(5, q.droppedBytes);
@@ -87,32 +98,32 @@ public void dropWhenFullDropsOldestItem() throws InterruptedException {
8798
@Test
8899
public void dropCountersAccumulate() throws InterruptedException {
89100
BoundedQueue q = new BoundedQueue(5, 1, WhenFull.DROP);
90-
q.add(bytes(5)); // fills queue (X)
91-
q.add(bytes(5)); // X dropped (Y in)
92-
q.add(bytes(5)); // Y dropped (Z in)
101+
q.add(payload(5)); // fills queue (X)
102+
q.add(payload(5)); // X dropped (Y in)
103+
q.add(payload(5)); // Y dropped (Z in)
93104
assertEquals(2, q.droppedItems);
94105
assertEquals(10, q.droppedBytes);
95106
}
96107

97108
@Test(timeout = 3000)
98109
public void dropDoesNotBlock() throws InterruptedException {
99110
BoundedQueue q = new BoundedQueue(5, 1, WhenFull.DROP);
100-
q.add(bytes(5)); // fill
101-
q.add(bytes(5)); // should return immediately via DROP
111+
q.add(payload(5)); // fill
112+
q.add(payload(5)); // should return immediately via DROP
102113
}
103114

104115
// --- WhenFull.BLOCK ---
105116

106117
@Test(timeout = 5000)
107118
public void blockUnblocksWhenSpaceFreed() throws InterruptedException {
108119
BoundedQueue q = new BoundedQueue(5, 1, WhenFull.BLOCK);
109-
q.add(bytes(5)); // queue full
120+
q.add(payload(5)); // queue full
110121

111122
Thread producer =
112123
new Thread(
113124
() -> {
114125
try {
115-
q.add(bytes(5));
126+
q.add(payload(5));
116127
} catch (InterruptedException e) {
117128
return;
118129
}
@@ -135,19 +146,19 @@ public void blockUnblocksWhenSpaceFreed() throws InterruptedException {
135146
@Test(expected = IllegalArgumentException.class)
136147
public void addThrowsForOversizedItem() throws InterruptedException {
137148
BoundedQueue q = new BoundedQueue(4, 1, WhenFull.DROP);
138-
q.add(bytes(5));
149+
q.add(payload(5));
139150
}
140151

141152
@Test
142153
public void requeueIncrementsTriesPreservesClock() throws InterruptedException {
143154
BoundedQueue q = new BoundedQueue(20, 3, WhenFull.DROP);
144-
q.add(bytes(4));
145-
Map.Entry<BoundedQueue.Key, byte[]> entry = q.next();
155+
q.add(payload(4));
156+
Map.Entry<BoundedQueue.Key, Payload> entry = q.next();
146157
assertEquals(0, entry.getKey().tries);
147158
long originalClock = entry.getKey().clock;
148159

149160
q.requeue(entry);
150-
Map.Entry<BoundedQueue.Key, byte[]> requeued = q.next();
161+
Map.Entry<BoundedQueue.Key, Payload> requeued = q.next();
151162
assertEquals(1, requeued.getKey().tries);
152163
assertEquals(originalClock, requeued.getKey().clock);
153164
assertEquals(0, q.droppedItems);
@@ -156,10 +167,10 @@ public void requeueIncrementsTriesPreservesClock() throws InterruptedException {
156167
@Test
157168
public void requeuedItemDequeuesAfterFreshItems() throws InterruptedException {
158169
BoundedQueue q = new BoundedQueue(20, 3, WhenFull.DROP);
159-
byte[] a = {10};
160-
byte[] b = {20};
170+
Payload a = payload(new byte[] {10});
171+
Payload b = payload(new byte[] {20});
161172
q.add(a);
162-
Map.Entry<BoundedQueue.Key, byte[]> entryA = q.next();
173+
Map.Entry<BoundedQueue.Key, Payload> entryA = q.next();
163174
q.requeue(entryA); // A now has tries=1
164175

165176
q.add(b); // B has tries=0 → higher priority
@@ -171,8 +182,8 @@ public void requeuedItemDequeuesAfterFreshItems() throws InterruptedException {
171182
@Test
172183
public void requeueAtMaxTriesIsAccepted() throws InterruptedException {
173184
BoundedQueue q = new BoundedQueue(20, 2, WhenFull.DROP);
174-
q.add(bytes(3));
175-
Map.Entry<BoundedQueue.Key, byte[]> e = q.next();
185+
q.add(payload(3));
186+
Map.Entry<BoundedQueue.Key, Payload> e = q.next();
176187
q.requeue(e); // tries → 1
177188
e = q.next();
178189
q.requeue(e); // tries → 2 == maxTries, should be accepted
@@ -183,9 +194,8 @@ public void requeueAtMaxTriesIsAccepted() throws InterruptedException {
183194
@Test
184195
public void requeuePastMaxTriesDropsItem() throws InterruptedException {
185196
BoundedQueue q = new BoundedQueue(20, 2, WhenFull.DROP);
186-
byte[] item = bytes(7);
187-
q.add(item);
188-
Map.Entry<BoundedQueue.Key, byte[]> e = q.next();
197+
q.add(payload(7));
198+
Map.Entry<BoundedQueue.Key, Payload> e = q.next();
189199
q.requeue(e); // tries → 1
190200
e = q.next();
191201
q.requeue(e); // tries → 2 == maxTries, accepted
@@ -200,8 +210,8 @@ public void requeuePastMaxTriesDropsItem() throws InterruptedException {
200210
@Test(timeout = 5000)
201211
public void nextBlocksUntilItemAdded() throws InterruptedException {
202212
BoundedQueue q = new BoundedQueue(100, 1, WhenFull.DROP);
203-
byte[] item = bytes(3);
204-
Map.Entry<BoundedQueue.Key, byte[]>[] result = new Map.Entry[1];
213+
Payload item = payload(3);
214+
Map.Entry<BoundedQueue.Key, Payload>[] result = new Map.Entry[1];
205215

206216
Thread consumer =
207217
new Thread(

0 commit comments

Comments
 (0)