Skip to content

Commit c902384

Browse files
WIP 2
1 parent 1d9a07c commit c902384

9 files changed

Lines changed: 110 additions & 71 deletions

File tree

editor/src/messages/portfolio/resource/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ mod resource_message_handler;
44
#[doc(inline)]
55
pub use resource_message::{ResourceMessage, ResourceMessageDiscriminant};
66
#[doc(inline)]
7-
pub use resource_message_handler::{ResourceMessageContext, ResourceMessageHandler, ResourceReadHandle, resources};
7+
pub use resource_message_handler::{ResourceMessageContext, ResourceMessageHandler, ResourceReadHandle, resources, set_resource_storage};

editor/src/messages/portfolio/resource/resource_message_handler.rs

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,50 @@
11
use crate::messages::prelude::*;
22
use graph_craft::application_io::{ResourceFuture, ResourceHash, ResourceStorage, Resources};
3-
use std::sync::{Arc, RwLock};
3+
use std::sync::{OnceLock, RwLock};
44

5-
const RESOURCE_STORAGE: Arc<RwLock<Option<Box<dyn ResourceStorage>>>> = Arc::new(RwLock::new(None));
5+
/// Process-wide replaceable resource storage. The `RwLock` lets the message handler take exclusive
6+
/// write access (`&mut dyn ResourceStorage`) when processing a `ResourceMessage`, while any number
7+
/// of [`ResourceReadHandle`] clones can take the read side concurrently to call `Resources::load`.
8+
static RESOURCE_STORAGE: OnceLock<RwLock<Option<Box<dyn ResourceStorage>>>> = OnceLock::new();
69

7-
pub(crate) fn set_resource_storage(storage: Box<dyn ResourceStorage>) {
8-
let binding = RESOURCE_STORAGE;
9-
let mut guard = binding.write().unwrap();
10-
*guard = Some(storage);
10+
fn cell() -> &'static RwLock<Option<Box<dyn ResourceStorage>>> {
11+
RESOURCE_STORAGE.get_or_init(|| RwLock::new(None))
1112
}
1213

13-
pub fn resources() -> Box<dyn Resources> {
14-
let binding = RESOURCE_STORAGE;
15-
let guard = binding.read().unwrap();
16-
Box::new(ResourceReadHandle {
17-
storage: guard.as_ref().expect("Resource storage not initialized").clone(),
18-
})
14+
/// Install (or replace) the underlying storage. Existing `ResourceReadHandle` clones automatically
15+
/// observe the new backing on their next `load`.
16+
pub fn set_resource_storage(storage: Box<dyn ResourceStorage>) {
17+
*cell().write().unwrap_or_else(|p| p.into_inner()) = Some(storage);
18+
}
19+
20+
/// Write-side accessor for the message handler only. Returns a write-locked guard that derefs to
21+
/// `&mut Option<Box<dyn ResourceStorage>>`; private on purpose — the read handle below is the only
22+
/// surface exposed to the rest of the editor.
23+
fn resources_mut() -> std::sync::RwLockWriteGuard<'static, Option<Box<dyn ResourceStorage>>> {
24+
cell().write().unwrap_or_else(|p| p.into_inner())
25+
}
26+
27+
/// Unlimited read-only handle. Each `Resources::load` call independently takes the static's read
28+
/// lock, invokes the backend's `load` (which returns a `'static` owned future), then releases the
29+
/// lock before awaiting. Crucially the lock is never held across an `.await`, so a concurrent
30+
/// write/GC from the message handler can never deadlock against a pending IDB fetch.
31+
pub fn resources() -> ResourceReadHandle {
32+
ResourceReadHandle
33+
}
34+
35+
#[derive(Clone, Copy, Default)]
36+
pub struct ResourceReadHandle;
37+
38+
impl Resources for ResourceReadHandle {
39+
fn load(&self, hash: ResourceHash) -> ResourceFuture {
40+
let future = cell().read().unwrap_or_else(|p| p.into_inner()).as_deref().map(|storage| storage.load(hash));
41+
Box::pin(async move {
42+
match future {
43+
Some(future) => future.await,
44+
None => None,
45+
}
46+
})
47+
}
1948
}
2049

2150
#[derive(Default, ExtractField)]
@@ -27,40 +56,35 @@ impl std::fmt::Debug for ResourceMessageHandler {
2756
}
2857
}
2958

30-
/// Read-only access to a `ResourceMessageHandler`'s storage. Cheap to clone.
31-
#[derive(Clone)]
32-
pub struct ResourceReadHandle {
33-
storage: Arc<RwLock<Box<dyn ResourceStorage>>>,
34-
}
35-
36-
impl Resources for ResourceReadHandle {
37-
fn load<'a>(&'a self, hash: &'a ResourceHash) -> ResourceFuture<'a> {
38-
self.storage.load(hash)
39-
}
40-
}
41-
4259
#[derive(ExtractField)]
4360
pub struct ResourceMessageContext {}
4461

4562
#[message_handler_data]
4663
impl MessageHandler<ResourceMessage, ResourceMessageContext> for ResourceMessageHandler {
4764
fn process_message(&mut self, message: ResourceMessage, responses: &mut VecDeque<Message>, _context: ResourceMessageContext) {
65+
let mut guard = resources_mut();
66+
let Some(storage) = guard.as_deref_mut() else {
67+
log::error!("Resource storage not initialized; dropping {message:?}");
68+
return;
69+
};
70+
4871
match message {
4972
ResourceMessage::Write { data } => {
50-
let _hash = self.storage.write(data.as_ref());
73+
let _hash = storage.write(data.as_ref());
5174
}
5275
ResourceMessage::GarbageCollect { used } => {
53-
self.storage.garbage_collect(&used);
76+
storage.garbage_collect(&used);
5477
}
5578
ResourceMessage::Export { document_id, resources } => {
5679
let mut exported = Vec::with_capacity(resources.len());
5780
for hash in resources.iter() {
58-
if let Some(resource) = self.storage.read(hash) {
81+
if let Some(resource) = storage.read(hash) {
5982
exported.push((*hash, resource));
6083
} else {
6184
log::error!("Resource not found for hash {hash} during export");
6285
}
6386
}
87+
drop(guard);
6488

6589
responses.add(PortfolioMessage::DocumentPassMessage {
6690
document_id,

node-graph/graph-craft/src/application_io.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl ApplicationIo for PlatformApplicationIo {
6060
self.gpu_executor.as_ref()
6161
}
6262

63-
fn load_resource<'a>(&'a self, hash: &'a ResourceHash) -> graphene_application_io::ResourceFuture<'a> {
63+
fn load_resource(&self, hash: ResourceHash) -> graphene_application_io::ResourceFuture {
6464
self.resources.as_ref().expect("Resource storage not initialized").load(hash)
6565
}
6666
}

node-graph/graph-craft/src/application_io/resource.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pub mod indexed_db;
33
#[cfg(not(target_family = "wasm"))]
44
pub mod mmap;
55

6-
use graphene_application_io::{Resource, ResourceFuture, ResourceHash, Resources, ResourceStorage};
6+
use graphene_application_io::{Resource, ResourceFuture, ResourceHash, ResourceStorage, Resources};
77
use std::collections::HashMap;
88
use std::sync::{Arc, Mutex};
99

@@ -19,29 +19,29 @@ impl HashMapResourceStorage {
1919
}
2020

2121
impl Resources for HashMapResourceStorage {
22-
fn load<'a>(&'a self, hash: &'a ResourceHash) -> ResourceFuture<'a> {
23-
let result = ResourceStorage::read(self, hash);
22+
fn load(&self, hash: ResourceHash) -> ResourceFuture {
23+
let result = self.resources.lock().unwrap().get(&hash).cloned();
2424
Box::pin(async move { result })
2525
}
2626
}
2727

2828
impl ResourceStorage for HashMapResourceStorage {
29-
fn read(&self, hash: &ResourceHash) -> Option<Resource> {
30-
self.resources.lock().unwrap().get(hash).cloned()
29+
fn read(&mut self, hash: &ResourceHash) -> Option<Resource> {
30+
self.resources.get_mut().unwrap().get(hash).cloned()
3131
}
3232

33-
fn write(&self, data: &[u8]) -> ResourceHash {
33+
fn write(&mut self, data: &[u8]) -> ResourceHash {
3434
let hash = ResourceHash::from(data);
35-
self.resources.lock().unwrap().insert(hash, Resource::new(Arc::<[u8]>::from(data)));
35+
self.resources.get_mut().unwrap().insert(hash, Resource::new(Arc::<[u8]>::from(data)));
3636
hash
3737
}
3838

39-
fn contains(&self, hash: &ResourceHash) -> bool {
40-
self.resources.lock().unwrap().contains_key(hash)
39+
fn contains(&mut self, hash: &ResourceHash) -> bool {
40+
self.resources.get_mut().unwrap().contains_key(hash)
4141
}
4242

43-
fn garbage_collect(&self, used: &[ResourceHash]) {
43+
fn garbage_collect(&mut self, used: &[ResourceHash]) {
4444
let used_set: std::collections::HashSet<&ResourceHash> = used.iter().collect();
45-
self.resources.lock().unwrap().retain(|hash, _| used_set.contains(hash));
45+
self.resources.get_mut().unwrap().retain(|hash, _| used_set.contains(hash));
4646
}
4747
}

node-graph/graph-craft/src/application_io/resource/indexed_db.rs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -133,33 +133,41 @@ impl IndexedDbResourceStorage {
133133
}
134134

135135
impl Resources for IndexedDbResourceStorage {
136-
fn load<'a>(&'a self, hash: &'a ResourceHash) -> ResourceFuture<'a> {
137-
// The underlying future captures `&IdbDatabase` which is not `Send`, but wasm is single
138-
// threaded so the trait's `Send` bound is purely structural here; wrap it accordingly.
136+
fn load(&self, hash: ResourceHash) -> ResourceFuture {
137+
// Clone the internal `Arc`s out so the returned future owns its state and doesn't borrow
138+
// `self`. This lets the read-side handle drop its `RwLock` read guard before awaiting, which
139+
// is what avoids the otherwise-fatal write-vs-read deadlock on single-threaded wasm.
140+
let db = self.db.clone();
141+
let store_name = self.store_name.clone();
142+
let cache = self.cache.clone();
143+
let known_keys = self.known_keys.clone();
144+
145+
// The IDB future captures non-`Send` JS handles; wasm has a single thread so re-asserting
146+
// `Send` via `UnsafeSendFuture` is sound here.
139147
Box::pin(UnsafeSendFuture(async move {
140-
if let Some(resource) = self.cache.lock().unwrap().get(hash) {
148+
if let Some(resource) = cache.lock().unwrap().get(&hash) {
141149
return Some(resource.clone());
142150
}
143-
if !self.known_keys.lock().unwrap().contains(hash) {
151+
if !known_keys.lock().unwrap().contains(&hash) {
144152
return None;
145153
}
146154

147-
// Cache miss for a known key: fetch from IndexedDB and wait for it to land before returning.
148-
match fetch_one(&self.db, &self.store_name, *hash).await {
155+
// Cache miss for a known key: fetch from IndexedDB and wait for it to land.
156+
match fetch_one(&db, &store_name, hash).await {
149157
Ok(Some(payload)) => {
150158
let recomputed = ResourceHash::from(payload.as_slice());
151-
if recomputed != *hash {
159+
if recomputed != hash {
152160
log::warn!("IndexedDB entry's hash does not match its payload: {hash} vs {recomputed}");
153-
self.known_keys.lock().unwrap().remove(hash);
161+
known_keys.lock().unwrap().remove(&hash);
154162
None
155163
} else {
156164
let resource = Resource::new(Arc::<[u8]>::from(payload));
157-
self.cache.lock().unwrap().insert(*hash, resource.clone());
165+
cache.lock().unwrap().insert(hash, resource.clone());
158166
Some(resource)
159167
}
160168
}
161169
Ok(None) => {
162-
self.known_keys.lock().unwrap().remove(hash);
170+
known_keys.lock().unwrap().remove(&hash);
163171
None
164172
}
165173
Err(error) => {
@@ -172,24 +180,24 @@ impl Resources for IndexedDbResourceStorage {
172180
}
173181

174182
impl ResourceStorage for IndexedDbResourceStorage {
175-
fn read(&self, hash: &ResourceHash) -> Option<Resource> {
183+
fn read(&mut self, hash: &ResourceHash) -> Option<Resource> {
176184
// Cache-only lookup; sync callers (e.g. document export) take what's already hydrated.
177185
self.cache.lock().unwrap().get(hash).cloned()
178186
}
179187

180-
fn write(&self, data: &[u8]) -> ResourceHash {
188+
fn write(&mut self, data: &[u8]) -> ResourceHash {
181189
let hash = ResourceHash::from(data);
182190
self.cache.lock().unwrap().insert(hash, Resource::new(Arc::<[u8]>::from(data)));
183191
self.known_keys.lock().unwrap().insert(hash);
184192
self.enqueue_put(hash, data.to_vec());
185193
hash
186194
}
187195

188-
fn contains(&self, hash: &ResourceHash) -> bool {
196+
fn contains(&mut self, hash: &ResourceHash) -> bool {
189197
self.cache.lock().unwrap().contains_key(hash) || self.known_keys.lock().unwrap().contains(hash)
190198
}
191199

192-
fn garbage_collect(&self, used: &[ResourceHash]) {
200+
fn garbage_collect(&mut self, used: &[ResourceHash]) {
193201
let used_set: std::collections::HashSet<ResourceHash> = used.iter().cloned().collect();
194202

195203
let to_delete: Vec<ResourceHash> = {

node-graph/graph-craft/src/application_io/resource/mmap.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,28 +48,29 @@ impl MmapResourceStorage {
4848
}
4949

5050
impl Resources for MmapResourceStorage {
51-
fn load<'a>(&'a self, hash: &'a ResourceHash) -> ResourceFuture<'a> {
52-
let result = ResourceStorage::read(self, hash);
51+
fn load(&self, hash: ResourceHash) -> ResourceFuture {
52+
// Cache-only lookup; the on-demand mmap path lives in `ResourceStorage::read` and is reachable
53+
// only through the message handler's mut access.
54+
let result = self.cache.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).get(&hash).cloned();
5355
Box::pin(async move { result })
5456
}
5557
}
5658

5759
impl ResourceStorage for MmapResourceStorage {
58-
fn read(&self, hash: &ResourceHash) -> Option<Resource> {
59-
let mut cache = self.cache.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
60-
if let Some(resource) = cache.get(hash) {
60+
fn read(&mut self, hash: &ResourceHash) -> Option<Resource> {
61+
if let Some(resource) = self.cache.get_mut().unwrap_or_else(|poisoned| poisoned.into_inner()).get(hash) {
6162
return Some(resource.clone());
6263
}
6364

6465
let path = self.path_for(hash);
6566
let mmap = Self::open_mmap(&path)?;
6667
let resource = Resource::new(MmappedBytes(mmap));
6768

68-
cache.insert(*hash, resource.clone());
69+
self.cache.get_mut().unwrap_or_else(|poisoned| poisoned.into_inner()).insert(*hash, resource.clone());
6970
Some(resource)
7071
}
7172

72-
fn write(&self, data: &[u8]) -> ResourceHash {
73+
fn write(&mut self, data: &[u8]) -> ResourceHash {
7374
let hash = ResourceHash::from(data);
7475
let path = self.path_for(&hash);
7576

@@ -108,13 +109,13 @@ impl ResourceStorage for MmapResourceStorage {
108109
hash
109110
}
110111

111-
fn contains(&self, hash: &ResourceHash) -> bool {
112-
self.cache.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).contains_key(hash) || self.path_for(hash).exists()
112+
fn contains(&mut self, hash: &ResourceHash) -> bool {
113+
self.cache.get_mut().unwrap_or_else(|poisoned| poisoned.into_inner()).contains_key(hash) || self.path_for(hash).exists()
113114
}
114115

115-
fn garbage_collect(&self, used: &[ResourceHash]) {
116+
fn garbage_collect(&mut self, used: &[ResourceHash]) {
116117
let used_set: std::collections::HashSet<ResourceHash> = used.iter().cloned().collect();
117-
self.cache.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).retain(|hash, _| used_set.contains(hash));
118+
self.cache.get_mut().unwrap_or_else(|poisoned| poisoned.into_inner()).retain(|hash, _| used_set.contains(hash));
118119

119120
let Ok(top_entries) = fs::read_dir(&self.root) else { return };
120121
for top_entry in top_entries.flatten() {

node-graph/libraries/application-io/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub trait ApplicationIo {
4848
fn gpu_executor(&self) -> Option<&Self::Executor> {
4949
None
5050
}
51-
fn load_resource<'a>(&'a self, hash: &'a ResourceHash) -> resource::ResourceFuture<'a>;
51+
fn load_resource(&self, hash: ResourceHash) -> resource::ResourceFuture;
5252
}
5353

5454
impl<T: ApplicationIo> ApplicationIo for &T {
@@ -58,7 +58,7 @@ impl<T: ApplicationIo> ApplicationIo for &T {
5858
(**self).gpu_executor()
5959
}
6060

61-
fn load_resource<'a>(&'a self, hash: &'a ResourceHash) -> resource::ResourceFuture<'a> {
61+
fn load_resource(&self, hash: ResourceHash) -> resource::ResourceFuture {
6262
(**self).load_resource(hash)
6363
}
6464
}

node-graph/libraries/application-io/src/resource.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,13 +165,19 @@ unsafe impl StaticType for Resource {
165165
type Static = Resource;
166166
}
167167

168-
pub type ResourceFuture<'a> = Pin<Box<dyn Future<Output = Option<Resource>> + Send + 'a>>;
168+
/// Owned (`'static`) future returned by [`Resources::load`]. Ownership is what allows the read-side
169+
/// handle to drop its `RwLock` read guard before awaiting — backends must not borrow their storage
170+
/// across awaits, instead cloning out any internal `Arc`s they need.
171+
pub type ResourceFuture = Pin<Box<dyn Future<Output = Option<Resource>> + Send + 'static>>;
169172

170173
pub trait Resources: Send + Sync {
171-
fn load<'a>(&'a self, hash: &'a ResourceHash) -> ResourceFuture<'a>;
174+
fn load(&self, hash: ResourceHash) -> ResourceFuture;
172175
}
173176

174177
pub trait ResourceStorage: Resources {
178+
// Methods take `&mut self` so only the unique mutable owner (the resource message handler,
179+
// guarded behind the static `RwLock`'s write lock) can touch the write surface; the `Resources`
180+
// supertrait remains accessible through a shared `&self` borrow for any number of read handles.
175181
fn read(&mut self, hash: &ResourceHash) -> Option<Resource>;
176182
fn write(&mut self, data: &[u8]) -> ResourceHash;
177183
fn contains(&mut self, hash: &ResourceHash) -> bool;

node-graph/nodes/gstd/src/platform_application_io.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ where
264264
#[node_macro::node(category(""))]
265265
pub async fn resource<'a: 'n>(_: impl Ctx, _primary: (), #[scope("editor-api")] editor_api: &'a PlatformEditorApi, hash: ResourceHash) -> Resource {
266266
let application_io = editor_api.application_io.as_ref().expect("ApplicationIo must be available when using resources");
267-
application_io.load_resource(&hash).await.unwrap_or_else(|| {
267+
application_io.load_resource(hash).await.unwrap_or_else(|| {
268268
panic!("Resource {hash} not found");
269269
})
270270
}

0 commit comments

Comments
 (0)