3030import io .questdb .client .cutlass .line .LineSenderException ;
3131import io .questdb .client .cutlass .qwp .client .InFlightWindow ;
3232import io .questdb .client .cutlass .qwp .client .MicrobatchBuffer ;
33+ import io .questdb .client .cutlass .qwp .client .WebSocketResponse ;
3334import io .questdb .client .cutlass .qwp .client .WebSocketSendQueue ;
3435import io .questdb .client .network .PlainSocketFactory ;
3536import io .questdb .client .std .MemoryTag ;
4041import java .util .concurrent .CountDownLatch ;
4142import java .util .concurrent .TimeUnit ;
4243import java .util .concurrent .atomic .AtomicBoolean ;
44+ import java .util .concurrent .atomic .AtomicInteger ;
45+ import java .util .concurrent .atomic .AtomicLong ;
4346import java .util .concurrent .atomic .AtomicReference ;
4447
4548import static org .junit .Assert .*;
@@ -266,6 +269,84 @@ public void testFlushFailsWhenServerClosesConnection() throws Exception {
266269 });
267270 }
268271
272+ @ Test
273+ public void testAwaitPendingAcksKeepsDrainNonBlocking () throws Exception {
274+ assertMemoryLeak (() -> {
275+ InFlightWindow window = new InFlightWindow (8 , 5_000 );
276+ FakeWebSocketClient client = new FakeWebSocketClient ();
277+ WebSocketSendQueue queue = null ;
278+ MicrobatchBuffer batch0 = sealedBuffer ((byte ) 1 );
279+ MicrobatchBuffer batch1 = sealedBuffer ((byte ) 2 );
280+ CountDownLatch secondBatchSent = new CountDownLatch (1 );
281+ AtomicBoolean deliverAcks = new AtomicBoolean (false );
282+ AtomicInteger tryReceivePolls = new AtomicInteger ();
283+ AtomicLong highestSent = new AtomicLong (-1 );
284+ AtomicReference <Throwable > errorRef = new AtomicReference <>();
285+
286+ try {
287+ client .setSendBehavior ((dataPtr , length ) -> {
288+ long sent = highestSent .incrementAndGet ();
289+ if (sent == 1 ) {
290+ secondBatchSent .countDown ();
291+ }
292+ });
293+ client .setReceiveBehavior ((handler , timeout ) -> {
294+ throw new AssertionError ("receiveFrame() must not be used while draining ACKs" );
295+ });
296+ client .setTryReceiveBehavior (handler -> {
297+ tryReceivePolls .incrementAndGet ();
298+ if (deliverAcks .get ()) {
299+ long sent = highestSent .get ();
300+ if (sent >= 0 && window .getInFlightCount () > 0 ) {
301+ emitAck (handler , sent );
302+ return true ;
303+ }
304+ }
305+ return false ;
306+ });
307+
308+ queue = new WebSocketSendQueue (client , window , 1_000 , 500 );
309+ queue .enqueue (batch0 );
310+ queue .flush ();
311+
312+ CountDownLatch finished = new CountDownLatch (1 );
313+ WebSocketSendQueue finalQueue = queue ;
314+ Thread waiter = new Thread (() -> {
315+ try {
316+ finalQueue .awaitPendingAcks ();
317+ } catch (Throwable t ) {
318+ errorRef .set (t );
319+ } finally {
320+ finished .countDown ();
321+ }
322+ });
323+ waiter .start ();
324+
325+ long deadline = System .nanoTime () + TimeUnit .SECONDS .toNanos (2 );
326+ while (tryReceivePolls .get () == 0 && System .nanoTime () < deadline ) {
327+ Thread .onSpinWait ();
328+ }
329+ assertTrue ("Expected non-blocking ACK polls while draining" , tryReceivePolls .get () > 0 );
330+
331+ queue .enqueue (batch1 );
332+ assertTrue ("I/O thread should still send new work while ACK drain is active" ,
333+ secondBatchSent .await (1 , TimeUnit .SECONDS ));
334+
335+ deliverAcks .set (true );
336+
337+ assertTrue ("awaitPendingAcks should complete once ACK arrives" ,
338+ finished .await (2 , TimeUnit .SECONDS ));
339+ assertNull (errorRef .get ());
340+ assertEquals (0 , window .getInFlightCount ());
341+ } finally {
342+ closeQuietly (queue );
343+ batch0 .close ();
344+ batch1 .close ();
345+ client .close ();
346+ }
347+ });
348+ }
349+
269350 private static void awaitThreadBlocked (Thread thread ) throws InterruptedException {
270351 long deadline = System .nanoTime () + TimeUnit .SECONDS .toNanos (5 );
271352 while (System .nanoTime () < deadline ) {
@@ -296,6 +377,18 @@ private static void emitBinary(WebSocketFrameHandler handler, byte[] payload) {
296377 }
297378 }
298379
380+ private static void emitAck (WebSocketFrameHandler handler , long sequence ) {
381+ WebSocketResponse response = WebSocketResponse .success (sequence );
382+ int size = response .serializedSize ();
383+ long ptr = Unsafe .malloc (size , MemoryTag .NATIVE_DEFAULT );
384+ try {
385+ response .writeTo (ptr );
386+ handler .onBinaryMessage (ptr , size );
387+ } finally {
388+ Unsafe .free (ptr , size , MemoryTag .NATIVE_DEFAULT );
389+ }
390+ }
391+
299392 private static MicrobatchBuffer sealedBuffer (byte value ) {
300393 MicrobatchBuffer buffer = new MicrobatchBuffer (64 );
301394 buffer .writeByte (value );
@@ -312,9 +405,14 @@ private interface TryReceiveBehavior {
312405 boolean tryReceive (WebSocketFrameHandler handler );
313406 }
314407
408+ private interface ReceiveBehavior {
409+ boolean receive (WebSocketFrameHandler handler , int timeout );
410+ }
411+
315412 private static class FakeWebSocketClient extends WebSocketClient {
316413 private volatile TryReceiveBehavior behavior = handler -> false ;
317414 private volatile boolean connected = true ;
415+ private volatile ReceiveBehavior receiveBehavior = (handler , timeout ) -> false ;
318416 private volatile SendBehavior sendBehavior = (dataPtr , length ) -> {
319417 };
320418
@@ -346,6 +444,15 @@ public void setTryReceiveBehavior(TryReceiveBehavior behavior) {
346444 this .behavior = behavior ;
347445 }
348446
447+ public void setReceiveBehavior (ReceiveBehavior receiveBehavior ) {
448+ this .receiveBehavior = receiveBehavior ;
449+ }
450+
451+ @ Override
452+ public boolean receiveFrame (WebSocketFrameHandler handler , int timeout ) {
453+ return receiveBehavior .receive (handler , timeout );
454+ }
455+
349456 @ Override
350457 public boolean tryReceiveFrame (WebSocketFrameHandler handler ) {
351458 return behavior .tryReceive (handler );
0 commit comments