153 changes: 112 additions & 41 deletions README.md
# Janus

Janus is a Rust engine for unified historical and live RDF stream processing.

It combines:

- historical window evaluation over segmented RDF storage
- live window evaluation over incoming streams
- a single Janus-QL query model for hybrid queries
- an HTTP/WebSocket API for query lifecycle management and result delivery

The name comes from the Roman deity Janus, associated with transitions and with looking both backward and forward. That dual perspective matches Janus's goal: querying past and live RDF data together.

## What Janus Supports

- Historical windows with `START` / `END`
- Sliding live windows with `RANGE` / `STEP`
- Hybrid queries that mix historical and live windows
- Extension functions for anomaly-style predicates such as thresholds, relative change, z-score, outlier checks, and trend divergence
- Optional baseline bootstrapping for hybrid anomaly queries with `USING BASELINE <window> LAST|AGGREGATE`
- HTTP endpoints for registering, starting, stopping, listing, and deleting queries
- WebSocket result streaming for running queries

## Query Model

Janus queries are written in Janus-QL, a hybrid query language that addresses historical and live RDF data in a single query.

Example:

```sparql
PREFIX ex: <http://example.org/>
PREFIX janus: <https://janus.rs/fn#>
PREFIX baseline: <https://janus.rs/baseline#>

REGISTER RStream ex:out AS
SELECT ?sensor ?reading
FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1700000000000 END 1700003600000]
FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 5000 STEP 1000]
USING BASELINE ex:hist AGGREGATE
WHERE {
WINDOW ex:hist {
?sensor ex:mean ?mean .
?sensor ex:sigma ?sigma .
}
WINDOW ex:live {
?sensor ex:hasReading ?reading .
}
?sensor baseline:mean ?mean .
?sensor baseline:sigma ?sigma .
FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3))
}
```

`USING BASELINE` is optional. If present, Janus bootstraps baseline values from the named historical window before or during live execution:

- `LAST`: use the final historical window snapshot as baseline
- `AGGREGATE`: merge the historical window outputs into one compact baseline
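The two modes can be sketched as follows. This is an illustrative model, not the engine's internal API: the `(mean, sigma)` snapshot shape and the function names are assumptions, and `AGGREGATE` is modeled here as simple averaging, while the engine's actual merge strategy may differ.

```rust
// Each historical window evaluation yields one (mean, sigma) snapshot.

/// LAST: use the final historical snapshot as the baseline.
fn last_baseline(snapshots: &[(f64, f64)]) -> Option<(f64, f64)> {
    snapshots.last().copied()
}

/// AGGREGATE: merge all snapshots into one compact baseline,
/// here modeled as averaging the per-window values.
fn aggregate_baseline(snapshots: &[(f64, f64)]) -> Option<(f64, f64)> {
    if snapshots.is_empty() {
        return None;
    }
    let n = snapshots.len() as f64;
    let mean = snapshots.iter().map(|s| s.0).sum::<f64>() / n;
    let sigma = snapshots.iter().map(|s| s.1).sum::<f64>() / n;
    Some((mean, sigma))
}

fn main() {
    let snapshots = [(10.0, 1.0), (20.0, 3.0)];
    println!("LAST      -> {:?}", last_baseline(&snapshots));
    println!("AGGREGATE -> {:?}", aggregate_baseline(&snapshots));
}
```

Either way, the result is a small static relation the live query can join against via the `baseline:*` predicates.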

## Performance

Janus uses dictionary encoding and segmented storage for high-throughput ingestion and historical reads.

- Write throughput: 2.6-3.14 million quads/sec
- Read throughput: 2.7-2.77 million quads/sec
- Point query latency: 0.235 ms at 1M quads
- Space efficiency: about 40% smaller encoded events (24 bytes vs 40 bytes per event)
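The idea behind dictionary encoding can be sketched as below. The struct layout and names are illustrative assumptions, not Janus's actual storage format; it only shows how interning terms once lets each event shrink to a fixed-size record of IDs (24 bytes in this sketch).

```rust
use std::collections::HashMap;

/// Interns RDF term strings so each is stored once and referenced by ID.
#[derive(Default)]
struct Dictionary {
    ids: HashMap<String, u32>,
    terms: Vec<String>,
}

impl Dictionary {
    /// Return the ID for a term, interning it on first sight.
    fn intern(&mut self, term: &str) -> u32 {
        if let Some(&id) = self.ids.get(term) {
            return id;
        }
        let id = self.terms.len() as u32;
        self.terms.push(term.to_string());
        self.ids.insert(term.to_string(), id);
        id
    }

    fn resolve(&self, id: u32) -> Option<&str> {
        self.terms.get(id as usize).map(String::as_str)
    }
}

/// A quad encoded as four term IDs plus a timestamp: 4 * 4 + 8 = 24 bytes.
#[derive(Clone, Copy)]
struct EncodedQuad {
    s: u32,
    p: u32,
    o: u32,
    g: u32,
    ts: u64,
}

fn main() {
    let mut dict = Dictionary::default();
    let q = EncodedQuad {
        s: dict.intern("http://example.org/sensor1"),
        p: dict.intern("http://example.org/hasReading"),
        o: dict.intern("\"21.5\""),
        g: dict.intern("http://example.org/stream1"),
        ts: 1_700_000_000_000,
    };
    println!("encoded quad: {} bytes", std::mem::size_of_val(&q));
    println!("subject: {:?}", dict.resolve(q.s));
}
```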

Detailed benchmark data is in [BENCHMARK_RESULTS.md](./BENCHMARK_RESULTS.md).

## Quick Start

### Prerequisites

- Rust stable
- Cargo

### Build

```bash
# Debug build
make build

# Release build (optimized)
make release
```

### Run the HTTP API

```bash
cargo run --bin http_server -- --host 127.0.0.1 --port 8080 --storage-dir ./data/storage
```

Then check the server:

```bash
curl http://127.0.0.1:8080/health
```

### Try the HTTP client example

```bash
cargo run --example http_client_example
```

This example demonstrates:

- query registration
- query start and stop
- query inspection
- replay control
- WebSocket result consumption

## Development

### Common Commands

```bash
make build # debug build
make release # optimized build
make test # full test suite
make test-verbose # verbose tests
make fmt # format code
make fmt-check # check formatting
make lint # clippy with warnings as errors
make check # formatting + linting
make ci-check # local CI script
```

### Examples

The repository ships runnable examples under [`examples/`](./examples), including:

- [`examples/http_client_example.rs`](./examples/http_client_example.rs)
- [`examples/comparator_demo.rs`](./examples/comparator_demo.rs)
- [`examples/demo_dashboard.html`](./examples/demo_dashboard.html)

## Project Layout

- [`src/api`](./src/api): query lifecycle and orchestration
- [`src/parsing`](./src/parsing): Janus-QL parsing
- [`src/stream`](./src/stream): live stream processing
- [`src/execution`](./src/execution): historical execution
- [`src/storage`](./src/storage): segmented RDF storage
- [`src/http`](./src/http): REST and WebSocket API
- [`tests`](./tests): integration and parser coverage

## License

This project is released under the MIT License.

## Contact

For questions, open an issue or contact [Kush](mailto:mailkushbisen@gmail.com).
84 changes: 84 additions & 0 deletions docs/ANOMALY_DETECTION.md
# Anomaly Detection

Janus already supports anomaly-oriented extension functions, but they are stateless and evaluated within a single query execution context.

That distinction matters.

## What Extension Functions Are Good At

Current extension functions are sufficient for:

- fixed thresholds
- relative change checks
- z-score style checks when mean and sigma are already present
- simple outlier or divergence predicates over current bindings

This works well when the query already has everything it needs in one evaluation context.
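These stateless checks can be sketched as plain functions over the current bindings. The signatures below are illustrative assumptions for exposition; the actual `janus:` extension functions are engine built-ins, not this code.

```rust
/// Fixed threshold check.
fn exceeds_threshold(reading: f64, threshold: f64) -> bool {
    reading > threshold
}

/// Relative change of the current reading against a reference value.
fn relative_change(current: f64, reference: f64) -> f64 {
    (current - reference).abs() / reference.abs()
}

/// Z-score style outlier check: mean and sigma must already be bound.
fn is_outlier(reading: f64, mean: f64, sigma: f64, k: f64) -> bool {
    (reading - mean).abs() > k * sigma
}

fn main() {
    // Everything needed is in the current evaluation context: no history required.
    println!("{}", exceeds_threshold(42.0, 30.0)); // true
    println!("{}", relative_change(110.0, 100.0)); // 0.1
    println!("{}", is_outlier(35.0, 20.0, 4.0, 3.0)); // true: |35 - 20| > 3 * 4
}
```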

## Where Baselines Help

Baselines help when live anomaly scoring depends on historical context such as:

- deviation from normal behavior
- per-sensor baselines
- volatility comparison
- recent historical trend

In those cases, Janus can bootstrap compact historical values into live static data and let the live query compare current readings against them.
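Conceptually, the live side then performs a join of each reading against its sensor's bootstrapped baseline. The sketch below models that comparison; the data shapes and function names are illustrative, not engine internals.

```rust
use std::collections::HashMap;

/// Per-sensor baselines are static (mean, sigma) pairs bootstrapped from
/// history; each live reading is scored against its own sensor's baseline.
fn flag_outliers<'a>(
    baselines: &HashMap<&'a str, (f64, f64)>,
    readings: &[(&'a str, f64)],
    k: f64,
) -> Vec<&'a str> {
    readings
        .iter()
        .filter_map(|&(sensor, reading)| {
            // Sensors without a baseline cannot be scored and are skipped.
            let &(mean, sigma) = baselines.get(sensor)?;
            if (reading - mean).abs() > k * sigma {
                Some(sensor)
            } else {
                None
            }
        })
        .collect()
}

fn main() {
    let mut baselines = HashMap::new();
    baselines.insert("sensor1", (20.0, 2.0));
    baselines.insert("sensor2", (50.0, 5.0));
    let readings = [("sensor1", 21.0), ("sensor2", 70.0)];
    println!("{:?}", flag_outliers(&baselines, &readings, 3.0)); // ["sensor2"]
}
```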

## What Janus Does Not Do

Janus does not currently maintain a full continuously updated hybrid historical/live relation.

So if you need:

- long-running stateful models
- full seasonal context
- large retained historical buffers inside the engine

you will need either:

- external model state
- future dedicated baseline refresh logic
- more specialized stateful operators

## Recommended Pattern

For a first anomaly-detection pipeline in Janus:

1. Use a historical query that emits one compact row per anchor.
2. Materialize baseline values such as `mean` and `sigma`.
3. Join those values in the live query using `baseline:*` predicates.
4. Apply extension functions on the live side.

Example:

```sparql
PREFIX ex: <http://example.org/>
PREFIX janus: <https://janus.rs/fn#>
PREFIX baseline: <https://janus.rs/baseline#>

REGISTER RStream ex:out AS
SELECT ?sensor ?reading
FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1700000000000 END 1700003600000]
FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 5000 STEP 1000]
USING BASELINE ex:hist LAST
WHERE {
WINDOW ex:hist {
?sensor ex:mean ?mean .
?sensor ex:sigma ?sigma .
}
WINDOW ex:live {
?sensor ex:hasReading ?reading .
}
?sensor baseline:mean ?mean .
?sensor baseline:sigma ?sigma .
FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3))
}
```
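Step 2 of the pattern, materializing `mean` and `sigma`, amounts to reducing a historical window's readings to one compact row. A minimal sketch, using the population standard deviation; this is illustrative, not the engine's aggregation code.

```rust
/// Reduce a window of readings to the (mean, sigma) row the live query joins on.
fn summarize(readings: &[f64]) -> Option<(f64, f64)> {
    if readings.is_empty() {
        return None;
    }
    let n = readings.len() as f64;
    let mean = readings.iter().sum::<f64>() / n;
    // Population variance: mean squared deviation from the mean.
    let variance = readings.iter().map(|r| (r - mean).powi(2)).sum::<f64>() / n;
    Some((mean, variance.sqrt()))
}

fn main() {
    let window = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];
    // This window summarizes to mean = 5.0, sigma = 2.0.
    println!("{:?}", summarize(&window));
}
```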

## Choosing LAST vs AGGREGATE

- Use `LAST` when you care about the most recent historical regime before live execution.
- Use `AGGREGATE` when you want a more stable summary across multiple historical sliding windows.
- Prefer fixed historical windows unless you have a clear reason to derive a baseline from many historical subwindows.