Skip to content

Commit 22da443

Browse files
Merge pull request #7 from SolidLabResearch/comparator
PR : Add Stateful Comparator and Kafka Stream Source Integration
2 parents 22f42e0 + b20a1a9 commit 22da443

22 files changed

Lines changed: 1112 additions & 385 deletions

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ serde = { version = "1.0", features = ["derive"] }
1616
bincode = "1.0"
1717
rsp-rs = "0.2.1"
1818
oxigraph = "0.5"
19+
rumqttc = "0.25.1"
20+
serde_json = "1.0.145"
21+
22+
[target.'cfg(not(windows))'.dependencies]
23+
rdkafka = "0.38.0"
1924

2025
[lib]
2126
name = "janus"

examples/comparator_demo.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use janus::stream::comparator::{ComparatorConfig, StatefulComparator};
2+
3+
fn main() {
4+
println!("=== Stateful Comparator Demo ===\n");
5+
6+
// 1. Setup Configuration
7+
let config = ComparatorConfig {
8+
abs_threshold: 5.0,
9+
rel_threshold: 0.2, // 20% change
10+
catchup_trigger: 10.0,
11+
slope_epsilon: 0.1,
12+
volatility_buffer: 2.0,
13+
window_size: 10,
14+
outlier_z_threshold: 3.0,
15+
};
16+
println!("Configuration: {:#?}\n", config);
17+
18+
// 2. Create Stateful Comparator
19+
let mut comparator = StatefulComparator::new(config);
20+
21+
// 3. Simulate streaming data over time
22+
println!("Feeding streaming data and checking for anomalies:\n");
23+
24+
// Historical baseline: stable around 100.0
25+
// Live data: starts normal, then becomes volatile and drops
26+
let data_points = vec![
27+
(0.0, 100.0, 100.0), // Both normal
28+
(1.0, 101.0, 100.1), // Both normal
29+
(2.0, 102.0, 100.2), // Both normal
30+
(3.0, 103.0, 100.3), // Both normal
31+
(4.0, 104.0, 100.4), // Both normal
32+
(5.0, 80.0, 100.5), // Live drops significantly (catch-up + outlier)
33+
(6.0, 75.0, 100.6), // Live continues dropping
34+
(7.0, 70.0, 100.7), // Live continues dropping
35+
(8.0, 65.0, 100.8), // Live continues dropping
36+
(9.0, 60.0, 100.9), // Live continues dropping
37+
];
38+
39+
for (timestamp, live_val, hist_val) in data_points {
40+
let anomalies = comparator.update_and_compare(timestamp, live_val, hist_val);
41+
42+
if !anomalies.is_empty() {
43+
println!(
44+
"T={:.0}: Live={:.1}, Hist={:.1} -> {} anomalies:",
45+
timestamp,
46+
live_val,
47+
hist_val,
48+
anomalies.len()
49+
);
50+
for anomaly in &anomalies {
51+
println!(" - {}", anomaly);
52+
}
53+
println!();
54+
} else {
55+
println!(
56+
"T={:.0}: Live={:.1}, Hist={:.1} -> No anomalies",
57+
timestamp, live_val, hist_val
58+
);
59+
}
60+
}
61+
}

src/api/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod query_registration;

src/api/query_registration.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

src/lib.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ pub mod store {
6363
//! RDF store implementations and interfaces
6464
}
6565

66+
pub mod sources;
67+
6668
/// Module for stream processing
6769
pub mod stream;
6870

@@ -79,8 +81,12 @@ pub mod config {
7981
/// Module for parsing JanusQL queries
8082
pub mod parsing;
8183

84+
pub mod api;
85+
8286
pub mod storage;
8387

88+
pub mod registry;
89+
8490
pub mod querying;
8591
pub mod error {
8692
//! Error types and result definitions
@@ -138,14 +144,3 @@ pub mod error {
138144

139145
// Re-export commonly used types
140146
pub use error::{Error, Result};
141-
142-
#[cfg(test)]
143-
mod tests {
144-
use super::*;
145-
146-
#[test]
147-
fn test_error_display() {
148-
let err = Error::Config("test error".to_string());
149-
assert_eq!(format!("{}", err), "Configuration error: test error");
150-
}
151-
}

src/parsing/janusql_parser.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub struct R2SOperator {
4141
}
4242

4343
/// Parsed JanusQL query structure containing all components extracted from the query.
44-
#[derive(Debug)]
44+
#[derive(Debug, Clone)]
4545
pub struct ParsedJanusQuery {
4646
/// R2S operator if present
4747
pub r2s: Option<R2SOperator>,

src/querying/oxigraph_adapter.rs

Lines changed: 0 additions & 266 deletions
Original file line numberDiff line numberDiff line change
@@ -90,269 +90,3 @@ impl SparqlEngine for OxigraphAdapter {
9090
Ok(result_strings)
9191
}
9292
}
93-
94-
#[cfg(test)]
95-
mod tests {
96-
use super::*;
97-
use oxigraph::model::{GraphName, Literal, NamedNode};
98-
use std::collections::HashSet;
99-
100-
/// Helper function to create a test QuadContainer with sample data
101-
fn create_test_container() -> QuadContainer {
102-
let mut quads = HashSet::new();
103-
104-
// Add test quads: <http://example.org/alice> <http://example.org/knows> <http://example.org/bob>
105-
let alice = NamedNode::new("http://example.org/alice").unwrap();
106-
let bob = NamedNode::new("http://example.org/bob").unwrap();
107-
let charlie = NamedNode::new("http://example.org/charlie").unwrap();
108-
let knows = NamedNode::new("http://example.org/knows").unwrap();
109-
let age = NamedNode::new("http://example.org/age").unwrap();
110-
111-
// Alice knows Bob
112-
quads.insert(Quad::new(alice.clone(), knows.clone(), bob.clone(), GraphName::DefaultGraph));
113-
114-
// Bob knows Charlie
115-
quads.insert(Quad::new(
116-
bob.clone(),
117-
knows.clone(),
118-
charlie.clone(),
119-
GraphName::DefaultGraph,
120-
));
121-
122-
// Alice's age
123-
quads.insert(Quad::new(
124-
alice.clone(),
125-
age.clone(),
126-
Literal::new_simple_literal("30"),
127-
GraphName::DefaultGraph,
128-
));
129-
130-
// Bob's age
131-
quads.insert(Quad::new(
132-
bob.clone(),
133-
age.clone(),
134-
Literal::new_simple_literal("25"),
135-
GraphName::DefaultGraph,
136-
));
137-
138-
QuadContainer::new(quads, 1000)
139-
}
140-
141-
#[test]
142-
fn test_oxigraph_adapter_creation() {
143-
let _adapter = OxigraphAdapter::new();
144-
// Adapter created successfully
145-
}
146-
147-
#[test]
148-
fn test_execute_simple_select_query() {
149-
let adapter = OxigraphAdapter::new();
150-
let container = create_test_container();
151-
152-
// Query to select all subjects
153-
let query = "SELECT ?s WHERE { ?s ?p ?o }";
154-
155-
let results = adapter.execute_query(query, &container);
156-
assert!(results.is_ok(), "Query execution should succeed");
157-
158-
let results = results.unwrap();
159-
assert!(!results.is_empty(), "Results should not be empty");
160-
assert_eq!(results.len(), 4, "Should return 4 results (4 distinct subjects in quads)");
161-
}
162-
163-
#[test]
164-
fn test_execute_select_with_filter() {
165-
let adapter = OxigraphAdapter::new();
166-
let container = create_test_container();
167-
168-
// Query to select subjects that know someone
169-
let query = r"
170-
PREFIX ex: <http://example.org/>
171-
SELECT ?s WHERE {
172-
?s ex:knows ?o
173-
}
174-
";
175-
176-
let results = adapter.execute_query(query, &container);
177-
assert!(results.is_ok(), "Query with filter should succeed");
178-
179-
let results = results.unwrap();
180-
assert_eq!(results.len(), 2, "Should return 2 results (Alice and Bob know someone)");
181-
}
182-
183-
#[test]
184-
fn test_execute_ask_query() {
185-
let adapter = OxigraphAdapter::new();
186-
let container = create_test_container();
187-
188-
// ASK query to check if Alice knows Bob
189-
let query = r"
190-
PREFIX ex: <http://example.org/>
191-
ASK {
192-
ex:alice ex:knows ex:bob
193-
}
194-
";
195-
196-
let results = adapter.execute_query(query, &container);
197-
assert!(results.is_ok(), "ASK query should succeed");
198-
199-
let results = results.unwrap();
200-
assert_eq!(results.len(), 1, "ASK query should return one boolean result");
201-
assert_eq!(results[0], "true", "ASK query should return true");
202-
}
203-
204-
#[test]
205-
fn test_execute_ask_query_false() {
206-
let adapter = OxigraphAdapter::new();
207-
let container = create_test_container();
208-
209-
// ASK query that should return false
210-
let query = r"
211-
PREFIX ex: <http://example.org/>
212-
ASK {
213-
ex:alice ex:knows ex:charlie
214-
}
215-
";
216-
217-
let results = adapter.execute_query(query, &container);
218-
assert!(results.is_ok(), "ASK query should succeed");
219-
220-
let results = results.unwrap();
221-
assert_eq!(results.len(), 1, "ASK query should return one boolean result");
222-
assert_eq!(
223-
results[0], "false",
224-
"ASK query should return false (Alice doesn't know Charlie directly)"
225-
);
226-
}
227-
228-
#[test]
229-
fn test_execute_construct_query() {
230-
let adapter = OxigraphAdapter::new();
231-
let container = create_test_container();
232-
233-
// CONSTRUCT query to create new triples
234-
let query = r"
235-
PREFIX ex: <http://example.org/>
236-
CONSTRUCT {
237-
?s ex:knows ?o
238-
}
239-
WHERE {
240-
?s ex:knows ?o
241-
}
242-
";
243-
244-
let results = adapter.execute_query(query, &container);
245-
assert!(results.is_ok(), "CONSTRUCT query should succeed");
246-
247-
let results = results.unwrap();
248-
assert_eq!(results.len(), 2, "CONSTRUCT should return 2 triples");
249-
}
250-
251-
#[test]
252-
fn test_execute_with_empty_container() {
253-
let adapter = OxigraphAdapter::new();
254-
let empty_container = QuadContainer::new(HashSet::new(), 1000);
255-
256-
let query = "SELECT ?s WHERE { ?s ?p ?o }";
257-
258-
let results = adapter.execute_query(query, &empty_container);
259-
assert!(results.is_ok(), "Query on empty container should succeed");
260-
261-
let results = results.unwrap();
262-
assert!(results.is_empty(), "Results should be empty for empty container");
263-
}
264-
265-
#[test]
266-
fn test_execute_invalid_query() {
267-
let adapter = OxigraphAdapter::new();
268-
let container = create_test_container();
269-
270-
// Invalid SPARQL query
271-
let query = "INVALID SPARQL QUERY";
272-
273-
let results = adapter.execute_query(query, &container);
274-
assert!(results.is_err(), "Invalid query should return an error");
275-
276-
let error = results.unwrap_err();
277-
assert!(error.to_string().contains("Oxigraph error"), "Error should be an OxigraphError");
278-
}
279-
280-
#[test]
281-
fn test_execute_query_with_literal_filter() {
282-
let adapter = OxigraphAdapter::new();
283-
let container = create_test_container();
284-
285-
// Query to find people older than 25
286-
let query = r#"
287-
PREFIX ex: <http://example.org/>
288-
SELECT ?s ?age WHERE {
289-
?s ex:age ?age .
290-
FILTER(?age > "25")
291-
}
292-
"#;
293-
294-
let results = adapter.execute_query(query, &container);
295-
assert!(results.is_ok(), "Query with literal filter should succeed");
296-
297-
let results = results.unwrap();
298-
assert_eq!(results.len(), 1, "Should return 1 result (Alice is 30)");
299-
}
300-
301-
#[test]
302-
fn test_execute_count_query() {
303-
let adapter = OxigraphAdapter::new();
304-
let container = create_test_container();
305-
306-
// Query to count the number of 'knows' relationships
307-
let query = r"
308-
PREFIX ex: <http://example.org/>
309-
SELECT (COUNT(?s) AS ?count) WHERE {
310-
?s ex:knows ?o
311-
}
312-
";
313-
314-
let results = adapter.execute_query(query, &container);
315-
assert!(results.is_ok(), "COUNT query should succeed");
316-
317-
let results = results.unwrap();
318-
assert_eq!(results.len(), 1, "COUNT query should return 1 result");
319-
}
320-
321-
#[test]
322-
fn test_multiple_queries_on_same_adapter() {
323-
let adapter = OxigraphAdapter::new();
324-
let container = create_test_container();
325-
326-
// First query
327-
let query1 = "SELECT ?s WHERE { ?s ?p ?o }";
328-
let results1 = adapter.execute_query(query1, &container);
329-
assert!(results1.is_ok(), "First query should succeed");
330-
331-
// Second query
332-
let query2 = r"
333-
PREFIX ex: <http://example.org/>
334-
SELECT ?s WHERE { ?s ex:knows ?o }
335-
";
336-
let results2 = adapter.execute_query(query2, &container);
337-
assert!(results2.is_ok(), "Second query should succeed");
338-
339-
// Verify both queries returned results
340-
assert!(!results1.unwrap().is_empty());
341-
assert!(!results2.unwrap().is_empty());
342-
}
343-
344-
#[test]
345-
fn test_oxigraph_error_display() {
346-
let error = OxigraphError("Test error message".to_string());
347-
let error_string = format!("{}", error);
348-
assert_eq!(error_string, "Oxigraph error: Test error message");
349-
}
350-
351-
#[test]
352-
fn test_oxigraph_error_from_storage_error() {
353-
// This tests the From implementation for StorageError
354-
// We can't easily create a real StorageError, but we verify the trait is implemented
355-
let error = OxigraphError::from(oxigraph::store::StorageError::Other("test".into()));
356-
assert!(error.to_string().contains("Oxigraph error"));
357-
}
358-
}

src/registry/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod query_registry;

0 commit comments

Comments
 (0)