Skip to content

Commit 049935f

Browse files
authored
Add keepalive to responseInputStream timeout scheduler executor to ma… (#6756)
* Add keepalive to responseInputStream timeout scheduler executor to make sure scheduler thread doenst leak * Fix checkstyle * Add changelog * Separate timeout value, and extend fix to ResponsePublisher * Fix changelog * Update changelog * Fix changelog
1 parent 1ae68b5 commit 049935f

File tree

3 files changed

+26
-6
lines changed

3 files changed

+26
-6
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fixed a thread leak in ResponseInputStream and ResponsePublisher where the internal timeout scheduler thread persisted for the lifetime of the JVM, even when no streams were active. The thread now terminates after being idle for 60 seconds."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import java.io.IOException;
1919
import java.io.InputStream;
2020
import java.time.Duration;
21-
import java.util.concurrent.Executors;
2221
import java.util.concurrent.ScheduledExecutorService;
2322
import java.util.concurrent.ScheduledFuture;
23+
import java.util.concurrent.ScheduledThreadPoolExecutor;
2424
import java.util.concurrent.TimeUnit;
2525
import software.amazon.awssdk.annotations.SdkPublicApi;
2626
import software.amazon.awssdk.annotations.SdkTestInternalApi;
@@ -61,6 +61,7 @@ public final class ResponseInputStream<ResponseT> extends SdkFilterInputStream i
6161

6262
private static final Logger log = Logger.loggerFor(ResponseInputStream.class);
6363
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
64+
private static final long THREAD_IDLE_TIMEOUT_SECONDS = 60;
6465
private final ResponseT response;
6566
private final Abortable abortable;
6667
private ScheduledFuture<?> timeoutTask;
@@ -140,12 +141,18 @@ private void scheduleTimeoutTask(Duration timeout) {
140141
}
141142

142143
private static final class TimeoutScheduler {
143-
static final ScheduledExecutorService INSTANCE =
144-
Executors.newScheduledThreadPool(1, r -> {
144+
static final ScheduledExecutorService INSTANCE;
145+
146+
static {
147+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
145148
Thread t = new Thread(r, "response-input-stream-timeout-scheduler");
146149
t.setDaemon(true);
147150
return t;
148151
});
152+
executor.setKeepAliveTime(THREAD_IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
153+
executor.allowCoreThreadTimeOut(true);
154+
INSTANCE = executor;
155+
}
149156
}
150157

151158
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import java.nio.ByteBuffer;
1919
import java.time.Duration;
2020
import java.util.Objects;
21-
import java.util.concurrent.Executors;
2221
import java.util.concurrent.ScheduledExecutorService;
2322
import java.util.concurrent.ScheduledFuture;
23+
import java.util.concurrent.ScheduledThreadPoolExecutor;
2424
import java.util.concurrent.TimeUnit;
2525
import org.reactivestreams.Subscriber;
2626
import org.reactivestreams.Subscription;
@@ -50,6 +50,7 @@ public final class ResponsePublisher<ResponseT extends SdkResponse> implements S
5050

5151
private static final Logger log = Logger.loggerFor(ResponsePublisher.class);
5252
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
53+
private static final long THREAD_IDLE_TIMEOUT_SECONDS = 60;
5354
private final ResponseT response;
5455
private final SdkPublisher<ByteBuffer> publisher;
5556
private ScheduledFuture<?> timeoutTask;
@@ -101,12 +102,18 @@ private void scheduleTimeoutTask(Duration timeout) {
101102
}
102103

103104
private static final class TimeoutScheduler {
104-
static final ScheduledExecutorService INSTANCE =
105-
Executors.newScheduledThreadPool(1, r -> {
105+
static final ScheduledExecutorService INSTANCE;
106+
107+
static {
108+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
106109
Thread t = new Thread(r, "response-publisher-timeout-scheduler");
107110
t.setDaemon(true);
108111
return t;
109112
});
113+
executor.setKeepAliveTime(THREAD_IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
114+
executor.allowCoreThreadTimeOut(true);
115+
INSTANCE = executor;
116+
}
110117
}
111118

112119
private static class CancellingSubscriber implements Subscriber<ByteBuffer> {

0 commit comments

Comments
 (0)