Skip to content

Commit 8afd5f6

Browse files
committed
dead code removed
1 parent 9b49eed commit 8afd5f6

3 files changed

Lines changed: 2 additions & 126 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/client/MicrobatchBuffer.java

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,14 @@ public class MicrobatchBuffer implements QuietCloseable {
6161
public static final int STATE_SEALED = 1;
6262
public static final int STATE_SENDING = 2;
6363
private static final AtomicLong nextBatchId = new AtomicLong();
64-
private final long maxAgeNanos;
65-
private final int maxBytes;
6664
// Flush trigger thresholds
67-
private final int maxRows;
6865
// Batch identification
6966
private long batchId;
7067
private int bufferCapacity;
7168
private int bufferPos;
7269
// Native memory buffer
7370
private long bufferPtr;
7471
private long firstRowTimeNanos;
75-
// Symbol tracking for delta encoding
76-
private int maxSymbolId = -1;
7772
// For waiting on recycle (user thread waits for I/O thread to finish)
7873
private volatile Thread recycleWaiter;
7974
// Row tracking
@@ -98,9 +93,6 @@ public MicrobatchBuffer(int initialCapacity, int maxRows, int maxBytes, long max
9893
this.bufferPos = 0;
9994
this.rowCount = 0;
10095
this.firstRowTimeNanos = 0;
101-
this.maxRows = maxRows;
102-
this.maxBytes = maxBytes;
103-
this.maxAgeNanos = maxAgeNanos;
10496
this.batchId = nextBatchId.getAndIncrement();
10597
}
10698

@@ -288,24 +280,6 @@ public void incrementRowCount() {
288280
rowCount++;
289281
}
290282

291-
/**
292-
* Checks if the age limit has been exceeded.
293-
*/
294-
public boolean isAgeLimitExceeded() {
295-
if (maxAgeNanos <= 0 || rowCount == 0) {
296-
return false;
297-
}
298-
long ageNanos = System.nanoTime() - firstRowTimeNanos;
299-
return ageNanos >= maxAgeNanos;
300-
}
301-
302-
/**
303-
* Checks if the byte size limit has been exceeded.
304-
*/
305-
public boolean isByteLimitExceeded() {
306-
return maxBytes > 0 && bufferPos >= maxBytes;
307-
}
308-
309283
/**
310284
* Returns true if the buffer is in FILLING state (available for writing).
311285
*/
@@ -328,13 +302,6 @@ public boolean isRecycled() {
328302
return state == STATE_RECYCLED;
329303
}
330304

331-
/**
332-
* Checks if the row count limit has been exceeded.
333-
*/
334-
public boolean isRowLimitExceeded() {
335-
return maxRows > 0 && rowCount >= maxRows;
336-
}
337-
338305
/**
339306
* Returns true if the buffer is in SEALED state (ready to send).
340307
*/
@@ -394,7 +361,6 @@ public void reset() {
394361
bufferPos = 0;
395362
rowCount = 0;
396363
firstRowTimeNanos = 0;
397-
maxSymbolId = -1;
398364
batchId = nextBatchId.getAndIncrement();
399365
recycleWaiter = null;
400366
state = STATE_FILLING;
@@ -445,26 +411,6 @@ public void setBufferPos(int pos) {
445411
this.bufferPos = pos;
446412
}
447413

448-
/**
449-
* Sets the maximum symbol ID used in this batch.
450-
* Used for delta symbol dictionary tracking.
451-
*/
452-
public void setMaxSymbolId(int maxSymbolId) {
453-
this.maxSymbolId = maxSymbolId;
454-
}
455-
456-
/**
457-
* Checks if the buffer should be flushed based on configured thresholds.
458-
*
459-
* @return true if any flush threshold is exceeded
460-
*/
461-
public boolean shouldFlush() {
462-
if (!hasData()) {
463-
return false;
464-
}
465-
return isRowLimitExceeded() || isByteLimitExceeded() || isAgeLimitExceeded();
466-
}
467-
468414
@Override
469415
public String toString() {
470416
return "MicrobatchBuffer{" +

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1116,7 +1116,6 @@ private void flushPendingRows() {
11161116
activeBuffer.ensureCapacity(messageSize);
11171117
activeBuffer.write(buffer.getBufferPtr(), messageSize);
11181118
activeBuffer.incrementRowCount();
1119-
activeBuffer.setMaxSymbolId(currentBatchMaxSymbolId);
11201119

11211120
// Seal and enqueue for sending
11221121
sealAndSwapBuffer();

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/MicrobatchBufferTest.java

Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import io.questdb.client.cutlass.qwp.client.MicrobatchBuffer;
2828
import io.questdb.client.std.MemoryTag;
2929
import io.questdb.client.std.Unsafe;
30+
3031
import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak;
32+
3133
import org.junit.Assert;
3234
import org.junit.Test;
3335

@@ -578,77 +580,6 @@ public void testSetBufferPosOutOfBounds() throws Exception {
578580
});
579581
}
580582

581-
@Test
582-
public void testShouldFlushAgeLimit() throws Exception {
583-
assertMemoryLeak(() -> {
584-
// 50ms timeout
585-
try (MicrobatchBuffer buffer = new MicrobatchBuffer(1024, 0, 0, 50_000_000L)) {
586-
buffer.writeByte((byte) 1);
587-
buffer.incrementRowCount();
588-
Assert.assertFalse(buffer.shouldFlush());
589-
590-
Thread.sleep(60);
591-
592-
Assert.assertTrue(buffer.shouldFlush());
593-
Assert.assertTrue(buffer.isAgeLimitExceeded());
594-
}
595-
});
596-
}
597-
598-
@Test
599-
public void testShouldFlushByteLimit() throws Exception {
600-
assertMemoryLeak(() -> {
601-
try (MicrobatchBuffer buffer = new MicrobatchBuffer(1024, 0, 10, 0)) {
602-
for (int i = 0; i < 9; i++) {
603-
buffer.writeByte((byte) i);
604-
buffer.incrementRowCount();
605-
Assert.assertFalse(buffer.shouldFlush());
606-
}
607-
buffer.writeByte((byte) 9);
608-
buffer.incrementRowCount();
609-
Assert.assertTrue(buffer.shouldFlush());
610-
Assert.assertTrue(buffer.isByteLimitExceeded());
611-
}
612-
});
613-
}
614-
615-
@Test
616-
public void testShouldFlushEmptyBuffer() throws Exception {
617-
assertMemoryLeak(() -> {
618-
try (MicrobatchBuffer buffer = new MicrobatchBuffer(1024, 1, 1, 1)) {
619-
Assert.assertFalse(buffer.shouldFlush()); // Empty buffer never flushes
620-
}
621-
});
622-
}
623-
624-
@Test
625-
public void testShouldFlushRowLimit() throws Exception {
626-
assertMemoryLeak(() -> {
627-
try (MicrobatchBuffer buffer = new MicrobatchBuffer(1024, 5, 0, 0)) {
628-
for (int i = 0; i < 4; i++) {
629-
buffer.writeByte((byte) i);
630-
buffer.incrementRowCount();
631-
Assert.assertFalse(buffer.shouldFlush());
632-
}
633-
buffer.writeByte((byte) 4);
634-
buffer.incrementRowCount();
635-
Assert.assertTrue(buffer.shouldFlush());
636-
Assert.assertTrue(buffer.isRowLimitExceeded());
637-
}
638-
});
639-
}
640-
641-
@Test
642-
public void testShouldFlushWithNoThresholds() throws Exception {
643-
assertMemoryLeak(() -> {
644-
try (MicrobatchBuffer buffer = new MicrobatchBuffer(1024)) {
645-
buffer.writeByte((byte) 1);
646-
buffer.incrementRowCount();
647-
Assert.assertFalse(buffer.shouldFlush()); // No thresholds set
648-
}
649-
});
650-
}
651-
652583
@Test
653584
public void testStateName() {
654585
Assert.assertEquals("FILLING", MicrobatchBuffer.stateName(MicrobatchBuffer.STATE_FILLING));

0 commit comments

Comments
 (0)