Skip to content

Commit 20bfcb3

Browse files
tabcatclaude
andauthored
fix(transport-webrtc): abort stream when datachannel send throws (#3469)
* test(transport-webrtc): reproduce datachannel send during state divergence Triggers the JS-vs-native state divergence in node-datachannel polyfill by closing the peer connection synchronously before send(), so the cached readyState is still 'open' while the native channel is closed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(transport-webrtc): abort stream when datachannel send throws channel.send can throw synchronously when the native libdatachannel state diverges from the polyfill's cached readyState. Wrap the call and abort the stream so the error surfaces as a transport failure instead of going uncaught. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(transport-webrtc): skip datachannel send race test in browsers The JS-vs-native state divergence is specific to the node-datachannel polyfill; native browser WebRTC does not reproduce it. Also add libdatachannel to the spell-check dictionary for the new fix comment. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * chore(transport-webrtc): shorten comments on send error fix Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 7cea7b9 commit 20bfcb3

2 files changed

Lines changed: 74 additions & 4 deletions

File tree

packages/transport-webrtc/src/stream.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,15 @@ export class WebRTCStream extends AbstractStream {
139139

140140
this.log.trace('sending message, channel state "%s"', this.channel.readyState)
141141

142-
// send message without copying data
143-
for (const buf of data) {
144-
this.channel.send(buf)
142+
try {
143+
// send message without copying data
144+
for (const buf of data) {
145+
this.channel.send(buf)
146+
}
147+
} catch (err: any) {
148+
// channel.send can throw synchronously if the polyfill's cached readyState is stale
149+
this.log.error('error sending datachannel message - %e', err)
150+
this.abort(err)
145151
}
146152
}
147153

packages/transport-webrtc/test/stream.spec.ts

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ import * as lengthPrefixed from 'it-length-prefixed'
55
import { bytes } from 'multiformats'
66
import { pEvent } from 'p-event'
77
import { stubInterface } from 'sinon-ts'
8+
import { isNode, isElectronMain } from 'wherearewe'
89
import { MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD } from '../src/constants.js'
910
import { Message } from '../src/private-to-public/pb/message.js'
1011
import { createStream } from '../src/stream.js'
1112
import { RTCPeerConnection } from '../src/webrtc/index.js'
1213
import { receiveFinAck, receiveRemoteCloseWrite } from './util.ts'
1314
import type { WebRTCStream } from '../src/stream.js'
14-
import type { Stream } from '@libp2p/interface'
15+
import type { Stream, StreamCloseEvent } from '@libp2p/interface'
1516

1617
describe('Max message size', () => {
1718
it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => {
@@ -72,6 +73,69 @@ describe('Max message size', () => {
7273
})
7374
})
7475

76+
describe('Datachannel send errors', () => {
77+
let pcA: RTCPeerConnection
78+
let pcB: RTCPeerConnection
79+
80+
afterEach(() => {
81+
pcA?.close()
82+
pcB?.close()
83+
})
84+
85+
it('aborts the stream when underlying datachannel is closed mid-send', async function () {
86+
// polyfill-specific race; native browser WebRTC doesn't exhibit it
87+
if (!isNode && !isElectronMain) {
88+
return this.skip()
89+
}
90+
91+
// the node-datachannel polyfill's cached readyState updates on the next
92+
// tick after onClosed fires, so closing the peer leaves a window where
93+
// send() passes the guard and hits an already-closed native channel
94+
pcA = new RTCPeerConnection()
95+
pcB = new RTCPeerConnection()
96+
const channelA = pcA.createDataChannel('test', { negotiated: true, id: 91 })
97+
const channelB = pcB.createDataChannel('test', { negotiated: true, id: 91 })
98+
99+
pcA.onicecandidate = ({ candidate }) => {
100+
if (candidate != null) {
101+
pcB.addIceCandidate(candidate).catch(() => {})
102+
}
103+
}
104+
pcB.onicecandidate = ({ candidate }) => {
105+
if (candidate != null) {
106+
pcA.addIceCandidate(candidate).catch(() => {})
107+
}
108+
}
109+
110+
await pcA.setLocalDescription(await pcA.createOffer())
111+
await pcB.setRemoteDescription(pcA.localDescription as RTCSessionDescriptionInit)
112+
await pcB.setLocalDescription(await pcB.createAnswer())
113+
await pcA.setRemoteDescription(pcB.localDescription as RTCSessionDescriptionInit)
114+
115+
await Promise.all([
116+
pEvent(channelA, 'open', { rejectionEvents: ['close', 'error'] }),
117+
pEvent(channelB, 'open', { rejectionEvents: ['close', 'error'] })
118+
])
119+
120+
const webrtcStream = createStream({
121+
channel: channelA,
122+
direction: 'outbound',
123+
closeTimeout: 1,
124+
log: defaultLogger().forComponent('test')
125+
})
126+
127+
pcA.close()
128+
expect(channelA.readyState).to.equal('open')
129+
130+
const closeEventPromise = pEvent<'close', StreamCloseEvent>(webrtcStream, 'close')
131+
webrtcStream.send(new Uint8Array([1, 2, 3, 4]))
132+
const closeEvent = await closeEventPromise
133+
134+
expect(closeEvent.error).to.exist()
135+
expect(webrtcStream.status).to.equal('aborted')
136+
})
137+
})
138+
75139
const TEST_MESSAGE = 'test_message'
76140

77141
async function setup (): Promise<{ peerConnection: RTCPeerConnection, dataChannel: RTCDataChannel, stream: WebRTCStream }> {

0 commit comments

Comments
 (0)