Skip to content

Commit 34b2439

Browse files
mtopolnikclaude
andcommitted
Fix resource leaks in QwpWebSocketSender
Close the encoder's native memory when MicrobatchBuffer allocation fails in the constructor. Previously only buffer0 was cleaned up, leaving the NativeBufferWriter leaked. In ensureConnected(), close the WebSocket client if WebSocketSendQueue construction fails (e.g. Thread.start() throws OOM), preventing a dangling socket and I/O thread. In flushSync(), fail the in-flight window entry when sendBinary() throws, so close() does not hang on awaitEmpty() waiting for an ACK that will never arrive. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 61980ee commit 34b2439

1 file changed

Lines changed: 27 additions & 7 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,14 @@ private QwpWebSocketSender(
182182
// Initialize double-buffering if async mode (window > 1)
183183
if (inFlightWindowSize > 1) {
184184
int microbatchBufferSize = Math.max(DEFAULT_MICROBATCH_BUFFER_SIZE, autoFlushBytes * 2);
185-
this.buffer0 = new MicrobatchBuffer(microbatchBufferSize, autoFlushRows, autoFlushBytes, autoFlushIntervalNanos);
186185
try {
186+
this.buffer0 = new MicrobatchBuffer(microbatchBufferSize, autoFlushRows, autoFlushBytes, autoFlushIntervalNanos);
187187
this.buffer1 = new MicrobatchBuffer(microbatchBufferSize, autoFlushRows, autoFlushBytes, autoFlushIntervalNanos);
188188
} catch (Throwable t) {
189-
buffer0.close();
189+
if (buffer0 != null) {
190+
buffer0.close();
191+
}
192+
encoder.close();
190193
throw t;
191194
}
192195
this.activeBuffer = buffer0;
@@ -980,9 +983,16 @@ private void ensureConnected() {
980983
// Initialize send queue for async mode (window > 1)
981984
// The send queue handles both sending AND receiving (single I/O thread)
982985
if (inFlightWindowSize > 1) {
983-
sendQueue = new WebSocketSendQueue(client, inFlightWindow,
984-
WebSocketSendQueue.DEFAULT_ENQUEUE_TIMEOUT_MS,
985-
WebSocketSendQueue.DEFAULT_SHUTDOWN_TIMEOUT_MS);
986+
try {
987+
sendQueue = new WebSocketSendQueue(client, inFlightWindow,
988+
WebSocketSendQueue.DEFAULT_ENQUEUE_TIMEOUT_MS,
989+
WebSocketSendQueue.DEFAULT_SHUTDOWN_TIMEOUT_MS);
990+
} catch (Throwable t) {
991+
inFlightWindow = null;
992+
client.close();
993+
client = null;
994+
throw new LineSenderException("Failed to start I/O thread for " + host + ":" + port, t);
995+
}
986996
}
987997
// Sync mode (window=1): no send queue - we send and read ACKs synchronously
988998

@@ -1134,8 +1144,18 @@ private void flushSync() {
11341144

11351145
LOG.debug("Sending sync batch [seq={}, bytes={}, rows={}, maxSentSymbolId={}, useSchemaRef={}]", batchSequence, messageSize, tableBuffer.getRowCount(), currentBatchMaxSymbolId, useSchemaRef);
11361146

1137-
// Send over WebSocket
1138-
client.sendBinary(buffer.getBufferPtr(), messageSize);
1147+
// Send over WebSocket and fail the in-flight entry if send throws,
1148+
// so close() does not hang waiting for an ACK that will never arrive.
1149+
try {
1150+
client.sendBinary(buffer.getBufferPtr(), messageSize);
1151+
} catch (LineSenderException e) {
1152+
failExpectedIfNeeded(batchSequence, e);
1153+
throw e;
1154+
} catch (Throwable t) {
1155+
LineSenderException error = new LineSenderException("Failed to send batch " + batchSequence, t);
1156+
failExpectedIfNeeded(batchSequence, error);
1157+
throw error;
1158+
}
11391159

11401160
// Wait for ACK synchronously
11411161
waitForAck(batchSequence);

0 commit comments

Comments
 (0)