Skip to content

Commit ea11c40

Browse files
author
test
committed
Refactoring
1 parent c19097b commit ea11c40

2 files changed

Lines changed: 137 additions & 73 deletions

File tree

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -168,7 +168,7 @@ async fn main() -> std::io::Result<()> {
168168
// Spawn the membership synchronization (heartbeat) task.
169169
let cluster_clone = cluster_data.clone();
170170
let current_clone = current_node.clone();
171-
tokio::spawn(membership_sync(cluster_clone, current_clone, 60));
171+
tokio::spawn(membership_sync(cluster_clone, current_clone, 10));
172172

173173
println!("Starting distributed DB engine at http://{}", current_node);
174174

src/network/broadcaster.rs

Lines changed: 136 additions & 72 deletions
Original file line number | Diff line number | Diff line change
@@ -5,119 +5,183 @@ use actix_web::{web, HttpResponse, Responder};
55
use serde_json::json;
66
use crate::storage::engine::{current_timestamp, ClusterData, NodeInfo, NodeStatus};
77

8+
//
9+
// MEMBERSHIP BACKGROUND TASK
10+
//
11+
812
pub async fn membership_sync(
913
cluster_data: web::Data<ClusterData>,
1014
current_addr: String,
1115
interval_sec: u64,
1216
) {
1317
let client = reqwest::Client::new();
18+
1419
loop {
15-
// Wait for the specified interval.
20+
// Wait for the specified interval
1621
tokio::time::sleep(Duration::from_secs(interval_sec)).await;
1722

18-
// Update local membership state via heartbeat checks.
19-
{
20-
let mut nodes_guard = cluster_data.nodes.write().await;
21-
for (node, info) in nodes_guard.iter_mut() {
22-
// Skip self.
23-
if node == &current_addr {
24-
continue;
25-
}
26-
let url = format!("http://{}/heartbeat", node);
27-
match client.get(&url).timeout(Duration::from_secs(2)).send().await {
28-
Ok(resp) => {
29-
if resp.status().is_success() {
30-
info.last_heartbeat = current_timestamp();
31-
info.status = NodeStatus::Active;
32-
} else if info.status == NodeStatus::Active {
33-
info.status = NodeStatus::Suspect;
34-
}
35-
}
36-
Err(_) => {
37-
if info.status == NodeStatus::Active {
38-
info.status = NodeStatus::Suspect;
39-
} else if info.status == NodeStatus::Suspect {
40-
let now = current_timestamp();
41-
if now - info.last_heartbeat > (interval_sec as u128 * 2000) {
42-
info.status = NodeStatus::Down;
43-
}
44-
}
45-
}
46-
}
47-
}
23+
// Run health checks and update node statuses
24+
check_node_health(&cluster_data, &client, &current_addr, interval_sec).await;
25+
26+
// Distribute membership information to other nodes
27+
gossip_membership(&cluster_data, &client, &current_addr).await;
28+
}
29+
}
30+
31+
/// Check health of all nodes and update their status
32+
async fn check_node_health(
33+
cluster_data: &web::Data<ClusterData>,
34+
client: &reqwest::Client,
35+
current_addr: &str,
36+
interval_sec: u64,
37+
) {
38+
let mut nodes_guard = cluster_data.nodes.write().await;
39+
let timeout_threshold = interval_sec as u128 * 2000; // 2x interval in milliseconds
40+
41+
for (node, info) in nodes_guard.iter_mut() {
42+
// Skip self-check
43+
if node == current_addr {
44+
continue;
4845
}
4946

50-
// Print the current membership snapshot.
51-
let nodes_snapshot = cluster_data.nodes.read().await;
52-
println!("Node {} membership: {:?}", current_addr, *nodes_snapshot);
47+
let url = format!("http://{}/heartbeat", node);
48+
match client.get(&url).timeout(Duration::from_secs(2)).send().await {
49+
Ok(resp) if resp.status().is_success() => {
50+
// Successful heartbeat - mark node as active
51+
info.last_heartbeat = current_timestamp();
52+
info.status = NodeStatus::Active;
53+
},
54+
_ => {
55+
// Failed heartbeat - update status based on current state
56+
update_node_status(info, timeout_threshold);
57+
}
58+
}
59+
}
5360

54-
// Gossip: send your current membership state to all other nodes.
55-
let membership_snapshot = nodes_snapshot.clone();
56-
drop(nodes_snapshot); // Release lock before making outbound requests.
61+
println!("Node {} membership: {:?}", current_addr, *nodes_guard);
62+
}
5763

58-
for (node, _) in membership_snapshot.iter() {
59-
if node == &current_addr {
60-
continue;
64+
/// Update a node's status based on heartbeat failures
65+
fn update_node_status(info: &mut NodeInfo, timeout_threshold: u128) {
66+
match info.status {
67+
NodeStatus::Active => {
68+
// First failure - mark as suspect
69+
info.status = NodeStatus::Suspect;
70+
},
71+
NodeStatus::Suspect => {
72+
// Check if it's been down too long
73+
let now = current_timestamp();
74+
if now - info.last_heartbeat > timeout_threshold {
75+
info.status = NodeStatus::Down;
6176
}
62-
let gossip_url = format!("http://{}/update_membership", node);
63-
let membership_clone = membership_snapshot.clone();
64-
let client_clone = client.clone();
65-
let node_clone = node.clone();
66-
67-
// Spawn a task to send the membership update asynchronously.
68-
tokio::spawn(async move {
69-
match client_clone
70-
.post(&gossip_url)
71-
.json(&membership_clone)
72-
.send()
73-
.await
74-
{
75-
Ok(resp) => {
76-
if !resp.status().is_success() {
77-
eprintln!("Failed to gossip membership to {}: {}", node_clone, resp.status());
78-
}
79-
}
80-
Err(e) => {
81-
eprintln!("Error gossiping membership to {}: {}", node_clone, e);
82-
}
83-
}
84-
});
77+
},
78+
_ => {} // For other states, do nothing
79+
}
80+
}
81+
82+
/// Send membership information to all other nodes
83+
async fn gossip_membership(
84+
cluster_data: &web::Data<ClusterData>,
85+
client: &reqwest::Client,
86+
current_addr: &str,
87+
) {
88+
// Get a snapshot of current membership
89+
let nodes_snapshot = {
90+
let guard = cluster_data.nodes.read().await;
91+
guard.clone()
92+
};
93+
94+
// Send membership updates to each node
95+
for (node, _) in nodes_snapshot.iter() {
96+
if node == current_addr {
97+
continue;
8598
}
99+
100+
let gossip_url = format!("http://{}/update_membership", node);
101+
let membership_clone = nodes_snapshot.clone();
102+
let client_clone = client.clone();
103+
let node_clone = node.clone();
104+
105+
// Send update asynchronously
106+
tokio::spawn(async move {
107+
if let Err(e) = send_membership_update(&client_clone, &gossip_url, &membership_clone).await {
108+
eprintln!("Error gossiping membership to {}: {}", node_clone, e);
109+
}
110+
});
86111
}
87112
}
88113

89-
pub async fn heartbeat(_cluster_data: web::Data<ClusterData>) -> impl Responder {
90-
HttpResponse::Ok().json(json!({"status": "ok"}))
114+
/// Send a membership update to a specific node
115+
async fn send_membership_update(
116+
client: &reqwest::Client,
117+
url: &str,
118+
membership: &HashMap<String, NodeInfo>,
119+
) -> Result<(), String> {
120+
match client.post(url).json(membership).send().await {
121+
Ok(resp) => {
122+
if resp.status().is_success() {
123+
Ok(())
124+
} else {
125+
Err(format!("Failed with status: {}", resp.status()))
126+
}
127+
},
128+
Err(e) => Err(e.to_string()),
129+
}
91130
}
92131

132+
//
133+
// MEMBERSHIP HTTP ENDPOINTS
134+
//
135+
136+
/// Respond to heartbeat requests from other nodes
137+
pub async fn heartbeat() -> impl Responder {
138+
HttpResponse::Ok().json(json!({"status": "ok", "timestamp": current_timestamp()}))
139+
}
140+
141+
/// Process membership updates received from other nodes
93142
pub async fn update_membership(
94143
cluster_data: web::Data<ClusterData>,
95144
payload: web::Json<HashMap<String, NodeInfo>>,
96145
) -> impl Responder {
97146
let mut local_guard = cluster_data.nodes.write().await;
98147
let incoming = payload.into_inner();
99148
let mut updated = false;
149+
100150
for (node, info) in incoming.into_iter() {
151+
// For nodes not in our membership, add them
101152
if !local_guard.contains_key(&node) {
102153
local_guard.insert(node, info);
103154
updated = true;
104-
} else {
105-
let existing = local_guard.get(&node).unwrap();
106-
if info.last_heartbeat > existing.last_heartbeat {
107-
local_guard.insert(node, info);
108-
updated = true;
109-
}
155+
continue;
156+
}
157+
158+
// For existing nodes, only update if the incoming data is newer
159+
let existing = local_guard.get(&node).unwrap();
160+
if info.last_heartbeat > existing.last_heartbeat {
161+
local_guard.insert(node, info);
162+
updated = true;
110163
}
111164
}
165+
112166
if updated {
113167
println!("Updated membership: {:?}", *local_guard);
114168
}
169+
115170
HttpResponse::Ok().finish()
116171
}
117172

173+
/// Get the current cluster membership state
118174
pub async fn get_membership(
119175
cluster_data: web::Data<ClusterData>,
120176
) -> impl Responder {
121177
let nodes = cluster_data.nodes.read().await;
122-
HttpResponse::Ok().json(nodes.clone())
178+
179+
// Create a response with additional metadata
180+
let response = json!({
181+
"nodes": nodes.clone(),
182+
"active_count": nodes.values().filter(|n| n.status == NodeStatus::Active).count(),
183+
"timestamp": current_timestamp()
184+
});
185+
186+
HttpResponse::Ok().json(response)
123187
}

0 commit comments

Comments
 (0)