Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit be2c10c

Browse files
authored
Merge pull request #237 from alexcrichton/stream-writer-mut-self
Migrate `StreamWriter` methods to `&mut self`
2 parents 0dab298 + 0e21f28 commit be2c10c

File tree

8 files changed

+151
-116
lines changed

8 files changed

+151
-116
lines changed

crates/misc/component-async-tests/src/resource_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ impl bindings::local::local::resource_stream::HostConcurrent for Ctx {
4949

5050
impl<T> AccessorTask<T, Ctx, Result<()>> for Task {
5151
async fn run(self, accessor: &Accessor<T, Ctx>) -> Result<()> {
52-
let mut tx = Some(self.tx);
52+
let mut tx = self.tx;
5353
for _ in 0..self.count {
5454
let item =
5555
accessor.with(|mut view| view.get().table().push(ResourceStreamX))?;
56-
tx = tx.take().unwrap().write_all(accessor, Some(item)).await.0;
56+
tx.write_all(accessor, Some(item)).await;
5757
}
5858
Ok(())
5959
}

crates/misc/component-async-tests/tests/scenario/streams.rs

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,12 @@ pub async fn async_watch_streams() -> Result<()> {
130130
.is_pending()
131131
);
132132
futures.push(
133-
tx.write_all(store, Some(42))
134-
.map(|(w, _)| Event::Write(w))
135-
.boxed(),
133+
async move {
134+
tx.write_all(store, Some(42)).await;
135+
let w = if tx.is_closed() { None } else { Some(tx) };
136+
Event::Write(w)
137+
}
138+
.boxed(),
136139
);
137140
futures.push(rx.read(store, None).map(|(r, b)| Event::Read(r, b)).boxed());
138141
let mut rx = None;
@@ -152,7 +155,8 @@ pub async fn async_watch_streams() -> Result<()> {
152155

153156
let mut tx = tx.take().unwrap();
154157
tx.watch_reader(store).await;
155-
assert!(tx.write_all(store, Some(42)).await.0.is_none());
158+
tx.write_all(store, Some(42)).await;
159+
assert!(tx.is_closed());
156160
Ok(())
157161
})
158162
.await??;
@@ -216,14 +220,17 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> {
216220
// First, test stream host->host
217221
instance
218222
.run_with(&mut store, async |store| -> wasmtime::Result<_> {
219-
let (tx, rx) = store.with(|mut s| instance.stream(&mut s))?;
223+
let (mut tx, rx) = store.with(|mut s| instance.stream(&mut s))?;
220224

221225
let mut futures = FuturesUnordered::new();
222-
futures.push(
223-
tx.write_all(store, values.clone().into())
224-
.map(|(w, _)| StreamEvent::FirstWrite(w))
225-
.boxed(),
226-
);
226+
futures.push({
227+
let values = values.clone();
228+
async move {
229+
tx.write_all(store, values.into()).await;
230+
StreamEvent::FirstWrite(if tx.is_closed() { None } else { Some(tx) })
231+
}
232+
.boxed()
233+
});
227234
futures.push(
228235
rx.read(store, Vec::with_capacity(3))
229236
.map(|(r, b)| StreamEvent::FirstRead(r, b))
@@ -244,11 +251,18 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> {
244251
.boxed(),
245252
);
246253
} else {
247-
futures.push(
248-
tx.write_all(store, values.clone().into())
249-
.map(|(w, _)| StreamEvent::SecondWrite(w))
250-
.boxed(),
251-
);
254+
futures.push({
255+
let values = values.clone();
256+
async move {
257+
tx.write_all(store, values.into()).await;
258+
StreamEvent::SecondWrite(if tx.is_closed() {
259+
None
260+
} else {
261+
Some(tx)
262+
})
263+
}
264+
.boxed()
265+
});
252266
}
253267
}
254268
StreamEvent::FirstWrite(None) => {
@@ -324,7 +338,7 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> {
324338

325339
// Next, test stream host->guest
326340
{
327-
let (tx, rx) = instance.stream::<_, _, Vec<_>>(&mut store)?;
341+
let (mut tx, rx) = instance.stream::<_, _, Vec<_>>(&mut store)?;
328342

329343
let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;
330344

@@ -338,11 +352,15 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> {
338352
.map(|v| v.map(|()| StreamEvent::GuestCompleted))
339353
.boxed(),
340354
);
341-
futures.push(
342-
tx.write_all(accessor, values.clone().into())
343-
.map(|(w, _)| Ok(StreamEvent::FirstWrite(w)))
344-
.boxed(),
345-
);
355+
futures.push({
356+
let values = values.clone();
357+
async move {
358+
tx.write_all(accessor, values.into()).await;
359+
let w = if tx.is_closed() { None } else { Some(tx) };
360+
Ok(StreamEvent::FirstWrite(w))
361+
}
362+
.boxed()
363+
});
346364

347365
let mut count = 0;
348366
while let Some(event) = futures.try_next().await? {
@@ -358,11 +376,15 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> {
358376
.boxed(),
359377
);
360378
} else {
361-
futures.push(
362-
tx.write_all(accessor, values.clone().into())
363-
.map(|(w, _)| Ok(StreamEvent::SecondWrite(w)))
364-
.boxed(),
365-
);
379+
futures.push({
380+
let values = values.clone();
381+
async move {
382+
tx.write_all(accessor, values.into()).await;
383+
let w = if tx.is_closed() { None } else { Some(tx) };
384+
Ok(StreamEvent::SecondWrite(w))
385+
}
386+
.boxed()
387+
});
366388
}
367389
}
368390
StreamEvent::FirstWrite(None) => {

crates/misc/component-async-tests/tests/scenario/transmit.rs

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,9 @@ async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Re
360360
ReadNone(Option<StreamReader<Option<String>>>),
361361
}
362362

363-
let (control_tx, control_rx) = instance.stream::<_, _, Option<_>>(&mut store)?;
364-
let (caller_stream_tx, caller_stream_rx) = instance.stream::<_, _, Option<_>>(&mut store)?;
363+
let (mut control_tx, control_rx) = instance.stream::<_, _, Option<_>>(&mut store)?;
364+
let (mut caller_stream_tx, caller_stream_rx) =
365+
instance.stream::<_, _, Option<_>>(&mut store)?;
365366
let (caller_future1_tx, caller_future1_rx) = instance.future(|| unreachable!(), &mut store)?;
366367
let (_caller_future2_tx, caller_future2_rx) = instance.future(|| unreachable!(), &mut store)?;
367368

@@ -376,17 +377,28 @@ async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Re
376377
let mut complete = false;
377378

378379
futures.push(
379-
control_tx
380-
.write_all(accessor, Some(Control::ReadStream("a".into())))
381-
.map(|(w, _)| Ok(Event::ControlWriteA(w)))
382-
.boxed(),
380+
async move {
381+
control_tx
382+
.write_all(accessor, Some(Control::ReadStream("a".into())))
383+
.await;
384+
let w = if control_tx.is_closed() {
385+
None
386+
} else {
387+
Some(control_tx)
388+
};
389+
Ok(Event::ControlWriteA(w))
390+
}
391+
.boxed(),
383392
);
384393

385394
futures.push(
386-
caller_stream_tx
387-
.write_all(accessor, Some(String::from("a")))
388-
.map(|_| Ok(Event::WriteA))
389-
.boxed(),
395+
async move {
396+
caller_stream_tx
397+
.write_all(accessor, Some(String::from("a")))
398+
.await;
399+
Ok(Event::WriteA)
400+
}
401+
.boxed(),
390402
);
391403

392404
futures.push(
@@ -416,10 +428,14 @@ async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Re
416428
}
417429
Event::ControlWriteA(tx) => {
418430
futures.push(
419-
tx.unwrap()
420-
.write_all(accessor, Some(Control::ReadFuture("b".into())))
421-
.map(|(w, _)| Ok(Event::ControlWriteB(w)))
422-
.boxed(),
431+
async move {
432+
let mut tx = tx.unwrap();
433+
tx.write_all(accessor, Some(Control::ReadFuture("b".into())))
434+
.await;
435+
let w = if tx.is_closed() { None } else { Some(tx) };
436+
Ok(Event::ControlWriteB(w))
437+
}
438+
.boxed(),
423439
);
424440
}
425441
Event::WriteA => {
@@ -435,10 +451,14 @@ async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Re
435451
}
436452
Event::ControlWriteB(tx) => {
437453
futures.push(
438-
tx.unwrap()
439-
.write_all(accessor, Some(Control::WriteStream("c".into())))
440-
.map(|(w, _)| Ok(Event::ControlWriteC(w)))
441-
.boxed(),
454+
async move {
455+
let mut tx = tx.unwrap();
456+
tx.write_all(accessor, Some(Control::WriteStream("c".into())))
457+
.await;
458+
let w = if tx.is_closed() { None } else { Some(tx) };
459+
Ok(Event::ControlWriteC(w))
460+
}
461+
.boxed(),
442462
);
443463
}
444464
Event::WriteB(delivered) => {
@@ -454,10 +474,13 @@ async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Re
454474
}
455475
Event::ControlWriteC(tx) => {
456476
futures.push(
457-
tx.unwrap()
458-
.write_all(accessor, Some(Control::WriteFuture("d".into())))
459-
.map(|_| Ok(Event::ControlWriteD))
460-
.boxed(),
477+
async move {
478+
let mut tx = tx.unwrap();
479+
tx.write_all(accessor, Some(Control::WriteFuture("d".into())))
480+
.await;
481+
Ok(Event::ControlWriteD)
482+
}
483+
.boxed(),
461484
);
462485
}
463486
Event::ReadC(None, _) => unreachable!(),

crates/wasi-http/src/p3/host/types.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,8 @@ where
238238
let mut contents_tx = self.contents_tx;
239239
match buffer {
240240
Some(BodyFrame::Data(buffer)) => {
241-
let (tx_tail, buffer) =
242-
contents_tx.write_all(store, Cursor::new(buffer)).await;
243-
let Some(tx_tail) = tx_tail else {
241+
let buffer = contents_tx.write_all(store, Cursor::new(buffer)).await;
242+
if contents_tx.is_closed() {
244243
let Ok(mut body) = self.body.lock() else {
245244
bail!("lock poisoned");
246245
};
@@ -254,8 +253,7 @@ where
254253
content_length,
255254
};
256255
return Ok(());
257-
};
258-
contents_tx = tx_tail;
256+
}
259257
}
260258
Some(BodyFrame::Trailers(..)) => bail!("corrupted guest body state"),
261259
None => {}
@@ -289,7 +287,7 @@ where
289287
return Ok(());
290288
};
291289
rx_buffer = b;
292-
let tx_tail = contents_tx;
290+
let mut tx_tail = contents_tx;
293291
let Some(rx_tail) = rx_tail else {
294292
debug_assert!(rx_buffer.is_empty());
295293
if let Some(ContentLength { limit, sent }) = content_length {
@@ -342,8 +340,8 @@ where
342340
let buffer = rx_buffer.split().freeze();
343341
rx_buffer.reserve(DEFAULT_BUFFER_CAPACITY);
344342
contents_rx = rx_tail;
345-
let (tx_tail, buffer) = tx_tail.write_all(store, Cursor::new(buffer)).await;
346-
let Some(tx_tail) = tx_tail else {
343+
let buffer = tx_tail.write_all(store, Cursor::new(buffer)).await;
344+
if tx_tail.is_closed() {
347345
let Ok(mut body) = self.body.lock() else {
348346
bail!("lock poisoned");
349347
};
@@ -413,9 +411,8 @@ where
413411
let mut contents_tx = self.contents_tx;
414412
match buffer {
415413
Some(BodyFrame::Data(buffer)) => {
416-
let (tx_tail, buffer) =
417-
contents_tx.write_all(store, Cursor::new(buffer)).await;
418-
let Some(tx_tail) = tx_tail else {
414+
let buffer = contents_tx.write_all(store, Cursor::new(buffer)).await;
415+
if contents_tx.is_closed() {
419416
let Ok(mut body) = self.body.lock() else {
420417
bail!("lock poisoned");
421418
};
@@ -426,8 +423,7 @@ where
426423
buffer: Some(BodyFrame::Data(buffer)),
427424
};
428425
return Ok(());
429-
};
430-
contents_tx = tx_tail;
426+
}
431427
}
432428
Some(BodyFrame::Trailers(..)) => bail!("corrupted guest body state"),
433429
None => {}
@@ -469,9 +465,9 @@ where
469465
Some(Some(Ok(frame))) => {
470466
match frame.into_data().map_err(http_body::Frame::into_trailers) {
471467
Ok(buffer) => {
472-
let (tx_tail, buffer) =
468+
let buffer =
473469
contents_tx.write_all(store, Cursor::new(buffer)).await;
474-
let Some(tx_tail) = tx_tail else {
470+
if contents_tx.is_closed() {
475471
let Ok(mut body) = self.body.lock() else {
476472
bail!("lock poisoned");
477473
};
@@ -483,7 +479,6 @@ where
483479
};
484480
return Ok(());
485481
};
486-
contents_tx = tx_tail;
487482
}
488483
Err(Ok(trailers)) => {
489484
drop(contents_tx);

crates/wasi/src/p3/cli/host.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,21 @@ where
2626
async fn run(mut self, store: &Accessor<T, WasiCli<U>>) -> wasmtime::Result<()> {
2727
let mut tx = self.tx;
2828
let mut buf = BytesMut::with_capacity(8096);
29-
loop {
29+
while !tx.is_closed() {
30+
buf.clear();
3031
match self.input.read_buf(&mut buf).await {
3132
Ok(0) => return Ok(()),
3233
Ok(_) => {
33-
let (Some(tail), buf_again) = tx.write_all(store, Cursor::new(buf)).await
34-
else {
35-
break Ok(());
36-
};
37-
tx = tail;
38-
buf = buf_again.into_inner();
39-
buf.clear();
34+
buf = tx.write_all(store, Cursor::new(buf)).await.into_inner();
4035
}
4136
Err(_err) => {
4237
// TODO: Close the stream with an error context
4338
drop(tx);
44-
return Ok(());
39+
break;
4540
}
4641
}
4742
}
43+
Ok(())
4844
}
4945
}
5046

crates/wasi/src/p3/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,11 +251,10 @@ where
251251
break Ok(());
252252
}
253253
Some(Ok(buf)) => {
254-
let fut = tx.write_all(store, buf.into());
255-
let (Some(tail), _) = fut.await else {
254+
tx.write_all(store, buf.into()).await;
255+
if tx.is_closed() {
256256
break Ok(());
257-
};
258-
tx = tail;
257+
}
259258
}
260259
Some(Err(err)) => {
261260
// TODO: Close the stream with an error context

crates/wasi/src/p3/sockets/host/types/tcp.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,10 @@ where
177177
.push(TcpSocket::from_state(state, self.family))
178178
.context("failed to push socket to table")
179179
})?;
180-
let (Some(tail), _) = tx.write(store, Some(socket)).await else {
180+
tx.write(store, Some(socket)).await;
181+
if tx.is_closed() {
181182
return Ok(());
182-
};
183-
tx = tail;
183+
}
184184
}
185185
Ok(())
186186
}

0 commit comments

Comments
 (0)