-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_query_pipeline.rs
More file actions
102 lines (90 loc) · 3.39 KB
/
test_query_pipeline.rs
File metadata and controls
102 lines (90 loc) · 3.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use janus::{
api::janus_api::JanusApi, parsing::janusql_parser::JanusQLParser,
registry::query_registry::QueryRegistry, storage::segmented_storage::StreamingSegmentedStorage,
storage::util::StreamingConfig,
};
use std::sync::Arc;
/// Manual smoke test for the query pipeline.
///
/// Opens the segmented storage under `./data/storage`, reports how many events
/// a full-range query returns (decoding the first three through the storage
/// dictionary), then registers and starts a JanusQL query and polls its handle
/// for roughly five seconds, printing every result that arrives.
///
/// Storage, parser, and API construction failures panic via `expect`;
/// a failed registration or start is printed and ends the run early.
fn main() {
    // The query declares two named windows over `ex:sensorStream`; surrounding
    // whitespace of the raw literal is trimmed before use.
    let janusql = r#"
PREFIX ex: <http://example.org/>
REGISTER RStream ex:output AS
SELECT ?sensor ?temp
FROM NAMED WINDOW ex:histWindow ON STREAM ex:sensorStream [START 1000000000000 END 2000000000000]
FROM NAMED WINDOW ex:liveWindow ON STREAM ex:sensorStream [RANGE 5000 STEP 2000]
WHERE {
WINDOW ex:histWindow {
?sensor ex:temperature ?temp .
}
WINDOW ex:liveWindow {
?sensor ex:temperature ?temp .
}
}
"#
    .trim();

    println!("Testing query pipeline...\n");
    println!("Query:\n{}\n", janusql);

    // --- Storage -----------------------------------------------------------
    let storage_config = StreamingConfig {
        segment_base_path: String::from("./data/storage"),
        max_batch_bytes: 10 * 1024 * 1024,
        max_batch_age_seconds: 5,
        max_batch_events: 100_000,
        sparse_interval: 1000,
        entries_per_index_block: 1024,
    };
    let opened = StreamingSegmentedStorage::new(storage_config).expect("Failed to load storage");
    let storage = Arc::new(opened);

    // Query the widest possible range so the count reflects everything the
    // storage returns for it.
    let stored = storage.query(0, u64::MAX).expect("Storage query failed");
    println!("Storage has {} events", stored.len());

    if !stored.is_empty() {
        // The read guard lives only for this block, so it is released before
        // the storage handle is moved into the API below.
        let dictionary = storage.get_dictionary().read().unwrap();
        println!("\nFirst 3 events decoded:");
        for (idx, event) in stored.iter().enumerate().take(3) {
            println!("Event {}:", idx + 1);
            println!(" subject: {:?}", dictionary.decode(event.subject));
            println!(" predicate: {:?}", dictionary.decode(event.predicate));
            println!(" object: {:?}", dictionary.decode(event.object));
            println!(" graph: {:?}", dictionary.decode(event.graph));
            println!(" timestamp: {}", event.timestamp);
        }
    }

    // --- API wiring --------------------------------------------------------
    let parser = JanusQLParser::new().expect("Failed to create parser");
    let registry = Arc::new(QueryRegistry::new());
    let api = JanusApi::new(parser, registry, storage).expect("Failed to create API");

    // --- Register ----------------------------------------------------------
    println!("\nRegistering query...");
    let query_id = String::from("test_query");
    match api.register_query(query_id.clone(), janusql) {
        Err(err) => {
            println!("✗ Failed to register: {}", err);
            return;
        }
        Ok(_) => println!("✓ Query registered"),
    }

    // --- Start -------------------------------------------------------------
    println!("Starting query...");
    let handle = match api.start_query(&query_id) {
        Err(err) => {
            println!("✗ Failed to start: {}", err);
            return;
        }
        Ok(started) => {
            println!("✓ Query started");
            started
        }
    };

    // --- Poll for results --------------------------------------------------
    println!("\nWaiting for results (5 seconds)...");
    let began = std::time::Instant::now();
    let poll_window = std::time::Duration::from_secs(5);
    let idle_pause = std::time::Duration::from_millis(100);
    let mut received = 0;

    while began.elapsed() < poll_window {
        match handle.try_receive() {
            Some(result) => {
                received += 1;
                println!("\nResult {}:", received);
                println!(" Source: {:?}", result.source);
                println!(" Timestamp: {}", result.timestamp);
                println!(" Bindings ({} items):", result.bindings.len());
                // Only the first three bindings are shown to keep output short.
                for (idx, binding) in result.bindings.iter().enumerate().take(3) {
                    println!(" {}: {:?}", idx + 1, binding);
                }
            }
            // Nothing ready yet: back off briefly instead of spinning.
            None => std::thread::sleep(idle_pause),
        }
    }

    println!("\nTotal: {} results received", received);
}