Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion packages/utils/src/abstract-message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli

const continueSendingOnDrain = (): void => {
if (this.writableNeedsDrain) {
// a drain event can arrive after writeStatus transitions to
// 'closing'/'closed' during connection teardown - bail out to
// avoid calling send() on a non-writable stream which would
// throw an uncaught StreamStateError
if (this.writeStatus !== 'writable' && this.writeStatus !== 'closing') {
this.log.trace('not processing send queue on drain as stream write status is %s', this.writeStatus)
return
}

this.log.trace('drain event received, continue sending data')
this.writableNeedsDrain = false

Expand Down Expand Up @@ -479,7 +488,24 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli

// sending data can cause buffers to fill up, events to be emitted and
// this method to be invoked again
const sendResult = this.sendData(toSend)
let sendResult: SendResult
try {
sendResult = this.sendData(toSend)
} catch (err: any) {
// the underlying transport may have closed between the drain event
// and this send attempt - treat as a failed send rather than letting
// the error propagate as an uncaught exception
if (err.name === 'StreamStateError') {
this.log('send failed during queue processing, stream is %s - %e', this.writeStatus, err)
// Requeue the defensive copy in case sendData mutated `toSend`
// before throwing.
this.writeBuffer.prepend(willSend)
return false
}

throw err
}

canSendMore = sendResult.canSendMore
sentBytes += sendResult.sentBytes

Expand Down
153 changes: 153 additions & 0 deletions packages/utils/test/stream-utils-test.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,4 +412,157 @@ describe('stream-pair', () => {
await expect(outgoing.onDrain()).to.eventually.be.rejected
.with.property('name', 'StreamResetError')
})

it('should not throw uncaught error on drain event when stream is closed', async () => {
const [outgoing] = await streamPair({
capacity: 1,
delay: 100
})

// fill the write buffer to trigger backpressure
while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) {
// drain the send buffer
}
expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false()

// set the stream to closed - drain events after this should be ignored
outgoing.writeStatus = 'closed'

// dispatching drain on a closed stream should not throw
expect(() => {
outgoing.dispatchEvent(new Event('drain'))
}).to.not.throw()

outgoing.abort(new Error('cleanup'))
})

it('should catch StreamStateError from sendData during closing', async () => {
const [outgoing] = await streamPair({
capacity: 1,
delay: 100
})
const outgoingStream = outgoing as typeof outgoing & {
sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean }
}

// fill the write buffer to trigger backpressure
while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) {
// drain the send buffer
}
expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false()

// stub sendData to throw StreamStateError (simulates underlying
// transport closing while the outer stream is still flushing)
const err = new Error('Cannot write to a stream that is closing')
err.name = 'StreamStateError'
Sinon.stub(outgoingStream, 'sendData').throws(err)

outgoing.writeStatus = 'closing'

// the drain event should not throw - the error should be caught internally
expect(() => {
outgoing.dispatchEvent(new Event('drain'))
}).to.not.throw()

outgoing.abort(new Error('cleanup'))
})

it('should preserve queued bytes when sendData throws StreamStateError during drain', async () => {
const [outgoing] = await streamPair()
const outgoingStream = outgoing as typeof outgoing & {
sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean }
}

const payload = new Uint8ArrayList(
Uint8Array.from([0, 1, 2, 3]),
Uint8Array.from([4, 5, 6, 7])
)

outgoing.writableNeedsDrain = true
expect(outgoing.send(payload)).to.be.false()
expect(outgoing.writeBufferLength).to.equal(payload.byteLength)

const err = new Error('Cannot write to a stream that is closing')
err.name = 'StreamStateError'
const stub = Sinon.stub(outgoingStream, 'sendData').throws(err)

outgoing.writeStatus = 'closing'

expect(() => {
outgoing.dispatchEvent(new Event('drain'))
}).to.not.throw()

// drain handler defers processSendQueue via queueMicrotask - wait for it
await new Promise<void>((resolve) => { queueMicrotask(resolve) })

expect(stub.calledOnce).to.be.true()
expect(outgoing.writeBufferLength).to.equal(payload.byteLength)

stub.restore()
outgoing.abort(new Error('cleanup'))
})

it('should not duplicate queued bytes after transient StreamStateError during drain', async () => {
const [outgoing] = await streamPair()
const outgoingStream = outgoing as typeof outgoing & {
sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean }
}

const payload = new Uint8ArrayList(
Uint8Array.from([0, 1, 2, 3]),
Uint8Array.from([4, 5, 6, 7]),
Uint8Array.from([8, 9, 10, 11])
)

outgoing.writableNeedsDrain = true
expect(outgoing.send(payload)).to.be.false()
expect(outgoing.writeBufferLength).to.equal(payload.byteLength)

const streamStateError = new Error('Cannot write to a stream that is closing')
streamStateError.name = 'StreamStateError'

const sentPayloads: Uint8Array[] = []
let throwOnce = true

const stub = Sinon.stub(outgoingStream, 'sendData').callsFake((data: Uint8ArrayList) => {
if (throwOnce) {
throwOnce = false
throw streamStateError
}

sentPayloads.push(data.subarray())

return {
sentBytes: data.byteLength,
canSendMore: true
}
})

outgoing.writeStatus = 'closing'

expect(() => {
outgoing.dispatchEvent(new Event('drain'))
}).to.not.throw()

// drain handler defers processSendQueue via queueMicrotask - wait for it
await new Promise<void>((resolve) => { queueMicrotask(resolve) })

expect(outgoing.writeBufferLength).to.equal(payload.byteLength)

// simulate a later drain event after writableNeedsDrain is set again
outgoing.writableNeedsDrain = true
expect(() => {
outgoing.dispatchEvent(new Event('drain'))
}).to.not.throw()

await new Promise<void>((resolve) => { queueMicrotask(resolve) })

expect(stub.callCount).to.equal(2)
expect(outgoing.writeBufferLength).to.equal(0)
expect(sentPayloads).to.have.lengthOf(1)
expect(sentPayloads[0]).to.equalBytes(payload.subarray())

stub.restore()
outgoing.abort(new Error('cleanup'))
})
})
Loading