Skip to content

Commit ca02249

Browse files
committed
[WARP] Demote locking surrounding container function fetching
By demoting the containers lock to read only for fetching we can prevent blocking the main ui thread while waiting for the network requests to finish
1 parent f94dfb5 commit ca02249

File tree

3 files changed

+108
-48
lines changed

3 files changed

+108
-48
lines changed

plugins/warp/src/container.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ pub enum ContainerError {
3939
SearchFailed(String),
4040
#[error("failed to commit source '{0}': {1}")]
4141
CommitFailed(SourceId, String),
42+
#[error("container error encountered: {0}")]
43+
Custom(String),
4244
}
4345

4446
/// Represents the ID for a single container source.
@@ -224,6 +226,7 @@ pub trait Container: Send + Sync + Display + Debug {
224226
/// to verify the permissions of the source.
225227
fn add_source(&mut self, path: SourcePath) -> ContainerResult<SourceId>;
226228

229+
// TODO: Make interior mutable.
227230
/// Flush changes made to a source.
228231
///
229232
/// Because writing to a source can require file or network operations, we let the container
@@ -293,7 +296,7 @@ pub trait Container: Send + Sync + Display + Debug {
293296
/// will do nothing. This function is blocking, so assume it will take a few seconds for a container
294297
/// that intends to fetch over the network.
295298
fn fetch_functions(
296-
&mut self,
299+
&self,
297300
_target: &Target,
298301
_tags: &[SourceTag],
299302
_functions: &[FunctionGUID],

plugins/warp/src/container/network.rs

Lines changed: 102 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ use crate::container::{
33
Container, ContainerError, ContainerResult, ContainerSearchQuery, ContainerSearchResponse,
44
SourceId, SourcePath, SourceTag,
55
};
6+
use dashmap::DashMap;
67
use directories::ProjectDirs;
78
use std::collections::{HashMap, HashSet};
89
use std::fmt::{Debug, Display, Formatter};
910
use std::path::PathBuf;
11+
use std::sync::RwLock;
1012
use warp::chunk::{Chunk, ChunkKind, CompressionType};
1113
use warp::r#type::chunk::TypeChunk;
1214
use warp::r#type::guid::TypeGUID;
@@ -28,15 +30,21 @@ pub struct NetworkContainer {
2830
client: NetworkClient,
2931
/// This is the store that the interface will write to; then we have special functions for pulling
3032
/// and pushing to the network source.
31-
cache: DiskContainer,
33+
cache: RwLock<DiskContainer>,
3234
/// Where to place newly created sources.
3335
///
3436
/// This is typically a directory inside [`NetworkContainer::root_cache_location`].
3537
cache_path: PathBuf,
3638
/// Populated when targets are queried.
37-
known_targets: HashMap<Target, Option<NetworkTargetId>>,
39+
///
40+
/// NOTE: This is a [`DashMap`] purely for the sake of interior mutability as we do not wish to hold
41+
/// a write lock on the entire container while performing network operations.
42+
known_targets: DashMap<Target, Option<NetworkTargetId>>,
3843
/// Populated when function sources are queried.
39-
known_function_sources: HashMap<FunctionGUID, Vec<SourceId>>,
44+
///
45+
/// NOTE: This is a [`DashMap`] purely for the sake of interior mutability as we do not wish to hold
46+
/// a write lock on the entire container while performing network operations.
47+
known_function_sources: DashMap<FunctionGUID, Vec<SourceId>>,
4048
/// Populated when user adds function, this is used for writing back to the server.
4149
added_chunks: HashMap<SourceId, Vec<Chunk<'static>>>,
4250
/// Populated when connecting to the server, this is used to determine which sources are writable.
@@ -47,12 +55,12 @@ pub struct NetworkContainer {
4755

4856
impl NetworkContainer {
4957
pub fn new(client: NetworkClient, cache_path: PathBuf, writable_sources: &[SourceId]) -> Self {
50-
let mut container = Self {
51-
cache: DiskContainer::new_from_dir(cache_path.clone()),
58+
let container = Self {
59+
cache: RwLock::new(DiskContainer::new_from_dir(cache_path.clone())),
5260
cache_path,
5361
client,
54-
known_targets: HashMap::new(),
55-
known_function_sources: HashMap::new(),
62+
known_targets: DashMap::new(),
63+
known_function_sources: DashMap::new(),
5664
added_chunks: HashMap::new(),
5765
writable_sources: writable_sources.into_iter().copied().collect(),
5866
};
@@ -74,7 +82,7 @@ impl NetworkContainer {
7482
/// # Caching policy
7583
///
7684
/// The [`NetworkTargetId`] is unique and immutable, so they will be persisted indefinitely.
77-
pub fn get_target_id(&mut self, target: &Target) -> Option<NetworkTargetId> {
85+
pub fn get_target_id(&self, target: &Target) -> Option<NetworkTargetId> {
7886
// It's highly probable we have previously queried the target, check that first.
7987
if let Some(target_id) = self.known_targets.get(target) {
8088
return target_id.clone();
@@ -96,7 +104,7 @@ impl NetworkContainer {
96104
/// for now as the requests for functions come at the request of some user interaction. Any guid
97105
/// with no sources will still be cached.
98106
pub fn get_unseen_functions_source(
99-
&mut self,
107+
&self,
100108
target: Option<&Target>,
101109
tags: &[SourceTag],
102110
guids: &[FunctionGUID],
@@ -157,12 +165,7 @@ impl NetworkContainer {
157165
/// Every request we store the returned objects on disk, this means that users will first
158166
/// query against the disk objects, then the server. This also means we need to cache functions f
159167
/// or which we have not received any functions for, as otherwise we would keep trying to query it.
160-
pub fn pull_functions(
161-
&mut self,
162-
target: &Target,
163-
source: &SourceId,
164-
functions: &[FunctionGUID],
165-
) {
168+
pub fn pull_functions(&self, target: &Target, source: &SourceId, functions: &[FunctionGUID]) {
166169
let target_id = self.get_target_id(target);
167170
let file = match self
168171
.client
@@ -182,18 +185,25 @@ impl NetworkContainer {
182185
let functions: Vec<_> = sc.functions().collect();
183186
// Probe the source before attempting to access it, as it might not exist locally.
184187
self.probe_source(*source);
185-
match self.cache.add_functions(target, source, &functions) {
186-
Ok(_) => tracing::debug!(
187-
"Added {} functions into cached source '{}'",
188-
functions.len(),
189-
source
190-
),
191-
Err(err) => tracing::error!(
192-
"Failed to add {} function into cached source '{}': {}",
193-
functions.len(),
194-
source,
195-
err
196-
),
188+
189+
match self.cache.write() {
190+
Ok(mut cache) => match cache.add_functions(target, source, &functions) {
191+
Ok(_) => tracing::debug!(
192+
"Added {} functions into cached source '{}'",
193+
functions.len(),
194+
source
195+
),
196+
Err(err) => tracing::error!(
197+
"Failed to add {} function into cached source '{}': {}",
198+
functions.len(),
199+
source,
200+
err
201+
),
202+
},
203+
Err(err) => {
204+
tracing::error!("Failed to write to cache: {}", err);
205+
return;
206+
}
197207
}
198208
}
199209
// TODO; Probably want to pull type in with this.
@@ -214,8 +224,13 @@ impl NetworkContainer {
214224
/// Probe the source to make sure it exists in the cache. Retrieving the name from the server.
215225
///
216226
/// **This is blocking**
217-
pub fn probe_source(&mut self, source_id: SourceId) {
218-
if !self.cache.source_path(&source_id).is_ok() {
227+
pub fn probe_source(&self, source_id: SourceId) {
228+
let Ok(mut cache) = self.cache.write() else {
229+
tracing::error!("Cannot probe source '{}', cache is poisoned", source_id);
230+
return;
231+
};
232+
233+
if !cache.source_path(&source_id).is_ok() {
219234
// Add the source to the cache. Using the source id and source name as the source path.
220235
match self.client.source_name(source_id) {
221236
Ok(source_name) => {
@@ -224,7 +239,7 @@ impl NetworkContainer {
224239
.cache_path
225240
.join(source_id.to_string())
226241
.join(source_name);
227-
let _ = self.cache.insert_source(source_id, SourcePath(source_path));
242+
let _ = cache.insert_source(source_id, SourcePath(source_path));
228243
}
229244
Err(e) => {
230245
tracing::error!("Failed to probe source '{}': {}", source_id, e);
@@ -251,7 +266,10 @@ impl NetworkContainer {
251266

252267
impl Container for NetworkContainer {
253268
fn sources(&self) -> ContainerResult<Vec<SourceId>> {
254-
self.cache.sources()
269+
self.cache
270+
.read()
271+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
272+
.sources()
255273
}
256274

257275
fn add_source(&mut self, path: SourcePath) -> ContainerResult<SourceId> {
@@ -295,11 +313,17 @@ impl Container for NetworkContainer {
295313
}
296314

297315
fn source_tags(&self, source: &SourceId) -> ContainerResult<HashSet<SourceTag>> {
298-
self.cache.source_tags(source)
316+
self.cache
317+
.read()
318+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
319+
.source_tags(source)
299320
}
300321

301322
fn source_path(&self, source: &SourceId) -> ContainerResult<SourcePath> {
302-
self.cache.source_path(source)
323+
self.cache
324+
.read()
325+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
326+
.source_path(source)
303327
}
304328

305329
fn add_computed_types(
@@ -310,7 +334,10 @@ impl Container for NetworkContainer {
310334
// NOTE: We must `add_computed_types` to the cache before we add the chunk, as `added_chunks` is
311335
// not consulted when retrieving types from the cache, if we fail to add the types to
312336
// the cache, we will not see them show up in the UI or when matching.
313-
self.cache.add_computed_types(source, types)?;
337+
self.cache
338+
.write()
339+
.map_err(|e| ContainerError::Custom(format!("Cache write error: {}", e)))?
340+
.add_computed_types(source, types)?;
314341
let type_chunk = TypeChunk::new_with_computed(types).ok_or(
315342
ContainerError::CorruptedData("signature chunk failed to validate"),
316343
)?;
@@ -320,7 +347,10 @@ impl Container for NetworkContainer {
320347
}
321348

322349
fn remove_types(&mut self, source: &SourceId, guids: &[TypeGUID]) -> ContainerResult<()> {
323-
self.cache.remove_types(source, guids)
350+
self.cache
351+
.write()
352+
.map_err(|e| ContainerError::Custom(format!("Cache write error: {}", e)))?
353+
.remove_types(source, guids)
324354
}
325355

326356
fn add_functions(
@@ -332,7 +362,10 @@ impl Container for NetworkContainer {
332362
// NOTE: We must `add_functions` to the cache before we add the chunk, as `added_chunks` is
333363
// not consulted when retrieving functions from the cache, if we fail to add the functions to
334364
// the cache, we will not see them show up in the UI or when matching.
335-
self.cache.add_functions(target, source, functions)?;
365+
self.cache
366+
.write()
367+
.map_err(|e| ContainerError::Custom(format!("Cache write error: {}", e)))?
368+
.add_functions(target, source, functions)?;
336369
let signature_chunk = SignatureChunk::new(functions).ok_or(
337370
ContainerError::CorruptedData("signature chunk failed to validate"),
338371
)?;
@@ -352,11 +385,14 @@ impl Container for NetworkContainer {
352385
functions: &[Function],
353386
) -> ContainerResult<()> {
354387
// TODO: Wont persist, need to add remote removal.
355-
self.cache.remove_functions(target, source, functions)
388+
self.cache
389+
.write()
390+
.map_err(|e| ContainerError::Custom(format!("Cache write error: {}", e)))?
391+
.remove_functions(target, source, functions)
356392
}
357393

358394
fn fetch_functions(
359-
&mut self,
395+
&self,
360396
target: &Target,
361397
tags: &[SourceTag],
362398
functions: &[FunctionGUID],
@@ -376,42 +412,60 @@ impl Container for NetworkContainer {
376412
}
377413

378414
fn sources_with_type_guid(&self, guid: &TypeGUID) -> ContainerResult<Vec<SourceId>> {
379-
self.cache.sources_with_type_guid(guid)
415+
self.cache
416+
.read()
417+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
418+
.sources_with_type_guid(guid)
380419
}
381420

382421
fn sources_with_type_guids(
383422
&self,
384423
guids: &[TypeGUID],
385424
) -> ContainerResult<HashMap<TypeGUID, Vec<SourceId>>> {
386-
self.cache.sources_with_type_guids(guids)
425+
self.cache
426+
.read()
427+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
428+
.sources_with_type_guids(guids)
387429
}
388430

389431
fn type_guids_with_name(
390432
&self,
391433
source: &SourceId,
392434
name: &str,
393435
) -> ContainerResult<Vec<TypeGUID>> {
394-
self.cache.type_guids_with_name(source, name)
436+
self.cache
437+
.read()
438+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
439+
.type_guids_with_name(source, name)
395440
}
396441

397442
fn type_with_guid(&self, source: &SourceId, guid: &TypeGUID) -> ContainerResult<Option<Type>> {
398-
self.cache.type_with_guid(source, guid)
443+
self.cache
444+
.read()
445+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
446+
.type_with_guid(source, guid)
399447
}
400448

401449
fn sources_with_function_guid(
402450
&self,
403451
target: &Target,
404452
guid: &FunctionGUID,
405453
) -> ContainerResult<Vec<SourceId>> {
406-
self.cache.sources_with_function_guid(target, guid)
454+
self.cache
455+
.read()
456+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
457+
.sources_with_function_guid(target, guid)
407458
}
408459

409460
fn sources_with_function_guids(
410461
&self,
411462
target: &Target,
412463
guids: &[FunctionGUID],
413464
) -> ContainerResult<HashMap<FunctionGUID, Vec<SourceId>>> {
414-
self.cache.sources_with_function_guids(target, guids)
465+
self.cache
466+
.read()
467+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
468+
.sources_with_function_guids(target, guids)
415469
}
416470

417471
fn functions_with_guid(
@@ -420,7 +474,10 @@ impl Container for NetworkContainer {
420474
source: &SourceId,
421475
guid: &FunctionGUID,
422476
) -> ContainerResult<Vec<Function>> {
423-
self.cache.functions_with_guid(target, source, guid)
477+
self.cache
478+
.read()
479+
.map_err(|e| ContainerError::Custom(format!("Cache read error: {}", e)))?
480+
.functions_with_guid(target, source, guid)
424481
}
425482

426483
fn search(&self, query: &ContainerSearchQuery) -> ContainerResult<ContainerSearchResponse> {

plugins/warp/src/plugin/ffi/container.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ pub unsafe extern "C" fn BNWARPContainerFetchFunctions(
220220
count: usize,
221221
) {
222222
let arc_container = ManuallyDrop::new(Arc::from_raw(container));
223-
let Ok(mut container) = arc_container.write() else {
223+
let Ok(container) = arc_container.read() else {
224224
return;
225225
};
226226

@@ -246,7 +246,7 @@ pub unsafe extern "C" fn BNWARPContainerGetSources(
246246
count: *mut usize,
247247
) -> *mut BNWARPSource {
248248
let arc_container = ManuallyDrop::new(Arc::from_raw(container));
249-
let Ok(container) = arc_container.write() else {
249+
let Ok(container) = arc_container.read() else {
250250
return std::ptr::null_mut();
251251
};
252252

0 commit comments

Comments
 (0)