Skip to content

Commit f1dd21c

Browse files
authored
fix(ilp): surface WebSocket frame-size errors to sender callers (#13)
1 parent 1ae55ac commit f1dd21c

13 files changed

Lines changed: 619 additions & 146 deletions

File tree

core/src/main/java/io/questdb/client/Sender.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@
102102
* <br>
103103
* Note: If the underlying error is permanent, retrying {@link #flush()} will fail again.
104104
* Use {@link #reset()} to discard the problematic data and continue with new data.
105+
* <br>
106+
* Note: WebSocket transport uses a terminal sender-level failure model after a
107+
* connection has been established. After a WebSocket send, ACK, or connection
108+
* failure, {@link #reset()} does not recover the sender; close it and create a
109+
* new one.
105110
*
106111
*/
107112
public interface Sender extends Closeable, ArraySender<Sender> {

core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.questdb.client.std.SecureRnd;
3636
import io.questdb.client.std.Unsafe;
3737
import io.questdb.client.std.Vect;
38+
import io.questdb.client.std.str.Utf8s;
3839

3940
/**
4041
* Zero-GC WebSocket send buffer that implements {@link ArrayBufferAppender} for direct
@@ -208,13 +209,13 @@ public int getPosition() {
208209
}
209210

210211
@Override
211-
public long getWriteAddress() {
212-
return bufPtr + writePos;
212+
public int getWritableBytes() {
213+
return bufCapacity - writePos;
213214
}
214215

215216
@Override
216-
public int getWritableBytes() {
217-
return bufCapacity - writePos;
217+
public long getWriteAddress() {
218+
return bufPtr + writePos;
218219
}
219220

220221
/**
@@ -329,32 +330,27 @@ public void putUtf8(String value) {
329330
if (value == null || value.isEmpty()) {
330331
return;
331332
}
332-
for (int i = 0, n = value.length(); i < n; i++) {
333+
334+
int charLen = value.length();
335+
ensureCapacity(charLen);
336+
337+
// Single-pass for ASCII. Mixed strings keep the ASCII prefix and resume UTF-8 encoding.
338+
long addr = bufPtr + writePos;
339+
int i = 0;
340+
for (; i < charLen; i++) {
333341
char c = value.charAt(i);
334-
if (c < 0x80) {
335-
putByte((byte) c);
336-
} else if (c < 0x800) {
337-
putByte((byte) (0xC0 | (c >> 6)));
338-
putByte((byte) (0x80 | (c & 0x3F)));
339-
} else if (c >= 0xD800 && c <= 0xDBFF && i + 1 < n) {
340-
char c2 = value.charAt(++i);
341-
if (Character.isLowSurrogate(c2)) {
342-
int codePoint = 0x10000 + ((c - 0xD800) << 10) + (c2 - 0xDC00);
343-
putByte((byte) (0xF0 | (codePoint >> 18)));
344-
putByte((byte) (0x80 | ((codePoint >> 12) & 0x3F)));
345-
putByte((byte) (0x80 | ((codePoint >> 6) & 0x3F)));
346-
putByte((byte) (0x80 | (codePoint & 0x3F)));
347-
} else {
348-
putByte((byte) '?');
349-
i--;
350-
}
351-
} else if (Character.isSurrogate(c)) {
352-
putByte((byte) '?');
353-
} else {
354-
putByte((byte) (0xE0 | (c >> 12)));
355-
putByte((byte) (0x80 | ((c >> 6) & 0x3F)));
356-
putByte((byte) (0x80 | (c & 0x3F)));
342+
if (c >= 0x80) {
343+
break;
357344
}
345+
Unsafe.getUnsafe().putByte(addr++, (byte) c);
346+
}
347+
348+
if (i == charLen) {
349+
writePos += charLen;
350+
} else {
351+
int utf8Len = Utf8s.utf8Bytes(value, i, charLen);
352+
ensureCapacity(i + utf8Len);
353+
writePos += i + Utf8s.strCpyUtf8(value, i, bufPtr + writePos + i, utf8Len);
358354
}
359355
}
360356

@@ -397,10 +393,10 @@ public void skip(int bytes) {
397393
*/
398394
public FrameInfo writeCloseFrame(int code, String reason) {
399395
int payloadLen = 2; // status code
400-
byte[] reasonBytes = null;
396+
int reasonLen = 0;
401397
if (reason != null && !reason.isEmpty()) {
402-
reasonBytes = reason.getBytes(java.nio.charset.StandardCharsets.UTF_8);
403-
payloadLen += reasonBytes.length;
398+
reasonLen = Utf8s.utf8Bytes(reason);
399+
payloadLen += reasonLen;
404400
}
405401

406402
if (payloadLen > 125) {
@@ -422,10 +418,8 @@ public FrameInfo writeCloseFrame(int code, String reason) {
422418
writePos += 2;
423419

424420
// Write reason if present
425-
if (reasonBytes != null) {
426-
for (byte reasonByte : reasonBytes) {
427-
Unsafe.getUnsafe().putByte(bufPtr + writePos++, reasonByte);
428-
}
421+
if (reasonLen > 0) {
422+
writePos += Utf8s.strCpyUtf8(reason, bufPtr + writePos, reasonLen);
429423
}
430424

431425
// Mask the payload (including status code and reason)

core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java

Lines changed: 8 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.questdb.client.std.MemoryTag;
2828
import io.questdb.client.std.QuietCloseable;
2929
import io.questdb.client.std.Unsafe;
30+
import io.questdb.client.std.str.Utf8s;
3031

3132
/**
3233
* A simple native memory buffer writer for encoding QWP v1 messages.
@@ -61,24 +62,7 @@ public NativeBufferWriter(int initialCapacity) {
6162
* @return the number of bytes needed to encode the string as UTF-8
6263
*/
6364
public static int utf8Length(String s) {
64-
if (s == null) return 0;
65-
int len = 0;
66-
for (int i = 0, n = s.length(); i < n; i++) {
67-
char c = s.charAt(i);
68-
if (c < 0x80) {
69-
len++;
70-
} else if (c < 0x800) {
71-
len += 2;
72-
} else if (c >= 0xD800 && c <= 0xDBFF && i + 1 < n && Character.isLowSurrogate(s.charAt(i + 1))) {
73-
i++;
74-
len += 4;
75-
} else if (Character.isSurrogate(c)) {
76-
len++;
77-
} else {
78-
len += 3;
79-
}
80-
}
81-
return len;
65+
return s == null ? 0 : Utf8s.utf8Bytes(s);
8266
}
8367

8468
/**
@@ -274,7 +258,7 @@ public void putString(String value) {
274258
int utf8Len = utf8Length(value);
275259
putVarint(utf8Len);
276260
ensureCapacity(utf8Len);
277-
encodeUtf8(value);
261+
encodeUtf8(value, utf8Len);
278262
}
279263
}
280264

@@ -305,10 +289,9 @@ public void putUtf8(String value) {
305289
// All ASCII — done in a single pass
306290
position += charLen;
307291
} else {
308-
// Non-ASCII — fall back to two-pass (re-encodes from start)
309-
int utf8Len = utf8Length(value);
310-
ensureCapacity(utf8Len);
311-
encodeUtf8(value);
292+
int utf8Len = Utf8s.utf8Bytes(value, i, charLen);
293+
ensureCapacity(i + utf8Len);
294+
position += i + Utf8s.strCpyUtf8(value, i, bufferPtr + position + i, utf8Len);
312295
}
313296
}
314297

@@ -355,35 +338,7 @@ private static void writeVarintDirect(long addr, long value) {
355338
Unsafe.getUnsafe().putByte(addr, (byte) value);
356339
}
357340

358-
private void encodeUtf8(String value) {
359-
long addr = bufferPtr + position;
360-
for (int i = 0, n = value.length(); i < n; i++) {
361-
char c = value.charAt(i);
362-
if (c < 0x80) {
363-
Unsafe.getUnsafe().putByte(addr++, (byte) c);
364-
} else if (c < 0x800) {
365-
Unsafe.getUnsafe().putByte(addr++, (byte) (0xC0 | (c >> 6)));
366-
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | (c & 0x3F)));
367-
} else if (c >= 0xD800 && c <= 0xDBFF && i + 1 < n) {
368-
char c2 = value.charAt(++i);
369-
if (Character.isLowSurrogate(c2)) {
370-
int codePoint = 0x10000 + ((c - 0xD800) << 10) + (c2 - 0xDC00);
371-
Unsafe.getUnsafe().putByte(addr++, (byte) (0xF0 | (codePoint >> 18)));
372-
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | ((codePoint >> 12) & 0x3F)));
373-
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | ((codePoint >> 6) & 0x3F)));
374-
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | (codePoint & 0x3F)));
375-
} else {
376-
Unsafe.getUnsafe().putByte(addr++, (byte) '?');
377-
i--;
378-
}
379-
} else if (Character.isSurrogate(c)) {
380-
Unsafe.getUnsafe().putByte(addr++, (byte) '?');
381-
} else {
382-
Unsafe.getUnsafe().putByte(addr++, (byte) (0xE0 | (c >> 12)));
383-
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | ((c >> 6) & 0x3F)));
384-
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | (c & 0x3F)));
385-
}
386-
}
387-
position = (int) (addr - bufferPtr);
341+
private void encodeUtf8(String value, int utf8Len) {
342+
position += Utf8s.strCpyUtf8(value, bufferPtr + position, utf8Len);
388343
}
389344
}

0 commit comments

Comments
 (0)