@@ -599,7 +599,7 @@ impl SyntheticDataValidator<WalletProvider> {
599599
600600 if is_rejected {
601601 // Add to rejections set with current timestamp
602- let timestamp = chrono:: Utc :: now ( ) . timestamp ( ) as f64 ;
602+ let timestamp = chrono:: Utc :: now ( ) . timestamp ( ) ;
603603 let _: ( ) = con
604604 . zadd ( rejection_set_key, work_key, timestamp)
605605 . await
@@ -1425,153 +1425,6 @@ impl SyntheticDataValidator<WalletProvider> {
14251425
14261426 Ok ( rejections)
14271427 }
1428-
1429- /// Idempotent migration of old rejection data to new optimized format.
1430- /// Safe to run multiple times - uses migration marker to avoid duplicate work.
1431- pub async fn migrate_rejection_data ( & self ) -> Result < ( ) , Error > {
1432- let migration_key = "rejection_migration_completed" ;
1433- let mut con = self
1434- . redis_store
1435- . client
1436- . get_multiplexed_async_connection ( )
1437- . await ?;
1438-
1439- // Check if migration was already completed
1440- let migration_completed: Option < String > = con
1441- . get ( migration_key)
1442- . await
1443- . map_err ( |e| Error :: msg ( format ! ( "Failed to check migration status: {}" , e) ) ) ?;
1444-
1445- if migration_completed. is_some ( ) {
1446- debug ! ( "Rejection data migration already completed, skipping" ) ;
1447- return Ok ( ( ) ) ;
1448- }
1449-
1450- info ! ( "Starting rejection data migration..." ) ;
1451- warn ! ( "Migration uses KEYS command - this is a one-time operation during startup" ) ;
1452-
1453- // Get all work validation status keys
1454- let keys: Vec < String > = con
1455- . keys ( "work_validation_status:*" )
1456- . await
1457- . map_err ( |e| Error :: msg ( format ! ( "Failed to get validation status keys: {}" , e) ) ) ?;
1458-
1459- if keys. is_empty ( ) {
1460- info ! ( "No validation status keys found, migration completed" ) ;
1461- // Still mark as completed
1462- let completion_timestamp = chrono:: Utc :: now ( ) . to_rfc3339 ( ) ;
1463- let _: ( ) = con
1464- . set ( migration_key, & completion_timestamp)
1465- . await
1466- . map_err ( |e| Error :: msg ( format ! ( "Failed to mark migration complete: {}" , e) ) ) ?;
1467- return Ok ( ( ) ) ;
1468- }
1469-
1470- info ! ( "Found {} validation status keys to check" , keys. len( ) ) ;
1471-
1472- // Process in batches to avoid overwhelming Redis
1473- let batch_size = 100 ;
1474- let mut migrated_count = 0 ;
1475-
1476- for batch in keys. chunks ( batch_size) {
1477- // Batch fetch validation data
1478- let validation_data: Vec < Option < String > > = con
1479- . mget ( batch)
1480- . await
1481- . map_err ( |e| Error :: msg ( format ! ( "Failed to batch get validation data: {}" , e) ) ) ?;
1482-
1483- // Process each key in this batch
1484- for ( i, key) in batch. iter ( ) . enumerate ( ) {
1485- if let Some ( Some ( data) ) = validation_data. get ( i) {
1486- if let Ok ( validation_info) = self . parse_validation_data ( data) {
1487- if validation_info. status == ValidationResult :: Reject {
1488- let work_key =
1489- key. strip_prefix ( "work_validation_status:" ) . unwrap_or ( key) ;
1490-
1491- // Check if already in rejection set (idempotency)
1492- let exists: bool = con
1493- . zscore ( "work_rejections" , work_key)
1494- . await
1495- . map ( |score : Option < f64 > | score. is_some ( ) )
1496- . unwrap_or ( false ) ;
1497-
1498- if !exists {
1499- // Add to rejection set with current timestamp
1500- let timestamp = chrono:: Utc :: now ( ) . timestamp ( ) as f64 ;
1501- let _: ( ) = con
1502- . zadd ( "work_rejections" , work_key, timestamp)
1503- . await
1504- . map_err ( |e| {
1505- Error :: msg ( format ! ( "Failed to migrate rejection: {}" , e) )
1506- } ) ?;
1507-
1508- // Create rejection detail if reason exists (unlikely for old data)
1509- if let Some ( reason) = & validation_info. reason {
1510- let rejection_detail = serde_json:: json!( {
1511- "reason" : reason,
1512- "timestamp" : timestamp
1513- } ) ;
1514- let rejection_data_key =
1515- format ! ( "work_rejection_data:{}" , work_key) ;
1516- let _: ( ) = con
1517- . set ( & rejection_data_key, rejection_detail. to_string ( ) )
1518- . await
1519- . map_err ( |e| {
1520- Error :: msg ( format ! (
1521- "Failed to set migrated rejection data: {}" ,
1522- e
1523- ) )
1524- } ) ?;
1525- }
1526-
1527- migrated_count += 1 ;
1528- }
1529- }
1530- }
1531- }
1532- }
1533-
1534- // Progress reporting and throttling
1535- if migrated_count > 0 && migrated_count % 100 == 0 {
1536- info ! ( "Migration progress: {} rejections migrated" , migrated_count) ;
1537- }
1538-
1539- // Small delay between batches to avoid overwhelming Redis
1540- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 5 ) ) . await ;
1541- }
1542-
1543- // Mark migration as completed with timestamp
1544- let completion_timestamp = chrono:: Utc :: now ( ) . to_rfc3339 ( ) ;
1545- let _: ( ) = con
1546- . set ( migration_key, & completion_timestamp)
1547- . await
1548- . map_err ( |e| Error :: msg ( format ! ( "Failed to mark migration complete: {}" , e) ) ) ?;
1549-
1550- info ! (
1551- "Rejection data migration completed: {} rejections migrated" ,
1552- migrated_count
1553- ) ;
1554- Ok ( ( ) )
1555- }
1556-
1557- fn parse_validation_data ( & self , data : & str ) -> Result < WorkValidationInfo , Error > {
1558- // Try to parse as WorkValidationInfo first (new format)
1559- if let Ok ( validation_info) = serde_json:: from_str :: < WorkValidationInfo > ( data) {
1560- Ok ( validation_info)
1561- } else {
1562- // Fall back to old format (just ValidationResult)
1563- match serde_json:: from_str :: < ValidationResult > ( data) {
1564- Ok ( status) => Ok ( WorkValidationInfo {
1565- status,
1566- reason : None ,
1567- } ) ,
1568- Err ( e) => Err ( Error :: msg ( format ! (
1569- "Failed to parse validation data: {}" ,
1570- e
1571- ) ) ) ,
1572- }
1573- }
1574- }
15751428}
15761429
15771430impl SyntheticDataValidator < WalletProvider > {
@@ -3106,95 +2959,4 @@ mod tests {
31062959
31072960 Ok ( ( ) )
31082961 }
3109-
3110- #[ tokio:: test]
3111- async fn test_migration_idempotency ( ) -> Result < ( ) , Error > {
3112- let ( store, contracts) = setup_test_env ( ) ?;
3113- let mock_storage = MockStorageProvider :: new ( ) ;
3114- let storage_provider = Arc :: new ( mock_storage) ;
3115- let validator = SyntheticDataValidator :: new (
3116- "0" . to_string ( ) ,
3117- contracts. synthetic_data_validator . unwrap ( ) ,
3118- contracts. prime_network ,
3119- vec ! [ ] ,
3120- U256 :: from ( 0 ) ,
3121- storage_provider,
3122- store,
3123- CancellationToken :: new ( ) ,
3124- 10 ,
3125- 60 ,
3126- 1 ,
3127- 10 ,
3128- false ,
3129- false ,
3130- 0 ,
3131- None ,
3132- ) ;
3133-
3134- // Create some old-format rejection data manually
3135- let mut con = validator
3136- . redis_store
3137- . client
3138- . get_multiplexed_async_connection ( )
3139- . await ?;
3140-
3141- // Old format: just ValidationResult enum
3142- let _: ( ) = con
3143- . set ( "work_validation_status:old_reject_1" , "\" Reject\" " )
3144- . await ?;
3145- let _: ( ) = con
3146- . set ( "work_validation_status:old_reject_2" , "\" Reject\" " )
3147- . await ?;
3148- let _: ( ) = con
3149- . set ( "work_validation_status:old_accept_1" , "\" Accept\" " )
3150- . await ?;
3151-
3152- // Run migration first time
3153- validator. migrate_rejection_data ( ) . await ?;
3154-
3155- // Check rejections were migrated
3156- let rejections_first = validator. get_all_rejections ( ) . await ?;
3157- assert_eq ! ( rejections_first. len( ) , 2 ) ;
3158-
3159- let rejection_keys: Vec < & str > = rejections_first
3160- . iter ( )
3161- . map ( |r| r. work_key . as_str ( ) )
3162- . collect ( ) ;
3163- assert ! ( rejection_keys. contains( & "old_reject_1" ) ) ;
3164- assert ! ( rejection_keys. contains( & "old_reject_2" ) ) ;
3165-
3166- // Run migration second time (should be idempotent)
3167- validator. migrate_rejection_data ( ) . await ?;
3168-
3169- // Check rejections are still the same (no duplicates)
3170- let rejections_second = validator. get_all_rejections ( ) . await ?;
3171- assert_eq ! ( rejections_second. len( ) , 2 ) ;
3172-
3173- // Add some new rejection data in new format
3174- validator
3175- . update_work_validation_info (
3176- "new_reject_1" ,
3177- & WorkValidationInfo {
3178- status : ValidationResult :: Reject ,
3179- reason : Some ( "New rejection" . to_string ( ) ) ,
3180- } ,
3181- )
3182- . await ?;
3183-
3184- // Run migration third time (should skip already-migrated data)
3185- validator. migrate_rejection_data ( ) . await ?;
3186-
3187- // Check we have all rejections (old + new)
3188- let rejections_final = validator. get_all_rejections ( ) . await ?;
3189- assert_eq ! ( rejections_final. len( ) , 3 ) ;
3190-
3191- // Verify the new rejection has a reason
3192- let new_rejection = rejections_final
3193- . iter ( )
3194- . find ( |r| r. work_key == "new_reject_1" )
3195- . expect ( "New rejection should exist" ) ;
3196- assert_eq ! ( new_rejection. reason, Some ( "New rejection" . to_string( ) ) ) ;
3197-
3198- Ok ( ( ) )
3199- }
32002962}
0 commit comments