Skip to content

Commit 01453cd

Browse files
authored
wasip3: Refactor the readdir stream iterators (#11615)
* wasip3: Refactor `BlockingDirectoryStreamProducer` * Generalize this into a `FallibleStreamProducer` structure * Don't read the entire iterator on the first call to `poll_produce` * Do a blocking read of `dir.entries()` in the original function call to avoid handling state in the iterator itself. * wasip3: Refactor `NonblockingDirectoryStreamProducer` * Start the reading task before iteration starts to move the spawn out of the `poll_*` method. * Rely on fusing behavior of mpsc/tasks to avoid extra state structure. * Specifically handle 0-length reads. Mostly try to refactor the state representation to be more struct-like rather than enum like which is a little easier to follow. * wasip3: Port `preview1_fd_readdir` to WASIp3 Have at least one test looking at the readdir behavior. * Fix `finish` handling in `FallibleIteratorProducer` * Fix a typo
1 parent 92f7974 commit 01453cd

File tree

4 files changed

+294
-142
lines changed

4 files changed

+294
-142
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use test_programs::p3::wasi;
2+
use test_programs::p3::wasi::filesystem::types::{
3+
Descriptor, DescriptorFlags, DescriptorType, DirectoryEntry, OpenFlags, PathFlags,
4+
};
5+
6+
struct Component;
7+
8+
test_programs::p3::export!(Component);
9+
10+
impl test_programs::p3::exports::wasi::cli::run::Guest for Component {
11+
async fn run() -> Result<(), ()> {
12+
let preopens = wasi::filesystem::preopens::get_directories();
13+
let (dir, _) = &preopens[0];
14+
15+
test_readdir(dir).await;
16+
test_readdir_lots(dir).await;
17+
Ok(())
18+
}
19+
}
20+
21+
fn main() {
22+
unreachable!()
23+
}
24+
25+
async fn read_dir(dir: &Descriptor) -> Vec<DirectoryEntry> {
26+
let (dirs, result) = dir.read_directory().await;
27+
let mut dirs = dirs.collect().await;
28+
result.await.unwrap();
29+
dirs.sort_by_key(|d| d.name.clone());
30+
dirs
31+
}
32+
33+
async fn assert_empty_dir(dir: &Descriptor) {
34+
let dirs = read_dir(dir).await;
35+
assert_eq!(dirs.len(), 0);
36+
}
37+
38+
async fn test_readdir(dir: &Descriptor) {
39+
// Check the behavior in an empty directory
40+
assert_empty_dir(dir).await;
41+
42+
dir.open_at(
43+
PathFlags::empty(),
44+
"file".to_string(),
45+
OpenFlags::CREATE,
46+
DescriptorFlags::READ | DescriptorFlags::WRITE,
47+
)
48+
.await
49+
.unwrap();
50+
51+
dir.create_directory_at("nested".to_string()).await.unwrap();
52+
let nested = dir
53+
.open_at(
54+
PathFlags::empty(),
55+
"nested".to_string(),
56+
OpenFlags::DIRECTORY,
57+
DescriptorFlags::empty(),
58+
)
59+
.await
60+
.unwrap();
61+
62+
let entries = read_dir(dir).await;
63+
assert_eq!(entries.len(), 2);
64+
assert_eq!(entries[0].name, "file");
65+
assert_eq!(entries[0].type_, DescriptorType::RegularFile);
66+
assert_eq!(entries[1].name, "nested");
67+
assert_eq!(entries[1].type_, DescriptorType::Directory);
68+
69+
assert_empty_dir(&nested).await;
70+
drop(nested);
71+
72+
dir.unlink_file_at("file".to_string()).await.unwrap();
73+
dir.remove_directory_at("nested".to_string()).await.unwrap();
74+
}
75+
76+
async fn test_readdir_lots(dir: &Descriptor) {
77+
for count in 0..1000 {
78+
dir.open_at(
79+
PathFlags::empty(),
80+
format!("file.{count}"),
81+
OpenFlags::CREATE,
82+
DescriptorFlags::READ | DescriptorFlags::WRITE,
83+
)
84+
.await
85+
.expect("failed to create file");
86+
}
87+
88+
assert_eq!(read_dir(dir).await.len(), 1000);
89+
90+
for count in 0..1000 {
91+
dir.unlink_file_at(format!("file.{count}"))
92+
.await
93+
.expect("removing a file");
94+
}
95+
}

crates/wasi/src/p3/filesystem/host.rs

Lines changed: 84 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use crate::p3::bindings::filesystem::types::{
66
};
77
use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
88
use crate::p3::{
9-
DEFAULT_BUFFER_CAPACITY, FutureOneshotProducer, FutureReadyProducer, StreamEmptyProducer,
9+
DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer, FutureOneshotProducer, FutureReadyProducer,
10+
StreamEmptyProducer,
1011
};
1112
use crate::{DirPerms, FilePerms};
1213
use anyhow::{Context as _, anyhow};
@@ -22,7 +23,7 @@ use tokio::task::{JoinHandle, spawn_blocking};
2223
use wasmtime::StoreContextMut;
2324
use wasmtime::component::{
2425
Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
25-
StreamProducer, StreamReader, StreamResult, VecBuffer,
26+
StreamProducer, StreamReader, StreamResult,
2627
};
2728

2829
fn get_descriptor<'a>(
@@ -291,150 +292,94 @@ fn map_dir_entry(
291292
}
292293
}
293294

294-
struct BlockingDirectoryStreamProducer {
295-
dir: Arc<cap_std::fs::Dir>,
295+
struct ReadDirStream {
296+
rx: mpsc::Receiver<DirectoryEntry>,
297+
task: JoinHandle<Result<(), ErrorCode>>,
296298
result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
297299
}
298300

299-
impl Drop for BlockingDirectoryStreamProducer {
300-
fn drop(&mut self) {
301-
self.close(Ok(()))
302-
}
303-
}
304-
305-
impl BlockingDirectoryStreamProducer {
306-
fn close(&mut self, res: Result<(), ErrorCode>) {
307-
if let Some(tx) = self.result.take() {
308-
_ = tx.send(res);
309-
}
310-
}
311-
}
312-
313-
impl<D> StreamProducer<D> for BlockingDirectoryStreamProducer {
314-
type Item = DirectoryEntry;
315-
type Buffer = VecBuffer<DirectoryEntry>;
316-
317-
fn poll_produce<'a>(
318-
mut self: Pin<&mut Self>,
319-
_: &mut Context<'_>,
320-
_: StoreContextMut<'a, D>,
321-
mut dst: Destination<'a, Self::Item, Self::Buffer>,
322-
_finish: bool,
323-
) -> Poll<wasmtime::Result<StreamResult>> {
324-
let entries = match self.dir.entries() {
325-
Ok(entries) => entries,
326-
Err(err) => {
327-
self.close(Err(err.into()));
328-
return Poll::Ready(Ok(StreamResult::Dropped));
329-
}
330-
};
331-
let res = match entries
332-
.filter_map(|entry| map_dir_entry(entry).transpose())
333-
.collect::<Result<Vec<_>, _>>()
334-
{
335-
Ok(entries) => {
336-
dst.set_buffer(entries.into());
337-
Ok(())
338-
}
339-
Err(err) => Err(err),
340-
};
341-
self.close(res);
342-
Poll::Ready(Ok(StreamResult::Dropped))
343-
}
344-
}
345-
346-
struct NonblockingDirectoryStreamProducer(DirStreamState);
347-
348-
enum DirStreamState {
349-
Init {
301+
impl ReadDirStream {
302+
fn new(
350303
dir: Arc<cap_std::fs::Dir>,
351304
result: oneshot::Sender<Result<(), ErrorCode>>,
352-
},
353-
InProgress {
354-
rx: mpsc::Receiver<DirectoryEntry>,
355-
task: JoinHandle<Result<(), ErrorCode>>,
356-
result: oneshot::Sender<Result<(), ErrorCode>>,
357-
},
358-
Closed,
359-
}
360-
361-
impl Drop for NonblockingDirectoryStreamProducer {
362-
fn drop(&mut self) {
363-
self.close(Ok(()))
305+
) -> ReadDirStream {
306+
let (tx, rx) = mpsc::channel(1);
307+
ReadDirStream {
308+
task: spawn_blocking(move || {
309+
let entries = dir.entries()?;
310+
for entry in entries {
311+
if let Some(entry) = map_dir_entry(entry)? {
312+
if let Err(_) = tx.blocking_send(entry) {
313+
break;
314+
}
315+
}
316+
}
317+
Ok(())
318+
}),
319+
rx,
320+
result: Some(result),
321+
}
364322
}
365-
}
366323

367-
impl NonblockingDirectoryStreamProducer {
368324
fn close(&mut self, res: Result<(), ErrorCode>) {
369-
if let DirStreamState::Init { result, .. } | DirStreamState::InProgress { result, .. } =
370-
mem::replace(&mut self.0, DirStreamState::Closed)
371-
{
372-
_ = result.send(res);
373-
}
325+
self.rx.close();
326+
self.task.abort();
327+
let _ = self.result.take().unwrap().send(res);
374328
}
375329
}
376330

377-
impl<D> StreamProducer<D> for NonblockingDirectoryStreamProducer {
331+
impl<D> StreamProducer<D> for ReadDirStream {
378332
type Item = DirectoryEntry;
379333
type Buffer = Option<DirectoryEntry>;
380334

381335
fn poll_produce<'a>(
382336
mut self: Pin<&mut Self>,
383337
cx: &mut Context<'_>,
384-
store: StoreContextMut<'a, D>,
338+
mut store: StoreContextMut<'a, D>,
385339
mut dst: Destination<'a, Self::Item, Self::Buffer>,
386340
finish: bool,
387341
) -> Poll<wasmtime::Result<StreamResult>> {
388-
match mem::replace(&mut self.0, DirStreamState::Closed) {
389-
DirStreamState::Init { .. } if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
390-
DirStreamState::Init { dir, result } => {
391-
let (entry_tx, entry_rx) = mpsc::channel(1);
392-
let task = spawn_blocking(move || {
393-
let entries = dir.entries()?;
394-
for entry in entries {
395-
if let Some(entry) = map_dir_entry(entry)? {
396-
if let Err(_) = entry_tx.blocking_send(entry) {
397-
break;
398-
}
399-
}
400-
}
401-
Ok(())
402-
});
403-
self.0 = DirStreamState::InProgress {
404-
rx: entry_rx,
405-
task,
406-
result,
407-
};
408-
self.poll_produce(cx, store, dst, finish)
342+
// If this is a 0-length read then `mpsc::Receiver` does not expose an
343+
// API to wait for an item to be available without taking it out of the
344+
// channel. In lieu of that just say that we're complete and ready for a
345+
// read.
346+
if dst.remaining(&mut store) == Some(0) {
347+
return Poll::Ready(Ok(StreamResult::Completed));
348+
}
349+
350+
match self.rx.poll_recv(cx) {
351+
// If an item is on the channel then send that along and say that
352+
// the read is now complete with one item being yielded.
353+
Poll::Ready(Some(item)) => {
354+
dst.set_buffer(Some(item));
355+
Poll::Ready(Ok(StreamResult::Completed))
409356
}
410-
DirStreamState::InProgress {
411-
mut rx,
412-
mut task,
413-
result,
414-
} => {
415-
let Poll::Ready(res) = rx.poll_recv(cx) else {
416-
self.0 = DirStreamState::InProgress { rx, task, result };
417-
if finish {
418-
return Poll::Ready(Ok(StreamResult::Cancelled));
419-
}
420-
return Poll::Pending;
421-
};
422-
match res {
423-
Some(entry) => {
424-
self.0 = DirStreamState::InProgress { rx, task, result };
425-
dst.set_buffer(Some(entry));
426-
Poll::Ready(Ok(StreamResult::Completed))
427-
}
428-
None => {
429-
let res = ready!(Pin::new(&mut task).poll(cx))
430-
.context("failed to join I/O task")?;
431-
self.0 = DirStreamState::InProgress { rx, task, result };
432-
self.close(res);
433-
Poll::Ready(Ok(StreamResult::Dropped))
434-
}
435-
}
357+
358+
// If there's nothing left on the channel then that means that an
359+
// error occurred or the iterator is done. In both cases an
360+
// un-cancellable wait for the spawned task is entered and we await
361+
// its completion. Upon completion there our own stream is closed
362+
// with the result (sending an error code on our oneshot) and then
363+
// the stream is reported as dropped.
364+
Poll::Ready(None) => {
365+
let result = ready!(Pin::new(&mut self.task).poll(cx))
366+
.expect("spawned task should not panic");
367+
self.close(result);
368+
Poll::Ready(Ok(StreamResult::Dropped))
436369
}
437-
DirStreamState::Closed => Poll::Ready(Ok(StreamResult::Dropped)),
370+
371+
// If an item isn't ready yet then cancel this outstanding request
372+
// if `finish` is set, otherwise propagate the `Pending` status.
373+
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
374+
Poll::Pending => Poll::Pending,
375+
}
376+
}
377+
}
378+
379+
impl Drop for ReadDirStream {
380+
fn drop(&mut self) {
381+
if self.result.is_some() {
382+
self.close(Ok(()));
438383
}
439384
}
440385
}
@@ -848,23 +793,22 @@ impl types::HostDescriptorWithStore for WasiFilesystem {
848793
let dir = Arc::clone(dir.as_dir());
849794
let (result_tx, result_rx) = oneshot::channel();
850795
let stream = if allow_blocking_current_thread {
851-
StreamReader::new(
852-
instance,
853-
&mut store,
854-
BlockingDirectoryStreamProducer {
855-
dir,
856-
result: Some(result_tx),
857-
},
858-
)
796+
match dir.entries() {
797+
Ok(readdir) => StreamReader::new(
798+
instance,
799+
&mut store,
800+
FallibleIteratorProducer::new(
801+
readdir.filter_map(|e| map_dir_entry(e).transpose()),
802+
result_tx,
803+
),
804+
),
805+
Err(e) => {
806+
result_tx.send(Err(e.into())).unwrap();
807+
StreamReader::new(instance, &mut store, StreamEmptyProducer::default())
808+
}
809+
}
859810
} else {
860-
StreamReader::new(
861-
instance,
862-
&mut store,
863-
NonblockingDirectoryStreamProducer(DirStreamState::Init {
864-
dir,
865-
result: result_tx,
866-
}),
867-
)
811+
StreamReader::new(instance, &mut store, ReadDirStream::new(dir, result_tx))
868812
};
869813
Ok((
870814
stream,

0 commit comments

Comments
 (0)