Skip to content

Commit d0f3978

Browse files
Merge pull request #20 from SolidLabResearch/benchmark-paper-hybrid-scaling
Benchmark paper hybrid scaling
2 parents e9db2a6 + 6f7f9b6 commit d0f3978

121 files changed

Lines changed: 936015 additions & 309 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,4 @@ janus-dashboard/.vscode/
3838
debug_*.py
3939
tests/reproduction_test.rs
4040
tests/user_query_repro.rs
41+
/logs/

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ tower-http = { version = "0.5", features = ["cors", "trace"] }
2525
tokio-tungstenite = "0.21"
2626
reqwest = { version = "0.11", features = ["json"] }
2727
futures-util = "0.3"
28+
sha2 = "0.10"
29+
plotters = "0.3.7"
30+
sysinfo = "0.31.4"
2831

2932
[lib]
3033
name = "janus"

README.md

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The name comes from the Roman deity Janus, associated with transitions and with
1818
- Hybrid queries that mix historical and live windows
1919
- Extension functions for anomaly-style predicates such as thresholds, relative change, z-score, outlier checks, and trend divergence
2020
- Optional baseline bootstrapping for hybrid anomaly queries with `USING BASELINE <window> LAST|AGGREGATE`
21+
- Query-defined baselines with `DEFINE BASELINE ... ON WINDOW ... AS SELECT ...`, `USING BASELINE :name`, and `GRAPH :name { ... }` materialization templates
2122
- HTTP endpoints for registering, starting, stopping, listing, and deleting queries
2223
- WebSocket result streaming for running queries
2324

@@ -28,30 +29,45 @@ Janus uses Janus-QL, a hybrid query language for querying historical and live RD
2829
Example:
2930

3031
```sparql
31-
PREFIX ex: <http://example.org/>
32-
PREFIX janus: <https://janus.rs/fn#>
33-
PREFIX baseline: <https://janus.rs/baseline#>
34-
35-
REGISTER RStream ex:out AS
36-
SELECT ?sensor ?reading
37-
FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1700000000000 END 1700003600000]
38-
FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 5000 STEP 1000]
39-
USING BASELINE ex:hist AGGREGATE
32+
PREFIX : <http://example.org/>
33+
34+
FROM NAMED WINDOW :liveMinute ON STREAM :stream [RANGE 60000 STEP 1000]
35+
FROM NAMED WINDOW :historyDay ON LOG :stream [START 0 END 86400000]
36+
37+
DEFINE BASELINE :dayBaseline ON WINDOW :historyDay AS
38+
SELECT ?sensor
39+
(AVG(?value) AS ?dayAvgValue)
40+
WHERE {
41+
?sensor :hasValue ?value .
42+
}
43+
GROUP BY ?sensor
44+
45+
REGISTER RStream :output AS
46+
USING BASELINE :dayBaseline
47+
SELECT ?sensor
48+
(AVG(?value) AS ?minuteAvgValue)
49+
?dayAvgValue
50+
((AVG(?value) - ?dayAvgValue) AS ?difference)
4051
WHERE {
41-
WINDOW ex:hist {
42-
?sensor ex:mean ?mean .
43-
?sensor ex:sigma ?sigma .
52+
WINDOW :liveMinute {
53+
?sensor :hasValue ?value .
4454
}
45-
WINDOW ex:live {
46-
?sensor ex:hasReading ?reading .
55+
GRAPH :dayBaseline {
56+
?sensor :dayAvgValue ?dayAvgValue .
4757
}
48-
?sensor baseline:mean ?mean .
49-
?sensor baseline:sigma ?sigma .
50-
FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3))
5158
}
59+
GROUP BY ?sensor ?dayAvgValue
60+
HAVING(AVG(?value) > ?dayAvgValue)
5261
```
5362

54-
`USING BASELINE` is optional. If present, Janus bootstraps baseline values from the named historical window before or during live execution:
63+
For query-defined baselines:
64+
65+
- `DEFINE BASELINE` evaluates the historical baseline query over its source `LOG` window before the live query starts
66+
- `USING BASELINE :dayBaseline` tells Janus to prepare that baseline and inject the resulting quads into the live engine
67+
- `GRAPH :dayBaseline { ... }` is the materialization template; its concrete predicates and projected variables define the quads that are inserted
68+
- the live query can then use baseline variables in `SELECT`, `GROUP BY`, `HAVING`, and arithmetic expressions
69+
70+
Legacy `USING BASELINE <window> LAST|AGGREGATE` remains available:
5571

5672
- `LAST`: use the final historical window snapshot as baseline
5773
- `AGGREGATE`: merge the historical window outputs into one compact baseline
@@ -80,6 +96,7 @@ Janus uses dictionary encoding and segmented storage for high-throughput ingesti
8096
- Space efficiency: about 40% smaller encoded events
8197

8298
Detailed benchmark data is in [docs/BENCHMARK_RESULTS.md](./docs/BENCHMARK_RESULTS.md).
99+
Current benchmark commands and scope are in [docs/BENCHMARKING.md](./docs/BENCHMARKING.md).
83100

84101
## Quick Start
85102

benches/historical_fixed.rs

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,24 @@
1+
mod support;
2+
13
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
24
use janus::{
35
execution::historical_executor::HistoricalExecutor,
46
parsing::janusql_parser::{SourceKind, WindowDefinition, WindowType},
57
querying::oxigraph_adapter::OxigraphAdapter,
6-
storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig},
7-
};
8-
use std::sync::{
9-
atomic::{AtomicU64, Ordering},
10-
Arc,
8+
storage::segmented_storage::StreamingSegmentedStorage,
119
};
12-
use std::time::{SystemTime, UNIX_EPOCH};
13-
14-
static COUNTER: AtomicU64 = AtomicU64::new(0);
15-
16-
fn unique_config() -> StreamingConfig {
17-
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
18-
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
19-
StreamingConfig {
20-
segment_base_path: format!("/tmp/janus_bench_fixed_{}_{}", ts, id),
21-
max_batch_events: 1_000_000,
22-
max_batch_age_seconds: 3600,
23-
max_batch_bytes: 1_000_000_000,
24-
sparse_interval: 64,
25-
entries_per_index_block: 256,
26-
}
27-
}
10+
use std::sync::Arc;
11+
use support::{populate_storage, unique_config, GRAPH_URI};
2812

2913
/// Write N events at timestamps [1000, 1000+N) into a fresh storage.
3014
/// These land in the in-memory batch buffer — no flush needed before querying.
3115
fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
32-
let storage = StreamingSegmentedStorage::new(unique_config()).unwrap();
33-
for i in 0..n as u64 {
34-
storage
35-
.write_rdf(
36-
1_000 + i,
37-
&format!("http://example.org/sensor{}", i % 5),
38-
"http://saref.etsi.org/core/hasValue",
39-
&format!("{}", 20 + (i % 10)),
40-
"http://example.org/graph",
41-
)
42-
.unwrap();
43-
}
16+
let storage = StreamingSegmentedStorage::new(unique_config("historical_fixed")).unwrap();
17+
populate_storage(&storage, n, 1_000, 1, GRAPH_URI);
4418
let window = WindowDefinition {
4519
window_name: "w".to_string(),
4620
source_kind: SourceKind::Stream,
47-
stream_name: "http://example.org/stream".to_string(),
21+
stream_name: GRAPH_URI.to_string(),
4822
width: n as u64,
4923
slide: n as u64,
5024
offset: None,
@@ -55,7 +29,15 @@ fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
5529
(Arc::new(storage), window)
5630
}
5731

58-
const SPARQL: &str = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }";
32+
const SPARQL: &str = r#"
33+
PREFIX ex: <http://example.org/>
34+
SELECT ?sensor ?temp
35+
WHERE {
36+
GRAPH ex:graph1 {
37+
?sensor ex:temperature ?temp .
38+
}
39+
}
40+
"#;
5941

6042
fn historical_fixed(c: &mut Criterion) {
6143
let mut group = c.benchmark_group("historical/fixed_window");

benches/historical_sliding.rs

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,14 @@
1+
mod support;
2+
13
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
24
use janus::{
35
execution::historical_executor::HistoricalExecutor,
46
parsing::janusql_parser::{SourceKind, WindowDefinition, WindowType},
57
querying::oxigraph_adapter::OxigraphAdapter,
6-
storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig},
7-
};
8-
use std::sync::{
9-
atomic::{AtomicU64, Ordering},
10-
Arc,
8+
storage::segmented_storage::StreamingSegmentedStorage,
119
};
12-
use std::time::{SystemTime, UNIX_EPOCH};
13-
14-
static COUNTER: AtomicU64 = AtomicU64::new(0);
15-
16-
fn unique_config() -> StreamingConfig {
17-
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
18-
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
19-
StreamingConfig {
20-
segment_base_path: format!("/tmp/janus_bench_sliding_{}_{}", ts, id),
21-
max_batch_events: 1_000_000,
22-
max_batch_age_seconds: 3600,
23-
max_batch_bytes: 1_000_000_000,
24-
sparse_interval: 64,
25-
entries_per_index_block: 256,
26-
}
27-
}
10+
use std::sync::Arc;
11+
use support::{populate_storage, recent_base_timestamp, unique_config, GRAPH_URI};
2812

2913
// Window config: OFFSET=10_000ms, RANGE=2_000ms, SLIDE=1_000ms
3014
// SlidingWindowIterator scans [now-10000, now] with 8 overlapping windows.
@@ -36,25 +20,14 @@ const DATA_START_BEFORE_NOW_MS: u64 = 8_000;
3620
const DATA_SPAN_MS: u64 = 6_000;
3721

3822
fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
39-
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
40-
let storage = StreamingSegmentedStorage::new(unique_config()).unwrap();
41-
let n64 = n as u64;
42-
for i in 0..n64 {
43-
let ts = now - DATA_START_BEFORE_NOW_MS + i * DATA_SPAN_MS / n64.max(1);
44-
storage
45-
.write_rdf(
46-
ts,
47-
&format!("http://example.org/sensor{}", i % 5),
48-
"http://saref.etsi.org/core/hasValue",
49-
&format!("{}", 20 + (i % 10)),
50-
"http://example.org/graph",
51-
)
52-
.unwrap();
53-
}
23+
let start_ts = recent_base_timestamp(DATA_START_BEFORE_NOW_MS);
24+
let storage = StreamingSegmentedStorage::new(unique_config("historical_sliding")).unwrap();
25+
let step_ms = (DATA_SPAN_MS / n.max(1) as u64).max(1);
26+
populate_storage(&storage, n, start_ts, step_ms, GRAPH_URI);
5427
let window = WindowDefinition {
5528
window_name: "w".to_string(),
5629
source_kind: SourceKind::Stream,
57-
stream_name: "http://example.org/stream".to_string(),
30+
stream_name: GRAPH_URI.to_string(),
5831
width: RANGE_MS,
5932
slide: SLIDE_MS,
6033
offset: Some(OFFSET_MS),
@@ -65,7 +38,15 @@ fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
6538
(Arc::new(storage), window)
6639
}
6740

68-
const SPARQL: &str = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }";
41+
const SPARQL: &str = r#"
42+
PREFIX ex: <http://example.org/>
43+
SELECT ?sensor ?temp
44+
WHERE {
45+
GRAPH ex:graph1 {
46+
?sensor ex:temperature ?temp .
47+
}
48+
}
49+
"#;
6950

7051
fn historical_sliding(c: &mut Criterion) {
7152
let mut group = c.benchmark_group("historical/sliding_window");

benches/hybrid_baseline.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
mod support;
2+
3+
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
4+
use janus::{core::RDFEvent, stream::live_stream_processing::LiveStreamProcessing};
5+
use std::time::Duration;
6+
use support::{make_sensor_event, wait_for_live_result, BASELINE_PREDICATE, STREAM_URI};
7+
8+
const HYBRID_RSPQL: &str = r#"
9+
PREFIX ex: <http://example.org/>
10+
REGISTER RStream <output> AS
11+
SELECT ?sensor ?liveTemp ?baselineTemp
12+
FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 10000 STEP 1000]
13+
WHERE {
14+
WINDOW ex:live {
15+
?sensor ex:temperature ?liveTemp .
16+
}
17+
?sensor ex:baselineTemperature ?baselineTemp .
18+
}
19+
"#;
20+
21+
fn setup_processor() -> LiveStreamProcessing {
22+
let mut proc = LiveStreamProcessing::new(HYBRID_RSPQL.to_string()).unwrap();
23+
proc.register_stream(STREAM_URI).unwrap();
24+
25+
for i in 0..5u64 {
26+
proc.add_static_data(RDFEvent::new(
27+
0,
28+
&format!("http://example.org/sensor{i}"),
29+
BASELINE_PREDICATE,
30+
&format!("{}", 20 + i),
31+
"",
32+
))
33+
.unwrap();
34+
}
35+
36+
proc.start_processing().unwrap();
37+
proc
38+
}
39+
40+
fn hybrid_baseline_join(c: &mut Criterion) {
41+
let mut group = c.benchmark_group("hybrid/baseline_join");
42+
group.sample_size(20);
43+
44+
for &n in &[1usize, 10, 100] {
45+
group.bench_with_input(BenchmarkId::new("events_per_window", n), &n, |b, &n| {
46+
b.iter_batched(
47+
setup_processor,
48+
|proc| {
49+
let n64 = n as u64;
50+
for i in 0..n64 {
51+
let ts = if n64 > 1 { i * 9_000 / (n64 - 1) } else { 0 };
52+
proc.add_event(STREAM_URI, make_sensor_event(ts, i, "")).unwrap();
53+
}
54+
55+
proc.close_stream(STREAM_URI, 20_000).unwrap();
56+
let result = wait_for_live_result(&proc, Duration::from_secs(10));
57+
black_box(result)
58+
},
59+
criterion::BatchSize::SmallInput,
60+
);
61+
});
62+
}
63+
64+
group.finish();
65+
}
66+
67+
criterion_group!(benches, hybrid_baseline_join);
68+
criterion_main!(benches);

0 commit comments

Comments
 (0)