Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@
* <br>
* Note: If the underlying error is permanent, retrying {@link #flush()} will fail again.
* Use {@link #reset()} to discard the problematic data and continue with new data.
* <br>
* Note: WebSocket transport uses a terminal sender-level failure model after a
* connection has been established. After a WebSocket send, ACK, or connection
* failure, {@link #reset()} does not recover the sender; close it and create a
* new one.
*
*/
public interface Sender extends Closeable, ArraySender<Sender> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.questdb.client.std.SecureRnd;
import io.questdb.client.std.Unsafe;
import io.questdb.client.std.Vect;
import io.questdb.client.std.str.Utf8s;

/**
* Zero-GC WebSocket send buffer that implements {@link ArrayBufferAppender} for direct
Expand Down Expand Up @@ -208,13 +209,13 @@ public int getPosition() {
}

@Override
public long getWriteAddress() {
return bufPtr + writePos;
public int getWritableBytes() {
return bufCapacity - writePos;
}

@Override
public int getWritableBytes() {
return bufCapacity - writePos;
public long getWriteAddress() {
return bufPtr + writePos;
}

/**
Expand Down Expand Up @@ -329,32 +330,27 @@ public void putUtf8(String value) {
if (value == null || value.isEmpty()) {
return;
}
for (int i = 0, n = value.length(); i < n; i++) {

int charLen = value.length();
ensureCapacity(charLen);

// Single-pass for ASCII. Mixed strings keep the ASCII prefix and resume UTF-8 encoding.
long addr = bufPtr + writePos;
int i = 0;
for (; i < charLen; i++) {
char c = value.charAt(i);
if (c < 0x80) {
putByte((byte) c);
} else if (c < 0x800) {
putByte((byte) (0xC0 | (c >> 6)));
putByte((byte) (0x80 | (c & 0x3F)));
} else if (c >= 0xD800 && c <= 0xDBFF && i + 1 < n) {
char c2 = value.charAt(++i);
if (Character.isLowSurrogate(c2)) {
int codePoint = 0x10000 + ((c - 0xD800) << 10) + (c2 - 0xDC00);
putByte((byte) (0xF0 | (codePoint >> 18)));
putByte((byte) (0x80 | ((codePoint >> 12) & 0x3F)));
putByte((byte) (0x80 | ((codePoint >> 6) & 0x3F)));
putByte((byte) (0x80 | (codePoint & 0x3F)));
} else {
putByte((byte) '?');
i--;
}
} else if (Character.isSurrogate(c)) {
putByte((byte) '?');
} else {
putByte((byte) (0xE0 | (c >> 12)));
putByte((byte) (0x80 | ((c >> 6) & 0x3F)));
putByte((byte) (0x80 | (c & 0x3F)));
if (c >= 0x80) {
break;
}
Unsafe.getUnsafe().putByte(addr++, (byte) c);
}

if (i == charLen) {
writePos += charLen;
} else {
int utf8Len = Utf8s.utf8Bytes(value, i, charLen);
ensureCapacity(i + utf8Len);
writePos += i + Utf8s.strCpyUtf8(value, i, bufPtr + writePos + i, utf8Len);
}
}

Expand Down Expand Up @@ -397,10 +393,10 @@ public void skip(int bytes) {
*/
public FrameInfo writeCloseFrame(int code, String reason) {
int payloadLen = 2; // status code
byte[] reasonBytes = null;
int reasonLen = 0;
if (reason != null && !reason.isEmpty()) {
reasonBytes = reason.getBytes(java.nio.charset.StandardCharsets.UTF_8);
payloadLen += reasonBytes.length;
reasonLen = Utf8s.utf8Bytes(reason);
payloadLen += reasonLen;
}

if (payloadLen > 125) {
Expand All @@ -422,10 +418,8 @@ public FrameInfo writeCloseFrame(int code, String reason) {
writePos += 2;

// Write reason if present
if (reasonBytes != null) {
for (byte reasonByte : reasonBytes) {
Unsafe.getUnsafe().putByte(bufPtr + writePos++, reasonByte);
}
if (reasonLen > 0) {
writePos += Utf8s.strCpyUtf8(reason, bufPtr + writePos, reasonLen);
}

// Mask the payload (including status code and reason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.questdb.client.std.MemoryTag;
import io.questdb.client.std.QuietCloseable;
import io.questdb.client.std.Unsafe;
import io.questdb.client.std.str.Utf8s;

/**
* A simple native memory buffer writer for encoding QWP v1 messages.
Expand Down Expand Up @@ -61,24 +62,7 @@ public NativeBufferWriter(int initialCapacity) {
* @return the number of bytes needed to encode the string as UTF-8
*/
public static int utf8Length(String s) {
if (s == null) return 0;
int len = 0;
for (int i = 0, n = s.length(); i < n; i++) {
char c = s.charAt(i);
if (c < 0x80) {
len++;
} else if (c < 0x800) {
len += 2;
} else if (c >= 0xD800 && c <= 0xDBFF && i + 1 < n && Character.isLowSurrogate(s.charAt(i + 1))) {
i++;
len += 4;
} else if (Character.isSurrogate(c)) {
len++;
} else {
len += 3;
}
}
return len;
return s == null ? 0 : Utf8s.utf8Bytes(s);
}

/**
Expand Down Expand Up @@ -274,7 +258,7 @@ public void putString(String value) {
int utf8Len = utf8Length(value);
putVarint(utf8Len);
ensureCapacity(utf8Len);
encodeUtf8(value);
encodeUtf8(value, utf8Len);
}
}

Expand Down Expand Up @@ -305,10 +289,9 @@ public void putUtf8(String value) {
// All ASCII — done in a single pass
position += charLen;
} else {
// Non-ASCII — fall back to two-pass (re-encodes from start)
int utf8Len = utf8Length(value);
ensureCapacity(utf8Len);
encodeUtf8(value);
int utf8Len = Utf8s.utf8Bytes(value, i, charLen);
ensureCapacity(i + utf8Len);
position += i + Utf8s.strCpyUtf8(value, i, bufferPtr + position + i, utf8Len);
}
}

Expand Down Expand Up @@ -355,35 +338,7 @@ private static void writeVarintDirect(long addr, long value) {
Unsafe.getUnsafe().putByte(addr, (byte) value);
}

private void encodeUtf8(String value) {
long addr = bufferPtr + position;
for (int i = 0, n = value.length(); i < n; i++) {
char c = value.charAt(i);
if (c < 0x80) {
Unsafe.getUnsafe().putByte(addr++, (byte) c);
} else if (c < 0x800) {
Unsafe.getUnsafe().putByte(addr++, (byte) (0xC0 | (c >> 6)));
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | (c & 0x3F)));
} else if (c >= 0xD800 && c <= 0xDBFF && i + 1 < n) {
char c2 = value.charAt(++i);
if (Character.isLowSurrogate(c2)) {
int codePoint = 0x10000 + ((c - 0xD800) << 10) + (c2 - 0xDC00);
Unsafe.getUnsafe().putByte(addr++, (byte) (0xF0 | (codePoint >> 18)));
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | ((codePoint >> 12) & 0x3F)));
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | ((codePoint >> 6) & 0x3F)));
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | (codePoint & 0x3F)));
} else {
Unsafe.getUnsafe().putByte(addr++, (byte) '?');
i--;
}
} else if (Character.isSurrogate(c)) {
Unsafe.getUnsafe().putByte(addr++, (byte) '?');
} else {
Unsafe.getUnsafe().putByte(addr++, (byte) (0xE0 | (c >> 12)));
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | ((c >> 6) & 0x3F)));
Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | (c & 0x3F)));
}
}
position = (int) (addr - bufferPtr);
private void encodeUtf8(String value, int utf8Len) {
position += Utf8s.strCpyUtf8(value, bufferPtr + position, utf8Len);
}
}
Loading