@@ -12,7 +12,7 @@ use super::{
1212} ;
1313use async_trait:: async_trait;
1414use ceramic_core:: { EventId , Network , NodeId , SerializeExt } ;
15- use ceramic_pipeline:: { ConclusionData , ConclusionEvent , ConclusionInit , ConclusionTime } ;
15+ use ceramic_pipeline:: { concluder :: TimeProof , ConclusionData , ConclusionEvent , ConclusionInit , ConclusionTime } ;
1616use ceramic_sql:: sqlite:: SqlitePool ;
1717use cid:: Cid ;
1818use futures:: stream:: BoxStream ;
@@ -21,8 +21,8 @@ use itertools::Itertools;
2121use recon:: ReconItem ;
2222use tracing:: { trace, warn} ;
2323
24- use crate :: event :: validator :: ChainInclusionProvider ;
25- use crate :: store :: { EventAccess , EventInsertable , EventRowDelivered } ;
24+ use crate :: store :: { ChainProof , EventAccess , EventInsertable , EventRowDelivered } ;
25+ use crate :: { blockchain :: tx_hash_try_from_cid , event :: validator :: ChainInclusionProvider } ;
2626use crate :: { Error , Result } ;
2727
2828/// How many events to select at once to see if they've become deliverable when we have downtime
@@ -269,6 +269,7 @@ impl EventService {
269269 valid,
270270 unvalidated,
271271 invalid,
272+ proofs,
272273 } = self
273274 . event_validator
274275 . validate_events ( validation_requirement, to_validate)
@@ -279,6 +280,7 @@ impl EventService {
279280 valid,
280281 unvalidated,
281282 invalid : invalid_events,
283+ proofs,
282284 } )
283285 }
284286
@@ -293,6 +295,7 @@ impl EventService {
293295 valid,
294296 unvalidated,
295297 mut invalid,
298+ proofs,
296299 } = self . validate_events ( items, validation_req. as_ref ( ) ) . await ?;
297300
298301 let to_insert: Vec < EventInsertable > = valid
@@ -307,6 +310,14 @@ impl EventService {
307310 self . track_pending ( unvalidated) ;
308311 }
309312
313+ // Someday, we may want to have the validation/proof inclusion logic have knowledge of the database and persist/read
314+ // from it directly, rather than only keeping proofs in memory + RPC calls. But for now, it's simpler to persist everything once here
315+ // and then the pipeline is able to read from this table and use the timestamps for conclusion events etc.
316+ let proofs = proofs. into_iter ( ) . map ( |p| p. into ( ) ) . collect :: < Vec < _ > > ( ) ;
317+ self . event_access
318+ . persist_chain_inclusion_proofs ( & proofs)
319+ . await ?;
320+
310321 let ( new, existed) = self
311322 . persist_events ( to_insert, deliverable_req, & mut invalid)
312323 . await ?;
@@ -391,16 +402,30 @@ impl EventService {
391402
392403 match event {
393404 ceramic_event:: unvalidated:: Event :: Time ( time_event) => {
405+ let proof = match self . discover_chain_proof ( & time_event) . await {
406+ Ok ( proof) => Some ( proof) ,
407+ Err ( error) => {
408+ tracing:: warn!(
409+ ?event_cid,
410+ ?error,
411+ "Failed to discover chain proof for time event"
412+ ) ;
413+ None
414+ }
415+ } ;
416+
394417 Ok ( ConclusionEvent :: Time ( ConclusionTime {
395418 event_cid,
396419 init,
397420 previous : vec ! [ * time_event. prev( ) ] ,
398421 order : delivered as u64 ,
399- before : todo ! ( ) ,
400- chain_id : todo ! ( ) ,
401- tx_hash : todo ! ( ) ,
402- tx_type : todo ! ( ) ,
403- root : todo ! ( ) ,
422+ time_proof : proof. map ( |p| TimeProof {
423+ before : p
424+ . timestamp
425+ . try_into ( )
426+ . expect ( "conclusion timestamp overflow" ) ,
427+ chain_id : p. chain_id ,
428+ } ) ,
404429 } ) )
405430 }
406431 ceramic_event:: unvalidated:: Event :: Signed ( signed_event) => {
@@ -523,6 +548,40 @@ impl EventService {
523548 Ok ( ( ) )
524549 }
525550 }
551+
552+ /// This is a helper function for migrations to get the chain proof for a given event from the database,
553+ /// or to validate and store it if it doesn't exist.
554+ async fn discover_chain_proof (
555+ & self ,
556+ event : & ceramic_event:: unvalidated:: TimeEvent ,
557+ ) -> std:: result:: Result < ChainProof , crate :: eth_rpc:: Error > {
558+ let tx_hash = event. proof ( ) . tx_hash ( ) ;
559+ let tx_hash = tx_hash_try_from_cid ( tx_hash) . unwrap ( ) . to_string ( ) ;
560+ let proof = self
561+ . event_access
562+ . get_chain_proof ( event. proof ( ) . chain_id ( ) , & tx_hash)
563+ . await
564+ . map_err ( |e| crate :: eth_rpc:: Error :: Application ( e. into ( ) ) ) ?;
565+
566+ if let Some ( proof) = proof {
567+ return Ok ( proof) ;
568+ }
569+
570+ // try using the RPC provider and store the proof afterward
571+ let proof = self
572+ . event_validator
573+ . time_event_verifier ( )
574+ . validate_chain_inclusion ( event)
575+ . await ?;
576+
577+ let proof = ChainProof :: from ( proof) ;
578+ self . event_access
579+ . persist_chain_inclusion_proofs ( & [ proof. clone ( ) ] )
580+ . await
581+ . map_err ( |e| crate :: eth_rpc:: Error :: Application ( e. into ( ) ) ) ?;
582+
583+ Ok ( proof)
584+ }
526585}
527586
528587// Small wrapper container around the data field to hold other mutable metadata for the
@@ -579,6 +638,12 @@ pub enum ValidationError {
579638 key : EventId ,
580639 reason : String ,
581640 } ,
641+ /// 'Soft error' -> should not kill recon conversation but should not be persisted
642+ /// A time event could not be validated because no RPC provider was available
643+ SoftError {
644+ key : EventId ,
645+ reason : String ,
646+ } ,
582647}
583648
584649#[ derive( Debug , PartialEq , Eq , Default ) ]
0 commit comments