Skip to content

Commit 627afe0

Browse files
abdessattar23mohammed-abdessetar
authored andcommitted
Bound JdkHttpSender thread pool to prevent DoS via unbounded thread creation
The default executor used Integer.MAX_VALUE max threads with a SynchronousQueue, allowing thousands of threads under burst load. Cap at max(availableProcessors, 5) with CallerRunsPolicy for backpressure, and await termination on shutdown so in-flight requests complete before the HttpClient is closed.
1 parent b665652 commit 627afe0

2 files changed

Lines changed: 62 additions & 19 deletions

File tree

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.ConcurrentLinkedQueue;
3333
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.RejectedExecutionException;
3435
import java.util.concurrent.SynchronousQueue;
3536
import java.util.concurrent.ThreadLocalRandom;
3637
import java.util.concurrent.ThreadPoolExecutor;
@@ -133,7 +134,7 @@ public final class JdkHttpSender implements HttpSender {
133134
private static ExecutorService newExecutor() {
134135
return new ThreadPoolExecutor(
135136
0,
136-
Integer.MAX_VALUE,
137+
Math.max(Runtime.getRuntime().availableProcessors(), 5),
137138
60,
138139
TimeUnit.SECONDS,
139140
new SynchronousQueue<>(),
@@ -157,24 +158,28 @@ private static HttpClient configureClient(
157158
@Override
158159
public void send(
159160
MessageWriter messageWriter, Consumer<HttpResponse> onResponse, Consumer<Throwable> onError) {
160-
CompletableFuture<HttpResponse> unused =
161-
CompletableFuture.supplyAsync(
162-
() -> {
163-
try {
164-
return sendInternal(messageWriter);
165-
} catch (IOException e) {
166-
throw new UncheckedIOException(e);
167-
}
168-
},
169-
executorService)
170-
.whenComplete(
171-
(httpResponse, throwable) -> {
172-
if (throwable != null) {
173-
onError.accept(throwable);
174-
return;
175-
}
176-
onResponse.accept(httpResponse);
177-
});
161+
try {
162+
CompletableFuture<HttpResponse> unused =
163+
CompletableFuture.supplyAsync(
164+
() -> {
165+
try {
166+
return sendInternal(messageWriter);
167+
} catch (IOException e) {
168+
throw new UncheckedIOException(e);
169+
}
170+
},
171+
executorService)
172+
.whenComplete(
173+
(httpResponse, throwable) -> {
174+
if (throwable != null) {
175+
onError.accept(throwable);
176+
return;
177+
}
178+
onResponse.accept(httpResponse);
179+
});
180+
} catch (RejectedExecutionException e) {
181+
onError.accept(e);
182+
}
178183
}
179184

180185
// Visible for testing
@@ -409,6 +414,11 @@ private void resetPool() {
409414
public CompletableResultCode shutdown() {
410415
if (managedExecutor) {
411416
executorService.shutdown();
417+
try {
418+
executorService.awaitTermination(10, TimeUnit.SECONDS);
419+
} catch (InterruptedException e) {
420+
Thread.currentThread().interrupt();
421+
}
412422
}
413423
if (AutoCloseable.class.isInstance(client)) {
414424
try {

exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.net.http.HttpConnectTimeoutException;
2828
import java.time.Duration;
2929
import java.util.Collections;
30+
import java.util.concurrent.ThreadPoolExecutor;
3031
import java.util.concurrent.TimeUnit;
3132
import javax.net.ssl.SSLException;
3233
import org.assertj.core.api.InstanceOfAssertFactories;
@@ -166,6 +167,38 @@ void sendInternal_NonRetryableException() throws IOException, InterruptedExcepti
166167
verify(mockHttpClient, times(1)).send(any(), any());
167168
}
168169

170+
@Test
171+
void defaultExecutor_isBounded() {
172+
JdkHttpSender defaultSender =
173+
new JdkHttpSender(
174+
URI.create("http://localhost"),
175+
"text/plain",
176+
null,
177+
Duration.ofNanos(1),
178+
Duration.ofSeconds(10),
179+
Collections::emptyMap,
180+
null,
181+
null,
182+
null,
183+
null,
184+
Long.MAX_VALUE);
185+
186+
try {
187+
int expectedMax = Math.max(Runtime.getRuntime().availableProcessors(), 5);
188+
assertThat(defaultSender)
189+
.extracting(
190+
"executorService", as(InstanceOfAssertFactories.type(ThreadPoolExecutor.class)))
191+
.satisfies(
192+
executor -> {
193+
assertThat(executor.getMaximumPoolSize()).isEqualTo(expectedMax);
194+
assertThat(executor.getRejectedExecutionHandler())
195+
.isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);
196+
});
197+
} finally {
198+
defaultSender.shutdown();
199+
}
200+
}
201+
169202
@Test
170203
void connectTimeout() {
171204
sender =

0 commit comments

Comments
 (0)