Skip to content

Commit 25f67e3

Browse files
Istrate Andrei-EduardIstrate Andrei-Eduard
authored andcommitted
Patch value
1 parent 87aeb62 commit 25f67e3

6 files changed

Lines changed: 186 additions & 28 deletions

File tree

.DS_Store

0 Bytes
Binary file not shown.

GEMINI.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# DynaRust Project
2+
3+
## Project Overview
4+
DynaRust is a distributed key-JSON-value datastore built in Rust. It provides in-memory caching, on-disk persistence, background replication for eventual consistency, and real-time update capabilities using Server-Sent Events (SSE). It handles cluster membership dynamically, allowing nodes to join and synchronize their state.
5+
6+
## Building and Running
7+
* **Build the project:**
8+
```bash
9+
cargo build --release
10+
```
11+
* **Run a node:**
12+
```bash
13+
# Run the first node
14+
./target/release/DynaRust <LISTEN_ADDRESS>
15+
16+
# Run additional nodes and join the cluster
17+
./target/release/DynaRust <LISTEN_ADDRESS> <JOIN_ADDRESS>
18+
```
19+
20+
## Development Conventions
21+
* **Rust & Actix-Web:** Built predominantly with Rust and Actix-Web for high-performance concurrent request handling.
22+
* **Authentication:** Endpoints (except GET) require JWT token via `Authorization: Bearer <token>`. Node-to-node replication uses a cluster secret defined in `.env`.
23+
* **Concurrency:** Utilizes `tokio` for async tasks, e.g., snapshots, gossip membership sync, and cluster replication.
24+
* **Storage Architecture:** Data is stored in-memory using `HashMap<String, HashMap<String, VersionedValue>>` with background disk persistence.
25+
26+
## Key Files
27+
* `src/main.rs`: Entry point and Actix-Web server initialization.
28+
* `src/storage/engine.rs`: Core data structures (`VersionedValue`, `AppState`) and CRUD handler logic (`put_value`, `get_value`, `delete_value`).
29+
* `src/network/broadcaster.rs`: Cluster membership gossip protocol implementation.
30+
* `src/storage/subscription.rs`: Real-time updates handler using Server-Sent Events (SSE).

README.md

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -110,22 +110,22 @@ All operations except **GET** require a valid JWT in the `Authorization: Bearer
110110
111111
2. **✍️ Store or Update a Value (HTTP PUT)**
112112
Create or update a value under `{table}/{key}`.
113-
- URL: `/default/key/mykey`
113+
- URL: `/{table}/key/{key}`
114114
- Headers:
115115
```
116116
Content-Type: application/json
117117
Authorization: Bearer <JWT‑TOKEN>
118118
```
119119
- Body:
120120
```json
121-
{ "value": "mydata" }
121+
{ "value": { ... } }
122122
```
123123
- Success:
124124
`201 Created`
125125
• Body (VersionedValue):
126126
```json
127127
{
128-
"value": "mydata",
128+
"value": { ... },
129129
"version": 1,
130130
"timestamp": 1618880821123,
131131
"owner": "alice"
@@ -134,25 +134,52 @@ All operations except **GET** require a valid JWT in the `Authorization: Bearer
134134
- Errors:
135135
`401 Unauthorized` if missing/invalid JWT or not owner on update
136136
137-
3. **🔍 Retrieve a Value (HTTP GET)**
137+
3. **🛠️ Partially Update a Value (HTTP PATCH)**
138+
Merge updates into an existing value under `{table}/{key}`. Only the owner can patch.
139+
- URL: `/{table}/key/{key}`
140+
- Headers:
141+
```
142+
Content-Type: application/json
143+
Authorization: Bearer <JWT‑TOKEN>
144+
```
145+
- Body:
146+
```json
147+
{ "new_field": "updated_data" }
148+
```
149+
- Success:
150+
`200 OK`
151+
• Body (Updated VersionedValue):
152+
```json
153+
{
154+
"value": { "original_field": "...", "new_field": "updated_data" },
155+
"version": 2,
156+
"timestamp": 1618880825000,
157+
"owner": "alice"
158+
}
159+
```
160+
- Errors:
161+
`401 Unauthorized` if missing/invalid JWT or not owner
162+
`404 Not Found` if key/table missing
163+
164+
4. **🔍 Retrieve a Value (HTTP GET)**
138165
Anyone can fetch a key’s latest value.
139-
- URL: `/default/key/mykey`
166+
- URL: `/{table}/key/{key}`
140167
- Success:
141168
`200 OK`
142169
• Body:
143170
```json
144171
{
145-
"value": "mydata",
172+
"value": { ... },
146173
"version": 1,
147174
"timestamp": 1618880821123,
148175
"owner": "alice"
149176
}
150177
```
151178
`404 Not Found` if key/table missing
152179
153-
4. **🗑️ Delete a Value (HTTP DELETE)**
180+
5. **🗑️ Delete a Value (HTTP DELETE)**
154181
Only the owner may delete.
155-
- URL: `/default/key/mykey`
182+
- URL: `/{table}/key/{key}`
156183
- Header:
157184
```
158185
Authorization: Bearer <JWT‑TOKEN>
@@ -167,46 +194,46 @@ All operations except **GET** require a valid JWT in the `Authorization: Bearer
167194
`401 Unauthorized` if no JWT or not owner
168195
`404 Not Found` if key/table missing
169196
170-
5. **📚 Fetch Entire Table Store (HTTP GET)**
197+
6. **📚 Fetch Entire Table Store (HTTP GET)**
171198
List all key→VersionedValue pairs in a table.
172-
- URL: `/default/store`
199+
- URL: `/{table}/store`
173200
- Success:
174201
`200 OK`
175202
• Body:
176203
```json
177204
{
178-
"key1": { "value":"v1","version":2,…,"owner":"bob" },
205+
"key1": { "value":{...},"version":2,…,"owner":"bob" },
179206
"key2": { … }
180207
}
181208
```
182209
- `404 Not Found` if table missing
183210
184-
6. **🔑 List or Batch‑Fetch Keys**
185-
6.1 **GET** `/default/keys`
211+
7. **🔑 List or Batch‑Fetch Keys**
212+
7.1 **GET** `/{table}/keys`
186213
`200 OK`
187214
```json
188215
["key1","key2",…]
189216
```
190-
6.2 **POST** `/default/keys`
217+
7.2 **POST** `/{table}/keys`
191218
- Body:
192219
```json
193220
["key1","key2","key3"]
194221
```
195222
- `200 OK`
196223
```json
197224
{
198-
"key1": { "value":"v1","version":… },
225+
"key1": { "value":{...},"version":… },
199226
"key3": { … }
200227
}
201228
```
202229
(non‑existent keys are omitted)
203230
204-
7. **🔔 Subscribe to Real‑Time Updates (SSE)**
231+
8. **🔔 Subscribe to Real‑Time Updates (SSE)**
205232
Instant updates on a single key.
206-
- URL: `/default/subscribe/mykey`
233+
- URL: `/{table}/subscribe/{key}`
207234
- Usage:
208235
```bash
209-
curl -N http://localhost:6660/default/subscribe/mykey
236+
curl -N http://localhost:6660/{table}/subscribe/{key}
210237
```
211238
- Each update is a JSON event:
212239
```json
@@ -232,22 +259,26 @@ TOKEN=$(curl -s -X POST http://localhost:6660/auth/alice \
232259
curl -i -X PUT http://localhost:6660/default/key/foo \
233260
-H "Content-Type: application/json" \
234261
-H "Authorization: Bearer $TOKEN" \
235-
-d '{"value":"hello"}'
262+
-d '{"value": {"name": "bar"}}'
263+
264+
# 4) PATCH (partially update)
265+
curl -i -X PATCH http://localhost:6660/default/key/foo \
266+
-H "Content-Type: application/json" \
267+
-H "Authorization: Bearer $TOKEN" \
268+
-d '{"age": 30}'
236269
237-
# 4) GET
270+
# 5) GET
238271
curl -i http://localhost:6660/default/key/foo
239272
240-
# 5) DELETE (owner only)
273+
# 6) DELETE (owner only)
241274
curl -i -X DELETE http://localhost:6660/default/key/foo \
242275
-H "Authorization: Bearer $TOKEN"
243276
244-
# 6) Node stats
277+
# 7) Node stats
245278
curl http://localhost:6660/stats
246-
Response:
247-
{"tables":{"auth":2,"test":20088,"default":1},"total_keys":20091,"total_requests":5,"average_latency_ms":0.4171416,"active_sse_connections":0}
248279
```
249280
250-
> ⚠️ PUT/DELETE without a valid JWT → **401 Unauthorized**
281+
> ⚠️ PUT/PATCH/DELETE without a valid JWT → **401 Unauthorized**
251282
> 🔍 GET is always open (no auth needed).
252283
---
253284

src/main.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::sync::Arc;
1515
use tokio::sync::{RwLock, Semaphore};
1616
use storage::engine::{
1717
AppState, ClusterData, delete_value, get_value, get_table_store, join_cluster, put_value,
18-
get_all_keys, get_multiple_keys, NodeInfo, NodeStatus, current_timestamp,
18+
patch_value, get_all_keys, get_multiple_keys, NodeInfo, NodeStatus, current_timestamp,
1919
VersionedValue, get_global_store
2020
};
2121
use storage::persistance::{cold_save, load_all_tables};
@@ -239,6 +239,7 @@ async fn main() -> std::io::Result<()> {
239239
// Key–value endpoints with multi‑table support.
240240
.route("/{table}/key/{key}", web::get().to(get_value))
241241
.route("/{table}/key/{key}", web::put().to(put_value))
242+
.route("/{table}/key/{key}", web::patch().to(patch_value))
242243
.route("/{table}/key/{key}", web::delete().to(delete_value))
243244
// Endpoint to fetch a table’s entire in‑memory store.
244245
.route("/{table}/store", web::get().to(get_table_store))
@@ -283,6 +284,7 @@ async fn main() -> std::io::Result<()> {
283284
// Key–value endpoints with multi‑table support.
284285
.route("/{table}/key/{key}", web::get().to(get_value))
285286
.route("/{table}/key/{key}", web::put().to(put_value))
287+
.route("/{table}/key/{key}", web::patch().to(patch_value))
286288
.route("/{table}/key/{key}", web::delete().to(delete_value))
287289
// Endpoint to fetch a table’s entire in‑memory store.
288290
.route("/{table}/store", web::get().to(get_table_store))

src/storage/engine.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ impl VersionedValue {
5353
self.version += 1;
5454
self.timestamp = current_timestamp();
5555
}
56+
57+
pub fn patch(&mut self, updates: HashMap<String, Value>) {
58+
for (k, v) in updates {
59+
self.value.insert(k, v);
60+
}
61+
self.version += 1;
62+
self.timestamp = current_timestamp();
63+
}
5664
}
5765

5866
/// The local in-memory database state.
@@ -305,6 +313,93 @@ pub async fn get_value(
305313

306314
HttpResponse::Ok().json(latest)
307315
}
316+
// external PATCH handler
317+
#[allow(clippy::too_many_arguments)]
318+
pub async fn patch_value(
319+
req: HttpRequest,
320+
path: web::Path<(String, String)>,
321+
state: web::Data<AppState>,
322+
cluster_data: web::Data<ClusterData>,
323+
current_addr: web::Data<String>,
324+
sub_manager: web::Data<SubscriptionManager>,
325+
client: web::Data<reqwest::Client>,
326+
sem: web::Data<Arc<Semaphore>>,
327+
body: web::Json<HashMap<String, Value>>,
328+
) -> impl Responder {
329+
let (table_name, key_val) = path.into_inner();
330+
331+
// 1️⃣ Authenticate external user via JWT
332+
let user = match extract_user_from_token(&req) {
333+
Ok(u) => u,
334+
Err(resp) => return resp,
335+
};
336+
337+
// 2️⃣ Apply local update with ownership check
338+
let new_value = {
339+
let mut db = state.store.write().await;
340+
let table = db.entry(table_name.clone()).or_default();
341+
342+
if let Some(existing) = table.get_mut(&key_val) {
343+
if existing.owner != user {
344+
return HttpResponse::Unauthorized().json(serde_json::json!({
345+
"error": "Not authorized to update this record"
346+
}));
347+
}
348+
existing.patch(body.0.clone());
349+
existing.clone()
350+
} else {
351+
return HttpResponse::NotFound().json(serde_json::json!({
352+
"error": "Record not found"
353+
}));
354+
}
355+
};
356+
357+
// 3️⃣ Notify any SSE subscriptions
358+
sub_manager
359+
.notify(&table_name, &key_val, KeyEvent::Updated(new_value.clone()))
360+
.await;
361+
362+
// 4️⃣ Replicate *full* VersionedValue to other nodes, but cap concurrency
363+
let cluster = cluster_data.nodes.read().await;
364+
let active = get_active_nodes(&*cluster);
365+
drop(cluster);
366+
367+
let targets = get_replication_nodes(&key_val, &active, active.len());
368+
let secret = env::var("CLUSTER_SECRET").unwrap_or_default();
369+
370+
for target in targets {
371+
if target == *current_addr.get_ref() {
372+
continue;
373+
}
374+
// acquire a permit (will await if > 20 in flight)
375+
let permit = <Arc<Semaphore> as Clone>::clone(&sem).acquire_owned().await.unwrap();
376+
let cli = client.clone();
377+
let payload = new_value.clone();
378+
let url = format!(
379+
"http://{}/internal/{}/key/{}",
380+
target, table_name, key_val
381+
);
382+
383+
let secret_clone = secret.clone();
384+
385+
tokio::spawn(async move {
386+
let _permit = permit; // held until this future ends
387+
if let Err(e) = cli
388+
.put(&url)
389+
.header("X-Internal-Request", "true")
390+
.header("SECRET", &secret_clone)
391+
.json(&payload)
392+
.send()
393+
.await
394+
{
395+
eprintln!("replication to {} failed: {}", target, e);
396+
}
397+
});
398+
}
399+
400+
HttpResponse::Ok().json(new_value)
401+
}
402+
308403
// external PUT handler
309404
#[allow(clippy::too_many_arguments)]
310405
pub async fn put_value(

stress.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
# Configuration
44
PORT=6660
5-
BASE_URL="http://localhost:$PORT"
6-
TOTAL_RECORDS=10000 # Change to 100000 for the full test
5+
BASE_URL="http://linking.studio:$PORT"
6+
TOTAL_RECORDS=1000 # Change to 100000 for the full test
77
CONCURRENCY=50 # Number of parallel requests
88
TABLE="test_perf"
99

0 commit comments

Comments
 (0)