Skip to content

Commit 9860fd6

Browse files
V2
1 parent 3920949 commit 9860fd6

2 files changed

Lines changed: 91 additions & 98 deletions

File tree

node-graph/graph-craft/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ web-sys = { workspace = true, features = [
6666
"DomStringList",
6767
"Event",
6868
"EventTarget",
69+
"Window",
6970
"StorageManager",
7071
"FileSystemDirectoryHandle",
7172
"FileSystemFileHandle",

node-graph/graph-craft/src/application_io/resource/opfs.rs

Lines changed: 90 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::task::{Context, Poll, Waker};
77
use wasm_bindgen::JsCast;
88
use wasm_bindgen::prelude::*;
99
use wasm_bindgen_futures::{JsFuture, spawn_local};
10-
use web_sys::{DomException, FileSystemDirectoryHandle, FileSystemFileHandle, FileSystemGetDirectoryOptions, FileSystemGetFileOptions, FileSystemWritableFileStream};
10+
use web_sys::{Blob, DomException, FileSystemDirectoryHandle, FileSystemFileHandle, FileSystemGetDirectoryOptions, FileSystemGetFileOptions, FileSystemWritableFileStream, WritableStream};
1111

1212
enum Mutation {
1313
Write { hash: ResourceHash, bytes: Arc<[u8]> },
@@ -32,16 +32,15 @@ unsafe impl Sync for OpfsResourceStorage {}
3232

3333
impl OpfsResourceStorage {
3434
pub async fn load(directory_name: &str) -> Result<Self, JsValue> {
35-
let root = open_root_directory(directory_name).await?;
36-
37-
let on_disk = read_all_keys(&root).await.unwrap_or_else(|error| {
35+
let directory = open_resource_directory(directory_name).await?;
36+
let on_disk = enumerate_hashes(&directory).await.unwrap_or_else(|error| {
3837
log::error!("Failed to enumerate existing OPFS resource keys: {error:?}");
3938
HashSet::new()
4039
});
4140

4241
Ok(Self {
4342
inner: Arc::new(Mutex::new(Inner {
44-
directory: root,
43+
directory,
4544
cache: HashMap::new(),
4645
on_disk,
4746
queue: VecDeque::new(),
@@ -68,8 +67,7 @@ impl Resources for OpfsResourceStorage {
6867

6968
let (sender, receiver) = oneshot();
7069
spawn_local(async move {
71-
let result = fetch_and_verify(inner, hash).await;
72-
sender.send(result);
70+
sender.send(read_from_opfs(inner, hash).await);
7371
});
7472

7573
Box::pin(receiver)
@@ -83,15 +81,17 @@ impl ResourceStorage for OpfsResourceStorage {
8381

8482
fn write(&mut self, data: &[u8]) -> ResourceHash {
8583
let hash = ResourceHash::from(data);
86-
let bytes: Arc<[u8]> = Arc::from(data);
87-
let resource = Resource::new(bytes.clone());
88-
8984
let mut guard = self.inner.lock().unwrap();
90-
guard.cache.insert(hash, resource);
9185

92-
let needs_persist = !guard.on_disk.contains(&hash) || queue_has_delete_for(&guard.queue, &hash);
86+
let mut bytes = None;
87+
if !guard.cache.contains_key(&hash) {
88+
let resource_bytes = Arc::<[u8]>::from(data);
89+
guard.cache.insert(hash, Resource::new(resource_bytes.clone()));
90+
bytes = Some(resource_bytes);
91+
}
9392

94-
if needs_persist {
93+
if !guard.on_disk.contains(&hash) {
94+
let bytes = bytes.unwrap_or_else(|| Arc::<[u8]>::from(data));
9595
guard.on_disk.insert(hash);
9696
guard.queue.push_back(Mutation::Write { hash, bytes });
9797
kick_worker(&self.inner, &mut guard);
@@ -106,57 +106,48 @@ impl ResourceStorage for OpfsResourceStorage {
106106
}
107107

108108
fn garbage_collect(&mut self, used: &[ResourceHash]) {
109-
let used_set: HashSet<ResourceHash> = used.iter().copied().collect();
109+
let used: HashSet<ResourceHash> = used.iter().copied().collect();
110110
let mut guard = self.inner.lock().unwrap();
111111

112-
guard.cache.retain(|hash, _| used_set.contains(hash));
112+
guard.cache.retain(|hash, _| used.contains(hash));
113113

114-
let to_delete: Vec<ResourceHash> = guard.on_disk.iter().copied().filter(|hash| !used_set.contains(hash)).collect();
115-
116-
if to_delete.is_empty() {
117-
return;
114+
let unused: Vec<ResourceHash> = guard.on_disk.iter().copied().filter(|hash| !used.contains(hash)).collect();
115+
for hash in unused {
116+
guard.on_disk.remove(&hash);
117+
guard.queue.push_back(Mutation::Delete { hash });
118118
}
119119

120-
for hash in &to_delete {
121-
guard.on_disk.remove(hash);
122-
guard.queue.push_back(Mutation::Delete { hash: *hash });
120+
if !guard.queue.is_empty() {
121+
kick_worker(&self.inner, &mut guard);
123122
}
124-
125-
kick_worker(&self.inner, &mut guard);
126123
}
127124
}
128125

129-
fn queue_has_delete_for(queue: &VecDeque<Mutation>, hash: &ResourceHash) -> bool {
130-
queue.iter().any(|mutation| matches!(mutation, Mutation::Delete { hash: other } if other == hash))
131-
}
132-
133126
fn kick_worker(inner: &Arc<Mutex<Inner>>, guard: &mut Inner) {
134127
if guard.worker_active {
135128
return;
136129
}
130+
137131
guard.worker_active = true;
138132
let inner = inner.clone();
139133
spawn_local(drain_queue(inner));
140134
}
141135

142136
async fn drain_queue(inner: Arc<Mutex<Inner>>) {
143137
loop {
144-
let (op, directory) = {
138+
let (directory, mutation) = {
145139
let mut guard = inner.lock().unwrap();
146-
match guard.queue.pop_front() {
147-
Some(op) => (op, guard.directory.clone()),
148-
None => {
149-
guard.worker_active = false;
150-
return;
151-
}
152-
}
140+
let Some(mutation) = guard.queue.pop_front() else {
141+
guard.worker_active = false;
142+
return;
143+
};
144+
(guard.directory.clone(), mutation)
153145
};
154146

155-
match op {
147+
match mutation {
156148
Mutation::Write { hash, bytes } => {
157149
if let Err(error) = write_file(&directory, &hash, &bytes).await {
158150
log::error!("OPFS write for {hash} failed: {error:?}");
159-
inner.lock().unwrap().on_disk.remove(&hash);
160151
}
161152
}
162153
Mutation::Delete { hash } => {
@@ -168,103 +159,94 @@ async fn drain_queue(inner: Arc<Mutex<Inner>>) {
168159
}
169160
}
170161

171-
async fn open_root_directory(directory_name: &str) -> Result<FileSystemDirectoryHandle, JsValue> {
162+
async fn open_resource_directory(directory_name: &str) -> Result<FileSystemDirectoryHandle, JsValue> {
172163
let storage = web_sys::window().ok_or_else(|| JsValue::from_str("no window"))?.navigator().storage();
164+
let root: FileSystemDirectoryHandle = JsFuture::from(storage.get_directory()).await?.dyn_into()?;
173165

174-
let opfs_root: FileSystemDirectoryHandle = JsFuture::from(storage.get_directory()).await?.dyn_into()?;
175-
176-
let create = FileSystemGetDirectoryOptions::new();
177-
create.set_create(true);
166+
let options = FileSystemGetDirectoryOptions::new();
167+
options.set_create(true);
178168

179-
JsFuture::from(opfs_root.get_directory_handle_with_options(directory_name, &create)).await?.dyn_into()
169+
JsFuture::from(root.get_directory_handle_with_options(directory_name, &options)).await?.dyn_into()
180170
}
181171

182-
async fn read_all_keys(directory: &FileSystemDirectoryHandle) -> Result<HashSet<ResourceHash>, JsValue> {
172+
async fn enumerate_hashes(directory: &FileSystemDirectoryHandle) -> Result<HashSet<ResourceHash>, JsValue> {
183173
let iterator = directory.keys();
184-
let mut keys = HashSet::new();
174+
let mut hashes = HashSet::new();
185175

186176
loop {
187-
let next: js_sys::IteratorNext = JsFuture::from(iterator.next_iterator()?).await?.unchecked_into();
177+
let next: js_sys::IteratorNext = JsFuture::from(iterator.next()?).await?.unchecked_into();
188178
if next.done() {
189179
break;
190180
}
191181

192182
let Some(name) = next.value().as_string() else {
193-
log::warn!("Skipping non-string OPFS entry name");
183+
log::warn!("Skipping non-string OPFS resource entry");
194184
continue;
195185
};
196186

197187
match ResourceHash::try_from(name.as_str()) {
198188
Ok(hash) => {
199-
keys.insert(hash);
189+
hashes.insert(hash);
200190
}
201-
Err(error) => log::warn!("Skipping unparseable OPFS entry {name:?}: {error}"),
191+
Err(error) => log::warn!("Skipping non-resource OPFS entry {name:?}: {error}"),
202192
}
203193
}
204194

205-
Ok(keys)
206-
}
207-
208-
fn file_name(hash: &ResourceHash) -> String {
209-
String::from(hash)
195+
Ok(hashes)
210196
}
211197

212198
async fn write_file(directory: &FileSystemDirectoryHandle, hash: &ResourceHash, bytes: &[u8]) -> Result<(), JsValue> {
213-
let create = FileSystemGetFileOptions::new();
214-
create.set_create(true);
199+
let options = FileSystemGetFileOptions::new();
200+
options.set_create(true);
215201

216202
let name = file_name(hash);
217-
let handle: FileSystemFileHandle = JsFuture::from(directory.get_file_handle_with_options(&name, &create)).await?.dyn_into()?;
218-
203+
let handle: FileSystemFileHandle = JsFuture::from(directory.get_file_handle_with_options(&name, &options)).await?.dyn_into()?;
219204
let writable: FileSystemWritableFileStream = JsFuture::from(handle.create_writable()).await?.dyn_into()?;
205+
let stream: WritableStream = writable.clone().unchecked_into();
206+
let bytes = Uint8Array::from(bytes);
220207

221-
let view = Uint8Array::new_with_length(bytes.len() as u32);
222-
view.copy_from(bytes);
223-
224-
let write_result = JsFuture::from(writable.write_with_js_u8_array(&view)?).await;
225-
let close_result = JsFuture::from(writable.close()).await;
208+
if let Err(error) = JsFuture::from(writable.write_with_js_u8_array(&bytes)?).await {
209+
let _ = JsFuture::from(stream.abort()).await;
210+
return Err(error);
211+
}
226212

227-
write_result?;
228-
close_result?;
213+
JsFuture::from(stream.close()).await?;
229214
Ok(())
230215
}
231216

232217
async fn delete_file(directory: &FileSystemDirectoryHandle, hash: &ResourceHash) -> Result<(), JsValue> {
233218
let name = file_name(hash);
234219
match JsFuture::from(directory.remove_entry(&name)).await {
235220
Ok(_) => Ok(()),
236-
Err(error) => {
237-
if is_not_found(&error) {
238-
Ok(())
239-
} else {
240-
Err(error)
241-
}
242-
}
221+
Err(error) if is_not_found(&error) => Ok(()),
222+
Err(error) => Err(error),
243223
}
244224
}
245225

246-
fn is_not_found(error: &JsValue) -> bool {
247-
error.dyn_ref::<DomException>().is_some_and(|error| error.name() == "NotFoundError")
248-
}
226+
async fn read_from_opfs(inner: Arc<Mutex<Inner>>, hash: ResourceHash) -> Option<Resource> {
227+
let directory = {
228+
let guard = inner.lock().unwrap();
229+
if let Some(resource) = guard.cache.get(&hash) {
230+
return Some(resource.clone());
231+
}
232+
if !guard.on_disk.contains(&hash) {
233+
return None;
234+
}
235+
guard.directory.clone()
236+
};
249237

250-
async fn fetch_and_verify(inner: Arc<Mutex<Inner>>, hash: ResourceHash) -> Option<Resource> {
251-
let directory = inner.lock().unwrap().directory.clone();
252238
let name = file_name(&hash);
253-
254-
let handle = match JsFuture::from(directory.get_file_handle(&name)).await {
255-
Ok(value) => match value.dyn_into::<FileSystemFileHandle>() {
239+
let handle: FileSystemFileHandle = match JsFuture::from(directory.get_file_handle(&name)).await {
240+
Ok(value) => match value.dyn_into() {
256241
Ok(handle) => handle,
257242
Err(value) => {
258-
log::error!("OPFS get_file_handle returned non-FileSystemFileHandle for {hash}: {value:?}");
243+
log::error!("OPFS returned non-file handle for {hash}: {value:?}");
259244
return None;
260245
}
261246
},
247+
Err(error) if is_not_found(&error) => return None,
262248
Err(error) => {
263-
if is_not_found(&error) {
264-
inner.lock().unwrap().on_disk.remove(&hash);
265-
return None;
266-
}
267-
log::error!("OPFS get_file_handle for {hash} failed: {error:?}");
249+
log::error!("OPFS getFileHandle for {hash} failed: {error:?}");
268250
return None;
269251
}
270252
};
@@ -276,39 +258,49 @@ async fn fetch_and_verify(inner: Arc<Mutex<Inner>>, hash: ResourceHash) -> Optio
276258
return None;
277259
}
278260
};
279-
let blob: web_sys::Blob = match file.dyn_into() {
261+
let blob: Blob = match file.dyn_into() {
280262
Ok(blob) => blob,
281263
Err(value) => {
282264
log::error!("OPFS getFile returned non-Blob for {hash}: {value:?}");
283265
return None;
284266
}
285267
};
286-
287268
let buffer = match JsFuture::from(blob.array_buffer()).await {
288269
Ok(buffer) => buffer,
289270
Err(error) => {
290-
log::error!("Reading OPFS array buffer for {hash} failed: {error:?}");
271+
log::error!("OPFS arrayBuffer for {hash} failed: {error:?}");
291272
return None;
292273
}
293274
};
294-
let array = Uint8Array::new(&buffer);
295-
let bytes: Arc<[u8]> = Arc::from(array.to_vec());
296275

276+
let bytes: Arc<[u8]> = Uint8Array::new(&buffer).to_vec().into();
297277
let actual = ResourceHash::from(bytes.as_ref());
298278
if actual != hash {
299-
log::error!("OPFS content-integrity failure: file {hash} hashes to {actual}, removing");
300-
let mut guard = inner.lock().unwrap();
301-
guard.on_disk.remove(&hash);
302-
guard.queue.push_back(Mutation::Delete { hash });
303-
kick_worker(&inner, &mut guard);
279+
log::error!("OPFS content-integrity failure: file {hash} hashes to {actual}");
304280
return None;
305281
}
306282

307283
let resource = Resource::new(bytes);
308-
inner.lock().unwrap().cache.insert(hash, resource.clone());
284+
let mut guard = inner.lock().unwrap();
285+
if let Some(resource) = guard.cache.get(&hash) {
286+
return Some(resource.clone());
287+
}
288+
if !guard.on_disk.contains(&hash) {
289+
return None;
290+
}
291+
292+
guard.cache.insert(hash, resource.clone());
309293
Some(resource)
310294
}
311295

296+
fn file_name(hash: &ResourceHash) -> String {
297+
String::from(hash)
298+
}
299+
300+
fn is_not_found(error: &JsValue) -> bool {
301+
error.dyn_ref::<DomException>().is_some_and(|error| error.name() == "NotFoundError")
302+
}
303+
312304
fn oneshot() -> (OneshotSender, OneshotReceiver) {
313305
let state = Arc::new(Mutex::new(OneshotState::default()));
314306
(OneshotSender { state: state.clone() }, OneshotReceiver { state })

0 commit comments

Comments
 (0)