Skip to content

Commit c1c6584

Browse files
committed
fix: guard processSendQueue against non-writable stream status
Add a try-catch around sendData() in processSendQueue() to catch StreamStateError when the underlying transport closes between a drain event and the send attempt. This prevents the error from propagating as an uncaught exception. Also add a writeStatus guard in continueSendingOnDrain to skip processing when the stream has fully closed. Fixes #3415
1 parent 82c3c9e commit c1c6584

3 files changed

Lines changed: 125 additions & 1 deletion

File tree

TASK.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Task: Fix uncaught StreamStateError on drain event during stream close
2+
3+
## Issue
4+
https://github.com/libp2p/js-libp2p/issues/3415
5+
6+
## Problem
7+
A race condition in `packages/utils/src/abstract-message-stream.ts` causes an uncaught exception when a `drain` event fires on a stream whose underlying transport is already closing.
8+
9+
The `continueSendingOnDrain` listener (line ~97) calls `processSendQueue()` when `writableNeedsDrain` is true. But `processSendQueue()` (line ~428) has no guard for `writeStatus` — it proceeds to call `sendData()` which calls `send()` on the underlying stream. `send()` at line ~170 throws `StreamStateError` because `writeStatus === 'closing'`. The throw happens in an event listener context → uncaught exception.
10+
11+
## Fix Required
12+
13+
### 1. Source fix in `packages/utils/src/abstract-message-stream.ts`
14+
15+
Add a `writeStatus` guard at the top of `processSendQueue()`, consistent with the existing bail-out pattern:
16+
17+
```typescript
18+
protected processSendQueue (): boolean {
19+
// NEW: bail if the stream is no longer writable - a drain event can
20+
// arrive after writeStatus transitions to 'closing'/'closed' during
21+
// connection teardown
22+
if (this.writeStatus !== 'writable') {
23+
this.log.trace('not processing send queue as stream write status is %s', this.writeStatus)
24+
return false
25+
}
26+
27+
// ... existing bail-out checks (writableNeedsDrain, empty buffer, sendingData) unchanged
28+
```
29+
30+
### 2. Test in `packages/utils/test/stream-utils-test.spec.ts`
31+
32+
Add a test that verifies `processSendQueue` does NOT throw when called on a closing stream. The test should:
33+
34+
1. Create a stream pair using `streamPair()` from `../src/stream-pair.ts`
35+
2. Send data to fill the buffer (use `outgoing.send(data)` until it returns `false`, indicating drain is needed)
36+
3. Start closing the stream (call `outgoing.close()` without await — or set writeStatus to closing)
37+
4. Dispatch a drain event on the stream: `outgoing.dispatchEvent(new Event('drain'))`
38+
5. Verify no error is thrown (the drain should be silently ignored)
39+
40+
Look at the existing test patterns in the file for style guidance. The test should go in the `describe('messageStreamToDuplex', ...)` block or a new `describe('processSendQueue', ...)` block.
41+
42+
## Constraints
43+
- Only modify files in `packages/utils/`
44+
- Keep the fix minimal — one guard check
45+
- Follow existing code style (2-space indent, single quotes, etc.)
46+
- The test must actually exercise the race condition path (drain on closing stream)

packages/utils/src/abstract-message-stream.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli
9696

9797
const continueSendingOnDrain = (): void => {
9898
if (this.writableNeedsDrain) {
99+
// a drain event can arrive after writeStatus transitions to
100+
// 'closing'/'closed' during connection teardown - bail out to
101+
// avoid calling send() on a non-writable stream which would
102+
// throw an uncaught StreamStateError
103+
if (this.writeStatus !== 'writable' && this.writeStatus !== 'closing') {
104+
this.log.trace('not processing send queue on drain as stream write status is %s', this.writeStatus)
105+
return
106+
}
107+
99108
this.log.trace('drain event received, continue sending data')
100109
this.writableNeedsDrain = false
101110
this.processSendQueue()
@@ -476,7 +485,22 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli
476485

477486
// sending data can cause buffers to fill up, events to be emitted and
478487
// this method to be invoked again
479-
const sendResult = this.sendData(toSend)
488+
let sendResult: SendResult
489+
try {
490+
sendResult = this.sendData(toSend)
491+
} catch (err: any) {
492+
// the underlying transport may have closed between the drain event
493+
// and this send attempt - treat as a failed send rather than letting
494+
// the error propagate as an uncaught exception
495+
if (err.name === 'StreamStateError') {
496+
this.log('send failed during queue processing, stream is %s - %e', this.writeStatus, err)
497+
this.writeBuffer.prepend(toSend)
498+
return false
499+
}
500+
501+
throw err
502+
}
503+
480504
canSendMore = sendResult.canSendMore
481505
sentBytes += sendResult.sentBytes
482506

packages/utils/test/stream-utils-test.spec.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,4 +412,58 @@ describe('stream-pair', () => {
412412
await expect(outgoing.onDrain()).to.eventually.be.rejected
413413
.with.property('name', 'StreamResetError')
414414
})
415+
416+
it('should not throw uncaught error on drain event when stream is closed', async () => {
417+
const [outgoing] = await streamPair({
418+
capacity: 1,
419+
delay: 100
420+
})
421+
422+
// fill the write buffer to trigger backpressure
423+
while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) {
424+
// drain the send buffer
425+
}
426+
expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false()
427+
428+
// set the stream to closed - drain events after this should be ignored
429+
outgoing.writeStatus = 'closed'
430+
431+
// dispatching drain on a closed stream should not throw
432+
expect(() => {
433+
outgoing.dispatchEvent(new Event('drain'))
434+
}).to.not.throw()
435+
436+
outgoing.abort(new Error('cleanup'))
437+
})
438+
439+
it('should catch StreamStateError from sendData during closing', async () => {
440+
const [outgoing] = await streamPair({
441+
capacity: 1,
442+
delay: 100
443+
})
444+
const outgoingStream = outgoing as typeof outgoing & {
445+
sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean }
446+
}
447+
448+
// fill the write buffer to trigger backpressure
449+
while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) {
450+
// drain the send buffer
451+
}
452+
expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false()
453+
454+
// stub sendData to throw StreamStateError (simulates underlying
455+
// transport closing while the outer stream is still flushing)
456+
const err = new Error('Cannot write to a stream that is closing')
457+
err.name = 'StreamStateError'
458+
Sinon.stub(outgoingStream, 'sendData').throws(err)
459+
460+
outgoing.writeStatus = 'closing'
461+
462+
// the drain event should not throw - the error should be caught internally
463+
expect(() => {
464+
outgoing.dispatchEvent(new Event('drain'))
465+
}).to.not.throw()
466+
467+
outgoing.abort(new Error('cleanup'))
468+
})
415469
})

0 commit comments

Comments
 (0)