Skip to content

Commit 5d702d1

Browse files
authored
custom non blocking reader (#322)
The previous import-workflow streaming approach used PipedReader, which would block the WebSocket thread when receiving many chunks of a single message (for example, a large import). This commit replaces PipedReader with a custom reader that stores all received text chunks in a queue without blocking the WebSocket thread.
1 parent 43e9780 commit 5d702d1

2 files changed

Lines changed: 154 additions & 29 deletions

File tree

transact/src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 79 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import java.io.IOException;
4545
import java.io.InputStream;
4646
import java.io.OutputStream;
47-
import java.io.PipedReader;
48-
import java.io.PipedWriter;
4947
import java.io.Reader;
5048
import java.net.InetAddress;
5149
import java.net.URI;
@@ -62,6 +60,7 @@
6260
import java.util.Objects;
6361
import java.util.concurrent.CompletableFuture;
6462
import java.util.concurrent.Executors;
63+
import java.util.concurrent.LinkedBlockingQueue;
6564
import java.util.concurrent.ScheduledExecutorService;
6665
import java.util.concurrent.ScheduledFuture;
6766
import java.util.concurrent.TimeUnit;
@@ -165,16 +164,12 @@ public void checkServerTrusted(X509Certificate[] certs, String authType) {}
165164

166165
private class WebSocketListener implements WebSocket.Listener {
167166
private final StringBuilder messageBuffer = new StringBuilder();
168-
private PipedWriter importPipeWriter = null;
167+
private ImportFrameQueue importFrameQueue = null;
169168

170169
private void closePipe() {
171-
if (importPipeWriter != null) {
172-
try {
173-
importPipeWriter.close();
174-
} catch (IOException e) {
175-
logger.debug("Error closing import pipe writer", e);
176-
}
177-
importPipeWriter = null;
170+
if (importFrameQueue != null) {
171+
importFrameQueue.finish();
172+
importFrameQueue = null;
178173
}
179174
}
180175

@@ -188,12 +183,14 @@ public void onOpen(WebSocket ws) {
188183

189184
@Override
190185
public CompletableFuture<?> onText(WebSocket ws, CharSequence data, boolean last) {
191-
// Streaming import path: pipe each frame directly to the worker thread
192-
if (importPipeWriter != null) {
186+
logger.debug("onText data size {} last {}", data.length(), last);
187+
188+
// Streaming import path: queue each frame for the worker thread (non-blocking)
189+
if (importFrameQueue != null) {
193190
try {
194-
importPipeWriter.append(data);
195-
} catch (IOException e) {
196-
logger.error("Error writing frame to import pipe; streaming import aborted", e);
191+
importFrameQueue.addFrame(data);
192+
} catch (Exception e) {
193+
logger.error("Error queuing frame for import; streaming import aborted", e);
197194
closePipe();
198195
}
199196
if (last) {
@@ -205,21 +202,18 @@ public CompletableFuture<?> onText(WebSocket ws, CharSequence data, boolean last
205202

206203
// Detect import_workflow on the first frame of a new message
207204
if (messageBuffer.length() == 0 && isImportMessage(data)) {
208-
try {
209-
PipedReader pipeReader = new PipedReader(8 * 1024);
210-
importPipeWriter = new PipedWriter(pipeReader);
211-
importPipeWriter.append(data);
212-
if (last) {
213-
closePipe();
214-
}
215-
streamImportAsync(Conductor.this, ws, pipeReader);
216-
ws.request(1);
217-
return null;
218-
} catch (IOException e) {
219-
logger.error("Failed to start streaming import; falling back to buffered path", e);
205+
logger.debug("import message detected");
206+
207+
importFrameQueue = new ImportFrameQueue();
208+
importFrameQueue.addFrame(data);
209+
streamImportAsync(Conductor.this, ws, importFrameQueue);
210+
logger.debug("streamImportAsync started");
211+
if (last) {
220212
closePipe();
221-
// fall through to buffered path
222213
}
214+
215+
ws.request(1);
216+
return null;
223217
}
224218

225219
messageBuffer.append(data);
@@ -343,6 +337,62 @@ public void onError(WebSocket ws, Throwable error) {
343337
}
344338
}
345339

340+
/**
341+
* A Reader backed by an unbounded queue of CharSequence frames. The producer side (onText) calls
342+
* addFrame/finish which never block. The consumer side (streamImportAsync) reads data normally,
343+
* blocking only when waiting for more frames to arrive. This avoids blocking the WebSocket I/O
344+
* thread, which would prevent the TCP receive buffer from draining and cause the conductor's pong
345+
* writes to time out.
346+
*/
347+
private static class ImportFrameQueue extends Reader {
348+
private final LinkedBlockingQueue<CharSequence> frames = new LinkedBlockingQueue<>();
349+
private final AtomicBoolean done = new AtomicBoolean(false);
350+
private CharSequence current = null;
351+
private int pos = 0;
352+
353+
void addFrame(CharSequence data) {
354+
frames.add(data);
355+
}
356+
357+
void finish() {
358+
if (done.compareAndSet(false, true)) {
359+
frames.add(""); // wake up a blocked reader
360+
}
361+
}
362+
363+
@Override
364+
public int read(char[] cbuf, int off, int len) throws IOException {
365+
while (true) {
366+
if (current != null && pos < current.length()) {
367+
int n = Math.min(len, current.length() - pos);
368+
for (int i = 0; i < n; i++) {
369+
cbuf[off + i] = current.charAt(pos + i);
370+
}
371+
pos += n;
372+
return n;
373+
}
374+
try {
375+
CharSequence next = frames.poll(100, TimeUnit.MILLISECONDS);
376+
if (next == null) {
377+
if (done.get() && frames.isEmpty()) return -1;
378+
continue;
379+
}
380+
if (next.length() == 0 && done.get()) return -1;
381+
current = next;
382+
pos = 0;
383+
} catch (InterruptedException e) {
384+
Thread.currentThread().interrupt();
385+
throw new IOException("Interrupted reading import stream", e);
386+
}
387+
}
388+
}
389+
390+
@Override
391+
public void close() {
392+
finish();
393+
}
394+
}
395+
346396
private static void writeFragmentedResponse(WebSocket ws, BaseResponse response)
347397
throws Exception {
348398
int fragmentSize = 128 * 1024; // 128k
@@ -372,7 +422,7 @@ private static class FragmentingOutputStream extends OutputStream {
372422
this.ws = ws;
373423
this.fragmentSize = fragmentSize;
374424
this.buffer = new byte[fragmentSize];
375-
logger.debug("Created JdkFragmentingOutputStream with fragment size: {}", fragmentSize);
425+
logger.debug("Created FragmentingOutputStream with fragment size: {}", fragmentSize);
376426
}
377427

378428
@Override

transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static org.mockito.ArgumentMatchers.anyLong;
1212
import static org.mockito.ArgumentMatchers.anyString;
1313
import static org.mockito.ArgumentMatchers.eq;
14+
import static org.mockito.Mockito.doAnswer;
1415
import static org.mockito.Mockito.doThrow;
1516
import static org.mockito.Mockito.mock;
1617
import static org.mockito.Mockito.never;
@@ -1565,6 +1566,80 @@ public void canImportLargePayload() throws Exception {
15651566
}
15661567
}
15671568

1569+
// Regression test for: when a large import is being processed, the WebSocket I/O thread must
1570+
// remain free to deliver pong frames. The old PipedWriter implementation blocked onText() when
1571+
// the 8KB pipe buffer filled up, stalling the I/O thread and preventing pong delivery, which
1572+
// caused the server's pong write to time out and the connection to be reset.
1573+
//
1574+
// This test verifies the connection stays alive during a long-running import by requiring a
1575+
// ping/pong cycle to complete while importWorkflow() is blocked. If the I/O thread were stuck,
1576+
// the pong would not be delivered, the ping would time out, the connection would reset, and
1577+
// messageLatch would never fire.
1578+
@RetryingTest(3)
1579+
public void pingsSucceedDuringLargeImport() throws Exception {
1580+
class Listener extends MessageListener {
1581+
final CountDownLatch pingLatch = new CountDownLatch(1);
1582+
volatile boolean connectionReset = false;
1583+
1584+
@Override
1585+
public void onPing(WebSocket conn, Framedata frame) {
1586+
super.onPing(conn, frame); // sends pong back to the conductor
1587+
pingLatch.countDown();
1588+
}
1589+
1590+
@Override
1591+
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
1592+
connectionReset = true;
1593+
}
1594+
}
1595+
1596+
// Block importWorkflow until the test confirms a ping was received (and pong sent back).
1597+
// At least one full ping/pong cycle must complete before the import finishes.
1598+
CountDownLatch importMayProceed = new CountDownLatch(1);
1599+
doAnswer(
1600+
invocation -> {
1601+
assertTrue(importMayProceed.await(10, TimeUnit.SECONDS), "import was not released");
1602+
return null;
1603+
})
1604+
.when(mockDB)
1605+
.importWorkflow(any());
1606+
1607+
var workflows = createLargeTestExportedWorkflows();
1608+
var serialized = Conductor.serializeExportedWorkflows(workflows);
1609+
1610+
Listener listener = new Listener();
1611+
testServer.setListener(listener);
1612+
1613+
// Ping fires frequently; timeout is long enough for normal test overhead but short enough
1614+
// that a missed pong would reset the connection before importMayProceed is released.
1615+
builder.pingPeriodMs(300).pingTimeoutMs(2000);
1616+
1617+
try (Conductor conductor = builder.build()) {
1618+
conductor.start();
1619+
assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out");
1620+
1621+
listener.send(
1622+
MessageType.IMPORT_WORKFLOW,
1623+
"ping-during-import",
1624+
Map.of("serialized_workflow", serialized));
1625+
1626+
// Wait for a ping to arrive at the server (and pong to be sent) while import is blocked.
1627+
assertTrue(listener.pingLatch.await(5, TimeUnit.SECONDS), "no ping received during import");
1628+
assertFalse(listener.connectionReset, "connection was reset during import");
1629+
1630+
// Release importWorkflow and verify the import completes cleanly.
1631+
importMayProceed.countDown();
1632+
assertTrue(listener.messageLatch.await(15, TimeUnit.SECONDS), "import did not complete");
1633+
assertFalse(listener.connectionReset, "connection was reset after import");
1634+
1635+
JsonNode jsonNode = mapper.readTree(listener.message);
1636+
assertEquals("import_workflow", jsonNode.get("type").asText());
1637+
assertEquals("ping-during-import", jsonNode.get("request_id").asText());
1638+
assertNull(jsonNode.get("error_message"));
1639+
assertTrue(jsonNode.get("success").asBoolean());
1640+
}
1641+
}
1642+
15681643
@RetryingTest(3)
15691644
public void canExportLargePayload() throws Exception {
15701645
MessageListener listener = new MessageListener();

0 commit comments

Comments
 (0)