
Commit 71afa21

bluestreak01 and claude committed
feat(ilp): connectionGeneration foundation + encode-mid-reconnect retry
Re-adds the volatile generation counter (and its companion retry loop in
flushPendingRows) that the cursor strip had removed. This is the foundation
the reconnect work (#20/#21) builds on — the producer needs a way to detect
that the wire-side actor has rotated state mid-encode so it can discard
now-poisoned schema-ID refs and re-encode with full schema definitions.

What lands here:

* QwpWebSocketSender: volatile connectionGeneration + lastSeenGeneration
  pair. Bumped on initial recovery from disk (the recovered FSNs were never
  seen by *this* server connection, so the first batch must re-publish full
  schemas). Reconnect path will bump in subsequent work.
* flushPendingRows: encode-mid-reconnect retry loop. Sample gen before
  encode + after finishMessage; if it changed, discard the encoded bytes
  (table buffers haven't been reset yet — source rows are intact) and retry
  with reset schema state. Bounded at MAX_SCHEMA_RACE_RETRIES = 10 so
  reconnect-faster-than-encode surfaces a hard error instead of spinning.
* CursorSendEngine.wasRecoveredFromDisk(): single-bit accessor the sender
  reads during ensureConnected to decide whether to bump.
* SegmentRing.openExisting: filter out empty hot-spare leftovers
  (frameCount=0) from prior sessions. Those carry the provisional baseSeq=0
  and would otherwise collide with the real baseSeq=0 segment and trip the
  contiguity check. Surfaced by the new recovery test — caught a real bug
  in the recovery scan.
* Test hooks bumpConnectionGenerationForTest / accessors for gen and
  maxSent*Id so reconnect-effect tests can run without spinning up the
  (still-not-implemented) reconnect path.

Tests cover: gen=0 for fresh connect, gen=1 after disk recovery, gen bump
triggers schema-state reset on the next encode and is sticky (further
flushes without bump don't re-reset).

Spec decisions #4 and #5 land here.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3caa2d3 commit 71afa21

4 files changed

Lines changed: 369 additions & 23 deletions
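
The core pattern, condensed: a single wire-side writer bumps a volatile
generation counter; the producer samples it before and after each encode and
retries on divergence. A minimal self-contained sketch follows; all names
except connectionGeneration, lastSeenGeneration, and MAX_SCHEMA_RACE_RETRIES
are illustrative, not the repository's API.

// Sketch only: assumes a single wire-side writer thread and a single
// producer thread, as the commit message describes.
final class GenerationRetrySketch {
    private static final int MAX_SCHEMA_RACE_RETRIES = 10;

    // Written by one thread only (the wire-side actor); volatile gives the
    // producer visibility without locks.
    private volatile long connectionGeneration;
    // Producer-thread-only mirror; never touched by the wire side.
    private long lastSeenGeneration;

    // Wire-side actor: called after wire state has been reset.
    void onReconnected() {
        connectionGeneration++;
    }

    // Producer thread: encode one batch, discarding and retrying if the
    // generation rotated mid-encode.
    byte[] encodeBatch() {
        int retries = 0;
        while (true) {
            long genBefore = connectionGeneration;      // sample #1
            if (genBefore != lastSeenGeneration) {
                resetSchemaState();                     // new server knows none of our IDs
                lastSeenGeneration = genBefore;
            }
            byte[] encoded = encodeWithCurrentSchemaState();
            if (connectionGeneration == genBefore) {    // sample #2: no rotation mid-encode
                return encoded;
            }
            if (++retries >= MAX_SCHEMA_RACE_RETRIES) {
                throw new IllegalStateException("reconnects outpacing encode");
            }
            // Discard `encoded`; source rows are intact, so just loop.
        }
    }

    private void resetSchemaState() { /* forget schema/symbol IDs */ }

    private byte[] encodeWithCurrentSchemaState() { return new byte[0]; }
}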


core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 114 additions & 22 deletions
@@ -164,6 +164,22 @@ public class QwpWebSocketSender implements Sender {
     // 0 or -1 means "fast close" (skip the drain); otherwise close blocks
     // up to this many millis for ackedFsn to catch up to publishedFsn.
     private long closeFlushTimeoutMillis = 5_000L;
+    // Single volatile counter, single writer (the wire-side actor that
+    // performs reconnect; for now: ensureConnected during recovery).
+    // Bumped on every successful reconnect AND on initial recovery from
+    // disk. Producer thread reads it inside flushPendingRows to decide
+    // whether to reset schema state (the new server has no memory of the
+    // old connection's schema IDs) and to detect the encode-mid-reconnect
+    // race. See design/qwp-cursor-durability.md "Schema state on reconnect".
+    private volatile long connectionGeneration;
+    // Producer-thread-only mirror of the last connectionGeneration value
+    // we encoded against. When connectionGeneration > lastSeenGeneration,
+    // the producer must reset schema state before the next encode.
+    private long lastSeenGeneration;
+    // Bound on the encode-retry loop in flushPendingRows. Reconnect
+    // firing 10x faster than the producer can encode a single batch is
+    // pathological — surface a hard error rather than spin.
+    private static final int MAX_SCHEMA_RACE_RETRIES = 10;

     private QwpWebSocketSender(
             String host,
@@ -842,6 +858,37 @@ public int getPendingRowCount() {
         return pendingRowCount;
     }

+    /**
+     * Test hook: simulate a wire-side reconnect by bumping the
+     * connectionGeneration counter. The next call into {@code flushPendingRows}
+     * will detect the divergence and reset schema state. Production wire
+     * code will call this from the I/O loop's reconnect path; tests use
+     * it to exercise the schema-reset machinery without spinning up a
+     * reconnect scenario.
+     */
+    @TestOnly
+    public void bumpConnectionGenerationForTest() {
+        connectionGeneration++;
+    }
+
+    /** Test accessor for the volatile generation counter. */
+    @TestOnly
+    public long getConnectionGenerationForTest() {
+        return connectionGeneration;
+    }
+
+    /** Test accessor: highest schema ID confirmed sent on the current connection. */
+    @TestOnly
+    public int getMaxSentSchemaIdForTest() {
+        return maxSentSchemaId;
+    }
+
+    /** Test accessor: highest symbol ID confirmed sent on the current connection. */
+    @TestOnly
+    public int getMaxSentSymbolIdForTest() {
+        return maxSentSymbolId;
+    }
+
     @TestOnly
     public QwpTableBuffer getTableBuffer(String tableName) {
         QwpTableBuffer buffer = tableBuffers.get(tableName);
@@ -1298,6 +1345,15 @@ private void ensureConnected() {
         // Server starts fresh on each connection — discard any schema IDs
         // retained from prior state.
         resetSchemaStateForNewConnection();
+        // If the cursor engine recovered an existing on-disk slot, the
+        // recovered FSNs were never seen by *this* server connection. Bump
+        // connectionGeneration so flushPendingRows treats the next batch as
+        // post-reconnect (full schema definitions, not refs). lastSeenGeneration
+        // stays at 0 — the divergence is what signals "reset needed" in the
+        // producer's retry loop.
+        if (cursorEngine != null && cursorEngine.wasRecoveredFromDisk()) {
+            connectionGeneration = 1L;
+        }
         connectionError.set(null);

         connected = true;
@@ -1344,36 +1400,72 @@ private void flushPendingRows() {
         }

         ensureActiveBufferReady();
-        int batchMaxSchemaId = maxSentSchemaId;
-        encoder.beginMessage(tableCount, globalSymbolDictionary, maxSentSymbolId, currentBatchMaxSymbolId);
-        for (int i = 0, n = keys.size(); i < n; i++) {
-            CharSequence tableName = keys.getQuick(i);
-            if (tableName == null) {
-                continue;
-            }
-            QwpTableBuffer tableBuffer = tableBuffers.get(tableName);
-            if (tableBuffer == null || tableBuffer.getRowCount() == 0) {
-                continue;
+        // Encode-mid-reconnect race retry loop. The wire-side actor (today
+        // the recovery startup; soon the I/O loop's reconnect path) bumps
+        // connectionGeneration after resetting wire state. If a bump fires
+        // while we're encoding, the bytes we're about to emit may carry
+        // schema-ID refs the new server has never assigned — the server
+        // would reject the batch and we'd lose data. Detect by sampling
+        // gen before encode and re-sampling after finishMessage; if it
+        // changed, discard the encoded bytes (table buffers are NOT yet
+        // reset, so source rows are intact) and retry. Bounded so
+        // reconnect-faster-than-encode surfaces a hard error.
+        int batchMaxSchemaId;
+        int messageSize;
+        QwpBufferWriter buffer;
+        int retries = 0;
+        while (true) {
+            long genBefore = connectionGeneration;
+            if (genBefore != lastSeenGeneration) {
+                resetSchemaStateForNewConnection();
+                lastSeenGeneration = genBefore;
             }
+            int currBatchMaxSchemaId = maxSentSchemaId;
+            encoder.beginMessage(tableCount, globalSymbolDictionary, maxSentSymbolId, currentBatchMaxSymbolId);
+            for (int i = 0, n = keys.size(); i < n; i++) {
+                CharSequence tableName = keys.getQuick(i);
+                if (tableName == null) {
+                    continue;
+                }
+                QwpTableBuffer tableBuffer = tableBuffers.get(tableName);
+                if (tableBuffer == null || tableBuffer.getRowCount() == 0) {
+                    continue;
+                }
+
+                if (tableBuffer.getSchemaId() < 0) {
+                    if (nextSchemaId >= maxSchemasPerConnection) {
+                        throw new LineSenderException("maximum schemas per connection exceeded")
+                                .put("[maxSchemasPerConnection=").put(maxSchemasPerConnection).put(']');
+                    }
+                    tableBuffer.setSchemaId(nextSchemaId++);
+                }
+                currBatchMaxSchemaId = Math.max(currBatchMaxSchemaId, tableBuffer.getSchemaId());
+                boolean useSchemaRef = tableBuffer.getSchemaId() <= maxSentSchemaId;

-            if (tableBuffer.getSchemaId() < 0) {
-                if (nextSchemaId >= maxSchemasPerConnection) {
-                    throw new LineSenderException("maximum schemas per connection exceeded")
-                            .put("[maxSchemasPerConnection=").put(maxSchemasPerConnection).put(']');
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Encoding table [name={}, rows={}, maxSentSymbolId={}, batchMaxId={}, useSchemaRef={}]", tableName, tableBuffer.getRowCount(), maxSentSymbolId, currentBatchMaxSymbolId, useSchemaRef);
                 }
-                tableBuffer.setSchemaId(nextSchemaId++);
+
+                encoder.addTable(tableBuffer, useSchemaRef);
             }
-            batchMaxSchemaId = Math.max(batchMaxSchemaId, tableBuffer.getSchemaId());
-            boolean useSchemaRef = tableBuffer.getSchemaId() <= maxSentSchemaId;
+            messageSize = encoder.finishMessage();
+            buffer = encoder.getBuffer();

+            // Race detection: did the wire actor bump gen during encode?
+            if (connectionGeneration == genBefore) {
+                batchMaxSchemaId = currBatchMaxSchemaId;
+                break;
+            }
+            if (++retries >= MAX_SCHEMA_RACE_RETRIES) {
+                throw new LineSenderException(
+                        "schema-reset race exceeded retry limit [" + MAX_SCHEMA_RACE_RETRIES
+                                + "] — wire reconnects are firing faster than the user thread "
+                                + "can encode a single batch");
+            }
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Encoding table [name={}, rows={}, maxSentSymbolId={}, batchMaxId={}, useSchemaRef={}]", tableName, tableBuffer.getRowCount(), maxSentSymbolId, currentBatchMaxSymbolId, useSchemaRef);
+                LOG.debug("Schema-reset race detected mid-encode; retrying [attempt={}]", retries);
             }
-
-            encoder.addTable(tableBuffer, useSchemaRef);
         }
-        int messageSize = encoder.finishMessage();
-        QwpBufferWriter buffer = encoder.getBuffer();

         activeBuffer.ensureCapacity(messageSize);
         activeBuffer.write(buffer.getBufferPtr(), messageSize);
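
A test against the hooks above might look roughly like the following sketch;
newConnectedSenderForTest and appendAndFlushOneRow are hypothetical stand-ins
for whatever the real test fixtures provide, and the assertions assume JUnit.

// Hypothetical test sketch: helper methods are assumptions, not the
// repository's actual fixtures.
@Test
public void testGenerationBumpIsStickyAndResetsOnce() {
    try (QwpWebSocketSender sender = newConnectedSenderForTest()) {
        appendAndFlushOneRow(sender);
        // Fresh connect: no recovery, no reconnect yet.
        assertEquals(0L, sender.getConnectionGenerationForTest());

        // Simulate a wire-side reconnect.
        sender.bumpConnectionGenerationForTest();
        assertEquals(1L, sender.getConnectionGenerationForTest());

        // The next flush detects the divergence and resets schema state...
        appendAndFlushOneRow(sender);
        int maxSchemaIdAfterReset = sender.getMaxSentSchemaIdForTest();

        // ...and the bump is sticky: a further flush without another bump
        // must NOT reset schema state again.
        appendAndFlushOneRow(sender);
        assertEquals(maxSchemaIdAfterReset, sender.getMaxSentSchemaIdForTest());
    }
}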

core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorSendEngine.java

Lines changed: 18 additions & 0 deletions
@@ -75,6 +75,12 @@ public final class CursorSendEngine implements QuietCloseable {
     private final SegmentRing ring;
     private final long segmentSizeBytes;
     private final long appendDeadlineNanos;
+    // True when the constructor recovered an existing on-disk slot rather
+    // than starting fresh. Read by QwpWebSocketSender during connect to
+    // decide whether to bump connectionGeneration so the first batch
+    // re-publishes schema definitions (the server has no memory of FSNs
+    // we recovered from disk).
+    private final boolean recoveredFromDisk;
     // Number of times appendBlocking observed BACKPRESSURE_NO_SPARE on its first
     // ring.appendOrFsn attempt. One increment per blocking-call that had to wait
     // for the manager (or for ACKs) — not one per spin-park. Producer-thread
@@ -150,6 +156,7 @@ private CursorSendEngine(String sfDir, long segmentSizeBytes, SegmentManager man
         // already on disk and corrupting ACK translation, trim, and replay.
         SegmentRing recovered = memoryMode ? null
                 : SegmentRing.openExisting(sfDir, segmentSizeBytes);
+        this.recoveredFromDisk = recovered != null;
         if (recovered != null) {
             this.ring = recovered;
         } else {
@@ -239,6 +246,17 @@ public void close() {
         ring.close();
     }

+    /**
+     * True when this engine opened against a pre-existing on-disk slot
+     * (i.e. {@code SegmentRing.openExisting} returned a non-null ring at
+     * construction). Memory-mode engines and fresh-disk engines return
+     * false. Used by the sender to decide whether to mark schema state as
+     * needing a reset before the first send.
+     */
+    public boolean wasRecoveredFromDisk() {
+        return recoveredFromDisk;
+    }
+
     /** I/O thread accessor: highest FSN whose frame is fully written. */
     public long publishedFsn() {
         return ring.publishedFsn();

core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentRing.java

Lines changed: 13 additions & 1 deletion
@@ -154,7 +154,19 @@ public static SegmentRing openExisting(String sfDir, long maxBytesPerSegment) {
             if (name != null && name.endsWith(".sfa") && !".".equals(name) && !"..".equals(name)) {
                 String path = sfDir + "/" + name;
                 try {
-                    opened.add(MmapSegment.openExisting(path));
+                    MmapSegment seg = MmapSegment.openExisting(path);
+                    // Filter out empty leftovers — typically hot-spare
+                    // segments the manager pre-allocated for a prior
+                    // session that never got rotated into active. They
+                    // carry the provisional baseSeq=0 and frameCount=0,
+                    // which would otherwise collide with the real
+                    // baseSeq=0 segment and trip the contiguity check
+                    // below. No data to recover; close + skip.
+                    if (seg.frameCount() == 0) {
+                        seg.close();
+                    } else {
+                        opened.add(seg);
+                    }
                 } catch (MmapSegmentException ignored) {
                     // Stray file with the .sfa extension but bad header /
                     // unreadable: skip rather than fail the recovery.
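
To see why an empty leftover would trip recovery, here is a toy model of a
baseSeq contiguity check. The real check referenced in the comment above is
not shown in this hunk, so its exact shape here is an assumption.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model: a segment is its base sequence number plus how many frames it holds.
record SegMeta(long baseSeq, int frameCount) {}

final class ContiguityCheckSketch {
    // Sorted by baseSeq, segments must be distinct and gap-free: each one
    // starts exactly where the previous ends. A leftover hot-spare with
    // (baseSeq=0, frameCount=0) duplicates the real baseSeq=0 segment and
    // fails the distinctness half of the check, which is why openExisting
    // now drops frameCount==0 segments before this point.
    static void checkContiguous(List<SegMeta> segs) {
        List<SegMeta> sorted = new ArrayList<>(segs);
        sorted.sort(Comparator.comparingLong(SegMeta::baseSeq));
        for (int i = 1; i < sorted.size(); i++) {
            SegMeta prev = sorted.get(i - 1);
            SegMeta cur = sorted.get(i);
            if (cur.baseSeq() == prev.baseSeq()) {
                throw new IllegalStateException("baseSeq collision at " + cur.baseSeq());
            }
            if (cur.baseSeq() != prev.baseSeq() + prev.frameCount()) {
                throw new IllegalStateException("gap before baseSeq=" + cur.baseSeq());
            }
        }
    }
}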
