-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathminimal_rsp_test.rs
More file actions
94 lines (80 loc) · 2.83 KB
/
minimal_rsp_test.rs
File metadata and controls
94 lines (80 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//! Minimal test to verify rsp-rs 0.3.1 graph name fix
//!
//! This uses very small time windows to ensure windows close quickly
//! and we can verify that results are now being received.
use janus::core::RDFEvent;
use janus::stream::live_stream_processing::LiveStreamProcessing;
use std::thread;
use std::time::Duration;
/// Runs the rsp-rs 0.3.1 integration check end to end.
///
/// Steps, in order:
/// 1. Registers an RSP-QL query with a `[RANGE 1000 STEP 200]` window over
///    `ex:stream1`.
/// 2. Adds 11 events with timestamps 0, 100, ..., 1000.
/// 3. Closes the stream at timestamp 5000 and sleeps for one second.
/// 4. Polls `try_receive_result` up to 50 times, printing the first three
///    results and counting all of them.
///
/// # Errors
///
/// Propagates any error returned while creating the processor, registering
/// the stream, starting processing, adding events, or closing the stream.
/// Also returns an error when no result at all was received, so the process
/// exits with a non-zero status after normal cleanup.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("=== RSP-RS 0.3.1 Integration Test ===\n");

    // Use small windows: 1 second range, 200ms step
    let query = r#"
PREFIX ex: <http://example.org/>
REGISTER RStream <output> AS
SELECT ?s ?p ?o
FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 1000 STEP 200]
WHERE {
WINDOW ex:w1 { ?s ?p ?o }
}
"#;
    println!("Query: RANGE 1000ms, STEP 200ms\n");

    let mut processor = LiveStreamProcessing::new(query.to_string())?;
    processor.register_stream("http://example.org/stream1")?;
    processor.start_processing()?;

    println!("Adding 11 events (t=0 to t=1000ms)...");
    // Iterate over `u64` directly so the timestamp needs no `as` cast.
    for i in 0..11u64 {
        let timestamp = i * 100;
        let event = RDFEvent::new(
            timestamp,
            &format!("http://example.org/subject{}", i),
            "http://example.org/predicate",
            &format!("object{}", i),
            "",
        );
        processor.add_event("http://example.org/stream1", event)?;
    }
    println!("✓ Events added\n");

    // The close timestamp lies well past the last event (t=1000).
    println!("Closing stream at t=5000ms...");
    processor.close_stream("http://example.org/stream1", 5000)?;
    println!("✓ Stream closed\n");

    println!("Waiting 1 second for processing...");
    thread::sleep(Duration::from_secs(1));
    println!();

    println!("=== Collecting Results ===\n");
    let mut count = 0;
    // Poll at most 50 times; stop at the first empty poll or the first error.
    for _ in 0..50 {
        match processor.try_receive_result() {
            Ok(Some(result)) => {
                count += 1;
                // Only the first three results are printed in full.
                if count <= 3 {
                    println!(
                        "Result {}: t={} to t={}",
                        count, result.timestamp_from, result.timestamp_to
                    );
                    println!(" Bindings: {}", result.bindings);
                    println!();
                }
            }
            Ok(None) => break,
            Err(e) => {
                println!("Error: {}", e);
                break;
            }
        }
    }
    if count > 3 {
        println!("... ({} more results)\n", count - 3);
    }

    println!("=== RESULTS ===");
    println!("Total results received: {}\n", count);

    if count == 0 {
        println!("❌ FAILED: No results received");
        println!("The graph name fix in rsp-rs 0.3.1 may not be working.");
        // Return an error instead of calling `std::process::exit(1)`: the
        // exit status is still non-zero, but `processor` is dropped normally
        // because destructors run when `main` returns.
        return Err("no results received from the stream processor".into());
    }

    println!("✅ SUCCESS: Integration working!");
    println!("The rsp-rs 0.3.1 fix is confirmed working with Janus.");
    Ok(())
}