diff --git a/README.md b/README.md index c0f4d9e..ff2c4ff 100644 --- a/README.md +++ b/README.md @@ -1,79 +1,150 @@ # Janus -Janus is a hybrid engine for unified Live and Historical RDF Stream Processing, implemented in Rust. +Janus is a Rust engine for unified historical and live RDF stream processing. + +It combines: + +- historical window evaluation over segmented RDF storage +- live window evaluation over incoming streams +- a single Janus-QL query model for hybrid queries +- an HTTP/WebSocket API for query lifecycle management and result delivery + +The name comes from the Roman deity Janus, associated with transitions and with looking both backward and forward. That dual perspective matches Janus's goal: querying past and live RDF data together. + +## What Janus Supports + +- Historical windows with `START` / `END` +- Sliding live windows with `RANGE` / `STEP` +- Hybrid queries that mix historical and live windows +- Extension functions for anomaly-style predicates such as thresholds, relative change, z-score, outlier checks, and trend divergence +- Optional baseline bootstrapping for hybrid anomaly queries with `USING BASELINE LAST|AGGREGATE` +- HTTP endpoints for registering, starting, stopping, listing, and deleting queries +- WebSocket result streaming for running queries + +## Query Model + +Janus uses Janus-QL, a hybrid query language for querying historical and live RDF data in one query. + +Example: + +```sparql +PREFIX ex: +PREFIX janus: +PREFIX baseline: + +REGISTER RStream ex:out AS +SELECT ?sensor ?reading +FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1700000000000 END 1700003600000] +FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 5000 STEP 1000] +USING BASELINE ex:hist AGGREGATE +WHERE { + WINDOW ex:hist { + ?sensor ex:mean ?mean . + ?sensor ex:sigma ?sigma . + } + WINDOW ex:live { + ?sensor ex:hasReading ?reading . + } + ?sensor baseline:mean ?mean . + ?sensor baseline:sigma ?sigma . 
+ FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3)) +} +``` + +`USING BASELINE` is optional. If present, Janus bootstraps baseline values from the named historical window before or during live execution: -The name "Janus" is inspired by the Roman deity Janus who is the guardian of doorways and transitions, and looks towards both the past and the future simultaneously. This dual perspective reflects Janus's capability to process both Historical and Live RDF streams in a unified manner utilizing a single query language and engine. +- `LAST`: use the final historical window snapshot as baseline +- `AGGREGATE`: merge the historical window outputs into one compact baseline ## Performance -Janus achieves high-throughput RDF stream processing with dictionary encoding and streaming segmented storage: +Janus uses dictionary encoding and segmented storage for high-throughput ingestion and historical reads. -- Write Throughput: 2.6-3.14 Million quads/sec -- Read Throughput: 2.7-2.77 Million quads/sec -- Point Query Latency: Sub-millisecond (0.235 ms at 1M quads) -- Space Efficiency: 40% reduction through dictionary encoding (24 bytes vs 40 bytes per event) +- Write throughput: 2.6-3.14 million quads/sec +- Read throughput: 2.7-2.77 million quads/sec +- Point query latency: 0.235 ms at 1M quads +- Space efficiency: about 40% smaller encoded events -For detailed benchmark results, see [BENCHMARK_RESULTS.md](./BENCHMARK_RESULTS.md). +Detailed benchmark data is in [BENCHMARK_RESULTS.md](./BENCHMARK_RESULTS.md). 
-## Development +## Quick Start ### Prerequisites -- Rust (stable toolchain) +- Rust stable - Cargo -### Building +### Build ```bash -# Debug build make build - -# Release build (optimized) make release ``` -### Testing +### Run the HTTP API ```bash -# Run all tests -make test - -# Run tests with verbose output -make test-verbose +cargo run --bin http_server -- --host 127.0.0.1 --port 8080 --storage-dir ./data/storage ``` -### Code Quality +Then check the server: -Before pushing to the repository, run the CI/CD checks locally: +```bash +curl http://127.0.0.1:8080/health +``` + +### Try the HTTP client example ```bash -# Run all CI/CD checks (formatting, linting, tests, build) -make ci-check +cargo run --example http_client_example +``` -# Or use the script directly - ./scripts/ci-check.sh``` +This example demonstrates: -This will run: -- **rustfmt** - Code formatting check -- **clippy** - Lint checks with warnings as errors -- **tests** - Full test suite -- **build** - Compilation check +- query registration +- query start and stop +- query inspection +- replay control +- WebSocket result consumption -Individual checks can also be run: +## Development + +### Common Commands ```bash -make fmt # Format code -make fmt-check # Check formatting -make lint # Run Clippy -make check # Run formatting and linting checks +make build # debug build +make release # optimized build +make test # full test suite +make test-verbose # verbose tests +make fmt # format code +make fmt-check # check formatting +make lint # clippy with warnings as errors +make check # formatting + linting +make ci-check # local CI script ``` -## Licence +### Examples -This code is copyrighted by Ghent University - imec and released under the MIT Licence +The repository includes runnable examples under [`examples/`](./examples), including: -## Contact +- [`examples/http_client_example.rs`](./examples/http_client_example.rs) +- [`examples/comparator_demo.rs`](./examples/comparator_demo.rs) +- 
[`examples/demo_dashboard.html`](./examples/demo_dashboard.html) + +## Project Layout -For any questions, please contact [Kush](mailto:mailkushbisen@gmail.com) or create an issue in the repository. +- [`src/api`](./src/api): query lifecycle and orchestration +- [`src/parsing`](./src/parsing): Janus-QL parsing +- [`src/stream`](./src/stream): live stream processing +- [`src/execution`](./src/execution): historical execution +- [`src/storage`](./src/storage): segmented RDF storage +- [`src/http`](./src/http): REST and WebSocket API +- [`tests`](./tests): integration and parser coverage + +## License + +This project is released under the MIT License. + +## Contact ---- +For questions, open an issue or contact [Kush](mailto:mailkushbisen@gmail.com). diff --git a/docs/ANOMALY_DETECTION.md b/docs/ANOMALY_DETECTION.md new file mode 100644 index 0000000..cb65023 --- /dev/null +++ b/docs/ANOMALY_DETECTION.md @@ -0,0 +1,84 @@ +# Anomaly Detection + +Janus already supports anomaly-oriented extension functions, but they are stateless functions evaluated within one query execution context. + +That distinction matters. + +## What Extension Functions Are Good At + +Current extension functions are sufficient for: + +- fixed thresholds +- relative change checks +- z-score style checks when mean and sigma are already present +- simple outlier or divergence predicates over current bindings + +This works well when the query already has everything it needs in one evaluation context. + +## Where Baselines Help + +Baselines help when live anomaly scoring depends on historical context such as: + +- deviation from normal behavior +- per-sensor baselines +- volatility comparison +- recent historical trend + +In those cases, Janus can bootstrap compact historical values into live static data and let the live query compare current readings against them. + +## What Janus Does Not Do + +Janus does not currently maintain a full continuously updated hybrid historical/live relation. 
+
+So if you need:
+
+- long-running stateful models
+- full seasonal context
+- large retained historical buffers inside the engine
+
+you will need either:
+
+- external model state
+- future dedicated baseline refresh logic
+- more specialized stateful operators
+
+## Recommended Pattern
+
+For a first anomaly-detection pipeline in Janus:
+
+1. Use a historical query that emits one compact row per anchor.
+2. Materialize baseline values such as `mean` and `sigma`.
+3. Join those values in the live query using `baseline:*` predicates.
+4. Apply extension functions on the live side.
+
+Example:
+
+```sparql
+PREFIX ex:
+PREFIX janus:
+PREFIX baseline:
+
+REGISTER RStream ex:out AS
+SELECT ?sensor ?reading
+FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1700000000000 END 1700003600000]
+FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 5000 STEP 1000]
+USING BASELINE ex:hist LAST
+WHERE {
+  WINDOW ex:hist {
+    ?sensor ex:mean ?mean .
+    ?sensor ex:sigma ?sigma .
+  }
+  WINDOW ex:live {
+    ?sensor ex:hasReading ?reading .
+  }
+  ?sensor baseline:mean ?mean .
+  ?sensor baseline:sigma ?sigma .
+  FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3))
+}
+```
+
+## Choosing LAST vs AGGREGATE
+
+- Use `LAST` when you care about the most recent historical regime before live execution.
+- Use `AGGREGATE` when you want a more stable summary across multiple historical sliding windows.
+- Prefer fixed historical windows unless you have a clear reason to derive a baseline from many historical subwindows.
diff --git a/docs/BASELINES.md b/docs/BASELINES.md
new file mode 100644
index 0000000..4e0f7c5
--- /dev/null
+++ b/docs/BASELINES.md
@@ -0,0 +1,128 @@
+# Baselines
+
+Baseline support in Janus is meant for hybrid anomaly-style queries where historical data initializes context for live scoring.
+
+It is not a full hybrid-state engine.
+
+## What Baseline Bootstrap Does
+
+When a query has:
+
+- at least one historical window
+- at least one live window
+- a baseline-aware query shape, typically with `baseline:*` joins
+
+Janus can evaluate the historical side, collapse the result into compact baseline statements, and insert those statements into the live processor as static data.
+
+The live query then joins against those static triples.
+
+## How It Is Enabled
+
+Preferred query-level form:
+
+```sparql
+USING BASELINE ex:hist LAST
+```
+
+or:
+
+```sparql
+USING BASELINE ex:hist AGGREGATE
+```
+
+If the clause is missing, registration can still provide:
+
+- `baseline_mode = aggregate`
+- `baseline_mode = last`
+
+The query-level clause takes precedence when present.
+
+## LAST vs AGGREGATE
+
+### LAST
+
+For a historical sliding window:
+
+- only the final sliding-window result snapshot is retained
+- earlier window outputs are discarded for baseline collapse
+
+This is useful when you want:
+
+- the most recent historical regime
+- a low-ambiguity startup baseline
+
+### AGGREGATE
+
+For a historical sliding window:
+
+- all historical sliding-window outputs are folded into one compact baseline
+- numeric values are averaged per `(anchor, variable)`
+- non-numeric values fall back to the latest seen value
+
+This is useful when you want:
+
+- a broader recent historical summary
+- less sensitivity to the last historical subwindow
+
+## Fixed Historical Windows
+
+For a fixed historical window, the distinction between `LAST` and `AGGREGATE` is much smaller because there is only one historical result set.
+
+In practice:
+
+- fixed historical baseline is usually the simplest and clearest baseline path
+- historical sliding baseline is more advanced and can cost more at startup
+
+## Async Warm-Up
+
+Janus now warms baseline state asynchronously.
+
+Behavior:
+
+1. live execution starts immediately
+2. query status becomes `WarmingBaseline`
+3. baseline bootstrap runs in a background thread
+4. 
baseline triples are inserted into live static data +5. query status moves to `Running` + +Effect on query results: + +- a live query that depends on baseline joins typically produces no matches until the baseline is ready +- once baseline static data exists, future live evaluations can match those joins + +## What Janus Stores + +Janus does not retain all historical events or all historical sliding-window outputs as permanent runtime state. + +For baseline bootstrap it retains: + +- a compact accumulator keyed by `(anchor, variable)` during bootstrap +- then final static baseline triples inside live processing + +It does not retain: + +- all raw historical events in memory +- all sliding-window result batches after bootstrap +- a continuously merged historical/live relation + +## Anchor Selection + +Baseline values are materialized per anchor subject. + +The current implementation prefers binding variables named: + +- `sensor` +- `subject` +- `entity` +- `s` + +If none of those exist, Janus falls back to the first IRI-like binding it can find. + +This means historical baseline queries work best when they explicitly return a stable anchor variable such as `?sensor`. + +## Recommended Usage + +- Prefer fixed historical windows first. +- Use historical sliding windows only when you need a baseline derived from multiple historical subwindows. +- Keep baseline queries compact, ideally one row per anchor. +- Start with baseline values such as `mean` and `sigma`; add `slope` or quantiles later if needed. diff --git a/docs/DOCUMENTATION_INDEX.md b/docs/DOCUMENTATION_INDEX.md index 51916b0..26713f7 100644 --- a/docs/DOCUMENTATION_INDEX.md +++ b/docs/DOCUMENTATION_INDEX.md @@ -1,87 +1,62 @@ -# Janus HTTP API - Documentation Index - -## Getting Started - -1. **START_HERE.md** - 🚀 BEGIN HERE - Quick start guide -2. **scripts/test_setup.sh** - Automated setup script -3. **docker-compose.yml** - MQTT broker configuration - -## Quick Reference - -4. 
**QUICK_REFERENCE.md** - One-page cheat sheet -5. **FINAL_TEST.md** - Test verification steps -6. **RUNTIME_FIX_SUMMARY.md** - Runtime panic fix explanation - -## Complete Guides - -7. **SETUP_GUIDE.md** - Comprehensive setup with MQTT -8. **README_HTTP_API.md** - Complete API documentation -9. **COMPLETE_SOLUTION.md** - Full implementation details -10. **HTTP_API_IMPLEMENTATION.md** - Technical architecture - -## Code - -11. **src/http/server.rs** - HTTP server implementation (537 lines) -12. **src/http/mod.rs** - Module exports -13. **src/bin/http_server.rs** - Server binary (111 lines) -14. **examples/http_client_example.rs** - Client example (370 lines) -15. **examples/demo_dashboard.html** - Interactive dashboard (670 lines) - -## Configuration - -16. **docker/mosquitto/config/mosquitto.conf** - MQTT broker config -17. **Cargo.toml** - Dependencies (axum, tower-http, tokio-tungstenite, etc.) - -## How to Use This Documentation - -### If you're brand new: -→ Read **START_HERE.md** - -### If you want quick commands: -→ Read **QUICK_REFERENCE.md** - -### If you see runtime panics: -→ Read **RUNTIME_FIX_SUMMARY.md** - -### If you need detailed setup: -→ Read **SETUP_GUIDE.md** - -### If you want to understand the API: -→ Read **README_HTTP_API.md** - -### If you need implementation details: -→ Read **COMPLETE_SOLUTION.md** or **HTTP_API_IMPLEMENTATION.md** - -### If you want to verify everything works: -→ Follow **FINAL_TEST.md** - -## File Sizes - -``` -START_HERE.md ~1 KB (Quick start) -QUICK_REFERENCE.md ~2 KB (Cheat sheet) -RUNTIME_FIX_SUMMARY.md ~3 KB (Fix explanation) -FINAL_TEST.md ~3 KB (Testing guide) -SETUP_GUIDE.md ~18 KB (Detailed setup) -README_HTTP_API.md ~15 KB (API guide) -COMPLETE_SOLUTION.md ~9 KB (Solution summary) -HTTP_API_IMPLEMENTATION.md ~19 KB (Technical details) - -src/http/server.rs ~15 KB (Server code) -examples/demo_dashboard.html ~20 KB (Dashboard) -examples/http_client_example.rs ~11 KB (Client example) -``` - -## Priority Reading 
Order - -1. START_HERE.md -2. QUICK_REFERENCE.md -3. SETUP_GUIDE.md (if needed) -4. README_HTTP_API.md (for API details) - -The rest are reference materials for specific needs. - ---- - -**Total: ~115 KB of documentation + ~50 KB of code** -**Everything you need to use Janus HTTP API successfully!** +# Janus Documentation Index + +This is the shortest path to understanding the current Janus implementation. + +## Core Reading Order + +1. [../README.md](../README.md) +2. [JANUSQL.md](./JANUSQL.md) +3. [QUERY_EXECUTION.md](./QUERY_EXECUTION.md) +4. [BASELINES.md](./BASELINES.md) +5. [HTTP_API_CURRENT.md](./HTTP_API_CURRENT.md) +6. [ANOMALY_DETECTION.md](./ANOMALY_DETECTION.md) + +## What Each File Covers + +- [JANUSQL.md](./JANUSQL.md) + - query structure + - supported window types + - `USING BASELINE LAST|AGGREGATE` + - how live and historical queries are derived + +- [QUERY_EXECUTION.md](./QUERY_EXECUTION.md) + - registration and parsed metadata + - `start_query()` flow + - historical workers + - live workers and MQTT subscription + - result multiplexing and runtime status + +- [BASELINES.md](./BASELINES.md) + - what baseline bootstrap does + - `LAST` vs `AGGREGATE` + - async warm-up behavior + - what state is and is not retained + +- [HTTP_API_CURRENT.md](./HTTP_API_CURRENT.md) + - current REST endpoints + - WebSocket result flow + - request and response shapes + - `baseline_mode` registration fallback + +- [ANOMALY_DETECTION.md](./ANOMALY_DETECTION.md) + - when extension functions are enough + - when baseline state helps + - recommended query patterns + +## Legacy Material + +The following files remain useful as background, but they are not the main entrypoint for the current code: + +- [ARCHITECTURE.md](./ARCHITECTURE.md) +- [EXECUTION_ARCHITECTURE.md](./EXECUTION_ARCHITECTURE.md) +- [HTTP_API.md](./HTTP_API.md) +- [README_HTTP_API.md](./README_HTTP_API.md) +- [SETUP_GUIDE.md](./SETUP_GUIDE.md) + +## Related Code + +- 
[../src/parsing/janusql_parser.rs](../src/parsing/janusql_parser.rs)
+- [../src/api/janus_api.rs](../src/api/janus_api.rs)
+- [../src/http/server.rs](../src/http/server.rs)
+- [../src/stream/live_stream_processing.rs](../src/stream/live_stream_processing.rs)
+- [../src/execution/historical_executor.rs](../src/execution/historical_executor.rs)
diff --git a/docs/HTTP_API_CURRENT.md b/docs/HTTP_API_CURRENT.md
new file mode 100644
index 0000000..40af504
--- /dev/null
+++ b/docs/HTTP_API_CURRENT.md
@@ -0,0 +1,124 @@
+# HTTP API
+
+This document describes the current Janus HTTP and WebSocket API.
+
+The server binary is:
+
+```bash
+cargo run --bin http_server -- --host 127.0.0.1 --port 8080 --storage-dir ./data/storage
+```
+
+## Endpoints
+
+### Health
+
+`GET /health`
+
+Returns a simple success payload.
+
+### Register Query
+
+`POST /api/queries`
+
+Request body:
+
+```json
+{
+  "query_id": "anomaly_q1",
+  "janusql": "PREFIX ex: ...",
+  "baseline_mode": "aggregate"
+}
+```
+
+`baseline_mode` is optional and accepts:
+
+- `aggregate`
+- `last`
+
+If the Janus-QL query contains `USING BASELINE ...`, that query-level clause overrides this registration default at execution time.
+
+### List Queries
+
+`GET /api/queries`
+
+Response shape:
+
+```json
+{
+  "queries": ["q1", "q2"],
+  "total": 2
+}
+```
+
+### Get Query Details
+
+`GET /api/queries/:id`
+
+Response includes:
+
+- `query_id`
+- `query_text`
+- `baseline_mode`
+- `registered_at`
+- `execution_count`
+- `is_running`
+- `status`
+
+Possible `status` values include:
+
+- `Registered`
+- `WarmingBaseline`
+- `Running`
+- `Stopped`
+- `Failed(...)`
+
+### Start Query
+
+`POST /api/queries/:id/start`
+
+Starts execution and creates the internal forwarder used for WebSocket subscribers.
+
+### Stop Query
+
+`POST /api/queries/:id/stop`
+
+Stops a running query.
+
+### Delete Query
+
+`DELETE /api/queries/:id`
+
+Deletes a stopped query from the registry.
+
+### Stream Results
+
+`WS /api/queries/:id/results`
+
+WebSocket messages are JSON-encoded query results containing:
+
+- `query_id`
+- `timestamp`
+- `source`
+- `bindings`
+
+`source` is either:
+
+- `Historical`
+- `Live`
+
+## Typical Flow
+
+1. `POST /api/queries`
+2. `POST /api/queries/:id/start`
+3. Connect `WS /api/queries/:id/results`
+4. Read query results
+5. `POST /api/queries/:id/stop`
+6. `DELETE /api/queries/:id`
+
+## Notes on Baseline Queries
+
+For baseline-backed hybrid queries:
+
+- the query may enter `WarmingBaseline` after start
+- live execution still starts immediately
+- results that depend on baseline joins may appear only after warm-up finishes
diff --git a/docs/JANUSQL.md b/docs/JANUSQL.md
new file mode 100644
index 0000000..9ec1a6d
--- /dev/null
+++ b/docs/JANUSQL.md
@@ -0,0 +1,132 @@
+# Janus-QL
+
+Janus-QL is the query language Janus uses to describe historical windows, live windows, and hybrid queries.
+
+## Query Shape
+
+A Janus-QL query typically contains:
+
+- `PREFIX` declarations
+- a `REGISTER` clause
+- one or more `FROM NAMED WINDOW` clauses
+- an optional `USING BASELINE` clause
+- a `WHERE` clause with `WINDOW { ... }` blocks
+
+Example:
+
+```sparql
+PREFIX ex:
+PREFIX janus:
+PREFIX baseline:
+
+REGISTER RStream ex:out AS
+SELECT ?sensor ?reading
+FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1700000000000 END 1700003600000]
+FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 5000 STEP 1000]
+USING BASELINE ex:hist AGGREGATE
+WHERE {
+  WINDOW ex:hist {
+    ?sensor ex:mean ?mean .
+    ?sensor ex:sigma ?sigma .
+  }
+  WINDOW ex:live {
+    ?sensor ex:hasReading ?reading .
+  }
+  ?sensor baseline:mean ?mean .
+  ?sensor baseline:sigma ?sigma .
+  FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3))
+}
+```
+
+## Supported Window Types
+
+### Live Sliding Window
+
+Use `ON STREAM` with `RANGE` and `STEP`.
+
+```sparql
+FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 5000 STEP 1000]
+```
+
+This becomes part of the generated RSP-QL query.
+
+### Historical Fixed Window
+
+Use `ON LOG` with `START` and `END`.
+
+```sparql
+FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1700000000000 END 1700003600000]
+```
+
+This becomes a one-shot historical SPARQL execution over storage.
+
+### Historical Sliding Window
+
+Use `ON LOG` with `OFFSET`, `RANGE`, and `STEP`.
+
+```sparql
+FROM NAMED WINDOW ex:hist ON LOG ex:store [OFFSET 3600000 RANGE 300000 STEP 300000]
+```
+
+This becomes a sequence of historical SPARQL executions over overlapping or stepped windows.
+
+## Baseline Clause
+
+Janus supports an optional clause:
+
+```sparql
+USING BASELINE ex:hist LAST
+```
+
+or:
+
+```sparql
+USING BASELINE ex:hist AGGREGATE
+```
+
+Semantics:
+
+- the clause must reference a historical window
+- that historical window is used to bootstrap baseline values for the live query
+- `LAST` and `AGGREGATE` control how historical sliding-window results are collapsed before they are exposed to live evaluation
+
+If the clause is absent, the HTTP/API registration-level `baseline_mode` is used as a fallback.
+
+## What Janus Generates Internally
+
+The parser splits the query into:
+
+- one live RSP-QL query built from live windows
+- one SPARQL query per historical window
+
+Important detail:
+
+- non-window patterns in the `WHERE` clause are preserved in the live query
+- this is what makes baseline joins like `?sensor baseline:mean ?mean` work during live execution
+
+## Baseline Predicates
+
+Baseline values are exposed to the live side as static triples under:
+
+```text
+https://janus.rs/baseline#
+```
+
+So a historical binding:
+
+- `?sensor = ex:s1`
+- `?mean = 21.5`
+
+becomes the static triple:
+
+```text
+ex:s1 <https://janus.rs/baseline#mean> "21.5"
+```
+
+This is why live queries join on `baseline:*` predicates rather than directly reusing historical bindings.
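The mapping from a historical binding to a static baseline triple can be sketched in Rust as follows. This is a minimal illustration under stated assumptions, not the Janus implementation: the `baseline_triple` helper, its N-Triples-style output, and the use of a full anchor IRI are hypothetical; only the namespace constant is taken from this change.

```rust
/// Namespace under which baseline values are exposed (taken from this change).
const JANUS_BASELINE_NS: &str = "https://janus.rs/baseline#";

/// Hypothetical helper: renders one baseline value as an N-Triples-style line.
/// `anchor_iri` is the full IRI of the anchor subject, `variable` is the
/// binding name with or without a leading `?`, and `value` is emitted as a
/// plain literal. Literal escaping is deliberately left out of this sketch.
fn baseline_triple(anchor_iri: &str, variable: &str, value: &str) -> String {
    let local_name = variable.trim_start_matches('?');
    format!(
        "<{}> <{}{}> \"{}\" .",
        anchor_iri, JANUS_BASELINE_NS, local_name, value
    )
}
```

With `ex:` assumed to expand to `http://example.org/`, calling `baseline_triple("http://example.org/s1", "?mean", "21.5")` produces a triple whose predicate is `https://janus.rs/baseline#mean`, which is the predicate a live pattern such as `?sensor baseline:mean ?mean` would match.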
+
+## Practical Guidance
+
+- Use fixed historical windows when you want one clean baseline snapshot.
+- Use historical sliding windows only when you really need a baseline derived from multiple historical subwindows.
+- Keep historical baseline queries compact. Prefer one row per anchor such as one row per sensor.
diff --git a/docs/QUERY_EXECUTION.md b/docs/QUERY_EXECUTION.md
new file mode 100644
index 0000000..1070bd3
--- /dev/null
+++ b/docs/QUERY_EXECUTION.md
@@ -0,0 +1,109 @@
+# Query Execution
+
+This document describes how Janus executes a registered query.
+
+## Main Components
+
+- `JanusQLParser`: parses Janus-QL and derives live and historical query fragments
+- `QueryRegistry`: stores registered query metadata
+- `JanusApi`: coordinates startup, workers, status, and result delivery
+- `HistoricalExecutor`: executes SPARQL over stored historical data
+- `LiveStreamProcessing`: executes the live RSP-QL query and holds live/static data
+- HTTP server: exposes registration, control, inspection, and WebSocket result streaming
+
+## Registration
+
+Registration stores:
+
+- the raw Janus-QL query text
+- the parsed query representation
+- the default `baseline_mode`
+- timestamps and execution counters
+
+Registration does not start execution.
+
+## Startup Flow
+
+`JanusApi::start_query()` does the following:
+
+1. Loads query metadata from the registry.
+2. Checks whether the query is already running.
+3. Creates one result channel for both historical and live results.
+4. Spawns one historical worker per historical window.
+5. Starts the live processor if the query has live windows.
+6. Starts async baseline warm-up if the query has both historical and live windows.
+7. Stores runtime handles and returns a `QueryHandle`.
+
+## Historical Execution
+
+Historical execution is per historical window.
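Steps 3 to 5 of the startup flow (one shared result channel, one worker per historical window, one live worker) can be sketched with `std::sync::mpsc`. This is an illustrative reduction, not the code in `JanusApi`: `run_workers`, the trimmed `QueryResult`, and the `ResultSource` enum are assumptions made for the example, and each worker sends a single placeholder result instead of real bindings.

```rust
use std::sync::mpsc;
use std::thread;

/// Where a result came from; mirrors the `Historical` / `Live` source tag.
#[derive(Debug, Clone, PartialEq)]
enum ResultSource {
    Historical,
    Live,
}

/// Trimmed-down result type for this sketch (timestamp and bindings omitted).
#[derive(Debug, Clone)]
struct QueryResult {
    query_id: String,
    source: ResultSource,
}

/// Hypothetical helper: spawns one worker per historical window plus one live
/// worker, all sending into the same channel, and returns whatever arrived.
fn run_workers(query_id: &str, historical_windows: usize) -> Vec<QueryResult> {
    let (result_tx, result_rx) = mpsc::channel::<QueryResult>();
    let mut handles = Vec::new();

    // One historical worker per historical window, each with its own sender clone.
    for _ in 0..historical_windows {
        let tx = result_tx.clone();
        let id = query_id.to_string();
        handles.push(thread::spawn(move || {
            let _ = tx.send(QueryResult { query_id: id, source: ResultSource::Historical });
        }));
    }

    // A single live worker shares the same channel.
    let live_tx = result_tx.clone();
    let id = query_id.to_string();
    handles.push(thread::spawn(move || {
        let _ = live_tx.send(QueryResult { query_id: id, source: ResultSource::Live });
    }));

    // Drop the original sender so the receiver ends once all workers finish.
    drop(result_tx);
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    result_rx.iter().collect()
}
```

Because every worker holds a clone of the same sender, the consumer sees one interleaved stream and tells the two kinds of result apart by the `source` tag alone.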
+
+### Fixed Historical Window
+
+- one SPARQL execution
+- one batch of historical bindings
+- one `QueryResult` sent with source `Historical`
+
+### Sliding Historical Window
+
+- multiple SPARQL executions, one per computed window
+- one result batch per window
+- each batch sent with source `Historical`
+
+Historical execution is not merged into live state automatically unless baseline bootstrap is used.
+
+## Live Execution
+
+Live execution uses `LiveStreamProcessing`.
+
+Startup does the following:
+
+- creates the live processor from generated RSP-QL
+- registers all live streams referenced by live windows
+- starts the live processor
+- spawns MQTT subscribers for each live stream
+- spawns a worker that forwards emitted live bindings as `QueryResult { source: Live }`
+
+## Result Delivery
+
+Janus multiplexes historical and live results onto a single channel.
+
+Each `QueryResult` contains:
+
+- `query_id`
+- `timestamp`
+- `source`: `Historical` or `Live`
+- `bindings`
+
+The HTTP server forwards these results into a broadcast channel and exposes them over WebSocket.
+
+## Runtime Status
+
+Current execution states are:
+
+- `Registered`
+- `WarmingBaseline`
+- `Running`
+- `Stopped`
+- `Failed(String)`
+- `Completed`
+
+Important behavior:
+
+- hybrid live + historical baseline queries begin in `WarmingBaseline`
+- live execution starts immediately
+- status flips to `Running` when baseline warm-up finishes successfully
+
+## What State Janus Keeps
+
+Janus keeps runtime state for:
+
+- worker thread handles
+- shutdown channels
+- live processors
+- MQTT subscribers
+- query execution status
+
+Janus does not keep a fully merged historical + live relation as one continuously maintained execution state.
+
+If you use baselines, Janus keeps only the compact materialized baseline triples inside live static data, not all historical result rows.
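The compact baseline state described above can be sketched as an accumulator keyed by `(anchor, variable)`. The struct fields below match the `BaselineAggregate` struct added in this change; the `observe`, `collapse`, and `aggregate_baseline` functions are hypothetical and only follow the documented `AGGREGATE` rule (numeric values averaged, non-numeric values falling back to the latest seen value). The actual implementation may differ.

```rust
use std::collections::HashMap;

/// Same fields as the `BaselineAggregate` struct introduced in this diff.
#[derive(Debug, Clone)]
struct BaselineAggregate {
    last_value: String,
    numeric_sum: f64,
    numeric_count: usize,
    all_numeric: bool,
}

impl BaselineAggregate {
    fn new() -> Self {
        Self { last_value: String::new(), numeric_sum: 0.0, numeric_count: 0, all_numeric: true }
    }

    /// Hypothetical: fold one observed value into the accumulator.
    fn observe(&mut self, value: &str) {
        self.last_value = value.to_string();
        match value.trim().parse::<f64>() {
            Ok(n) if n.is_finite() => {
                self.numeric_sum += n;
                self.numeric_count += 1;
            }
            // Any non-numeric (or non-finite) value disables averaging.
            _ => self.all_numeric = false,
        }
    }

    /// Hypothetical: average if every value was numeric, else latest value.
    fn collapse(&self) -> String {
        if self.all_numeric && self.numeric_count > 0 {
            (self.numeric_sum / self.numeric_count as f64).to_string()
        } else {
            self.last_value.clone()
        }
    }
}

/// Folds sliding-window batches of `(anchor, variable, value)` rows into one
/// compact value per `(anchor, variable)`; the batches can then be dropped.
fn aggregate_baseline(
    batches: &[Vec<(String, String, String)>],
) -> HashMap<(String, String), String> {
    let mut acc: HashMap<(String, String), BaselineAggregate> = HashMap::new();
    for batch in batches {
        for (anchor, variable, value) in batch {
            acc.entry((anchor.clone(), variable.clone()))
                .or_insert_with(BaselineAggregate::new)
                .observe(value);
        }
    }
    acc.into_iter().map(|(key, agg)| (key, agg.collapse())).collect()
}
```

For example, two batches that report `mean` values `20` and `23` for the same anchor collapse to `21.5`, while a non-numeric variable keeps whichever value was seen last. Only the accumulator map survives the fold, which is why the raw historical rows do not need to be retained.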
diff --git a/docs/README.md b/docs/README.md index 69adb5c..190afd8 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,112 +1,26 @@ # Janus Documentation -This directory contains comprehensive documentation for the Janus RDF Stream Processing engine. +This directory contains the project documentation for Janus. -## Core Documentation +Some older files in this directory are design notes, implementation logs, or milestone-specific writeups. The files below are the current starting point for understanding how Janus works today. -### Architecture & Design -- **[ARCHITECTURE.md](ARCHITECTURE.md)** - High-level system architecture and design principles -- **[MVP_ARCHITECTURE.md](MVP_ARCHITECTURE.md)** - Minimum Viable Product architecture details -- **[RSP_INTEGRATION_COMPLETE.md](RSP_INTEGRATION_COMPLETE.md)** - RSP-RS integration documentation +## Start Here -### Performance & Benchmarking -- **[BENCHMARK_RESULTS.md](BENCHMARK_RESULTS.md)** - Performance metrics and benchmark results -- **[WRITING_BENCHMARKS.md](WRITING_BENCHMARKS.md)** - Guide for writing performance benchmarks +- [DOCUMENTATION_INDEX.md](./DOCUMENTATION_INDEX.md): canonical reading order +- [JANUSQL.md](./JANUSQL.md): Janus-QL syntax and semantics +- [QUERY_EXECUTION.md](./QUERY_EXECUTION.md): how registration, startup, historical execution, live execution, and result delivery work +- [BASELINES.md](./BASELINES.md): `USING BASELINE`, `LAST`, `AGGREGATE`, and async warm-up +- [HTTP_API_CURRENT.md](./HTTP_API_CURRENT.md): current REST and WebSocket API +- [ANOMALY_DETECTION.md](./ANOMALY_DETECTION.md): recommended anomaly-detection patterns and limitations -### Features & Components -- **[STREAM_BUS_CLI.md](STREAM_BUS_CLI.md)** - Command-line interface documentation -- **[SPARQL_BINDINGS_UPGRADE.md](SPARQL_BINDINGS_UPGRADE.md)** - SPARQL structured bindings feature -- **[EXECUTION_ARCHITECTURE.md](EXECUTION_ARCHITECTURE.md)** - ✨ Query execution architecture (NEW) +## Supporting Material -### 
Getting Started -- **[MVP_QUICKSTART.md](MVP_QUICKSTART.md)** - Quick start guide for MVP features -- **[MVP_TODO.md](MVP_TODO.md)** - Current development roadmap and TODOs +- [ARCHITECTURE.md](./ARCHITECTURE.md): older high-level architecture notes +- [EXECUTION_ARCHITECTURE.md](./EXECUTION_ARCHITECTURE.md): historical execution design notes +- [HTTP_API.md](./HTTP_API.md): earlier HTTP API writeup +- [BENCHMARK_RESULTS.md](./BENCHMARK_RESULTS.md): benchmark data -## Recent Updates +## Notes -### Execution Architecture (Latest) -Built internal execution layer for historical and live query processing: -- `HistoricalExecutor` for querying historical RDF data with SPARQL -- `ResultConverter` for unified result formatting -- Supports both fixed and sliding windows -- Thread-safe with message passing architecture -- 12 comprehensive unit tests -- See [EXECUTION_ARCHITECTURE.md](EXECUTION_ARCHITECTURE.md) for details - -### SPARQL Structured Bindings -Enhanced `OxigraphAdapter` with `execute_query_bindings()` method for structured SPARQL results: -- Returns `Vec>` instead of debug format strings -- 12 comprehensive tests covering all query types -- Full backward compatibility maintained -- See [SPARQL_BINDINGS_UPGRADE.md](SPARQL_BINDINGS_UPGRADE.md) for details - -## Quick Links - -### Development -```bash -# Build project -make build - -# Run tests -make test - -# Format code -make fmt - -# Run clippy -make clippy -``` - -### Testing -```bash -# Run all tests -cargo test - -# Run specific test file -cargo test --test oxigraph_adapter_test - -# Run with output -cargo test -- --nocapture -``` - -### Documentation -```bash -# Build and view docs -cargo doc --no-deps --open - -# Check docs build -cargo doc --no-deps --package janus -``` - -## Project Structure - -``` -janus/ -├── src/ -│ ├── core/ # Core RDF event types and encoding -│ ├── storage/ # Storage engine and indexing -│ ├── execution/ # Query execution (historical + live) -│ ├── querying/ # SPARQL query 
processing (Oxigraph) -│ ├── parsing/ # JanusQL parser -│ ├── api/ # Public API layer -│ └── stream_bus/ # Event streaming infrastructure -├── tests/ # Integration tests -├── examples/ # Benchmark examples -└── docs/ # This directory -``` - -## Contributing - -When adding new features: -1. Follow patterns in existing code -2. Add comprehensive tests (aim for >80% coverage) -3. Update relevant documentation -4. Run `make fmt` and `make clippy` before committing -5. Add changelog entry to this README if significant - -## Support - -For questions or issues: -- Check existing documentation first -- Review test files for usage examples -- See `.github/copilot-instructions.md` for coding standards \ No newline at end of file +- The canonical docs above are intended to describe the current implementation on `main` once merged. +- Older files are still useful for background, but they may describe previous milestones or implementation states. diff --git a/examples/http_client_example.rs b/examples/http_client_example.rs index 91effde..ddebc8b 100644 --- a/examples/http_client_example.rs +++ b/examples/http_client_example.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; struct RegisterQueryRequest { query_id: String, janusql: String, + baseline_mode: Option, } #[derive(Debug, Deserialize)] @@ -44,6 +45,7 @@ struct ListQueriesResponse { struct QueryDetailsResponse { query_id: String, query_text: String, + baseline_mode: String, registered_at: u64, execution_count: u64, is_running: bool, @@ -121,6 +123,7 @@ async fn main() -> Result<(), Box> { } "# .to_string(), + baseline_mode: None, }; let response = client @@ -156,6 +159,7 @@ async fn main() -> Result<(), Box> { } "# .to_string(), + baseline_mode: None, }; let response = client @@ -199,6 +203,7 @@ async fn main() -> Result<(), Box> { println!(" ✓ Query ID: {}", body.query_id); println!(" ✓ Registered at: {}", body.registered_at); println!(" ✓ Execution count: {}", body.execution_count); + println!(" ✓ Baseline mode: {}", 
body.baseline_mode); println!(" ✓ Is running: {}", body.is_running); println!(" ✓ Status: {}", body.status); } else { diff --git a/src/api/janus_api.rs b/src/api/janus_api.rs index 942a7e0..aaebd25 100644 --- a/src/api/janus_api.rs +++ b/src/api/janus_api.rs @@ -1,8 +1,9 @@ use crate::{ + core::RDFEvent, execution::{HistoricalExecutor, ResultConverter}, parsing::janusql_parser::{JanusQLParser, WindowType}, querying::oxigraph_adapter::OxigraphAdapter, - registry::query_registry::{QueryId, QueryMetadata, QueryRegistry}, + registry::query_registry::{BaselineBootstrapMode, QueryId, QueryMetadata, QueryRegistry}, storage::segmented_storage::StreamingSegmentedStorage, stream::{ live_stream_processing::LiveStreamProcessing, @@ -10,6 +11,7 @@ use crate::{ }, }; use std::{ + cmp::Ordering, collections::HashMap, sync::{ mpsc::{self, Receiver, Sender}, @@ -18,6 +20,16 @@ use std::{ thread, }; +const JANUS_BASELINE_NS: &str = "https://janus.rs/baseline#"; + +#[derive(Debug, Clone)] +struct BaselineAggregate { + last_value: String, + numeric_sum: f64, + numeric_count: usize, + all_numeric: bool, +} + /// The Query Result created from a query execution of a JanusQL query. 
#[derive(Debug, Clone)] pub struct QueryResult { @@ -85,6 +97,7 @@ struct RunningQuery { subscribers: Vec>, // thread handles for historical and live workers historical_handles: Vec>, + baseline_handle: Option>, live_handle: Option>, mqtt_subscriber_handle: Option>, // shutdown sender signals used to stop the workers @@ -96,6 +109,7 @@ struct RunningQuery { #[allow(dead_code)] #[derive(Debug, Clone, PartialEq)] pub enum ExecutionStatus { + WarmingBaseline, Running, Stopped, Failed(String), @@ -129,13 +143,22 @@ impl JanusApi { &self, query_id: QueryId, janusql: &str, + ) -> Result { + self.register_query_with_baseline_mode(query_id, janusql, BaselineBootstrapMode::Aggregate) + } + + pub fn register_query_with_baseline_mode( + &self, + query_id: QueryId, + janusql: &str, + baseline_mode: BaselineBootstrapMode, ) -> Result { let parsed = self.parser.parse(janusql).map_err(|e| { JanusApiError::ParseError(format!("Failed to parse JanusQL query: {}", e)) })?; let metadata = self .registry - .register(query_id.clone(), janusql.to_string(), parsed) + .register(query_id.clone(), janusql.to_string(), parsed, baseline_mode) .map_err(|e| { JanusApiError::RegistryError(format!("Failed to register query: {}", e)) })?; @@ -192,8 +215,22 @@ impl JanusApi { let (result_tx, result_rx) = mpsc::channel::(); let parsed = &metadata.parsed; + let effective_baseline_mode = parsed + .baseline + .as_ref() + .map(|baseline| baseline.mode) + .unwrap_or(metadata.baseline_mode); + let effective_baseline_window = + parsed.baseline.as_ref().map(|baseline| baseline.window_name.clone()); let mut historical_handles = Vec::new(); let mut shutdown_senders = Vec::new(); + let status = Arc::new(RwLock::new( + if !parsed.live_windows.is_empty() && !parsed.historical_windows.is_empty() { + ExecutionStatus::WarmingBaseline + } else { + ExecutionStatus::Running + }, + )); // 4. 
Spawn historical worker threads (one per historical window) for (i, window) in parsed.historical_windows.iter().enumerate() { @@ -272,6 +309,7 @@ impl JanusApi { // 5. Spawn live worker thread and MQTT subscribers (if there are live windows) let mut mqtt_subscribers = Vec::new(); let mut mqtt_subscriber_handle = None; + let mut baseline_handle = None; let live_handle = if !parsed.live_windows.is_empty() && !parsed.rspql_query.is_empty() { let tx = result_tx.clone(); @@ -311,6 +349,62 @@ impl JanusApi { } } + if !parsed.historical_windows.is_empty() { + let storage = Arc::clone(&self.storage); + let parsed_clone = parsed.clone(); + let processor_for_baseline = Arc::clone(&live_processor); + let status_for_baseline = Arc::clone(&status); + let baseline_mode = effective_baseline_mode; + let baseline_window = effective_baseline_window.clone(); + let (baseline_shutdown_tx, baseline_shutdown_rx) = mpsc::channel::<()>(); + + baseline_handle = + Some(thread::spawn(move || { + match collect_query_baseline_statements( + &storage, + &parsed_clone, + baseline_mode, + baseline_window.as_deref(), + &baseline_shutdown_rx, + ) { + Ok(statements) => { + if baseline_shutdown_rx.try_recv().is_ok() { + return; + } + + if let Ok(mut processor) = processor_for_baseline.lock() { + if let Err(err) = materialize_static_baseline_statements( + &mut processor, + &statements, + ) { + eprintln!("Async baseline materialization error: {}", err); + if let Ok(mut state) = status_for_baseline.write() { + *state = ExecutionStatus::Failed(err.to_string()); + } + return; + } + } + + if let Ok(mut state) = status_for_baseline.write() { + if *state == ExecutionStatus::WarmingBaseline { + *state = ExecutionStatus::Running; + } + } + } + Err(err) => { + eprintln!("Async baseline warm-up error: {}", err); + if let Ok(mut state) = status_for_baseline.write() { + *state = ExecutionStatus::Failed(err.to_string()); + } + } + } + })); + + shutdown_senders.push(baseline_shutdown_tx); + } else if let Ok(mut state) 
= status.write() { + *state = ExecutionStatus::Running; + } + // Spawn MQTT subscriber for each live window for window in &live_windows { let (host, port, topic) = parse_mqtt_uri(&window.stream_name); @@ -379,10 +473,11 @@ impl JanusApi { // 6. Store running query information let running = RunningQuery { metadata, - status: Arc::new(RwLock::new(ExecutionStatus::Running)), + status, primary_sender: result_tx, subscribers: vec![], historical_handles, + baseline_handle, live_handle, mqtt_subscriber_handle, shutdown_senders, @@ -453,6 +548,257 @@ impl JanusApi { } } +fn collect_query_baseline_statements( + storage: &Arc, + parsed: &crate::parsing::janusql_parser::ParsedJanusQuery, + baseline_mode: BaselineBootstrapMode, + baseline_window_name: Option<&str>, + shutdown_rx: &Receiver<()>, +) -> Result, JanusApiError> { + if parsed.live_windows.is_empty() || parsed.historical_windows.is_empty() { + return Ok(Vec::new()); + } + + let executor = HistoricalExecutor::new(Arc::clone(storage), OxigraphAdapter::new()); + let mut statements = Vec::new(); + + for (index, window) in parsed.historical_windows.iter().enumerate() { + if shutdown_rx.try_recv().is_ok() { + return Ok(Vec::new()); + } + if baseline_window_name.is_some_and(|name| name != window.window_name) { + continue; + } + + let sparql_query = parsed.sparql_queries.get(index).ok_or_else(|| { + JanusApiError::ExecutionError(format!( + "Missing SPARQL query for historical window {}", + index + )) + })?; + + match window.window_type { + WindowType::HistoricalFixed => { + let bindings = executor.execute_fixed_window(window, sparql_query)?; + statements.extend(baseline_statements_from_bindings(&bindings)); + } + WindowType::HistoricalSliding => { + statements.extend(collect_sliding_window_baseline_statements( + &executor, + window, + sparql_query, + baseline_mode, + shutdown_rx, + )?); + } + WindowType::Live => {} + } + } + + Ok(statements) +} + +fn collect_sliding_window_baseline_statements( + executor: 
&HistoricalExecutor, + window: &crate::parsing::janusql_parser::WindowDefinition, + sparql_query: &str, + mode: BaselineBootstrapMode, + shutdown_rx: &Receiver<()>, +) -> Result, JanusApiError> { + let mut accumulator = HashMap::new(); + let mut saw_window = false; + + for window_result in executor.execute_sliding_windows(window, sparql_query) { + if shutdown_rx.try_recv().is_ok() { + return Ok(Vec::new()); + } + let bindings = window_result?; + saw_window = true; + + if mode == BaselineBootstrapMode::Last { + accumulator.clear(); + } + + accumulate_bindings_into_baseline(&mut accumulator, &bindings); + } + + if !saw_window { + return Ok(Vec::new()); + } + + Ok(baseline_statements_from_accumulator(&accumulator)) +} + +#[cfg(test)] +fn materialize_bindings_as_static_baseline( + processor: &mut LiveStreamProcessing, + bindings: &[HashMap], +) -> Result<(), JanusApiError> { + let statements = baseline_statements_from_bindings(bindings); + materialize_static_baseline_statements(processor, &statements) +} + +fn materialize_static_baseline_statements( + processor: &mut LiveStreamProcessing, + statements: &[(String, String, String)], +) -> Result<(), JanusApiError> { + for (subject, predicate, object) in statements { + processor + .add_static_data(RDFEvent::new(0, subject, predicate, object, "")) + .map_err(|e| { + JanusApiError::LiveProcessingError(format!( + "Failed to materialize baseline statement '{} {} {}': {}", + subject, predicate, object, e + )) + })?; + } + Ok(()) +} + +fn baseline_statements_from_bindings( + bindings: &[HashMap], +) -> Vec<(String, String, String)> { + let mut accumulator = HashMap::new(); + accumulate_bindings_into_baseline(&mut accumulator, bindings); + baseline_statements_from_accumulator(&accumulator) +} + +fn accumulate_bindings_into_baseline( + accumulator: &mut HashMap<(String, String), BaselineAggregate>, + bindings: &[HashMap], +) { + for binding in bindings { + let Some((anchor_var, anchor_subject)) = select_binding_anchor(binding) 
else { + continue; + }; + + let mut variables = binding.keys().cloned().collect::>(); + variables.sort_unstable(); + + for var in variables { + if var == anchor_var { + continue; + } + + let Some(raw_value) = binding.get(&var) else { + continue; + }; + + let normalized = normalize_binding_term(raw_value); + let key = (anchor_subject.clone(), var); + let entry = accumulator.entry(key).or_insert_with(|| BaselineAggregate { + last_value: normalized.clone(), + numeric_sum: 0.0, + numeric_count: 0, + all_numeric: true, + }); + + entry.last_value.clone_from(&normalized); + if let Ok(value) = normalized.parse::() { + entry.numeric_sum += value; + entry.numeric_count += 1; + } else { + entry.all_numeric = false; + } + } + } +} + +fn baseline_statements_from_accumulator( + accumulator: &HashMap<(String, String), BaselineAggregate>, +) -> Vec<(String, String, String)> { + let mut entries = accumulator.iter().collect::>(); + entries.sort_by(|((left_subject, left_var), _), ((right_subject, right_var), _)| { + match left_subject.cmp(right_subject) { + Ordering::Equal => left_var.cmp(right_var), + other => other, + } + }); + + entries + .into_iter() + .map(|((subject, var), aggregate)| { + let predicate = format!("{JANUS_BASELINE_NS}{var}"); + let object = if aggregate.all_numeric && aggregate.numeric_count > 0 { + (aggregate.numeric_sum / aggregate.numeric_count as f64).to_string() + } else { + aggregate.last_value.clone() + }; + (subject.clone(), predicate, object) + }) + .collect() +} + +fn select_binding_anchor(binding: &HashMap) -> Option<(String, String)> { + for preferred in ["sensor", "subject", "entity", "s"] { + if let Some(value) = binding.get(preferred).and_then(|raw| normalize_iri_term(raw)) { + return Some((preferred.to_string(), value)); + } + } + + let mut entries = binding.iter().collect::>(); + entries.sort_by(|(left_name, _), (right_name, _)| { + if left_name == right_name { + Ordering::Equal + } else { + left_name.cmp(right_name) + } + }); + + entries + 
.into_iter() + .find_map(|(name, raw)| normalize_iri_term(raw).map(|value| (name.clone(), value))) +} + +fn normalize_binding_term(raw: &str) -> String { + normalize_iri_term(raw) + .or_else(|| normalize_literal_term(raw)) + .unwrap_or_else(|| raw.trim().to_string()) +} + +fn normalize_iri_term(raw: &str) -> Option { + let trimmed = raw.trim(); + if trimmed.starts_with('<') && trimmed.ends_with('>') && trimmed.len() > 2 { + Some(trimmed[1..trimmed.len() - 1].to_string()) + } else if trimmed.starts_with("http://") || trimmed.starts_with("https://") { + Some(trimmed.to_string()) + } else { + None + } +} + +fn normalize_literal_term(raw: &str) -> Option { + let trimmed = raw.trim(); + if !trimmed.starts_with('"') { + return None; + } + + let mut escaped = false; + for (index, ch) in trimmed.char_indices().skip(1) { + if escaped { + escaped = false; + continue; + } + + match ch { + '\\' => escaped = true, + '"' => { + let lexical = &trimmed[1..index]; + return Some( + lexical + .replace("\\\"", "\"") + .replace("\\\\", "\\") + .replace("\\n", "\n") + .replace("\\t", "\t"), + ); + } + _ => {} + } + } + + None +} + /// Parses an MQTT stream URI into `(host, port, topic)`. /// /// Handles `mqtt://host:port/topic` and `mqtts://host:port/topic` directly. 
@@ -498,7 +844,12 @@ fn parse_mqtt_uri(stream_uri: &str) -> (String, u16, String) { #[cfg(test)] mod tests { - use super::parse_mqtt_uri; + use super::{ + baseline_statements_from_bindings, materialize_bindings_as_static_baseline, + normalize_binding_term, parse_mqtt_uri, JANUS_BASELINE_NS, + }; + use crate::{core::RDFEvent, stream::live_stream_processing::LiveStreamProcessing}; + use std::{collections::HashMap, thread, time::Duration}; #[test] fn test_parse_mqtt_uri_with_port() { @@ -539,4 +890,144 @@ mod tests { assert_eq!(port, 1883); assert_eq!(topic, "sensors"); } + + #[test] + fn test_normalize_binding_term_strips_iri_and_literal_wrappers() { + assert_eq!( + normalize_binding_term(""), + "http://example.org/sensor1" + ); + assert_eq!(normalize_binding_term("\"42.5\""), "42.5"); + assert_eq!( + normalize_binding_term("\"42.5\"^^"), + "42.5" + ); + } + + #[test] + fn test_materialized_baseline_static_data_can_drive_live_extension_functions() { + let query = format!( + r#" + PREFIX ex: + PREFIX janus: + PREFIX baseline: <{}> + REGISTER RStream AS + SELECT ?sensor ?reading + FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 1000 STEP 500] + WHERE {{ + WINDOW ex:w1 {{ + ?sensor ex:hasReading ?reading . + }} + ?sensor baseline:mean ?mean . + ?sensor baseline:sigma ?sigma . 
+ FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3)) + }} + "#, + JANUS_BASELINE_NS + ); + + let mut processor = LiveStreamProcessing::new(query).unwrap(); + processor.register_stream("http://example.org/stream1").unwrap(); + + let mut binding = HashMap::new(); + binding.insert("sensor".to_string(), "".to_string()); + binding.insert( + "mean".to_string(), + "\"25\"^^".to_string(), + ); + binding.insert( + "sigma".to_string(), + "\"2\"^^".to_string(), + ); + + materialize_bindings_as_static_baseline(&mut processor, &[binding]).unwrap(); + processor.start_processing().unwrap(); + processor + .add_event( + "http://example.org/stream1", + RDFEvent::new( + 0, + "http://example.org/sensor1", + "http://example.org/hasReading", + "40", + "", + ), + ) + .unwrap(); + processor.close_stream("http://example.org/stream1", 3000).unwrap(); + thread::sleep(Duration::from_millis(300)); + + let results = processor.collect_results(None).unwrap(); + assert!( + results.iter().any(|result| result.bindings.contains("sensor1")), + "expected live result to join with materialized baseline static data, got {:?}", + results + ); + } + + #[test] + fn test_baseline_statements_from_bindings_aggregate_numeric_values() { + let bindings = vec![ + HashMap::from([ + ("sensor".to_string(), "".to_string()), + ( + "mean".to_string(), + "\"10\"^^".to_string(), + ), + ]), + HashMap::from([ + ("sensor".to_string(), "".to_string()), + ( + "mean".to_string(), + "\"20\"^^".to_string(), + ), + ]), + ]; + + let statements = baseline_statements_from_bindings(&bindings); + assert_eq!( + statements, + vec![( + "http://example.org/s1".to_string(), + format!("{JANUS_BASELINE_NS}mean"), + "15".to_string() + )] + ); + } + + #[test] + fn test_last_window_mode_overwrites_previous_window_values() { + let mut accumulator = HashMap::new(); + super::accumulate_bindings_into_baseline( + &mut accumulator, + &[HashMap::from([ + ("sensor".to_string(), "".to_string()), + ( + "mean".to_string(), + "\"10\"^^".to_string(), + ), + 
])], + ); + accumulator.clear(); + super::accumulate_bindings_into_baseline( + &mut accumulator, + &[HashMap::from([ + ("sensor".to_string(), "".to_string()), + ( + "mean".to_string(), + "\"30\"^^".to_string(), + ), + ])], + ); + + let statements = super::baseline_statements_from_accumulator(&accumulator); + assert_eq!( + statements, + vec![( + "http://example.org/s1".to_string(), + format!("{JANUS_BASELINE_NS}mean"), + "30".to_string() + )] + ); + } } diff --git a/src/http/server.rs b/src/http/server.rs index bd8d0bc..8c1fead 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -5,7 +5,7 @@ use crate::{ api::janus_api::{JanusApi, JanusApiError, QueryHandle, QueryResult, ResultSource}, - registry::query_registry::{QueryId, QueryRegistry}, + registry::query_registry::{BaselineBootstrapMode, QueryId, QueryRegistry}, storage::segmented_storage::StreamingSegmentedStorage, stream_bus::{BrokerType, MqttConfig, StreamBus, StreamBusConfig}, }; @@ -38,6 +38,7 @@ const RESULT_BROADCAST_CAPACITY: usize = 1024; pub struct RegisterQueryRequest { pub query_id: String, pub janusql: String, + pub baseline_mode: Option, } /// Response after registering a query @@ -54,6 +55,7 @@ pub struct RegisterQueryResponse { pub struct QueryDetailsResponse { pub query_id: String, pub query_text: String, + pub baseline_mode: String, pub registered_at: u64, pub execution_count: u64, pub is_running: bool, @@ -176,6 +178,7 @@ impl Default for ReplayState { } /// Custom error type for API errors +#[derive(Debug)] pub enum ApiError { JanusError(JanusApiError), NotFound(String), @@ -257,7 +260,12 @@ async fn register_query( State(state): State>, Json(payload): Json, ) -> Result, ApiError> { - let metadata = state.janus_api.register_query(payload.query_id.clone(), &payload.janusql)?; + let baseline_mode = parse_baseline_mode(payload.baseline_mode.as_deref())?; + let metadata = state.janus_api.register_query_with_baseline_mode( + payload.query_id.clone(), + &payload.janusql, + baseline_mode, + 
)?; Ok(Json(RegisterQueryResponse { query_id: metadata.query_id, @@ -301,6 +309,7 @@ async fn get_query( Ok(Json(QueryDetailsResponse { query_id: metadata.query_id, query_text: metadata.query_text, + baseline_mode: format!("{:?}", metadata.baseline_mode), registered_at: metadata.registered_at, execution_count: metadata.execution_count, is_running, @@ -308,6 +317,17 @@ async fn get_query( })) } +fn parse_baseline_mode(raw: Option<&str>) -> Result { + match raw { + None | Some("aggregate" | "AGGREGATE") => Ok(BaselineBootstrapMode::Aggregate), + Some("last" | "LAST") => Ok(BaselineBootstrapMode::Last), + Some(other) => Err(ApiError::BadRequest(format!( + "Unsupported baseline_mode '{}'. Use 'aggregate' or 'last'", + other + ))), + } +} + /// POST /api/queries/:id/start - Start executing a query async fn start_query( State(state): State>, @@ -627,3 +647,19 @@ pub async fn start_server( Ok(()) } + +#[cfg(test)] +mod tests { + use super::parse_baseline_mode; + use crate::registry::query_registry::BaselineBootstrapMode; + + #[test] + fn test_parse_baseline_mode_defaults_to_aggregate() { + assert_eq!(parse_baseline_mode(None).unwrap(), BaselineBootstrapMode::Aggregate); + } + + #[test] + fn test_parse_baseline_mode_accepts_last() { + assert_eq!(parse_baseline_mode(Some("last")).unwrap(), BaselineBootstrapMode::Last); + } +} diff --git a/src/parsing/janusql_parser.rs b/src/parsing/janusql_parser.rs index b44cc57..923f49d 100644 --- a/src/parsing/janusql_parser.rs +++ b/src/parsing/janusql_parser.rs @@ -64,6 +64,13 @@ pub struct RegisterClause { pub name: String, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum BaselineBootstrapMode { + Last, + #[default] + Aggregate, +} + #[derive(Debug, Clone, PartialEq)] /// Structured window specification used by the AST. 
pub enum WindowSpec { @@ -81,6 +88,12 @@ pub struct WindowClause { pub spec: WindowSpec, } +#[derive(Debug, Clone, PartialEq)] +pub struct BaselineClause { + pub window_name: String, + pub mode: BaselineBootstrapMode, +} + #[derive(Debug, Clone, PartialEq)] /// One parsed `WINDOW foo { ... }` block from the WHERE clause. pub struct WhereWindowClause { @@ -93,6 +106,7 @@ pub struct WhereWindowClause { pub struct JanusQueryAst { pub prefixes: Vec, pub register: Option, + pub baseline: Option, pub select_clause: String, pub windows: Vec, pub where_clause: String, @@ -104,6 +118,8 @@ pub struct JanusQueryAst { pub struct ParsedJanusQuery { /// Structured AST representation of the parsed JanusQL query. pub ast: JanusQueryAst, + /// Optional baseline clause selecting a historical window and bootstrap mode. + pub baseline: Option, /// R2S operator if present pub r2s: Option, /// Live windows defined in the query @@ -136,6 +152,7 @@ impl JanusQLParser { let mut prefixes = Vec::new(); let mut prefix_mapper = HashMap::new(); let mut register = None; + let mut baseline = None; let mut select_clause = String::new(); let mut windows = Vec::new(); let mut in_where_clause = false; @@ -161,6 +178,8 @@ impl JanusQLParser { if trimmed_line.starts_with("REGISTER") { register = Some(self.parse_register_clause(trimmed_line, &prefix_mapper)?); + } else if trimmed_line.starts_with("USING BASELINE") { + baseline = Some(self.parse_baseline_clause(trimmed_line, &prefix_mapper)?); } else if trimmed_line.starts_with("PREFIX") { let prefix = self.parse_prefix_declaration(trimmed_line)?; prefix_mapper.insert(prefix.prefix.clone(), prefix.namespace.clone()); @@ -191,6 +210,7 @@ impl JanusQLParser { Ok(JanusQueryAst { prefixes, register, + baseline, select_clause, windows, where_clause, @@ -230,8 +250,21 @@ impl JanusQLParser { .clone() .map(|register| R2SOperator { operator: register.operator, name: register.name }); + if let Some(baseline) = &ast.baseline { + let has_matching_historical_window 
= historical_windows + .iter() + .any(|window| window.window_name == baseline.window_name); + if !has_matching_historical_window { + return Err(self.parse_error(format!( + "USING BASELINE references unknown historical window '{}'", + baseline.window_name + ))); + } + } + let mut parsed = ParsedJanusQuery { ast: ast.clone(), + baseline: ast.baseline.clone(), r2s, live_windows, historical_windows, @@ -250,6 +283,29 @@ impl JanusQLParser { Ok(parsed) } + fn parse_baseline_clause( + &self, + line: &str, + prefix_mapper: &HashMap, + ) -> Result> { + let parts = line.split_whitespace().collect::>(); + if parts.len() != 4 || parts[0] != "USING" || parts[1] != "BASELINE" { + return Err(self.parse_error(format!("Invalid USING BASELINE clause: {line}"))); + } + + let mode = match parts[3] { + "LAST" => BaselineBootstrapMode::Last, + "AGGREGATE" => BaselineBootstrapMode::Aggregate, + other => { + return Err(self.parse_error(format!( + "Unsupported baseline mode '{other}'. Use LAST or AGGREGATE" + ))) + } + }; + + Ok(BaselineClause { window_name: self.unwrap_iri(parts[2], prefix_mapper), mode }) + } + fn parse_register_clause( &self, line: &str, @@ -544,6 +600,11 @@ impl JanusQLParser { prefixes: &HashMap, ) -> String { let mut where_patterns = Vec::new(); + let non_window_patterns = self.extract_non_window_where_patterns(where_clause); + + if !non_window_patterns.is_empty() { + where_patterns.push(non_window_patterns); + } for window in live_windows { if let Some(inner_pattern) = self.find_window_body(where_windows, window, prefixes) { @@ -560,6 +621,90 @@ impl JanusQLParser { } } + fn extract_non_window_where_patterns(&self, where_clause: &str) -> String { + let inner = self.extract_where_inner(where_clause); + if inner.is_empty() { + return String::new(); + } + + let mut preserved = String::new(); + let mut offset = 0usize; + + while let Some(found) = inner[offset..].find("WINDOW") { + let start = offset + found; + preserved.push_str(&inner[offset..start]); + + let 
after_keyword = start + "WINDOW".len(); + let mut cursor = after_keyword; + + while let Some(ch) = inner[cursor..].chars().next() { + if ch.is_whitespace() { + cursor += ch.len_utf8(); + } else { + break; + } + } + + while let Some(ch) = inner[cursor..].chars().next() { + if ch.is_whitespace() || ch == '{' { + break; + } + cursor += ch.len_utf8(); + } + + while let Some(ch) = inner[cursor..].chars().next() { + if ch.is_whitespace() { + cursor += ch.len_utf8(); + } else { + break; + } + } + + if !inner[cursor..].starts_with('{') { + preserved.push_str("WINDOW"); + offset = after_keyword; + continue; + } + + let Some(body_end) = self.find_matching_brace(&inner, cursor) else { + preserved.push_str(&inner[start..]); + offset = inner.len(); + break; + }; + + offset = body_end + 1; + } + + if offset < inner.len() { + preserved.push_str(&inner[offset..]); + } + + preserved + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .collect::>() + .join("\n ") + } + + fn extract_where_inner(&self, where_clause: &str) -> String { + let trimmed = where_clause.trim(); + let without_where = trimmed + .strip_prefix("WHERE") + .or_else(|| trimmed.strip_prefix("where")) + .map_or(trimmed, str::trim); + + if without_where.starts_with('{') { + if let Some(end) = self.find_matching_brace(without_where, 0) { + if end == without_where.len() - 1 { + return without_where[1..end].trim().to_string(); + } + } + } + + without_where.to_string() + } + fn find_window_body<'a>( &self, where_windows: &'a [WhereWindowClause], diff --git a/src/registry/query_registry.rs b/src/registry/query_registry.rs index 0e951e7..3f0d716 100644 --- a/src/registry/query_registry.rs +++ b/src/registry/query_registry.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; +pub use crate::parsing::janusql_parser::BaselineBootstrapMode; use crate::parsing::janusql_parser::ParsedJanusQuery; pub type QueryId = String; @@ -11,6 +12,7 @@ pub struct QueryMetadata { pub query_id: QueryId, 
pub query_text: String, pub parsed: ParsedJanusQuery, + pub baseline_mode: BaselineBootstrapMode, pub registered_at: u64, pub execution_count: u64, pub subscribers: Vec, @@ -75,6 +77,7 @@ impl QueryRegistry { query_id: QueryId, query_text: String, parsed: ParsedJanusQuery, + baseline_mode: BaselineBootstrapMode, ) -> Result { // Check if query ID already exists { @@ -96,6 +99,7 @@ impl QueryRegistry { query_id: query_id.clone(), query_text, parsed, + baseline_mode, registered_at: Self::current_timestamp(), execution_count: 0, subscribers: Vec::new(), diff --git a/tests/janusql_parser_test.rs b/tests/janusql_parser_test.rs index 12bbfab..a48570b 100644 --- a/tests/janusql_parser_test.rs +++ b/tests/janusql_parser_test.rs @@ -3,7 +3,9 @@ //! Tests for the JanusQL query parser, verifying parsing of window definitions, //! R2S operators, and query generation. -use janus::parsing::janusql_parser::{JanusQLParser, SourceKind, WindowSpec}; +use janus::parsing::janusql_parser::{ + BaselineBootstrapMode, JanusQLParser, SourceKind, WindowSpec, +}; #[test] fn test_basic_live_window() { @@ -213,3 +215,76 @@ fn test_parse_ast_extracts_window_body_with_nested_braces() { assert!(ast.where_windows[0].body.contains("FILTER(EXISTS")); assert!(ast.where_windows[0].body.contains("?sensor ex:meta ?meta")); } + +#[test] +fn test_live_query_preserves_non_window_patterns_for_static_joins() { + let parser = JanusQLParser::new().unwrap(); + let query = r#" + PREFIX ex: + PREFIX janus: + PREFIX baseline: + REGISTER RStream ex:out AS + SELECT ?sensor ?reading + FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1000 END 2000] + FROM NAMED WINDOW ex:live ON STREAM ex:stream [RANGE 500 STEP 100] + WHERE { + WINDOW ex:hist { + ?sensor ex:reading ?histReading . + } + WINDOW ex:live { + ?sensor ex:reading ?reading . + } + ?sensor baseline:mean ?mean . + ?sensor baseline:sigma ?sigma . 
+ FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3)) + } + "#; + + let parsed = parser.parse(query).unwrap(); + assert!(parsed.rspql_query.contains("?sensor baseline:mean ?mean")); + assert!(parsed.rspql_query.contains("?sensor baseline:sigma ?sigma")); + assert!(parsed + .rspql_query + .contains("FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3))")); + assert!(parsed.rspql_query.contains("WINDOW ex:live")); + assert!(!parsed.rspql_query.contains("WINDOW ex:hist")); +} + +#[test] +fn test_parse_using_baseline_clause() { + let parser = JanusQLParser::new().unwrap(); + let query = r#" + PREFIX ex: + REGISTER RStream ex:out AS + SELECT ?sensor ?reading + FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1000 END 2000] + FROM NAMED WINDOW ex:live ON STREAM ex:stream [RANGE 500 STEP 100] + USING BASELINE ex:hist AGGREGATE + WHERE { + WINDOW ex:hist { ?sensor ex:mean ?mean } + WINDOW ex:live { ?sensor ex:reading ?reading } + } + "#; + + let parsed = parser.parse(query).unwrap(); + let baseline = parsed.baseline.expect("expected baseline clause"); + assert_eq!(baseline.window_name, "http://example.org/hist"); + assert_eq!(baseline.mode, BaselineBootstrapMode::Aggregate); +} + +#[test] +fn test_using_baseline_requires_known_historical_window() { + let parser = JanusQLParser::new().unwrap(); + let query = r#" + PREFIX ex: + SELECT ?sensor + FROM NAMED WINDOW ex:live ON STREAM ex:stream [RANGE 500 STEP 100] + USING BASELINE ex:missing LAST + WHERE { + WINDOW ex:live { ?sensor ex:value ?value } + } + "#; + + let result = parser.parse(query); + assert!(result.is_err()); +}
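Reviewer note on the baseline folding in `collect_sliding_window_baseline_statements` and `baseline_statements_from_accumulator`: as far as this diff shows, `LAST` clears the accumulator before each window, while `AGGREGATE` keeps accumulating. In both modes a `(subject, variable)` pair is emitted as the arithmetic mean when every observed value parses as a number, and as the most recent value otherwise. The following is a minimal standalone sketch of that behaviour, not the code from the PR. `Mode`, `Aggregate`, and `fold_windows` are hypothetical names, and the input rows are assumed to be already normalized (IRI and literal wrappers stripped).

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the diff's `BaselineBootstrapMode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Last,
    Aggregate,
}

/// Hypothetical stand-in for the diff's `BaselineAggregate`.
#[derive(Debug, Clone)]
struct Aggregate {
    last_value: String,
    numeric_sum: f64,
    numeric_count: usize,
    all_numeric: bool,
}

/// Folds a sequence of windows into one value per (subject, variable).
/// Each window is a list of already-normalized (subject, variable, value) rows.
fn fold_windows(
    windows: &[Vec<(&str, &str, &str)>],
    mode: Mode,
) -> BTreeMap<(String, String), String> {
    let mut acc: BTreeMap<(String, String), Aggregate> = BTreeMap::new();

    for window in windows {
        // LAST keeps only the most recent window; AGGREGATE keeps everything.
        if mode == Mode::Last {
            acc.clear();
        }
        for (subject, var, value) in window {
            let entry = acc
                .entry((subject.to_string(), var.to_string()))
                .or_insert(Aggregate {
                    last_value: String::new(),
                    numeric_sum: 0.0,
                    numeric_count: 0,
                    all_numeric: true,
                });
            entry.last_value = value.to_string();
            match value.parse::<f64>() {
                Ok(number) => {
                    entry.numeric_sum += number;
                    entry.numeric_count += 1;
                }
                // One non-numeric value disables averaging for this key.
                Err(_) => entry.all_numeric = false,
            }
        }
    }

    // Emit the mean for purely numeric keys, otherwise the last value seen.
    acc.into_iter()
        .map(|(key, agg)| {
            let value = if agg.all_numeric && agg.numeric_count > 0 {
                (agg.numeric_sum / agg.numeric_count as f64).to_string()
            } else {
                agg.last_value
            };
            (key, value)
        })
        .collect()
}
```

One consequence worth confirming against the intended semantics: in this reading, `LAST` still averages repeated rows for the same key inside the final window, because clearing happens only between windows.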