Skip to content

Commit ff45917

Browse files
author
test
committed
Implemented /stats
1 parent 99f264d commit ff45917

4 files changed

Lines changed: 198 additions & 32 deletions

File tree

.DS_Store

2 KB
Binary file not shown.

src/main.rs

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use storage::engine::{
2121
use storage::persistance::{cold_save, load_all_tables};
2222
use network::broadcaster::{membership_sync, heartbeat, get_membership, update_membership};
2323
use crate::storage::snapshoting::start_snapshot_task;
24+
use crate::storage::statistics::{get_stats, MetricsCollector, MetricsMiddleware};
2425
use crate::storage::subscription::SubscriptionManager;
2526

2627
/// Declare APP_STATE globally so that it’s available throughout the module.
@@ -176,6 +177,10 @@ async fn main() -> std::io::Result<()> {
176177
tokio::spawn(membership_sync(cluster_clone, current_clone, 60));
177178
let subscription_manager = web::Data::new(SubscriptionManager::new());
178179

180+
// Initialize metrics collector and middleware
181+
let metrics_collector = web::Data::new(MetricsCollector::new());
182+
// extract a clonable handle for the middleware
183+
let mw_col = metrics_collector.get_ref().clone();
179184

180185
let use_https = match env::var("DYNA_MODE").unwrap_or_default().as_str() {
181186
"https" => true,
@@ -199,6 +204,8 @@ async fn main() -> std::io::Result<()> {
199204
// Build and run the HTTP server.
200205
HttpServer::new(move || {
201206
App::new()
207+
.wrap(MetricsMiddleware::new(mw_col.clone()))
208+
.app_data(metrics_collector.clone())
202209
.app_data(subscription_manager.clone())
203210
.app_data(state.clone())
204211
.app_data(cluster_data.clone())
@@ -223,6 +230,7 @@ async fn main() -> std::io::Result<()> {
223230
storage::subscription::subscribe_to_key
224231
))
225232
.route("/auth/{user}", web::post().to(security::authentication::access))
233+
.service(get_stats)
226234
})
227235
.bind_openssl(bind_addr.as_str(), builder)?
228236
.run()
@@ -232,37 +240,40 @@ async fn main() -> std::io::Result<()> {
232240
println!("Starting distributed DB engine at http://{}", current_node);
233241

234242
// Build and run the HTTP server.
235-
HttpServer::new(move || {
236-
App::new()
237-
.app_data(subscription_manager.clone())
238-
.app_data(state.clone())
239-
.app_data(cluster_data.clone())
240-
.app_data(web::Data::new(current_node.clone()))
241-
// Cluster management endpoints.
242-
.route("/join", web::post().to(join_cluster))
243-
.route("/membership", web::get().to(get_membership))
244-
.route("/update_membership", web::post().to(update_membership))
245-
.route("/heartbeat", web::get().to(heartbeat))
246-
// Key–value endpoints with multi‑table support.
247-
.route("/{table}/key/{key}", web::get().to(get_value))
248-
.route("/{table}/key/{key}", web::put().to(put_value))
249-
.route("/{table}/key/{key}", web::delete().to(delete_value))
250-
// Endpoint to fetch a table’s entire in‑memory store.
251-
.route("/{table}/store", web::get().to(get_table_store))
252-
// Global endpoint returning the entire in‑memory store.
253-
.route("/store", web::get().to(get_global_store))
254-
// Endpoints to get keys from a table.
255-
.route("/{table}/keys", web::get().to(get_all_keys))
256-
.route("/{table}/keys", web::post().to(get_multiple_keys))
257-
.route("/{table}/subscribe/{key}", web::get().to(
258-
storage::subscription::subscribe_to_key
259-
))
260-
.route("/auth/{user}", web::post().to(security::authentication::access))
261-
})
262-
.bind(bind_addr.as_str())?
263-
.run()
264-
.await
243+
HttpServer::new(move || {
244+
App::new()
245+
.wrap(MetricsMiddleware::new(mw_col.clone()))
246+
.app_data(metrics_collector.clone())
247+
.app_data(subscription_manager.clone())
248+
.app_data(state.clone())
249+
.app_data(cluster_data.clone())
250+
.app_data(web::Data::new(current_node.clone()))
251+
// Cluster management endpoints.
252+
.route("/join", web::post().to(join_cluster))
253+
.route("/membership", web::get().to(get_membership))
254+
.route("/update_membership", web::post().to(update_membership))
255+
.route("/heartbeat", web::get().to(heartbeat))
256+
// Key–value endpoints with multi‑table support.
257+
.route("/{table}/key/{key}", web::get().to(get_value))
258+
.route("/{table}/key/{key}", web::put().to(put_value))
259+
.route("/{table}/key/{key}", web::delete().to(delete_value))
260+
// Endpoint to fetch a table’s entire in‑memory store.
261+
.route("/{table}/store", web::get().to(get_table_store))
262+
// Global endpoint returning the entire in‑memory store.
263+
.route("/store", web::get().to(get_global_store))
264+
// Endpoints to get keys from a table.
265+
.route("/{table}/keys", web::get().to(get_all_keys))
266+
.route("/{table}/keys", web::post().to(get_multiple_keys))
267+
.route("/{table}/subscribe/{key}", web::get().to(
268+
storage::subscription::subscribe_to_key
269+
))
270+
.route("/auth/{user}", web::post().to(security::authentication::access))
271+
.service(get_stats)
272+
})
273+
.bind(bind_addr.as_str())?
274+
.run()
275+
.await
265276
}
266277
}
267278

268-
}
279+
}

src/storage/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod engine;
22
pub mod persistance;
33
pub(crate) mod subscription;
4-
pub mod snapshoting;
4+
pub mod snapshoting;
5+
pub mod statistics;

src/storage/statistics.rs

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// src/statistics.rs
2+
use actix_web::{
3+
dev::{Service, ServiceRequest, ServiceResponse, Transform},
4+
get,
5+
web::{ Data},
6+
HttpResponse, Responder,
7+
};
8+
use futures_util::future::{ok, LocalBoxFuture, Ready};
9+
use serde::Serialize;
10+
use std::{
11+
collections::HashMap,
12+
sync::{
13+
atomic::{AtomicU64, Ordering},
14+
Arc,
15+
},
16+
task::{Context, Poll},
17+
time::Instant,
18+
};
19+
use crate::storage::engine::AppState;
20+
use crate::storage::subscription::SubscriptionManager;
21+
22+
/// Holds global counters.
23+
#[derive(Clone)]
24+
pub struct MetricsCollector {
25+
pub total_requests: Arc<AtomicU64>,
26+
pub total_latency_ns: Arc<AtomicU64>,
27+
}
28+
29+
impl MetricsCollector {
30+
pub fn new() -> Self {
31+
Self {
32+
total_requests: Arc::new(AtomicU64::new(0)),
33+
total_latency_ns: Arc::new(AtomicU64::new(0)),
34+
}
35+
}
36+
37+
fn record(&self, latency_ns: u64) {
38+
self.total_requests.fetch_add(1, Ordering::Relaxed);
39+
self.total_latency_ns.fetch_add(latency_ns, Ordering::Relaxed);
40+
}
41+
}
42+
43+
/// Actix middleware that measures request latency.
44+
pub struct MetricsMiddleware {
45+
collector: MetricsCollector,
46+
}
47+
48+
impl MetricsMiddleware {
49+
pub fn new(collector: MetricsCollector) -> Self {
50+
MetricsMiddleware { collector }
51+
}
52+
}
53+
54+
impl<S, B> Transform<S, ServiceRequest> for MetricsMiddleware
55+
where
56+
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>
57+
+ 'static,
58+
B: 'static,
59+
{
60+
type Response = ServiceResponse<B>;
61+
type Error = actix_web::Error;
62+
type InitError = ();
63+
type Transform = MetricsMiddlewareService<S>;
64+
type Future = Ready<Result<Self::Transform, Self::InitError>>;
65+
66+
fn new_transform(&self, service: S) -> Self::Future {
67+
ok(MetricsMiddlewareService {
68+
service,
69+
collector: self.collector.clone(),
70+
})
71+
}
72+
}
73+
74+
pub struct MetricsMiddlewareService<S> {
75+
service: S,
76+
collector: MetricsCollector,
77+
}
78+
79+
impl<S, B> Service<ServiceRequest> for MetricsMiddlewareService<S>
80+
where
81+
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>
82+
+ 'static,
83+
B: 'static,
84+
{
85+
type Response = ServiceResponse<B>;
86+
type Error = actix_web::Error;
87+
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
88+
89+
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90+
self.service.poll_ready(cx)
91+
}
92+
93+
fn call(&self, req: ServiceRequest) -> Self::Future {
94+
let collector = self.collector.clone();
95+
let start = Instant::now();
96+
let fut = self.service.call(req);
97+
Box::pin(async move {
98+
let res = fut.await?;
99+
let latency = start.elapsed().as_nanos() as u64;
100+
collector.record(latency);
101+
Ok(res)
102+
})
103+
}
104+
}
105+
106+
#[derive(Serialize)]
107+
struct StatsResponse {
108+
tables: HashMap<String, usize>,
109+
total_keys: usize,
110+
total_requests: u64,
111+
average_latency_ms: f64,
112+
active_sse_connections: usize,
113+
}
114+
115+
/// GET /stats
116+
#[get("/stats")]
117+
pub async fn get_stats(
118+
state: Data<AppState>,
119+
sub: Data<SubscriptionManager>,
120+
metrics: Data<MetricsCollector>,
121+
) -> impl Responder {
122+
// count keys per table
123+
let store = state.store.read().await;
124+
let mut tables = HashMap::new();
125+
let mut total_keys = 0;
126+
for (table, map) in store.iter() {
127+
tables.insert(table.clone(), map.len());
128+
total_keys += map.len();
129+
}
130+
131+
let total_requests = metrics.total_requests.load(Ordering::Relaxed);
132+
let total_latency_ns = metrics.total_latency_ns.load(Ordering::Relaxed);
133+
let avg_latency_ms = if total_requests > 0 {
134+
(total_latency_ns as f64 / total_requests as f64) / 1e6
135+
} else {
136+
0.0
137+
};
138+
139+
// SSE connections = sum of receiver_count() across all channels
140+
let channels = sub.channels.read().await;
141+
let active_sse_connections = channels
142+
.values()
143+
.map(|tx| tx.receiver_count())
144+
.sum();
145+
146+
let resp = StatsResponse {
147+
tables,
148+
total_keys,
149+
total_requests,
150+
average_latency_ms: avg_latency_ms,
151+
active_sse_connections,
152+
};
153+
HttpResponse::Ok().json(resp)
154+
}

0 commit comments

Comments
 (0)