Skip to content

Commit 8b5d05c

Browse files
author
test
committed
syncing auth table issues and https node syncing issue fixed
1 parent 41f6f99 commit 8b5d05c

3 files changed

Lines changed: 74 additions & 25 deletions

File tree

src/network/broadcaster.rs

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,31 @@
11
use std::collections::HashMap;
2+
use std::env;
23
use std::time::Duration;
34
use reqwest;
45
use actix_web::{web, HttpResponse, Responder};
56
use serde_json::json;
6-
use crate::storage::engine::{current_timestamp, ClusterData, NodeInfo, NodeStatus};
7+
use crate::storage::engine::{
8+
current_timestamp, ClusterData, NodeInfo, NodeStatus,
9+
};
10+
11+
/// Read DYNA_MODE and return either "https" or "http".
12+
fn get_scheme() -> &'static str {
13+
match env::var("DYNA_MODE").map(|m| m.to_lowercase()).as_deref() {
14+
Ok("https") => "https",
15+
_ => "http",
16+
}
17+
}
18+
19+
/// Strip any existing scheme/trailing slash from `node` and build a full URL.
20+
fn build_url(node: &str, endpoint: &str) -> String {
21+
let scheme = get_scheme();
22+
let host = node
23+
.trim_start_matches("http://")
24+
.trim_start_matches("https://")
25+
.trim_end_matches('/');
26+
let ep = endpoint.trim_start_matches('/');
27+
format!("{}://{}/{}", scheme, host, ep)
28+
}
729

830
//
931
// MEMBERSHIP BACKGROUND TASK
@@ -21,7 +43,13 @@ pub async fn membership_sync(
2143
tokio::time::sleep(Duration::from_secs(interval_sec)).await;
2244

2345
// Run health checks and update node statuses
24-
check_node_health(&cluster_data, &client, &current_addr, interval_sec).await;
46+
check_node_health(
47+
&cluster_data,
48+
&client,
49+
&current_addr,
50+
interval_sec,
51+
)
52+
.await;
2553

2654
// Distribute membership information to other nodes
2755
gossip_membership(&cluster_data, &client, &current_addr).await;
@@ -36,27 +64,27 @@ async fn check_node_health(
3664
interval_sec: u64,
3765
) {
3866
let mut nodes_guard = cluster_data.nodes.write().await;
39-
let timeout_threshold = interval_sec as u128 * 2000; // 2x interval in milliseconds
67+
// 2x interval in milliseconds
68+
let timeout_threshold = (interval_sec as u128) * 2000;
4069

4170
for (node, info) in nodes_guard.iter_mut() {
4271
// Skip self-check
4372
if node == current_addr {
4473
continue;
4574
}
46-
let url = if node.starts_with("http://") || node.starts_with("https://") {
47-
// Remove any trailing slash for consistency, then append the endpoint.
48-
format!("{}/heartbeat", node.trim_end_matches('/'))
49-
} else {
50-
// If no scheme is present, use http as the default.
51-
format!("http://{}/heartbeat", node)
52-
};
53-
54-
match client.get(&url).timeout(Duration::from_secs(2)).send().await {
75+
let url = build_url(node, "heartbeat");
76+
77+
match client
78+
.get(&url)
79+
.timeout(Duration::from_secs(2))
80+
.send()
81+
.await
82+
{
5583
Ok(resp) if resp.status().is_success() => {
5684
// Successful heartbeat - mark node as active
5785
info.last_heartbeat = current_timestamp();
5886
info.status = NodeStatus::Active;
59-
},
87+
}
6088
_ => {
6189
// Failed heartbeat - update status based on current state
6290
update_node_status(info, timeout_threshold);
@@ -73,20 +101,20 @@ fn update_node_status(info: &mut NodeInfo, timeout_threshold: u128) {
73101
NodeStatus::Active => {
74102
// First failure - mark as suspect
75103
info.status = NodeStatus::Suspect;
76-
},
104+
}
77105
NodeStatus::Suspect => {
78106
// Check if it's been down too long
79107
let now = current_timestamp();
80108
if now - info.last_heartbeat > timeout_threshold {
81109
info.status = NodeStatus::Down;
82110
}
83-
},
111+
}
84112
_ => {} // For other states, do nothing
85113
}
86114
}
87115

88116
/// Send membership information to all other nodes
89-
async fn gossip_membership(
117+
pub async fn gossip_membership(
90118
cluster_data: &web::Data<ClusterData>,
91119
client: &reqwest::Client,
92120
current_addr: &str,
@@ -103,15 +131,21 @@ async fn gossip_membership(
103131
continue;
104132
}
105133

106-
let gossip_url = format!("http://{}/update_membership", node);
134+
let gossip_url = build_url(node, "update_membership");
107135
let membership_clone = nodes_snapshot.clone();
108136
let client_clone = client.clone();
109137
let node_clone = node.clone();
110138

111139
// Send update asynchronously
112140
tokio::spawn(async move {
113-
if let Err(e) = send_membership_update(&client_clone, &gossip_url, &membership_clone).await {
114-
eprintln!("Error gossiping membership to {}: {}", node_clone, e);
141+
if let Err(e) =
142+
send_membership_update(&client_clone, &gossip_url, &membership_clone)
143+
.await
144+
{
145+
eprintln!(
146+
"Error gossiping membership to {}: {}",
147+
node_clone, e
148+
);
115149
}
116150
});
117151
}
@@ -130,7 +164,7 @@ async fn send_membership_update(
130164
} else {
131165
Err(format!("Failed with status: {}", resp.status()))
132166
}
133-
},
167+
}
134168
Err(e) => Err(e.to_string()),
135169
}
136170
}
@@ -141,7 +175,10 @@ async fn send_membership_update(
141175

142176
/// Respond to heartbeat requests from other nodes
143177
pub async fn heartbeat() -> impl Responder {
144-
HttpResponse::Ok().json(json!({"status": "ok", "timestamp": current_timestamp()}))
178+
HttpResponse::Ok().json(json!({
179+
"status": "ok",
180+
"timestamp": current_timestamp(),
181+
}))
145182
}
146183

147184
/// Process membership updates received from other nodes
@@ -185,8 +222,11 @@ pub async fn get_membership(
185222
// Create a response with additional metadata
186223
let response = json!({
187224
"nodes": nodes.clone(),
188-
"active_count": nodes.values().filter(|n| n.status == NodeStatus::Active).count(),
189-
"timestamp": current_timestamp()
225+
"active_count": nodes
226+
.values()
227+
.filter(|n| n.status == NodeStatus::Active)
228+
.count(),
229+
"timestamp": current_timestamp(),
190230
});
191231

192232
HttpResponse::Ok().json(response)

src/security/authentication.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use sha2::{Digest, Sha256};
66
use hex;
77
use chrono::{Utc, Duration};
88
use std::collections::HashMap;
9-
use std::env;
109
use jsonwebtoken::{encode, Header, EncodingKey};
1110

1211
use crate::storage::engine::{get_active_nodes, get_replication_nodes, AppState, ClusterData, VersionedValue};

src/storage/engine.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use tokio::sync::RwLock;
1111
use web::Json;
1212
use crate::storage::subscription::{KeyEvent, SubscriptionManager};
1313
use jsonwebtoken::{decode, DecodingKey, Validation};
14-
14+
use tokio::time;
15+
use crate::network::broadcaster::{gossip_membership};
1516
//
1617
// CONSTANTS & TYPES
1718
//
@@ -559,6 +560,7 @@ pub async fn join_cluster(
559560
}
560561

561562
let new_node = request.node.clone();
563+
let cluster2 = cluster.clone();
562564
let mut nodes_guard = cluster.nodes.write().await;
563565
if !nodes_guard.contains_key(&new_node) {
564566
nodes_guard.insert(
@@ -568,6 +570,14 @@ pub async fn join_cluster(
568570
last_heartbeat: current_timestamp(),
569571
},
570572
);
573+
574+
tokio::spawn(async move {
575+
time::sleep(std::time::Duration::from_secs(5)).await;
576+
let args: Vec<String> = env::args().collect();
577+
let current_node = args[1].clone();
578+
let client = reqwest::Client::new();
579+
gossip_membership(&cluster2, &client, &*current_node).await;
580+
});
571581
}
572582
HttpResponse::Ok().json(nodes_guard.clone())
573583
}

0 commit comments

Comments
 (0)