@@ -31,6 +31,7 @@ pub enum ValidationResult {
3131 Unknown ,
3232 Invalidated ,
3333 IncompleteGroup ,
34+ FileNameResolutionFailed ,
3435}
3536
3637impl fmt:: Display for ValidationResult {
@@ -43,6 +44,7 @@ impl fmt::Display for ValidationResult {
4344 ValidationResult :: Unknown => write ! ( f, "unknown" ) ,
4445 ValidationResult :: Invalidated => write ! ( f, "invalidated" ) ,
4546 ValidationResult :: IncompleteGroup => write ! ( f, "incomplete_group" ) ,
47+ ValidationResult :: FileNameResolutionFailed => write ! ( f, "filename_resolution_failed" ) ,
4648 }
4749 }
4850}
@@ -338,6 +340,55 @@ impl<P: alloy::providers::Provider + Clone + 'static> SyntheticDataValidator<P>
338340
339341 Ok ( ( ) )
340342 }
343+ }
344+
345+ impl SyntheticDataValidator < WalletProvider > {
346+ pub async fn soft_invalidate_work ( & self , work_key : & str ) -> Result < ( ) , Error > {
347+ info ! ( "Soft invalidating work: {}" , work_key) ;
348+
349+ if self . disable_chain_invalidation {
350+ info ! ( "Chain invalidation is disabled, skipping work soft invalidation" ) ;
351+ return Ok ( ( ) ) ;
352+ }
353+
354+ // Special case for tests - skip actual blockchain interaction
355+ #[ cfg( test) ]
356+ {
357+ info ! ( "Test mode: skipping actual work soft invalidation" ) ;
358+ let _ = & self . prime_network ;
359+ Ok ( ( ) )
360+ }
361+
362+ #[ cfg( not( test) ) ]
363+ {
364+ let work_info = self
365+ . get_work_info_from_redis ( work_key)
366+ . await ?
367+ . ok_or_else ( || Error :: msg ( "Work info not found for soft invalidation" ) ) ?;
368+ let work_key_bytes = hex:: decode ( work_key)
369+ . map_err ( |e| Error :: msg ( format ! ( "Failed to decode hex work key: {}" , e) ) ) ?;
370+
371+ // Create 64-byte payload: work_key (32 bytes) + work_units (32 bytes)
372+ let mut data = Vec :: with_capacity ( 64 ) ;
373+ data. extend_from_slice ( & work_key_bytes) ;
374+
375+ // Convert work_units to 32-byte representation
376+ let work_units_bytes = work_info. work_units . to_be_bytes :: < 32 > ( ) ;
377+ data. extend_from_slice ( & work_units_bytes) ;
378+
379+ match self
380+ . prime_network
381+ . soft_invalidate_work ( self . pool_id , data)
382+ . await
383+ {
384+ Ok ( _) => Ok ( ( ) ) ,
385+ Err ( e) => {
386+ error ! ( "Failed to soft invalidate work {}: {}" , work_key, e) ;
387+ Err ( Error :: msg ( format ! ( "Failed to soft invalidate work: {}" , e) ) )
388+ }
389+ }
390+ }
391+ }
341392 /// Finds groups whose grace period has ended and cleans them up.
342393 async fn get_groups_past_grace_period ( & self ) -> Result < Vec < String > , Error > {
343394 if self . incomplete_group_grace_period_minutes == 0 {
@@ -567,6 +618,7 @@ impl<P: alloy::providers::Provider + Clone + 'static> SyntheticDataValidator<P>
567618 work_key : & str ,
568619 ) -> Result < String , ProcessWorkKeyError > {
569620 let redis_key = format ! ( "file_name:{}" , work_key) ;
621+ let attempts_key = format ! ( "file_name_attempts:{}" , work_key) ;
570622 let mut con = self
571623 . redis_store
572624 . client
@@ -582,6 +634,46 @@ impl<P: alloy::providers::Provider + Clone + 'static> SyntheticDataValidator<P>
582634 return Ok ( cached_file_name) ;
583635 }
584636
637+ // Increment attempts counter
638+ let attempts: i64 = con
639+ . incr ( & attempts_key, 1 )
640+ . await
641+ . map_err ( |e| ProcessWorkKeyError :: GenericError ( e. into ( ) ) ) ?;
642+
643+ // Set expiry on attempts counter (24 hours)
644+ let _: ( ) = con
645+ . expire ( & attempts_key, 24 * 60 * 60 )
646+ . await
647+ . map_err ( |e| ProcessWorkKeyError :: GenericError ( e. into ( ) ) ) ?;
648+
649+ const MAX_ATTEMPTS : i64 = 60 ;
650+ if attempts >= MAX_ATTEMPTS {
651+ // If we've tried too many times, soft invalidate the work and update its status
652+ if let Err ( e) = self . soft_invalidate_work ( work_key) . await {
653+ error ! (
654+ "Failed to soft invalidate work after max filename resolution attempts: {}" ,
655+ e
656+ ) ;
657+ }
658+ // Set the validation status to FileNameResolutionFailed to prevent future processing
659+ if let Err ( e) = self
660+ . update_work_validation_status (
661+ work_key,
662+ & ValidationResult :: FileNameResolutionFailed ,
663+ )
664+ . await
665+ {
666+ error ! (
667+ "Failed to update validation status after max attempts: {}" ,
668+ e
669+ ) ;
670+ }
671+ return Err ( ProcessWorkKeyError :: MaxAttemptsReached ( format ! (
672+ "Failed to resolve filename after {} attempts for work key: {}" ,
673+ MAX_ATTEMPTS , work_key
674+ ) ) ) ;
675+ }
676+
585677 let original_file_name = self
586678 . storage_provider
587679 . resolve_mapping_for_sha ( work_key)
@@ -611,17 +703,34 @@ impl<P: alloy::providers::Provider + Clone + 'static> SyntheticDataValidator<P>
611703 . await
612704 . map_err ( |e| ProcessWorkKeyError :: GenericError ( e. into ( ) ) ) ?;
613705
706+ // Reset attempts counter on success
707+ let _: ( ) = con
708+ . del ( & attempts_key)
709+ . await
710+ . map_err ( |e| ProcessWorkKeyError :: GenericError ( e. into ( ) ) ) ?;
711+
614712 Ok ( cleaned_file_name. to_string ( ) )
615713 }
616714
617715 async fn build_group_for_key ( & self , work_key : & str ) -> Result < String , Error > {
618- let file = self
619- . get_file_name_for_work_key ( work_key)
620- . await
621- . map_err ( |e| {
716+ let file = match self . get_file_name_for_work_key ( work_key) . await {
717+ Ok ( name) => name,
718+ Err ( ProcessWorkKeyError :: MaxAttemptsReached ( _) ) => {
719+ // Status is already set in get_file_name_for_work_key
720+ return Err ( Error :: msg ( format ! (
721+ "Failed to resolve filename after max attempts for work key: {}" ,
722+ work_key
723+ ) ) ) ;
724+ }
725+ Err ( e) => {
622726 error ! ( "Failed to get file name for work key: {}" , e) ;
623- Error :: msg ( format ! ( "Failed to get file name for work key: {}" , e) )
624- } ) ?;
727+ return Err ( Error :: msg ( format ! (
728+ "Failed to get file name for work key: {}" ,
729+ e
730+ ) ) ) ;
731+ }
732+ } ;
733+
625734 let group_info = GroupInformation :: from_str ( & file) ?;
626735 let group_key: String = format ! (
627736 "group:{}:{}:{}" ,
@@ -1370,53 +1479,6 @@ impl SyntheticDataValidator<WalletProvider> {
13701479 Ok ( ( ) )
13711480 }
13721481
1373- pub async fn soft_invalidate_work ( & self , work_key : & str ) -> Result < ( ) , Error > {
1374- info ! ( "Soft invalidating work: {}" , work_key) ;
1375-
1376- if self . disable_chain_invalidation {
1377- info ! ( "Chain invalidation is disabled, skipping work soft invalidation" ) ;
1378- return Ok ( ( ) ) ;
1379- }
1380-
1381- // Special case for tests - skip actual blockchain interaction
1382- #[ cfg( test) ]
1383- {
1384- info ! ( "Test mode: skipping actual work soft invalidation" ) ;
1385- let _ = & self . prime_network ;
1386- Ok ( ( ) )
1387- }
1388-
1389- #[ cfg( not( test) ) ]
1390- {
1391- let work_info = self
1392- . get_work_info_from_redis ( work_key)
1393- . await ?
1394- . ok_or_else ( || Error :: msg ( "Work info not found for soft invalidation" ) ) ?;
1395- let work_key_bytes = hex:: decode ( work_key)
1396- . map_err ( |e| Error :: msg ( format ! ( "Failed to decode hex work key: {}" , e) ) ) ?;
1397-
1398- // Create 64-byte payload: work_key (32 bytes) + work_units (32 bytes)
1399- let mut data = Vec :: with_capacity ( 64 ) ;
1400- data. extend_from_slice ( & work_key_bytes) ;
1401-
1402- // Convert work_units to 32-byte representation
1403- let work_units_bytes = work_info. work_units . to_be_bytes :: < 32 > ( ) ;
1404- data. extend_from_slice ( & work_units_bytes) ;
1405-
1406- match self
1407- . prime_network
1408- . soft_invalidate_work ( self . pool_id , data)
1409- . await
1410- {
1411- Ok ( _) => Ok ( ( ) ) ,
1412- Err ( e) => {
1413- error ! ( "Failed to soft invalidate work {}: {}" , work_key, e) ;
1414- Err ( Error :: msg ( format ! ( "Failed to soft invalidate work: {}" , e) ) )
1415- }
1416- }
1417- }
1418- }
1419-
14201482 pub async fn invalidate_work ( & self , work_key : & str ) -> Result < ( ) , Error > {
14211483 info ! ( "Invalidating work: {}" , work_key) ;
14221484
@@ -2529,4 +2591,60 @@ mod tests {
25292591
25302592 Ok ( ( ) )
25312593 }
2594+
2595+ #[ tokio:: test]
2596+ async fn test_filename_resolution_retries_and_soft_invalidation ( ) -> Result < ( ) , Error > {
2597+ let ( store, contracts) = setup_test_env ( ) ?;
2598+ let mock_storage = MockStorageProvider :: new ( ) ;
2599+ let storage_provider = Arc :: new ( mock_storage) ;
2600+
2601+ const WORK_KEY : & str = "1234567890123456789012345678901234567890123456789012345678901234" ;
2602+
2603+ let validator = SyntheticDataValidator :: new (
2604+ "0" . to_string ( ) ,
2605+ contracts. synthetic_data_validator . clone ( ) . unwrap ( ) ,
2606+ contracts. prime_network . clone ( ) ,
2607+ vec ! [ ToplocConfig {
2608+ server_url: "http://localhost:8080" . to_string( ) ,
2609+ ..Default :: default ( )
2610+ } ] ,
2611+ U256 :: from ( 0 ) ,
2612+ storage_provider,
2613+ store,
2614+ CancellationToken :: new ( ) ,
2615+ 10 ,
2616+ 60 ,
2617+ 1 ,
2618+ 10 ,
2619+ true ,
2620+ true , // disable_chain_invalidation for testing
2621+ 1 , // 1 minute grace period
2622+ None ,
2623+ ) ;
2624+
2625+ // Try 59 times to reach just before max attempts (60)
2626+ for _ in 0 ..59 {
2627+ let result = validator. get_file_name_for_work_key ( WORK_KEY ) . await ;
2628+ assert ! ( result. is_err( ) ) ;
2629+ assert ! ( matches!(
2630+ result. unwrap_err( ) ,
2631+ ProcessWorkKeyError :: FileNameResolutionError ( _)
2632+ ) ) ;
2633+ }
2634+
2635+ // 60th attempt should trigger soft invalidation and return MaxAttemptsReached
2636+ let result = validator. get_file_name_for_work_key ( WORK_KEY ) . await ;
2637+ assert ! ( matches!(
2638+ result. unwrap_err( ) ,
2639+ ProcessWorkKeyError :: MaxAttemptsReached ( _)
2640+ ) ) ;
2641+
2642+ // Verify work was marked as invalidated
2643+ let status = validator
2644+ . get_work_validation_status_from_redis ( WORK_KEY )
2645+ . await ?;
2646+ assert ! ( status. is_some( ) ) ;
2647+
2648+ Ok ( ( ) )
2649+ }
25322650}
0 commit comments