Evaluate live extension functions in Janus#11
Conversation
|
@copilot Can you review the PR? |
There was a problem hiding this comment.
Pull request overview
This PR moves live Janus extension-function evaluation into Janus (without changing rsp-rs) by subscribing to rsp-rs window emissions, merging window content, and running the final live SPARQL via Janus’s build_evaluator() so Janus custom functions are available in the live path.
Changes:
- Added Janus-side live window subscription + SPARQL execution (
build_evaluator()+ OxigraphStore) inLiveStreamProcessing. - Mirrored static background quads in Janus so they can be included in Janus-side live evaluation.
- Added a live integration test for a Janus extension function and documented the architecture.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tests/live_stream_integration_test.rs | Adds an integration test that asserts a Janus extension function works in the live path. |
| src/stream/live_stream_processing.rs | Subscribes to rsp-rs windows and evaluates live SPARQL inside Janus using build_evaluator() and an Oxigraph Store. |
| docs/live_extension_function_architecture.md | Documents the new live evaluation architecture and responsibilities split with rsp-rs. |
| if let Ok(other_window) = other_window_arc.lock() { | ||
| if let Some(other_container) = | ||
| other_window.get_content_from_window(timestamp) | ||
| { | ||
| for quad in &other_container.elements { | ||
| container.add(quad.clone(), timestamp); | ||
| } |
There was a problem hiding this comment.
Inside the window subscription callback, this code locks sibling windows (other_window_arc.lock()) while executing a callback for the current window. If rsp-rs invokes callbacks concurrently for different windows, acquiring multiple window locks in iteration order can deadlock (e.g., w1 callback waits on w2 while w2 waits on w1). Consider avoiding cross-window locking in the callback (snapshot contents with try_lock, enforce a global lock ordering, or have rsp-rs emit merged window content upstream).
| if let Ok(other_window) = other_window_arc.lock() { | |
| if let Some(other_container) = | |
| other_window.get_content_from_window(timestamp) | |
| { | |
| for quad in &other_container.elements { | |
| container.add(quad.clone(), timestamp); | |
| } | |
| // Avoid blocking on sibling window locks while running a | |
| // callback for the current window. This prevents | |
| // cross-window lock cycles if rsp-rs invokes callbacks | |
| // concurrently for different windows. | |
| let sibling_elements = if let Ok(other_window) = other_window_arc.try_lock() { | |
| other_window | |
| .get_content_from_window(timestamp) | |
| .map(|other_container| other_container.elements.clone()) | |
| } else { | |
| None | |
| }; | |
| if let Some(elements) = sibling_elements { | |
| for quad in elements { | |
| container.add(quad, timestamp); |
| match Self::execute_live_query( | ||
| &container, | ||
| &sparql_query, | ||
| &static_data.lock().unwrap(), | ||
| ) { |
There was a problem hiding this comment.
static_data.lock().unwrap() is held across execute_live_query(...), which builds an Oxigraph store and runs SPARQL evaluation. Holding the mutex during this potentially expensive work can block add_static_data() and any other callback threads, and unwrap() will panic on a poisoned lock. Prefer cloning/snapshotting the static quads under a short lock (and returning an error on poison) before executing the query.
| match Self::execute_live_query( | |
| &container, | |
| &sparql_query, | |
| &static_data.lock().unwrap(), | |
| ) { | |
| let static_snapshot = match static_data.lock() { | |
| Ok(static_quads) => static_quads.clone(), | |
| Err(err) => { | |
| eprintln!("Live Janus static data lock error: {}", err); | |
| return; | |
| } | |
| }; | |
| match Self::execute_live_query(&container, &sparql_query, &static_snapshot) { |
| self.engine.add_static_data(quad.clone()); | ||
| self.static_data.lock().unwrap().insert(quad); |
There was a problem hiding this comment.
add_static_data uses self.static_data.lock().unwrap(), which will panic if the mutex is poisoned. Since this is part of the public API surface, it would be better to convert lock poisoning into a LiveStreamProcessingError (or similar) instead of panicking.
| self.engine.add_static_data(quad.clone()); | |
| self.static_data.lock().unwrap().insert(quad); | |
| let mut static_data = self.static_data.lock().map_err(|_| { | |
| LiveStreamProcessingError( | |
| "Static data store is unavailable due to a poisoned mutex".to_string(), | |
| ) | |
| })?; | |
| static_data.insert(quad.clone()); | |
| self.engine.add_static_data(quad); |
| let store = Store::new() | ||
| .map_err(|e| LiveStreamProcessingError(format!("Failed to create store: {}", e)))?; | ||
|
|
||
| for quad in &container.elements { | ||
| store.insert(quad).map_err(|e| { | ||
| LiveStreamProcessingError(format!("Failed to insert live quad into store: {}", e)) | ||
| })?; | ||
| } | ||
| for quad in static_data { | ||
| store.insert(quad).map_err(|e| { | ||
| LiveStreamProcessingError(format!( | ||
| "Failed to insert static quad into live store: {}", | ||
| e | ||
| )) | ||
| })?; | ||
| } | ||
|
|
||
| let parsed_query = build_evaluator().parse_query(query).map_err(|e| { | ||
| LiveStreamProcessingError(format!("Failed to parse live SPARQL: {}", e)) | ||
| })?; | ||
| let results = parsed_query.on_store(&store).execute().map_err(|e| { | ||
| LiveStreamProcessingError(format!("Failed to execute live SPARQL: {}", e)) | ||
| })?; |
There was a problem hiding this comment.
execute_live_query creates a new Store, re-inserts all live/static quads, rebuilds the evaluator, and re-parses the same SPARQL query on every window emission. This is likely to be a major CPU/alloc hot spot for live workloads. Consider caching the parsed query/evaluator once (e.g., build in register_live_callbacks and share via Arc), and reusing a store/dataset or using a more incremental evaluation strategy.
| processor.close_stream("http://example.org/stream1", 3000).unwrap(); | ||
| thread::sleep(Duration::from_millis(500)); | ||
|
|
||
| let results = processor.collect_results(None).unwrap(); | ||
| assert!( | ||
| results.iter().any(|result| result.bindings.contains("sensor-pass")), | ||
| "Expected at least one live result to pass the Janus extension-function filter, got {:?}", | ||
| results | ||
| ); |
There was a problem hiding this comment.
This test relies on thread::sleep(Duration::from_millis(500)) to wait for asynchronous window closure/results, which can be flaky under load or on slower CI machines. Prefer a bounded poll loop (deadline + try_receive_result()/collect_results(Some(..))) that waits until at least one expected result arrives (or times out with a clear assertion).
Here is my review of this PR. SummaryThe PR correctly solves the core problem: intercepting rsp-rs window emissions and re-running SPARQL through Janus's Issues1. window.subscribe(StreamType::RStream, move |mut container| {The RSP-QL query may declare 2. A new fn execute_live_query(...) -> Result<Vec<String>, ...> {
let store = Store::new()...; // fresh store on every window closureThis allocates and populates a full in-memory Oxigraph store on every window emission—including index creation and quad insertion for both live and static quads. For rapid-fire windows (e.g., 3. match Self::execute_live_query(
&container,
&sparql_query,
&static_data.lock().unwrap(), // lock held during entire SPARQL runThe let static_snapshot: HashSet<Quad> = static_data.lock().unwrap().clone();
Self::execute_live_query(&container, &sparql_query, &static_snapshot)4. Errors in the window callback are silently swallowed Err(err) => {
eprintln!("Live Janus evaluation error: {}", err);
}The callback has no way to propagate errors to the caller. SPARQL parse/evaluation failures (e.g., unsupported extension function, bad query syntax) are dropped silently after printing to stderr. Consider sending a sentinel error value over the channel so 5. bindings.push(format!("{:?}", solution));The 6. Multi-window merge uses triggering window's timestamp for sibling content if let Some(other_container) = other_window.get_content_from_window(timestamp) {When one window fires, its siblings are queried at Minor / Positive Notes
|
Summary
Validation