Skip to content

Commit 7ac5c1a

Browse files
authored
Merge pull request #305 from DataDog/vickenty/dhf
Add dogstatsd-http-forwarder
2 parents 51ead6b + 1008e8e commit 7ac5c1a

8 files changed

Lines changed: 800 additions & 2 deletions

File tree

.github/workflows/ci.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ jobs:
1212
fail-fast: false
1313
matrix:
1414
os: [ubuntu-latest]
15-
java-version: ['9', '11', '13', '17']
15+
java-version: [9, 11, 13, 17]
1616
include:
1717
- os: windows-latest
18-
java-version: '12'
18+
java-version: 12
1919
steps:
2020
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
2121
- uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0
@@ -28,6 +28,10 @@ jobs:
2828
- name: Build and test dogstatsd-http-core
2929
run: mvn clean install
3030
working-directory: dogstatsd-http-core
31+
- name: Build and test dogstatsd-http-forwarder
32+
if: matrix.java-version >= 11
33+
run: mvn clean install
34+
working-directory: dogstatsd-http-forwarder
3135

3236
test-jnr-exclude:
3337
name: Test (Java 9, jnr-exclude)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/target/

dogstatsd-http-forwarder/pom.xml

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0"
2+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>com.datadoghq</groupId>
7+
<artifactId>dogstatsd-http-forwarder</artifactId>
8+
<packaging>jar</packaging>
9+
<name>dogstatsd-http-forwarder</name>
10+
<version>1.0.0-SNAPSHOT</version>
11+
<description>HTTP forwarder for DogStatsD metrics.</description>
12+
<url>https://github.com/DataDog/java-dogstatsd-client</url>
13+
14+
<properties>
15+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16+
</properties>
17+
18+
<licenses>
19+
<license>
20+
<name>The MIT License (MIT)</name>
21+
<url>http://opensource.org/licenses/MIT</url>
22+
<distribution>repo</distribution>
23+
</license>
24+
</licenses>
25+
26+
<scm>
27+
<url>https://github.com/DataDog/java-dogstatsd-client</url>
28+
<connection>scm:git:git@github.com:DataDog/java-dogstatsd-client.git</connection>
29+
<developerConnection>scm:git:git@github.com:Datadog/java-dogstatsd-client.git</developerConnection>
30+
</scm>
31+
32+
<developers>
33+
<developer>
34+
<id>datadog</id>
35+
<name>Datadog developers</name>
36+
<email>dev@datadoghq.com</email>
37+
</developer>
38+
</developers>
39+
40+
<dependencies>
41+
<dependency>
42+
<groupId>junit</groupId>
43+
<artifactId>junit</artifactId>
44+
<version>4.13.1</version>
45+
<scope>test</scope>
46+
</dependency>
47+
</dependencies>
48+
49+
<profiles>
50+
<profile>
51+
<id>spotless</id>
52+
<activation>
53+
<jdk>[17.0,)</jdk>
54+
</activation>
55+
<build>
56+
<plugins>
57+
<plugin>
58+
<groupId>com.diffplug.spotless</groupId>
59+
<artifactId>spotless-maven-plugin</artifactId>
60+
<version>2.45.0</version>
61+
<configuration>
62+
<java>
63+
<googleJavaFormat>
64+
<version>1.28.0</version>
65+
<style>AOSP</style>
66+
</googleJavaFormat>
67+
</java>
68+
</configuration>
69+
<executions>
70+
<execution>
71+
<goals>
72+
<goal>check</goal>
73+
</goals>
74+
</execution>
75+
</executions>
76+
</plugin>
77+
</plugins>
78+
</build>
79+
</profile>
80+
</profiles>
81+
82+
<build>
83+
<plugins>
84+
<plugin>
85+
<groupId>org.apache.maven.plugins</groupId>
86+
<artifactId>maven-compiler-plugin</artifactId>
87+
<version>3.8.1</version>
88+
<configuration>
89+
<source>11</source>
90+
<target>11</target>
91+
</configuration>
92+
</plugin>
93+
<plugin>
94+
<groupId>org.apache.maven.plugins</groupId>
95+
<artifactId>maven-surefire-plugin</artifactId>
96+
<version>2.19</version>
97+
</plugin>
98+
</plugins>
99+
</build>
100+
</project>
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/* Unless explicitly stated otherwise all files in this repository are
2+
* licensed under the Apache 2.0 License.
3+
*
4+
* This product includes software developed at Datadog
5+
* (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc.
6+
*/
7+
8+
package com.datadoghq.dogstatsd.http.forwarder;
9+
10+
import java.util.Map;
11+
import java.util.TreeMap;
12+
import java.util.concurrent.locks.Condition;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.concurrent.locks.ReentrantLock;
15+
16+
class BoundedQueue {
17+
// Key represents a tuple of integers (tries, clock).
18+
static class Key implements Comparable<Key> {
19+
final long tries;
20+
final long clock;
21+
22+
Key(long clock) {
23+
this.tries = 0;
24+
this.clock = clock;
25+
}
26+
27+
private Key(long tries, long clock) {
28+
this.tries = tries;
29+
this.clock = clock;
30+
}
31+
32+
Key next() {
33+
return new Key(tries + 1, clock);
34+
}
35+
36+
@Override
37+
public int compareTo(Key o) {
38+
// Keys are ordered such first we try items with fewer
39+
// attempts, and then with a newer (larger) clock value.
40+
if (tries == o.tries) {
41+
return Long.compare(o.clock, clock);
42+
}
43+
return Long.compare(tries, o.tries);
44+
}
45+
}
46+
47+
long clock = Long.MIN_VALUE;
48+
long bytes;
49+
final long maxBytes;
50+
final long maxTries;
51+
final WhenFull whenFull;
52+
53+
final TreeMap<Key, byte[]> items = new TreeMap<>();
54+
55+
long droppedItems;
56+
long droppedBytes;
57+
58+
Lock lock = new ReentrantLock();
59+
Condition notEmpty = lock.newCondition();
60+
Condition notFull = lock.newCondition();
61+
62+
BoundedQueue(long maxBytes, long maxTries, WhenFull whenFull) {
63+
this.maxBytes = maxBytes;
64+
this.maxTries = maxTries;
65+
this.whenFull = whenFull;
66+
}
67+
68+
void add(byte[] item) throws InterruptedException {
69+
put(null, item, whenFull);
70+
}
71+
72+
void requeue(Map.Entry<Key, byte[]> item) throws InterruptedException {
73+
Key nextKey = item.getKey().next();
74+
if (nextKey.tries > maxTries) {
75+
droppedItems++;
76+
droppedBytes += item.getValue().length;
77+
return;
78+
}
79+
put(nextKey, item.getValue(), WhenFull.DROP);
80+
}
81+
82+
// Must be called when lock is held.
83+
private Key newKey() {
84+
clock++;
85+
return new Key(clock);
86+
}
87+
88+
private void put(Key key, byte[] item, WhenFull whenFull) throws InterruptedException {
89+
lock.lock();
90+
try {
91+
if (key == null) {
92+
key = newKey();
93+
}
94+
ensureSpace(item.length, whenFull);
95+
items.put(key, item);
96+
bytes += item.length;
97+
notEmpty.signal();
98+
} finally {
99+
lock.unlock();
100+
}
101+
}
102+
103+
private void ensureSpace(int length, WhenFull whenFull) throws InterruptedException {
104+
if (length > maxBytes) {
105+
throw new IllegalArgumentException("item length is larger than maxBytes");
106+
}
107+
while (bytes + length > maxBytes) {
108+
switch (whenFull) {
109+
case DROP:
110+
Map.Entry<Key, byte[]> last = items.pollLastEntry();
111+
droppedItems++;
112+
droppedBytes += last.getValue().length;
113+
bytes -= last.getValue().length;
114+
break;
115+
case BLOCK:
116+
notFull.await();
117+
break;
118+
}
119+
}
120+
}
121+
122+
Map.Entry<Key, byte[]> next() throws InterruptedException {
123+
lock.lock();
124+
try {
125+
while (items.size() == 0) {
126+
notEmpty.await();
127+
}
128+
Map.Entry<Key, byte[]> item = items.pollFirstEntry();
129+
bytes -= item.getValue().length;
130+
notFull.signalAll();
131+
return item;
132+
} finally {
133+
lock.unlock();
134+
}
135+
}
136+
}

0 commit comments

Comments
 (0)