Skip to content

Commit 5cf32b0

Browse files
authored
Merge pull request #6 from webskin/fix/sse-tcp-con-leak
fix: prevent SSE connection leaks and improve reconnect resilience
2 parents 8fced9c + ed7d0e0 commit 5cf32b0

2 files changed

Lines changed: 158 additions & 32 deletions

File tree

src/main/java/fr/maif/requests/SSEFeatureService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import java.time.Duration;
1818
import java.util.*;
19+
import java.util.concurrent.CancellationException;
1920
import java.util.concurrent.CompletableFuture;
2021
import java.util.concurrent.ConcurrentHashMap;
2122
import java.util.concurrent.TimeUnit;
@@ -122,6 +123,15 @@ public CompletableFuture<IzanamiResult> featureValues(FeatureRequest request) {
122123
missingFuture.complete(missingResults);
123124
}
124125
}).exceptionally(e -> {
126+
// On intentional cancellation (disconnect() during reconnect),
127+
// don't complete missingFuture with an empty map — the new connection's
128+
// FEATURE_STATES event can still deliver real data.
129+
// completeOnTimeout (above) acts as a safety net.
130+
if (e instanceof CancellationException ||
131+
(e.getCause() instanceof CancellationException)) {
132+
LOGGER.debug("SSE reconnection cancelled previous connection");
133+
return null;
134+
}
125135
LOGGER.error("Received exception while requesting missing features", e);
126136
missingFuture.complete(new HashMap<>());
127137
return null;

src/main/java/fr/maif/requests/events/SSEClient.java

Lines changed: 148 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Optional;
2424
import java.util.concurrent.*;
2525
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.concurrent.atomic.AtomicInteger;
2627
import java.util.concurrent.atomic.AtomicLong;
2728
import java.util.concurrent.atomic.AtomicReference;
2829
import java.util.function.BiConsumer;
@@ -37,8 +38,19 @@ public class SSEClient {
3738
private final ClientConfiguration clientConfiguration;
3839
private final HttpClient httpClient;
3940
private Stream<String> currentConnection;
40-
private final AtomicLong connectionId = new AtomicLong(0L);
4141

42+
/** Incremented by reconnectWith() only — used by SSEFeatureService to match events to the right connection. */
43+
private final AtomicLong connectionId = new AtomicLong(0L);
44+
/** Incremented on every reconnect() — used to detect stale delayed reconnects and stale thenAccept callbacks. */
45+
private final AtomicLong reconnectGeneration = new AtomicLong(0L);
46+
/** Tracks consecutive failed reconnect attempts for exponential backoff (5s, 10s, 20s, 40s, 60s cap). */
47+
private final AtomicInteger reconnectAttempts = new AtomicInteger(0);
48+
private final Object connectionLock = new Object();
49+
private final AtomicBoolean closed = new AtomicBoolean(false);
50+
51+
/** The root sendAsync future — stored separately from queryFuture so disconnect() can cancel the HTTP exchange itself. */
52+
private CompletableFuture<HttpResponse<Stream<String>>> rawFuture;
53+
/** The terminal future of the chain (rawFuture -> thenAccept -> exceptionally). */
4254
private CompletableFuture<Void> queryFuture;
4355
private FeatureRequest request;
4456
private BiConsumer<Long, IzanamiEvent> consumer;
@@ -63,6 +75,10 @@ public SSEClient(ClientConfiguration clientConfiguration) {
6375
if (maxToleratedDurationWithoutEvents.compareTo(periodSinceLastEvent) < 0) {
6476
LOGGER.error("No event received since {} seconds, will try to disconnect / reconnect", periodSinceLastEvent.toSeconds());
6577
reconnect();
78+
} else {
79+
// Connection healthy for a full life probe cycle —
80+
// safe to reset exponential backoff for future reconnects.
81+
reconnectAttempts.set(0);
6682
}
6783
}
6884
},
@@ -71,7 +87,22 @@ public SSEClient(ClientConfiguration clientConfiguration) {
7187
SECONDS
7288
);
7389
this.executorService = Executors.newFixedThreadPool(2);
74-
this.httpClient = HttpClient.newBuilder().executor(this.executorService).build();
90+
this.httpClient = createHttpClient();
91+
}
92+
93+
/**
94+
* Creates a fresh HttpClient. HTTP/1.1 is forced because the JDK's HTTP/2
95+
* implementation has known bugs with long-lived SSE streams: streams are not
96+
* properly released on close, leading to "too many concurrent streams" errors.
97+
* connectTimeout covers the TCP handshake only, not the SSE stream lifetime.
98+
*/
99+
private HttpClient createHttpClient() {
100+
return HttpClient.newBuilder()
101+
.version(HttpClient.Version.HTTP_1_1)
102+
.executor(this.executorService)
103+
// TODO make connectTimeout configurable via ClientConfiguration
104+
.connectTimeout(Duration.ofSeconds(10))
105+
.build();
75106
}
76107

77108

@@ -95,19 +126,34 @@ public CompletableFuture<Void> doConnect(FeatureRequest request, BiConsumer<Long
95126
try {
96127
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(new URI(url))
97128
.setHeader("Izanami-Client-Id", clientConfiguration.connectionInformation.clientId)
98-
.setHeader("Izanami-Client-Secret", clientConfiguration.connectionInformation.clientSecret)
99-
.timeout(request.getTimeout().orElse(clientConfiguration.callTimeout));
129+
.setHeader("Izanami-Client-Secret", clientConfiguration.connectionInformation.clientSecret);
100130

131+
Duration responseTimeout = request.getTimeout().orElse(clientConfiguration.callTimeout);
101132

102133
var r = request.getPayload()
103134
.map(payload -> requestBuilder.POST(HttpRequest.BodyPublishers.ofString(payload)))
104135
.orElseGet(requestBuilder::GET).build();
105136

106-
LOGGER.debug("Calling {} with a timeout of {} seconds", r.uri().toString(), r.timeout().get().toSeconds());
137+
LOGGER.debug("Calling {} with response timeout of {} seconds", r.uri().toString(), responseTimeout.toSeconds());
138+
139+
long myGeneration = reconnectGeneration.get();
107140

108-
this.queryFuture = httpClient.sendAsync(r, HttpResponse.BodyHandlers.ofLines())
141+
this.rawFuture = httpClient.sendAsync(r, HttpResponse.BodyHandlers.ofLines());
142+
this.queryFuture = this.rawFuture
143+
// orTimeout scopes the timeout to the initial HTTP response only.
144+
// Once thenAccept starts (response received), the SSE stream runs
145+
// indefinitely — orTimeout does not affect it.
146+
.orTimeout(responseTimeout.toSeconds(), TimeUnit.SECONDS)
109147
.thenAccept(resp -> {
110148

149+
// Guard: if reconnect() was called since this doConnect(),
150+
// this connection is stale — don't install it
151+
if (reconnectGeneration.get() != myGeneration) {
152+
LOGGER.debug("Stale connection detected, discarding response");
153+
resp.body().close();
154+
return;
155+
}
156+
111157
if(resp.statusCode() >= 400) {
112158
LOGGER.error("Izanami responded with status code {}", resp.statusCode());
113159
throw new RuntimeException("Failed to connect to Izanami backend");
@@ -122,18 +168,63 @@ public CompletableFuture<Void> doConnect(FeatureRequest request, BiConsumer<Long
122168

123169
this.currentConnection.map(line -> {
124170
var res = sseMachine.addLine(line);
171+
// Update lastEventDate for any complete SSE event
172+
// (including keepalives with unrecognized event types)
173+
// so the life probe knows the connection is alive.
174+
res.ifPresent(sse -> lastEventDate.set(LocalDateTime.now()));
125175
return res.flatMap(EventService::fromSSE);
126176
})
127177
.flatMap(Optional::stream)
128178
.forEach(evt -> {
129-
lastEventDate.set(LocalDateTime.now());
130179
consumer.accept(id, evt);
131180
});
132181
}).exceptionally(e -> {
133182
connected.set(false);
134-
LOGGER.error("An error occured while connecting to sse endpoint : ", e);
135-
CompletableFuture.delayedExecutor(5, SECONDS, executorService).execute(() -> doConnect(request, consumer, id));
136-
throw new RuntimeException(e);
183+
if (closed.get()) {
184+
LOGGER.debug("SSE client is closed, not reconnecting");
185+
return null;
186+
}
187+
if (e instanceof CancellationException ||
188+
(e.getCause() instanceof CancellationException)) {
189+
LOGGER.debug("SSE connection was intentionally cancelled");
190+
return null;
191+
}
192+
193+
// Transient errors are network-level failures expected to self-heal:
194+
// IOException/UncheckedIOException — connection drop, network blip, RST
195+
// TimeoutException — server slow to respond
196+
// Permanent errors are server-side rejections (401, 403, 404, 500)
197+
// that won't resolve without a config or server-side change.
198+
//
199+
// Both schedule a reconnect (server errors may be temporary),
200+
// but only permanent errors are propagated to the caller so it
201+
// can apply its error strategy immediately. Transient errors
202+
// return null and let the reconnect deliver data silently.
203+
Throwable cause = (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
204+
boolean isTransient = cause instanceof java.io.IOException
205+
|| cause instanceof java.io.UncheckedIOException
206+
|| cause instanceof java.util.concurrent.TimeoutException;
207+
208+
long myGen = reconnectGeneration.get();
209+
long delay = Math.min(5 * (1L << Math.min(reconnectAttempts.getAndIncrement(), 4)), 60);
210+
LOGGER.warn("SSE connection lost (transient={}), will reconnect in {}s: {}", isTransient, delay, e.getMessage());
211+
CompletableFuture.delayedExecutor(delay, SECONDS, executorService)
212+
.execute(() -> {
213+
if (closed.get()) {
214+
LOGGER.debug("SSE client closed, skipping reconnect");
215+
} else if (reconnectGeneration.get() == myGen) {
216+
reconnect();
217+
} else {
218+
LOGGER.debug("Skipping stale reconnect, connection already refreshed");
219+
}
220+
});
221+
222+
if (isTransient) {
223+
return null;
224+
}
225+
// Permanent failure — propagate so caller can react,
226+
// but reconnect is still scheduled above
227+
throw (e instanceof CompletionException) ? (CompletionException) e : new CompletionException(e);
137228
});
138229
return queryFuture;
139230
} catch (URISyntaxException e) {
@@ -142,17 +233,38 @@ public CompletableFuture<Void> doConnect(FeatureRequest request, BiConsumer<Long
142233
}
143234

144235
public CompletableFuture<Void> disconnect() {
145-
LOGGER.info("Disconnecting from SSE endpoint");
146-
if (Objects.nonNull(currentConnection)) {
147-
LOGGER.debug("Closing event stream");
148-
currentConnection.close();
149-
}
236+
synchronized (connectionLock) {
237+
LOGGER.info("Disconnecting from SSE endpoint");
238+
239+
// Cancel futures FIRST:
240+
// 1. rawFuture.cancel() — if HTTP response hasn't arrived yet,
241+
// this prevents thenAccept from ever executing (no leaked stream reader)
242+
// 2. queryFuture.cancel() — completes the terminal stage, so the
243+
// exceptionally handler doesn't fire with IOException when close()
244+
// unblocks the reading thread
245+
if (Objects.nonNull(rawFuture)) {
246+
rawFuture.cancel(true);
247+
rawFuture = null;
248+
}
249+
250+
if (Objects.nonNull(queryFuture)) {
251+
queryFuture.cancel(true);
252+
queryFuture = null;
253+
}
150254

151-
connected.set(false);
152-
return CompletableFuture.completedFuture(null);
255+
if (Objects.nonNull(currentConnection)) {
256+
LOGGER.debug("Closing event stream");
257+
currentConnection.close();
258+
currentConnection = null;
259+
}
260+
261+
connected.set(false);
262+
return CompletableFuture.completedFuture(null);
263+
}
153264
}
154265

155266
public void close() {
267+
closed.set(true);
156268
disconnect();
157269
lifeProbeExecutorService.shutdown();
158270
executorService.shutdown();
@@ -164,26 +276,30 @@ public interface ReconnectionConsumer {
164276
void apply(Long connectionId, Long eventId, IzanamiEvent event);
165277
}
166278

167-
// TODO factorise
168279
public CompletableFuture<Void> reconnect() {
169-
LOGGER.debug("Reconnecting with new feature set...");
170-
if (Objects.nonNull(currentConnection)) {
171-
LOGGER.debug("Disconnecting");
172-
disconnect();
173-
} else {
174-
LOGGER.debug("No connection opened");
175-
}
280+
synchronized (connectionLock) {
281+
reconnectGeneration.incrementAndGet();
282+
LOGGER.debug("Reconnecting...");
283+
if (Objects.nonNull(currentConnection) || Objects.nonNull(rawFuture)) {
284+
LOGGER.debug("Disconnecting");
285+
disconnect();
286+
} else {
287+
LOGGER.debug("No connection opened");
288+
}
176289

177-
LOGGER.debug("Reconnecting");
178-
return doConnect(request, consumer, connectionId.get());
290+
LOGGER.debug("Reconnecting");
291+
return doConnect(request, consumer, connectionId.get());
292+
}
179293
}
180294

181295
public CompletableFuture<Void> reconnectWith(FeatureRequest request, ReconnectionConsumer consumer) {
182-
long nextId = connectionId.incrementAndGet();
183-
this.request = request;
184-
this.consumer = (evtId, evt) -> consumer.apply(nextId, evtId, evt);
296+
synchronized (connectionLock) {
297+
long nextId = connectionId.incrementAndGet();
298+
this.request = request;
299+
this.consumer = (evtId, evt) -> consumer.apply(nextId, evtId, evt);
185300

186-
return reconnect();
301+
return reconnect();
302+
}
187303
}
188304

189305
public static class EventService {
@@ -217,7 +333,7 @@ public static class SSEStateMachine {
217333
private ServerSentEvent.Builder currentBuilder = ServerSentEvent.newBuilder();
218334

219335
public Optional<ServerSentEvent> addLine(String line) {
220-
LOGGER.info("LINE {}", line);
336+
LOGGER.debug("LINE {}", line);
221337
if (Objects.isNull(line) || line.isBlank()) {
222338
var sse = currentBuilder.build();
223339
this.currentBuilder = ServerSentEvent.newBuilder();

0 commit comments

Comments
 (0)