Skip to content

Commit af2a838

Browse files
author
test
committed
Real-time updates
1 parent dff5a01 commit af2a838

7 files changed

Lines changed: 459 additions & 211 deletions

File tree

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ base64 = "0.21"
1414
once_cell = "1.21.3"
1515
regex = "1.11.1"
1616
futures = "0.3.31"
17+
async-stream = "0.3.6"
1718

README.md

Lines changed: 303 additions & 192 deletions
Large diffs are not rendered by default.

src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use storage::engine::{
1818
};
1919
use storage::persistance::{cold_save, load_all_tables};
2020
use network::broadcaster::{membership_sync, heartbeat, get_membership, update_membership};
21+
use crate::storage::subscription::SubscriptionManager;
2122

2223
/// Declare APP_STATE globally so that it’s available throughout the module.
2324
static APP_STATE: OnceCell<web::Data<AppState>> = OnceCell::new();
@@ -169,12 +170,14 @@ async fn main() -> std::io::Result<()> {
169170
let cluster_clone = cluster_data.clone();
170171
let current_clone = current_node.clone();
171172
tokio::spawn(membership_sync(cluster_clone, current_clone, 60));
173+
let subscription_manager = web::Data::new(SubscriptionManager::new());
172174

173175
println!("Starting distributed DB engine at http://{}", current_node);
174176

175177
// Build and run the HTTP server.
176178
HttpServer::new(move || {
177179
App::new()
180+
.app_data(subscription_manager.clone())
178181
.app_data(state.clone())
179182
.app_data(cluster_data.clone())
180183
.app_data(web::Data::new(current_node.clone()))
@@ -194,6 +197,9 @@ async fn main() -> std::io::Result<()> {
194197
// Endpoints to get keys from a table.
195198
.route("/{table}/keys", web::get().to(get_all_keys))
196199
.route("/{table}/keys", web::post().to(get_multiple_keys))
200+
.route("/{table}/subscribe/{key}", web::get().to(
201+
storage::subscription::subscribe_to_key
202+
))
197203
})
198204
.bind(bind_addr.as_str())?
199205
.run()

src/storage/engine.rs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::env;
99
use std::time::{SystemTime, UNIX_EPOCH};
1010
use tokio::sync::RwLock;
1111
use web::Json;
12-
12+
use crate::storage::subscription::{KeyEvent, SubscriptionManager};
1313
//
1414
// CORE DATA STRUCTURES
1515
//
@@ -237,18 +237,21 @@ pub async fn get_value(
237237
HttpResponse::Ok().json(latest)
238238
}
239239

240-
/// PUT handler: Inserts or updates a key-value pair and replicates to active nodes
240+
/// PUT handler: Inserts or updates a key–value pair and replicates to active nodes.
241+
/// Also sends a live update notification via the SubscriptionManager.
241242
pub async fn put_value(
242243
req: HttpRequest,
243244
path: web::Path<(String, String)>,
244245
state: web::Data<AppState>,
245246
cluster_data: web::Data<ClusterData>,
246247
current_addr: web::Data<String>,
248+
// New dependency injection for the subscription manager.
249+
sub_manager: web::Data<SubscriptionManager>,
247250
body: web::Json<HashMap<String, Value>>,
248251
) -> impl Responder {
249252
let (table_name, key_val) = path.into_inner();
250253

251-
// Handle internal replication request (no further replication)
254+
// Handle internal replication request (skip notifications).
252255
if req.headers().contains_key("X-Internal-Request") {
253256
let new_value = {
254257
let mut store = state.store.write().await;
@@ -262,10 +265,13 @@ pub async fn put_value(
262265
v
263266
}
264267
};
268+
sub_manager.notify(&table_name, &key_val, KeyEvent::Updated(new_value.clone())).await;
269+
265270
return HttpResponse::Created().json(new_value);
266271
}
267272

268-
// For external requests, update locally first
273+
274+
// External request: update local store.
269275
let new_value = {
270276
let mut store = state.store.write().await;
271277
let table = store.entry(table_name.clone()).or_insert_with(HashMap::new);
@@ -279,7 +285,11 @@ pub async fn put_value(
279285
}
280286
};
281287

282-
// Replicate to other nodes
288+
// Notify all subscribers that the key has been updated.
289+
sub_manager.notify(&table_name, &key_val, KeyEvent::Updated(new_value.clone())).await;
290+
291+
292+
// Replicate to other nodes.
283293
let cluster_guard = cluster_data.nodes.read().await;
284294
let active_nodes = get_active_nodes(&*cluster_guard);
285295
drop(cluster_guard);
@@ -289,7 +299,6 @@ pub async fn put_value(
289299

290300
let client = reqwest::Client::new();
291301
let mut replication_futures = Vec::new();
292-
293302
for target in targets {
294303
if target != *current_addr.get_ref() {
295304
let url = format!("http://{}/{}/key/{}", target, table_name, key_val);
@@ -314,35 +323,40 @@ pub async fn put_value(
314323
}
315324
}
316325

317-
// Fire replication requests, but don't wait for them to complete
326+
// Fire replication requests asynchronously.
318327
tokio::spawn(async move {
319328
futures_util::future::join_all(replication_futures).await;
320329
});
321330

322-
// Return immediately after local update
323331
HttpResponse::Created().json(new_value)
324332
}
325333

326-
/// DELETE handler: Removes a key and replicates the deletion to active nodes
334+
/// DELETE handler: Removes a key and replicates the deletion to active nodes.
335+
/// Also notifies subscribers of the deletion event.
327336
pub async fn delete_value(
328337
req: HttpRequest,
329338
path: web::Path<(String, String)>,
330339
state: web::Data<AppState>,
331340
cluster_data: web::Data<ClusterData>,
332341
current_addr: web::Data<String>,
342+
sub_manager: web::Data<SubscriptionManager>,
333343
) -> impl Responder {
334344
let (table_name, key_val) = path.into_inner();
335345

336-
// Handle internal replication request
346+
// Internal replication request.
337347
if req.headers().contains_key("X-Internal-Request") {
338348
let mut store = state.store.write().await;
339349
if let Some(table) = store.get_mut(&table_name) {
340350
table.remove(&key_val);
341351
}
352+
sub_manager
353+
.notify(&table_name, &key_val, KeyEvent::Deleted)
354+
.await;
355+
342356
return HttpResponse::Ok().json(json!({"message": "Deleted"}));
343357
}
344358

345-
// For external requests, delete locally first
359+
// External request: remove the key locally.
346360
let local_status = {
347361
let mut store = state.store.write().await;
348362
if let Some(table) = store.get_mut(&table_name) {
@@ -357,7 +371,12 @@ pub async fn delete_value(
357371
.to_string()
358372
};
359373

360-
// Replicate deletion to other nodes
374+
// Notify subscribers that the key has been deleted.
375+
sub_manager
376+
.notify(&table_name, &key_val, KeyEvent::Deleted)
377+
.await;
378+
379+
// Replicate deletion to other nodes.
361380
let cluster_guard = cluster_data.nodes.read().await;
362381
let active_nodes = get_active_nodes(&*cluster_guard);
363382
drop(cluster_guard);
@@ -367,12 +386,10 @@ pub async fn delete_value(
367386

368387
let client = reqwest::Client::new();
369388
let mut replication_futures = Vec::new();
370-
371389
for target in targets {
372390
if target != *current_addr.get_ref() {
373391
let url = format!("http://{}/{}/key/{}", target, table_name, key_val);
374392
let client_clone = client.clone();
375-
376393
let fut = async move {
377394
let result = client_clone
378395
.delete(&url)
@@ -385,20 +402,16 @@ pub async fn delete_value(
385402
println!("Deletion replication error to {}: {}", target, e);
386403
}
387404
};
388-
389405
replication_futures.push(fut);
390406
}
391407
}
392408

393-
// Fire replication requests in background
394409
tokio::spawn(async move {
395410
futures_util::future::join_all(replication_futures).await;
396411
});
397412

398-
// Return immediately after local deletion
399413
HttpResponse::Ok().json(json!({"message": local_status}))
400414
}
401-
402415
//
403416
// TABLE MANAGEMENT HANDLERS
404417
//

src/storage/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pub mod engine;
2-
pub mod persistance;
2+
pub mod persistance;
3+
pub(crate) mod subscription;

src/storage/subscription.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// storage/subscription.rs
2+
3+
use serde::{Deserialize, Serialize};
4+
use std::collections::HashMap;
5+
use std::sync::Arc;
6+
use tokio::sync::{broadcast, RwLock};
7+
8+
/// An event describing a key update (or deletion).
9+
#[derive(Debug, Clone, Serialize, Deserialize)]
10+
pub enum KeyEvent {
11+
Updated(VersionedValue),
12+
Deleted,
13+
}
14+
15+
/// Manages broadcast channels for keys so that multiple subscribers can listen.
16+
#[derive(Clone)]
17+
pub struct SubscriptionManager {
18+
// The key is a combination of table name and key (e.g. "default/mykey")
19+
pub channels: Arc<RwLock<HashMap<String, broadcast::Sender<KeyEvent>>>>,
20+
}
21+
22+
impl SubscriptionManager {
23+
pub fn new() -> Self {
24+
Self {
25+
channels: Arc::new(RwLock::new(HashMap::new())),
26+
}
27+
}
28+
29+
/// Subscribe to updates for a given table/key.
30+
pub async fn subscribe(&self, table: &str, key: &str) -> broadcast::Receiver<KeyEvent> {
31+
let identifier = format!("{}/{}", table, key);
32+
let mut channels = self.channels.write().await;
33+
if let Some(sender) = channels.get(&identifier) {
34+
sender.subscribe()
35+
} else {
36+
// Create a new channel if none exists; here we use a buffer of 16 events.
37+
let (tx, rx) = broadcast::channel(16);
38+
channels.insert(identifier, tx);
39+
rx
40+
}
41+
}
42+
43+
/// Send a notification for the given table/key.
44+
pub async fn notify(&self, table: &str, key: &str, event: KeyEvent) {
45+
let identifier = format!("{}/{}", table, key);
46+
let channels = self.channels.read().await;
47+
if let Some(sender) = channels.get(&identifier) {
48+
// Ignore send error if there are no subscribers
49+
let _ = sender.send(event);
50+
}
51+
}
52+
}
53+
// storage/subscription.rs (continued)
54+
55+
use actix_web::{web, HttpResponse, Responder};
56+
use futures::stream::StreamExt;
57+
use crate::storage::engine::VersionedValue;
58+
59+
/// Subscribe to updates on a key. The URL contains the table and key.
60+
pub async fn subscribe_to_key(
61+
path: web::Path<(String, String)>,
62+
sub_manager: web::Data<SubscriptionManager>,
63+
) -> impl Responder {
64+
let (table, key) = path.into_inner();
65+
66+
// Get a broadcast receiver for this key.
67+
let mut rx = sub_manager.subscribe(&table, &key).await;
68+
69+
// Build an SSE stream. Each event is yielded as a chunk in the HTTP response.
70+
let stream = async_stream::stream! {
71+
loop {
72+
match rx.recv().await {
73+
Ok(event) => {
74+
// Serialize the event as JSON.
75+
let data = serde_json::to_string(&event)
76+
.unwrap_or_else(|_| "null".to_string());
77+
// SSE events MUST be formatted as "data: <payload>\n\n".
78+
yield Ok::<_, actix_web::Error>(
79+
web::Bytes::from(format!("data: {}\n\n", data))
80+
);
81+
}
82+
// If messages are dropped, simply continue.
83+
Err(broadcast::error::RecvError::Lagged(_)) => continue,
84+
// Exit the loop when the sender is dropped.
85+
Err(broadcast::error::RecvError::Closed) => break,
86+
}
87+
}
88+
};
89+
90+
HttpResponse::Ok()
91+
.append_header(("Content-Type", "text/event-stream"))
92+
.streaming(stream)
93+
}

0 commit comments

Comments
 (0)