Skip to content

Commit ec893d4

Browse files
Istrate Andrei-EduardIstrate Andrei-Eduard
authored andcommitted
Fix
1 parent e46658f commit ec893d4

8 files changed

Lines changed: 325 additions & 159 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
DynaRust is a high-performance, distributed key–JSON‑value store built in Rust. It's designed for massive concurrency, reliable consistency, and seamless scalability 🔄.
44

5-
It combines **lock-free concurrent storage**, **binary internal replication**, **causal consistency via Vector Clocks**, and a **SWIM-inspired gossip protocol** to deliver a fault‑tolerant, horizontally scalable datastore.
5+
It combines **lock-free concurrent storage**, **robust JSON internal replication**, **causal consistency via Vector Clocks**, and a **SWIM-inspired gossip protocol** to deliver a fault‑tolerant, horizontally scalable datastore.
66

77
With its advanced real‑time update capabilities, DynaRust pushes live changes with latencies below 5 ms 🚀. Optimized for modern hardware, a single node can comfortably sustain peak traffic of up to **10,000+ live connections** 🔥—and you can increase capacity linearly by adding more nodes to your cluster.
88

@@ -11,7 +11,7 @@ With its advanced real‑time update capabilities, DynaRust pushes live changes
1111
| **Metric** | **Value** |
1212
|------------------------|-----------------------------------------------|
1313
| Storage Engine | DashMap (Granular Locking) |
14-
| Internal Protocol | Bincode (Binary Serialization) |
14+
| Internal Protocol | JSON (Reliable Serialization) |
1515
| Consistency Model | Eventual Consistency w/ Vector Clocks |
1616
| Replication Strategy | Consistent Hashing Ring |
1717
| Reliability | Read Repair + Exponential Backoff Retries |

src/main.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,20 @@ async fn main() -> std::io::Result<()> {
9494

9595
// (Optionally) Ensure the default table exists in memory.
9696
state.store.entry("default".to_string()).or_default();
97-
start_snapshot_task(state.clone());
97+
start_snapshot_task(&state);
98+
99+
// Initialize cluster data with dynamic membership.
100+
let mut initial_nodes = HashMap::new();
101+
initial_nodes.insert(
102+
current_node.clone(),
103+
NodeInfo {
104+
status: NodeStatus::Active,
105+
last_heartbeat: current_timestamp(),
106+
},
107+
);
108+
let cluster_data = web::Data::new(ClusterData {
109+
nodes: Arc::new(RwLock::new(initial_nodes)),
110+
});
98111

99112
// If a join node is provided, join its cluster and merge its global state.
100113
if args.len() >= 3 {
@@ -116,6 +129,10 @@ async fn main() -> std::io::Result<()> {
116129
if response.status().is_success() {
117130
if let Ok(nodes) = response.json::<HashMap<String, NodeInfo>>().await {
118131
println!("Joined cluster: {:?}", nodes);
132+
let mut nodes_guard = cluster_data.nodes.write().await;
133+
for (node, info) in nodes {
134+
nodes_guard.insert(node, info);
135+
}
119136
}
120137
} else {
121138
println!("Failed to join cluster (status = {})", response.status());
@@ -167,23 +184,10 @@ async fn main() -> std::io::Result<()> {
167184
// Spawn the periodic cold save task.
168185
tokio::spawn(cold_save(state.clone(), 30));
169186

170-
// Initialize cluster data with dynamic membership.
171-
let mut initial_nodes = HashMap::new();
172-
initial_nodes.insert(
173-
current_node.clone(),
174-
NodeInfo {
175-
status: NodeStatus::Active,
176-
last_heartbeat: current_timestamp(),
177-
},
178-
);
179-
let cluster_data = web::Data::new(ClusterData {
180-
nodes: Arc::new(RwLock::new(initial_nodes)),
181-
});
182-
183187
// Spawn the membership synchronization (heartbeat) task.
184188
let cluster_clone = cluster_data.clone();
185189
let current_clone = current_node.clone();
186-
tokio::spawn(membership_sync(cluster_clone, current_clone, 60));
190+
tokio::spawn(membership_sync(cluster_clone, current_clone, 5));
187191
let subscription_manager = web::Data::new(SubscriptionManager::new());
188192

189193
// Initialize metrics collector and middleware

src/network/broadcaster.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,12 @@ async fn send_membership_update(
217217
url: &str,
218218
membership: &HashMap<String, NodeInfo>,
219219
) -> Result<(), String> {
220+
let cluster_token = env::var("CLUSTER_SECRET")
221+
.unwrap_or_else(|_| "default_secret".to_string());
222+
220223
match client.post(url)
221224
.header("Content-Type", "application/octet-stream")
225+
.header("X-Cluster-Token", &cluster_token)
222226
.body(bincode::serialize(membership).unwrap())
223227
.send().await {
224228
Ok(resp) => {
@@ -256,7 +260,7 @@ pub async fn update_membership(
256260
// 1. Authentication Check
257261
// In production, load this once at startup rather than on every request
258262
let expected_token = env::var("CLUSTER_SECRET")
259-
.unwrap_or_else(|_| "default_insecure_secret".to_string());
263+
.unwrap_or_else(|_| "default_secret".to_string());
260264

261265
let is_authenticated = req.headers()
262266
.get("X-Cluster-Token")
@@ -277,9 +281,9 @@ pub async fn update_membership(
277281
let mut updated = false;
278282

279283
for (node, info) in incoming.into_iter() {
280-
// 2. Input Validation (Basic check to ensure it looks like a URL)
281-
if !node.starts_with("http://") && !node.starts_with("https://") {
282-
continue; // Ignore malformed node addresses
284+
// 2. Input Validation (ensure it's not empty and has no spaces)
285+
if node.is_empty() || node.contains(' ') {
286+
continue;
283287
}
284288

285289
// For nodes not in our membership, add them safely

src/storage/admin.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,10 @@ fn validate_admin_token(req: &HttpRequest) -> bool {
6868
&Validation::default(),
6969
) {
7070
Ok(token_data) => token_data.claims.admin,
71-
Err(_) => false,
71+
Err(e) => {
72+
eprintln!("Admin token validation failed: {}", e);
73+
false
74+
},
7275
};
7376
}
7477
}
@@ -243,8 +246,7 @@ async fn replicate_admin_change(
243246
.put(&url)
244247
.header("X-Internal-Request", "true")
245248
.header("SECRET", &secret_clone)
246-
.header("Content-Type", "application/octet-stream")
247-
.body(bincode::serialize(&payload).unwrap())
249+
.json(&payload)
248250
.send()
249251
.await;
250252
});

src/storage/engine.rs

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,6 @@ impl VersionedValue {
8585
}
8686
strictly_newer
8787
}
88-
89-
pub fn merge_clock(&mut self, other: &HashMap<String, u64>) {
90-
for (node, other_v) in other {
91-
let self_v = self.vector_clock.entry(node.clone()).or_insert(0);
92-
if *other_v > *self_v {
93-
*self_v = *other_v;
94-
}
95-
}
96-
}
9788
}
9889

9990
/// The local in-memory database state.
@@ -384,15 +375,12 @@ pub async fn get_value(
384375
tokio::spawn(async move {
385376
let _permit = permit;
386377
let url = format!("http://{}/internal/{}/key/{}", target, table_repair, key_repair);
387-
if let Ok(payload) = bincode::serialize(&latest_repair) {
388-
let _ = cli.put(&url)
389-
.header("X-Internal-Request", "true")
390-
.header("SECRET", &secret_repair)
391-
.header("Content-Type", "application/octet-stream")
392-
.body(payload)
393-
.send()
394-
.await;
395-
}
378+
let _ = cli.put(&url)
379+
.header("X-Internal-Request", "true")
380+
.header("SECRET", &secret_repair)
381+
.json(&latest_repair)
382+
.send()
383+
.await;
396384
});
397385
}
398386
}
@@ -484,8 +472,7 @@ pub async fn patch_value(
484472
.put(&url)
485473
.header("X-Internal-Request", "true")
486474
.header("SECRET", &secret_clone)
487-
.header("Content-Type", "application/octet-stream")
488-
.body(bincode::serialize(&payload).unwrap())
475+
.json(&payload)
489476
.send()
490477
.await
491478
{
@@ -583,8 +570,7 @@ pub async fn put_value(
583570
.put(&url)
584571
.header("X-Internal-Request", "true")
585572
.header("SECRET", &secret_clone)
586-
.header("Content-Type", "application/octet-stream")
587-
.body(bincode::serialize(&payload).unwrap())
573+
.json(&payload)
588574
.send()
589575
.await
590576
{
@@ -610,22 +596,20 @@ pub async fn put_value_internal(
610596
path: web::Path<(String, String)>,
611597
state: web::Data<AppState>,
612598
sub_manager: web::Data<SubscriptionManager>,
613-
body: web::Bytes,
599+
body: web::Json<VersionedValue>,
614600
) -> impl Responder {
615601
let (table_name, key_val) = path.into_inner();
616602

617603
// 1️⃣ Cluster‐secret check
618-
let cluster_secret = env::var("CLUSTER_SECRET").unwrap_or_default();
619-
match req.headers().get("SECRET") {
620-
Some(h) if h.to_str().unwrap_or("") == cluster_secret => {}
621-
_ => return HttpResponse::Unauthorized().finish(),
604+
let cluster_secret = env::var("CLUSTER_SECRET").unwrap_or_else(|_| "default_secret".to_string());
605+
let provided_secret = req.headers().get("SECRET").and_then(|h| h.to_str().ok()).unwrap_or("");
606+
607+
if provided_secret != cluster_secret {
608+
return HttpResponse::Unauthorized().finish();
622609
}
623610

624611
// 2️⃣ Merge the incoming VersionedValue by version number
625-
let incoming: VersionedValue = match bincode::deserialize(&body) {
626-
Ok(v) => v,
627-
Err(_) => return HttpResponse::BadRequest().finish(),
628-
};
612+
let incoming = body.into_inner();
629613
let table = state.store.entry(table_name.clone()).or_default();
630614

631615
let cached = table

src/storage/snapshoting.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ async fn clean_old_snapshots(dir: &PathBuf, max_snapshots: usize)
4646
/// 2. Encrypt it with `encrypt()`
4747
/// 3. Write to `./snapshots/snapshot_<ts>.json.enc`
4848
/// 4. Clean up old snapshots
49-
pub fn start_snapshot_task(state: Data<AppState>) {
49+
pub fn start_snapshot_task(states: &Data<AppState>) {
5050
let snap_limit = env::var("SNAP_LIMIT")
5151
.ok()
5252
.and_then(|s| s.parse().ok())
5353
.unwrap_or(100);
5454
let snapshot_dir = PathBuf::from("./snapshots");
55-
55+
let state = states.clone();
5656
tokio::spawn(async move {
5757
if let Err(e) = fs::create_dir_all(&snapshot_dir).await {
5858
eprintln!(

0 commit comments

Comments
 (0)