Skip to content

Commit 637c0e4

Browse files
authored
feat(qwp): transaction support (#34)
1 parent 1af4540 commit 637c0e4

17 files changed

Lines changed: 258 additions & 140 deletions

core/src/main/java/io/questdb/client/Sender.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,7 @@ public int getTimeout() {
10741074
private long reconnectMaxDurationMillis = PARAMETER_NOT_SET_EXPLICITLY;
10751075
private boolean requestDurableAck;
10761076
private int retryTimeoutMillis = PARAMETER_NOT_SET_EXPLICITLY;
1077+
private boolean transactional;
10771078
private String senderId = DEFAULT_SENDER_ID;
10781079
// Per-append deadline for SF appendBlocking spin-then-throw. Used to
10791080
// be a hardcoded 30s constant; expose so tight-SLA users can lower
@@ -1531,6 +1532,7 @@ public Sender build() {
15311532
// closing the engine alone would leak the I/O thread,
15321533
// dispatcher daemon, drainer pool, microbatch buffers and
15331534
// WebSocketClient inside the abandoned `connected`.
1535+
connected.setTransactional(transactional);
15341536
try {
15351537
// Once the foreground sender is up, dispatch drainers
15361538
// for any sibling orphan slots. Scan AFTER we acquire
@@ -2402,6 +2404,24 @@ public LineSenderBuilder requestDurableAck(boolean enabled) {
24022404
return this;
24032405
}
24042406

2407+
/**
2408+
* Enables transactional mode. Auto-flush sends data to the server
2409+
* with {@code FLAG_DEFER_COMMIT}; only an explicit {@code flush()}
2410+
* triggers the server-side WAL commit. This allows accumulating
2411+
* datasets larger than the server's recv buffer while committing
2412+
* atomically per table.
2413+
*
2414+
* @param enabled true to enable transactional mode
2415+
* @return this instance for method chaining
2416+
*/
2417+
public LineSenderBuilder transactional(boolean enabled) {
2418+
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
2419+
throw new LineSenderException("transactional is only supported for WebSocket transport");
2420+
}
2421+
this.transactional = enabled;
2422+
return this;
2423+
}
2424+
24052425
/**
24062426
* Configures the maximum time the Sender will spend retrying upon receiving a recoverable error from the server.
24072427
* <br>
@@ -3122,6 +3142,18 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
31223142
} else {
31233143
throw new LineSenderException("invalid request_durable_ack [value=").put(sink).put(", allowed-values=[on, off]]");
31243144
}
3145+
} else if (Chars.equals("transaction", sink)) {
3146+
if (protocol != PROTOCOL_WEBSOCKET) {
3147+
throw new LineSenderException("transaction is only supported for WebSocket transport");
3148+
}
3149+
pos = getValue(configurationString, pos, sink, "transaction");
3150+
if (Chars.equalsIgnoreCase("on", sink)) {
3151+
transactional(true);
3152+
} else if (Chars.equalsIgnoreCase("off", sink)) {
3153+
transactional(false);
3154+
} else {
3155+
throw new LineSenderException("invalid transaction [value=").put(sink).put(", allowed-values=[on, off]]");
3156+
}
31253157
} else if (Chars.equals("max_schemas_per_connection", sink)) {
31263158
if (protocol != PROTOCOL_WEBSOCKET) {
31273159
throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport");

core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,11 @@ public int getWritePos() {
225225
return writePos;
226226
}
227227

228+
@Override
229+
public void patchByte(int offset, byte value) {
230+
Unsafe.getUnsafe().putByte(bufPtr + offset, value);
231+
}
232+
228233
/**
229234
* Patches an int value at the specified offset.
230235
*/

core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ public int getWritableBytes() {
138138
return capacity - position;
139139
}
140140

141+
@Override
142+
public void patchByte(int offset, byte value) {
143+
Unsafe.getUnsafe().putByte(bufferPtr + offset, value);
144+
}
145+
141146
/**
142147
* Patches an int value at the specified offset.
143148
* Used for updating length fields after writing content.

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpBufferWriter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,14 @@ public interface QwpBufferWriter extends ArrayBufferAppender {
8686
*/
8787
int getWritableBytes();
8888

89+
/**
90+
* Patches a byte value at the specified offset in the buffer.
91+
*
92+
* @param offset the byte offset from buffer start
93+
* @param value the byte value to write
94+
*/
95+
void patchByte(int offset, byte value);
96+
8997
/**
9098
* Patches an int value at the specified offset in the buffer.
9199
* <p>

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ private void writeSymbolColumn(QwpTableBuffer.ColumnBuffer col, int count, int d
254254
long dataAddr = col.getDataAddress();
255255
buffer.putVarint(dictionarySize);
256256
for (int i = 0; i < dictionarySize; i++) {
257-
buffer.putString((String) col.getSymbolValue(i));
257+
buffer.putString(col.getSymbolValue(i));
258258
}
259259

260260
for (int i = 0; i < count; i++) {

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ public boolean isGorillaEnabled() {
126126
return (flags & FLAG_GORILLA) != 0;
127127
}
128128

129+
public void setDeferCommit(boolean defer) {
130+
if (defer) {
131+
flags |= FLAG_DEFER_COMMIT;
132+
} else {
133+
flags &= ~FLAG_DEFER_COMMIT;
134+
}
135+
}
136+
129137
public void setGorillaEnabled(boolean enabled) {
130138
if (enabled) {
131139
flags |= FLAG_GORILLA;

0 commit comments

Comments
 (0)