|
| 1 | +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; |
| 2 | +use janus::{ |
| 3 | + execution::historical_executor::HistoricalExecutor, |
| 4 | + parsing::janusql_parser::{WindowDefinition, WindowType}, |
| 5 | + querying::oxigraph_adapter::OxigraphAdapter, |
| 6 | + storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig}, |
| 7 | +}; |
| 8 | +use std::sync::{ |
| 9 | + atomic::{AtomicU64, Ordering}, |
| 10 | + Arc, |
| 11 | +}; |
| 12 | +use std::time::{SystemTime, UNIX_EPOCH}; |
| 13 | + |
| 14 | +static COUNTER: AtomicU64 = AtomicU64::new(0); |
| 15 | + |
| 16 | +fn unique_config() -> StreamingConfig { |
| 17 | + let id = COUNTER.fetch_add(1, Ordering::Relaxed); |
| 18 | + let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos(); |
| 19 | + StreamingConfig { |
| 20 | + segment_base_path: format!("/tmp/janus_bench_fixed_{}_{}", ts, id), |
| 21 | + max_batch_events: 1_000_000, |
| 22 | + max_batch_age_seconds: 3600, |
| 23 | + max_batch_bytes: 1_000_000_000, |
| 24 | + sparse_interval: 64, |
| 25 | + entries_per_index_block: 256, |
| 26 | + } |
| 27 | +} |
| 28 | + |
| 29 | +/// Write N events at timestamps [1000, 1000+N) into a fresh storage. |
| 30 | +/// These land in the in-memory batch buffer — no flush needed before querying. |
| 31 | +fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) { |
| 32 | + let storage = StreamingSegmentedStorage::new(unique_config()).unwrap(); |
| 33 | + for i in 0..n as u64 { |
| 34 | + storage |
| 35 | + .write_rdf( |
| 36 | + 1_000 + i, |
| 37 | + &format!("http://example.org/sensor{}", i % 5), |
| 38 | + "http://saref.etsi.org/core/hasValue", |
| 39 | + &format!("{}", 20 + (i % 10)), |
| 40 | + "http://example.org/graph", |
| 41 | + ) |
| 42 | + .unwrap(); |
| 43 | + } |
| 44 | + let window = WindowDefinition { |
| 45 | + window_name: "w".to_string(), |
| 46 | + stream_name: "http://example.org/stream".to_string(), |
| 47 | + width: n as u64, |
| 48 | + slide: n as u64, |
| 49 | + offset: None, |
| 50 | + start: Some(1_000), |
| 51 | + end: Some(1_000 + n as u64 - 1), |
| 52 | + window_type: WindowType::HistoricalFixed, |
| 53 | + }; |
| 54 | + (Arc::new(storage), window) |
| 55 | +} |
| 56 | + |
| 57 | +const SPARQL: &str = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }"; |
| 58 | + |
| 59 | +fn historical_fixed(c: &mut Criterion) { |
| 60 | + let mut group = c.benchmark_group("historical/fixed_window"); |
| 61 | + |
| 62 | + for &n in &[100usize, 1_000, 10_000] { |
| 63 | + group.bench_with_input(BenchmarkId::new("events", n), &n, |b, &n| { |
| 64 | + b.iter_batched( |
| 65 | + || setup(n), |
| 66 | + |(storage, window)| { |
| 67 | + let executor = HistoricalExecutor::new(storage, OxigraphAdapter::new()); |
| 68 | + black_box(executor.execute_fixed_window(&window, SPARQL).unwrap()) |
| 69 | + }, |
| 70 | + criterion::BatchSize::SmallInput, |
| 71 | + ); |
| 72 | + }); |
| 73 | + } |
| 74 | + |
| 75 | + group.finish(); |
| 76 | +} |
| 77 | + |
| 78 | +criterion_group!(benches, historical_fixed); |
| 79 | +criterion_main!(benches); |
0 commit comments