Skip to content

Commit 9b49eed

Browse files
committed
reduce allocation pressure
1 parent 52a5c16 commit 9b49eed

1 file changed

Lines changed: 44 additions & 12 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/client/MicrobatchBuffer.java

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import io.questdb.client.std.QuietCloseable;
2929
import io.questdb.client.std.Unsafe;
3030

31-
import java.util.concurrent.CountDownLatch;
3231
import java.util.concurrent.TimeUnit;
3332
import java.util.concurrent.atomic.AtomicLong;
33+
import java.util.concurrent.locks.LockSupport;
3434

3535
/**
3636
* A buffer for accumulating ILP data into microbatches before sending.
@@ -75,8 +75,7 @@ public class MicrobatchBuffer implements QuietCloseable {
7575
// Symbol tracking for delta encoding
7676
private int maxSymbolId = -1;
7777
// For waiting on recycle (user thread waits for I/O thread to finish)
78-
// CountDownLatch is not resettable, so we create a new instance on reset()
79-
private volatile CountDownLatch recycleLatch = new CountDownLatch(1);
78+
private volatile Thread recycleWaiter;
8079
// Row tracking
8180
private int rowCount;
8281
// State machine
@@ -137,10 +136,20 @@ public static String stateName(int state) {
137136
* Only the user thread should call this.
138137
*/
139138
public void awaitRecycled() {
139+
final Thread current = Thread.currentThread();
140+
recycleWaiter = current;
140141
try {
141-
recycleLatch.await();
142-
} catch (InterruptedException e) {
143-
Thread.currentThread().interrupt();
142+
while (state != STATE_RECYCLED) {
143+
LockSupport.park(this);
144+
if (Thread.interrupted()) {
145+
Thread.currentThread().interrupt();
146+
return;
147+
}
148+
}
149+
} finally {
150+
if (recycleWaiter == current) {
151+
recycleWaiter = null;
152+
}
144153
}
145154
}
146155

@@ -152,11 +161,31 @@ public void awaitRecycled() {
152161
* @return true if recycled, false if timeout elapsed
153162
*/
154163
public boolean awaitRecycled(long timeout, TimeUnit unit) {
164+
if (state == STATE_RECYCLED) {
165+
// fast-path
166+
return true;
167+
}
168+
169+
final Thread current = Thread.currentThread();
170+
recycleWaiter = current;
171+
final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
155172
try {
156-
return recycleLatch.await(timeout, unit);
157-
} catch (InterruptedException e) {
158-
Thread.currentThread().interrupt();
159-
return false;
173+
while (state != STATE_RECYCLED) {
174+
final long remaining = deadlineNanos - System.nanoTime();
175+
if (remaining <= 0) {
176+
return false;
177+
}
178+
LockSupport.parkNanos(this, remaining);
179+
if (Thread.interrupted()) {
180+
Thread.currentThread().interrupt();
181+
return false;
182+
}
183+
}
184+
return true;
185+
} finally {
186+
if (recycleWaiter == current) {
187+
recycleWaiter = null;
188+
}
160189
}
161190
}
162191

@@ -332,7 +361,10 @@ public void markRecycled() {
332361
throw new IllegalStateException("Cannot mark recycled in state " + stateName(state));
333362
}
334363
state = STATE_RECYCLED;
335-
recycleLatch.countDown();
364+
Thread w = recycleWaiter;
365+
if (w != null) {
366+
LockSupport.unpark(w);
367+
}
336368
}
337369

338370
/**
@@ -364,8 +396,8 @@ public void reset() {
364396
firstRowTimeNanos = 0;
365397
maxSymbolId = -1;
366398
batchId = nextBatchId.getAndIncrement();
399+
recycleWaiter = null;
367400
state = STATE_FILLING;
368-
recycleLatch = new CountDownLatch(1);
369401
}
370402

371403
/**

0 commit comments

Comments
 (0)