Skip to content

Commit 18d02f0

Browse files
Fix memory leak (#59)
1 parent 663c948 commit 18d02f0

1 file changed

Lines changed: 46 additions & 13 deletions

File tree

src/worker/script.rs

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
io::prelude::*,
99
net::{Shutdown, TcpStream},
1010
process::Command,
11+
sync::Arc,
1112
thread, time,
1213
};
1314

@@ -22,6 +23,7 @@ use llvm::execution_engine::*;
2223
use llvm::target::*;
2324
use llvm_sys::LLVMType;
2425
use llvm_sys::prelude::*;
26+
use std::cell::RefCell;
2527
use std::ffi::{CStr, CString, c_void};
2628
use std::mem;
2729

@@ -129,6 +131,11 @@ pub unsafe extern "C" fn task(name: *const i8, random: bool) -> u64 {
129131
0
130132
}
131133

134+
thread_local! {
135+
static LAST_RANDOM_PATH: RefCell<CString> =
136+
RefCell::new(CString::new("").unwrap());
137+
}
138+
132139
/// Return a randomly generated path.
133140
///
134141
/// # Safety
@@ -144,7 +151,11 @@ pub unsafe extern "C" fn random_path(base: *const i8) -> *const i8 {
144151
.map(char::from)
145152
.collect();
146153

147-
CString::new(format!("{base}/{uniq}")).unwrap().into_raw()
154+
LAST_RANDOM_PATH.with(|last| {
155+
let mut last = last.borrow_mut();
156+
*last = CString::new(format!("{base}/{uniq}")).unwrap();
157+
last.as_ptr()
158+
})
148159
}
149160

150161
#[derive(Debug, Clone)]
@@ -506,21 +517,43 @@ impl Worker for ScriptWorker {
506517
debug!("Distribution {:?}", d);
507518
let Dist::Exp { rate } = d else { todo!() };
508519

509-
loop {
510-
let worker = self.clone();
511-
thread::spawn(move || {
512-
(worker.jit)();
513-
});
514-
515-
let interval: f64 =
516-
thread_rng().sample(Exp::new(*rate).unwrap());
517-
debug!("Interval {}", interval);
518-
thread::sleep(time::Duration::from_secs_f64(interval));
519-
}
520+
const MAX_CONCURRENT: usize = 16;
521+
let semaphore = Arc::new((
522+
std::sync::Mutex::new(0usize),
523+
std::sync::Condvar::new(),
524+
));
525+
526+
thread::scope(|s| {
527+
loop {
528+
{
529+
let (lock, cvar) = &*semaphore;
530+
let mut count = cvar
531+
.wait_while(lock.lock().unwrap(), |c| {
532+
*c >= MAX_CONCURRENT
533+
})
534+
.unwrap();
535+
*count += 1;
536+
}
537+
538+
let worker = self.clone();
539+
let sem = Arc::clone(&semaphore);
540+
s.spawn(move || {
541+
(worker.jit)();
542+
let (lock, cvar) = &*sem;
543+
*lock.lock().unwrap() -= 1;
544+
cvar.notify_one();
545+
});
546+
547+
let interval: f64 =
548+
thread_rng().sample(Exp::new(*rate).unwrap());
549+
debug!("Interval {}", interval);
550+
thread::sleep(time::Duration::from_secs_f64(interval));
551+
}
552+
});
520553
}
521554
None => {
522555
debug!("Single unit");
523-
(self.jit)()
556+
(self.jit)();
524557
}
525558
};
526559

0 commit comments

Comments
 (0)