
Evaluate live extension functions in Janus#11

Merged
argahsuknesib merged 1 commit into main from codex/rsp-rs-extension-investigation
Apr 8, 2026
Conversation

@argahsuknesib
Collaborator

Summary

  • move live Janus extension-function evaluation into Janus while keeping rsp-rs unchanged
  • subscribe to rsp-rs window emissions, merge window content, and execute final live SPARQL with Janus build_evaluator()
  • add a live integration test for Janus custom functions and document the architecture in docs/live_extension_function_architecture.md

Validation

  • cargo fmt
  • cargo clippy --all-targets --all-features -- -D warnings
  • cargo test --all-features

@argahsuknesib
Collaborator Author

@copilot Can you review the PR?


Copilot AI left a comment


Pull request overview

This PR moves live Janus extension-function evaluation into Janus (without changing rsp-rs) by subscribing to rsp-rs window emissions, merging window content, and running the final live SPARQL via Janus’s build_evaluator() so Janus custom functions are available in the live path.

Changes:

  • Added Janus-side live window subscription + SPARQL execution (build_evaluator() + Oxigraph Store) in LiveStreamProcessing.
  • Mirrored static background quads in Janus so they can be included in Janus-side live evaluation.
  • Added a live integration test for a Janus extension function and documented the architecture.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

Files changed:

  • tests/live_stream_integration_test.rs: Adds an integration test that asserts a Janus extension function works in the live path.
  • src/stream/live_stream_processing.rs: Subscribes to rsp-rs windows and evaluates live SPARQL inside Janus using build_evaluator() and an Oxigraph Store.
  • docs/live_extension_function_architecture.md: Documents the new live evaluation architecture and responsibilities split with rsp-rs.

Comment on lines +473 to +479
if let Ok(other_window) = other_window_arc.lock() {
    if let Some(other_container) =
        other_window.get_content_from_window(timestamp)
    {
        for quad in &other_container.elements {
            container.add(quad.clone(), timestamp);
        }

Copilot AI Apr 8, 2026


Inside the window subscription callback, this code locks sibling windows (other_window_arc.lock()) while executing a callback for the current window. If rsp-rs invokes callbacks concurrently for different windows, acquiring multiple window locks in iteration order can deadlock (e.g., w1 callback waits on w2 while w2 waits on w1). Consider avoiding cross-window locking in the callback (snapshot contents with try_lock, enforce a global lock ordering, or have rsp-rs emit merged window content upstream).

Suggested change

// Avoid blocking on sibling window locks while running a
// callback for the current window. This prevents
// cross-window lock cycles if rsp-rs invokes callbacks
// concurrently for different windows.
let sibling_elements = if let Ok(other_window) = other_window_arc.try_lock() {
    other_window
        .get_content_from_window(timestamp)
        .map(|other_container| other_container.elements.clone())
} else {
    None
};
if let Some(elements) = sibling_elements {
    for quad in elements {
        container.add(quad, timestamp);
    }
}
Comment on lines +484 to +488
match Self::execute_live_query(
    &container,
    &sparql_query,
    &static_data.lock().unwrap(),
) {

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static_data.lock().unwrap() is held across execute_live_query(...), which builds an Oxigraph store and runs SPARQL evaluation. Holding the mutex during this potentially expensive work can block add_static_data() and any other callback threads, and unwrap() will panic on a poisoned lock. Prefer cloning/snapshotting the static quads under a short lock (and returning an error on poison) before executing the query.

Suggested change

let static_snapshot = match static_data.lock() {
    Ok(static_quads) => static_quads.clone(),
    Err(err) => {
        eprintln!("Live Janus static data lock error: {}", err);
        return;
    }
};
match Self::execute_live_query(&container, &sparql_query, &static_snapshot) {

Comment on lines +275 to +276
self.engine.add_static_data(quad.clone());
self.static_data.lock().unwrap().insert(quad);

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add_static_data uses self.static_data.lock().unwrap(), which will panic if the mutex is poisoned. Since this is part of the public API surface, it would be better to convert lock poisoning into a LiveStreamProcessingError (or similar) instead of panicking.

Suggested change

let mut static_data = self.static_data.lock().map_err(|_| {
    LiveStreamProcessingError(
        "Static data store is unavailable due to a poisoned mutex".to_string(),
    )
})?;
static_data.insert(quad.clone());
self.engine.add_static_data(quad);

Comment on lines +514 to +536
let store = Store::new()
    .map_err(|e| LiveStreamProcessingError(format!("Failed to create store: {}", e)))?;

for quad in &container.elements {
    store.insert(quad).map_err(|e| {
        LiveStreamProcessingError(format!("Failed to insert live quad into store: {}", e))
    })?;
}
for quad in static_data {
    store.insert(quad).map_err(|e| {
        LiveStreamProcessingError(format!(
            "Failed to insert static quad into live store: {}",
            e
        ))
    })?;
}

let parsed_query = build_evaluator().parse_query(query).map_err(|e| {
    LiveStreamProcessingError(format!("Failed to parse live SPARQL: {}", e))
})?;
let results = parsed_query.on_store(&store).execute().map_err(|e| {
    LiveStreamProcessingError(format!("Failed to execute live SPARQL: {}", e))
})?;

Copilot AI Apr 8, 2026


execute_live_query creates a new Store, re-inserts all live/static quads, rebuilds the evaluator, and re-parses the same SPARQL query on every window emission. This is likely to be a major CPU/alloc hot spot for live workloads. Consider caching the parsed query/evaluator once (e.g., build in register_live_callbacks and share via Arc), and reusing a store/dataset or using a more incremental evaluation strategy.

Comment on lines +308 to +316
processor.close_stream("http://example.org/stream1", 3000).unwrap();
thread::sleep(Duration::from_millis(500));

let results = processor.collect_results(None).unwrap();
assert!(
    results.iter().any(|result| result.bindings.contains("sensor-pass")),
    "Expected at least one live result to pass the Janus extension-function filter, got {:?}",
    results
);

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test relies on thread::sleep(Duration::from_millis(500)) to wait for asynchronous window closure/results, which can be flaky under load or on slower CI machines. Prefer a bounded poll loop (deadline + try_receive_result()/collect_results(Some(..))) that waits until at least one expected result arrives (or times out with a clear assertion).

@argahsuknesib argahsuknesib merged commit 5daa3c0 into main Apr 8, 2026
18 of 19 checks passed
@argahsuknesib argahsuknesib deleted the codex/rsp-rs-extension-investigation branch April 8, 2026 14:51
Contributor

Copilot AI commented Apr 8, 2026

@copilot Can you review the PR?

Here is my review of this PR.

Summary

The PR correctly solves the core problem: intercepting rsp-rs window emissions and re-running SPARQL through Janus's build_evaluator(), so Janus-specific extension functions work on live streams without forking rsp-rs. The overall approach is sound. A few issues are worth addressing before merging:


Issues

1. StreamType::RStream is hardcoded in register_live_callbacks

window.subscribe(StreamType::RStream, move |mut container| {

The RSP-QL query may declare IStream, DStream, or RStream as the output type. Hardcoding RStream means queries using REGISTER IStream or REGISTER DStream will silently produce wrong semantics. The stream type should be derived from parsed_query.output_stream_type (or equivalent) and mapped to the correct StreamType variant.


2. A new Store is created for every window emission

fn execute_live_query(...) -> Result<Vec<String>, ...> {
    let store = Store::new()...;  // fresh store on every window closure

This allocates and populates a full in-memory Oxigraph store on every window emission—including index creation and quad insertion for both live and static quads. For rapid-fire windows (e.g., STEP 100) or large static datasets this will be a throughput bottleneck. Consider pre-populating a base store with static data once (at add_static_data time) and bulk-loading or forking it per emission.


3. static_data Mutex is held across SPARQL evaluation

match Self::execute_live_query(
    &container,
    &sparql_query,
    &static_data.lock().unwrap(),   // lock held during entire SPARQL run

The MutexGuard lives for the duration of execute_live_query, which includes store population and full SPARQL execution. Any concurrent add_static_data call will block for the full duration of the query. Prefer copying the set before calling:

let static_snapshot: HashSet<Quad> = static_data.lock().unwrap().clone();
Self::execute_live_query(&container, &sparql_query, &static_snapshot)

4. Errors in the window callback are silently swallowed

Err(err) => {
    eprintln!("Live Janus evaluation error: {}", err);
}

The callback has no way to propagate errors to the caller. SPARQL parse/evaluation failures (e.g., unsupported extension function, bad query syntax) are dropped silently after printing to stderr. Consider sending a sentinel error value over the channel so collect_results / receive_result can surface it, or at minimum use a logging framework instead of eprintln!.


5. format!("{:?}", solution) produces unstable debug output

bindings.push(format!("{:?}", solution));

The bindings: String field in BindingWithTimestamp is filled with the Rust Debug format of an oxigraph::sparql::QuerySolution. This is an implementation detail of Oxigraph and may change between releases. The integration test relies on substring matching (result.bindings.contains("sensor-pass")), which is fragile. A more stable representation (e.g., JSON, N-Triples, or a structured map) would make downstream consumers more robust.


6. Multi-window merge uses triggering window's timestamp for sibling content

if let Some(other_container) = other_window.get_content_from_window(timestamp) {

When one window fires, its siblings are queried at timestamp (the triggering window's last_timestamp_changed). If sibling windows have a different STEP or RANGE and haven't emitted at that exact timestamp, get_content_from_window may return empty or stale content. This should be documented as a known limitation, or the merge logic should use each window's own last-emitted timestamp.


Minor / Positive Notes

  • The architecture doc is clear and the Mermaid diagram is helpful.
  • The integration test test_live_query_with_janus_extension_function directly validates the extension function path end-to-end — good addition.
  • The static_data mirror pattern (keeping a parallel HashSet<Quad> alongside the rsp-rs engine's static store) is the right approach given rsp-rs doesn't expose its internal store.
  • unwrap() on lock acquisition in add_static_data (line 276) will panic on a poisoned mutex — consistent with how it's used in the callback, but worth noting both should handle this gracefully.

