Skip to content

Commit 2da0c44

Browse files
Merge pull request #9 from rohas-dev/feat/tracing-timeline
feat(telemetry): Add persistent telemetry system with RocksDB backend
2 parents 93f7489 + 723c7ee commit 2da0c44

23 files changed

Lines changed: 1724 additions & 187 deletions

File tree

Cargo.lock

Lines changed: 88 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ members = [
88
"crates/rohas-cron",
99
"crates/rohas-cli",
1010
"crates/rohas-dev-server",
11+
"crates/rohas-telemetry",
1112
"crates/rohas-adapters/adapter-memory",
1213
"crates/rohas-adapters/adapter-nats",
1314
"crates/rohas-adapters/adapter-kafka",
1415
"crates/rohas-adapters/adapter-rabbitmq",
1516
"crates/rohas-adapters/adapter-sqs",
17+
"crates/rohas-adapters/adapter-rocksdb",
1618
]
1719

1820
[workspace.package]
@@ -94,6 +96,8 @@ adapter-nats = { path = "crates/rohas-adapters/adapter-nats" }
9496
adapter-kafka = { path = "crates/rohas-adapters/adapter-kafka" }
9597
adapter-rabbitmq = { path = "crates/rohas-adapters/adapter-rabbitmq" }
9698
adapter-sqs = { path = "crates/rohas-adapters/adapter-sqs" }
99+
adapter-rocksdb = { path = "crates/rohas-adapters/adapter-rocksdb" }
100+
rohas-telemetry = { path = "crates/rohas-telemetry" }
97101

98102
[profile.release]
99103
opt-level = 3
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "adapter-rocksdb"
3+
version = { workspace = true }
4+
edition = { workspace = true }
5+
authors = { workspace = true }
6+
license = { workspace = true }
7+
8+
[dependencies]
9+
tokio = { workspace = true }
10+
rohas-telemetry = { workspace = true }
11+
rocksdb = "0.24.0"
12+
async-trait = "0.1"
13+
thiserror = { workspace = true }
14+
15+
[dev-dependencies]
16+
tokio-test = "0.4"
17+
tempfile = "3.10"
18+
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
use rohas_telemetry::error::{Result, TelemetryError};
2+
use rohas_telemetry::storage::{IterateCallback, StorageAdapter};
3+
use async_trait::async_trait;
4+
use rocksdb::{DB, IteratorMode, Options};
5+
use std::path::PathBuf;
6+
use std::sync::Arc;
7+
use tokio::sync::RwLock;
8+
9+
/// RocksDB storage adapter for telemetry data
10+
pub struct RocksDBAdapter {
11+
db: Arc<RwLock<DB>>,
12+
}
13+
14+
impl RocksDBAdapter {
15+
pub async fn new(path: PathBuf) -> Result<Self> {
16+
if let Some(parent) = path.parent() {
17+
std::fs::create_dir_all(parent)
18+
.map_err(|e| TelemetryError::Io(e))?;
19+
}
20+
21+
let mut opts = Options::default();
22+
opts.create_if_missing(true);
23+
opts.create_missing_column_families(true);
24+
25+
opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB
26+
opts.set_max_write_buffer_number(3);
27+
opts.set_target_file_size_base(64 * 1024 * 1024); // 64MB
28+
29+
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
30+
31+
opts.optimize_for_point_lookup(1024);
32+
33+
let db = DB::open(&opts, &path)
34+
.map_err(|e| TelemetryError::StorageBackend(e.to_string()))?;
35+
36+
Ok(Self {
37+
db: Arc::new(RwLock::new(db)),
38+
})
39+
}
40+
}
41+
42+
#[async_trait]
43+
impl StorageAdapter for RocksDBAdapter {
44+
async fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
45+
let db = self.db.write().await;
46+
db.put(key, value)
47+
.map_err(|e| TelemetryError::StorageBackend(e.to_string()))?;
48+
Ok(())
49+
}
50+
51+
async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
52+
let db = self.db.read().await;
53+
match db.get(key) {
54+
Ok(Some(value)) => Ok(Some(value)),
55+
Ok(None) => Ok(None),
56+
Err(e) => Err(TelemetryError::StorageBackend(e.to_string())),
57+
}
58+
}
59+
60+
async fn delete(&self, key: &[u8]) -> Result<()> {
61+
let db = self.db.write().await;
62+
db.delete(key)
63+
.map_err(|e| TelemetryError::StorageBackend(e.to_string()))?;
64+
Ok(())
65+
}
66+
67+
async fn get_by_prefix(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
68+
let db = self.db.read().await;
69+
let iter = db.iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));
70+
71+
let mut keys = Vec::new();
72+
for item in iter {
73+
let (key, _) = item.map_err(|e| TelemetryError::StorageBackend(e.to_string()))?;
74+
if key.starts_with(prefix) {
75+
keys.push(key.to_vec());
76+
} else {
77+
break;
78+
}
79+
}
80+
81+
Ok(keys)
82+
}
83+
84+
async fn iterate(&self, prefix: &[u8], mut callback: Box<dyn IterateCallback>) -> Result<()> {
85+
let db = self.db.read().await;
86+
let iter = db.iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));
87+
88+
for item in iter {
89+
let (key, value) = item.map_err(|e| TelemetryError::StorageBackend(e.to_string()))?;
90+
if key.starts_with(prefix) {
91+
let should_continue = callback.call(&key, &value)?;
92+
if !should_continue {
93+
break;
94+
}
95+
} else {
96+
break;
97+
}
98+
}
99+
100+
Ok(())
101+
}
102+
}
103+
104+
#[cfg(test)]
105+
mod tests {
106+
use super::*;
107+
use tempfile::TempDir;
108+
109+
#[tokio::test]
110+
async fn test_rocksdb_adapter() {
111+
let temp_dir = TempDir::new().unwrap();
112+
let db_path = temp_dir.path().join("test_db");
113+
114+
let adapter = RocksDBAdapter::new(db_path).await.unwrap();
115+
116+
adapter.put(b"test:key1", b"value1").await.unwrap();
117+
let value = adapter.get(b"test:key1").await.unwrap();
118+
assert_eq!(value, Some(b"value1".to_vec()));
119+
120+
adapter.delete(b"test:key1").await.unwrap();
121+
let value = adapter.get(b"test:key1").await.unwrap();
122+
assert_eq!(value, None);
123+
124+
adapter.put(b"test:key1", b"value1").await.unwrap();
125+
adapter.put(b"test:key2", b"value2").await.unwrap();
126+
adapter.put(b"other:key1", b"value3").await.unwrap();
127+
128+
let keys = adapter.get_by_prefix(b"test:").await.unwrap();
129+
assert_eq!(keys.len(), 2);
130+
}
131+
}
132+

crates/rohas-cli/src/commands/init.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,22 @@ enable_cors = true
8080
type = "memory"
8181
buffer_size = 1000
8282
83+
[telemetry]
84+
# Telemetry adapter type: rocksdb (default), prometheus, influxdb, timescaledb
85+
type = "rocksdb"
86+
# Path to telemetry storage (relative to project root or absolute)
87+
path = ".rohas/telemetry"
88+
# Retention period for traces in days (0 = keep forever)
89+
retention_days = 30
90+
# Maximum number of traces to keep in memory cache
91+
max_cache_size = 1000
92+
# Enable metrics collection
93+
enable_metrics = true
94+
# Enable logs collection
95+
enable_logs = true
96+
# Enable traces collection
97+
enable_traces = true
98+
8399
[workbench]
84100
api_key = "{}"
85101
allowed_origins = []

crates/rohas-engine/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ rohas-runtime = { workspace = true }
1111
rohas-cron = { workspace = true }
1212
rohas-codegen = { workspace = true }
1313
adapter-memory = { workspace = true }
14+
rohas-telemetry = { workspace = true }
15+
adapter-rocksdb = { workspace = true }
1416

1517
tokio = { workspace = true }
1618
serde = { workspace = true }
@@ -30,6 +32,7 @@ futures = { workspace = true }
3032
futures-util = "0.3"
3133
regex = "1.11"
3234
base64 = { workspace = true }
35+
async-trait = "0.1"
3336

3437
[dev-dependencies]
3538
tokio-test = "0.4"

0 commit comments

Comments
 (0)