Skip to content

Commit 87aeb62

Browse files
Istrate Andrei-EduardIstrate Andrei-Eduard
authored andcommitted
FIX
1 parent ffa9599 commit 87aeb62

3 files changed

Lines changed: 112 additions & 17 deletions

File tree

src/security/authentication.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ pub async fn access(
192192
}
193193

194194
// Generate JWT token for the authenticated user
195-
let exp = (Utc::now() + Duration::hours(1)).timestamp() as usize;
195+
let exp = (Utc::now() + Duration::hours(24)).timestamp() as usize;
196196
let claims = Claims { sub: username.clone(), exp };
197197

198198
let token = match encode(

src/storage/engine.rs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,9 @@ pub async fn delete_value(
440440
cluster_data: web::Data<ClusterData>,
441441
current_addr: web::Data<String>,
442442
sub_manager: web::Data<SubscriptionManager>,
443+
// ADDED: Shared client and semaphore
444+
client: web::Data<reqwest::Client>,
445+
sem: web::Data<Arc<Semaphore>>,
443446
) -> impl Responder {
444447
let (table_name, key_val) = path.into_inner();
445448

@@ -492,7 +495,7 @@ pub async fn delete_value(
492495
} else {
493496
"Table not found locally"
494497
}
495-
.to_string()
498+
.to_string()
496499
};
497500

498501
sub_manager
@@ -506,13 +509,16 @@ pub async fn delete_value(
506509
let replication_factor = active_nodes.len();
507510
let targets = get_replication_nodes(&key_val, &active_nodes, replication_factor);
508511

509-
let client = reqwest::Client::new();
510-
let mut replication_futures = Vec::new();
511512
for target in targets {
512513
if target != *current_addr.get_ref() {
513514
let url = format!("http://{}/{}/key/{}", target, table_name, key_val);
514515
let client_clone = client.clone();
515-
let fut = async move {
516+
517+
// Acquire permit to cap concurrent outgoing replication tasks
518+
let permit = <Arc<Semaphore> as Clone>::clone(&sem).acquire_owned().await.unwrap();
519+
520+
tokio::spawn(async move {
521+
let _permit = permit; // Permit is held until the deletion request completes
516522
let result = client_clone
517523
.delete(&url)
518524
.header("X-Internal-Request", "true")
@@ -522,16 +528,12 @@ pub async fn delete_value(
522528
if let Err(e) = result {
523529
println!("Deletion replication error to {}: {}", target, e);
524530
}
525-
};
526-
replication_futures.push(fut);
531+
});
527532
}
528533
}
529-
tokio::spawn(async move {
530-
futures_util::future::join_all(replication_futures).await;
531-
});
534+
532535
HttpResponse::Ok().json(json!({"message": local_status}))
533536
}
534-
535537
//
536538
// TABLE MANAGEMENT HANDLERS
537539
//
@@ -592,6 +594,9 @@ pub async fn get_multiple_keys(
592594
pub async fn join_cluster(
593595
cluster: web::Data<ClusterData>,
594596
request: Json<JoinRequest>,
597+
// ADDED: Shared client and semaphore
598+
client: web::Data<reqwest::Client>,
599+
sem: web::Data<Arc<Semaphore>>,
595600
) -> impl Responder {
596601
let cluster_secret = env::var("CLUSTER_SECRET")
597602
.unwrap_or_else(|_| "default_secret".to_string());
@@ -603,6 +608,7 @@ pub async fn join_cluster(
603608
let new_node = request.node.clone();
604609
let cluster2 = cluster.clone();
605610
let mut nodes_guard = cluster.nodes.write().await;
611+
606612
if !nodes_guard.contains_key(&new_node) {
607613
nodes_guard.insert(
608614
new_node.clone(),
@@ -612,17 +618,26 @@ pub async fn join_cluster(
612618
},
613619
);
614620

615-
tokio::spawn(async move {
621+
// Safely bound the background sleep/gossip task.
622+
// If the system is maxed out, we skip the background gossip rather than queuing unbound tasks.
623+
if let Ok(permit) = <Arc<Semaphore> as Clone>::clone(&sem).try_acquire_owned() {
624+
tokio::spawn(async move {
625+
let _permit = permit; // Permit held during the sleep and gossip phase
616626
time::sleep(std::time::Duration::from_secs(5)).await;
627+
617628
let args: Vec<String> = env::args().collect();
618-
let current_node = args[1].clone();
619-
let client = reqwest::Client::new();
620-
gossip_membership(&cluster2, &client, &*current_node).await;
621-
});
629+
if args.len() > 1 {
630+
let current_node = args[1].clone();
631+
gossip_membership(&cluster2, &client, &current_node).await;
632+
}
633+
});
634+
} else {
635+
eprintln!("Warning: Dropping gossip broadcast for new node {} due to high load", new_node);
636+
}
622637
}
638+
623639
HttpResponse::Ok().json(nodes_guard.clone())
624640
}
625-
626641
pub async fn get_global_store(
627642
req: HttpRequest,
628643
state: web::Data<AppState>,

stress.sh

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#!/bin/bash
2+
3+
# Configuration
4+
PORT=6660
5+
BASE_URL="http://localhost:$PORT"
6+
TOTAL_RECORDS=10000 # Change to 100000 for the full test
7+
CONCURRENCY=50 # Number of parallel requests
8+
TABLE="test_perf"
9+
10+
echo "=== Starting Performance Test ($TOTAL_RECORDS records) ==="
11+
12+
# 1) Register & Authenticate Alice
13+
echo "[1/4] Registering and authenticating user..."
14+
curl -s -X POST "$BASE_URL/auth/alice" \
15+
-H "Content-Type: application/json" \
16+
-d '{"secret":"s3cr3t"}' > /dev/null
17+
18+
TOKEN=$(curl -s -X POST "$BASE_URL/auth/alice" \
19+
-H "Content-Type: application/json" \
20+
-d '{"secret":"s3cr3t"}' | jq -r .token)
21+
22+
if [ -z "$TOKEN" ] || [ "$TOKEN" == "null" ]; then
23+
echo "Error: Failed to retrieve JWT token. Is the server running?"
24+
exit 1
25+
fi
26+
27+
# 50-word Lorem Ipsum JSON payload
28+
PAYLOAD='{"value": "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident."}'
29+
30+
# 2) Parallel PUT Operations (Data Storage Test)
31+
echo "[2/4] Inserting $TOTAL_RECORDS records with $CONCURRENCY concurrent workers..."
32+
START_TIME=$(date +%s)
33+
34+
# We use seq and xargs to blast the server with concurrent requests
35+
seq 1 $TOTAL_RECORDS | xargs -n 1 -P $CONCURRENCY -I {} \
36+
curl -s -X PUT "$BASE_URL/$TABLE/key/req_{}" \
37+
-H "Content-Type: application/json" \
38+
-H "Authorization: Bearer $TOKEN" \
39+
-d "$PAYLOAD" -o /dev/null
40+
41+
END_TIME=$(date +%s)
42+
echo "Done! Insertion took $((END_TIME - START_TIME)) seconds."
43+
44+
# 3) GET Operation Latency Test
45+
echo "[3/4] Measuring GET Latency..."
46+
TOTAL_TIME=0
47+
SAMPLE_SIZE=100
48+
49+
for i in $(seq 1 $SAMPLE_SIZE); do
50+
# Fetch a random key from the ones we just created
51+
RANDOM_KEY="req_$((1 + $RANDOM % $TOTAL_RECORDS))"
52+
53+
# Use curl's write-out feature to get the exact time in seconds, then convert to ms
54+
TIME=$(curl -s -o /dev/null -w "%{time_total}" "$BASE_URL/$TABLE/key/$RANDOM_KEY")
55+
TOTAL_TIME=$(awk "BEGIN {print $TOTAL_TIME + $TIME}")
56+
done
57+
58+
AVG_LATENCY=$(awk "BEGIN {print ($TOTAL_TIME / $SAMPLE_SIZE) * 1000}")
59+
echo "Average GET Latency over $SAMPLE_SIZE requests: ${AVG_LATENCY} ms"
60+
61+
# 4) System Stats & Memory
62+
echo "[4/4] Fetching Node Stats..."
63+
curl -s "$BASE_URL/stats" | jq .
64+
65+
echo "=== System Resource Check ==="
66+
# Find the PID of the server running on the port
67+
PID=$(lsof -t -i:$PORT)
68+
if [ -n "$PID" ]; then
69+
echo "Memory Consumption of Server (PID $PID):"
70+
ps -p $PID -o %cpu,%mem,rss | awk 'NR==1{print $0; next} {print $1"% ", $2"% ", $3/1024 " MB"}'
71+
else
72+
echo "Could not find PID for port $PORT to check memory."
73+
fi
74+
75+
# Check disk space used by the cold storage (assuming it's in the current directory or a specific data dir)
76+
# Adjust the path './' if your AppState base_dir is located elsewhere
77+
echo "Disk Space Used by storage directories:"
78+
du -sh ./*/ 2>/dev/null | grep -E "auth|test|default|$TABLE" || echo "No data directories found in current path."
79+
80+
echo "=== Test Complete ==="

0 commit comments

Comments
 (0)