The Janus HTTP API provides REST endpoints for query management and WebSocket streaming for real-time results. It also includes stream bus replay control endpoints for demo and testing purposes.
Base URL: http://localhost:8080
# Build and run the HTTP server
cargo run --bin http_server
# With custom configuration
cargo run --bin http_server -- --host 0.0.0.0 --port 8080 --storage-dir ./data/storage
# Run the comprehensive client example
cargo run --example http_client_example
The HTTP API server provides:
- REST Endpoints: JSON-based HTTP endpoints for query registration, lifecycle management, and replay control
- WebSocket Streaming: Real-time streaming of query results (both historical and live)
- CORS Support: Cross-Origin Resource Sharing enabled for dashboard integration
- Thread-Safe State: Shared state using Arc for concurrent access across async tasks
Health check endpoint to verify server is running.
Response:
{
"message": "Janus HTTP API is running"
}
Register a new JanusQL query.
Request Body:
{
"query_id": "sensor_query_1",
"janusql": "PREFIX ex: <http://example.org/> SELECT ?sensor ?temp FROM NAMED WINDOW ex:histWindow ON STREAM ex:sensorStream [START 1704067200 END 1704153600] WHERE { WINDOW ex:histWindow { ?sensor ex:temperature ?temp . } }"
}
Response (200 OK):
{
"query_id": "sensor_query_1",
"query_text": "SELECT ?sensor ?temp FROM...",
"registered_at": 1704067200,
"message": "Query registered successfully"
}
Error Response (400 Bad Request):
{
"error": "Parse Error: Failed to parse JanusQL query: ..."
}
List all registered queries.
Response:
{
"queries": [
"sensor_query_1",
"live_sensor_query",
"historical_analysis"
],
"total": 3
}
Get details for a specific query.
Parameters:
- id (path): Query identifier
Response:
{
"query_id": "sensor_query_1",
"query_text": "SELECT ?sensor ?temp FROM...",
"registered_at": 1704067200,
"execution_count": 5,
"is_running": true,
"status": "Running"
}
Status Values:
- Registered - Query registered but not started
- Running - Query is currently executing
- Stopped - Query was stopped
- Failed - Query execution failed
- Completed - Query execution completed
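The status values, together with the errors documented for start, stop, and delete, suggest a few client-side checks before calling the API. The Python sketch below is illustrative only: `QueryStatus` and `allowed_actions` are hypothetical names, and the assumption that any non-running query may be started or deleted is not confirmed by the server documentation.

```python
from enum import Enum
from typing import Set


class QueryStatus(Enum):
    """Status strings as they appear in the `status` field of a query."""
    REGISTERED = "Registered"
    RUNNING = "Running"
    STOPPED = "Stopped"
    FAILED = "Failed"
    COMPLETED = "Completed"


def allowed_actions(status: QueryStatus) -> Set[str]:
    """Return the actions a client may reasonably attempt (hypothetical helper).

    Grounded in the documented 400 errors: starting a running query, stopping
    a query that is not running, and deleting a running query are rejected.
    Treating every other state as startable and deletable is an assumption.
    """
    if status is QueryStatus.RUNNING:
        return {"stop"}
    return {"start", "delete"}


# Example: decide what to offer in a UI for a query details response.
details = {"query_id": "sensor_query_1", "status": "Running", "is_running": True}
print(sorted(allowed_actions(QueryStatus(details["status"]))))
```

Checking locally avoids a round trip that would only return a 400, but the server response remains authoritative.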
Error Response (404 Not Found):
{
"error": "Query 'nonexistent' not found"
}
Start executing a registered query.
Parameters:
- id (path): Query identifier
Response:
{
"message": "Query 'sensor_query_1' started successfully"
}
Error Responses:
Already Running (400):
{
"error": "Execution Error: Query 'sensor_query_1' is already running"
}
Not Found (404):
{
"error": "Query 'sensor_query_1' not found"
}
Stop a running query.
Parameters:
- id (path): Query identifier
Response:
{
"message": "Query 'sensor_query_1' stopped successfully"
}
Error Response (400 Bad Request):
{
"error": "Execution Error: Query 'sensor_query_1' is not running"
}
Delete a stopped query from the registry.
Parameters:
- id (path): Query identifier
Response:
{
"message": "Query 'sensor_query_1' deleted successfully"
}
Error Response (400 Bad Request):
{
"error": "Query 'sensor_query_1' is running. Stop it before deleting."
}
WebSocket endpoint for streaming query results in real time.
Connection URL:
ws://localhost:8080/api/queries/sensor_query_1/results
Message Format:
{
"query_id": "sensor_query_1",
"timestamp": 1704067200000,
"source": "historical",
"bindings": [
{
"sensor": "http://example.org/sensor1",
"temp": "23.5"
}
]
}
Source Types:
- historical - Results from historical data processing
- live - Results from live stream processing
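A client usually wants each WebSocket message as a typed value rather than a raw dict. The following Python sketch parses one message in the format shown above. `QueryResult` and `parse_result` are hypothetical names, and treating `timestamp` as milliseconds since the Unix epoch is an assumption based on the magnitude of the sample value.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List


@dataclass
class QueryResult:
    query_id: str
    timestamp_ms: int            # assumed to be epoch milliseconds
    source: str                  # "historical" or "live"
    bindings: List[Dict[str, str]]

    def observed_at(self) -> datetime:
        """Convert the (assumed) millisecond timestamp to a UTC datetime."""
        return datetime.fromtimestamp(self.timestamp_ms / 1000, tz=timezone.utc)


def parse_result(raw: str) -> QueryResult:
    """Parse one WebSocket text frame into a QueryResult."""
    msg = json.loads(raw)
    if msg["source"] not in ("historical", "live"):
        raise ValueError(f"unexpected source: {msg['source']!r}")
    return QueryResult(
        query_id=msg["query_id"],
        timestamp_ms=msg["timestamp"],
        source=msg["source"],
        bindings=msg["bindings"],
    )


raw = json.dumps({
    "query_id": "sensor_query_1",
    "timestamp": 1704067200000,
    "source": "historical",
    "bindings": [{"sensor": "http://example.org/sensor1", "temp": "23.5"}],
})
result = parse_result(raw)
# Binding values arrive as strings in the sample; convert them as needed.
temps = [float(b["temp"]) for b in result.bindings]
print(result.source, result.observed_at().isoformat(), temps)
```

Keeping `source` on the parsed value lets a consumer route historical and live results to different handlers.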
JavaScript Example:
const ws = new WebSocket('ws://localhost:8080/api/queries/sensor_query_1/results');
ws.onmessage = (event) => {
const result = JSON.parse(event.data);
console.log(`[${result.source}] Query: ${result.query_id}`);
console.log(`Timestamp: ${result.timestamp}`);
console.log('Bindings:', result.bindings);
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
ws.onclose = () => {
console.log('WebSocket connection closed');
};
Start the stream bus replay for ingesting RDF data.
Request Body:
{
"input_file": "data/sensors.nq",
"broker_type": "none",
"topics": ["sensors"],
"rate_of_publishing": 1000,
"loop_file": false,
"add_timestamps": true,
"kafka_config": null,
"mqtt_config": null
}
Request Parameters:
- input_file (required): Path to the N-Quads input file
- broker_type (optional, default: "none"): Broker type - "kafka", "mqtt", or "none"
- topics (optional, default: ["janus"]): List of topic names
- rate_of_publishing (optional, default: 1000): Events per second rate limit
- loop_file (optional, default: false): Whether to loop the file continuously
- add_timestamps (optional, default: true): Add timestamps to events
- kafka_config (optional): Kafka broker configuration
- mqtt_config (optional): MQTT broker configuration
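The parameters and defaults listed above can be captured in a small request builder. This Python sketch is a client-side convenience under stated assumptions: `build_replay_request` is a hypothetical helper, the server applies its own defaults regardless, and the broker configuration dict is passed through unchanged under `kafka_config` or `mqtt_config`.

```python
import json
from typing import Any, Dict, List, Optional

VALID_BROKERS = ("kafka", "mqtt", "none")


def build_replay_request(
    input_file: str,
    broker_type: str = "none",
    topics: Optional[List[str]] = None,
    rate_of_publishing: int = 1000,
    loop_file: bool = False,
    add_timestamps: bool = True,
    broker_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build the JSON body for a replay start request (hypothetical helper)."""
    if not input_file:
        raise ValueError("input_file is required")
    if broker_type not in VALID_BROKERS:
        raise ValueError(f"broker_type must be one of {VALID_BROKERS}")
    body: Dict[str, Any] = {
        "input_file": input_file,
        "broker_type": broker_type,
        "topics": topics if topics is not None else ["janus"],
        "rate_of_publishing": rate_of_publishing,
        "loop_file": loop_file,
        "add_timestamps": add_timestamps,
        "kafka_config": None,
        "mqtt_config": None,
    }
    # Attach the broker settings under the key matching the broker type.
    if broker_type != "none" and broker_config is not None:
        body[f"{broker_type}_config"] = broker_config
    return body


print(json.dumps(build_replay_request("data/sensors.nq", topics=["sensors"]), indent=2))
```

Validating `broker_type` locally catches typos before the request is sent; the resulting dict can be passed directly as the `json=` argument of `requests.post`.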
Kafka Config:
{
"kafka_config": {
"bootstrap_servers": "localhost:9092",
"client_id": "janus_client",
"message_timeout_ms": "5000"
}
}
MQTT Config:
{
"mqtt_config": {
"host": "localhost",
"port": 1883,
"client_id": "janus_client",
"keep_alive_secs": 30
}
}
Response:
{
"message": "Stream bus replay started with file: data/sensors.nq"
}
Error Response (400 Bad Request):
{
"error": "Replay is already running"
}
Stop the currently running stream bus replay.
Response:
{
"message": "Stream bus replay stopped"
}
Error Response (400 Bad Request):
{
"error": "Replay is not running"
}
Get the current status of the stream bus replay.
Response (Running):
{
"is_running": true,
"events_read": 15420,
"events_published": 15420,
"events_stored": 15420,
"publish_errors": 0,
"storage_errors": 0,
"events_per_second": 1543.2,
"elapsed_seconds": 10.0
}
Response (Not Running):
{
"is_running": false,
"events_read": 0,
"events_published": 0,
"events_stored": 0,
"publish_errors": 0,
"storage_errors": 0,
"events_per_second": 0.0,
"elapsed_seconds": 0.0
}
# Register a query
curl -X POST http://localhost:8080/api/queries \
-H "Content-Type: application/json" \
-d '{
"query_id": "temp_query",
"janusql": "PREFIX ex: <http://example.org/> SELECT ?sensor ?temp FROM NAMED WINDOW ex:histWindow ON STREAM ex:sensorStream [START 1704067200 END 1704153600] WHERE { WINDOW ex:histWindow { ?sensor ex:temperature ?temp . } }"
}'
# List queries
curl http://localhost:8080/api/queries
# Get query details
curl http://localhost:8080/api/queries/temp_query
# Start the query
curl -X POST http://localhost:8080/api/queries/temp_query/start
# Delete the query
curl -X DELETE http://localhost:8080/api/queries/temp_query
# Start replay
curl -X POST http://localhost:8080/api/replay/start \
-H "Content-Type: application/json" \
-d '{
"input_file": "data/sensors.nq",
"broker_type": "none",
"topics": ["sensors"],
"rate_of_publishing": 1000,
"loop_file": false,
"add_timestamps": true
}'
# Check replay status
curl http://localhost:8080/api/replay/status
# Stop replay
curl -X POST http://localhost:8080/api/replay/stop
import requests
import json
from websocket import create_connection
BASE_URL = "http://localhost:8080"
# Register a query
response = requests.post(
f"{BASE_URL}/api/queries",
json={
"query_id": "my_query",
"janusql": "PREFIX ex: <http://example.org/> SELECT ?s ?p ?o FROM NAMED WINDOW ex:histWindow ON STREAM ex:sensorStream [START 1704067200 END 1704153600] WHERE { WINDOW ex:histWindow { ?s ?p ?o . } }"
}
)
print(f"Register: {response.json()}")
# Start the query
response = requests.post(f"{BASE_URL}/api/queries/my_query/start")
print(f"Start: {response.json()}")
# Connect to WebSocket for results
ws = create_connection(f"ws://localhost:8080/api/queries/my_query/results")
# Receive results
for i in range(10):
    result = ws.recv()
    print(f"Result: {json.loads(result)}")
ws.close()
# Stop the query
response = requests.delete(f"{BASE_URL}/api/queries/my_query")
print(f"Stop: {response.json()}")
const axios = require('axios');
const WebSocket = require('ws');
const BASE_URL = 'http://localhost:8080';
async function demo() {
// Register a query
const registerResponse = await axios.post(`${BASE_URL}/api/queries`, {
query_id: 'js_query',
janusql: 'PREFIX ex: <http://example.org/> SELECT ?s ?p ?o FROM NAMED WINDOW ex:histWindow ON STREAM ex:sensorStream [START 1704067200 END 1704153600] WHERE { WINDOW ex:histWindow { ?s ?p ?o . } }'
});
console.log('Registered:', registerResponse.data);
// Start the query
const startResponse = await axios.post(`${BASE_URL}/api/queries/js_query/start`);
console.log('Started:', startResponse.data);
// Connect to WebSocket
const ws = new WebSocket(`ws://localhost:8080/api/queries/js_query/results`);
ws.on('message', (data) => {
const result = JSON.parse(data);
console.log('Result:', result);
});
ws.on('error', (error) => {
console.error('WebSocket error:', error);
});
// Wait for results...
await new Promise(resolve => setTimeout(resolve, 10000));
ws.close();
// Stop the query
const stopResponse = await axios.delete(`${BASE_URL}/api/queries/js_query`);
console.log('Stopped:', stopResponse.data);
}
demo().catch(console.error);
For a simple demo dashboard with "Start Replay" and "Start Query" buttons:
<!DOCTYPE html>
<html>
<head>
<title>Janus Demo Dashboard</title>
<style>
body { font-family: Arial, sans-serif; padding: 20px; }
button { padding: 10px 20px; margin: 10px; font-size: 16px; }
.success { color: green; }
.error { color: red; }
#results { margin-top: 20px; border: 1px solid #ccc; padding: 10px; max-height: 400px; overflow-y: auto; }
</style>
</head>
<body>
<h1>Janus RDF Stream Processing - Demo</h1>
<button id="startReplay" onclick="startReplay()">Start Replay</button>
<button id="stopReplay" onclick="stopReplay()" disabled>Stop Replay</button>
<br>
<button id="startQuery" onclick="startQuery()">Start Query</button>
<button id="stopQuery" onclick="stopQuery()" disabled>Stop Query</button>
<div id="status"></div>
<div id="results"></div>
<script>
const API_BASE = 'http://localhost:8080';
const QUERY_ID = 'demo_query';
let ws = null;
async function startReplay() {
try {
const response = await fetch(`${API_BASE}/api/replay/start`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
input_file: 'data/sensors.nq',
broker_type: 'none',
topics: ['sensors'],
rate_of_publishing: 1000,
loop_file: true,
add_timestamps: true
})
});
const data = await response.json();
if (response.ok) {
showStatus(data.message, 'success');
document.getElementById('startReplay').disabled = true;
document.getElementById('stopReplay').disabled = false;
pollReplayStatus();
} else {
showStatus(data.error, 'error');
}
} catch (error) {
showStatus(`Error: ${error.message}`, 'error');
}
}
async function stopReplay() {
try {
const response = await fetch(`${API_BASE}/api/replay/stop`, {
method: 'POST'
});
const data = await response.json();
if (response.ok) {
showStatus(data.message, 'success');
document.getElementById('startReplay').disabled = false;
document.getElementById('stopReplay').disabled = true;
} else {
showStatus(data.error, 'error');
}
} catch (error) {
showStatus(`Error: ${error.message}`, 'error');
}
}
async function startQuery() {
try {
// First register the query
const registerResponse = await fetch(`${API_BASE}/api/queries`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
query_id: QUERY_ID,
janusql: 'PREFIX ex: <http://example.org/> SELECT ?sensor ?temp FROM NAMED WINDOW ex:histWindow ON STREAM ex:sensorStream [START 1704067200 END 1704153600] WHERE { WINDOW ex:histWindow { ?sensor ex:temperature ?temp . } }'
})
});
if (!registerResponse.ok && registerResponse.status !== 400) {
throw new Error('Failed to register query');
}
// Start the query
const startResponse = await fetch(`${API_BASE}/api/queries/${QUERY_ID}/start`, {
method: 'POST'
});
const data = await startResponse.json();
if (startResponse.ok) {
showStatus(data.message, 'success');
document.getElementById('startQuery').disabled = true;
document.getElementById('stopQuery').disabled = false;
connectWebSocket();
} else {
showStatus(data.error, 'error');
}
} catch (error) {
showStatus(`Error: ${error.message}`, 'error');
}
}
async function stopQuery() {
try {
const response = await fetch(`${API_BASE}/api/queries/${QUERY_ID}`, {
method: 'DELETE'
});
const data = await response.json();
if (response.ok) {
showStatus(data.message, 'success');
document.getElementById('startQuery').disabled = false;
document.getElementById('stopQuery').disabled = true;
if (ws) ws.close();
} else {
showStatus(data.error, 'error');
}
} catch (error) {
showStatus(`Error: ${error.message}`, 'error');
}
}
function connectWebSocket() {
ws = new WebSocket(`ws://localhost:8080/api/queries/${QUERY_ID}/results`);
ws.onmessage = (event) => {
const result = JSON.parse(event.data);
displayResult(result);
};
ws.onerror = (error) => {
showStatus('WebSocket error', 'error');
};
ws.onclose = () => {
showStatus('WebSocket closed', 'success');
};
}
function pollReplayStatus() {
const interval = setInterval(async () => {
try {
const response = await fetch(`${API_BASE}/api/replay/status`);
const data = await response.json();
if (!data.is_running) {
clearInterval(interval);
return;
}
document.getElementById('status').innerHTML = `
<div class="success">
<strong>Replay Status:</strong><br>
Events Read: ${data.events_read}<br>
Events Stored: ${data.events_stored}<br>
Rate: ${data.events_per_second.toFixed(2)} events/sec<br>
Elapsed: ${data.elapsed_seconds.toFixed(2)}s
</div>
`;
} catch (error) {
clearInterval(interval);
}
}, 1000);
}
function showStatus(message, type) {
const statusDiv = document.getElementById('status');
statusDiv.className = type;
statusDiv.innerHTML = `<p><strong>${message}</strong></p>`;
}
function displayResult(result) {
const resultsDiv = document.getElementById('results');
const resultHtml = `
<div style="border-bottom: 1px solid #eee; padding: 5px;">
<strong>[${result.source}]</strong>
Timestamp: ${result.timestamp}<br>
Bindings: ${JSON.stringify(result.bindings)}
</div>
`;
resultsDiv.innerHTML = resultHtml + resultsDiv.innerHTML;
}
</script>
</body>
</html>
All error responses follow this format:
{
"error": "Descriptive error message"
}
HTTP Status Codes:
- 200 OK - Successful GET request
- 201 Created - Successful resource creation
- 400 Bad Request - Invalid request or operation not allowed
- 404 Not Found - Resource not found
- 500 Internal Server Error - Server-side error
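Because every error body carries a single `error` field, a client can funnel all responses through one check. The Python sketch below shows one way to do that; `JanusApiError` and `check_response` are hypothetical names, and the fallback message for bodies without an `error` field is an assumption.

```python
import json
from typing import Any, Dict


class JanusApiError(Exception):
    """Raised for any non-2xx response (hypothetical client-side type)."""

    def __init__(self, status_code: int, message: str) -> None:
        super().__init__(f"{status_code}: {message}")
        self.status_code = status_code
        self.message = message


def check_response(status_code: int, body: str) -> Dict[str, Any]:
    """Return the parsed JSON body for 2xx responses, raise otherwise."""
    try:
        payload = json.loads(body) if body else {}
    except json.JSONDecodeError:
        payload = {}
    if not isinstance(payload, dict):
        payload = {}
    if 200 <= status_code < 300:
        return payload
    # Error bodies are documented as {"error": "..."}.
    raise JanusApiError(status_code, payload.get("error", "unknown error"))


try:
    check_response(404, '{"error": "Query \'nonexistent\' not found"}')
except JanusApiError as exc:
    print(exc.status_code, exc.message)
```

With `requests`, the same check can be applied as `check_response(response.status_code, response.text)`.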
Usage: http_server [OPTIONS]
Options:
-H, --host <HOST>
Server host address [default: 127.0.0.1]
-p, --port <PORT>
Server port [default: 8080]
-s, --storage-dir <STORAGE_DIR>
Storage directory path [default: ./data/storage]
--max-batch-size-bytes <MAX_BATCH_SIZE_BYTES>
Maximum batch size in bytes [default: 10485760]
--flush-interval-ms <FLUSH_INTERVAL_MS>
Flush interval in milliseconds [default: 5000]
--max-total-memory-mb <MAX_TOTAL_MEMORY_MB>
Maximum total memory in MB [default: 1024]
- WebSocket Connections: Each active query can have multiple WebSocket connections. Results are broadcast to all connected clients.
- Query Handles: Query handles are stored in memory. Consider resource limits when running many concurrent queries.
- Stream Bus Replay: Running replay at high rates (>10,000 events/sec) may impact query performance. Adjust rate_of_publishing accordingly.
- CORS: CORS is configured to allow all origins. In production, restrict this to specific domains.
WARNING: This API is designed for local development and demos. For production use:
- Add authentication/authorization
- Restrict CORS to specific origins
- Add rate limiting
- Use HTTPS/WSS instead of HTTP/WS
- Validate and sanitize all inputs
- Add request size limits
- Implement proper session management
Issue: Cannot connect to WebSocket endpoint
Solutions:
- Ensure query is registered and started before connecting
- Check that the query ID in the WebSocket URL matches the registered query
- Verify the server is running and accessible
- Check browser console for CORS or connection errors
Issue: WebSocket connects but no results received
Solutions:
- Verify stream bus replay is running (GET /api/replay/status)
- Check query syntax is valid
- Ensure historical data exists for the specified time window
- For live queries, ensure live stream is producing events
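Some of these checks can be automated from the replay status response. The Python sketch below inspects a status payload and returns human-readable hints. `diagnose_replay` is a hypothetical helper, and the suggested causes are assumptions drawn from the field names, not behavior confirmed by the server.

```python
from typing import Any, Dict, List


def diagnose_replay(status: Dict[str, Any]) -> List[str]:
    """Return troubleshooting hints for a replay status payload."""
    hints: List[str] = []
    if not status.get("is_running", False):
        hints.append("replay is not running; start it before expecting results")
    if status.get("events_read", 0) == 0:
        hints.append("no events read; check that input_file exists and has data")
    if status.get("publish_errors", 0) > 0:
        hints.append("publish errors reported; check the broker configuration")
    if status.get("storage_errors", 0) > 0:
        hints.append("storage errors reported; check the storage directory")
    return hints


# A payload shaped like the "Not Running" status response.
idle = {
    "is_running": False,
    "events_read": 0,
    "events_published": 0,
    "events_stored": 0,
    "publish_errors": 0,
    "storage_errors": 0,
    "events_per_second": 0.0,
    "elapsed_seconds": 0.0,
}
for hint in diagnose_replay(idle):
    print("-", hint)
```

An empty list means the status payload shows no obvious problem, in which case the query syntax and time window are the next things to check.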
Issue: Replay start returns error
Solutions:
- Check that input_file path exists and is accessible
- Verify no other replay is currently running
- Ensure broker configuration is correct if using Kafka/MQTT
- Check server logs for detailed error messages
For issues, feature requests, or questions:
- GitHub Issues: https://github.com/SolidLabResearch/janus/issues
- Documentation: https://github.com/SolidLabResearch/janus