Skip to content

Commit c25773f

Browse files
bluestreak01 and claude committed
feat(ilp): background drainer pool — adopt orphan slots and replay them
Wires the drainer runtime onto the orphan-scanner foundation. With drain_orphans=true the foreground sender now actually empties sibling slots holding unacked data instead of just logging that they exist.

Per-drainer lifecycle:
1. Open CursorSendEngine on the slot — its constructor takes the slot lock; if another sender or drainer holds it, the engine throws and the drainer exits silently (LOCKED_BY_OTHER, not a failure).
2. Open a fresh WebSocketClient via the foreground sender's connect factory — separate connection, same auth/host/port/TLS config.
3. Run a CursorWebSocketSendLoop until ackedFsn catches up to the publishedFsn snapshot taken at startup.
4. On terminal failure (auth, recovery, budget), drop a .failed sentinel into the slot. Future scans skip it until an operator clears it manually — bounded retry, then human-in-the-loop.

Pool: bounded fixed-thread executor, daemon threads, sized by max_background_drainers (default 4). Closes via cooperative stop + 3s grace; daemon threads ensure no JVM-exit blocking.

Visibility: QwpWebSocketSender#getBackgroundDrainers returns a snapshot list of live drainers with {slot, target, acked, outcome, lastError}.

Test: ghost sender writes 30 distinct rows against a silent server and closes fast — leaves an unacked slot. Foreground sender opens the same group root with a different sender_id and drain_orphans=true against an ack server; asserts every distinct payload reaches the new server. Plus a sentinel-skip test confirming an operator-set .failed file disqualifies the slot from the next foreground run's scan.

Empty active segments and stale hot spares are left in the slot dir per spec decision #13 ("no automatic cleanup of empty slot dirs"); the scanner's no-op behavior on empty slots makes this cheap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent fa5c838 commit c25773f

5 files changed

Lines changed: 709 additions & 33 deletions

File tree

core/src/main/java/io/questdb/client/Sender.java

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,25 +1073,6 @@ public Sender build() {
10731073
}
10741074
}
10751075
slotPath = sfDir + "/" + senderId;
1076-
// Orphan scan runs BEFORE we open our own slot — keeps
1077-
// the scan's "exclude my slot" filter conceptually
1078-
// simple. If the user opted in, log what we found so
1079-
// they have visibility on pending drain candidates
1080-
// until the drainer runtime lands.
1081-
if (drainOrphans) {
1082-
io.questdb.client.std.ObjList<String> orphans =
1083-
io.questdb.client.cutlass.qwp.client.sf.cursor.OrphanScanner
1084-
.scan(sfDir, senderId);
1085-
if (orphans.size() > 0) {
1086-
org.slf4j.LoggerFactory.getLogger(LineSenderBuilder.class)
1087-
.info("found {} orphan slot(s) under {} (drainer "
1088-
+ "runtime not yet implemented; "
1089-
+ "max_background_drainers={}); "
1090-
+ "first paths: {}",
1091-
orphans.size(), sfDir, maxBackgroundDrainers,
1092-
sample(orphans, 3));
1093-
}
1094-
}
10951076
}
10961077
long actualSfAppendDeadlineNanos =
10971078
sfAppendDeadlineMillis == PARAMETER_NOT_SET_EXPLICITLY
@@ -1101,7 +1082,7 @@ public Sender build() {
11011082
slotPath, actualSfMaxBytes,
11021083
actualSfMaxTotalBytes, actualSfAppendDeadlineNanos);
11031084
try {
1104-
return QwpWebSocketSender.connect(
1085+
QwpWebSocketSender connected = QwpWebSocketSender.connect(
11051086
hosts.getQuick(0),
11061087
ports.getQuick(0),
11071088
wsTlsConfig,
@@ -1119,6 +1100,28 @@ public Sender build() {
11191100
actualReconnectMaxBackoffMillis,
11201101
initialConnectRetry
11211102
);
1103+
// Once the foreground sender is up, dispatch drainers
1104+
// for any sibling orphan slots. Scan AFTER we acquire
1105+
// our own slot lock so we never accidentally try to
1106+
// adopt our own data; the OrphanScanner.scan filter
1107+
// also excludes our sender_id.
1108+
if (drainOrphans && sfDir != null) {
1109+
io.questdb.client.std.ObjList<String> orphans =
1110+
io.questdb.client.cutlass.qwp.client.sf.cursor.OrphanScanner
1111+
.scan(sfDir, senderId);
1112+
if (orphans.size() > 0) {
1113+
org.slf4j.LoggerFactory.getLogger(LineSenderBuilder.class)
1114+
.info("dispatching drainers for {} orphan slot(s) under {} "
1115+
+ "(max_background_drainers={})",
1116+
orphans.size(), sfDir, maxBackgroundDrainers);
1117+
connected.startOrphanDrainers(
1118+
orphans,
1119+
maxBackgroundDrainers,
1120+
actualSfMaxBytes,
1121+
actualSfMaxTotalBytes);
1122+
}
1123+
}
1124+
return connected;
11221125
} catch (Throwable t) {
11231126
try {
11241127
cursorEngine.close();
@@ -1753,19 +1756,6 @@ public LineSenderBuilder senderId(String id) {
17531756
return this;
17541757
}
17551758

1756-
private static String sample(io.questdb.client.std.ObjList<String> list, int n) {
1757-
int take = Math.min(n, list.size());
1758-
StringBuilder sb = new StringBuilder("[");
1759-
for (int i = 0; i < take; i++) {
1760-
if (i > 0) sb.append(", ");
1761-
sb.append(list.get(i));
1762-
}
1763-
if (list.size() > take) {
1764-
sb.append(", ...(").append(list.size() - take).append(" more)");
1765-
}
1766-
return sb.append("]").toString();
1767-
}
1768-
17691759
private static void validateSenderId(String id) {
17701760
if (id == null || id.isEmpty()) {
17711761
throw new LineSenderException("sender_id must not be empty");

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,11 @@ public class QwpWebSocketSender implements Sender {
176176
// true → startup connect goes through the same retry-with-backoff
177177
// loop as in-flight reconnect; auth failures still terminal.
178178
private boolean initialConnectRetry = false;
179+
// Orphan-slot drainer pool. Non-null only when the builder requested
180+
// drain_orphans=true AND we have a slot path to scan against. Closed
181+
// alongside the cursor send loop in close().
182+
private io.questdb.client.cutlass.qwp.client.sf.cursor.BackgroundDrainerPool
183+
drainerPool;
179184
// Single volatile counter, single writer (the wire-side actor that
180185
// performs reconnect; for now: ensureConnected during recovery).
181186
// Bumped on every successful reconnect AND on initial recovery from
@@ -641,6 +646,17 @@ public void close() {
641646
LOG.error("Error closing cursor send loop: {}", String.valueOf(e));
642647
}
643648
}
649+
// Drainer pool runs after the foreground I/O loop is wound
650+
// down — drainers don't share state with the foreground, so
651+
// ordering doesn't matter for correctness, just predictable
652+
// shutdown.
653+
if (drainerPool != null) {
654+
try {
655+
drainerPool.close();
656+
} catch (Throwable e) {
657+
LOG.error("Error closing drainer pool: {}", String.valueOf(e));
658+
}
659+
}
644660

645661
// Always free resources the I/O thread never touches:
646662
// encoder and table buffers are user-thread-only.
@@ -985,6 +1001,52 @@ public long getTotalAcks() {
9851001
return l == null ? 0L : l.getTotalAcks();
9861002
}
9871003

1004+
/**
1005+
* Starts orphan drainers for the given list of slot paths. Each path
1006+
* gets its own drainer thread, capped at {@code maxBackgroundDrainers}
1007+
* concurrent. Drainers run until the slot is fully drained or a
1008+
* terminal error occurs (then they drop a {@code .failed} sentinel).
1009+
* <p>
1010+
* Should be called once, immediately after {@code connect()} returns.
1011+
* Subsequent calls add more drainers to the same pool.
1012+
*/
1013+
public synchronized void startOrphanDrainers(
1014+
io.questdb.client.std.ObjList<String> orphanSlotPaths,
1015+
int maxBackgroundDrainers,
1016+
long segmentSizeBytes,
1017+
long sfMaxTotalBytes
1018+
) {
1019+
if (orphanSlotPaths == null || orphanSlotPaths.size() == 0
1020+
|| maxBackgroundDrainers <= 0) {
1021+
return;
1022+
}
1023+
if (drainerPool == null) {
1024+
drainerPool = new io.questdb.client.cutlass.qwp.client.sf.cursor
1025+
.BackgroundDrainerPool(maxBackgroundDrainers);
1026+
}
1027+
for (int i = 0, n = orphanSlotPaths.size(); i < n; i++) {
1028+
String slot = orphanSlotPaths.get(i);
1029+
io.questdb.client.cutlass.qwp.client.sf.cursor.BackgroundDrainer drainer =
1030+
new io.questdb.client.cutlass.qwp.client.sf.cursor.BackgroundDrainer(
1031+
slot, segmentSizeBytes, sfMaxTotalBytes,
1032+
this::buildAndConnect,
1033+
reconnectMaxDurationMillis,
1034+
reconnectInitialBackoffMillis,
1035+
reconnectMaxBackoffMillis);
1036+
drainerPool.submit(drainer);
1037+
}
1038+
}
1039+
1040+
/**
1041+
* Snapshot of drainers the foreground sender has dispatched. Useful
1042+
* for monitoring orphan-drain progress without parsing logs.
1043+
*/
1044+
public java.util.List<io.questdb.client.cutlass.qwp.client.sf.cursor.BackgroundDrainer>
1045+
getBackgroundDrainers() {
1046+
if (drainerPool == null) return java.util.Collections.emptyList();
1047+
return drainerPool.snapshot();
1048+
}
1049+
9881050
/**
9891051
* Frames re-sent on the post-reconnect catch-up window — i.e. frames
9901052
* whose FSN was already on the wire before the drop. Useful for

0 commit comments

Comments
 (0)