Skip to content

Commit 084979d

Browse files
committed
perf(dl,core): fix mutex contention in parallel downloads and database
1 parent 08d7c18 commit 084979d

3 files changed

Lines changed: 48 additions & 54 deletions

File tree

crates/soar-cli/src/progress.rs

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -293,36 +293,34 @@ pub fn spawn_event_handler(receiver: Receiver<SoarEvent>) -> ProgressGuard {
293293
pkg_name,
294294
pkg_id,
295295
stage,
296-
} => {
297-
if stage != InstallStage::Complete {
298-
let msg = match &stage {
299-
InstallStage::Extracting => {
300-
format!("{pkg_name}#{pkg_id}: extracting")
301-
}
302-
InstallStage::ExtractingNested => {
303-
format!("{pkg_name}#{pkg_id}: extracting nested")
304-
}
305-
InstallStage::LinkingBinaries => {
306-
format!("{pkg_name}#{pkg_id}: linking binaries")
307-
}
308-
InstallStage::DesktopIntegration => {
309-
format!("{pkg_name}#{pkg_id}: desktop integration")
310-
}
311-
InstallStage::SetupPortable => {
312-
format!("{pkg_name}#{pkg_id}: setting up portable")
313-
}
314-
InstallStage::RecordingDatabase => {
315-
format!("{pkg_name}#{pkg_id}: recording to db")
316-
}
317-
InstallStage::RunningHook(hook) => {
318-
format!("{pkg_name}#{pkg_id}: running {hook}")
319-
}
320-
InstallStage::Complete => unreachable!(),
321-
};
322-
let pb = jobs.entry(op_id).or_insert_with(|| create_op_spinner(&msg));
323-
pb.set_style(spinner_style());
324-
pb.set_message(msg);
325-
}
296+
} if stage != InstallStage::Complete => {
297+
let msg = match &stage {
298+
InstallStage::Extracting => {
299+
format!("{pkg_name}#{pkg_id}: extracting")
300+
}
301+
InstallStage::ExtractingNested => {
302+
format!("{pkg_name}#{pkg_id}: extracting nested")
303+
}
304+
InstallStage::LinkingBinaries => {
305+
format!("{pkg_name}#{pkg_id}: linking binaries")
306+
}
307+
InstallStage::DesktopIntegration => {
308+
format!("{pkg_name}#{pkg_id}: desktop integration")
309+
}
310+
InstallStage::SetupPortable => {
311+
format!("{pkg_name}#{pkg_id}: setting up portable")
312+
}
313+
InstallStage::RecordingDatabase => {
314+
format!("{pkg_name}#{pkg_id}: recording to db")
315+
}
316+
InstallStage::RunningHook(hook) => {
317+
format!("{pkg_name}#{pkg_id}: running {hook}")
318+
}
319+
InstallStage::Complete => unreachable!(),
320+
};
321+
let pb = jobs.entry(op_id).or_insert_with(|| create_op_spinner(&msg));
322+
pb.set_style(spinner_style());
323+
pb.set_message(msg);
326324
}
327325

328326
// ── Removal stages ─────────────────────────────────────

crates/soar-core/src/database/connection.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ impl DieselDatabase {
6060
/// Gets a mutable reference to the underlying connection.
6161
/// Locks the mutex and returns a guard.
6262
pub fn conn(&self) -> Result<std::sync::MutexGuard<'_, DbConnection>> {
63-
self.conn.lock().map_err(|_| SoarError::PoisonError)
63+
Ok(self.conn.lock().unwrap_or_else(|e| e.into_inner()))
6464
}
6565

6666
/// Executes a function with the connection.
6767
pub fn with_conn<F, T>(&self, f: F) -> Result<T>
6868
where
6969
F: FnOnce(&mut diesel::SqliteConnection) -> diesel::QueryResult<T>,
7070
{
71-
let mut conn = self.conn.lock().map_err(|_| SoarError::PoisonError)?;
71+
let mut conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
7272
f(conn.conn()).map_err(|e| SoarError::Custom(format!("database error: {}", e)))
7373
}
7474

@@ -77,7 +77,7 @@ impl DieselDatabase {
7777
where
7878
F: FnOnce(&mut diesel::SqliteConnection) -> diesel::QueryResult<T>,
7979
{
80-
let mut conn = self.conn.lock().map_err(|_| SoarError::PoisonError)?;
80+
let mut conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
8181
conn.conn()
8282
.transaction(f)
8383
.map_err(|e| SoarError::Custom(format!("transaction error: {}", e)))

crates/soar-dl/src/oci.rs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use std::{
33
io::{Read as _, Seek as _, SeekFrom, Write as _},
44
os::unix::fs::PermissionsExt as _,
55
path::{Path, PathBuf},
6-
sync::{Arc, Mutex},
6+
sync::{
7+
atomic::{AtomicU64, Ordering},
8+
Arc, Mutex,
9+
},
710
thread,
811
};
912

@@ -532,7 +535,7 @@ impl OciDownload {
532535
layers: &[&OciLayer],
533536
output_dir: &Path,
534537
) -> Result<Vec<PathBuf>, DownloadError> {
535-
let downloaded = Arc::new(Mutex::new(0u64));
538+
let downloaded = Arc::new(AtomicU64::new(0));
536539
let paths = Arc::new(Mutex::new(Vec::new()));
537540
let errors = Arc::new(Mutex::new(Vec::new()));
538541

@@ -568,16 +571,14 @@ impl OciDownload {
568571
if path.is_file() {
569572
if let Ok(metadata) = path.metadata() {
570573
if metadata.len() == layer.size {
571-
{
572-
let mut shared = downloaded.lock().unwrap();
573-
*shared += layer.size;
574-
let current = *shared;
575-
if let Some(ref cb) = on_progress {
576-
cb(Progress::Chunk {
577-
current,
578-
total: total_size,
579-
});
580-
}
574+
let current = downloaded
575+
.fetch_add(layer.size, Ordering::Relaxed)
576+
+ layer.size;
577+
if let Some(ref cb) = on_progress {
578+
cb(Progress::Chunk {
579+
current,
580+
total: total_size,
581+
});
581582
}
582583
paths.lock().unwrap().push(path);
583584
continue;
@@ -795,7 +796,7 @@ impl OciDownload {
795796
path,
796797
downloaded,
797798
self.on_progress.as_ref(),
798-
&Arc::new(Mutex::new(0u64)),
799+
&Arc::new(AtomicU64::new(0)),
799800
total_size,
800801
)
801802
}
@@ -819,7 +820,7 @@ fn download_layer_impl(
819820
path: &Path,
820821
local_downloaded: &mut u64,
821822
on_progress: Option<&Arc<dyn Fn(Progress) + Send + Sync>>,
822-
shared_downloaded: &Arc<Mutex<u64>>,
823+
shared_downloaded: &Arc<AtomicU64>,
823824
total_size: u64,
824825
) -> Result<(), DownloadError> {
825826
let url = format!(
@@ -901,8 +902,7 @@ fn download_layer_impl(
901902

902903
// Add resume offset to shared counter so progress shows correct cumulative total
903904
if is_resuming {
904-
let mut shared = shared_downloaded.lock().unwrap();
905-
*shared += resume_offset;
905+
shared_downloaded.fetch_add(resume_offset, Ordering::Relaxed);
906906
}
907907

908908
loop {
@@ -914,11 +914,7 @@ fn download_layer_impl(
914914
file.write_all(&buffer[..n])?;
915915
*local_downloaded += n as u64;
916916

917-
let current_total = {
918-
let mut shared = shared_downloaded.lock().unwrap();
919-
*shared += n as u64;
920-
*shared
921-
};
917+
let current_total = { shared_downloaded.fetch_add(n as u64, Ordering::Relaxed) + n as u64 };
922918

923919
let checkpoint = *local_downloaded / (1024 * 1024);
924920
if checkpoint > last_checkpoint {

0 commit comments

Comments
 (0)