Skip to content

Commit 41f6f99

Browse files
author
test
committed
users did not replicate across nodes
1 parent ef3190e commit 41f6f99

2 files changed

Lines changed: 157 additions & 44 deletions

File tree

src/security/authentication.rs

Lines changed: 155 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
use actix_web::{web, HttpResponse, Responder};
1+
use actix_web::{web, HttpRequest, HttpResponse, Responder};
22
use actix_web::web::Json;
33
use serde::{Deserialize, Serialize};
44
use serde_json::json;
55
use sha2::{Digest, Sha256};
66
use hex;
77
use chrono::{Utc, Duration};
88
use std::collections::HashMap;
9+
use std::env;
910
use jsonwebtoken::{encode, Header, EncodingKey};
1011

11-
use crate::storage::engine::{AppState, VersionedValue};
12+
use crate::storage::engine::{get_active_nodes, get_replication_nodes, AppState, ClusterData, VersionedValue};
13+
use crate::storage::subscription::{KeyEvent, SubscriptionManager};
1214

1315
/// Incoming JSON payload.
1416
#[derive(Debug, Deserialize)]
@@ -27,68 +29,173 @@ struct Claims {
2729
const JWT_SECRET: &[u8] = b"kajdOsndmalskfi";
2830

2931
pub async fn access(
32+
req: HttpRequest,
3033
user: web::Path<String>,
3134
secret_req: Json<AccessToken>,
3235
state: web::Data<AppState>,
36+
cluster_data: web::Data<ClusterData>,
37+
current_addr: web::Data<String>,
38+
sub_manager: web::Data<SubscriptionManager>,
3339
) -> impl Responder {
3440
let username = user.into_inner();
3541
let provided_secret = secret_req.into_inner().secret;
3642

37-
// Acquire a write lock so we can create "auth" if missing.
38-
let mut store = state.store.write().await;
43+
println!("Access request for user: {}", username);
3944

40-
// Lazily create the auth table if this is the first call.
41-
let auth_table: &mut HashMap<String, VersionedValue> =
42-
store.entry("auth".to_string()).or_insert_with(HashMap::new);
45+
// Internal replication requests bypass authentication
46+
if req.headers().contains_key("X-Internal-Request") {
47+
println!("Processing internal replication request for {}", username);
4348

44-
// Look up the user record.
45-
let record = auth_table.get(&username);
46-
if record.is_none() {
47-
// User not found ⇒ registration step.
48-
// Hash the provided secret.
49-
let mut hasher = Sha256::new();
50-
hasher.update(&provided_secret);
51-
let hashed = hex::encode(hasher.finalize());
49+
// Simplified internal replication handling
50+
let mut store = state.store.write().await;
51+
let auth_table: &mut HashMap<String, VersionedValue> =
52+
store.entry("auth".to_string()).or_insert_with(HashMap::new);
5253

53-
// Build a small map { "secret": "<hex>" } and store it.
54+
// For internal requests, use the provided hash directly
5455
let mut value_map = HashMap::new();
55-
value_map.insert("secret".to_string(), json!(hashed));
56+
value_map.insert("secret".to_string(), json!(provided_secret));
5657

57-
let new_rec = VersionedValue::new(value_map, username.clone());
58+
let user_header = req.headers().get("User")
59+
.and_then(|h| h.to_str().ok())
60+
.unwrap_or(&username);
61+
62+
let new_rec = VersionedValue::new(value_map, String::from(user_header));
5863
auth_table.insert(username.clone(), new_rec.clone());
5964

60-
return HttpResponse::Ok().json(json!({
61-
"status": "User created"
65+
println!("Successfully replicated user: {}", username);
66+
67+
sub_manager
68+
.notify(&"auth".to_string(), &username, KeyEvent::Updated(new_rec.clone()))
69+
.await;
70+
71+
return HttpResponse::Created().json(json!({
72+
"status": "User replicated"
6273
}));
6374
}
6475

65-
// Otherwise this is a login attempt.
66-
let user_record = record.unwrap();
67-
68-
// Extract stored hash.
69-
let stored_hash = match user_record
70-
.value
71-
.get("secret")
72-
.and_then(|v| v.as_str())
73-
{
74-
Some(h) => h,
75-
None => {
76-
return HttpResponse::InternalServerError()
77-
.json(json!({"error":"Corrupt user record"}));
76+
// Regular authentication flow
77+
let is_new_user;
78+
let new_value = {
79+
let mut store = state.store.write().await;
80+
let auth_table: &mut HashMap<String, VersionedValue> =
81+
store.entry("auth".to_string()).or_insert_with(HashMap::new);
82+
83+
let record = auth_table.get(&username);
84+
if record.is_none() {
85+
println!("New user registration: {}", username);
86+
// User registration
87+
let mut hasher = Sha256::new();
88+
hasher.update(&provided_secret);
89+
let hashed = hex::encode(hasher.finalize());
90+
91+
let mut value_map = HashMap::new();
92+
value_map.insert("secret".to_string(), json!(hashed));
93+
94+
let new_rec = VersionedValue::new(value_map, username.clone());
95+
auth_table.insert(username.clone(), new_rec.clone());
96+
is_new_user = true;
97+
new_rec.clone()
98+
} else {
99+
println!("Login attempt for existing user: {}", username);
100+
// Login attempt
101+
let user_record = record.unwrap();
102+
103+
let stored_hash = match user_record
104+
.value
105+
.get("secret")
106+
.and_then(|v| v.as_str())
107+
{
108+
Some(h) => h,
109+
None => {
110+
return HttpResponse::InternalServerError()
111+
.json(json!({"error":"Corrupt user record"}));
112+
}
113+
};
114+
115+
let mut hasher = Sha256::new();
116+
hasher.update(&provided_secret);
117+
let provided_hash = hex::encode(hasher.finalize());
118+
119+
if stored_hash != provided_hash {
120+
return HttpResponse::Unauthorized()
121+
.json(json!({"error":"Invalid credentials"}));
122+
}
123+
124+
is_new_user = false;
125+
user_record.clone()
78126
}
79127
};
80128

81-
// Hash the provided secret.
82-
let mut hasher = Sha256::new();
83-
hasher.update(&provided_secret);
84-
let provided_hash = hex::encode(hasher.finalize());
129+
// Notify subscribers about the update
130+
if is_new_user {
131+
sub_manager
132+
.notify(&"auth".to_string(), &username, KeyEvent::Updated(new_value.clone()))
133+
.await;
134+
}
135+
136+
// Replicate new user to other nodes
137+
if is_new_user {
138+
println!("Starting replication for new user: {}", username);
139+
let cluster_guard = cluster_data.nodes.read().await;
140+
let active_nodes = get_active_nodes(&*cluster_guard);
141+
drop(cluster_guard);
142+
143+
let replication_factor = active_nodes.len();
144+
let targets = get_replication_nodes(&username, &active_nodes, replication_factor);
145+
println!("Replication targets: {:?}", targets);
146+
147+
let client = reqwest::Client::new();
148+
let mut replication_futures = Vec::new();
149+
150+
// Get the hashed secret for replication
151+
let hashed_secret = new_value.value.get("secret").unwrap().as_str().unwrap();
152+
153+
for target in targets {
154+
if target != *current_addr.get_ref() {
155+
// Make sure URL format matches your API route configuration
156+
let url = format!("http://{}/auth/{}", target, username);
157+
println!("Preparing replication to: {}", url);
158+
159+
let client_clone = client.clone();
160+
let username_clone = username.clone();
161+
let secret_clone = hashed_secret.to_string();
162+
163+
let fut = async move {
164+
println!("Sending replication request to: {}", url);
165+
// Create the payload with the same structure as AccessToken
166+
let payload = json!({"secret": secret_clone});
167+
168+
let result = client_clone
169+
.post(&url)
170+
.header("X-Internal-Request", "true")
171+
.header("User", &username_clone)
172+
.json(&payload)
173+
.send()
174+
.await;
175+
176+
match result {
177+
Ok(response) => {
178+
println!("Replication response from {}: status={}",
179+
url, response.status());
180+
},
181+
Err(e) => {
182+
println!("Replication error to {}: {}", url, e);
183+
}
184+
}
185+
};
186+
replication_futures.push(fut);
187+
}
188+
}
85189

86-
if stored_hash != provided_hash {
87-
return HttpResponse::Unauthorized()
88-
.json(json!({"error":"Invalid credentials"}));
190+
// Use a separate clone for the tokio::spawn task
191+
let username_for_spawn = username.clone();
192+
tokio::spawn(async move {
193+
futures_util::future::join_all(replication_futures).await;
194+
println!("Replication completed for user: {}", username_for_spawn);
195+
});
89196
}
90197

91-
// Generate a JWT token, 1h expiry.
198+
// Generate JWT token for the authenticated user
92199
let exp = (Utc::now() + Duration::hours(1)).timestamp() as usize;
93200
let claims = Claims { sub: username.clone(), exp };
94201

@@ -105,5 +212,11 @@ pub async fn access(
105212
}
106213
};
107214

108-
HttpResponse::Ok().json(json!({ "token": token }))
215+
if is_new_user {
216+
println!("New user created and token generated for: {}", username);
217+
HttpResponse::Created().json(json!({ "token": token, "status": "User created" }))
218+
} else {
219+
println!("Token generated for existing user: {}", username);
220+
HttpResponse::Ok().json(json!({ "token": token }))
221+
}
109222
}

src/storage/engine.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ pub fn current_timestamp() -> u128 {
107107
}
108108

109109
/// Extract active nodes from the cluster membership map.
110-
fn get_active_nodes(nodes: &HashMap<String, NodeInfo>) -> Vec<String> {
110+
pub fn get_active_nodes(nodes: &HashMap<String, NodeInfo>) -> Vec<String> {
111111
nodes
112112
.iter()
113113
.filter(|(_, info)| info.status == NodeStatus::Active)
@@ -116,7 +116,7 @@ fn get_active_nodes(nodes: &HashMap<String, NodeInfo>) -> Vec<String> {
116116
}
117117

118118
/// Computes replication targets for a given key.
119-
fn get_replication_nodes(key: &str, nodes: &[String], replication_factor: usize) -> Vec<String> {
119+
pub fn get_replication_nodes(key: &str, nodes: &[String], replication_factor: usize) -> Vec<String> {
120120
if nodes.is_empty() {
121121
return Vec::new();
122122
}

0 commit comments

Comments
 (0)