@@ -21,8 +21,8 @@ use itertools::Itertools;
2121use recon:: ReconItem ;
2222use tracing:: { trace, warn} ;
2323
24- use crate :: event :: validator :: ChainInclusionProvider ;
25- use crate :: store :: { EventAccess , EventInsertable , EventRowDelivered } ;
24+ use crate :: store :: { ChainProof , EventAccess , EventInsertable , EventRowDelivered } ;
25+ use crate :: { blockchain :: tx_hash_try_from_cid , event :: validator :: ChainInclusionProvider } ;
2626use crate :: { Error , Result } ;
2727
2828/// How many events to select at once to see if they've become deliverable when we have downtime
@@ -269,6 +269,7 @@ impl EventService {
269269 valid,
270270 unvalidated,
271271 invalid,
272+ proofs,
272273 } = self
273274 . event_validator
274275 . validate_events ( validation_requirement, to_validate)
@@ -279,6 +280,7 @@ impl EventService {
279280 valid,
280281 unvalidated,
281282 invalid : invalid_events,
283+ proofs,
282284 } )
283285 }
284286
@@ -293,6 +295,7 @@ impl EventService {
293295 valid,
294296 unvalidated,
295297 mut invalid,
298+ proofs,
296299 } = self . validate_events ( items, validation_req. as_ref ( ) ) . await ?;
297300
298301 let to_insert: Vec < EventInsertable > = valid
@@ -307,6 +310,14 @@ impl EventService {
307310 self . track_pending ( unvalidated) ;
308311 }
309312
313+ // Someday, we may want to have the validation/proof inclusion logic have knowledge of the database and persist/read
314+ // from it directly, rather than only keeping proofs in memory + RPC calls. But for now, it's simpler to persist everything once here
315+ // and then the pipeline is able to read from this table and use the timestamps for conclusion events etc.
316+ let proofs = proofs. into_iter ( ) . map ( |p| p. into ( ) ) . collect :: < Vec < _ > > ( ) ;
317+ self . event_access
318+ . persist_chain_inclusion_proofs ( & proofs)
319+ . await ?;
320+
310321 let ( new, existed) = self
311322 . persist_events ( to_insert, deliverable_req, & mut invalid)
312323 . await ?;
@@ -391,16 +402,48 @@ impl EventService {
391402
392403 match event {
393404 ceramic_event:: unvalidated:: Event :: Time ( time_event) => {
405+ let proof = match self . discover_chain_proof ( & time_event) . await {
406+ Ok ( proof) => {
407+ if proof. is_none ( ) {
408+ return Err ( Error :: new_app ( anyhow:: anyhow!(
409+ "Missing chain proof for time event: {}" ,
410+ event_cid
411+ ) ) ) ;
412+ }
413+ proof
414+ }
415+ Err ( e) => {
416+ return Err ( Error :: new_app ( anyhow:: anyhow!(
417+ "Failed to discover chain proof for time event: {}. Error: {}" ,
418+ event_cid,
419+ e
420+ ) ) ) ;
421+ }
422+ } ;
423+
394424 Ok ( ConclusionEvent :: Time ( ConclusionTime {
395425 event_cid,
396426 init,
397427 previous : vec ! [ * time_event. prev( ) ] ,
398428 order : delivered as u64 ,
399- before : todo ! ( ) ,
400- chain_id : todo ! ( ) ,
401- tx_hash : todo ! ( ) ,
402- tx_type : todo ! ( ) ,
403- root : todo ! ( ) ,
429+ before : proof
430+ . as_ref ( )
431+ . and_then ( |p| {
432+ p. timestamp
433+ . map ( |ts| ts. try_into ( ) . expect ( "conclusion timestamp overflow" ) )
434+ } )
435+ . unwrap_or_default ( ) ,
436+ chain_id : time_event. proof ( ) . chain_id ( ) . to_string ( ) ,
437+ tx_hash : time_event. proof ( ) . tx_hash ( ) . to_string ( ) ,
438+ tx_type : time_event. proof ( ) . tx_type ( ) . to_string ( ) ,
439+ root : time_event. proof ( ) . root ( ) ,
440+ tx_input : proof
441+ . as_ref ( )
442+ . map_or ( "" . to_string ( ) , |p| p. transaction_input . clone ( ) ) ,
443+ block_hash : proof
444+ . as_ref ( )
445+ . and_then ( |p| p. block_hash . clone ( ) )
446+ . unwrap_or_default ( ) ,
404447 } ) )
405448 }
406449 ceramic_event:: unvalidated:: Event :: Signed ( signed_event) => {
@@ -523,6 +566,42 @@ impl EventService {
523566 Ok ( ( ) )
524567 }
525568 }
569+
570+ /// This is a helper function for migrations to get the chain proof for a given event from the database,
571+ /// or to validate and store it if it doesn't exist.
572+ async fn discover_chain_proof (
573+ & self ,
574+ event : & ceramic_event:: unvalidated:: TimeEvent ,
575+ ) -> anyhow:: Result < Option < ChainProof > > {
576+ let tx_hash = event. proof ( ) . tx_hash ( ) ;
577+ let tx_hash = tx_hash_try_from_cid ( tx_hash) . unwrap ( ) . to_string ( ) ;
578+ let proof = self
579+ . event_access
580+ . get_chain_proof ( event. proof ( ) . chain_id ( ) , & tx_hash)
581+ . await ?;
582+
583+ if let Some ( proof) = proof {
584+ return Ok ( Some ( proof) ) ;
585+ }
586+
587+ // try using the RPC provider and store the proof afterward
588+ let proof = match self
589+ . event_validator
590+ . time_event_verifier ( )
591+ . validate_chain_inclusion ( event)
592+ . await
593+ {
594+ Ok ( proof) => proof,
595+ Err ( e) => anyhow:: bail!( "Failed to validate chain inclusion proof: {:?}" , e) ,
596+ } ;
597+
598+ let proof = ChainProof :: from ( proof) ;
599+ self . event_access
600+ . persist_chain_inclusion_proofs ( & [ proof. clone ( ) ] )
601+ . await ?;
602+
603+ Ok ( Some ( proof) )
604+ }
526605}
527606
528607// Small wrapper container around the data field to hold other mutable metadata for the
0 commit comments