1+ use tokio:: sync:: Semaphore ;
2+ use crate :: Arc ;
13use actix_web:: { web, HttpRequest , HttpResponse , Responder } ;
24use futures:: future:: BoxFuture ;
35use serde:: { Deserialize , Serialize } ;
@@ -285,59 +287,35 @@ pub async fn get_value(
285287 HttpResponse :: Ok ( ) . json ( latest)
286288}
287289
288- /// PUT handler: Inserts or updates a key–value pair with ownership verification.
289- /// For external requests, a valid JWT token in the Authorization header is required.
290+ // external PUT handler
291+ # [ allow ( clippy :: too_many_arguments ) ]
290292pub async fn put_value (
291293 req : HttpRequest ,
292294 path : web:: Path < ( String , String ) > ,
293295 state : web:: Data < AppState > ,
294296 cluster_data : web:: Data < ClusterData > ,
295297 current_addr : web:: Data < String > ,
296298 sub_manager : web:: Data < SubscriptionManager > ,
299+ client : web:: Data < reqwest:: Client > ,
300+ sem : web:: Data < Arc < Semaphore > > ,
297301 body : web:: Json < HashMap < String , Value > > ,
298302) -> impl Responder {
299303 let ( table_name, key_val) = path. into_inner ( ) ;
300304
301- // Internal replication requests bypass authentication.
302- if req. headers ( ) . contains_key ( "X-Internal-Request" ) {
303- if env:: var ( "CLUSTER_SECRET" ) . unwrap_or_default ( ) . as_str ( ) == req. headers ( ) . get ( "SECRET" ) . unwrap ( ) . to_str ( ) . unwrap ( ) {
304- let new_value = {
305- let mut store = state. store . write ( ) . await ;
306- let table = store. entry ( table_name. clone ( ) ) . or_insert_with ( HashMap :: new) ;
307- if let Some ( existing) = table. get_mut ( & key_val) {
308- existing. update ( body. 0 . clone ( ) ) ;
309- existing. clone ( )
310- } else {
311- let user_header = req. headers ( ) . get ( "User" ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
312-
313- // For internal requests, set owner as "internal"
314- let v = VersionedValue :: new ( body. 0 . clone ( ) , String :: from ( user_header) ) ;
315- table. insert ( key_val. clone ( ) , v. clone ( ) ) ;
316- v
317- }
318- } ;
319- sub_manager
320- . notify ( & table_name, & key_val, KeyEvent :: Updated ( new_value. clone ( ) ) )
321- . await ;
322- return HttpResponse :: Created ( ) . json ( new_value) ;
323- } else {
324- return HttpResponse :: Unauthorized ( ) . json ( json ! ( { } ) )
325- }
326- }
327-
328- // External requests require a valid JWT token.
305+ // 1️⃣ Authenticate external user via JWT
329306 let user = match extract_user_from_token ( & req) {
330307 Ok ( u) => u,
331- Err ( err_resp ) => return err_resp ,
308+ Err ( resp ) => return resp ,
332309 } ;
333310
311+ // 2️⃣ Apply local update with ownership check
334312 let new_value = {
335- let mut store = state. store . write ( ) . await ;
336- let table = store. entry ( table_name. clone ( ) ) . or_insert_with ( HashMap :: new) ;
313+ let mut db = state. store . write ( ) . await ;
314+ let table = db. entry ( table_name. clone ( ) ) . or_default ( ) ;
315+
337316 if let Some ( existing) = table. get_mut ( & key_val) {
338- // Only allow the owner to update this record.
339317 if existing. owner != user {
340- return HttpResponse :: Unauthorized ( ) . json ( json ! ( {
318+ return HttpResponse :: Unauthorized ( ) . json ( serde_json :: json!( {
341319 "error" : "Not authorized to update this record"
342320 } ) ) ;
343321 }
@@ -350,50 +328,91 @@ pub async fn put_value(
350328 }
351329 } ;
352330
331+ // 3️⃣ Notify any SSE subscriptions
353332 sub_manager
354333 . notify ( & table_name, & key_val, KeyEvent :: Updated ( new_value. clone ( ) ) )
355334 . await ;
356335
357- // Replicate to other nodes.
358- let cluster_guard = cluster_data. nodes . read ( ) . await ;
359- let active_nodes = get_active_nodes ( & * cluster_guard) ;
360- drop ( cluster_guard) ;
361-
362- let replication_factor = active_nodes. len ( ) ;
363- let targets = get_replication_nodes ( & key_val, & active_nodes, replication_factor) ;
336+ // 4️⃣ Replicate *full* VersionedValue to other nodes, but cap concurrency
337+ let cluster = cluster_data. nodes . read ( ) . await ;
338+ let active = get_active_nodes ( & * cluster) ;
339+ drop ( cluster) ;
364340
365- let client = reqwest :: Client :: new ( ) ;
366- let mut replication_futures = Vec :: new ( ) ;
341+ let targets = get_replication_nodes ( & key_val , & active , active . len ( ) ) ;
342+ let secret = env :: var ( "CLUSTER_SECRET" ) . unwrap_or_default ( ) ;
367343
368344 for target in targets {
369- if target != * current_addr. get_ref ( ) {
370- let user2 = user. clone ( ) ;
371- let url = format ! ( "http://{}/{}/key/{}" , target, table_name, key_val) ;
372- let client_clone = client. clone ( ) ;
373- let value_clone = new_value. clone ( ) ;
374- let fut = async move {
375- let result = client_clone
376- . put ( & url)
377- . header ( "X-Internal-Request" , "true" )
378- . header ( "User" , user2. clone ( ) )
379- . header ( "SECRET" , env:: var ( "CLUSTER_SECRET" ) . unwrap_or_default ( ) )
380- . json ( & value_clone. value ) // HERE'S THE FIX: Send only the inner value data
381- . timeout ( std:: time:: Duration :: from_secs ( 3 ) )
382- . send ( )
383- . await ;
384- if let Err ( e) = result {
385- println ! ( "Replication error to {}: {}" , target, e) ;
386- }
387- } ;
388- replication_futures. push ( fut) ;
345+ if target == * current_addr. get_ref ( ) {
346+ continue ;
389347 }
348+ // acquire a permit (will await if > 20 in flight)
349+ let permit = <Arc < Semaphore > as Clone >:: clone ( & sem) . acquire_owned ( ) . await . unwrap ( ) ;
350+ let cli = client. clone ( ) ;
351+ let payload = new_value. clone ( ) ;
352+ let url = format ! (
353+ "http://{}/internal/{}/key/{}" ,
354+ target, table_name, key_val
355+ ) ;
356+
357+ let secret_clone = secret. clone ( ) ;
358+
359+ tokio:: spawn ( async move {
360+ let _permit = permit; // held until this future ends
361+ if let Err ( e) = cli
362+ . put ( & url)
363+ . header ( "X-Internal-Request" , "true" )
364+ . header ( "SECRET" , & secret_clone)
365+ . json ( & payload)
366+ . send ( )
367+ . await
368+ {
369+ eprintln ! ( "replication to {} failed: {}" , target, e) ;
370+ }
371+ } ) ;
390372 }
391- tokio:: spawn ( async move {
392- futures_util:: future:: join_all ( replication_futures) . await ;
393- } ) ;
373+
394374 HttpResponse :: Created ( ) . json ( new_value)
395375}
396376
377+ // internal PUT handler (must be mounted at: /internal/{table}/key/{key})
378+ pub async fn put_value_internal (
379+ req : HttpRequest ,
380+ path : web:: Path < ( String , String ) > ,
381+ state : web:: Data < AppState > ,
382+ sub_manager : web:: Data < SubscriptionManager > ,
383+ body : web:: Json < VersionedValue > ,
384+ ) -> impl Responder {
385+ let ( table_name, key_val) = path. into_inner ( ) ;
386+
387+ // 1️⃣ Cluster‐secret check
388+ let cluster_secret = env:: var ( "CLUSTER_SECRET" ) . unwrap_or_default ( ) ;
389+ match req. headers ( ) . get ( "SECRET" ) {
390+ Some ( h) if h. to_str ( ) . unwrap_or ( "" ) == cluster_secret => { }
391+ _ => return HttpResponse :: Unauthorized ( ) . finish ( ) ,
392+ }
393+
394+ // 2️⃣ Merge the incoming VersionedValue by version number
395+ let incoming = body. into_inner ( ) ;
396+ let mut db = state. store . write ( ) . await ;
397+ let table = db. entry ( table_name. clone ( ) ) . or_default ( ) ;
398+
399+ let cached = table
400+ . entry ( key_val. clone ( ) )
401+ . and_modify ( |existing| {
402+ if incoming. version > existing. version {
403+ * existing = incoming. clone ( ) ;
404+ }
405+ } )
406+ . or_insert_with ( || incoming. clone ( ) )
407+ . clone ( ) ;
408+
409+ // 3️⃣ Notify subscribers
410+ sub_manager
411+ . notify ( & table_name, & key_val, KeyEvent :: Updated ( cached. clone ( ) ) )
412+ . await ;
413+
414+ HttpResponse :: Created ( ) . json ( cached)
415+ }
397416
398417/// DELETE handler: Removes a key with an ownership check.
399418pub async fn delete_value (
0 commit comments