Skip to content

Commit 0ee7616

Browse files
committed
fix: Replace Arc with Rc in window operators and tests to resolve Clippy warnings.
1 parent 97c0144 commit 0ee7616

5 files changed

Lines changed: 18 additions & 15 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ All notable changes to the Janus project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8-
## [Unreleased]
8+
## [Unreleased] - 2025-11-24
99

1010
### Added
1111
- **Historical Sliding Window Operator** (`src/stream/operators/historical_sliding_window.rs`)
@@ -33,6 +33,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3333
### Fixed
3434
- Window end clamping to prevent querying future data in sliding windows
3535
- Type consistency across window operators
36+
- **Clippy warnings**: Replaced `Arc` with `Rc` in window operators and tests since they don't use threading
37+
- Fixed `arc_with_non_send_sync` lint errors
38+
- All CI checks now pass (rustfmt, clippy, tests, build)
3639

3740
## [0.1.0] - Initial Release
3841

src/stream/operators/historical_fixed_window.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use crate::core::Event;
22
use crate::parsing::janusql_parser::WindowDefinition;
33
use crate::storage::segmented_storage::StreamingSegmentedStorage;
4-
use std::sync::Arc;
4+
use std::rc::Rc;
55

66
/// Operator for processing historical data with a fixed window.
77
/// Unlike sliding windows, this queries a single fixed time range [start, end].
88
pub struct HistoricalFixedWindowOperator {
9-
storage: Arc<StreamingSegmentedStorage>,
9+
storage: Rc<StreamingSegmentedStorage>,
1010
window_def: WindowDefinition,
1111
has_yielded: bool,
1212
}
@@ -18,7 +18,7 @@ impl HistoricalFixedWindowOperator {
1818
///
1919
/// * `storage` - The storage backend to query.
2020
/// * `window_def` - The window definition with start and end timestamps.
21-
pub fn new(storage: Arc<StreamingSegmentedStorage>, window_def: WindowDefinition) -> Self {
21+
pub fn new(storage: Rc<StreamingSegmentedStorage>, window_def: WindowDefinition) -> Self {
2222
HistoricalFixedWindowOperator { storage, window_def, has_yielded: false }
2323
}
2424
}
@@ -75,7 +75,7 @@ mod tests {
7575
let _ = fs::remove_dir_all(test_dir);
7676

7777
let config = create_test_config(test_dir);
78-
let storage = Arc::new(StreamingSegmentedStorage::new(config).unwrap());
78+
let storage = Rc::new(StreamingSegmentedStorage::new(config).unwrap());
7979

8080
// Write events at timestamps 100, 200, 300, 400, 500, 600
8181
for i in 1..=6 {

src/stream/operators/historical_sliding_window.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use crate::core::Event;
22
use crate::parsing::janusql_parser::WindowDefinition;
33
use crate::storage::segmented_storage::StreamingSegmentedStorage;
4-
use std::sync::Arc;
4+
use std::rc::Rc;
55

66
/// Operator for processing historical data with a sliding window.
77
/// It iterates over the storage and yields events for each window.
88
pub struct HistoricalSlidingWindowOperator {
9-
storage: Arc<StreamingSegmentedStorage>,
9+
storage: Rc<StreamingSegmentedStorage>,
1010
window_def: WindowDefinition,
1111
current_start: u64,
1212
end_bound: u64,
@@ -19,7 +19,7 @@ impl HistoricalSlidingWindowOperator {
1919
///
2020
/// * `storage` - The storage backend to query.
2121
/// * `window_def` - The window definition (width, slide, offset, etc.).
22-
pub fn new(storage: Arc<StreamingSegmentedStorage>, window_def: WindowDefinition) -> Self {
22+
pub fn new(storage: Rc<StreamingSegmentedStorage>, window_def: WindowDefinition) -> Self {
2323
let now = std::time::SystemTime::now()
2424
.duration_since(std::time::UNIX_EPOCH)
2525
.unwrap()
@@ -98,7 +98,7 @@ mod tests {
9898
let _ = fs::remove_dir_all(test_dir); // Clean up before test
9999

100100
let config = create_test_config(test_dir);
101-
let storage = Arc::new(StreamingSegmentedStorage::new(config).unwrap());
101+
let storage = Rc::new(StreamingSegmentedStorage::new(config).unwrap());
102102

103103
let now = std::time::SystemTime::now()
104104
.duration_since(std::time::UNIX_EPOCH)

tests/historical_fixed_window_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use janus::storage::segmented_storage::StreamingSegmentedStorage;
33
use janus::storage::util::StreamingConfig;
44
use janus::stream::operators::historical_fixed_window::HistoricalFixedWindowOperator;
55
use std::fs;
6-
use std::sync::Arc;
6+
use std::rc::Rc;
77

88
fn create_test_config(path: &str) -> StreamingConfig {
99
StreamingConfig {
@@ -22,7 +22,7 @@ fn test_historical_fixed_window_with_real_iris() {
2222
let _ = fs::remove_dir_all(test_dir);
2323

2424
let config = create_test_config(test_dir);
25-
let storage = Arc::new(StreamingSegmentedStorage::new(config).unwrap());
25+
let storage = Rc::new(StreamingSegmentedStorage::new(config).unwrap());
2626

2727
// Write events with real RDF IRIs representing IoT device events
2828
let devices = [
@@ -107,7 +107,7 @@ fn test_historical_fixed_window_semantic_web() {
107107
let _ = fs::remove_dir_all(test_dir);
108108

109109
let config = create_test_config(test_dir);
110-
let storage = Arc::new(StreamingSegmentedStorage::new(config).unwrap());
110+
let storage = Rc::new(StreamingSegmentedStorage::new(config).unwrap());
111111

112112
// Semantic web example: Publications and authors
113113
let publications = [

tests/historical_sliding_window_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use janus::storage::segmented_storage::StreamingSegmentedStorage;
33
use janus::storage::util::StreamingConfig;
44
use janus::stream::operators::historical_sliding_window::HistoricalSlidingWindowOperator;
55
use std::fs;
6-
use std::sync::Arc;
6+
use std::rc::Rc;
77

88
fn create_test_config(path: &str) -> StreamingConfig {
99
StreamingConfig {
@@ -22,7 +22,7 @@ fn test_historical_sliding_window_with_real_iris() {
2222
let _ = fs::remove_dir_all(test_dir);
2323

2424
let config = create_test_config(test_dir);
25-
let storage = Arc::new(StreamingSegmentedStorage::new(config).unwrap());
25+
let storage = Rc::new(StreamingSegmentedStorage::new(config).unwrap());
2626

2727
let now = std::time::SystemTime::now()
2828
.duration_since(std::time::UNIX_EPOCH)
@@ -102,7 +102,7 @@ fn test_historical_sliding_window_foaf_example() {
102102
let _ = fs::remove_dir_all(test_dir);
103103

104104
let config = create_test_config(test_dir);
105-
let storage = Arc::new(StreamingSegmentedStorage::new(config).unwrap());
105+
let storage = Rc::new(StreamingSegmentedStorage::new(config).unwrap());
106106

107107
let now = std::time::SystemTime::now()
108108
.duration_since(std::time::UNIX_EPOCH)

0 commit comments

Comments
 (0)