Skip to content

Commit e46658f

Browse files
Istrate Andrei-EduardIstrate Andrei-Eduard
authored andcommitted
Description
This PR introduces a major architectural overhaul to DynaRust, transforming it from a simple distributed key-value store into a high-performance, causally consistent datastore capable of handling massive concurrency and cluster scales.
1 parent 4ffcaef commit e46658f

11 files changed

Lines changed: 503 additions & 247 deletions

File tree

Cargo.lock

Lines changed: 63 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ chrono = "0.4.40"
2121
jsonwebtoken = "9.3.1"
2222
log = "0.4.27"
2323
rand_core = "0.9.3"
24+
rand = "0.8"
2425
aes-gcm = "0.10" # AES-GCM (AEAD) implementation
2526
aead = "0.5" # traits for Aead, KeyInit, etc.
26-
dotenvy = "0.15.7"
27+
dotenvy = "0.15.7"
28+
dashmap = { version = "6.1.0", features = ["serde"] }
29+
bincode = "1.3.3"

README.md

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
# 🦀 DynaRust: Distributed Key-Value Store
22

3-
DynaRust is a distributed key–JSON‑value built in Rust. It's designed to be reliable and easy to manage, allowing you to add or remove nodes (servers) dynamically without interrupting service 🔄.
3+
DynaRust is a high-performance, distributed key–JSON‑value store built in Rust. It's designed for massive concurrency, reliable consistency, and seamless scalability 🔄.
44

5-
It combines in‑memory caching, on‑disk persistence, automatic cross‑node replication and background synchronization for eventual consistency, delivering a fault‑tolerant, horizontally scalable datastore.
5+
It combines **lock-free concurrent storage**, **binary internal replication**, **causal consistency via Vector Clocks**, and a **SWIM-inspired gossip protocol** to deliver a fault‑tolerant, horizontally scalable datastore.
66

7-
With its advanced real‑time update capabilities, DynaRust pushes live changes with latencies below 5 ms 🚀. In fact, on a typical VPS (1 GB RAM, 100 Mbps bandwidth), a single node can comfortably sustain peak traffic of up to **5000 live connections** 🔥—and you can increase capacity even further simply by adding more nodes to your cluster!
7+
With its advanced real‑time update capabilities, DynaRust pushes live changes with latencies below 5 ms 🚀. Optimized for modern hardware, a single node can comfortably sustain peak traffic of up to **10,000+ live connections** 🔥—and you can increase capacity linearly by adding more nodes to your cluster.
88

99
---
10-
## Performance
10+
## Performance (v2)
1111
| **Metric** | **Value** |
1212
|------------------------|-----------------------------------------------|
13-
| Container Resources | 0.25 vCPU, 0.5 GB RAM |
14-
| Data Storage | 100,000 records (50-word lorem ipsum JSON/record) |
15-
| Memory Consumption | 350 MB |
16-
| Cluster Startup | < 1 sec |
17-
| GET Operation Latency | ~20 ms |
18-
| Cold storage Usage | ~30 MB of disk data |
13+
| Storage Engine | DashMap (Granular Locking) |
14+
| Internal Protocol | Bincode (Binary Serialization) |
15+
| Consistency Model | Eventual Consistency w/ Vector Clocks |
16+
| Replication Strategy | Consistent Hashing Ring |
17+
| Reliability | Read Repair + Exponential Backoff Retries |
18+
| Gossip Protocol | SWIM (O(1) Scalable Membership) |
19+
| GET Latency | ~5-10 ms |
20+
| SSE Capacity | 10,000+ concurrent connections per node |
1921

2022
### While inserting ~300 rows/sec we had a SSE client open on a key flawlessly getting live updates in <5 ms (Cheapest AWS EC2)
2123
---
@@ -27,16 +29,30 @@ With its advanced real‑time update capabilities, DynaRust pushes live changes
2729

2830
## ✨ Key Features
2931

30-
* **🔥 HOT RELOAD & REAL‑TIME UPDATES:**
31-
Enjoy lightning‑fast, real‑time updates using Server‑Sent Events (SSE). Subscribe to a key with:
32+
* **⚡️ MASSIVE CONCURRENCY (DashMap):**
33+
The storage engine uses granular, shard-based locking instead of global `RwLock`s. This allows simultaneous writes across different keys and tables without bottlenecking the entire system.
34+
35+
* **🚀 BINARY REPLICATION (Bincode):**
36+
Node-to-node communication is now powered by Bincode. This binary format is significantly faster and more compact than JSON, reducing CPU usage and network saturation during high-load replication.
37+
38+
* **🕙 CAUSAL CONSISTENCY (Vector Clocks):**
39+
Version tracking has graduated from simple integers to full **Vector Clocks**. DynaRust can now accurately track causality across distributed nodes, automatically resolving concurrent updates and ensuring data integrity.
40+
41+
* **🎯 CONSISTENT HASHING:**
42+
Data is distributed using a consistent hashing ring with virtual nodes. This minimizes data movement when nodes join or leave, ensuring only a small fraction of keys are remapped.
43+
44+
* **🛡️ RELIABILITY (Read Repair & Retries):**
45+
- **Read Repair:** Every `GET` request automatically checks all replicas. If a stale version is detected, a background task immediately "repairs" the outdated nodes with the latest value.
46+
- **Retries:** Outgoing replication now uses exponential backoff. Temporary network glitches no longer lead to data divergence.
47+
48+
* **🛰️ SCALABLE MEMBERSHIP (SWIM Gossip):**
49+
The cluster uses a SWIM-inspired gossip protocol. Network overhead for health checks stays constant ($O(1)$ per node) regardless of cluster size. Indirect probing ensures highly accurate failure detection.
50+
51+
* **🔥 REAL‑TIME UPDATES (SSE):**
52+
Instant updates using Server‑Sent Events (SSE). Subscribe to a key and receive changes in < 5 ms.
3253
```bash
33-
# Example: Subscribe to 'statusKey' in the 'notifications' table
3454
curl -N http://localhost:8080/notifications/subscribe/statusKey
3555
```
36-
![liveupdate](https://github.com/yourfavDev/DynaRust/blob/edc84068e9f88be693cdeb7085b19a648ea33b7d/docs/liveupdate.gif)
37-
Changes are pushed instantly (< 5 ms latency). On a standard VPS (1GB RAM, 100Mbps), a single node handles up to **5000 simultaneous live connections** 💪. Need more capacity? Just add more nodes!
38-
39-
* **Use Case Example:** Imagine a web UI needing push notifications. Store device IDs as keys in a `devices` table. Use a separate `status` key in the same table. The frontend listens to `devices/subscribe/status`. The backend iterates through device keys, performs actions, and updates the `status` key, instantly notifying all listening frontends. Simple and blazing fast! ⚡️
4056
---
4157

4258
### 🔒 **Security**

src/main.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,27 @@ use storage::engine::{
1919
VersionedValue, get_global_store
2020
};
2121
use storage::persistance::{cold_save, load_all_tables};
22-
use network::broadcaster::{membership_sync, heartbeat, get_membership, update_membership};
22+
use network::broadcaster::{membership_sync, heartbeat, get_membership, update_membership, indirect_ping};
2323
use crate::storage::engine::put_value_internal;
2424
use crate::storage::snapshoting::start_snapshot_task;
2525
use crate::storage::statistics::{get_stats, MetricsCollector, MetricsMiddleware};
2626
use crate::storage::subscription::SubscriptionManager;
2727

28+
use dashmap::DashMap;
29+
2830
/// Declare APP_STATE globally so that it’s available throughout the module.
2931
static APP_STATE: OnceCell<web::Data<AppState>> = OnceCell::new();
3032

3133
/// Merge the remote state for a given table into the local state.
3234
fn merge_table_state(
33-
local: &mut HashMap<String, VersionedValue>,
35+
local: &DashMap<String, VersionedValue>,
3436
remote: HashMap<String, VersionedValue>,
3537
) {
3638
for (key, remote_val) in remote {
3739
local
3840
.entry(key)
3941
.and_modify(|local_val| {
40-
if remote_val.version > local_val.version {
42+
if remote_val.dominates(local_val) || (!local_val.dominates(&remote_val) && remote_val.timestamp > local_val.timestamp) {
4143
*local_val = remote_val.clone();
4244
}
4345
})
@@ -47,16 +49,12 @@ fn merge_table_state(
4749

4850
/// Merge the entire global remote store into the local store.
4951
fn merge_global_store(
50-
local: &mut HashMap<String, HashMap<String, VersionedValue>>,
52+
local: &DashMap<String, DashMap<String, VersionedValue>>,
5153
remote: HashMap<String, HashMap<String, VersionedValue>>,
5254
) {
5355
for (table, remote_table) in remote {
54-
local
55-
.entry(table.clone())
56-
.and_modify(|local_table| {
57-
merge_table_state(local_table, remote_table.clone());
58-
})
59-
.or_insert(remote_table);
56+
let local_table = local.entry(table).or_default();
57+
merge_table_state(&local_table, remote_table);
6058
}
6159
}
6260

@@ -84,7 +82,7 @@ async fn main() -> std::io::Result<()> {
8482
// Initialize the local in‑memory multi‑table key–value store.
8583
let state = web::Data::new(AppState {
8684
base_dir: "./data",
87-
store: RwLock::new(HashMap::new()),
85+
store: DashMap::new(),
8886
});
8987
let _ = APP_STATE.set(state.clone());
9088

@@ -95,10 +93,7 @@ async fn main() -> std::io::Result<()> {
9593
}
9694

9795
// (Optionally) Ensure the default table exists in memory.
98-
{
99-
let mut store = state.store.write().await;
100-
store.entry("default".to_string()).or_insert(HashMap::new());
101-
}
96+
state.store.entry("default".to_string()).or_default();
10297
start_snapshot_task(state.clone());
10398

10499
// If a join node is provided, join its cluster and merge its global state.
@@ -143,9 +138,8 @@ async fn main() -> std::io::Result<()> {
143138
HashMap::new()
144139
});
145140
{
146-
let mut store = state.store.write().await;
147-
merge_global_store(&mut store, remote_store);
148-
println!("Merged global cold storage: {:?}", *store);
141+
merge_global_store(&state.store, remote_store);
142+
println!("Merged global cold storage: {:?}", state.store);
149143
}
150144
} else {
151145
eprintln!(
@@ -231,6 +225,7 @@ async fn main() -> std::io::Result<()> {
231225
.route("/join", web::post().to(join_cluster))
232226
.route("/membership", web::get().to(get_membership))
233227
.route("/update_membership", web::post().to(update_membership))
228+
.route("/indirect_ping", web::post().to(indirect_ping))
234229
.route("/heartbeat", web::get().to(heartbeat))
235230
.route(
236231
"/internal/{table}/key/{key}",
@@ -284,6 +279,7 @@ async fn main() -> std::io::Result<()> {
284279
.route("/join", web::post().to(join_cluster))
285280
.route("/membership", web::get().to(get_membership))
286281
.route("/update_membership", web::post().to(update_membership))
282+
.route("/indirect_ping", web::post().to(indirect_ping))
287283
.route("/heartbeat", web::get().to(heartbeat))
288284
.route(
289285
"/internal/{table}/key/{key}",

0 commit comments

Comments
 (0)