Skip to content

Commit 98116ba

Browse files
authored
Merge pull request #36 from WilleBerg/enq_deq_pairs
Enq deq pairs
2 parents 5b522d1 + 8028794 commit 98116ba

5 files changed

Lines changed: 195 additions & 4 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ src/cpp-ring-queues-research/include/AdditionalWork.hpp
2525
*.edges
2626
*.pdf
2727
misc/*
28+
*.pdf

readme.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ You have to choose which type of benchmark you want to run on your queue. They h
4848
* `prod-con` - Measures throughput and fairness. Threads are either producers or consumers. You can choose the amount of producers and consumers using their respective flags.
4949
* `enq-deq` - Measures throughput and fairness. Threads alternate between enqueueing and dequeueing randomly. You can choose the spread of enqueuers/dequeuers using the `--spread` flag. Using the `--thread-count` flag you can decide how many threads you want to use for the benchmark.
5050
* `bfs` - Performs a parallel breadth first search on a graph of your choosing. Measures the number of milliseconds it takes to perform the BFS. After performing the parallel BFS, the benchmark will also do it sequentially and then verify the parallel solution using the sequential solution. This can be turned off by passing the `--no-verify` flag. Choose a graph file by passing the `--graph-file` flag and specifying the path. The benchmark supports `.mtx` files, but any files that follow the same structure will work as well. You can run several iterations of BFS by passing the `-i` flag, just as in the other benchmarks. The graph file will still only be loaded once, and the sequential solution will also only be generated once.
51+
* `enq-deq-pairs` - Measures throughput and fairness. Threads first enqueue an item, then immediately dequeue an item. Use `--thread-count` to change the number of threads.
5152
## Queue implementations and features
5253
Implemented queues are:
5354
* `array_queue` - A queue from the crate [`crossbeam`](https://crates.io/crates/crossbeam).
@@ -103,6 +104,8 @@ To use specific values you can add different flags to the run command:
103104
- `enq-deq` benchmark type sub commands:
104105
* `--spread` - To specify the spread for the `enq-deq` benchmark type.
105106
* `--thread-count` - To specify the amount of threads in the `enq-deq` benchmark type.
107+
- `enq-deq-pairs` benchmark type sub commands:
108+
* `--thread-count` - To specify the amount of threads in the `enq-deq-pairs` benchmark type.
106109

107110
## Add your own queues
108111
To add your own queues to the framework, you first create a new file in `src/queues` for the source code. You then have to add the queue to the `src/queues.rs` file as a feature in the following way:

src/arguments.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub enum Benchmarks {
5151
/// A test where each thread performs both consume and produce based on a random floating point
5252
/// value. Spread is decided using the `--spread` flag.
5353
EnqDeq(EnqDeqArgs),
54+
EnqDeqPairs(EnqDeqPairsArgs),
5455
#[cfg(feature = "bfs")]
5556
BFS(BFSArgs),
5657
}
@@ -75,6 +76,12 @@ pub struct EnqDeqArgs {
7576
pub spread: f64,
7677
}
7778
#[derive(ClapArgs, Debug)]
79+
pub struct EnqDeqPairsArgs {
80+
/// Set the thread count for the pingpong benchmark.
81+
#[arg(long = "thread-count", default_value_t = 20)]
82+
pub thread_count: usize,
83+
}
84+
#[derive(ClapArgs, Debug)]
7885
pub struct BFSArgs {
7986
#[arg(short, long, default_value_t = 20)]
8087
pub thread_count: usize,
@@ -88,10 +95,11 @@ pub struct BFSArgs {
8895
impl Display for Benchmarks {
8996
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
9097
match self {
91-
Benchmarks::ProdCon(_) => write!(f, "ProdCon"),
92-
Benchmarks::EnqDeq(_) => write!(f, "EnqDeq"),
98+
Benchmarks::ProdCon(_) => write!(f, "ProdCon"),
99+
Benchmarks::EnqDeq(_) => write!(f, "EnqDeq"),
100+
Benchmarks::EnqDeqPairs(_) => write!(f, "EnqDeqPairs"),
93101
#[cfg(feature = "bfs")]
94-
Benchmarks::BFS(_) => write!(f, "BFS"),
102+
Benchmarks::BFS(_) => write!(f, "BFS"),
95103
}
96104
}
97105
}

src/benchmarks.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use sysinfo::System;
1313

1414
pub mod prod_con;
1515
pub mod enq_deq;
16+
pub mod enq_deq_pairs;
1617
#[cfg(feature = "bfs")]
1718
pub mod bfs;
1819

@@ -58,7 +59,8 @@ macro_rules! implement_benchmark {
5859
// Select which benchmark to use
5960
match $bench_conf.args.benchmark {
6061
Benchmarks::ProdCon(_) => $crate::benchmarks::prod_con::benchmark_prod_con(test_q, $bench_conf)?,
61-
Benchmarks::EnqDeq(_) => $crate::benchmarks::enq_deq::benchmark_enq_deq(test_q, $bench_conf)?,
62+
Benchmarks::EnqDeq(_) => $crate::benchmarks::enq_deq::benchmark_enq_deq(test_q, $bench_conf)?,
63+
Benchmarks::EnqDeqPairs(_) => $crate::benchmarks::enq_deq_pairs::benchmark_enq_deq_pairs(test_q, $bench_conf)?,
6264
#[cfg(feature = "bfs")]
6365
Benchmarks::BFS(_) => {
6466
$crate::benchmarks::bfs::benchmark_bfs(

src/benchmarks/enq_deq_pairs.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use core_affinity::CoreId;
2+
use log::{debug, error, info, trace};
3+
use rand::Rng;
4+
use std::sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Barrier};
5+
use crate::{traits::{ConcurrentQueue, Handle}, benchmarks::{calc_fairness, BenchConfig}};
6+
use std::fs::OpenOptions;
7+
use std::io::Write;
8+
use std::sync::{mpsc, Arc};
9+
10+
/// # Explanation:
11+
#[allow(dead_code)]
12+
pub fn benchmark_enq_deq_pairs<C, T> (cqueue: C, bench_conf: &BenchConfig) -> Result<(), std::io::Error>
13+
where
14+
C: ConcurrentQueue<T>,
15+
T: Default,
16+
for<'a> &'a C: Send
17+
{
18+
let args = match &bench_conf.args.benchmark {
19+
crate::arguments::Benchmarks::EnqDeqPairs(a) => a,
20+
_ => panic!(),
21+
};
22+
{
23+
debug!("Prefilling queue with {} items.", bench_conf.args.prefill_amount);
24+
let mut tmp_handle = cqueue.register();
25+
for _ in 0..bench_conf.args.prefill_amount {
26+
let _ = tmp_handle.push(Default::default());
27+
}
28+
}
29+
let thread_count = args.thread_count;
30+
let time_limit: u64 = bench_conf.args.time_limit;
31+
let barrier = Barrier::new(thread_count + 1);
32+
let pops = AtomicUsize::new(0);
33+
let pushes = AtomicUsize::new(0);
34+
let done = AtomicBool::new(false);
35+
let (tx, rx) = mpsc::channel();
36+
info!("Starting pingpong benchmark with {} threads", thread_count);
37+
38+
39+
40+
// Get cores for fairness of threads
41+
let available_cores: Vec<CoreId> =
42+
core_affinity::get_core_ids().unwrap_or(vec![CoreId { id: 0 }]);
43+
let mut core_iter = available_cores.into_iter().cycle();
44+
45+
// Shared atomic bool for when a thread fails
46+
let thread_failed = Arc::new(AtomicBool::new(false));
47+
48+
49+
let _ = std::thread::scope(|s| -> Result<(), std::io::Error>{
50+
let queue = &cqueue;
51+
let thread_failed = &thread_failed; // Every thread clones the thread_failed bool
52+
let pushes = &pushes;
53+
let pops = &pops;
54+
let done = &done;
55+
let barrier = &barrier;
56+
let &thread_count = &thread_count;
57+
let is_one_socket = &bench_conf.args.one_socket;
58+
let tx = &tx;
59+
for _i in 0..thread_count{
60+
let mut core : CoreId = core_iter.next().unwrap();
61+
// if is_one_socket is true, make all thread ids even
62+
// (this was used for our testing enviroment to get one socket)
63+
if *is_one_socket {
64+
core = core_iter.next().unwrap();
65+
}
66+
// println!("{:?}", core);
67+
s.spawn(move || {
68+
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
69+
core_affinity::set_for_current(core);
70+
let mut handle = queue.register();
71+
let mut l_pushes = 0;
72+
let mut l_pops = 0;
73+
let _thread_failed = thread_failed.clone();
74+
barrier.wait();
75+
while !done.load(Ordering::Relaxed) {
76+
let _ = handle.push(T::default());
77+
l_pushes += 1;
78+
let _ = handle.pop();
79+
l_pops += 1;
80+
for _ in 0..bench_conf.args.delay {
81+
let _some_num = rand::rng().random::<f64>();
82+
}
83+
}
84+
pushes.fetch_add(l_pushes, Ordering::Relaxed);
85+
pops.fetch_add(l_pops, Ordering::Relaxed);
86+
tx.send(l_pops + l_pushes).unwrap();
87+
trace!("{}: Pushed: {}, Popped: {}", _i, l_pushes, l_pops);
88+
}));
89+
// A thread panicked, aborting the benchmark...
90+
if let Err(e) = result {
91+
error!("Thread {} panicked: {:?}. Aborting benchmark, padding results to zero", _i, e);
92+
thread_failed.store(true, Ordering::Relaxed);
93+
done.store(true, Ordering::Relaxed);
94+
}
95+
});
96+
97+
}
98+
barrier.wait();
99+
std::thread::sleep(std::time::Duration::from_secs(time_limit));
100+
done.store(true, Ordering::Relaxed);
101+
Ok(())
102+
});
103+
drop(tx);
104+
let pops = pops.into_inner();
105+
let pushes = pushes.into_inner();
106+
// Fairness
107+
let ops_per_thread = {
108+
let mut vals = vec![];
109+
for received in rx {
110+
vals.push(received);
111+
}
112+
vals
113+
};
114+
let fairness = calc_fairness(ops_per_thread);
115+
116+
// If a thread crashed, pad the results with zero-values
117+
let formatted = if thread_failed.load(Ordering::Relaxed) {
118+
format!("0,0,0,-1,-1,{},{},{},{},0,{},{}",
119+
thread_count,
120+
cqueue.get_id(),
121+
bench_conf.args.benchmark,
122+
bench_conf.benchmark_id,
123+
-1,
124+
bench_conf.args.queue_size
125+
)
126+
}
127+
else {
128+
format!("{},{},{},{},{},{},{},{},{},{},{},{}",
129+
(pushes + pops) as f64 / time_limit as f64,
130+
pushes,
131+
pops,
132+
-1,
133+
-1,
134+
thread_count,
135+
cqueue.get_id(),
136+
bench_conf.args.benchmark,
137+
bench_conf.benchmark_id,
138+
fairness,
139+
-1,
140+
bench_conf.args.queue_size)
141+
};
142+
// Write to file or stdout depending on flag
143+
if !bench_conf.args.write_to_stdout {
144+
let mut file = OpenOptions::new()
145+
.append(true)
146+
.create(true)
147+
.open(&bench_conf.output_filename)?;
148+
writeln!(file, "{}", formatted)?;
149+
} else {
150+
println!("{}", formatted);
151+
}
152+
Ok(())
153+
}
154+
155+
// Unit tests for the enq-deq-pairs benchmark.
// `#[cfg(test)]` keeps the module (and its `use` statements) out of
// non-test builds; previously it was unconditionally compiled.
#[cfg(test)]
mod tests {
    #[cfg(feature = "basic_queue")]
    #[test]
    fn test_enq_deq_pairs() {
        use crate::benchmarks::enq_deq_pairs::benchmark_enq_deq_pairs;
        use crate::benchmarks::*;
        use crate::arguments::*;
        use crate::queues::basic_queue::*;

        // Smoke test: run the benchmark end-to-end with default args and
        // 10 threads against the basic queue and expect a clean exit.
        let args = Args {
            benchmark: Benchmarks::EnqDeqPairs(EnqDeqPairsArgs { thread_count: 10 }),
            ..Default::default()
        };
        let bench_conf = BenchConfig {
            args,
            date_time: "".to_string(),
            benchmark_id: "test2".to_string(),
            output_filename: "".to_string(),
        };
        let q: BasicQueue<usize> = BasicQueue::new(0);
        assert!(benchmark_enq_deq_pairs(q, &bench_conf).is_ok());
    }
}

0 commit comments

Comments
 (0)