
Commit fa5c838

bluestreak01 and claude committed
fix(ilp): recovery replays sealed segments from baseSeq, not active
Three correlated bugs that together orphaned data on recovery:

1. CursorWebSocketSendLoop.start() began at engine.activeSegment(), skipping every sealed segment on disk. After a crash + restart with multiple unacked segments, only the active's tail would replay; all sealed-segment data sat orphaned. Fixed by positioning at engine.ackedFsn() + 1 (same as swapClient does on reconnect) — the cursor walks sealed segments oldest-first and falls through to active only when sealed is exhausted. The existing replay test only covered single-batch replay (the first batch always carries full schemas), so the gap wasn't caught.

2. CursorSendEngine recovery left ackedFsn = -1 even when earlier segments had been trimmed before the crash (lowestBaseSeq > 0). With ackedFsn = -1, positionCursorAt(0) would land before any segment exists and fall through to active.publishedOffset() — the same orphan symptom. Fixed by seeding ackedFsn = lowestBaseSeq - 1 on recovery; everything trimmed must have been acked, so this is a sound lower bound.

3. SegmentManager.fileGeneration started at 0 even when the slot dir already contained sf-0000000000000000.sfa from a prior session. The manager would then mint its first hot spare at that name — openCleanRW truncates the file, scrambling the in-flight mmap of the active segment under the I/O loop. The spec called for this fix at line 93 ("seeds fileGeneration to max(existing) + 1"); now done in register() by scanning the slot dir for sf-<hex>.sfa files.

Test: RecoveryReplayTest writes 50 multi-segment-spanning rows against a silent server (no acks), closes fast, then opens a fresh sender against an ack server pointed at the same slot, and asserts that all 50 distinct row values reach the new server. Without the start() fix, only the active segment's frames replay (a subset). Without the fileGeneration fix, the in-flight mmap gets clobbered and the cursor walks zero-padded garbage.

Adds getTotalFramesSent / getTotalAcks accessors on QwpWebSocketSender (used during diagnosis; useful in their own right).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
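For orientation before the diffs below, a compact model of the replay positioning this commit introduces. Illustrative only: Segment, resumeSegment, and the numbers are stand-ins for the client's MmapSegment ring and positionCursorAt(), which are not shown in full here.

import java.util.List;

// Sketch of the recovery positioning described in the commit message:
// resume at ackedFsn + 1, walking sealed segments oldest-first and
// falling through to the active segment only when sealed is exhausted.
final class ReplayPositioningSketch {

    record Segment(long baseSeq, long frameCount) {
        boolean covers(long fsn) {
            return fsn >= baseSeq && fsn < baseSeq + frameCount;
        }
    }

    // First unsent FSN: one past the highest acknowledged frame.
    static long replayStart(long ackedFsn) {
        return ackedFsn + 1;
    }

    // The oldest sealed segment still holding the start FSN wins; otherwise
    // sending resumes on the active segment.
    static Segment resumeSegment(List<Segment> sealedOldestFirst, Segment active, long startFsn) {
        for (Segment s : sealedOldestFirst) {
            if (s.covers(startFsn)) {
                return s;
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // A crash left two sealed segments (FSNs 200-299 and 300-399) plus
        // the active one (400+); everything below 200 was trimmed, so
        // recovery seeds ackedFsn = 199 and replay resumes at FSN 200.
        Segment sealedA = new Segment(200, 100);
        Segment sealedB = new Segment(300, 100);
        Segment active = new Segment(400, 10);
        long start = replayStart(199); // 200
        System.out.println(resumeSegment(List.of(sealedA, sealedB), active, start).baseSeq()); // 200
    }
}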
1 parent 520231c commit fa5c838

5 files changed

Lines changed: 357 additions & 1 deletion


core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 12 additions & 0 deletions
@@ -973,6 +973,18 @@ public long getTotalReconnectsSucceeded() {
         return l == null ? 0L : l.getTotalReconnects();
     }
 
+    /** Total binary frames the cursor I/O loop has issued to the wire. */
+    public long getTotalFramesSent() {
+        CursorWebSocketSendLoop l = cursorSendLoop;
+        return l == null ? 0L : l.getTotalFramesSent();
+    }
+
+    /** Total binary frames whose ACKs have been received and applied. */
+    public long getTotalAcks() {
+        CursorWebSocketSendLoop l = cursorSendLoop;
+        return l == null ? 0L : l.getTotalAcks();
+    }
+
     /**
      * Frames re-sent on the post-reconnect catch-up window — i.e. frames
      * whose FSN was already on the wire before the drop. Useful for
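The two accessors are handy for watching a replay drain from diagnostic or test code. A hypothetical helper (not part of this commit); only getTotalFramesSent() and getTotalAcks() come from the diff above:

// Waits until every frame the cursor has put on the wire has been
// acknowledged, or gives up after the timeout.
static boolean awaitDrain(QwpWebSocketSender sender, long timeoutMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
        if (sender.getTotalAcks() >= sender.getTotalFramesSent()) {
            return true; // fully drained
        }
        Thread.sleep(10);
    }
    return false;
}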

core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorSendEngine.java

Lines changed: 16 additions & 0 deletions
@@ -166,6 +166,22 @@ private CursorSendEngine(String sfDir, long segmentSizeBytes, SegmentManager man
         this.recoveredFromDisk = recovered != null;
         if (recovered != null) {
             this.ring = recovered;
+            // Seed ackedFsn to one below the lowest segment's baseSeq.
+            // We don't know what was actually acked before the prior
+            // session crashed, but anything trimmed off the ring's
+            // bottom must have been acked (trim is ack-driven). Without
+            // this seed, ackedFsn stays at -1 and the I/O loop's
+            // start-time positioning would walk to FSN 0 — which may
+            // not exist on disk if earlier segments have been trimmed,
+            // causing it to fall through to the active segment's tip
+            // and skip the unacked sealed segments entirely.
+            MmapSegment first = recovered.firstSealed();
+            long lowestBase = first != null
+                    ? first.baseSeq()
+                    : recovered.getActive().baseSeq();
+            if (lowestBase > 0) {
+                recovered.acknowledge(lowestBase - 1);
+            }
         } else {
             MmapSegment initial;
             String initialPath = null;
core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java

Lines changed: 22 additions & 1 deletion
@@ -269,12 +269,33 @@ public synchronized void start() {
             throw new IllegalStateException("already started");
         }
         running = true;
-        sendingSegment = engine.activeSegment();
+        // Position the cursor at the first unsent FSN before spinning the
+        // I/O thread. For a fresh sender, ackedFsn=-1 → start at FSN 0,
+        // which lands on the (empty) initial active — same as the prior
+        // hardcoded "sendingSegment = engine.activeSegment()". For a
+        // recovered sender with sealed segments holding unsent data, this
+        // walks back to the lowest unacked frame so sealed-segment data
+        // actually reaches the wire — without it, start() would skip
+        // straight to the active and orphan everything in sealed.
+        positionCursorForStart();
         ioThread = new Thread(this::ioLoop, "qdb-cursor-ws-io");
         ioThread.setDaemon(true);
         ioThread.start();
     }
 
+    /**
+     * Sets {@code fsnAtZero}, {@code nextWireSeq}, and the cursor
+     * (sendingSegment + sendOffset) to the first unsent FSN. Visible for
+     * tests so they can assert correct positioning without spinning a
+     * real I/O thread + WebSocket.
+     */
+    void positionCursorForStart() {
+        long replayStart = engine.ackedFsn() + 1L;
+        this.fsnAtZero = replayStart;
+        this.nextWireSeq = 0L;
+        positionCursorAt(replayStart);
+    }
+
     /**
      * Walks to the next segment when the current one is sealed and fully
      * drained. Returns the next segment to consume (newer sealed if available,

core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java

Lines changed: 47 additions & 0 deletions
@@ -169,10 +169,57 @@ public void deregister(SegmentRing ring) {
     public void register(SegmentRing ring, String dir) {
         synchronized (lock) {
             rings.add(new RingEntry(ring, dir));
+            // Skip the file-generation counter past whatever's already on
+            // disk in this slot. Without this, on recovery the manager
+            // would mint a new spare at sf-0000000000000000.sfa — and
+            // openCleanRW would truncate the user's existing active file
+            // out from under the I/O loop, scrambling the in-flight mmap.
+            // Memory-mode rings have no dir; nothing to scan there.
+            if (dir != null) {
+                long minNext = scanMaxGeneration(dir) + 1L;
+                while (true) {
+                    long cur = fileGeneration.get();
+                    if (cur >= minNext) break;
+                    if (fileGeneration.compareAndSet(cur, minNext)) break;
+                }
+            }
         }
         ring.setManagerWakeup(this::wakeWorker);
     }
 
+    /**
+     * Returns the highest hex-encoded generation across {@code sf-<gen>.sfa}
+     * files in {@code dir}, or {@code -1} if none exist. Skips files that
+     * don't match the pattern (e.g. the legacy {@code sf-initial.sfa}).
+     */
+    private static long scanMaxGeneration(String dir) {
+        long max = -1L;
+        if (!Files.exists(dir)) return max;
+        long find = Files.findFirst(dir);
+        if (find == 0) return max;
+        try {
+            int rc = 1;
+            while (rc > 0) {
+                String name = Files.utf8ToString(Files.findName(find));
+                rc = Files.findNext(find);
+                if (name == null || !name.startsWith("sf-") || !name.endsWith(".sfa")) {
+                    continue;
+                }
+                String hex = name.substring(3, name.length() - 4);
+                if (hex.length() != 16) continue;
+                try {
+                    long gen = Long.parseUnsignedLong(hex, 16);
+                    if (gen > max) max = gen;
+                } catch (NumberFormatException ignored) {
+                    // sf-initial.sfa or non-hex — skip
+                }
+            }
+        } finally {
+            Files.findClose(find);
+        }
+        return max;
+    }
+
     /**
      * Unparks the worker thread out of its poll-park so it processes
      * registered rings on the very next loop iteration. Cheap — a single
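For reference, the naming that scanMaxGeneration() expects is a 16-digit, zero-padded hex generation. A small illustration; the mint-side format string is our assumption, since only the parse side appears in this diff:

// Mint side (assumed): sf-<16 hex digits>.sfa
long gen = 26;
String name = String.format("sf-%016x.sfa", gen);    // "sf-000000000000001a.sfa"

// Parse side, mirroring scanMaxGeneration():
String hex = name.substring(3, name.length() - 4);    // "000000000000001a"
long parsed = Long.parseUnsignedLong(hex, 16);         // 26

// register() seeds fileGeneration to max(existing) + 1, so the next hot
// spare is minted at sf-000000000000001b.sfa and never truncates a file
// that is already mmapped by the I/O loop.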
RecoveryReplayTest.java (new test; package io.questdb.client.test.cutlass.qwp.client)

Lines changed: 260 additions & 0 deletions
@@ -0,0 +1,260 @@
+/*******************************************************************************
+ *     ___                  _   ____  ____
+ *    / _ \ _   _  ___  ___| |_|  _ \| __ )
+ *   | | | | | | |/ _ \/ __| __| | | |  _ \
+ *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
+ *    \__\_\\__,_|\___||___/\__|____/|____/
+ *
+ *  Copyright (c) 2014-2019 Appsicle
+ *  Copyright (c) 2019-2026 QuestDB
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ ******************************************************************************/
+
+package io.questdb.client.test.cutlass.qwp.client;
+
+import io.questdb.client.Sender;
+import io.questdb.client.std.Files;
+import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Pin-down for recovery replay across sender restarts.
+ * <p>
+ * Previously {@code CursorWebSocketSendLoop.start()} began at the active
+ * segment, skipping every sealed segment on disk. After a crash + restart
+ * with multiple segments holding unacked data, the foreground sender
+ * would orphan everything in sealed and only ship the active's tail.
+ * <p>
+ * Today {@code start()} positions at {@code engine.ackedFsn() + 1} —
+ * walking sealed segments oldest-first — and the engine constructor
+ * seeds {@code ackedFsn} to {@code lowestBaseSeq - 1} on recovery so the
+ * positioning lands on the right segment even if earlier ones were
+ * trimmed before the crash.
+ */
+public class RecoveryReplayTest {
+
+    private static final int TEST_PORT = 19_100 + (int) (System.nanoTime() % 100);
+    private String sfDir;
+
+    @Before
+    public void setUp() {
+        sfDir = Paths.get(System.getProperty("java.io.tmpdir"),
+                "qdb-recov-replay-" + System.nanoTime()).toString();
+    }
+
+    @After
+    public void tearDown() {
+        if (sfDir != null) rmDirRec(sfDir);
+    }
+
+    @Test
+    public void testRestartReplaysSealedSegmentsAgainstFreshServer() throws Exception {
+        // Phase 1: silent server, sender 1 writes enough to rotate at
+        // least once, closes fast (no drain). Slot ends up with sealed +
+        // active segments holding unacked data.
+        int port1 = TEST_PORT + 1;
+        try (TestWebSocketServer silent = new TestWebSocketServer(port1, new SilentHandler())) {
+            silent.start();
+            Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS));
+
+            // Use a tight segment cap and pad each row with a sizable
+            // payload so 50 batches genuinely span multiple segments.
+            // Without rotation there'd be no sealed segments and the
+            // start-position bug couldn't manifest — defeating the test.
+            String pad = repeat("x", 64);
+            String cfg1 = "ws::addr=localhost:" + port1
+                    + ";sf_dir=" + sfDir
+                    + ";sf_max_bytes=4096"
+                    + ";close_flush_timeout_millis=0;";
+            try (Sender s1 = Sender.fromConfig(cfg1)) {
+                for (int i = 0; i < 50; i++) {
+                    s1.table("foo").stringColumn("p", pad).longColumn("v", (long) i).atNow();
+                    s1.flush();
+                }
+            }
+        }
+
+        // Sanity: the slot must hold at least one sealed segment (one
+        // that's been rotated out of active and closed). We verify by
+        // counting the .sfa files that actually carry frames: at least
+        // two populated files means there's data in a segment older
+        // than the active.
+        int populatedCount = countPopulatedSegmentFiles(sfDir + "/default");
+        Assert.assertTrue("expected multi-segment slot with data, got "
+                        + populatedCount + " populated .sfa files",
+                populatedCount >= 2);
+
+        // Phase 2: fresh server that ACKs every binary frame. Sender 2
+        // opens the same slot. The bug-fix expectation: every frame
+        // sender 1 wrote (50 of them) reaches the new server. Without
+        // the fix, the sender would only ship the active segment's data
+        // (≪ 50) and orphan the sealed segments forever.
+        int port2 = port1 + 50;
+        AckHandler ack = new AckHandler();
+        try (TestWebSocketServer good = new TestWebSocketServer(port2, ack)) {
+            good.start();
+            Assert.assertTrue(good.awaitStart(5, TimeUnit.SECONDS));
+
+            String cfg2 = "ws::addr=localhost:" + port2
+                    + ";sf_dir=" + sfDir + ";";
+            try (Sender s2 = Sender.fromConfig(cfg2)) {
+                // No new appends — purely replay.
+                long deadline = System.currentTimeMillis() + 5_000;
+                while (System.currentTimeMillis() < deadline
+                        && ack.distinctPayloadHashes.size() < 50) {
+                    Thread.sleep(20);
+                }
+            }
+            // Each row carries a unique long, so every frame's bytes are
+            // distinct. With the start-position fix we expect all 50 of
+            // sender 1's rows to reach server 2; without the fix the cursor
+            // would skip straight to the active segment and orphan
+            // everything in sealed.
+            Assert.assertEquals(
+                    "every distinct row written by sender 1 must replay through to server 2",
+                    50, ack.distinctPayloadHashes.size());
+        }
+    }
+
+    private static int countSegmentFiles(String dir) {
+        if (!Files.exists(dir)) return 0;
+        long find = Files.findFirst(dir);
+        if (find == 0) return 0;
+        int n = 0;
+        try {
+            int rc = 1;
+            while (rc > 0) {
+                String name = Files.utf8ToString(Files.findName(find));
+                if (name != null && name.endsWith(".sfa")) n++;
+                rc = Files.findNext(find);
+            }
+        } finally {
+            Files.findClose(find);
+        }
+        return n;
+    }
+
+    /**
+     * Counts only segment files that actually carry frames — opens each
+     * .sfa via the cursor's MmapSegment recovery path and excludes the
+     * empty hot-spares the segment manager pre-allocates. Without this
+     * filter, the multi-segment sanity check could pass for the wrong
+     * reason on a deployment that's only used a single segment.
+     */
+    private static int countPopulatedSegmentFiles(String dir) {
+        if (!Files.exists(dir)) return 0;
+        long find = Files.findFirst(dir);
+        if (find == 0) return 0;
+        int n = 0;
+        try {
+            int rc = 1;
+            while (rc > 0) {
+                String name = Files.utf8ToString(Files.findName(find));
+                if (name != null && name.endsWith(".sfa")) {
+                    try {
+                        io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment seg =
+                                io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment
+                                        .openExisting(dir + "/" + name);
+                        try {
+                            if (seg.frameCount() > 0) n++;
+                        } finally {
+                            seg.close();
+                        }
+                    } catch (Throwable ignored) {
+                        // best-effort
+                    }
+                }
+                rc = Files.findNext(find);
+            }
+        } finally {
+            Files.findClose(find);
+        }
+        return n;
+    }
+
+    private static String repeat(String c, int n) {
+        StringBuilder sb = new StringBuilder(n);
+        for (int i = 0; i < n; i++) sb.append(c);
+        return sb.toString();
+    }
+
+    private static void rmDirRec(String dir) {
+        if (!Files.exists(dir)) return;
+        long find = Files.findFirst(dir);
+        if (find != 0) {
+            try {
+                int rc = 1;
+                while (rc > 0) {
+                    String name = Files.utf8ToString(Files.findName(find));
+                    if (name != null && !".".equals(name) && !"..".equals(name)) {
+                        String child = dir + "/" + name;
+                        if (!Files.remove(child)) rmDirRec(child);
+                    }
+                    rc = Files.findNext(find);
+                }
+            } finally {
+                Files.findClose(find);
+            }
+        }
+        Files.remove(dir);
+    }
+
+    /** Receives binary frames but never acks. Sender drops them on close. */
+    private static class SilentHandler implements TestWebSocketServer.WebSocketServerHandler {
+        @Override
+        public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
+            // intentionally empty
+        }
+    }
+
+    /** Acks every binary frame and tracks distinct payloads. */
+    private static class AckHandler implements TestWebSocketServer.WebSocketServerHandler {
+        // Distinct *payload bytes* — each row carries a unique long value
+        // so every frame's bytes differ. Counts unique frames received,
+        // independent of any amplification (re-sends, fragmentation).
+        final java.util.Set<String> distinctPayloadHashes =
+                java.util.Collections.synchronizedSet(new java.util.HashSet<>());
+        private final AtomicLong nextSeq = new AtomicLong(0);
+
+        @Override
+        public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
+            distinctPayloadHashes.add(java.util.Arrays.toString(data));
+            try {
+                client.sendBinary(buildAck(nextSeq.getAndIncrement()));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        static byte[] buildAck(long seq) {
+            byte[] buf = new byte[1 + 8 + 2];
+            ByteBuffer bb = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN);
+            bb.put((byte) 0x00);
+            bb.putLong(seq);
+            bb.putShort((short) 0);
+            return buf;
+        }
+    }
+}
