Skip to content

Commit 1004eb2

Browse files
committed
improve Sender builder consistency
1 parent ce15fd4 commit 1004eb2

4 files changed

Lines changed: 68 additions & 10 deletions

File tree

core/src/main/java/io/questdb/client/Sender.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,9 @@ public LineSenderBuilder asyncMode(boolean enabled) {
737737
* @return this instance for method chaining
738738
*/
739739
public LineSenderBuilder autoFlushBytes(int bytes) {
740+
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
741+
throw new LineSenderException("auto flush bytes is only supported for WebSocket transport");
742+
}
740743
if (this.autoFlushBytes != PARAMETER_NOT_SET_EXPLICITLY) {
741744
throw new LineSenderException("auto flush bytes was already configured")
742745
.put("[bytes=").put(this.autoFlushBytes).put("]");
@@ -1258,6 +1261,9 @@ public LineSenderBuilder maxBackoffMillis(int maxBackoffMillis) {
12581261
* @return this instance for method chaining
12591262
*/
12601263
public LineSenderBuilder maxBufferCapacity(int maximumBufferCapacity) {
1264+
if (protocol == PROTOCOL_WEBSOCKET) {
1265+
throw new LineSenderException("maximum buffer capacity is not supported for WebSocket transport");
1266+
}
12611267
if (maximumBufferCapacity < DEFAULT_BUFFER_CAPACITY) {
12621268
throw new LineSenderException("maximum buffer capacity cannot be less than initial buffer capacity ")
12631269
.put("[maximumBufferCapacity=").put(maximumBufferCapacity)
@@ -1749,13 +1755,14 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
17491755
}
17501756
autoFlushIntervalMillis(autoFlushInterval);
17511757
} else if (Chars.equals("auto_flush_bytes", sink)) {
1752-
if (protocol != PROTOCOL_TCP) {
1753-
throw new LineSenderException("auto_flush_bytes is only supported for TCP transport");
1758+
if (protocol != PROTOCOL_TCP && protocol != PROTOCOL_WEBSOCKET) {
1759+
throw new LineSenderException("auto_flush_bytes is only supported for TCP and WebSocket transport");
17541760
}
17551761
pos = getValue(configurationString, pos, sink, "auto_flush_bytes");
1756-
if (Chars.equalsIgnoreCase("off", sink)) {
1757-
throw new LineSenderException("TCP transport must have auto_flush_bytes enabled");
1758-
} else {
1762+
if (protocol == PROTOCOL_TCP) {
1763+
if (Chars.equalsIgnoreCase("off", sink)) {
1764+
throw new LineSenderException("TCP transport must have auto_flush_bytes enabled");
1765+
}
17591766
int autoFlushBytes = parseIntValue(sink, "auto_flush_bytes");
17601767
if (initBufSizeSet) {
17611768
if (autoFlushBytes != bufferCapacity) {
@@ -1764,6 +1771,9 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
17641771
} else {
17651772
bufferCapacity(autoFlushBytes);
17661773
}
1774+
} else {
1775+
int autoFlushBytes = parseIntValue(sink, "auto_flush_bytes");
1776+
autoFlushBytes(autoFlushBytes);
17671777
}
17681778
autoFlushBytesSet = true;
17691779
} else if (Chars.equals("auto_flush", sink)) {

core/src/test/java/io/questdb/client/test/cutlass/line/LineSenderBuilderTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,11 @@ public void testConfStringValidation() throws Exception {
176176
assertConfStrError("TCPS::addr=localhost;", "invalid schema [schema=TCPS, supported-schemas=[http, https, tcp, tcps, ws, wss, udp]]");
177177
assertConfStrError("http::addr=localhost;auto_flush=off;auto_flush_interval=1;", "cannot set auto flush interval when interval based auto-flush is already disabled");
178178
assertConfStrError("http::addr=localhost;auto_flush=off;auto_flush_rows=1;", "cannot set auto flush rows when auto-flush is already disabled");
179-
assertConfStrError("http::addr=localhost;auto_flush_bytes=1024;", "auto_flush_bytes is only supported for TCP transport");
179+
assertConfStrError("http::addr=localhost;auto_flush_bytes=1024;", "auto_flush_bytes is only supported for TCP and WebSocket transport");
180180
assertConfStrError("http::addr=localhost;protocol_version=10", "current client only supports protocol version 1(text format for all datatypes), 2(binary format for part datatypes), 3(decimal datatype) or explicitly unset");
181181
assertConfStrError("http::addr=localhost:48884;max_name_len=10;", "max_name_len must be at least 16 bytes [max_name_len=10]");
182-
assertConfStrError("ws::addr=localhost;token=foo;", "token is not supported for WebSocket protocol");
183-
assertConfStrError("wss::addr=localhost;token=foo;", "token is not supported for WebSocket protocol");
182+
assertConfStrError("ws::addr=localhost;max_buf_size=1000000;", "maximum buffer capacity is not supported for WebSocket transport");
183+
assertConfStrError("wss::addr=localhost;tls_verify=unsafe_off;max_buf_size=1000000;", "maximum buffer capacity is not supported for WebSocket transport");
184184

185185
assertConfStrOk("addr=localhost:8080", "auto_flush_rows=100", "protocol_version=1");
186186
assertConfStrOk("addr=localhost:8080", "auto_flush=on", "auto_flush_rows=100", "protocol_version=2");

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/LineSenderBuilderUdpTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ public void testUdp_authNotSupported() {
8989
@Test
9090
public void testUdp_autoFlushBytesNotSupported() {
9191
assertThrowsAny(
92-
Sender.builder(Sender.Transport.UDP)
92+
() -> Sender.builder(Sender.Transport.UDP)
9393
.address("localhost")
9494
.autoFlushBytes(1000),
95-
"not supported for UDP");
95+
"auto flush bytes is only supported for WebSocket transport");
9696
}
9797

9898
@Test

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/LineSenderBuilderWebSocketTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,14 @@ public void testAutoFlushBytesZero() {
120120
Assert.assertNotNull(builder);
121121
}
122122

123+
@Test
124+
public void testAutoFlushBytesNotSupportedForHttp_fails() {
125+
assertThrows("only supported for WebSocket transport",
126+
() -> Sender.builder(Sender.Transport.HTTP)
127+
.address("localhost")
128+
.autoFlushBytes(1024));
129+
}
130+
123131
@Test
124132
public void testAutoFlushIntervalMillis() {
125133
Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.WEBSOCKET)
@@ -211,6 +219,14 @@ public void testBufferCapacityNegative_fails() {
211219
.bufferCapacity(-1));
212220
}
213221

222+
@Test
223+
public void testMaxBufferCapacityNotSupported_fails() {
224+
assertThrows("maximum buffer capacity is not supported for WebSocket transport",
225+
() -> Sender.builder(Sender.Transport.WEBSOCKET)
226+
.address(LOCALHOST)
227+
.maxBufferCapacity(128 * 1024));
228+
}
229+
214230
@Test
215231
public void testBuilderWithWebSocketTransport() {
216232
Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.WEBSOCKET);
@@ -644,6 +660,27 @@ public void testWsConfigString_withToken() throws Exception {
644660
});
645661
}
646662

663+
@Test
664+
public void testWsConfigString_withAutoFlushBytes() {
665+
Sender.LineSenderBuilder builder = Sender.builder("ws::addr=localhost:9000;auto_flush_bytes=1024;");
666+
Assert.assertNotNull(builder);
667+
}
668+
669+
@Test
670+
public void testWsConfigString_withAutoFlushBytesDoubleSet_fails() {
671+
assertBadConfig("ws::addr=localhost:9000;auto_flush_bytes=1024;auto_flush_bytes=2048;", "already configured");
672+
}
673+
674+
@Test
675+
public void testWsConfigString_withAutoFlushBytesInvalid_fails() {
676+
assertBadConfig("ws::addr=localhost:9000;auto_flush_bytes=-1;", "cannot be negative");
677+
}
678+
679+
@Test
680+
public void testWsConfigString_withMaxBufSize_fails() {
681+
assertBadConfig("ws::addr=localhost:9000;max_buf_size=1000000;", "maximum buffer capacity is not supported for WebSocket transport");
682+
}
683+
647684
@Test
648685
public void testWsConfigString_withUsernamePassword() throws Exception {
649686
assertMemoryLeak(() -> {
@@ -665,6 +702,17 @@ public void testWssConfigString_withToken() {
665702
Assert.assertNotNull(builder);
666703
}
667704

705+
@Test
706+
public void testWssConfigString_withAutoFlushBytes() {
707+
Sender.LineSenderBuilder builder = Sender.builder("wss::addr=localhost:9000;tls_verify=unsafe_off;auto_flush_bytes=1024;");
708+
Assert.assertNotNull(builder);
709+
}
710+
711+
@Test
712+
public void testWssConfigString_withMaxBufSize_fails() {
713+
assertBadConfig("wss::addr=localhost:9000;tls_verify=unsafe_off;max_buf_size=1000000;", "maximum buffer capacity is not supported for WebSocket transport");
714+
}
715+
668716
@Test
669717
public void testWssConfigString_uppercaseNotSupported() {
670718
assertBadConfig("WSS::addr=localhost:9000;", "invalid schema");

0 commit comments

Comments
 (0)