Skip to content

Commit ef3190e

Browse files
author
test
committed
put_value bug fix
1 parent 33b97c6 commit ef3190e

2 files changed

Lines changed: 76 additions & 34 deletions

File tree

src/storage/engine.rs

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -295,23 +295,29 @@ pub async fn put_value(
295295

296296
// Internal replication requests bypass authentication.
297297
if req.headers().contains_key("X-Internal-Request") {
298-
let new_value = {
299-
let mut store = state.store.write().await;
300-
let table = store.entry(table_name.clone()).or_insert_with(HashMap::new);
301-
if let Some(existing) = table.get_mut(&key_val) {
302-
existing.update(body.0.clone());
303-
existing.clone()
304-
} else {
305-
// For internal requests, set owner as "internal"
306-
let v = VersionedValue::new(body.0.clone(), "internal".to_string());
307-
table.insert(key_val.clone(), v.clone());
308-
v
309-
}
310-
};
311-
sub_manager
312-
.notify(&table_name, &key_val, KeyEvent::Updated(new_value.clone()))
313-
.await;
314-
return HttpResponse::Created().json(new_value);
298+
if env::var("CLUSTER_SECRET").unwrap_or_default().as_str() == req.headers().get("SECRET").unwrap().to_str().unwrap() {
299+
let new_value = {
300+
let mut store = state.store.write().await;
301+
let table = store.entry(table_name.clone()).or_insert_with(HashMap::new);
302+
if let Some(existing) = table.get_mut(&key_val) {
303+
existing.update(body.0.clone());
304+
existing.clone()
305+
} else {
306+
let user_header = req.headers().get("User").unwrap().to_str().unwrap();
307+
308+
// For internal requests, set owner as "internal"
309+
let v = VersionedValue::new(body.0.clone(), String::from(user_header));
310+
table.insert(key_val.clone(), v.clone());
311+
v
312+
}
313+
};
314+
sub_manager
315+
.notify(&table_name, &key_val, KeyEvent::Updated(new_value.clone()))
316+
.await;
317+
return HttpResponse::Created().json(new_value);
318+
} else {
319+
return HttpResponse::Unauthorized().json(json!({}))
320+
}
315321
}
316322

317323
// External requests require a valid JWT token.
@@ -353,16 +359,20 @@ pub async fn put_value(
353359

354360
let client = reqwest::Client::new();
355361
let mut replication_futures = Vec::new();
362+
356363
for target in targets {
357364
if target != *current_addr.get_ref() {
365+
let user2 = user.clone();
358366
let url = format!("http://{}/{}/key/{}", target, table_name, key_val);
359367
let client_clone = client.clone();
360368
let value_clone = new_value.clone();
361369
let fut = async move {
362370
let result = client_clone
363371
.put(&url)
364372
.header("X-Internal-Request", "true")
365-
.json(&value_clone)
373+
.header("User", user2.clone())
374+
.header("SECRET", env::var("CLUSTER_SECRET").unwrap_or_default())
375+
.json(&value_clone.value) // HERE'S THE FIX: Send only the inner value data
366376
.timeout(std::time::Duration::from_secs(3))
367377
.send()
368378
.await;
@@ -379,6 +389,7 @@ pub async fn put_value(
379389
HttpResponse::Created().json(new_value)
380390
}
381391

392+
382393
/// DELETE handler: Removes a key with an ownership check.
383394
pub async fn delete_value(
384395
req: HttpRequest,

src/storage/statistics.rs

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
// src/statistics.rs
1+
// src/storage/statistics.rs
2+
23
use actix_web::{
3-
dev::{Service, ServiceRequest, ServiceResponse, Transform},
4+
dev::{ServiceRequest, ServiceResponse, Transform},
45
get,
5-
web::{ Data},
6-
HttpResponse, Responder,
6+
web::Data,
7+
HttpResponse,
8+
Responder,
79
};
810
use futures_util::future::{ok, LocalBoxFuture, Ready};
911
use serde::Serialize;
@@ -16,7 +18,8 @@ use std::{
1618
task::{Context, Poll},
1719
time::Instant,
1820
};
19-
use crate::storage::engine::AppState;
21+
22+
use crate::storage::engine::{AppState, ClusterData};
2023
use crate::storage::subscription::SubscriptionManager;
2124

2225
/// Holds global counters.
@@ -53,7 +56,7 @@ impl MetricsMiddleware {
5356

5457
impl<S, B> Transform<S, ServiceRequest> for MetricsMiddleware
5558
where
56-
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>
59+
S: actix_web::dev::Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>
5760
+ 'static,
5861
B: 'static,
5962
{
@@ -76,9 +79,9 @@ pub struct MetricsMiddlewareService<S> {
7679
collector: MetricsCollector,
7780
}
7881

79-
impl<S, B> Service<ServiceRequest> for MetricsMiddlewareService<S>
82+
impl<S, B> actix_web::dev::Service<ServiceRequest> for MetricsMiddlewareService<S>
8083
where
81-
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>
84+
S: actix_web::dev::Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>
8285
+ 'static,
8386
B: 'static,
8487
{
@@ -94,6 +97,7 @@ where
9497
let collector = self.collector.clone();
9598
let start = Instant::now();
9699
let fut = self.service.call(req);
100+
97101
Box::pin(async move {
98102
let res = fut.await?;
99103
let latency = start.elapsed().as_nanos() as u64;
@@ -103,13 +107,21 @@ where
103107
}
104108
}
105109

110+
#[derive(Serialize)]
111+
struct ClusterStats {
112+
status: String,
113+
last_heartbeat: u128,
114+
}
115+
106116
#[derive(Serialize)]
107117
struct StatsResponse {
108118
tables: HashMap<String, usize>,
109119
total_keys: usize,
110120
total_requests: u64,
111121
average_latency_ms: f64,
112122
active_sse_connections: usize,
123+
/// Current cluster membership with status and last heartbeat
124+
cluster: HashMap<String, ClusterStats>,
113125
}
114126

115127
/// GET /stats
@@ -118,8 +130,9 @@ pub async fn get_stats(
118130
state: Data<AppState>,
119131
sub: Data<SubscriptionManager>,
120132
metrics: Data<MetricsCollector>,
133+
cluster_data: Data<ClusterData>,
121134
) -> impl Responder {
122-
// count keys per table
135+
// Count keys per table.
123136
let store = state.store.read().await;
124137
let mut tables = HashMap::new();
125138
let mut total_keys = 0;
@@ -128,27 +141,45 @@ pub async fn get_stats(
128141
total_keys += map.len();
129142
}
130143

144+
// Compute request stats.
131145
let total_requests = metrics.total_requests.load(Ordering::Relaxed);
132146
let total_latency_ns = metrics.total_latency_ns.load(Ordering::Relaxed);
133-
let avg_latency_ms = if total_requests > 0 {
147+
let average_latency_ms = if total_requests > 0 {
134148
(total_latency_ns as f64 / total_requests as f64) / 1e6
135149
} else {
136150
0.0
137151
};
138152

139-
// SSE connections = sum of receiver_count() across all channels
153+
// Count active SSE connections.
140154
let channels = sub.channels.read().await;
141-
let active_sse_connections = channels
142-
.values()
143-
.map(|tx| tx.receiver_count())
144-
.sum();
155+
let active_sse_connections = channels.values().map(|tx| tx.receiver_count()).sum();
156+
157+
// Build cluster membership map.
158+
let cluster = {
159+
let nodes = cluster_data.nodes.read().await;
160+
nodes
161+
.iter()
162+
.map(|(addr, info)| {
163+
let status_str = format!("{:?}", info.status);
164+
(
165+
addr.clone(),
166+
ClusterStats {
167+
status: status_str,
168+
last_heartbeat: info.last_heartbeat,
169+
},
170+
)
171+
})
172+
.collect::<HashMap<_, _>>()
173+
};
145174

146175
let resp = StatsResponse {
147176
tables,
148177
total_keys,
149178
total_requests,
150-
average_latency_ms: avg_latency_ms,
179+
average_latency_ms,
151180
active_sse_connections,
181+
cluster,
152182
};
183+
153184
HttpResponse::Ok().json(resp)
154185
}

0 commit comments

Comments
 (0)