diff --git a/packages/utils/src/abstract-message-stream.ts b/packages/utils/src/abstract-message-stream.ts index e4f1356564..1f16f3b298 100644 --- a/packages/utils/src/abstract-message-stream.ts +++ b/packages/utils/src/abstract-message-stream.ts @@ -96,6 +96,15 @@ export abstract class AbstractMessageStream { if (this.writableNeedsDrain) { + // a drain event can arrive after writeStatus transitions to + // 'closing'/'closed' during connection teardown - bail out to + // avoid calling send() on a non-writable stream which would + // throw an uncaught StreamStateError + if (this.writeStatus !== 'writable' && this.writeStatus !== 'closing') { + this.log.trace('not processing send queue on drain as stream write status is %s', this.writeStatus) + return + } + this.log.trace('drain event received, continue sending data') this.writableNeedsDrain = false @@ -479,7 +488,24 @@ export abstract class AbstractMessageStream { await expect(outgoing.onDrain()).to.eventually.be.rejected .with.property('name', 'StreamResetError') }) + + it('should not throw uncaught error on drain event when stream is closed', async () => { + const [outgoing] = await streamPair({ + capacity: 1, + delay: 100 + }) + + // fill the write buffer to trigger backpressure + while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) { + // drain the send buffer + } + expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false() + + // set the stream to closed - drain events after this should be ignored + outgoing.writeStatus = 'closed' + + // dispatching drain on a closed stream should not throw + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + + outgoing.abort(new Error('cleanup')) + }) + + it('should catch StreamStateError from sendData during closing', async () => { + const [outgoing] = await streamPair({ + capacity: 1, + delay: 100 + }) + const outgoingStream = outgoing as typeof outgoing & { + sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean } + } + + // fill the write buffer to trigger backpressure + while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) { + // drain the send buffer + } + expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false() + + // stub sendData to throw StreamStateError (simulates underlying + // transport closing while the outer stream is still flushing) + const err = new Error('Cannot write to a stream that is closing') + err.name = 'StreamStateError' + Sinon.stub(outgoingStream, 'sendData').throws(err) + + outgoing.writeStatus = 'closing' + + // the drain event should not throw - the error should be caught internally + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + + outgoing.abort(new Error('cleanup')) + }) + + it('should preserve queued bytes when sendData throws StreamStateError during drain', async () => { + const [outgoing] = await streamPair() + const outgoingStream = outgoing as typeof outgoing & { + sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean } + } + + const payload = new Uint8ArrayList( + Uint8Array.from([0, 1, 2, 3]), + Uint8Array.from([4, 5, 6, 7]) + ) + + outgoing.writableNeedsDrain = true + expect(outgoing.send(payload)).to.be.false() + expect(outgoing.writeBufferLength).to.equal(payload.byteLength) + + const err = new Error('Cannot write to a stream that is closing') + err.name = 'StreamStateError' + const stub = Sinon.stub(outgoingStream, 'sendData').throws(err) + + outgoing.writeStatus = 'closing' + + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + + // drain handler defers processSendQueue via queueMicrotask - wait for it + await new Promise((resolve) => { queueMicrotask(resolve) }) + + expect(stub.calledOnce).to.be.true() + expect(outgoing.writeBufferLength).to.equal(payload.byteLength) + + stub.restore() + outgoing.abort(new Error('cleanup')) + }) + + it('should not duplicate queued bytes after transient StreamStateError during drain', async () => { + const [outgoing] = await streamPair() + const outgoingStream = outgoing as typeof outgoing & { + sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean } + } + + const payload = new Uint8ArrayList( + Uint8Array.from([0, 1, 2, 3]), + Uint8Array.from([4, 5, 6, 7]), + Uint8Array.from([8, 9, 10, 11]) + ) + + outgoing.writableNeedsDrain = true + expect(outgoing.send(payload)).to.be.false() + expect(outgoing.writeBufferLength).to.equal(payload.byteLength) + + const streamStateError = new Error('Cannot write to a stream that is closing') + streamStateError.name = 'StreamStateError' + + const sentPayloads: Uint8Array[] = [] + let throwOnce = true + + const stub = Sinon.stub(outgoingStream, 'sendData').callsFake((data: Uint8ArrayList) => { + if (throwOnce) { + throwOnce = false + throw streamStateError + } + + sentPayloads.push(data.subarray()) + + return { + sentBytes: data.byteLength, + canSendMore: true + } + }) + + outgoing.writeStatus = 'closing' + + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + + // drain handler defers processSendQueue via queueMicrotask - wait for it + await new Promise((resolve) => { queueMicrotask(resolve) }) + + expect(outgoing.writeBufferLength).to.equal(payload.byteLength) + + // simulate a later drain event after writableNeedsDrain is set again + outgoing.writableNeedsDrain = true + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + + await new Promise((resolve) => { queueMicrotask(resolve) }) + + expect(stub.callCount).to.equal(2) + expect(outgoing.writeBufferLength).to.equal(0) + expect(sentPayloads).to.have.lengthOf(1) + expect(sentPayloads[0]).to.equalBytes(payload.subarray()) + + stub.restore() + outgoing.abort(new Error('cleanup')) + }) })