44//! It integrates RSP-QL query execution with Janus's RDFEvent data model.
55
66use crate :: core:: RDFEvent ;
7+ use crate :: extensions:: query_options:: build_evaluator;
78use oxigraph:: model:: { GraphName , NamedNode , Quad , Term } ;
8- use rsp_rs:: { BindingWithTimestamp , RDFStream , RSPEngine } ;
9- use std:: collections:: HashMap ;
9+ use oxigraph:: sparql:: QueryResults ;
10+ use oxigraph:: store:: Store ;
11+ use rsp_rs:: { BindingWithTimestamp , RDFStream , RSPEngine , StreamType } ;
12+ use std:: collections:: { HashMap , HashSet } ;
1013use std:: sync:: mpsc:: { Receiver , RecvError } ;
14+ use std:: sync:: { mpsc, Arc , Mutex } ;
1115
1216/// Live stream processing engine for RSP-QL queries
1317pub struct LiveStreamProcessing {
@@ -17,6 +21,8 @@ pub struct LiveStreamProcessing {
1721 streams : HashMap < String , RDFStream > ,
1822 /// Result receiver for query results
1923 result_receiver : Option < Receiver < BindingWithTimestamp > > ,
24+ /// Static quads mirrored in Janus for Janus-side live query evaluation.
25+ static_data : Arc < Mutex < HashSet < Quad > > > ,
2026 /// Flag indicating if processing has started
2127 processing_started : bool ,
2228}
@@ -81,6 +87,7 @@ impl LiveStreamProcessing {
8187 engine,
8288 streams : HashMap :: new ( ) ,
8389 result_receiver : None ,
90+ static_data : Arc :: new ( Mutex :: new ( HashSet :: new ( ) ) ) ,
8491 processing_started : false ,
8592 } )
8693 }
@@ -117,7 +124,7 @@ impl LiveStreamProcessing {
117124 return Err ( LiveStreamProcessingError ( "Processing already started" . to_string ( ) ) ) ;
118125 }
119126
120- let receiver = self . engine . start_processing ( ) ;
127+ let receiver = self . register_live_callbacks ( ) ? ;
121128 self . result_receiver = Some ( receiver) ;
122129 self . processing_started = true ;
123130
@@ -265,7 +272,8 @@ impl LiveStreamProcessing {
265272 /// * `event` - RDFEvent representing static knowledge
266273 pub fn add_static_data ( & mut self , event : RDFEvent ) -> Result < ( ) , LiveStreamProcessingError > {
267274 let quad = self . rdf_event_to_quad ( & event) ?;
268- self . engine . add_static_data ( quad) ;
275+ self . engine . add_static_data ( quad. clone ( ) ) ;
276+ self . static_data . lock ( ) . unwrap ( ) . insert ( quad) ;
269277 Ok ( ( ) )
270278 }
271279
@@ -420,6 +428,129 @@ impl LiveStreamProcessing {
420428 Ok ( Quad :: new ( subject, predicate, object, graph) )
421429 }
422430
431+ fn register_live_callbacks (
432+ & self ,
433+ ) -> Result < Receiver < BindingWithTimestamp > , LiveStreamProcessingError > {
434+ let parsed_query = self . engine . parsed_query ( ) . clone ( ) ;
435+ let sparql_query = Arc :: new ( parsed_query. sparql_query . clone ( ) ) ;
436+ let ( tx, rx) = mpsc:: channel ( ) ;
437+
438+ let mut windows = HashMap :: new ( ) ;
439+ for window_def in & parsed_query. s2r {
440+ let window = self . engine . get_window ( & window_def. window_name ) . ok_or_else ( || {
441+ LiveStreamProcessingError ( format ! (
442+ "Window '{}' not found in engine" ,
443+ window_def. window_name
444+ ) )
445+ } ) ?;
446+ windows. insert ( window_def. window_name . clone ( ) , window) ;
447+ }
448+ let windows = Arc :: new ( windows) ;
449+ let static_data = Arc :: clone ( & self . static_data ) ;
450+
451+ for window_def in parsed_query. s2r {
452+ let window_arc = windows. get ( & window_def. window_name ) . cloned ( ) . ok_or_else ( || {
453+ LiveStreamProcessingError ( format ! (
454+ "Window '{}' not available for subscription" ,
455+ window_def. window_name
456+ ) )
457+ } ) ?;
458+ let tx_clone = tx. clone ( ) ;
459+ let sparql_query = Arc :: clone ( & sparql_query) ;
460+ let all_windows = Arc :: clone ( & windows) ;
461+ let static_data = Arc :: clone ( & static_data) ;
462+ let window_name = window_def. window_name . clone ( ) ;
463+ let window_width = window_def. width ;
464+
465+ let mut window = window_arc. lock ( ) . unwrap ( ) ;
466+ window. subscribe ( StreamType :: RStream , move |mut container| {
467+ let timestamp = container. last_timestamp_changed ;
468+
469+ for ( other_name, other_window_arc) in all_windows. iter ( ) {
470+ if other_name == & window_name {
471+ continue ;
472+ }
473+ if let Ok ( other_window) = other_window_arc. lock ( ) {
474+ if let Some ( other_container) =
475+ other_window. get_content_from_window ( timestamp)
476+ {
477+ for quad in & other_container. elements {
478+ container. add ( quad. clone ( ) , timestamp) ;
479+ }
480+ }
481+ }
482+ }
483+
484+ match Self :: execute_live_query (
485+ & container,
486+ & sparql_query,
487+ & static_data. lock ( ) . unwrap ( ) ,
488+ ) {
489+ Ok ( bindings) => {
490+ for binding in bindings {
491+ let result = BindingWithTimestamp {
492+ bindings : binding,
493+ timestamp_from : timestamp,
494+ timestamp_to : timestamp + window_width,
495+ } ;
496+ let _ = tx_clone. send ( result) ;
497+ }
498+ }
499+ Err ( err) => {
500+ eprintln ! ( "Live Janus evaluation error: {}" , err) ;
501+ }
502+ }
503+ } ) ;
504+ }
505+
506+ Ok ( rx)
507+ }
508+
509+ fn execute_live_query (
510+ container : & rsp_rs:: QuadContainer ,
511+ query : & str ,
512+ static_data : & HashSet < Quad > ,
513+ ) -> Result < Vec < String > , LiveStreamProcessingError > {
514+ let store = Store :: new ( )
515+ . map_err ( |e| LiveStreamProcessingError ( format ! ( "Failed to create store: {}" , e) ) ) ?;
516+
517+ for quad in & container. elements {
518+ store. insert ( quad) . map_err ( |e| {
519+ LiveStreamProcessingError ( format ! ( "Failed to insert live quad into store: {}" , e) )
520+ } ) ?;
521+ }
522+ for quad in static_data {
523+ store. insert ( quad) . map_err ( |e| {
524+ LiveStreamProcessingError ( format ! (
525+ "Failed to insert static quad into live store: {}" ,
526+ e
527+ ) )
528+ } ) ?;
529+ }
530+
531+ let parsed_query = build_evaluator ( ) . parse_query ( query) . map_err ( |e| {
532+ LiveStreamProcessingError ( format ! ( "Failed to parse live SPARQL: {}" , e) )
533+ } ) ?;
534+ let results = parsed_query. on_store ( & store) . execute ( ) . map_err ( |e| {
535+ LiveStreamProcessingError ( format ! ( "Failed to execute live SPARQL: {}" , e) )
536+ } ) ?;
537+
538+ let mut bindings = Vec :: new ( ) ;
539+ if let QueryResults :: Solutions ( solutions) = results {
540+ for solution in solutions {
541+ let solution = solution. map_err ( |e| {
542+ LiveStreamProcessingError ( format ! (
543+ "Failed to evaluate live solution binding: {}" ,
544+ e
545+ ) )
546+ } ) ?;
547+ bindings. push ( format ! ( "{:?}" , solution) ) ;
548+ }
549+ }
550+
551+ Ok ( bindings)
552+ }
553+
423554 /// Returns the list of registered stream URIs
424555 pub fn get_registered_streams ( & self ) -> Vec < String > {
425556 self . streams . keys ( ) . cloned ( ) . collect ( )
0 commit comments