Skip to content

Commit 52a5c16

Browse files
committed
hello darkness (aka java logging APIs) my old friend...
1 parent c8e2fc2 commit 52a5c16

2 files changed

Lines changed: 58 additions & 20 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,9 @@ public void flush() {
622622
inFlightWindow.awaitEmpty();
623623
}
624624

625-
LOG.debug("Flush complete [totalBatches={}, totalBytes={}, totalAcked={}]", sendQueue.getTotalBatchesSent(), sendQueue.getTotalBytesSent(), inFlightWindow.getTotalAcked());
625+
if (LOG.isDebugEnabled()) {
626+
LOG.debug("Flush complete [totalBatches={}, totalBytes={}, totalAcked={}]", sendQueue.getTotalBatchesSent(), sendQueue.getTotalBytesSent(), inFlightWindow.getTotalAcked());
627+
}
626628
} else {
627629
// Sync mode (window=1): flush pending rows and wait for ACKs synchronously
628630
flushSync();
@@ -983,7 +985,9 @@ private void ensureActiveBufferReady() {
983985
// Buffer is in use (SEALED or SENDING) - wait for it
984986
// Use a while loop to handle spurious wakeups and race conditions with the latch
985987
while (activeBuffer.isInUse()) {
986-
LOG.debug("Waiting for active buffer [id={}, state={}]", activeBuffer.getBatchId(), MicrobatchBuffer.stateName(activeBuffer.getState()));
988+
if (LOG.isDebugEnabled()) {
989+
LOG.debug("Waiting for active buffer [id={}, state={}]", activeBuffer.getBatchId(), MicrobatchBuffer.stateName(activeBuffer.getState()));
990+
}
987991
boolean recycled = activeBuffer.awaitRecycled(30, TimeUnit.SECONDS);
988992
if (!recycled) {
989993
throw new LineSenderException("Timeout waiting for active buffer to be recycled");
@@ -1064,7 +1068,9 @@ private void flushPendingRows() {
10641068
cachedTimestampColumn = null;
10651069
cachedTimestampNanosColumn = null;
10661070

1067-
LOG.debug("Flushing pending rows [count={}, tables={}]", pendingRowCount, tableBuffers.size());
1071+
if (LOG.isDebugEnabled()) {
1072+
LOG.debug("Flushing pending rows [count={}, tables={}]", pendingRowCount, tableBuffers.size());
1073+
}
10681074

10691075
// Ensure activeBuffer is ready for writing
10701076
// It might be in RECYCLED state if previous batch was sent but we didn't swap yet
@@ -1090,7 +1096,9 @@ private void flushPendingRows() {
10901096
long schemaKey = schemaHash ^ ((long) tableBuffer.getTableName().hashCode() << 32);
10911097
boolean useSchemaRef = sentSchemaHashes.contains(schemaKey);
10921098

1093-
LOG.debug("Encoding table [name={}, rows={}, maxSentSymbolId={}, batchMaxId={}, useSchemaRef={}]", tableName, rowCount, maxSentSymbolId, currentBatchMaxSymbolId, useSchemaRef);
1099+
if (LOG.isDebugEnabled()) {
1100+
LOG.debug("Encoding table [name={}, rows={}, maxSentSymbolId={}, batchMaxId={}, useSchemaRef={}]", tableName, rowCount, maxSentSymbolId, currentBatchMaxSymbolId, useSchemaRef);
1101+
}
10941102

10951103
// Encode this table's rows with delta symbol dictionary
10961104
int messageSize = encoder.encodeWithDeltaDict(
@@ -1147,7 +1155,9 @@ private void flushSync() {
11471155
cachedTimestampColumn = null;
11481156
cachedTimestampNanosColumn = null;
11491157

1150-
LOG.debug("Sync flush [pendingRows={}, tables={}]", pendingRowCount, tableBuffers.size());
1158+
if (LOG.isDebugEnabled()) {
1159+
LOG.debug("Sync flush [pendingRows={}, tables={}]", pendingRowCount, tableBuffers.size());
1160+
}
11511161

11521162
// Encode all table buffers that have data into a single message
11531163
ObjList<CharSequence> keys = tableBuffers.keys();
@@ -1183,7 +1193,9 @@ private void flushSync() {
11831193
long batchSequence = nextBatchSequence++;
11841194
inFlightWindow.addInFlight(batchSequence);
11851195

1186-
LOG.debug("Sending sync batch [seq={}, bytes={}, rows={}, maxSentSymbolId={}, useSchemaRef={}]", batchSequence, messageSize, tableBuffer.getRowCount(), currentBatchMaxSymbolId, useSchemaRef);
1196+
if (LOG.isDebugEnabled()) {
1197+
LOG.debug("Sending sync batch [seq={}, bytes={}, rows={}, maxSentSymbolId={}, useSchemaRef={}]", batchSequence, messageSize, tableBuffer.getRowCount(), currentBatchMaxSymbolId, useSchemaRef);
1198+
}
11871199

11881200
// Send over WebSocket and fail the in-flight entry if send throws,
11891201
// so close() does not hang waiting for an ACK that will never arrive.
@@ -1223,7 +1235,9 @@ private void flushSync() {
12231235
pendingRowCount = 0;
12241236
firstPendingRowTimeNanos = 0;
12251237

1226-
LOG.debug("Sync flush complete [totalAcked={}]", inFlightWindow.getTotalAcked());
1238+
if (LOG.isDebugEnabled()) {
1239+
LOG.debug("Sync flush complete [totalAcked={}]", inFlightWindow.getTotalAcked());
1240+
}
12271241
}
12281242

12291243
private long getPendingBytes() {
@@ -1242,25 +1256,33 @@ private void sealAndSwapBuffer() {
12421256
MicrobatchBuffer toSend = activeBuffer;
12431257
toSend.seal();
12441258

1245-
LOG.debug("Sealing buffer [id={}, rows={}, bytes={}]", toSend.getBatchId(), toSend.getRowCount(), toSend.getBufferPos());
1259+
if (LOG.isDebugEnabled()) {
1260+
LOG.debug("Sealing buffer [id={}, rows={}, bytes={}]", toSend.getBatchId(), toSend.getRowCount(), toSend.getBufferPos());
1261+
}
12461262

12471263
// Swap to the other buffer
12481264
activeBuffer = (activeBuffer == buffer0) ? buffer1 : buffer0;
12491265

12501266
// If the other buffer is still being sent, wait for it
12511267
// Use a while loop to handle spurious wakeups and race conditions with the latch
12521268
while (activeBuffer.isInUse()) {
1253-
LOG.debug("Waiting for buffer recycle [id={}, state={}]", activeBuffer.getBatchId(), MicrobatchBuffer.stateName(activeBuffer.getState()));
1269+
if (LOG.isDebugEnabled()) {
1270+
LOG.debug("Waiting for buffer recycle [id={}, state={}]", activeBuffer.getBatchId(), MicrobatchBuffer.stateName(activeBuffer.getState()));
1271+
}
12541272
boolean recycled = activeBuffer.awaitRecycled(30, TimeUnit.SECONDS);
12551273
if (!recycled) {
12561274
throw new LineSenderException("Timeout waiting for buffer to be recycled");
12571275
}
1258-
LOG.debug("Buffer recycled [id={}, state={}]", activeBuffer.getBatchId(), MicrobatchBuffer.stateName(activeBuffer.getState()));
1276+
if (LOG.isDebugEnabled()) {
1277+
LOG.debug("Buffer recycled [id={}, state={}]", activeBuffer.getBatchId(), MicrobatchBuffer.stateName(activeBuffer.getState()));
1278+
}
12591279
}
12601280

12611281
// Reset the new active buffer
12621282
int stateBeforeReset = activeBuffer.getState();
1263-
LOG.debug("Resetting buffer [id={}, state={}]", activeBuffer.getBatchId(), MicrobatchBuffer.stateName(stateBeforeReset));
1283+
if (LOG.isDebugEnabled()) {
1284+
LOG.debug("Resetting buffer [id={}, state={}]", activeBuffer.getBatchId(), MicrobatchBuffer.stateName(stateBeforeReset));
1285+
}
12641286
activeBuffer.reset();
12651287

12661288
// Enqueue the sealed buffer for sending.

core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,9 @@ public boolean enqueue(MicrobatchBuffer buffer) {
265265
}
266266
}
267267
}
268-
LOG.debug("Enqueued batch [id={}, bytes={}, rows={}]", buffer.getBatchId(), buffer.getBufferPos(), buffer.getRowCount());
268+
if (LOG.isDebugEnabled()) {
269+
LOG.debug("Enqueued batch [id={}, bytes={}, rows={}]", buffer.getBatchId(), buffer.getBufferPos(), buffer.getRowCount());
270+
}
269271
return true;
270272
}
271273

@@ -548,23 +550,33 @@ private void sendBatch(MicrobatchBuffer batch) {
548550
int bytes = batch.getBufferPos();
549551
int rows = batch.getRowCount();
550552

551-
LOG.debug("Sending batch [seq={}, bytes={}, rows={}, bufferId={}]", batchSequence, bytes, rows, batch.getBatchId());
553+
if (LOG.isDebugEnabled()) {
554+
LOG.debug("Sending batch [seq={}, bytes={}, rows={}, bufferId={}]", batchSequence, bytes, rows, batch.getBatchId());
555+
}
552556

553557
// Add to in-flight window BEFORE sending (so we're ready for ACK)
554558
// Use non-blocking tryAddInFlight since we already checked window space in ioLoop
555559
if (inFlightWindow != null) {
556-
LOG.debug("Adding to in-flight window [seq={}, inFlight={}, max={}]", batchSequence, inFlightWindow.getInFlightCount(), inFlightWindow.getMaxWindowSize());
560+
if (LOG.isDebugEnabled()) {
561+
LOG.debug("Adding to in-flight window [seq={}, inFlight={}, max={}]", batchSequence, inFlightWindow.getInFlightCount(), inFlightWindow.getMaxWindowSize());
562+
}
557563
if (!inFlightWindow.tryAddInFlight(batchSequence)) {
558564
// Should not happen since we checked hasWindowSpace before polling
559565
throw new LineSenderException("In-flight window unexpectedly full");
560566
}
561-
LOG.debug("Added to in-flight window [seq={}]", batchSequence);
567+
if (LOG.isDebugEnabled()) {
568+
LOG.debug("Added to in-flight window [seq={}]", batchSequence);
569+
}
562570
}
563571

564572
// Send over WebSocket
565-
LOG.debug("Calling sendBinary [seq={}]", batchSequence);
573+
if (LOG.isDebugEnabled()) {
574+
LOG.debug("Calling sendBinary [seq={}]", batchSequence);
575+
}
566576
client.sendBinary(batch.getBufferPtr(), bytes);
567-
LOG.debug("sendBinary returned [seq={}]", batchSequence);
577+
if (LOG.isDebugEnabled()) {
578+
LOG.debug("sendBinary returned [seq={}]", batchSequence);
579+
}
568580

569581
// Update statistics
570582
totalBatchesSent.incrementAndGet();
@@ -573,7 +585,9 @@ private void sendBatch(MicrobatchBuffer batch) {
573585
// Transition state: SENDING -> RECYCLED
574586
batch.markRecycled();
575587

576-
LOG.debug("Batch sent and recycled [seq={}, bufferId={}]", batchSequence, batch.getBatchId());
588+
if (LOG.isDebugEnabled()) {
589+
LOG.debug("Batch sent and recycled [seq={}, bufferId={}]", batchSequence, batch.getBatchId());
590+
}
577591
}
578592

579593
/**
@@ -639,8 +653,10 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) {
639653
int acked = inFlightWindow.acknowledgeUpTo(sequence);
640654
if (acked > 0) {
641655
totalAcks.addAndGet(acked);
642-
LOG.debug("Cumulative ACK received [upTo={}, acked={}]", sequence, acked);
643-
} else {
656+
if (LOG.isDebugEnabled()) {
657+
LOG.debug("Cumulative ACK received [upTo={}, acked={}]", sequence, acked);
658+
}
659+
} else if (LOG.isDebugEnabled()) {
644660
LOG.debug("ACK for already-acknowledged sequences [upTo={}]", sequence);
645661
}
646662
}

0 commit comments

Comments
 (0)