Skip to content

Commit c7163d7

Browse files
committed
Add benchmarks for historical fixed, historical sliding, live injection, and storage write throughput
1 parent fa8d673 commit c7163d7

8 files changed

Lines changed: 416 additions & 35 deletions

File tree

.zed/settings.json

Lines changed: 0 additions & 10 deletions
This file was deleted.

Cargo.toml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,25 @@ path = "src/main.rs"
4242
name = "http_server"
4343
path = "src/bin/http_server.rs"
4444

45+
[dev-dependencies]
46+
criterion = { version = "0.5", features = ["html_reports"] }
47+
48+
[[bench]]
49+
name = "storage_write"
50+
harness = false
51+
52+
[[bench]]
53+
name = "historical_fixed"
54+
harness = false
55+
56+
[[bench]]
57+
name = "historical_sliding"
58+
harness = false
59+
60+
[[bench]]
61+
name = "live_injection"
62+
harness = false
63+
4564
[profile.release]
4665
opt-level = 3
4766
lto = true

benches/historical_fixed.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
2+
use janus::{
3+
execution::historical_executor::HistoricalExecutor,
4+
parsing::janusql_parser::{WindowDefinition, WindowType},
5+
querying::oxigraph_adapter::OxigraphAdapter,
6+
storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig},
7+
};
8+
use std::sync::{
9+
atomic::{AtomicU64, Ordering},
10+
Arc,
11+
};
12+
use std::time::{SystemTime, UNIX_EPOCH};
13+
14+
static COUNTER: AtomicU64 = AtomicU64::new(0);
15+
16+
fn unique_config() -> StreamingConfig {
17+
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
18+
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
19+
StreamingConfig {
20+
segment_base_path: format!("/tmp/janus_bench_fixed_{}_{}", ts, id),
21+
max_batch_events: 1_000_000,
22+
max_batch_age_seconds: 3600,
23+
max_batch_bytes: 1_000_000_000,
24+
sparse_interval: 64,
25+
entries_per_index_block: 256,
26+
}
27+
}
28+
29+
/// Write N events at timestamps [1000, 1000+N) into a fresh storage.
30+
/// These land in the in-memory batch buffer — no flush needed before querying.
31+
fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
32+
let storage = StreamingSegmentedStorage::new(unique_config()).unwrap();
33+
for i in 0..n as u64 {
34+
storage
35+
.write_rdf(
36+
1_000 + i,
37+
&format!("http://example.org/sensor{}", i % 5),
38+
"http://saref.etsi.org/core/hasValue",
39+
&format!("{}", 20 + (i % 10)),
40+
"http://example.org/graph",
41+
)
42+
.unwrap();
43+
}
44+
let window = WindowDefinition {
45+
window_name: "w".to_string(),
46+
stream_name: "http://example.org/stream".to_string(),
47+
width: n as u64,
48+
slide: n as u64,
49+
offset: None,
50+
start: Some(1_000),
51+
end: Some(1_000 + n as u64 - 1),
52+
window_type: WindowType::HistoricalFixed,
53+
};
54+
(Arc::new(storage), window)
55+
}
56+
57+
const SPARQL: &str = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }";
58+
59+
fn historical_fixed(c: &mut Criterion) {
60+
let mut group = c.benchmark_group("historical/fixed_window");
61+
62+
for &n in &[100usize, 1_000, 10_000] {
63+
group.bench_with_input(BenchmarkId::new("events", n), &n, |b, &n| {
64+
b.iter_batched(
65+
|| setup(n),
66+
|(storage, window)| {
67+
let executor = HistoricalExecutor::new(storage, OxigraphAdapter::new());
68+
black_box(executor.execute_fixed_window(&window, SPARQL).unwrap())
69+
},
70+
criterion::BatchSize::SmallInput,
71+
);
72+
});
73+
}
74+
75+
group.finish();
76+
}
77+
78+
criterion_group!(benches, historical_fixed);
79+
criterion_main!(benches);

benches/historical_sliding.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
2+
use janus::{
3+
execution::historical_executor::HistoricalExecutor,
4+
parsing::janusql_parser::{WindowDefinition, WindowType},
5+
querying::oxigraph_adapter::OxigraphAdapter,
6+
storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig},
7+
};
8+
use std::sync::{
9+
atomic::{AtomicU64, Ordering},
10+
Arc,
11+
};
12+
use std::time::{SystemTime, UNIX_EPOCH};
13+
14+
static COUNTER: AtomicU64 = AtomicU64::new(0);
15+
16+
fn unique_config() -> StreamingConfig {
17+
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
18+
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
19+
StreamingConfig {
20+
segment_base_path: format!("/tmp/janus_bench_sliding_{}_{}", ts, id),
21+
max_batch_events: 1_000_000,
22+
max_batch_age_seconds: 3600,
23+
max_batch_bytes: 1_000_000_000,
24+
sparse_interval: 64,
25+
entries_per_index_block: 256,
26+
}
27+
}
28+
29+
// Window config: OFFSET=10_000ms, RANGE=2_000ms, SLIDE=1_000ms
30+
// SlidingWindowIterator scans [now-10000, now] with 8 overlapping windows.
31+
// Data is written at [now-8000, now-2000] — solidly within the scan range.
32+
const OFFSET_MS: u64 = 10_000;
33+
const RANGE_MS: u64 = 2_000;
34+
const SLIDE_MS: u64 = 1_000;
35+
const DATA_START_BEFORE_NOW_MS: u64 = 8_000;
36+
const DATA_SPAN_MS: u64 = 6_000;
37+
38+
fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
39+
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
40+
let storage = StreamingSegmentedStorage::new(unique_config()).unwrap();
41+
let n64 = n as u64;
42+
for i in 0..n64 {
43+
let ts = now - DATA_START_BEFORE_NOW_MS + i * DATA_SPAN_MS / n64.max(1);
44+
storage
45+
.write_rdf(
46+
ts,
47+
&format!("http://example.org/sensor{}", i % 5),
48+
"http://saref.etsi.org/core/hasValue",
49+
&format!("{}", 20 + (i % 10)),
50+
"http://example.org/graph",
51+
)
52+
.unwrap();
53+
}
54+
let window = WindowDefinition {
55+
window_name: "w".to_string(),
56+
stream_name: "http://example.org/stream".to_string(),
57+
width: RANGE_MS,
58+
slide: SLIDE_MS,
59+
offset: Some(OFFSET_MS),
60+
start: None,
61+
end: None,
62+
window_type: WindowType::HistoricalSliding,
63+
};
64+
(Arc::new(storage), window)
65+
}
66+
67+
const SPARQL: &str = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }";
68+
69+
fn historical_sliding(c: &mut Criterion) {
70+
let mut group = c.benchmark_group("historical/sliding_window");
71+
72+
for &n in &[100usize, 1_000, 10_000] {
73+
group.bench_with_input(BenchmarkId::new("events", n), &n, |b, &n| {
74+
b.iter_batched(
75+
|| setup(n),
76+
|(storage, window)| {
77+
let executor = HistoricalExecutor::new(storage, OxigraphAdapter::new());
78+
// Collect all window results — the iterator is finite and exits naturally
79+
let results: Vec<_> =
80+
executor.execute_sliding_windows(&window, SPARQL).collect();
81+
black_box(results)
82+
},
83+
criterion::BatchSize::SmallInput,
84+
);
85+
});
86+
}
87+
88+
group.finish();
89+
}
90+
91+
criterion_group!(benches, historical_sliding);
92+
criterion_main!(benches);

benches/live_injection.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
2+
use janus::{core::RDFEvent, stream::live_stream_processing::LiveStreamProcessing};
3+
use std::time::Instant;
4+
5+
const STREAM_URI: &str = "http://example.org/stream1";
6+
7+
// RSP-QL query: 10s range, 1s step window over stream1
8+
const RSPQL: &str = r#"
9+
PREFIX ex: <http://example.org/>
10+
REGISTER RStream <output> AS
11+
SELECT ?s ?p ?o
12+
FROM NAMED WINDOW ex:w ON STREAM ex:stream1 [RANGE 10000 STEP 1000]
13+
WHERE {
14+
WINDOW ex:w { ?s ?p ?o }
15+
}
16+
"#;
17+
18+
fn make_event(timestamp_ms: u64, i: u64) -> RDFEvent {
19+
RDFEvent::new(
20+
timestamp_ms,
21+
&format!("http://example.org/sensor{}", i % 5),
22+
"http://saref.etsi.org/core/hasValue",
23+
&format!("{}", 20 + (i % 10)),
24+
"",
25+
)
26+
}
27+
28+
/// Wait for the first live result with a 10-second hard deadline.
29+
/// Panics with a clear message if nothing arrives — indicates the RSP engine
30+
/// is not emitting results for the injected events.
31+
fn wait_for_result(proc: &LiveStreamProcessing) -> rsp_rs::BindingWithTimestamp {
32+
let deadline = Instant::now() + std::time::Duration::from_secs(10);
33+
loop {
34+
if let Some(result) = proc.try_receive_result().unwrap() {
35+
return result;
36+
}
37+
assert!(
38+
Instant::now() < deadline,
39+
"live_injection: no result within 10s — RSP engine did not emit for injected events"
40+
);
41+
std::thread::yield_now();
42+
}
43+
}
44+
45+
fn live_injection(c: &mut Criterion) {
46+
let mut group = c.benchmark_group("live/event_injection");
47+
// Lower sample size: each iteration spawns an RSP engine thread
48+
group.sample_size(20);
49+
50+
for &n in &[1usize, 10, 100] {
51+
group.bench_with_input(BenchmarkId::new("events_per_window", n), &n, |b, &n| {
52+
b.iter_batched(
53+
|| {
54+
let mut proc = LiveStreamProcessing::new(RSPQL.to_string()).unwrap();
55+
proc.register_stream(STREAM_URI).unwrap();
56+
proc.start_processing().unwrap();
57+
proc
58+
},
59+
|proc| {
60+
// Spread N events evenly across [0, 9000] ms (inside the RANGE 10000 window)
61+
let n64 = n as u64;
62+
for i in 0..n64 {
63+
let ts = if n64 > 1 { i * 9_000 / (n64 - 1) } else { 0 };
64+
proc.add_event(STREAM_URI, make_event(ts, i)).unwrap();
65+
}
66+
// Sentinel at 20_000 ms closes all open windows
67+
proc.add_event(STREAM_URI, make_event(20_000, 999)).unwrap();
68+
// Block until first result arrives
69+
black_box(wait_for_result(&proc))
70+
},
71+
criterion::BatchSize::SmallInput,
72+
);
73+
});
74+
}
75+
76+
group.finish();
77+
}
78+
79+
criterion_group!(benches, live_injection);
80+
criterion_main!(benches);

benches/storage_write.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
2+
use janus::storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig};
3+
use std::sync::atomic::{AtomicU64, Ordering};
4+
use std::time::{SystemTime, UNIX_EPOCH};
5+
6+
static COUNTER: AtomicU64 = AtomicU64::new(0);
7+
8+
fn unique_config() -> StreamingConfig {
9+
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
10+
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
11+
StreamingConfig {
12+
segment_base_path: format!("/tmp/janus_bench_write_{}_{}", ts, id),
13+
// Large thresholds so no flush happens during measurement
14+
max_batch_events: 1_000_000,
15+
max_batch_age_seconds: 3600,
16+
max_batch_bytes: 1_000_000_000,
17+
sparse_interval: 64,
18+
entries_per_index_block: 256,
19+
}
20+
}
21+
22+
fn storage_write(c: &mut Criterion) {
23+
let mut group = c.benchmark_group("storage/write_throughput");
24+
25+
for &n in &[100usize, 1_000, 10_000, 100_000] {
26+
group.throughput(Throughput::Elements(n as u64));
27+
group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, &n| {
28+
b.iter_batched(
29+
|| StreamingSegmentedStorage::new(unique_config()).unwrap(),
30+
|storage| {
31+
for i in 0..n as u64 {
32+
storage
33+
.write_rdf(
34+
black_box(1_000 + i),
35+
&format!("http://example.org/sensor{}", i % 5),
36+
"http://saref.etsi.org/core/hasValue",
37+
&format!("{}", 20 + (i % 10)),
38+
"http://example.org/graph",
39+
)
40+
.unwrap();
41+
}
42+
},
43+
criterion::BatchSize::SmallInput,
44+
);
45+
});
46+
}
47+
48+
group.finish();
49+
}
50+
51+
criterion_group!(benches, storage_write);
52+
criterion_main!(benches);

0 commit comments

Comments
 (0)