Skip to content

Commit ac7f447

Browse files
committed
Reclaim host stream/future transmits when the guest drops its end
When the guest drops its end of a stream/future while the host consumer/producer is still `HostReady`, the host end was never finalized, so the `TransmitState` and both handles leaked from the concurrent-state table (eventually trapping with "resource table has no free keys"). The `HostReady` arms of `host_drop_reader` and `host_drop_writer` were no-ops; both now `delete_transmit`. `host_drop_writer` only finalizes once the writer is actually `Dropped`. Adds two `component-async-tests` regression tests (one per path) that fail on `main` and pass here. Fixes #13514. Assisted-by: Claude Code
1 parent 9c49989 commit ac7f447

5 files changed

Lines changed: 194 additions & 3 deletions

File tree

crates/misc/component-async-tests/tests/scenario/streams.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,123 @@ pub async fn async_closed_stream() -> Result<()> {
303303
.await?
304304
}
305305

306+
mod host_consumer_drop {
307+
wasmtime::component::bindgen!({
308+
path: "wit",
309+
world: "host-consumer-drop-guest",
310+
exports: { default: store | async },
311+
});
312+
}
313+
314+
// Regression test: a host *consumer* registered via `StreamReader::pipe` must be
315+
// finalized when the guest drops the writable end *after* the consumer is
316+
// attached. The guest hands the host the readable end, keeps the writable end,
317+
// writes one byte once the consumer reads, then drops the writer. That reaches
318+
// `host_drop_writer` with the read side in `ReadState::HostReady`, which must
319+
// reclaim the transmit rather than leaving it stranded.
320+
#[tokio::test]
321+
pub async fn async_host_consumer_drop() -> Result<()> {
322+
let engine = Engine::new(&config())?;
323+
324+
let component = make_component(
325+
&engine,
326+
&[test_programs_artifacts::ASYNC_HOST_CONSUMER_DROP_COMPONENT],
327+
)
328+
.await?;
329+
330+
let mut linker = Linker::new(&engine);
331+
332+
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
333+
334+
let mut store = Store::new(
335+
&engine,
336+
Ctx {
337+
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
338+
table: ResourceTable::default(),
339+
continue_: false,
340+
},
341+
);
342+
343+
let instance = linker.instantiate_async(&mut store, &component).await?;
344+
let guest = host_consumer_drop::HostConsumerDropGuest::new(&mut store, &instance)?;
345+
store
346+
.run_concurrent(async move |accessor| {
347+
let stream = guest
348+
.local_local_host_consumer_drop()
349+
.call_get(accessor)
350+
.await?;
351+
352+
let (tx, mut rx) = mpsc::channel(1);
353+
accessor.with(move |store| stream.pipe(store, PipeConsumer::new(tx)))?;
354+
assert_eq!(rx.next().await, Some(42));
355+
assert!(rx.next().await.is_none());
356+
357+
wasmtime::error::Ok(())
358+
})
359+
.await??;
360+
361+
// The host consumer and both transmit handles must be gone now that the
362+
// guest dropped its end.
363+
store.assert_concurrent_state_empty();
364+
365+
Ok(())
366+
}
367+
368+
// Regression test: the symmetric host *producer* case. The host hands the guest
369+
// a `future` via `FutureReader::new` and the guest drops the read end without
370+
// reading it. That reaches `host_drop_reader` with the write side in
371+
// `WriteState::HostReady`, which must reclaim the transmit. The guest's
372+
// `read_future` reads `rx` but drops `rx_ignored`.
373+
#[tokio::test]
374+
pub async fn async_host_producer_drop() -> Result<()> {
375+
let engine = Engine::new(&config())?;
376+
377+
let component = make_component(
378+
&engine,
379+
&[test_programs_artifacts::ASYNC_CLOSED_STREAMS_COMPONENT],
380+
)
381+
.await?;
382+
383+
let mut linker = Linker::new(&engine);
384+
385+
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
386+
387+
let mut store = Store::new(
388+
&engine,
389+
Ctx {
390+
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
391+
table: ResourceTable::default(),
392+
continue_: false,
393+
},
394+
);
395+
396+
let instance = linker.instantiate_async(&mut store, &component).await?;
397+
398+
let value = 42_u8;
399+
let (tx, rx) = oneshot::channel();
400+
let rx = FutureReader::new(&mut store, OneshotProducer::new(rx))?;
401+
let (_, rx_ignored) = oneshot::channel();
402+
let rx_ignored = FutureReader::new(&mut store, OneshotProducer::new(rx_ignored))?;
403+
404+
let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;
405+
406+
store
407+
.run_concurrent(async move |accessor| {
408+
_ = tx.send(value);
409+
closed_streams
410+
.local_local_closed()
411+
.call_read_future(accessor, rx, value, rx_ignored)
412+
.await
413+
})
414+
.await??;
415+
416+
// The host producer behind `rx_ignored` and both transmit handles must be
417+
// gone now that the guest dropped the read end without reading.
418+
store.assert_concurrent_state_empty();
419+
420+
Ok(())
421+
}
422+
306423
#[tokio::test]
307424
pub async fn async_cross_instance_source() -> Result<()> {
308425
let engine = Engine::new(&config())?;

crates/misc/component-async-tests/tests/test_all.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ use scenario::round_trip_many::{
3131
async_round_trip_many_synchronous, async_round_trip_many_wait,
3232
};
3333
use scenario::streams::{
34-
async_closed_stream, async_closed_streams, async_cross_instance_source, async_short_reads,
34+
async_closed_stream, async_closed_streams, async_cross_instance_source,
35+
async_host_consumer_drop, async_short_reads,
3536
};
3637
use scenario::transmit::{
3738
async_cancel_callee, async_cancel_caller, async_cancel_transmit, async_intertask_communication,

crates/misc/component-async-tests/wit/test.wit

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,14 @@ interface closed-stream {
187187
get: func() -> stream<u8>;
188188
}
189189

190+
interface host-consumer-drop {
191+
// Returns the readable end of a fresh stream while the guest keeps the
192+
// writable end. Once a consumer is attached the guest writes one byte and
193+
// then drops the writer, so the host consumer observes a clean close while
194+
// its read side is still `HostReady`.
195+
get: async func() -> stream<u8>;
196+
}
197+
190198
interface short-reads {
191199
resource thing {
192200
constructor(s: string);
@@ -369,6 +377,10 @@ world closed-stream-guest {
369377
export closed-stream;
370378
}
371379

380+
world host-consumer-drop-guest {
381+
export host-consumer-drop;
382+
}
383+
372384
world short-reads-guest {
373385
export short-reads;
374386
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
mod bindings {
2+
wit_bindgen::generate!({
3+
path: "../misc/component-async-tests/wit",
4+
world: "host-consumer-drop-guest",
5+
async: true,
6+
});
7+
8+
use super::Component;
9+
export!(Component);
10+
}
11+
12+
use {
13+
bindings::exports::local::local::host_consumer_drop::Guest, wit_bindgen::StreamReader,
14+
};
15+
16+
struct Component;
17+
18+
impl Guest for Component {
19+
async fn get() -> StreamReader<u8> {
20+
let (mut tx, rx) = bindings::wit_stream::new();
21+
// Keep the writable end and hand the readable end to the host. The host
22+
// attaches a consumer (read side -> `HostReady`); the write below blocks
23+
// until that consumer reads, after which we drop the writer. Dropping it
24+
// while the consumer is still `HostReady` is the path that used to leak.
25+
wit_bindgen::spawn(async move {
26+
assert!(tx.write_one(42).await.is_none());
27+
drop(tx);
28+
});
29+
rx
30+
}
31+
}
32+
33+
// Unused function; required since this file is built as a `bin`:
34+
fn main() {}

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2471,7 +2471,18 @@ impl StoreOpaque {
24712471
)?;
24722472
}
24732473

2474-
WriteState::HostReady { .. } => {}
2474+
WriteState::HostReady { .. } => {
2475+
// A host producer (e.g. one installed via `FutureReader::new`)
2476+
// is only driven when a reader pulls from it; it is never
2477+
// re-polled to observe the guest dropping the read end. The read
2478+
// end is already `Dropped` (set at the top of this function), so
2479+
// the produced value can never be consumed. Reclaim the transmit
2480+
// (state + both handles) here; otherwise it would leak for the
2481+
// lifetime of the instance. The producer is dropped along with
2482+
// the matched `HostReady` value.
2483+
log::trace!("host_drop_reader: finalize host producer, delete {transmit_id:?}");
2484+
state.delete_transmit(transmit_id)?;
2485+
}
24752486

24762487
WriteState::Open => {
24772488
state.update_event(
@@ -2570,7 +2581,23 @@ impl StoreOpaque {
25702581
)?;
25712582
}
25722583

2573-
ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2584+
ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
2585+
// A host consumer (e.g. one registered via `StreamReader::pipe`)
2586+
// is only driven on guest writes; it is never re-polled to
2587+
// observe the guest dropping the write end. Reclaim the transmit
2588+
// (state + both handles) so it does not leak. Unlike
2589+
// `host_drop_reader`, the write end is not forced to `Dropped`
2590+
// earlier in this function, so only finalize once the writer is
2591+
// actually gone -- otherwise we would discard a still-live host
2592+
// writer. The consumer is dropped along with the matched value.
2593+
if matches!(
2594+
self.concurrent_state_mut().get_mut(transmit_id)?.write,
2595+
WriteState::Dropped
2596+
) {
2597+
log::trace!("host_drop_writer: finalize host consumer, delete {transmit_id:?}");
2598+
self.concurrent_state_mut().delete_transmit(transmit_id)?;
2599+
}
2600+
}
25742601

25752602
// If the read state is open, then there are no registered readers of the stream/future
25762603
ReadState::Open => {

0 commit comments

Comments
 (0)