Skip to content

Commit d54f218

Browse files
authored
feat(ilp): QWiP durable ack (#14)
1 parent b400a5a commit d54f218

13 files changed

Lines changed: 2388 additions & 141 deletions

core/src/main/java/io/questdb/client/Sender.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ public int getTimeout() {
619619
private PrivateKey privateKey;
620620
private int protocol = PARAMETER_NOT_SET_EXPLICITLY;
621621
private int protocolVersion = PARAMETER_NOT_SET_EXPLICITLY;
622+
private boolean requestDurableAck;
622623
private int retryTimeoutMillis = PARAMETER_NOT_SET_EXPLICITLY;
623624
private boolean shouldDestroyPrivKey;
624625
private boolean tlsEnabled;
@@ -932,7 +933,8 @@ public Sender build() {
932933
actualAutoFlushIntervalNanos,
933934
actualInFlightWindowSize,
934935
wsAuthHeader,
935-
actualMaxSchemasPerConnection
936+
actualMaxSchemasPerConnection,
937+
requestDurableAck
936938
);
937939
}
938940

@@ -1483,6 +1485,27 @@ public LineSenderBuilder protocolVersion(int protocolVersion) {
14831485
return this;
14841486
}
14851487

1488+
/**
1489+
* Opts the connection in for STATUS_DURABLE_ACK frames. When enabled,
1490+
* servers with primary replication will emit per-table durable-upload
1491+
* watermarks as WAL data reaches the object store.
1492+
* <p>
1493+
* This setting is only supported for WebSocket transport.
1494+
* <p>
1495+
* Observe durable progress via
1496+
* {@link QwpWebSocketSender#getHighestDurableSeqTxn(CharSequence)}.
1497+
*
1498+
* @param enabled true to request durable ACKs
1499+
* @return this instance for method chaining
1500+
*/
1501+
public LineSenderBuilder requestDurableAck(boolean enabled) {
1502+
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
1503+
throw new LineSenderException("request_durable_ack is only supported for WebSocket transport");
1504+
}
1505+
this.requestDurableAck = enabled;
1506+
return this;
1507+
}
1508+
14861509
/**
14871510
* Configures the maximum time the Sender will spend retrying upon receiving a recoverable error from the server.
14881511
* <br>
@@ -1875,6 +1898,18 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
18751898
pos = getValue(configurationString, pos, sink, "in_flight_window");
18761899
int windowSize = parseIntValue(sink, "in_flight_window");
18771900
inFlightWindowSize(windowSize);
1901+
} else if (Chars.equals("request_durable_ack", sink)) {
1902+
if (protocol != PROTOCOL_WEBSOCKET) {
1903+
throw new LineSenderException("request_durable_ack is only supported for WebSocket transport");
1904+
}
1905+
pos = getValue(configurationString, pos, sink, "request_durable_ack");
1906+
if (Chars.equalsIgnoreCase("on", sink)) {
1907+
requestDurableAck(true);
1908+
} else if (Chars.equalsIgnoreCase("off", sink)) {
1909+
requestDurableAck(false);
1910+
} else {
1911+
throw new LineSenderException("invalid request_durable_ack [value=").put(sink).put(", allowed-values=[on, off]]");
1912+
}
18781913
} else if (Chars.equals("max_schemas_per_connection", sink)) {
18791914
if (protocol != PROTOCOL_WEBSOCKET) {
18801915
throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport");
@@ -1972,6 +2007,9 @@ private void validateParameters() {
19722007
.put(", requestedCapacity=").put(bufferCapacity)
19732008
.put("]");
19742009
}
2010+
if (requestDurableAck && protocol != PROTOCOL_WEBSOCKET) {
2011+
throw new LineSenderException("request_durable_ack is only supported for WebSocket transport");
2012+
}
19752013
if (protocol == PROTOCOL_HTTP) {
19762014
if (httpClientConfiguration.getMaximumRequestBufferSize() < httpClientConfiguration.getInitialRequestBufferSize()) {
19772015
throw new LineSenderException("maximum buffer capacity cannot be less than initial buffer capacity ")

core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ public abstract class WebSocketClient implements QuietCloseable {
107107
// QWP version negotiation
108108
private String qwpClientId;
109109
private int qwpMaxVersion = 1;
110+
// Opt-in for STATUS_DURABLE_ACK frames; sent as X-QWP-Request-Durable-Ack: true
111+
private boolean qwpRequestDurableAck;
110112
// Receive buffer (native memory)
111113
private long recvBufPtr;
112114
private int recvBufSize;
@@ -390,6 +392,16 @@ public void setQwpMaxVersion(int maxVersion) {
390392
this.qwpMaxVersion = maxVersion;
391393
}
392394

395+
/**
396+
* Enables the opt-in X-QWP-Request-Durable-Ack upgrade header. When set,
397+
* servers with primary replication configured will additionally emit
398+
* STATUS_DURABLE_ACK frames as the WAL containing committed client
399+
* messages reaches the object store.
400+
*/
401+
public void setQwpRequestDurableAck(boolean enabled) {
402+
this.qwpRequestDurableAck = enabled;
403+
}
404+
393405
/**
394406
* Non-blocking attempt to receive a WebSocket frame.
395407
* Returns immediately if no complete frame is available.
@@ -476,6 +488,9 @@ public void upgrade(CharSequence path, int timeout, CharSequence authorizationHe
476488
sendBuffer.putAscii(qwpClientId);
477489
sendBuffer.putAscii("\r\n");
478490
}
491+
if (qwpRequestDurableAck) {
492+
sendBuffer.putAscii("X-QWP-Request-Durable-Ack: true\r\n");
493+
}
479494
if (authorizationHeader != null) {
480495
sendBuffer.putAscii("Authorization: ");
481496
sendBuffer.putAscii(authorizationHeader);

core/src/main/java/io/questdb/client/cutlass/qwp/client/InFlightWindow.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,13 +377,28 @@ public Throwable getLastError() {
377377
return lastError.get();
378378
}
379379

380+
/**
381+
* Returns the highest batch sequence acknowledged by the server, or -1 if
382+
* no acknowledgment has been received yet.
383+
*/
384+
public long getHighestAckedSequence() {
385+
return highestAcked;
386+
}
387+
380388
/**
381389
* Returns the maximum window size.
382390
*/
383391
public int getMaxWindowSize() {
384392
return maxWindowSize;
385393
}
386394

395+
/**
396+
* Returns the timeout (ms) applied to blocking window operations.
397+
*/
398+
public long getTimeoutMs() {
399+
return timeoutMs;
400+
}
401+
387402
/**
388403
* Returns the total number of batches acknowledged.
389404
*/

0 commit comments

Comments
 (0)