Skip to content

Commit 956c449

Browse files
mtopolnikclaude
andcommitted
Add schema ID validation and configurable limit
Add server-side validation that schema IDs arrive in strict monotonic sequence and do not exceed a configurable per-connection limit. The client-side sender exposes the limit through the builder API and configuration string. Server side: - QwpSchemaCache.put() validates monotonic sequence and rejects IDs that exceed maxSchemasPerConnection - Add INVALID_SCHEMA_ID error code to QwpParseException - Add DEFAULT_MAX_SCHEMAS_PER_CONNECTION (65,535) to QwpConstants - Wire the limit through LineHttpProcessorConfiguration, PropServerConfiguration, and QwpProcessorState - QwpStreamingDecoder.resetConnectionState() clears schema cache on disconnect Client side: - Sender.LineSenderBuilder.maxSchemasPerConnection() builder method and max_schemas_per_connection config string parameter - QwpWebSocketSender accepts and forwards the limit - Replace static import of QwpConstants.* with explicit prefix Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 002c965 commit 956c449

6 files changed

Lines changed: 206 additions & 108 deletions

File tree

core/src/main/java/io/questdb/client/Sender.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,7 @@ final class LineSenderBuilder {
585585
private int maxBackoffMillis = PARAMETER_NOT_SET_EXPLICITLY;
586586
private int maxDatagramSize = PARAMETER_NOT_SET_EXPLICITLY;
587587
private int maxNameLength = PARAMETER_NOT_SET_EXPLICITLY;
588+
private int maxSchemasPerConnection = PARAMETER_NOT_SET_EXPLICITLY;
588589
private int maximumBufferCapacity = PARAMETER_NOT_SET_EXPLICITLY;
589590
private final HttpClientConfiguration httpClientConfiguration = new DefaultHttpClientConfiguration() {
590591
@Override
@@ -711,15 +712,14 @@ public AdvancedTlsSettings advancedTls() {
711712
}
712713

713714
/**
715+
* @param enabled ignored
716+
* @return this instance for method chaining
714717
* @deprecated Async mode is now derived from {@link #inFlightWindowSize(int)}.
715718
* Window size 1 implies synchronous mode, greater than 1 implies asynchronous mode.
716719
* The default window size is 128 (asynchronous). Call {@code inFlightWindowSize(1)}
717720
* for synchronous behavior.
718721
* <br>
719722
* This method is a no-op and will be removed in a future release.
720-
*
721-
* @param enabled ignored
722-
* @return this instance for method chaining
723723
*/
724724
@Deprecated
725725
public LineSenderBuilder asyncMode(boolean enabled) {
@@ -901,6 +901,8 @@ public Sender build() {
901901
? DEFAULT_WS_AUTO_FLUSH_INTERVAL_NANOS
902902
: TimeUnit.MILLISECONDS.toNanos(autoFlushIntervalMillis);
903903
int actualInFlightWindowSize = inFlightWindowSize == PARAMETER_NOT_SET_EXPLICITLY ? DEFAULT_IN_FLIGHT_WINDOW_SIZE : inFlightWindowSize;
904+
int actualMaxSchemasPerConnection = maxSchemasPerConnection == PARAMETER_NOT_SET_EXPLICITLY
905+
? QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION : maxSchemasPerConnection;
904906

905907
String wsAuthHeader = buildWebSocketAuthHeader();
906908

@@ -912,7 +914,8 @@ public Sender build() {
912914
actualAutoFlushBytes,
913915
actualAutoFlushIntervalNanos,
914916
actualInFlightWindowSize,
915-
wsAuthHeader
917+
wsAuthHeader,
918+
actualMaxSchemasPerConnection
916919
);
917920
}
918921

@@ -1093,7 +1096,7 @@ public LineSenderBuilder httpPath(String path) {
10931096
*
10941097
* <b>Example:</b> If the server configures {@code http.context.settings=/custom/settings},
10951098
* call {@code httpSettingPath("/custom/settings")}.
1096-
*
1099+
* <p>
10971100
* This is only used when communicating over HTTP transport.
10981101
*
10991102
* @param path The HTTP path to query for server protocol settings. Must:
@@ -1345,6 +1348,25 @@ public LineSenderBuilder maxNameLength(int maxNameLength) {
13451348
return this;
13461349
}
13471350

1351+
/**
1352+
* Sets the maximum number of distinct schemas the WebSocket sender may assign on one connection.
1353+
*/
1354+
public LineSenderBuilder maxSchemasPerConnection(int maxSchemasPerConnection) {
1355+
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
1356+
throw new LineSenderException("max schemas per connection is only supported for WebSocket transport");
1357+
}
1358+
if (this.maxSchemasPerConnection != PARAMETER_NOT_SET_EXPLICITLY) {
1359+
throw new LineSenderException("max schemas per connection was already configured")
1360+
.put("[maxSchemasPerConnection=").put(this.maxSchemasPerConnection).put("]");
1361+
}
1362+
if (maxSchemasPerConnection < 1) {
1363+
throw new LineSenderException("max schemas per connection must be positive")
1364+
.put("[maxSchemasPerConnection=").put(maxSchemasPerConnection).put("]");
1365+
}
1366+
this.maxSchemasPerConnection = maxSchemasPerConnection;
1367+
return this;
1368+
}
1369+
13481370
/**
13491371
* Minimum expected throughput in bytes per second for HTTP requests.
13501372
* <br>
@@ -1829,6 +1851,13 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
18291851
pos = getValue(configurationString, pos, sink, "in_flight_window");
18301852
int windowSize = parseIntValue(sink, "in_flight_window");
18311853
inFlightWindowSize(windowSize);
1854+
} else if (Chars.equals("max_schemas_per_connection", sink)) {
1855+
if (protocol != PROTOCOL_WEBSOCKET) {
1856+
throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport");
1857+
}
1858+
pos = getValue(configurationString, pos, sink, "max_schemas_per_connection");
1859+
int maxSchemas = parseIntValue(sink, "max_schemas_per_connection");
1860+
maxSchemasPerConnection(maxSchemas);
18321861
} else if (Chars.equals("max_datagram_size", sink)) {
18331862
pos = getValue(configurationString, pos, sink, "max_datagram_size");
18341863
int mds = parseIntValue(sink, "max_datagram_size");

0 commit comments

Comments
 (0)