Commit ca3282c

Don't block IceAdapter.start() on telemetry websocket connect (fixes #42)
TelemetryDebugger.startupComplete() called websocketClient.connectBlocking() synchronously on the IceAdapter startup thread, with no timeout. Telemetry is a debug-only observability channel - its data flow is strictly outgoing (see ice-adapter/src/main/java/com/faforever/iceadapter/telemetry/, all messages implement OutgoingMessageV1) and no game logic depends on it being connected - so it should never gate peer connectivity.

When the telemetry server is silently unreachable (TCP layer up but app layer hung - for example an alive load balancer fronting a dead backend), the WebSocket Upgrade handshake never receives an HTTP 101 response. connectBlocking() then waits until TCP keepalive eventually kills the socket, about two hours later. During that whole window IceAdapter.start() is blocked, the FAF client's adapter-ready orchestration trips its own timeouts, and the user-visible symptom is "the ice adapter doesn't connect to other players".

Fix: run the connect on a virtual thread so startup returns immediately. The pre-existing reconnect loop in sendingLoop() already handles transient drops, and queued messages catch up once the socket opens because the messageQueue is unbounded.

Also adds the missing return after the InterruptedException catch (previously fell through to sendMessage on an interrupted thread) and re-asserts the interrupt flag, per Java best practice.

Fixes #42
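For illustration only, the pattern described above can be reduced to a minimal, self-contained sketch. Everything in it is hypothetical and not part of this commit: the `BlockingConnect` interface stands in for `WebSocketClient.connectBlocking()`, the callbacks stand in for `Debug.remove(this)` and the `RegisterAsPeer` enqueue, and a daemon platform thread is used in place of `Thread.ofVirtual()` so the sketch also runs on JDKs older than 21. A latch that never opens simulates the hung Upgrade handshake.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class NonBlockingStartupSketch {
    /** Hypothetical stand-in for a connect call that may block indefinitely. */
    interface BlockingConnect {
        boolean connectBlocking() throws InterruptedException;
    }

    /**
     * Runs the blocking connect on a background daemon thread and returns that thread
     * immediately, so the caller (the "startup thread") is never blocked by it.
     */
    static Thread startConnectInBackground(BlockingConnect client, Runnable onConnected, Runnable onFailure) {
        Thread thread = new Thread(() -> {
            try {
                if (!client.connectBlocking()) {
                    onFailure.run();
                    return;
                }
            } catch (InterruptedException e) {
                onFailure.run();
                // Re-assert the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                return; // without this, execution would fall through to onConnected
            }
            onConnected.run();
        }, "telemetry-connect");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch neverOpens = new CountDownLatch(1); // simulates a handshake that never completes
        AtomicBoolean failed = new AtomicBoolean(false);

        long begin = System.nanoTime();
        Thread connectThread = startConnectInBackground(
                () -> { neverOpens.await(); return true; },
                () -> System.out.println("connected"),
                () -> failed.set(true));
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);

        System.out.println("startup returned after " + elapsedMs + " ms");
        System.out.println("connect thread still waiting: " + connectThread.isAlive());

        connectThread.interrupt(); // simulate shutdown while the connect is still hung
        connectThread.join(1000);
        System.out.println("failure callback ran: " + failed.get());
    }
}
```

The design point is that the caller gets control back regardless of how long the connect takes; a hung connect costs one parked thread rather than the whole startup sequence.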
1 parent d49f9fe commit ca3282c

1 file changed

Lines changed: 174 additions & 8 deletions

ice-adapter/src/main/java/com/faforever/iceadapter/debug/TelemetryDebugger.java

@@ -28,6 +28,20 @@
 import org.java_websocket.client.WebSocketClient;
 import org.java_websocket.handshake.ServerHandshake;
 
+/**
+ * {@link Debugger} implementation that streams ICE adapter lifecycle, GPGNet, and per-peer events
+ * to the FAF telemetry server over a WebSocket.
+ *
+ * <p>The data flow is strictly outgoing: every payload type in
+ * {@code com.faforever.iceadapter.telemetry} implements {@link OutgoingMessageV1}, and the only
+ * inbound handler ({@link WebSocketClient#onMessage(String)}) just logs what arrives. As a result
+ * telemetry is a debug-only observability channel and must never gate core ICE adapter behaviour.
+ *
+ * <p>Outgoing messages are decoupled from the calling thread by a
+ * {@link LinkedBlockingQueue} drained by {@link #sendingLoop()} on a dedicated virtual thread.
+ * The queue is unbounded, so messages produced before the socket opens (or during a transient
+ * drop) are buffered and flushed on (re)connect.
+ */
 @Slf4j
 public class TelemetryDebugger implements Debugger, AutoCloseable {
     private final WebSocketClient websocketClient;
@@ -38,6 +52,16 @@ public class TelemetryDebugger implements Debugger, AutoCloseable {
 
     private final Thread sendingLoopThread;
 
+    /**
+     * Registers this debugger with {@link Debug}, builds the telemetry WebSocket client targeting
+     * {@code <telemetryServer>/adapter/v1/game/<gameId>/player/<playerId>}, and starts the virtual
+     * thread that drains the outgoing message queue. The connection itself is not opened here; see
+     * {@link #startupComplete()}.
+     *
+     * @param telemetryServer base URI of the telemetry server (e.g. {@code wss://...}).
+     * @param gameId FAF game id this adapter instance belongs to.
+     * @param playerId FAF player id this adapter instance belongs to.
+     */
     public TelemetryDebugger(String telemetryServer, int gameId, int playerId) {
         Debug.register(this);
 
@@ -49,21 +73,38 @@ public TelemetryDebugger(String telemetryServer, int gameId, int playerId) {
                 playerId);
 
         websocketClient = new WebSocketClient(uri) {
+            /** Logs that the WebSocket handshake completed. The server handshake metadata is unused. */
             @Override
             public void onOpen(ServerHandshake handshakedata) {
                 log.info("Telemetry websocket opened");
             }
 
+            /**
+             * Logs an inbound telemetry message.
+             *
+             * <p>Telemetry is one-way (adapter -&gt; server) by design, so receiving a message is
+             * unusual and worth surfacing in logs but never acted upon by the adapter.
+             */
             @Override
             public void onMessage(String message) {
                 log.info("Telemetry websocket message: {}", message);
             }
 
+            /** Logs that the WebSocket has been closed, including the reason reported by the peer. */
             @Override
             public void onClose(int code, String reason, boolean remote) {
                 log.info("Telemetry websocket closed (reason: {})", reason);
             }
 
+            /**
+             * Handles errors raised by the WebSocket client.
+             *
+             * <p>A {@link ConnectException} indicates the telemetry server is unreachable; in that
+             * case the surrounding {@link TelemetryDebugger} is removed so subsequent
+             * {@code debug().*} calls iterate an empty debugger list and become no-ops. All other
+             * exceptions are logged but the debugger is kept registered so transient errors don't
+             * disable telemetry permanently.
+             */
             @Override
             public void onError(Exception ex) {
                 if (ex instanceof ConnectException) {
@@ -81,6 +122,16 @@ public void onError(Exception ex) {
         sendingLoopThread = Thread.ofVirtual().name("sendingLoop").start(this::sendingLoop);
     }
 
+    /**
+     * Enqueues an outgoing telemetry message for transmission by {@link #sendingLoop()}.
+     *
+     * <p>The queue is unbounded, so the put never blocks waiting for capacity. It is still
+     * interruptible: if the calling thread's interrupt flag is set, {@link LinkedBlockingQueue}
+     * throws {@link InterruptedException}, which is rethrown wrapped as a {@link RuntimeException}
+     * so callers do not need to declare it.
+     *
+     * @param message message to enqueue.
+     */
     private void sendMessage(OutgoingMessageV1 message) {
         try {
             messageQueue.put(message);
@@ -89,6 +140,16 @@ private void sendMessage(OutgoingMessageV1 message) {
         }
     }
 
+    /**
+     * Loop body of the dedicated {@code sendingLoop} virtual thread.
+     *
+     * <p>Blocks on {@link BlockingQueue#take()}, serializes each message to JSON via Jackson, and
+     * sends it over the WebSocket. If the socket has dropped, {@link WebSocketClient#reconnectBlocking()}
+     * is called before the message is sent so transient drops are transparent to producers.
+     *
+     * <p>Exits cleanly on interrupt; other exceptions are logged and the loop continues so a single
+     * malformed message cannot stall the entire telemetry channel.
+     */
     @SneakyThrows
     private void sendingLoop() {
         while (!Thread.currentThread().isInterrupted()) {
@@ -113,39 +174,97 @@ private void sendingLoop() {
         }
     }
 
+    /**
+     * Connects to the telemetry websocket server asynchronously and registers this adapter as a peer
+     * once the connection is established.
+     *
+     * <p>Telemetry is a debug-only, strictly outgoing observability channel; no game logic depends
+     * on it being connected. The connect is therefore performed on a virtual thread so that
+     * {@code IceAdapter.start()} returns immediately and peer connectivity is never gated on the
+     * telemetry server being reachable.
+     *
+     * <p>Previously this method called {@code websocketClient.connectBlocking()} synchronously on
+     * the startup thread with no timeout. When the telemetry server's TCP layer was reachable but
+     * its application layer was hung (for example an alive load balancer fronting a dead backend),
+     * the WebSocket Upgrade response never arrived and startup blocked until TCP keepalive killed
+     * the socket roughly two hours later. The user-visible symptom was "the ice adapter doesn't
+     * connect to other players" because the FAF client's adapter-ready orchestration tripped its
+     * own timeouts long before {@code start()} returned.
+     *
+     * <p>Failure handling:
+     * <ul>
+     *   <li>If {@code connectBlocking()} returns {@code false}, this debugger is removed via
+     *       {@link Debug#remove(Debugger)} and no further telemetry is queued.</li>
+     *   <li>If the connect thread is interrupted, the debugger is removed, the failure is logged,
+     *       and the interrupt flag is re-asserted on the current thread per Java best practice.</li>
+     *   <li>On a successful connect, a {@link RegisterAsPeer} message is enqueued. The pre-existing
+     *       reconnect logic in {@link #sendingLoop()} handles transient drops; queued messages catch
+     *       up once the socket re-opens because {@code messageQueue} is unbounded.</li>
+     * </ul>
+     *
+     * @see <a href="https://github.com/FAForever/java-ice-adapter/issues/42">Issue #42</a>
+     */
     @Override
     public void startupComplete() {
-        try {
-            if (!websocketClient.connectBlocking()) {
+        Thread.ofVirtual().name("telemetry-connect").start(() -> {
+            try {
+                if (!websocketClient.connectBlocking()) {
+                    Debug.remove(this);
+                    return;
+                }
+            } catch (InterruptedException e) {
                 Debug.remove(this);
+                log.error("Failed to connect to telemetry websocket", e);
+                Thread.currentThread().interrupt();
                 return;
             }
-        } catch (InterruptedException e) {
-            Debug.remove(this);
-            log.error("Failed to connect to telemetry websocket", e);
-        }
 
-        sendMessage(new RegisterAsPeer(
-                UUID.randomUUID(), "java-ice-adapter/" + IceAdapter.getVersion(), IceAdapter.getLogin()));
+            sendMessage(new RegisterAsPeer(
+                    UUID.randomUUID(), "java-ice-adapter/" + IceAdapter.getVersion(), IceAdapter.getLogin()));
+        });
     }
 
+    /**
+     * Logs that the JSON-RPC server has started and arranges to log again once a peer connects.
+     *
+     * <p>No telemetry message is sent here because RPC peer state is not part of the v1 telemetry
+     * protocol; this hook exists purely for local diagnostics.
+     *
+     * @param peerFuture future that completes with the connected RPC peer.
+     */
     @Override
     public void rpcStarted(CompletableFuture<JJsonPeer> peerFuture) {
         log.info("RPC started");
         peerFuture.thenAccept(peer -> log.info("RPC connected"));
     }
 
+    /**
+     * Sends an {@link UpdateGpgnetState} indicating the GPGNet server is up and waiting for the game
+     * client to connect.
+     */
     @Override
     public void gpgnetStarted() {
         sendMessage(new UpdateGpgnetState(UUID.randomUUID(), "WAITING_FOR_GAME"));
     }
 
+    /**
+     * Sends an {@link UpdateGpgnetState} reflecting the current GPGNet connection state -
+     * {@code GAME_CONNECTED} when {@link GPGNetServer#isConnected()} is {@code true}, otherwise
+     * {@code WAITING_FOR_GAME}.
+     */
     @Override
     public void gpgnetConnectedDisconnected() {
         sendMessage(new UpdateGpgnetState(
                 UUID.randomUUID(), GPGNetServer.isConnected() ? "GAME_CONNECTED" : "WAITING_FOR_GAME"));
     }
 
+    /**
+     * Sends an {@link UpdateGameState} carrying the latest GPGNet game state.
+     *
+     * @throws IllegalStateException if {@link GPGNetServer#getGameState()} is empty - the contract
+     *                               of this hook is that it is invoked on a state transition, so
+     *                               an absent state would indicate a programming error upstream.
+     */
     @Override
     public void gameStateChanged() {
         sendMessage(new UpdateGameState(
@@ -154,17 +273,40 @@ public void gameStateChanged() {
                         .orElseThrow(() -> new IllegalStateException("gameState must not change to null"))));
     }
 
+    /**
+     * Sends a {@link ConnectToPeer} event when the adapter starts attempting to reach a peer.
+     *
+     * @param id remote player id.
+     * @param login remote player login.
+     * @param localOffer {@code true} if this adapter generated the SDP offer, {@code false} if it
+     *                   is the answering side.
+     */
     @Override
     public void connectToPeer(int id, String login, boolean localOffer) {
         sendMessage(new ConnectToPeer(UUID.randomUUID(), id, login, localOffer));
     }
 
+    /**
+     * Sends a {@link DisconnectFromPeer} event and drops any rate-limiter state held for the peer
+     * so a future reconnect starts from a clean slate.
+     *
+     * @param id remote player id.
+     */
     @Override
     public void disconnectFromPeer(int id) {
         peerRateLimiter.remove(id);
         sendMessage(new DisconnectFromPeer(UUID.randomUUID(), id));
     }
 
+    /**
+     * Sends an {@link UpdatePeerState} carrying the current ICE state and the candidate types of
+     * the currently selected pair (local and remote), if any.
+     *
+     * <p>Unlike {@link #peerConnectivityUpdate(Peer)} this method is not rate-limited: ICE state
+     * transitions are infrequent and each one is independently meaningful for diagnostics.
+     *
+     * @param peer peer whose ICE state changed.
+     */
     @Override
     public void peerStateChanged(Peer peer) {
         sendMessage(new UpdatePeerState(
@@ -183,6 +325,16 @@ public void peerStateChanged(Peer peer) {
                         .orElse(null)));
     }
 
+    /**
+     * Sends an {@link UpdatePeerConnectivity} with the latest RTT and last-packet-received timestamp
+     * for the given peer.
+     *
+     * <p>Rate-limited to one update per second per peer via a per-peer Guava {@link RateLimiter},
+     * because connectivity samples can fire at the underlying STUN keepalive cadence and would
+     * otherwise dominate the telemetry stream.
+     *
+     * @param peer peer whose connectivity sample is being reported.
+     */
     @Override
     public void peerConnectivityUpdate(Peer peer) {
         if (!peerRateLimiter
@@ -210,6 +362,13 @@ public void peerConnectivityUpdate(Peer peer) {
                         .orElse(null)));
     }
 
+    /**
+     * Sends an {@link UpdateCoturnList} reporting the COTURN servers currently known to the adapter.
+     * The first server's host is included as a convenience so the telemetry UI can display the
+     * "primary" relay without pulling the whole list apart.
+     *
+     * @param servers current COTURN server list (may be empty).
+     */
     @Override
     public void updateCoturnList(Collection<CoturnServer> servers) {
         sendMessage(new UpdateCoturnList(
@@ -218,6 +377,13 @@ public void updateCoturnList(Collection<CoturnServer> servers) {
                 servers));
     }
 
+    /**
+     * Interrupts the sending loop so the dedicated virtual thread can exit cleanly.
+     *
+     * <p>Implements {@link AutoCloseable} so callers can manage telemetry lifetime with
+     * try-with-resources. The WebSocket itself is not closed here - the underlying
+     * {@link WebSocketClient} owns its own close lifecycle and the JVM will tear it down on exit.
+     */
     @Override
     public void close() {
         sendingLoopThread.interrupt();
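
The buffering behaviour that the Javadoc above relies on (messages enqueued before the socket opens are flushed afterwards, and the loop exits on interrupt) can be sketched in isolation. This is a simplified model under stated assumptions, not the code in this file: `BufferedSenderSketch`, `markOpen()` and the `sent` list are hypothetical names, a latch stands in for the WebSocket opening, plain strings stand in for `OutgoingMessageV1`, and a daemon platform thread replaces the virtual thread so the sketch runs on older JDKs.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class BufferedSenderSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // unbounded by default
    private final CountDownLatch open = new CountDownLatch(1);               // stand-in for "socket opened"
    private final List<String> sent = new CopyOnWriteArrayList<>();          // stand-in for the wire
    private final Thread loop;

    BufferedSenderSketch() {
        loop = new Thread(this::drain, "sendingLoop");
        loop.setDaemon(true);
        loop.start();
    }

    /** Producers never wait for the connection; they only enqueue. */
    void send(String message) {
        queue.add(message);
    }

    void markOpen() {
        open.countDown();
    }

    List<String> sent() {
        return sent;
    }

    boolean isRunning() {
        return loop.isAlive();
    }

    void close() throws InterruptedException {
        loop.interrupt();
        loop.join(1000);
    }

    private void drain() {
        try {
            open.await(); // nothing is sent until the connection exists
            while (!Thread.currentThread().isInterrupted()) {
                sent.add(queue.take()); // FIFO: buffered messages go out first
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the flag, then exit the loop
        }
    }

    public static void main(String[] args) throws Exception {
        BufferedSenderSketch sender = new BufferedSenderSketch();
        sender.send("a"); // enqueued before the "socket" is open
        sender.send("b");
        sender.markOpen();
        sender.send("c");

        long deadline = System.currentTimeMillis() + 2000;
        while (sender.sent().size() < 3 && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
        System.out.println(sender.sent()); // prints [a, b, c]
        sender.close();
    }
}
```

An unbounded queue keeps producers from ever blocking, at the cost of unbounded memory growth if the consumer never starts draining; whether that trade-off is acceptable depends on how long the channel can stay down.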
