2323import java .util .Optional ;
2424import java .util .concurrent .*;
2525import java .util .concurrent .atomic .AtomicBoolean ;
26+ import java .util .concurrent .atomic .AtomicInteger ;
2627import java .util .concurrent .atomic .AtomicLong ;
2728import java .util .concurrent .atomic .AtomicReference ;
2829import java .util .function .BiConsumer ;
@@ -37,8 +38,19 @@ public class SSEClient {
3738 private final ClientConfiguration clientConfiguration ;
3839 private final HttpClient httpClient ;
3940 private Stream <String > currentConnection ;
40- private final AtomicLong connectionId = new AtomicLong (0L );
4141
42+ /** Incremented by reconnectWith() only — used by SSEFeatureService to match events to the right connection. */
43+ private final AtomicLong connectionId = new AtomicLong (0L );
44+ /** Incremented on every reconnect() — used to detect stale delayed reconnects and stale thenAccept callbacks. */
45+ private final AtomicLong reconnectGeneration = new AtomicLong (0L );
46+ /** Tracks consecutive failed reconnect attempts for exponential backoff (5s, 10s, 20s, 40s, 60s cap). */
47+ private final AtomicInteger reconnectAttempts = new AtomicInteger (0 );
48+ private final Object connectionLock = new Object ();
49+ private final AtomicBoolean closed = new AtomicBoolean (false );
50+
51+ /** The root sendAsync future — stored separately from queryFuture so disconnect() can cancel the HTTP exchange itself. */
52+ private CompletableFuture <HttpResponse <Stream <String >>> rawFuture ;
53+ /** The terminal future of the chain (rawFuture -> thenAccept -> exceptionally). */
4254 private CompletableFuture <Void > queryFuture ;
4355 private FeatureRequest request ;
4456 private BiConsumer <Long , IzanamiEvent > consumer ;
@@ -63,6 +75,10 @@ public SSEClient(ClientConfiguration clientConfiguration) {
6375 if (maxToleratedDurationWithoutEvents .compareTo (periodSinceLastEvent ) < 0 ) {
6476 LOGGER .error ("No event received since {} seconds, will try to disconnect / reconnect" , periodSinceLastEvent .toSeconds ());
6577 reconnect ();
78+ } else {
79+ // Connection healthy for a full life probe cycle —
80+ // safe to reset exponential backoff for future reconnects.
81+ reconnectAttempts .set (0 );
6682 }
6783 }
6884 },
@@ -71,7 +87,22 @@ public SSEClient(ClientConfiguration clientConfiguration) {
7187 SECONDS
7288 );
7389 this .executorService = Executors .newFixedThreadPool (2 );
74- this .httpClient = HttpClient .newBuilder ().executor (this .executorService ).build ();
90+ this .httpClient = createHttpClient ();
91+ }
92+
93+ /**
94+ * Creates a fresh HttpClient. HTTP/1.1 is forced because the JDK's HTTP/2
95+ * implementation has known bugs with long-lived SSE streams: streams are not
96+ * properly released on close, leading to "too many concurrent streams" errors.
97+ * connectTimeout covers the TCP handshake only, not the SSE stream lifetime.
98+ */
99+ private HttpClient createHttpClient () {
100+ return HttpClient .newBuilder ()
101+ .version (HttpClient .Version .HTTP_1_1 )
102+ .executor (this .executorService )
103+ // TODO make connectTimeout configurable via ClientConfiguration
104+ .connectTimeout (Duration .ofSeconds (10 ))
105+ .build ();
75106 }
76107
77108
@@ -95,19 +126,34 @@ public CompletableFuture<Void> doConnect(FeatureRequest request, BiConsumer<Long
95126 try {
96127 HttpRequest .Builder requestBuilder = HttpRequest .newBuilder (new URI (url ))
97128 .setHeader ("Izanami-Client-Id" , clientConfiguration .connectionInformation .clientId )
98- .setHeader ("Izanami-Client-Secret" , clientConfiguration .connectionInformation .clientSecret )
99- .timeout (request .getTimeout ().orElse (clientConfiguration .callTimeout ));
129+ .setHeader ("Izanami-Client-Secret" , clientConfiguration .connectionInformation .clientSecret );
100130
131+ Duration responseTimeout = request .getTimeout ().orElse (clientConfiguration .callTimeout );
101132
102133 var r = request .getPayload ()
103134 .map (payload -> requestBuilder .POST (HttpRequest .BodyPublishers .ofString (payload )))
104135 .orElseGet (requestBuilder ::GET ).build ();
105136
106- LOGGER .debug ("Calling {} with a timeout of {} seconds" , r .uri ().toString (), r .timeout ().get ().toSeconds ());
137+ LOGGER .debug ("Calling {} with response timeout of {} seconds" , r .uri ().toString (), responseTimeout .toSeconds ());
138+
139+ long myGeneration = reconnectGeneration .get ();
107140
108- this .queryFuture = httpClient .sendAsync (r , HttpResponse .BodyHandlers .ofLines ())
141+ this .rawFuture = httpClient .sendAsync (r , HttpResponse .BodyHandlers .ofLines ());
142+ this .queryFuture = this .rawFuture
143+ // orTimeout scopes the timeout to the initial HTTP response only.
144+ // Once thenAccept starts (response received), the SSE stream runs
145+ // indefinitely — orTimeout does not affect it.
146+ .orTimeout (responseTimeout .toSeconds (), TimeUnit .SECONDS )
109147 .thenAccept (resp -> {
110148
149+ // Guard: if reconnect() was called since this doConnect(),
150+ // this connection is stale — don't install it
151+ if (reconnectGeneration .get () != myGeneration ) {
152+ LOGGER .debug ("Stale connection detected, discarding response" );
153+ resp .body ().close ();
154+ return ;
155+ }
156+
111157 if (resp .statusCode () >= 400 ) {
112158 LOGGER .error ("Izanami responded with status code {}" , resp .statusCode ());
113159 throw new RuntimeException ("Failed to connect to Izanami backend" );
@@ -122,18 +168,63 @@ public CompletableFuture<Void> doConnect(FeatureRequest request, BiConsumer<Long
122168
123169 this .currentConnection .map (line -> {
124170 var res = sseMachine .addLine (line );
171+ // Update lastEventDate for any complete SSE event
172+ // (including keepalives with unrecognized event types)
173+ // so the life probe knows the connection is alive.
174+ res .ifPresent (sse -> lastEventDate .set (LocalDateTime .now ()));
125175 return res .flatMap (EventService ::fromSSE );
126176 })
127177 .flatMap (Optional ::stream )
128178 .forEach (evt -> {
129- lastEventDate .set (LocalDateTime .now ());
130179 consumer .accept (id , evt );
131180 });
132181 }).exceptionally (e -> {
133182 connected .set (false );
134- LOGGER .error ("An error occured while connecting to sse endpoint : " , e );
135- CompletableFuture .delayedExecutor (5 , SECONDS , executorService ).execute (() -> doConnect (request , consumer , id ));
136- throw new RuntimeException (e );
183+ if (closed .get ()) {
184+ LOGGER .debug ("SSE client is closed, not reconnecting" );
185+ return null ;
186+ }
187+ if (e instanceof CancellationException ||
188+ (e .getCause () instanceof CancellationException )) {
189+ LOGGER .debug ("SSE connection was intentionally cancelled" );
190+ return null ;
191+ }
192+
193+ // Transient errors are network-level failures expected to self-heal:
194+ // IOException/UncheckedIOException — connection drop, network blip, RST
195+ // TimeoutException — server slow to respond
196+ // Permanent errors are server-side rejections (401, 403, 404, 500)
197+ // that won't resolve without a config or server-side change.
198+ //
199+ // Both schedule a reconnect (server errors may be temporary),
200+ // but only permanent errors are propagated to the caller so it
201+ // can apply its error strategy immediately. Transient errors
202+ // return null and let the reconnect deliver data silently.
203+ Throwable cause = (e instanceof CompletionException && e .getCause () != null ) ? e .getCause () : e ;
204+ boolean isTransient = cause instanceof java .io .IOException
205+ || cause instanceof java .io .UncheckedIOException
206+ || cause instanceof java .util .concurrent .TimeoutException ;
207+
208+ long myGen = reconnectGeneration .get ();
209+ long delay = Math .min (5 * (1L << Math .min (reconnectAttempts .getAndIncrement (), 4 )), 60 );
210+ LOGGER .warn ("SSE connection lost (transient={}), will reconnect in {}s: {}" , isTransient , delay , e .getMessage ());
211+ CompletableFuture .delayedExecutor (delay , SECONDS , executorService )
212+ .execute (() -> {
213+ if (closed .get ()) {
214+ LOGGER .debug ("SSE client closed, skipping reconnect" );
215+ } else if (reconnectGeneration .get () == myGen ) {
216+ reconnect ();
217+ } else {
218+ LOGGER .debug ("Skipping stale reconnect, connection already refreshed" );
219+ }
220+ });
221+
222+ if (isTransient ) {
223+ return null ;
224+ }
225+ // Permanent failure — propagate so caller can react,
226+ // but reconnect is still scheduled above
227+ throw (e instanceof CompletionException ) ? (CompletionException ) e : new CompletionException (e );
137228 });
138229 return queryFuture ;
139230 } catch (URISyntaxException e ) {
@@ -142,17 +233,38 @@ public CompletableFuture<Void> doConnect(FeatureRequest request, BiConsumer<Long
142233 }
143234
144235 public CompletableFuture <Void > disconnect () {
145- LOGGER .info ("Disconnecting from SSE endpoint" );
146- if (Objects .nonNull (currentConnection )) {
147- LOGGER .debug ("Closing event stream" );
148- currentConnection .close ();
149- }
236+ synchronized (connectionLock ) {
237+ LOGGER .info ("Disconnecting from SSE endpoint" );
238+
239+ // Cancel futures FIRST:
240+ // 1. rawFuture.cancel() — if HTTP response hasn't arrived yet,
241+ // this prevents thenAccept from ever executing (no leaked stream reader)
242+ // 2. queryFuture.cancel() — completes the terminal stage, so the
243+ // exceptionally handler doesn't fire with IOException when close()
244+ // unblocks the reading thread
245+ if (Objects .nonNull (rawFuture )) {
246+ rawFuture .cancel (true );
247+ rawFuture = null ;
248+ }
249+
250+ if (Objects .nonNull (queryFuture )) {
251+ queryFuture .cancel (true );
252+ queryFuture = null ;
253+ }
150254
151- connected .set (false );
152- return CompletableFuture .completedFuture (null );
255+ if (Objects .nonNull (currentConnection )) {
256+ LOGGER .debug ("Closing event stream" );
257+ currentConnection .close ();
258+ currentConnection = null ;
259+ }
260+
261+ connected .set (false );
262+ return CompletableFuture .completedFuture (null );
263+ }
153264 }
154265
155266 public void close () {
267+ closed .set (true );
156268 disconnect ();
157269 lifeProbeExecutorService .shutdown ();
158270 executorService .shutdown ();
@@ -164,26 +276,30 @@ public interface ReconnectionConsumer {
164276 void apply (Long connectionId , Long eventId , IzanamiEvent event );
165277 }
166278
167- // TODO factorise
168279 public CompletableFuture <Void > reconnect () {
169- LOGGER .debug ("Reconnecting with new feature set..." );
170- if (Objects .nonNull (currentConnection )) {
171- LOGGER .debug ("Disconnecting" );
172- disconnect ();
173- } else {
174- LOGGER .debug ("No connection opened" );
175- }
280+ synchronized (connectionLock ) {
281+ reconnectGeneration .incrementAndGet ();
282+ LOGGER .debug ("Reconnecting..." );
283+ if (Objects .nonNull (currentConnection ) || Objects .nonNull (rawFuture )) {
284+ LOGGER .debug ("Disconnecting" );
285+ disconnect ();
286+ } else {
287+ LOGGER .debug ("No connection opened" );
288+ }
176289
177- LOGGER .debug ("Reconnecting" );
178- return doConnect (request , consumer , connectionId .get ());
290+ LOGGER .debug ("Reconnecting" );
291+ return doConnect (request , consumer , connectionId .get ());
292+ }
179293 }
180294
181295 public CompletableFuture <Void > reconnectWith (FeatureRequest request , ReconnectionConsumer consumer ) {
182- long nextId = connectionId .incrementAndGet ();
183- this .request = request ;
184- this .consumer = (evtId , evt ) -> consumer .apply (nextId , evtId , evt );
296+ synchronized (connectionLock ) {
297+ long nextId = connectionId .incrementAndGet ();
298+ this .request = request ;
299+ this .consumer = (evtId , evt ) -> consumer .apply (nextId , evtId , evt );
185300
186- return reconnect ();
301+ return reconnect ();
302+ }
187303 }
188304
189305 public static class EventService {
@@ -217,7 +333,7 @@ public static class SSEStateMachine {
217333 private ServerSentEvent .Builder currentBuilder = ServerSentEvent .newBuilder ();
218334
219335 public Optional <ServerSentEvent > addLine (String line ) {
220- LOGGER .info ("LINE {}" , line );
336+ LOGGER .debug ("LINE {}" , line );
221337 if (Objects .isNull (line ) || line .isBlank ()) {
222338 var sse = currentBuilder .build ();
223339 this .currentBuilder = ServerSentEvent .newBuilder ();
0 commit comments