Skip to content

Commit 0ec66f3

Browse files
bluestreak01 and claude committed
feat(ilp): cursor I/O loop reconnect + replay
The cursor I/O loop previously treated any wire failure as terminal — first disconnect = sender broken, every subsequent batch threw. Now, when the sender wires a ReconnectFactory + ReconnectListener, a wire failure triggers: 1. WARN log 2. Build a fresh WebSocketClient via the factory (same auth/TLS/host) 3. Reset wire state: nextWireSeq=0, fsnAtZero = engine.ackedFsn() + 1 4. Reposition the cursor at the first unacked FSN (replay) 5. Notify the listener → producer's connectionGeneration bumps so the next encode emits full schema definitions, not refs the new server has never seen 6. Outer ioLoop continues — nextWireSeq=0 starts on the new wire, trySendOne picks up at the repositioned cursor and replays every unacked frame, then continues with whatever the producer publishes next Added in main: * CursorWebSocketSendLoop.ReconnectFactory + .ReconnectListener interfaces (both functional, both null-able for legacy "fail-fast" semantics) * positionCursorAt(fsn) — walks frames inside the segment containing fsn to find the byte offset * SegmentRing.findSegmentContaining(fsn) + CursorSendEngine pass-through — used by the cursor reposition * QwpWebSocketSender extracts buildAndConnect() to use both for the initial connect and as the reconnect factory; onWireReconnect() is the listener that bumps connectionGeneration This commit covers the *mechanics* (one attempt, succeed-or-fail). The follow-up commit adds policy: exponential backoff with jitter, per-outage time cap (reconnect_max_duration_millis, default 300s per spec decision #2), and auth-failure detection (401/403/non-101 treated as terminal so the retry budget isn't wasted on errors that won't fix themselves). 
Two integration tests: * testReconnectAfterServerInducedDisconnect — server ACKs then closes; sender reconnects, second batch lands on the new wire * testReplayResendsUnackedFramesAcrossReconnect — server receives the first frame WITHOUT ACKing then closes; sender reconnects and replays the unacked frame on the new connection Spec decisions #5 (encode-mid-reconnect race) and the core of #1/#2 (reconnect mechanics) land here. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 71afa21 commit 0ec66f3

5 files changed

Lines changed: 466 additions & 24 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,25 +1314,14 @@ private void ensureConnected() {
13141314
if (cursorEngine == null) {
13151315
throw new LineSenderException("cursor engine must be attached before connect");
13161316
}
1317-
if (tlsConfig != null) {
1318-
client = WebSocketClientFactory.newTlsInstance(tlsConfig);
1319-
} else {
1320-
client = WebSocketClientFactory.newPlainTextInstance();
1321-
}
1322-
try {
1323-
client.setQwpMaxVersion(QwpConstants.MAX_SUPPORTED_INGEST_VERSION);
1324-
client.setQwpClientId(QwpConstants.CLIENT_ID);
1325-
client.setQwpRequestDurableAck(requestDurableAck);
1326-
client.connect(host, port);
1327-
client.upgrade(WRITE_PATH, authorizationHeader);
1328-
} catch (Exception e) {
1329-
client.close();
1330-
client = null;
1331-
throw new LineSenderException("Failed to connect to " + host + ":" + port, e);
1332-
}
1317+
client = buildAndConnect();
13331318

13341319
try {
1335-
cursorSendLoop = new CursorWebSocketSendLoop(client, cursorEngine);
1320+
cursorSendLoop = new CursorWebSocketSendLoop(
1321+
client, cursorEngine,
1322+
0L, CursorWebSocketSendLoop.DEFAULT_PARK_NANOS,
1323+
this::buildAndConnect,
1324+
this::onWireReconnect);
13361325
cursorSendLoop.start();
13371326
} catch (Throwable t) {
13381327
client.close();
@@ -1361,6 +1350,47 @@ private void ensureConnected() {
13611350
host, port, inFlightWindowSize, client.getServerQwpVersion());
13621351
}
13631352

1353+
/**
1354+
* Build and connect a fresh WebSocket client using the sender's
1355+
* persistent config (host/port/TLS/auth/durable-ack flag). Used both
1356+
* for the initial connect and as the reconnect factory passed to the
1357+
* cursor I/O loop. Throws {@link LineSenderException} on any failure
1358+
* — the I/O loop's reconnect path treats a throw as fatal for that
1359+
* attempt (and, in the follow-up commit, schedules a backoff retry
1360+
* within the per-outage time cap).
1361+
*/
1362+
private WebSocketClient buildAndConnect() {
1363+
WebSocketClient newClient;
1364+
if (tlsConfig != null) {
1365+
newClient = WebSocketClientFactory.newTlsInstance(tlsConfig);
1366+
} else {
1367+
newClient = WebSocketClientFactory.newPlainTextInstance();
1368+
}
1369+
try {
1370+
newClient.setQwpMaxVersion(QwpConstants.MAX_SUPPORTED_INGEST_VERSION);
1371+
newClient.setQwpClientId(QwpConstants.CLIENT_ID);
1372+
newClient.setQwpRequestDurableAck(requestDurableAck);
1373+
newClient.connect(host, port);
1374+
newClient.upgrade(WRITE_PATH, authorizationHeader);
1375+
} catch (Exception e) {
1376+
newClient.close();
1377+
throw new LineSenderException("Failed to connect to " + host + ":" + port, e);
1378+
}
1379+
return newClient;
1380+
}
1381+
1382+
/**
1383+
* Called by the cursor I/O loop after a successful reconnect. The wire
1384+
* state has been reset and the cursor repositioned for replay; we bump
1385+
* connectionGeneration so the producer thread's next encode treats the
1386+
* connection as fresh (full schema definitions, not refs the new server
1387+
* has never seen). Single-writer (the I/O thread invokes this), so a
1388+
* plain volatile increment is safe.
1389+
*/
1390+
private void onWireReconnect() {
1391+
connectionGeneration++;
1392+
}
1393+
13641394
private void ensureNoInProgressRow() {
13651395
if (currentTableBuffer != null && currentTableBuffer.hasInProgressRow()) {
13661396
throw new LineSenderException(

core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorSendEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,11 @@ public MmapSegment firstSealed() {
290290
return ring.firstSealed();
291291
}
292292

293+
/** Pass-through to {@link SegmentRing#findSegmentContaining(long)}. */
294+
public MmapSegment findSegmentContaining(long fsn) {
295+
return ring.findSegmentContaining(fsn);
296+
}
297+
293298
/** Configured per-segment size in bytes. */
294299
public long segmentSizeBytes() {
295300
return segmentSizeBytes;

core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java

Lines changed: 142 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,20 +73,28 @@ public final class CursorWebSocketSendLoop implements QuietCloseable {
7373
public static final long DEFAULT_PARK_NANOS = 50_000L; // 50us idle backoff
7474
private static final Logger LOG = LoggerFactory.getLogger(CursorWebSocketSendLoop.class);
7575

76-
private final WebSocketClient client;
7776
private final AtomicLong consecutiveSendErrors = new AtomicLong();
7877
private final CursorSendEngine engine;
79-
// fsnAtZero: FSN that wireSeq=0 maps to on this connection. For a fresh
80-
// connection starting from a fresh engine (no recovery), this is 0.
81-
// Once recovery / reconnect lands (PR2), this is set to the first
82-
// unacked FSN at connect time so wire-seq math stays aligned.
83-
private final long fsnAtZero;
8478
private final long parkNanos;
8579
private final WebSocketResponse response = new WebSocketResponse();
8680
private final ResponseHandler responseHandler = new ResponseHandler();
8781
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
8882
private final AtomicLong totalAcks = new AtomicLong();
8983
private final AtomicLong totalFramesSent = new AtomicLong();
84+
private final AtomicLong totalReconnects = new AtomicLong();
85+
// Optional reconnect plumbing. If both are non-null, a wire failure
86+
// triggers a reconnect attempt instead of a terminal fail(). The factory
87+
// produces a fresh, connected+upgraded WebSocketClient; the listener is
88+
// notified after the wire state has been reset so the producer thread
89+
// can bump its connectionGeneration.
90+
private final ReconnectFactory reconnectFactory;
91+
private final ReconnectListener reconnectListener;
92+
private WebSocketClient client;
93+
// fsnAtZero: FSN that wireSeq=0 maps to on the current connection. For
94+
// a fresh connection, this is 0. After a reconnect, it's set to
95+
// engine.ackedFsn() + 1 — the first frame we replay maps to wireSeq=0
96+
// on the new connection so server-side ACK math stays aligned.
97+
private long fsnAtZero;
9098
// sendingSegment: the segment we're currently consuming bytes from. Starts
9199
// at engine.activeSegment(); advances to newer sealed segments / the new
92100
// active as the producer rotates.
@@ -100,18 +108,61 @@ public final class CursorWebSocketSendLoop implements QuietCloseable {
100108
private Thread ioThread;
101109

102110
/**
 * Convenience constructor: fresh connection (fsnAtZero = 0), default idle
 * park, and no reconnect plumbing — a single wire failure is terminal.
 */
public CursorWebSocketSendLoop(WebSocketClient client, CursorSendEngine engine) {
    this(client, engine, 0L, DEFAULT_PARK_NANOS, null, null);
}
105113

106114
/**
 * Constructor without reconnect plumbing: wire failures are terminal
 * (legacy fail-fast semantics).
 */
public CursorWebSocketSendLoop(WebSocketClient client, CursorSendEngine engine,
                               long fsnAtZero, long parkNanos) {
    this(client, engine, fsnAtZero, parkNanos, null, null);
}
118+
119+
/**
 * Full constructor with reconnect plumbing. When both {@code reconnectFactory}
 * and {@code reconnectListener} are non-null, the I/O thread treats wire
 * failures (send/receive errors, server-initiated close) as recoverable: it
 * asks the factory for a fresh connected client, resets wire state,
 * repositions its replay cursor at {@code engine.ackedFsn() + 1}, and
 * notifies the listener so the producer can bump its
 * {@code connectionGeneration}. If either is null, reconnect is disabled and
 * the first wire failure is terminal (legacy behavior).
 *
 * @param client            connected, upgraded client for the first wire
 * @param engine            cursor send engine backing the segment ring
 * @param fsnAtZero         FSN that wireSeq=0 maps to on this connection
 * @param parkNanos         idle backoff when there is nothing to send
 * @param reconnectFactory  builds a replacement client after a wire failure; may be null
 * @param reconnectListener notified after a successful reconnect; may be null
 * @throws IllegalArgumentException if client or engine is null
 */
public CursorWebSocketSendLoop(WebSocketClient client, CursorSendEngine engine,
                               long fsnAtZero, long parkNanos,
                               ReconnectFactory reconnectFactory,
                               ReconnectListener reconnectListener) {
    if (client == null) {
        throw new IllegalArgumentException("client and engine must be non-null");
    }
    if (engine == null) {
        throw new IllegalArgumentException("client and engine must be non-null");
    }
    this.client = client;
    this.engine = engine;
    this.fsnAtZero = fsnAtZero;
    this.parkNanos = parkNanos;
    this.reconnectFactory = reconnectFactory;
    this.reconnectListener = reconnectListener;
}
143+
144+
/**
145+
* Factory used by the I/O loop to build a fresh, connected, upgraded
146+
* {@link WebSocketClient} after a wire failure. Implementations close
147+
* the old client (if needed), build a new one with the same auth/TLS
148+
* config, connect, perform the WebSocket upgrade, and return it ready
149+
* to send. Throw on a terminal failure (auth rejection, etc.) — the
150+
* I/O loop will treat the throw as fatal.
151+
*/
152+
@FunctionalInterface
153+
public interface ReconnectFactory {
154+
WebSocketClient reconnect() throws Exception;
155+
}
156+
157+
/**
 * Callback fired after a successful reconnect, once wire state is reset and
 * the cursor is repositioned for replay. Typical implementations bump a
 * {@code connectionGeneration} counter that the producer thread reads, so
 * its next encode emits full schema definitions instead of refs.
 */
@FunctionalInterface
public interface ReconnectListener {
    void onReconnect();
}
116167

117168
/**
@@ -153,6 +204,10 @@ public long getTotalFramesSent() {
153204
return totalFramesSent.get();
154205
}
155206

207+
public long getTotalReconnects() {
208+
return totalReconnects.get();
209+
}
210+
156211
public synchronized void start() {
157212
if (ioThread != null) {
158213
throw new IllegalStateException("already started");
@@ -201,14 +256,94 @@ private MmapSegment advanceSegment() {
201256
return liveActive;
202257
}
203258

259+
/**
260+
* Surface a wire failure. With reconnect plumbing wired (factory +
261+
* listener both non-null), tries one reconnect first; success returns
262+
* silently and the I/O loop continues with reset wire state. Failure
263+
* (or no reconnect plumbing) records the error and stops the loop.
264+
* <p>
265+
* Backoff / per-outage time cap / auth-failure detection land in the
266+
* follow-up commit; this commit proves the mechanics with a single
267+
* attempt.
268+
*/
204269
private void fail(Throwable t) {
270+
if (reconnectFactory != null && reconnectListener != null && running) {
271+
LOG.warn("cursor I/O loop wire failure, attempting reconnect: {}", t.getMessage());
272+
try {
273+
WebSocketClient newClient = reconnectFactory.reconnect();
274+
if (newClient != null) {
275+
swapClient(newClient);
276+
totalReconnects.incrementAndGet();
277+
reconnectListener.onReconnect();
278+
LOG.info("cursor I/O loop reconnected; replaying from FSN {}", fsnAtZero);
279+
return;
280+
}
281+
} catch (Throwable reconnectError) {
282+
LOG.error("cursor I/O loop reconnect failed: {}", reconnectError.getMessage(),
283+
reconnectError);
284+
t = new LineSenderException(
285+
"cursor I/O loop wire failure followed by reconnect failure: "
286+
+ reconnectError.getMessage(), reconnectError);
287+
}
288+
}
205289
if (lastError == null) {
206290
lastError = t;
207291
}
208292
running = false;
209293
LOG.error("Cursor I/O loop failure: {}", t.getMessage(), t);
210294
}
211295

296+
/**
297+
* Reset wire state for a fresh connection: install the new client,
298+
* realign {@code fsnAtZero} to the next unacked FSN, restart wire
299+
* sequencing from 0, and reposition the cursor so the next
300+
* {@link #trySendOne} call replays the first unacked frame.
301+
*/
302+
private void swapClient(WebSocketClient newClient) {
303+
WebSocketClient old = this.client;
304+
this.client = newClient;
305+
if (old != null) {
306+
try {
307+
old.close();
308+
} catch (Throwable ignored) {
309+
// best-effort
310+
}
311+
}
312+
long replayStart = engine.ackedFsn() + 1L;
313+
this.fsnAtZero = replayStart;
314+
this.nextWireSeq = 0L;
315+
this.consecutiveSendErrors.set(0L);
316+
positionCursorAt(replayStart);
317+
}
318+
319+
/**
320+
* Walk the engine's segments to find the one containing {@code targetFsn},
321+
* and set {@code sendOffset} to the byte offset of that frame within it.
322+
* If {@code targetFsn} is past everything published, park at the live
323+
* active segment's published offset (caller will wait for new bytes).
324+
*/
325+
private void positionCursorAt(long targetFsn) {
326+
MmapSegment seg = engine.findSegmentContaining(targetFsn);
327+
if (seg == null) {
328+
// targetFsn is at or past publishedFsn — nothing to replay.
329+
// Resume from the active segment's tip; producer may add more.
330+
sendingSegment = engine.activeSegment();
331+
sendOffset = sendingSegment.publishedOffset();
332+
return;
333+
}
334+
sendingSegment = seg;
335+
// Walk frame-by-frame from HEADER_SIZE until we land on targetFsn.
336+
long offset = MmapSegment.HEADER_SIZE;
337+
long fsn = seg.baseSeq();
338+
long base = seg.address();
339+
while (fsn < targetFsn) {
340+
int payloadLen = Unsafe.getUnsafe().getInt(base + offset + 4);
341+
offset += MmapSegment.FRAME_HEADER_SIZE + payloadLen;
342+
fsn++;
343+
}
344+
sendOffset = offset;
345+
}
346+
212347
private void ioLoop() {
213348
try {
214349
while (running) {

core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentRing.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,35 @@ public synchronized MmapSegment firstSealed() {
443443
return sealedSegments.size() > 0 ? sealedSegments.get(0) : null;
444444
}
445445

446+
/**
447+
* Returns the segment whose published frame range covers {@code fsn}, or
448+
* {@code null} if no segment currently holds it (e.g. the FSN is past
449+
* {@code publishedFsn} or has been trimmed). Used by the reconnect path
450+
* to position the I/O thread's cursor at the first unacked frame for
451+
* replay.
452+
* <p>
453+
* Walks sealed first (oldest → newest) then the active. The sealed list
454+
* is small enough — and reconnects are rare enough — that the linear
455+
* scan cost doesn't matter.
456+
*/
457+
public synchronized MmapSegment findSegmentContaining(long fsn) {
458+
for (int i = 0, n = sealedSegments.size(); i < n; i++) {
459+
MmapSegment s = sealedSegments.get(i);
460+
long base = s.baseSeq();
461+
if (fsn >= base && fsn < base + s.frameCount()) {
462+
return s;
463+
}
464+
}
465+
MmapSegment a = active;
466+
if (a != null) {
467+
long base = a.baseSeq();
468+
if (fsn >= base && fsn < base + a.frameCount()) {
469+
return a;
470+
}
471+
}
472+
return null;
473+
}
474+
446475
/**
447476
* Segment manager pre-creates the next segment and parks it here. The
448477
* producer consumes the spare on its next rotation. Throws if a spare

0 commit comments

Comments
 (0)