Skip to content

Commit c33eea9

Browse files
committed
fast defaults for qwp client
1 parent 6fd5b61 commit c33eea9

6 files changed

Lines changed: 97 additions & 115 deletions

File tree

core/src/main/java/io/questdb/client/Sender.java

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ final class LineSenderBuilder {
536536
private static final int DEFAULT_BUFFER_CAPACITY = 64 * 1024;
537537
private static final int DEFAULT_HTTP_PORT = 9000;
538538
private static final int DEFAULT_HTTP_TIMEOUT = 30_000;
539-
private static final int DEFAULT_IN_FLIGHT_WINDOW_SIZE = 8;
539+
private static final int DEFAULT_IN_FLIGHT_WINDOW_SIZE = 128;
540540
private static final int DEFAULT_MAXIMUM_BUFFER_CAPACITY = 100 * 1024 * 1024;
541541
private static final int DEFAULT_MAX_BACKOFF_MILLIS = 1_000;
542542
private static final int DEFAULT_MAX_DATAGRAM_SIZE = 1400;
@@ -546,9 +546,9 @@ final class LineSenderBuilder {
546546
private static final int DEFAULT_TCP_PORT = 9009;
547547
private static final int DEFAULT_UDP_PORT = 9007;
548548
private static final int DEFAULT_WEBSOCKET_PORT = 9000;
549-
private static final int DEFAULT_WS_AUTO_FLUSH_BYTES = 1024 * 1024; // 1MB
549+
private static final int DEFAULT_WS_AUTO_FLUSH_BYTES = 128 * 1024; // 128KB
550550
private static final long DEFAULT_WS_AUTO_FLUSH_INTERVAL_NANOS = 100_000_000L; // 100ms
551-
private static final int DEFAULT_WS_AUTO_FLUSH_ROWS = 500;
551+
private static final int DEFAULT_WS_AUTO_FLUSH_ROWS = 1_000;
552552
private static final int MIN_BUFFER_SIZE = AuthUtils.CHALLENGE_LEN + 1; // challenge size + 1;
553553
// The PARAMETER_NOT_SET_EXPLICITLY constant is used to detect if a parameter was set explicitly in configuration parameters
554554
// where it matters. This is needed to detect invalid combinations of parameters. Why?
@@ -561,7 +561,6 @@ final class LineSenderBuilder {
561561
private static final int PROTOCOL_WEBSOCKET = 2;
562562
private final ObjList<String> hosts = new ObjList<>();
563563
private final IntList ports = new IntList();
564-
private boolean asyncMode = false;
565564
private int autoFlushBytes = PARAMETER_NOT_SET_EXPLICITLY;
566565
private int autoFlushIntervalMillis = PARAMETER_NOT_SET_EXPLICITLY;
567566
private int autoFlushRows = PARAMETER_NOT_SET_EXPLICITLY;
@@ -701,20 +700,18 @@ public AdvancedTlsSettings advancedTls() {
701700
}
702701

703702
/**
704-
* Enable asynchronous mode for WebSocket transport.
703+
* @deprecated Async mode is now derived from {@link #inFlightWindowSize(int)}.
704+
* Window size 1 implies synchronous mode, greater than 1 implies asynchronous mode.
705+
* The default window size is 128 (asynchronous). Call {@code inFlightWindowSize(1)}
706+
* for synchronous behavior.
705707
* <br>
706-
* In async mode, rows are batched and sent asynchronously with flow control.
707-
* This provides higher throughput at the cost of more complex error handling.
708-
* <br>
709-
* This is only used when communicating over WebSocket transport.
710-
* <br>
711-
* Default is synchronous mode (false).
708+
* This method is a no-op and will be removed in a future release.
712709
*
713-
* @param enabled whether to enable async mode
710+
* @param enabled ignored
714711
* @return this instance for method chaining
715712
*/
713+
@Deprecated
716714
public LineSenderBuilder asyncMode(boolean enabled) {
717-
this.asyncMode = enabled;
718715
return this;
719716
}
720717

@@ -723,7 +720,7 @@ public LineSenderBuilder asyncMode(boolean enabled) {
723720
* <br>
724721
* This is only used when communicating over WebSocket transport.
725722
* <br>
726-
* Default value is 1MB.
723+
* Default value is 128KB.
727724
*
728725
* @param bytes maximum bytes per batch
729726
* @return this instance for method chaining
@@ -888,28 +885,16 @@ public Sender build() {
888885

889886
String wsAuthHeader = buildWebSocketAuthHeader();
890887

891-
if (asyncMode) {
892-
return QwpWebSocketSender.connectAsync(
893-
hosts.getQuick(0),
894-
ports.getQuick(0),
895-
tlsEnabled,
896-
actualAutoFlushRows,
897-
actualAutoFlushBytes,
898-
actualAutoFlushIntervalNanos,
899-
actualInFlightWindowSize,
900-
wsAuthHeader
901-
);
902-
} else {
903-
return QwpWebSocketSender.connect(
904-
hosts.getQuick(0),
905-
ports.getQuick(0),
906-
tlsEnabled,
907-
actualAutoFlushRows,
908-
actualAutoFlushBytes,
909-
actualAutoFlushIntervalNanos,
910-
wsAuthHeader
911-
);
912-
}
888+
return QwpWebSocketSender.connectAsync(
889+
hosts.getQuick(0),
890+
ports.getQuick(0),
891+
tlsEnabled,
892+
actualAutoFlushRows,
893+
actualAutoFlushBytes,
894+
actualAutoFlushIntervalNanos,
895+
actualInFlightWindowSize,
896+
wsAuthHeader
897+
);
913898
}
914899

915900
if (protocol == PROTOCOL_UDP) {
@@ -1177,9 +1162,12 @@ public LineSenderBuilder httpUsernamePassword(String username, String password)
11771162
/**
11781163
* Set the maximum number of batches that can be in-flight awaiting server acknowledgment.
11791164
* <br>
1180-
* This is only used when communicating over WebSocket transport with async mode enabled.
1165+
* This is only used when communicating over WebSocket transport.
1166+
* <br>
1167+
* A value of 1 means synchronous mode: each batch waits for an ACK before sending the next one.
1168+
* A value greater than 1 enables asynchronous mode with pipelined sends and a background I/O thread.
11811169
* <br>
1182-
* Default value is 8.
1170+
* Default value is 128 (asynchronous).
11831171
*
11841172
* @param size maximum number of in-flight batches
11851173
* @return this instance for method chaining
@@ -1774,6 +1762,13 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
17741762
int protocolVersion = parseIntValue(sink, "protocol_version");
17751763
protocolVersion(protocolVersion);
17761764
}
1765+
} else if (Chars.equals("in_flight_window", sink)) {
1766+
if (protocol != PROTOCOL_WEBSOCKET) {
1767+
throw new LineSenderException("in_flight_window is only supported for WebSocket transport");
1768+
}
1769+
pos = getValue(configurationString, pos, sink, "in_flight_window");
1770+
int windowSize = parseIntValue(sink, "in_flight_window");
1771+
inFlightWindowSize(windowSize);
17771772
} else if (Chars.equals("max_datagram_size", sink)) {
17781773
pos = getValue(configurationString, pos, sink, "max_datagram_size");
17791774
int mds = parseIntValue(sink, "max_datagram_size");
@@ -1929,9 +1924,6 @@ private void validateParameters() {
19291924
if (protocolVersion != PARAMETER_NOT_SET_EXPLICITLY) {
19301925
throw new LineSenderException("protocol version is not supported for UDP transport");
19311926
}
1932-
if (asyncMode) {
1933-
throw new LineSenderException("async mode is not supported for UDP transport");
1934-
}
19351927
if (inFlightWindowSize != PARAMETER_NOT_SET_EXPLICITLY) {
19361928
throw new LineSenderException("in-flight window size is not supported for UDP transport");
19371929
}
@@ -1957,9 +1949,6 @@ private void validateParameters() {
19571949
if (httpToken != null && (username != null || password != null)) {
19581950
throw new LineSenderException("cannot use both token and username/password authentication");
19591951
}
1960-
if (inFlightWindowSize != PARAMETER_NOT_SET_EXPLICITLY && !asyncMode) {
1961-
throw new LineSenderException("in-flight window size requires async mode");
1962-
}
19631952
if (httpPath != null) {
19641953
throw new LineSenderException("HTTP path is not supported for WebSocket protocol");
19651954
}

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,10 @@
8888
*/
8989
public class QwpWebSocketSender implements Sender {
9090

91-
public static final int DEFAULT_AUTO_FLUSH_BYTES = 1024 * 1024; // 1MB
91+
public static final int DEFAULT_AUTO_FLUSH_BYTES = 128 * 1024; // 128KB
9292
public static final long DEFAULT_AUTO_FLUSH_INTERVAL_NANOS = 100_000_000L; // 100ms
93-
public static final int DEFAULT_AUTO_FLUSH_ROWS = 500;
94-
public static final int DEFAULT_IN_FLIGHT_WINDOW_SIZE = InFlightWindow.DEFAULT_WINDOW_SIZE; // 8
93+
public static final int DEFAULT_AUTO_FLUSH_ROWS = 1_000;
94+
public static final int DEFAULT_IN_FLIGHT_WINDOW_SIZE = 128;
9595
private static final int DEFAULT_BUFFER_SIZE = 8192;
9696
private static final int DEFAULT_MAX_NAME_LENGTH = 127;
9797
private static final int DEFAULT_MICROBATCH_BUFFER_SIZE = 1024 * 1024; // 1MB
@@ -251,7 +251,12 @@ public static QwpWebSocketSender connect(
251251
1, // window=1 for sync behavior
252252
authorizationHeader
253253
);
254-
sender.ensureConnected();
254+
try {
255+
sender.ensureConnected();
256+
} catch (Throwable t) {
257+
sender.close();
258+
throw t;
259+
}
255260
return sender;
256261
}
257262

@@ -318,7 +323,12 @@ public static QwpWebSocketSender connectAsync(
318323
QwpWebSocketSender sender = new QwpWebSocketSender(
319324
host, port, tlsEnabled, DEFAULT_BUFFER_SIZE, autoFlushRows, autoFlushBytes, autoFlushIntervalNanos, inFlightWindowSize, authorizationHeader
320325
);
321-
sender.ensureConnected();
326+
try {
327+
sender.ensureConnected();
328+
} catch (Throwable t) {
329+
sender.close();
330+
throw t;
331+
}
322332
return sender;
323333
}
324334

core/src/test/java/io/questdb/client/test/cutlass/line/tcp/v4/QwpAllocationTestClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,7 @@ private static Sender createSender(
207207
case PROTOCOL_QWP_WEBSOCKET:
208208
Sender.LineSenderBuilder wsBuilder = Sender.builder(Sender.Transport.WEBSOCKET)
209209
.address(host)
210-
.port(port)
211-
.asyncMode(true);
210+
.port(port);
212211
if (batchSize > 0) wsBuilder.autoFlushRows(batchSize);
213212
if (flushBytes > 0) wsBuilder.autoFlushBytes(flushBytes);
214213
if (flushIntervalMs > 0) wsBuilder.autoFlushIntervalMillis((int) flushIntervalMs);

core/src/test/java/io/questdb/client/test/cutlass/line/tcp/v4/StacBenchmarkClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,7 @@ private static Sender createSender(String protocol, String host, int port,
189189
case PROTOCOL_QWP_WEBSOCKET:
190190
Sender.LineSenderBuilder b = Sender.builder(Sender.Transport.WEBSOCKET)
191191
.address(host)
192-
.port(port)
193-
.asyncMode(true);
192+
.port(port);
194193
if (batchSize > 0) b.autoFlushRows(batchSize);
195194
if (flushBytes > 0) b.autoFlushBytes(flushBytes);
196195
if (flushIntervalMs > 0) b.autoFlushIntervalMillis((int) flushIntervalMs);

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/LineSenderBuilderUdpTest.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,6 @@ public void testUdp_authNotSupported() {
8686
"not supported for UDP");
8787
}
8888

89-
@Test
90-
public void testUdp_asyncModeNotSupported() {
91-
assertThrowsAny(
92-
Sender.builder(Sender.Transport.UDP)
93-
.address("localhost")
94-
.asyncMode(true),
95-
"not supported for UDP");
96-
}
97-
9889
@Test
9990
public void testUdp_autoFlushBytesNotSupported() {
10091
assertThrowsAny(
@@ -278,6 +269,11 @@ public void testUdp_tlsEnabled_throws() {
278269
"TLS is not supported for UDP");
279270
}
280271

272+
@Test
273+
public void testUdpScheme_inFlightWindow_fails() {
274+
assertBadConfig("udp::addr=localhost:9007;in_flight_window=64;", "only supported for WebSocket");
275+
}
276+
281277
@Test
282278
public void testUdp_tokenNotSupported() {
283279
assertBadConfig("udp::addr=localhost:9007;token=foo;", "token is not supported for UDP");

0 commit comments

Comments
 (0)