Skip to content

Commit 6a9235d

Browse files
mtopolnikclaude
andcommitted
Add QWP version negotiation to WebSocket upgrade
The client sends X-QWP-Max-Version and X-QWP-Client-Id headers in the WebSocket upgrade request. After the server responds with X-QWP-Version, the client reads the selected version and uses it in the version byte of all subsequent QWP message headers. QwpWebSocketEncoder.writeHeader() now uses a configurable version field instead of the hardcoded VERSION_1 constant. QwpWebSocketSender.ensureConnected() sets the version on the encoder after reading it from the upgrade response. QwpConstants gains MAX_SUPPORTED_VERSION and CLIENT_ID constants for the negotiation headers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4597aa1 commit 6a9235d

5 files changed

Lines changed: 106 additions & 2 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ public abstract class WebSocketClient implements QuietCloseable {
9898
// Connection state
9999
private CharSequence host;
100100
private int port;
101+
// QWP version negotiation
102+
private String qwpClientId;
103+
private int qwpMaxVersion = 1;
104+
private int serverQwpVersion = 1;
101105
// Receive buffer (native memory)
102106
private long recvBufPtr;
103107
private int recvBufSize;
@@ -231,6 +235,13 @@ public int getPort() {
231235
return port;
232236
}
233237

238+
/**
239+
* Returns the QWP version selected by the server during the upgrade handshake.
240+
*/
241+
public int getServerQwpVersion() {
242+
return serverQwpVersion;
243+
}
244+
234245
/**
235246
* Gets the send buffer for building WebSocket frames.
236247
* <p>
@@ -380,6 +391,20 @@ public boolean tryReceiveFrame(WebSocketFrameHandler handler) {
380391
return result != null && result;
381392
}
382393

394+
/**
395+
* Sets the QWP client identifier sent in the X-QWP-Client-Id upgrade header.
396+
*/
397+
public void setQwpClientId(String clientId) {
398+
this.qwpClientId = clientId;
399+
}
400+
401+
/**
402+
* Sets the maximum QWP version this client supports, sent in the X-QWP-Max-Version upgrade header.
403+
*/
404+
public void setQwpMaxVersion(int maxVersion) {
405+
this.qwpMaxVersion = maxVersion;
406+
}
407+
383408
/**
384409
* Performs WebSocket upgrade handshake.
385410
*
@@ -423,6 +448,14 @@ public void upgrade(CharSequence path, int timeout, CharSequence authorizationHe
423448
sendBuffer.putAscii(handshakeKey);
424449
sendBuffer.putAscii("\r\n");
425450
sendBuffer.putAscii("Sec-WebSocket-Version: 13\r\n");
451+
sendBuffer.putAscii("X-QWP-Max-Version: ");
452+
sendBuffer.putAscii(Integer.toString(qwpMaxVersion));
453+
sendBuffer.putAscii("\r\n");
454+
if (qwpClientId != null) {
455+
sendBuffer.putAscii("X-QWP-Client-Id: ");
456+
sendBuffer.putAscii(qwpClientId);
457+
sendBuffer.putAscii("\r\n");
458+
}
426459
if (authorizationHeader != null) {
427460
sendBuffer.putAscii("Authorization: ");
428461
sendBuffer.putAscii(authorizationHeader);
@@ -889,6 +922,30 @@ private void validateUpgradeResponse(int headerEnd) {
889922
if (!containsHeaderValue(response, "Sec-WebSocket-Accept:", expectedAccept, false)) {
890923
throw new HttpClientException("Invalid Sec-WebSocket-Accept header");
891924
}
925+
926+
// Extract X-QWP-Version (optional — defaults to 1 if absent)
927+
serverQwpVersion = extractIntHeader(response, "X-QWP-Version:", 1);
928+
}
929+
930+
private static int extractIntHeader(String response, String headerName, int defaultValue) {
931+
int headerLen = headerName.length();
932+
int responseLen = response.length();
933+
for (int i = 0; i <= responseLen - headerLen; i++) {
934+
if (response.regionMatches(true, i, headerName, 0, headerLen)) {
935+
int valueStart = i + headerLen;
936+
int lineEnd = response.indexOf('\r', valueStart);
937+
if (lineEnd < 0) {
938+
lineEnd = responseLen;
939+
}
940+
String value = response.substring(valueStart, lineEnd).trim();
941+
try {
942+
return Integer.parseInt(value);
943+
} catch (NumberFormatException e) {
944+
return defaultValue;
945+
}
946+
}
947+
}
948+
return defaultValue;
892949
}
893950

894951
protected void dieWaiting(int n) {

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class QwpWebSocketEncoder implements QuietCloseable {
4242
private byte flags;
4343
private int payloadStart;
4444
private byte savedFlags;
45+
private byte version = VERSION_1;
4546

4647
public QwpWebSocketEncoder() {
4748
this.buffer = new NativeBufferWriter();
@@ -133,12 +134,16 @@ public void setGorillaEnabled(boolean enabled) {
133134
}
134135
}
135136

137+
public void setVersion(byte version) {
138+
this.version = version;
139+
}
140+
136141
public void writeHeader(int tableCount, int payloadLength) {
137142
buffer.putByte((byte) 'Q');
138143
buffer.putByte((byte) 'W');
139144
buffer.putByte((byte) 'P');
140145
buffer.putByte((byte) '1');
141-
buffer.putByte(VERSION_1);
146+
buffer.putByte(version);
142147
buffer.putByte(flags);
143148
buffer.putShort((short) tableCount);
144149
buffer.putInt(payloadLength);

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,8 @@ private void ensureConnected() {
10141014

10151015
// Connect and upgrade to WebSocket
10161016
try {
1017+
client.setQwpMaxVersion(MAX_SUPPORTED_VERSION);
1018+
client.setQwpClientId(CLIENT_ID);
10171019
client.connect(host, port);
10181020
client.upgrade(WRITE_PATH, authorizationHeader);
10191021
} catch (Exception e) {
@@ -1041,11 +1043,15 @@ private void ensureConnected() {
10411043
}
10421044
// Sync mode (window=1): no send queue - we send and read ACKs synchronously
10431045

1046+
// Use the version selected by the server
1047+
encoder.setVersion((byte) client.getServerQwpVersion());
1048+
10441049
// Clear sent schema hashes - server starts fresh on each connection
10451050
sentSchemaHashes.clear();
10461051

10471052
connected = true;
1048-
LOG.info("Connected to WebSocket [host={}, port={}, windowSize={}]", host, port, inFlightWindowSize);
1053+
LOG.info("Connected to WebSocket [host={}, port={}, windowSize={}, qwpVersion={}]",
1054+
host, port, inFlightWindowSize, client.getServerQwpVersion());
10491055
}
10501056
}
10511057

core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpConstants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,10 +240,18 @@ public final class QwpConstants {
240240
* Column type: VARCHAR (length-prefixed UTF-8, aux storage).
241241
*/
242242
public static final byte TYPE_VARCHAR = 0x0F;
243+
/**
244+
* Client identifier sent in the X-QWP-Client-Id upgrade header.
245+
*/
246+
public static final String CLIENT_ID = "java/1.0.2";
243247
/**
244248
* Current protocol version.
245249
*/
246250
public static final byte VERSION_1 = 1;
251+
/**
252+
* Maximum protocol version supported by this client.
253+
*/
254+
public static final byte MAX_SUPPORTED_VERSION = VERSION_1;
247255

248256
private QwpConstants() {
249257
// utility class

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketEncoderTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,34 @@
4242
*/
4343
public class QwpWebSocketEncoderTest {
4444

45+
@Test
46+
public void testVersionByteInHeader() throws Exception {
47+
assertMemoryLeak(() -> {
48+
try (QwpWebSocketEncoder encoder = new QwpWebSocketEncoder();
49+
QwpTableBuffer buffer = new QwpTableBuffer("test")) {
50+
51+
QwpTableBuffer.ColumnBuffer col = buffer.getOrCreateColumn("x", TYPE_LONG, false);
52+
col.addLong(42);
53+
buffer.nextRow();
54+
55+
// Default version
56+
int size = encoder.encode(buffer, false);
57+
Assert.assertTrue(size > 0);
58+
long ptr = encoder.getBuffer().getBufferPtr();
59+
Assert.assertEquals(1, Unsafe.getUnsafe().getByte(ptr + 4));
60+
61+
// Custom version
62+
buffer.reset();
63+
col = buffer.getOrCreateColumn("x", TYPE_LONG, false);
64+
col.addLong(42);
65+
buffer.nextRow();
66+
encoder.setVersion((byte) 3);
67+
encoder.encode(buffer, false);
68+
Assert.assertEquals(3, Unsafe.getUnsafe().getByte(ptr + 4));
69+
}
70+
});
71+
}
72+
4573
@Test
4674
public void testBufferResetAndReuse() throws Exception {
4775
assertMemoryLeak(() -> {

0 commit comments

Comments
 (0)