Skip to content

Commit 97c0144

Browse files
committed
feat: Add historical sliding and fixed window operators, their tests, and update stream module and architecture.
1 parent 154d7ff commit 97c0144

10 files changed

Lines changed: 690 additions & 17 deletions

ARCHITECTURE.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ graph TD
1414
API -->|Encode| Dictionary[Dictionary Encoding]
1515
Dictionary -->|Event IDs| BatchBuffer[Batch Buffer]
1616
17-
subgraph Storage Engine
17+
subgraph StorageSystem [Storage Engine]
1818
BatchBuffer -->|Flush| SegmentedStorage[Segmented Storage]
1919
SegmentedStorage -->|Write| DataFile[Data File .log]
2020
SegmentedStorage -->|Index| IndexFile[Index File .idx]
@@ -23,10 +23,10 @@ graph TD
2323
end
2424
2525
Query[Query Request] -->|Parse| Parser[JanusQL Parser]
26-
Parser -->|Execute| StorageEngine
26+
Parser -->|Execute| SegmentedStorage
2727
28-
StorageEngine -->|Search| BatchBuffer
29-
StorageEngine -->|Search| InMemoryIndex
28+
SegmentedStorage -->|Search| BatchBuffer
29+
SegmentedStorage -->|Search| InMemoryIndex
3030
InMemoryIndex -->|Locate| IndexFile
3131
IndexFile -->|Locate| DataFile
3232

CHANGELOG.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Changelog
2+
3+
All notable changes to the Janus project will be documented in this file.
4+
5+
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6+
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7+
8+
## [Unreleased]
9+
10+
### Added
11+
- **Historical Sliding Window Operator** (`src/stream/operators/historical_sliding_window.rs`)
12+
- Implements `Iterator` pattern for sliding windows over historical data
13+
- Supports configurable width, slide, and offset parameters
14+
- Uses `SystemTime::now()` internally to determine window boundaries
15+
- Clamps window end to prevent querying beyond current time
16+
- Mandatory offset parameter for "go back" semantics
17+
18+
- **Historical Fixed Window Operator** (`src/stream/operators/historical_fixed_window.rs`)
19+
- Single-query operator for fixed time ranges [start, end]
20+
- Implements `Iterator` pattern (yields once, then returns `None`)
21+
- Enforces mandatory start and end timestamps
22+
23+
- **Integration Tests with Real RDF IRIs**
24+
- `tests/historical_sliding_window_test.rs`: Sensor data and FOAF examples
25+
- `tests/historical_fixed_window_test.rs`: IoT devices and semantic web publications
26+
- Uses standard vocabularies: RDF, FOAF, Dublin Core, BIBO
27+
28+
### Changed
29+
- Consolidated window definitions to use `WindowDefinition` from `janusql_parser.rs` consistently
30+
- Removed duplicate `HistoricalWindow` struct from `src/stream/operators/hs2r.rs`
31+
- Moved `historical_sliding_window.rs` into `src/stream/operators/` directory for better organization
32+
33+
### Fixed
34+
- Window end clamping to prevent querying future data in sliding windows
35+
- Type consistency across window operators
36+
37+
## [0.1.0] - Initial Release
38+
39+
### Added
40+
- Core RDF event processing
41+
- Segmented storage with two-level indexing
42+
- JanusQL parser with support for live, historical sliding, and historical fixed windows
43+
- Dictionary encoding for efficient RDF storage
44+
- Basic streaming infrastructure
45+
46+
---
47+
48+
**Note**: This changelog tracks changes starting from the implementation of historical window operators. Earlier changes may not be fully documented.

src/stream/historical_stream_processing.rs

Whitespace-only changes.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
pub struct LiveStreamProcessing {}
2+
#[derive(Debug)]
3+
pub struct LiveStreamProcessingError(String);
4+
5+
impl std::fmt::Display for LiveStreamProcessingError {
6+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
7+
write!(f, "LiveStreamProcessingError: {}", self.0)
8+
}
9+
}
10+
11+
impl std::error::Error for LiveStreamProcessingError {}
12+
13+
impl LiveStreamProcessing {
14+
pub fn new(rspql_query: String) -> Result<Self, LiveStreamProcessingError> {
15+
Ok(Self {})
16+
}
17+
}

src/stream/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod operators {
2+
pub mod historical_fixed_window;
3+
pub mod historical_sliding_window;
24
pub mod hs2r;
35
}
46

5-
pub mod historical_stream_processing;
67
pub mod live_stream_processing;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use crate::core::Event;
2+
use crate::parsing::janusql_parser::WindowDefinition;
3+
use crate::storage::segmented_storage::StreamingSegmentedStorage;
4+
use std::sync::Arc;
5+
6+
/// Operator for processing historical data with a fixed window.
7+
/// Unlike sliding windows, this queries a single fixed time range [start, end].
8+
pub struct HistoricalFixedWindowOperator {
9+
storage: Arc<StreamingSegmentedStorage>,
10+
window_def: WindowDefinition,
11+
has_yielded: bool,
12+
}
13+
14+
impl HistoricalFixedWindowOperator {
15+
/// Creates a new HistoricalFixedWindowOperator.
16+
///
17+
/// # Arguments
18+
///
19+
/// * `storage` - The storage backend to query.
20+
/// * `window_def` - The window definition with start and end timestamps.
21+
pub fn new(storage: Arc<StreamingSegmentedStorage>, window_def: WindowDefinition) -> Self {
22+
HistoricalFixedWindowOperator { storage, window_def, has_yielded: false }
23+
}
24+
}
25+
26+
impl Iterator for HistoricalFixedWindowOperator {
27+
type Item = Vec<Event>;
28+
29+
fn next(&mut self) -> Option<Self::Item> {
30+
// Fixed window only yields once
31+
if self.has_yielded {
32+
return None;
33+
}
34+
35+
// Start and end are mandatory for HistoricalFixed windows
36+
let start = self.window_def.start.expect("Start must be defined for HistoricalFixedWindow");
37+
let end = self.window_def.end.expect("End must be defined for HistoricalFixedWindow");
38+
39+
// Query the storage for events in the fixed window
40+
let events_result = self.storage.query(start, end);
41+
42+
self.has_yielded = true;
43+
44+
match events_result {
45+
Ok(events) => Some(events),
46+
Err(e) => {
47+
eprintln!("Error querying storage for fixed window: {}", e);
48+
None
49+
}
50+
}
51+
}
52+
}
53+
54+
#[cfg(test)]
55+
mod tests {
56+
use super::*;
57+
use crate::parsing::janusql_parser::WindowType;
58+
use crate::storage::util::StreamingConfig;
59+
use std::fs;
60+
61+
fn create_test_config(path: &str) -> StreamingConfig {
62+
StreamingConfig {
63+
segment_base_path: path.to_string(),
64+
max_batch_events: 10,
65+
max_batch_bytes: 1024,
66+
max_batch_age_seconds: 1,
67+
sparse_interval: 2,
68+
entries_per_index_block: 2,
69+
}
70+
}
71+
72+
#[test]
73+
fn test_historical_fixed_window() {
74+
let test_dir = "/tmp/janus_test_fixed_window";
75+
let _ = fs::remove_dir_all(test_dir);
76+
77+
let config = create_test_config(test_dir);
78+
let storage = Arc::new(StreamingSegmentedStorage::new(config).unwrap());
79+
80+
// Write events at timestamps 100, 200, 300, 400, 500, 600
81+
for i in 1..=6 {
82+
storage.write_rdf(i * 100, "s", "p", "o", "g").unwrap();
83+
}
84+
85+
// Define Fixed Window: [200, 500]
86+
let window_def = WindowDefinition {
87+
window_name: "w1".to_string(),
88+
stream_name: "s1".to_string(),
89+
width: 0,
90+
slide: 0,
91+
offset: None,
92+
start: Some(200),
93+
end: Some(500),
94+
window_type: WindowType::HistoricalFixed,
95+
};
96+
97+
let mut operator = HistoricalFixedWindowOperator::new(storage.clone(), window_def);
98+
99+
// Should yield once with events in [200, 500]
100+
let w1 = operator.next().unwrap();
101+
assert_eq!(w1.len(), 4); // Events at 200, 300, 400, 500
102+
assert_eq!(w1[0].timestamp, 200);
103+
assert_eq!(w1[3].timestamp, 500);
104+
105+
// Should not yield again
106+
assert!(operator.next().is_none());
107+
108+
let _ = fs::remove_dir_all(test_dir);
109+
}
110+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use crate::core::Event;
2+
use crate::parsing::janusql_parser::WindowDefinition;
3+
use crate::storage::segmented_storage::StreamingSegmentedStorage;
4+
use std::sync::Arc;
5+
6+
/// Operator for processing historical data with a sliding window.
7+
/// It iterates over the storage and yields events for each window.
8+
pub struct HistoricalSlidingWindowOperator {
9+
storage: Arc<StreamingSegmentedStorage>,
10+
window_def: WindowDefinition,
11+
current_start: u64,
12+
end_bound: u64,
13+
}
14+
15+
impl HistoricalSlidingWindowOperator {
16+
/// Creates a new HistoricalSlidingWindowOperator.
17+
///
18+
/// # Arguments
19+
///
20+
/// * `storage` - The storage backend to query.
21+
/// * `window_def` - The window definition (width, slide, offset, etc.).
22+
pub fn new(storage: Arc<StreamingSegmentedStorage>, window_def: WindowDefinition) -> Self {
23+
let now = std::time::SystemTime::now()
24+
.duration_since(std::time::UNIX_EPOCH)
25+
.unwrap()
26+
.as_millis() as u64;
27+
28+
// Offset is mandatory for HistoricalSliding windows as per the parser and requirements.
29+
// We subtract it from the query_start to "go back" in time.
30+
let offset = window_def.offset.expect("Offset must be defined for HistoricalSlidingWindow");
31+
let start_time = now.saturating_sub(offset);
32+
33+
HistoricalSlidingWindowOperator {
34+
storage,
35+
window_def,
36+
current_start: start_time,
37+
end_bound: now,
38+
}
39+
}
40+
}
41+
42+
impl Iterator for HistoricalSlidingWindowOperator {
43+
type Item = Vec<Event>;
44+
45+
fn next(&mut self) -> Option<Self::Item> {
46+
// Calculate the window bounds
47+
let window_start = self.current_start;
48+
let window_end = (window_start + self.window_def.width).min(self.end_bound);
49+
50+
// Check if we have exceeded the query range
51+
// We stop if the window start goes beyond the end bound.
52+
// (Alternative: stop if window_end > end_bound, depending on strict containment requirements)
53+
if window_start > self.end_bound {
54+
return None;
55+
}
56+
57+
// Query the storage for events in this window
58+
// Note: query() is inclusive, so we might need to adjust if we want [start, end)
59+
// For now, we assume the storage query semantics match what we want or we accept inclusive.
60+
// Usually windows are [start, end).
61+
let events_result = self.storage.query(window_start, window_end);
62+
63+
match events_result {
64+
Ok(events) => {
65+
// Advance the window
66+
self.current_start += self.window_def.slide;
67+
Some(events)
68+
}
69+
Err(e) => {
70+
eprintln!("Error querying storage for window: {}", e);
71+
None
72+
}
73+
}
74+
}
75+
}
76+
77+
#[cfg(test)]
78+
mod tests {
79+
use super::*;
80+
use crate::parsing::janusql_parser::WindowType;
81+
use crate::storage::util::StreamingConfig;
82+
use std::fs;
83+
84+
fn create_test_config(path: &str) -> StreamingConfig {
85+
StreamingConfig {
86+
segment_base_path: path.to_string(),
87+
max_batch_events: 10,
88+
max_batch_bytes: 1024,
89+
max_batch_age_seconds: 1,
90+
sparse_interval: 2,
91+
entries_per_index_block: 2,
92+
}
93+
}
94+
95+
#[test]
96+
fn test_historical_sliding_window() {
97+
let test_dir = "/tmp/janus_test_sliding_window";
98+
let _ = fs::remove_dir_all(test_dir); // Clean up before test
99+
100+
let config = create_test_config(test_dir);
101+
let storage = Arc::new(StreamingSegmentedStorage::new(config).unwrap());
102+
103+
let now = std::time::SystemTime::now()
104+
.duration_since(std::time::UNIX_EPOCH)
105+
.unwrap()
106+
.as_millis() as u64;
107+
108+
// Write events in the past: now-500, now-400, now-300, now-200, now-100, now
109+
for i in 0..6 {
110+
let ts = now - (500 - (i * 100));
111+
storage.write_rdf(ts, "s", "p", "o", "g").unwrap();
112+
}
113+
114+
// Define Window: Width 200, Slide 100, Offset 500 (Start at now - 500)
115+
let window_def = WindowDefinition {
116+
window_name: "w1".to_string(),
117+
stream_name: "s1".to_string(),
118+
width: 200,
119+
slide: 100,
120+
offset: Some(500),
121+
start: None,
122+
end: None,
123+
window_type: WindowType::HistoricalSliding,
124+
};
125+
126+
let mut operator = HistoricalSlidingWindowOperator::new(storage.clone(), window_def);
127+
128+
// Window 1: [now-500, now-300] -> Events at now-500, now-400, now-300
129+
// Note: query is inclusive.
130+
let w1 = operator.next().unwrap();
131+
assert_eq!(w1.len(), 3);
132+
assert_eq!(w1[0].timestamp, now - 500);
133+
assert_eq!(w1[2].timestamp, now - 300);
134+
135+
// Window 2: [now-400, now-200] -> Events at now-400, now-300, now-200
136+
let w2 = operator.next().unwrap();
137+
assert_eq!(w2.len(), 3);
138+
assert_eq!(w2[0].timestamp, now - 400);
139+
assert_eq!(w2[2].timestamp, now - 200);
140+
141+
// Window 3: [now-300, now-100] -> Events at now-300, now-200, now-100
142+
let w3 = operator.next().unwrap();
143+
assert_eq!(w3.len(), 3);
144+
assert_eq!(w3[0].timestamp, now - 300);
145+
assert_eq!(w3[2].timestamp, now - 100);
146+
147+
let _ = fs::remove_dir_all(test_dir);
148+
}
149+
}

src/stream/operators/hs2r.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1 @@
1-
use crate::parsing::janusql_parser::WindowType;
21
use crate::parsing::janusql_parser::R2SOperator;
3-
4-
pub struct HistoricalWindow {
5-
pub window_name: String,
6-
pub stream_name: String,
7-
pub width: Option<u64>,
8-
pub slide: Option<u64>,
9-
pub offset: Option<u64>,
10-
pub start: Option<u64>,
11-
pub end: Option<u64>,
12-
pub window_type: WindowType,
13-
}

0 commit comments

Comments
 (0)