From b411f2242c5a6200a313a3df81c9154037ae875b Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 19 Mar 2025 14:55:56 -0600 Subject: [PATCH 01/19] wip: adds scaffold for resolver actor --- pipeline/src/aggregator/mod.rs | 15 +- pipeline/src/lib.rs | 1 + pipeline/src/metrics.rs | 19 +- pipeline/src/resolver/metrics.rs | 24 ++ pipeline/src/resolver/mock.rs | 80 +++++ pipeline/src/resolver/mod.rs | 595 +++++++++++++++++++++++++++++++ pipeline/src/schemas.rs | 149 ++++++++ 7 files changed, 870 insertions(+), 13 deletions(-) create mode 100644 pipeline/src/resolver/metrics.rs create mode 100644 pipeline/src/resolver/mock.rs create mode 100644 pipeline/src/resolver/mod.rs diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs index efd01ffc3..ddec2fd5f 100644 --- a/pipeline/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -870,6 +870,8 @@ actor_envelope! { AggregatorRecorder, SubscribeSince => SubscribeSinceMsg, NewConclusionEvents => NewConclusionEventsMsg, + // TODO: Remove this message and use the analogous message on the Resolver. + // This way the canonical stream state is provided via the API StreamState => StreamStateMsg, } @@ -903,13 +905,6 @@ impl Handler for Aggregator { // Execute query to get initial (historical) results let query_stream = df.execute_stream().await?; - // Create subscription stream - let subscription_stream = RecordBatchStreamAdapter::new( - schemas::event_states(), - tokio_stream::wrappers::BroadcastStream::new(subscription) - .map_err(|err| exec_datafusion_err!("{err}")), - ); - // Merge query results with subscription updates rows_since(RowsSinceInput { session_context: &ctx, @@ -918,7 +913,11 @@ impl Handler for Aggregator { projection: message.projection, filters: message.filters, limit: message.limit, - subscription: Box::pin(subscription_stream), + subscription: Box::pin(RecordBatchStreamAdapter::new( + schemas::event_states(), + tokio_stream::wrappers::BroadcastStream::new(subscription) + .map_err(|err| exec_datafusion_err!("{err}")), + )), since: query_stream, }) } diff --git a/pipeline/src/lib.rs b/pipeline/src/lib.rs index 2d792b29a..8f62e0d04 100644 --- a/pipeline/src/lib.rs +++ b/pipeline/src/lib.rs @@ -14,6 +14,7 @@ mod cache_table; pub mod cid_part; pub mod cid_string; pub mod concluder; +pub mod resolver; mod config; pub mod dimension_extract; mod metrics; diff --git a/pipeline/src/metrics.rs b/pipeline/src/metrics.rs index fb8d14c21..09f10d8e0 100644 --- a/pipeline/src/metrics.rs +++ b/pipeline/src/metrics.rs @@ -11,15 +11,24 @@ use prometheus_client::{ pub struct Metrics { pub(crate) message_count: Family, + pub(crate) concluder_poll_new_events_loop_count: Counter, + pub(crate) aggregator_new_conclusion_events_count: Counter, - pub(crate) concluder_poll_new_events_loop_count: Counter, + pub(crate) resolver_new_event_states_count: Counter, } impl Metrics { /// Register and construct Metrics pub fn register(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("pipeline"); + register!( + concluder_poll_new_events_loop_count, + "Number of times the loop to poll new conclusion events has run", + Counter::default(), + sub_registry + ); + register!( message_count, "Number of messages delivered to actors", @@ -32,18 +41,18 @@ impl Metrics { Counter::default(), sub_registry ); - register!( - concluder_poll_new_events_loop_count, - "Number of times the loop to poll new conclusion events has run", + resolver_new_event_states_count, + "Number of new event states delivered to the resolver", Counter::default(), sub_registry ); Self { message_count, - aggregator_new_conclusion_events_count, concluder_poll_new_events_loop_count, + aggregator_new_conclusion_events_count, + resolver_new_event_states_count, } } } diff --git a/pipeline/src/resolver/metrics.rs b/pipeline/src/resolver/metrics.rs new file mode 100644 index 000000000..f6c6798da --- /dev/null +++ b/pipeline/src/resolver/metrics.rs @@ -0,0 +1,24 @@ +use ceramic_actor::MessageEvent; +use ceramic_metrics::Recorder; + +use crate::metrics::{MessageLabels, Metrics}; + +use super::{NewEventStatesMsg, ResolverRecorder, StreamStateMsg}; + +impl Recorder> for Metrics { + fn record(&self, event: &MessageEvent) { + self.message_count + .get_or_create(&MessageLabels::from(event)) + .inc(); + self.resolver_new_event_states_count + .inc_by(event.message.events.num_rows() as u64); + } +} +impl Recorder> for Metrics { + fn record(&self, event: &MessageEvent) { + self.message_count + .get_or_create(&MessageLabels::from(event)) + .inc(); + } +} +impl ResolverRecorder for Metrics {} diff --git a/pipeline/src/resolver/mock.rs b/pipeline/src/resolver/mock.rs new file mode 100644 index 000000000..ae1dbf850 --- /dev/null +++ b/pipeline/src/resolver/mock.rs @@ -0,0 +1,80 @@ +//! Provides a mock implmentation of the aggregator actor. +use async_trait::async_trait; +use ceramic_actor::{Actor, Handler, Message}; +use mockall::mock; +use prometheus_client::registry::Registry; + +use crate::metrics::Metrics; + +use super::{ + NewEventStatesMsg, Resolver, ResolverActor, ResolverEnvelope, ResolverHandle, StreamStateMsg, + SubscribeSinceMsg, +}; + +mock! { + // mockall does not support multiple methods on the struct with the same name. + // This arises when implementing multiple traits that have methods with the same name as is + // the case with the [`ceramic_actor::Handler`] trait. + // + // We add a layer of indirection to get around this limitation. + pub Resolver { + #[allow(missing_docs)] + pub fn handle_subscribe_since( + &mut self, + message: SubscribeSinceMsg, + ) -> ::Result; + #[allow(missing_docs)] + pub fn handle_new_event_states( + &mut self, + message: NewEventStatesMsg, + ) -> ::Result; + #[allow(missing_docs)] + pub fn handle_stream_state( + &mut self, + message: StreamStateMsg, + ) -> ::Result; + } +} + +#[async_trait] +impl Handler for MockResolver { + async fn handle( + &mut self, + message: SubscribeSinceMsg, + ) -> ::Result { + self.handle_subscribe_since(message) + } +} + +#[async_trait] +impl Handler for MockResolver { + async fn handle( + &mut self, + message: NewEventStatesMsg, + ) -> ::Result { + self.handle_new_event_states(message) + } +} + +#[async_trait] +impl Handler for MockResolver { + async fn handle(&mut self, message: StreamStateMsg) -> ::Result { + self.handle_stream_state(message) + } +} + +impl Actor for MockResolver { + type Envelope = ResolverEnvelope; +} +impl ResolverActor for MockResolver {} + +impl MockResolver { + /// Spawn a mock aggregator actor. + pub fn spawn(mock_actor: MockResolver) -> ResolverHandle { + let metrics = Metrics::register(&mut Registry::default()); + let (handle, _task_handle) = + Resolver::spawn(1_000, mock_actor, metrics, std::future::pending()); + handle + } +} diff --git a/pipeline/src/resolver/mod.rs b/pipeline/src/resolver/mod.rs new file mode 100644 index 000000000..eef62e27b --- /dev/null +++ b/pipeline/src/resolver/mod.rs @@ -0,0 +1,595 @@ +//! Conflict resolution functions for Ceramic Model Instance Document streams. +//! +//! Subscribes to aggregated events and produces the tips and canonical tip for each stream. +//! + +mod metrics; +#[cfg(any(test, feature = "mock"))] +pub mod mock; + +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Context; +use arrow::{ + array::{Array as _, ArrayAccessor as _, BinaryArray, RecordBatch}, + compute::concat_batches, + datatypes::Int32Type, +}; +use arrow_schema::{DataType, SchemaRef}; +use async_trait::async_trait; +use ceramic_actor::{actor_envelope, Actor, Handler, Message}; +use ceramic_core::StreamId; +use cid::Cid; +use datafusion::{ + common::{ + cast::{ + as_binary_array, as_dictionary_array, as_map_array, as_string_array, as_uint64_array, + }, + exec_datafusion_err, + }, + datasource::{ + file_format::parquet::ParquetFormat, + listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + }, + execution::SendableRecordBatchStream, + functions_aggregate::expr_fn::last_value, + logical_expr::{col, lit, ExprFunctionExt as _}, + physical_plan::stream::RecordBatchStreamAdapter, + prelude::{wildcard, SessionContext}, +}; +use futures::TryStreamExt as _; +use shutdown::{Shutdown, ShutdownSignal}; +use tokio::{select, sync::broadcast, task::JoinHandle}; +use tracing::{debug, error, instrument}; + +use crate::{ + aggregator::AggregatorHandle, + cache_table::CacheTable, + metrics::Metrics, + schemas, + since::{rows_since, FeedTable, FeedTableSource}, + PipelineContext, Result, SessionContextRef, +}; +// Use the SubscribeSinceMsg so its clear its a message for this actor +pub use crate::since::SubscribeSinceMsg; + +const STREAM_TIPS_TABLE: &str = "ceramic.v0.stream_tips"; +const STREAM_TIPS_MEM_TABLE: &str = "ceramic._internal.stream_tips_mem"; +const STREAM_TIPS_PERSISTENT_TABLE: &str = "ceramic._internal.stream_tips_persistent"; +// Path within the object store where the stream tips table is stored +// This path should be updated when the underlying storage structure changes. (i.e. the parition +// columns change). The revision is a physical versioning number and not directly associated with +// the logical schema version of the table. There are many cases where the physical revision may +// need to change while the logical version remains the same. +const STREAM_TIPS_TABLE_OBJECT_STORE_PATH: &str = "ceramic/rev0/stream_tips/"; + +const STREAM_STATES_TABLE: &str = "ceramic.v0.stream_states"; +const STREAM_STATES_FEED_TABLE: &str = "ceramic.v0.stream_states_feed"; +const STREAM_STATES_MEM_TABLE: &str = "ceramic._internal.stream_states_mem"; +const STREAM_STATES_PERSISTENT_TABLE: &str = "ceramic._internal.stream_states_persistent"; +// Path within the object store where the stream states table is stored +// This path should be updated when the underlying storage structure changes. (i.e. the parition +// columns change). The revision is a physical versioning number and not directly associated with +// the logical schema version of the table. There are many cases where the physical revision may +// need to change while the logical version remains the same. +const STREAM_STATES_TABLE_OBJECT_STORE_PATH: &str = "ceramic/rev0/stream_states/"; + +// Maximum number of rows to cache in memory before writing to object store. +const DEFAULT_MAX_CACHED_ROWS: usize = 10_000; + +/// Resolver is responsible for computing the tips of a stream and resolving the set of tips to a +/// canonical tip. +/// The resolver only operates on model and model instance stream types. +#[derive(Actor)] +pub struct Resolver { + ctx: SessionContextRef, + broadcast_tx: broadcast::Sender, + max_cached_rows: usize, + order: u64, +} + +impl std::fmt::Debug for Resolver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Resolver") + .field("ctx", &"Pipeline(SessionContext)") + .field("broadcast_tx", &self.broadcast_tx) + .finish() + } +} + +impl Resolver { + /// Create a new resolver actor and spawn its tasks. + pub async fn spawn_new( + size: usize, + ctx: &PipelineContext, + max_cached_rows: Option, + aggregator: AggregatorHandle, + metrics: Metrics, + shutdown: Shutdown, + ) -> Result<(ResolverHandle, Vec>)> { + // Register stream_tips tables + let file_format = ParquetFormat::default().with_enable_pruning(true); + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension(".parquet") + .with_table_partition_cols(vec![("stream_cid_partition".to_owned(), DataType::Int32)]) + .with_file_sort_order(vec![vec![col("stream_cid").sort(true, true)]]); + + // Set the path within the bucket for the event_states table + let mut stream_tips_url = ctx.object_store_url.clone(); + stream_tips_url.set_path(STREAM_TIPS_TABLE_OBJECT_STORE_PATH); + // Register event_states_persistent as a listing table + ctx.session().register_table( + STREAM_TIPS_PERSISTENT_TABLE, + Arc::new(ListingTable::try_new( + ListingTableConfig::new(ListingTableUrl::parse(stream_tips_url)?) + .with_listing_options(listing_options) + // Use the non partitioned schema as the parquet files themselves do not + // contain the partition columns. + .with_schema(schemas::event_states()), + )?), + )?; + + ctx.session().register_table( + STREAM_TIPS_MEM_TABLE, + Arc::new(CacheTable::try_new( + schemas::event_states_partitioned(), + vec![vec![RecordBatch::new_empty( + schemas::event_states_partitioned(), + )]], + )?), + )?; + + ctx.session().register_table( + STREAM_TIPS_TABLE, + ctx.session() + .table(STREAM_TIPS_MEM_TABLE) + .await? + .union(ctx.session().table(STREAM_TIPS_PERSISTENT_TABLE).await?)? + .into_view(), + )?; + + // Register stream_states tables + let file_format = ParquetFormat::default().with_enable_pruning(true); + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension(".parquet") + .with_table_partition_cols(vec![("stream_cid_partition".to_owned(), DataType::Int32)]) + .with_file_sort_order(vec![vec![col("stream_cid").sort(true, true)]]); + + // Set the path within the bucket for the event_states table + let mut stream_states_url = ctx.object_store_url.clone(); + stream_states_url.set_path(STREAM_STATES_TABLE_OBJECT_STORE_PATH); + // Register event_states_persistent as a listing table + ctx.session().register_table( + STREAM_STATES_PERSISTENT_TABLE, + Arc::new(ListingTable::try_new( + ListingTableConfig::new(ListingTableUrl::parse(stream_states_url)?) + .with_listing_options(listing_options) + // Use the non partitioned schema as the parquet files themselves do not + // contain the partition columns. + .with_schema(schemas::event_states()), + )?), + )?; + + ctx.session().register_table( + STREAM_STATES_MEM_TABLE, + Arc::new(CacheTable::try_new( + schemas::event_states_partitioned(), + vec![vec![RecordBatch::new_empty( + schemas::event_states_partitioned(), + )]], + )?), + )?; + + ctx.session().register_table( + STREAM_STATES_TABLE, + ctx.session() + .table(STREAM_STATES_MEM_TABLE) + .await? + .union(ctx.session().table(STREAM_STATES_PERSISTENT_TABLE).await?)? + .into_view(), + )?; + + // Query for max event_state_order and stream_state_order in persistent stream_states, this is where we should start + // the new order values. + let batches = ctx + .session() + .table(STREAM_STATES_PERSISTENT_TABLE) + .await? + .aggregate( + vec![], + vec![ + datafusion::functions_aggregate::min_max::max(col("event_state_order")) + .alias("max_event_state_order"), + datafusion::functions_aggregate::min_max::max(col("stream_state_order")) + .alias("max_stream_state_order"), + ], + )? + .collect() + .await?; + let max_event_state_order = batches.first().and_then(|batch| { + batch + .column_by_name("max_event_state_order") + .and_then(|col| { + as_uint64_array(&col) + .ok() + .and_then(|col| col.is_valid(0).then(|| col.value(0))) + }) + }); + let max_stream_state_order = batches.first().and_then(|batch| { + batch + .column_by_name("max_stream_state_order") + .and_then(|col| { + as_uint64_array(&col) + .ok() + .and_then(|col| col.is_valid(0).then(|| col.value(0))) + }) + }); + + // Spawn actor + let (broadcast_tx, _broadcast_rx) = broadcast::channel(1_000); + let resolver = Resolver { + ctx: ctx.session(), + broadcast_tx, + max_cached_rows: max_cached_rows.unwrap_or(DEFAULT_MAX_CACHED_ROWS), + order: max_stream_state_order.unwrap_or_default(), + }; + + let (handle, task_handle) = Self::spawn(size, resolver, metrics, shutdown.wait_fut()); + let h = handle.clone(); + let sub_handle = tokio::spawn(async move { + if let Err(err) = + aggregator_subscription(shutdown.wait_fut(), aggregator, h, max_event_state_order) + .await + { + error!(%err, "resolver actor aggregator_subscription task failed"); + } + }); + + ctx.session() + .register_table( + STREAM_STATES_FEED_TABLE, + Arc::new(FeedTable::new(handle.clone())), + ) + .expect("should be able to register table"); + + Ok((handle, vec![task_handle, sub_handle])) + } + + // Process batch of new event states. + // Return any new stream states. + #[instrument(skip(self, event_states), err)] + async fn process_event_states_batch( + &mut self, + event_states: RecordBatch, + ) -> Result { + // Psuedocode of the data flow of conflict resolution. + // The process involves two tables and two udf. + // The first table and udf are responsible for simply determining the new tips. + // This should be possible with just information about the previous pointers. + // (TODO: The previous value is not preserved from conclusion_events to event_states but is + // straightforward to do) + // + // The second table and udf then does actual conflict resolution given all the tips. This can then + // use time information event height and cids to have a deterministic and canonical tip for + // a stream. + // + // event_states + // // Left join the new events with existing known stream tips + // >> join(stream_tips) + // // Now we have a new event and a list of existing tips. Pass all of this information + // // into a udf. That UDF will produce a new row that contains a list of the new tips. + // // The new tips may be the same, more or less than before. + // // This should be possible without knowing the contents of the events themselves + // >> compute_new_tips_udf() + // // Write the new tips row per stream + // >> write_to(stream_tips) + // // Unnest the list of tips so we have a row per tip per stream + // >> unnest(tips) + // // Join the tips with the event_states table as the tips are just cids + // >> join(event_states) + // // Now we have a set of rows per stream one per tip. + // // Use an aggregate udf to resolve the conflicts producing a single row per stream + // >> aggregate(resolve_conflicts_udf(), group_by(stream_cid)) + // // Write these final rows to the stream_states table and return them as a batch from + // // this function. + // >> write_to(stream_states) + // + // See process_conclusion_event_batch() as an example of how to build up these data flow + // pipelines. + todo!() + } +} + +async fn aggregator_subscription( + mut shutdown: ShutdownSignal, + aggregator: AggregatorHandle, + resolver: ResolverHandle, + offset: Option, +) -> Result<()> { + debug!(?offset, "starting aggregator subscription"); + let mut rx = aggregator + .send(SubscribeSinceMsg { + projection: None, + offset, + limit: None, + }) + .await??; + loop { + select! { + _ = &mut shutdown => { break } + r = rx.try_next() => { + match r { + Ok(Some(batch)) => { + if let Err(err) = resolver.send(NewEventStatesMsg { events: batch }).await? { + error!(?err, "aggregator subscription loop failed to process new event states"); + } + } + // Subscription has finished, this means we are shutting down. + Ok(None) => { break }, + Err(err) => { + error!(%err, "aggregator subscription loop failed to retrieve new event states."); + } + } + } + } + } + Ok(()) +} + +actor_envelope! { + ResolverEnvelope, + ResolverActor, + ResolverRecorder, + // Subscribe to the stream_states table, we do not yet expose a subscription to stream_tips + SubscribeSince => SubscribeSinceMsg, + NewEventStates => NewEventStatesMsg, + StreamState => StreamStateMsg, +} + +#[async_trait] +impl Handler for Resolver { + async fn handle( + &mut self, + message: SubscribeSinceMsg, + ) -> ::Result { + let subscription = self.broadcast_tx.subscribe(); + let ctx = self.ctx.clone(); + rows_since( + schemas::stream_states(), + "stream_state_order", + message.projection, + message.offset, + message.limit, + Box::pin(RecordBatchStreamAdapter::new( + schemas::stream_states(), + tokio_stream::wrappers::BroadcastStream::new(subscription) + .map_err(|err| exec_datafusion_err!("{err}")), + )), + // Future Optimization can be to send the projection and limit into the events_since call. + stream_states_since(&ctx, message.offset).await?, + ) + } +} + +async fn stream_states_since( + ctx: &SessionContext, + offset: Option, +) -> Result { + let mut stream_states = ctx + .table(STREAM_STATES_TABLE) + .await? + .select(vec![wildcard()])? + // Do not return the partition columns + .drop_columns(&["stream_cid_partition"])?; + if let Some(offset) = offset { + stream_states = stream_states.filter(col("stream_state_order").gt(lit(offset)))?; + } + Ok(stream_states + .sort(vec![col("stream_state_order").sort(true, true)])? + .execute_stream() + .await?) +} + +/// Inform the resolver about new event states. +#[derive(Debug)] +pub struct NewEventStatesMsg { + events: RecordBatch, +} +impl Message for NewEventStatesMsg { + type Result = anyhow::Result<()>; +} + +#[async_trait] +impl Handler for Resolver { + async fn handle( + &mut self, + message: NewEventStatesMsg, + ) -> ::Result { + debug!(event_count = message.events.num_rows(), "new event states"); + let batch = self.process_event_states_batch(message.events).await?; + if batch.num_rows() > 0 { + let _ = self.broadcast_tx.send(batch); + } + Ok(()) + } +} + +/// Request the state of a stream +#[derive(Debug)] +pub struct StreamStateMsg { + /// Id of the stream + pub id: StreamId, +} +impl Message for StreamStateMsg { + type Result = anyhow::Result>; +} + +/// State of a single stream +pub struct StreamState { + /// Multibase encoding of the stream id + pub id: StreamId, + + /// CID of the event that produced this state + pub event_cid: Cid, + + /// Controller of the stream + pub controller: String, + + /// Dimensions of the stream, each value is multibase encoded. + pub dimensions: HashMap>, + + /// The data of the stream. Content is stream type specific. + pub data: Vec, +} + +impl std::fmt::Debug for StreamState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if !f.alternate() { + f.debug_struct("StreamState") + .field("id", &self.id) + .field("event_cid", &self.event_cid) + .field("controller", &self.controller) + .field("dimensions", &self.dimensions) + .field("data", &self.data) + .finish() + } else { + f.debug_struct("StreamState") + .field("id", &self.id.to_string()) + .field("event_cid", &self.event_cid.to_string()) + .field("controller", &self.controller) + .field( + "dimensions", + &self + .dimensions + .iter() + .map(|(k, v)| (k, multibase::encode(multibase::Base::Base64Url, v))) + .collect::>(), + ) + .field( + "data", + &multibase::encode(multibase::Base::Base64Url, &self.data), + ) + .finish() + } + } +} + +#[async_trait] +impl Handler for Resolver { + #[instrument(skip(self), ret, err)] + async fn handle(&mut self, message: StreamStateMsg) -> ::Result { + let id = message.id; + let state_batch = self + .ctx + .table(STREAM_STATES_TABLE) + .await + .context("table not found {STREAM_STATES_TABLE}")? + .select(vec![ + col("stream_state_order"), + col("stream_cid"), + col("event_cid"), + col("dimensions"), + col("controller"), + col("data"), + ]) + .context("invalid select")? + .filter(col("stream_cid").eq(lit(id.cid.to_bytes()))) + .context("invalid filter")? + .aggregate( + vec![col("stream_cid"), col("controller")], + vec![ + last_value(vec![col("data")]) + .order_by(vec![col("stream_state_order").sort(true, true)]) + .build() + .context("invalid last_value data query")? + .alias("data"), + last_value(vec![col("event_cid")]) + .order_by(vec![col("stream_state_order").sort(true, true)]) + .build() + .context("invalid last_value event_cid query")? + .alias("event_cid"), + last_value(vec![col("dimensions")]) + .order_by(vec![col("stream_state_order").sort(true, true)]) + .build() + .context("invalid last_value dimensions query")? + .alias("dimensions"), + ], + ) + .context("invalid window")? + .collect() + .await + .context("invalid query")?; + + let num_rows: usize = state_batch.iter().map(|b| b.num_rows()).sum(); + if num_rows == 0 { + // No state for the stream id found + return Ok(None); + } + let batch = concat_batches(&state_batch[0].schema(), state_batch.iter()) + .context("concat batches")?; + + let data = as_binary_array( + batch + .column_by_name("data") + .ok_or_else(|| anyhow::anyhow!("state column should exist"))?, + ) + .context("data as a binary column")? + .value(0); + let event_cid = as_binary_array( + batch + .column_by_name("event_cid") + .ok_or_else(|| anyhow::anyhow!("event_cid column should exist"))?, + ) + .context("event_cid as a binary column")?; + let controller = as_string_array( + batch + .column_by_name("controller") + .ok_or_else(|| anyhow::anyhow!("controller column should exist"))?, + ) + .context("controller as a string column")?; + let dimensions = as_map_array( + batch + .column_by_name("dimensions") + .ok_or_else(|| anyhow::anyhow!("dimensions column should exist"))?, + )?; + let keys = + as_string_array(dimensions.keys()).context("dimesions_keys as a string column")?; + let values = as_dictionary_array::(dimensions.values())? + .downcast_dict::() + .ok_or_else(|| anyhow::anyhow!("dimensions values should be binary"))?; + let mut dimensions = HashMap::with_capacity(keys.len()); + for i in 0..keys.len() { + let key = keys.value(i); + let value = values.value(i); + dimensions.insert(key.to_string(), value.to_vec()); + } + Ok(Some(StreamState { + id, + event_cid: Cid::read_bytes(event_cid.value(0)).context("event_cid as a CID")?, + controller: controller.value(0).to_string(), + dimensions, + data: data.to_vec(), + })) + } +} + +#[async_trait] +impl FeedTableSource for ResolverHandle { + fn schema(&self) -> SchemaRef { + schemas::event_states() + } + async fn subscribe_since( + &self, + projection: Option>, + offset: Option, + limit: Option, + ) -> anyhow::Result { + Ok(self + .send(SubscribeSinceMsg { + projection, + offset, + limit, + }) + .await??) + } +} diff --git a/pipeline/src/schemas.rs b/pipeline/src/schemas.rs index c869b4a57..4279590d2 100644 --- a/pipeline/src/schemas.rs +++ b/pipeline/src/schemas.rs @@ -7,6 +7,10 @@ static CONCLUSION_EVENTS: OnceLock = OnceLock::new(); static EVENT_STATES: OnceLock = OnceLock::new(); static EVENT_STATES_PARTITIONED: OnceLock = OnceLock::new(); static PENDING_EVENT_STATES: OnceLock = OnceLock::new(); +static STREAM_TIPS: OnceLock = OnceLock::new(); +static STREAM_TIPS_PARTITIONED: OnceLock = OnceLock::new(); +static STREAM_STATES: OnceLock = OnceLock::new(); +static STREAM_STATES_PARTITIONED: OnceLock = OnceLock::new(); /// The `conclusion_events` table contains the raw events annotated with conclusions about each /// event. @@ -190,3 +194,148 @@ pub fn pending_event_states() -> SchemaRef { ) })) } + +/// The `stream_tips` table contains the set of tips for each stream +pub fn stream_tips() -> SchemaRef { + Arc::clone(STREAM_TIPS.get_or_init(|| { + Arc::new( + SchemaBuilder::from(&Fields::from(vec![ + // Used to know how far through the event_states table the stream_tips table has + // processed. + Field::new("event_state_order", DataType::UInt64, false), + // An order that ensures an stream tip row comes after all rows needed to + // compute the tips of a stream. This order is stream type dependent, but holds for all + // stream types. + Field::new("stream_tip_order", DataType::UInt64, false), + Field::new("stream_cid", DataType::Binary, false), + Field::new("stream_type", DataType::UInt8, false), + Field::new("controller", DataType::Utf8, false), + Field::new( + "dimensions", + DataType::Map( + Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Binary), + ), + true, + ), + ] + .into(), + ), + false, + ) + .into(), + false, + ), + true, + ), + Field::new("tips", DataType::new_list(DataType::Binary, false), false), + ])) + .finish(), + ) + })) +} + +/// The `stream_tips_partitioned` table contains the set of tips for a stream. +/// This schema includes the partition columns of the table. +pub fn stream_tips_partitioned() -> SchemaRef { + Arc::clone(STREAM_TIPS_PARTITIONED.get_or_init(|| { + Arc::new( + arrow_schema::SchemaBuilder::from(&arrow_schema::Fields::from( + // Append partition fields to the end of the unpartitioned schema + event_states() + .fields() + .into_iter() + .cloned() + .chain(vec![Arc::new(arrow_schema::Field::new( + "stream_cid_partition", + DataType::Int32, + false, + ))]) + .collect::>(), + )) + .finish(), + ) + })) +} + +/// The `stream_states` table contains the canonical tip for each stream +pub fn stream_states() -> SchemaRef { + Arc::clone(STREAM_STATES.get_or_init(|| { + Arc::new( + SchemaBuilder::from(&Fields::from(vec![ + // Used to know how far through the stream_tips table the stream_state table has + // processed. + Field::new("stream_tip_order", DataType::UInt64, false), + // An order that ensures each new stream state comes after all previous states of + // that stream. This order is stream type dependent, but holds for all + // stream types. + Field::new("stream_state_order", DataType::UInt64, false), + Field::new("stream_cid", DataType::Binary, false), + Field::new("stream_type", DataType::UInt8, false), + Field::new("controller", DataType::Utf8, false), + Field::new( + "dimensions", + DataType::Map( + Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Binary), + ), + true, + ), + ] + .into(), + ), + false, + ) + .into(), + false, + ), + true, + ), + Field::new("event_cid", DataType::Binary, false), + Field::new("event_type", DataType::UInt8, false), + Field::new("event_height", DataType::UInt32, true), + Field::new("data", DataType::Binary, true), + ])) + .finish(), + ) + })) +} + +/// The `stream_states_partitioned` table contains the canonical tip for each stream. +/// This schema includes the partition columns of the table. +pub fn stream_states_partitioned() -> SchemaRef { + Arc::clone(STREAM_TIPS_PARTITIONED.get_or_init(|| { + Arc::new( + arrow_schema::SchemaBuilder::from(&arrow_schema::Fields::from( + // Append partition fields to the end of the unpartitioned schema + event_states() + .fields() + .into_iter() + .cloned() + .chain(vec![Arc::new(arrow_schema::Field::new( + "stream_cid_partition", + DataType::Int32, + false, + ))]) + .collect::>(), + )) + .finish(), + ) + })) +} From a453ed5f3140167f5716940a9c7f79b93148f677 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 19 Mar 2025 15:22:59 -0600 Subject: [PATCH 02/19] wip: add concluder todos --- event-svc/src/event/service.rs | 5 ++++ pipeline/src/concluder/event.rs | 30 +++++++++++++++++++- pipeline/src/concluder/metrics.rs | 9 +----- pipeline/src/concluder/mock.rs | 15 +--------- pipeline/src/concluder/mod.rs | 18 ------------ pipeline/src/lib.rs | 2 +- pipeline/src/resolver/mod.rs | 46 ++++++++++++++++--------------- 7 files changed, 61 insertions(+), 64 deletions(-) diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs index d6ca10f02..9e0783e86 100644 --- a/event-svc/src/event/service.rs +++ b/event-svc/src/event/service.rs @@ -396,6 +396,11 @@ impl EventService { init, previous: vec![*time_event.prev()], order: delivered as u64, + before: todo!(), + chain_id: todo!(), + tx_hash: todo!(), + tx_type: todo!(), + root: todo!(), })) } ceramic_event::unvalidated::Event::Signed(signed_event) => { diff --git a/pipeline/src/concluder/event.rs b/pipeline/src/concluder/event.rs index 3ce48f458..5e969ef91 100644 --- a/pipeline/src/concluder/event.rs +++ b/pipeline/src/concluder/event.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use arrow::datatypes::{DataType, Field, Int32Type}; +use ceramic_core::ssi::caip2; use ceramic_core::METAMODEL_STREAM_ID; use ceramic_event::{unvalidated, StreamId, StreamIdType}; use cid::Cid; @@ -110,7 +111,34 @@ pub struct ConclusionTime { pub init: ConclusionInit, /// Ordered list of previous events this event references. pub previous: Vec, - //TODO Add temporal conclusions, i.e the block timestamp of this event + + // TODO figure out how to populate these values. + // Once populated we need to update the conclusion_events table schema to include them + // Then we need to preserve them in the event_states table + // Finally in the resolver we can use this information to determine a canonical tip for a + // stream. + // + // Challenges: The anchor proof information is gather when validating the event but then + // forgotten. It likely needs to be persisteted into a new sqlite table and then the conclusion + // feed call needs to join against that table to produce these fields. + // + // How do we populate that table for existing data? + // Likely need some kind of explicit migration command that can be run to populate the table. + // + /// It is known that the time event existed before this time as a Unix timestamp in seconds. + pub before: u64, + + // Only before should be needed for conflict resolution but it may be nice to preserve these + // other values as well? + // + /// Chain ID where this time event was anchored. + pub chain_id: String, + /// Transaction hash that anchored this time event. + pub tx_hash: String, + /// Transaction type (TODO: not sure what this is). + pub tx_type: String, + /// Root cid of the proof. + pub root: Cid, } impl<'a> TryFrom<&'a unvalidated::Event> for ConclusionInit { diff --git a/pipeline/src/concluder/metrics.rs b/pipeline/src/concluder/metrics.rs index 75249804a..98d81f7de 100644 --- a/pipeline/src/concluder/metrics.rs +++ b/pipeline/src/concluder/metrics.rs @@ -3,7 +3,7 @@ use ceramic_metrics::Recorder; use crate::metrics::{MessageLabels, Metrics}; -use super::{ConcluderRecorder, EventsSinceMsg, NewEventsMsg}; +use super::{ConcluderRecorder, NewEventsMsg}; impl Recorder> for Metrics { fn record(&self, event: &MessageEvent) { @@ -12,11 +12,4 @@ impl Recorder> for Metrics { .inc(); } } -impl Recorder> for Metrics { - fn record(&self, event: &MessageEvent) { - self.message_count - .get_or_create(&MessageLabels::from(event)) - .inc(); - } -} impl ConcluderRecorder for Metrics {} diff --git a/pipeline/src/concluder/mock.rs b/pipeline/src/concluder/mock.rs index c4e08f311..52361d1c7 100644 --- a/pipeline/src/concluder/mock.rs +++ b/pipeline/src/concluder/mock.rs @@ -7,8 +7,7 @@ use prometheus_client::registry::Registry; use crate::metrics::Metrics; use super::{ - Concluder, ConcluderActor, ConcluderEnvelope, ConcluderHandle, EventsSinceMsg, NewEventsMsg, - SubscribeSinceMsg, + Concluder, ConcluderActor, ConcluderEnvelope, ConcluderHandle, NewEventsMsg, SubscribeSinceMsg, }; mock! { @@ -28,11 +27,6 @@ mock! { &mut self, message: SubscribeSinceMsg, ) -> ::Result; - #[allow(missing_docs)] - pub fn handle_events_since( - &mut self, - message: EventsSinceMsg, - ) -> ::Result; } } @@ -53,13 +47,6 @@ impl Handler for MockConcluder { } } -#[async_trait] -impl Handler for MockConcluder { - async fn handle(&mut self, message: EventsSinceMsg) -> ::Result { - self.handle_events_since(message) - } -} - impl Actor for MockConcluder { type Envelope = ConcluderEnvelope; } diff --git a/pipeline/src/concluder/mod.rs b/pipeline/src/concluder/mod.rs index e9383e7cf..f82c62f7b 100644 --- a/pipeline/src/concluder/mod.rs +++ b/pipeline/src/concluder/mod.rs @@ -169,7 +169,6 @@ actor_envelope! { ConcluderRecorder, NewEvents => NewEventsMsg, SubscribeSince => SubscribeSinceMsg, - EventsSince => EventsSinceMsg, } /// Notify actor of new events @@ -329,23 +328,6 @@ async fn events_since( Ok(conclusion_events.execute_stream().await?) } -/// Request the events since a highwater mark -#[derive(Debug)] -pub struct EventsSinceMsg { - /// Optional filters to apply to the query - pub filters: Vec, -} -impl Message for EventsSinceMsg { - type Result = anyhow::Result; -} - -#[async_trait] -impl Handler for Concluder { - async fn handle(&mut self, message: EventsSinceMsg) -> ::Result { - events_since(&self.ctx, Some(message.filters), None).await - } -} - #[async_trait] impl FeedTableSource for ConcluderHandle { fn schema(&self) -> SchemaRef { diff --git a/pipeline/src/lib.rs b/pipeline/src/lib.rs index 8f62e0d04..f7a45dc45 100644 --- a/pipeline/src/lib.rs +++ b/pipeline/src/lib.rs @@ -14,10 +14,10 @@ mod cache_table; pub mod cid_part; pub mod cid_string; pub mod concluder; -pub mod resolver; mod config; pub mod dimension_extract; mod metrics; +pub mod resolver; pub mod schemas; mod since; pub mod stream_id_string; diff --git a/pipeline/src/resolver/mod.rs b/pipeline/src/resolver/mod.rs index eef62e27b..b7ad1a499 100644 --- a/pipeline/src/resolver/mod.rs +++ b/pipeline/src/resolver/mod.rs @@ -35,7 +35,7 @@ use datafusion::{ functions_aggregate::expr_fn::last_value, logical_expr::{col, lit, ExprFunctionExt as _}, physical_plan::stream::RecordBatchStreamAdapter, - prelude::{wildcard, SessionContext}, + prelude::{wildcard, Expr, SessionContext}, }; use futures::TryStreamExt as _; use shutdown::{Shutdown, ShutdownSignal}; @@ -47,7 +47,7 @@ use crate::{ cache_table::CacheTable, metrics::Metrics, schemas, - since::{rows_since, FeedTable, FeedTableSource}, + since::{gt_expression, rows_since, FeedTable, FeedTableSource, RowsSinceInput}, PipelineContext, Result, SessionContextRef, }; // Use the SubscribeSinceMsg so its clear its a message for this actor @@ -310,7 +310,7 @@ async fn aggregator_subscription( let mut rx = aggregator .send(SubscribeSinceMsg { projection: None, - offset, + filters: offset.map(|o| vec![gt_expression("event_state_order", o)]), limit: None, }) .await??; @@ -354,40 +354,42 @@ impl Handler for Resolver { ) -> ::Result { let subscription = self.broadcast_tx.subscribe(); let ctx = self.ctx.clone(); - rows_since( - schemas::stream_states(), - "stream_state_order", - message.projection, - message.offset, - message.limit, - Box::pin(RecordBatchStreamAdapter::new( + rows_since(RowsSinceInput { + session_context: &ctx, + schema: schemas::stream_states(), + order_col: "stream_state_order", + projection: message.projection, + filters: message.filters.clone(), + limit: message.limit, + subscription: Box::pin(RecordBatchStreamAdapter::new( schemas::stream_states(), tokio_stream::wrappers::BroadcastStream::new(subscription) .map_err(|err| exec_datafusion_err!("{err}")), )), // Future Optimization can be to send the projection and limit into the events_since call. - stream_states_since(&ctx, message.offset).await?, - ) + since: stream_states_since(&ctx, message.filters).await?, + }) } } async fn stream_states_since( ctx: &SessionContext, - offset: Option, + filters: Option>, ) -> Result { let mut stream_states = ctx .table(STREAM_STATES_TABLE) .await? .select(vec![wildcard()])? // Do not return the partition columns - .drop_columns(&["stream_cid_partition"])?; - if let Some(offset) = offset { - stream_states = stream_states.filter(col("stream_state_order").gt(lit(offset)))?; + .drop_columns(&["stream_cid_partition"])? + .sort(vec![col("stream_state_order").sort(true, true)])?; + + if let Some(filters) = filters { + for filter in filters { + stream_states = stream_states.filter(filter)?; + } } - Ok(stream_states - .sort(vec![col("stream_state_order").sort(true, true)])? - .execute_stream() - .await?) + Ok(stream_states.execute_stream().await?) } /// Inform the resolver about new event states. @@ -581,13 +583,13 @@ impl FeedTableSource for ResolverHandle { async fn subscribe_since( &self, projection: Option>, - offset: Option, + filters: Option>, limit: Option, ) -> anyhow::Result { Ok(self .send(SubscribeSinceMsg { projection, - offset, + filters, limit, }) .await??) From 7719c93ff0528309f767867d9801606899789c23 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 19 Mar 2025 15:33:17 -0600 Subject: [PATCH 03/19] more notes --- pipeline/src/resolver/mod.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pipeline/src/resolver/mod.rs b/pipeline/src/resolver/mod.rs index b7ad1a499..22d3e12bb 100644 --- a/pipeline/src/resolver/mod.rs +++ b/pipeline/src/resolver/mod.rs @@ -189,6 +189,24 @@ impl Resolver { .into_view(), )?; + // TODO: This is a bit over simplified. + // + // Here we assume that the stream_tips and stream_states tables remain in lock step (i.e. + // we can atomically write to both) + // + // However that is not a safe assumption. + // + // I see two ways to solve this: + // + // 1. Use two different actors, a tips actor and a resolver actor. + // Then have the resolver actor subscribe the tips actor and use the stream_tip_order to + // ensure it makes forward progress atomically. + // + // 2. Keep this single actor but track the stream_tip_order and stream_state order + // separately so that we get the same atomicity with the complexity of two actors. + // + // In short one complicated actor or two simpler actors. + // Query for max event_state_order and stream_state_order in persistent stream_states, this is where we should start // the new order values. let batches = ctx From af197e9274e04881c4525149d2c62eaa1d05a3c0 Mon Sep 17 00:00:00 2001 From: David Estes Date: Thu, 10 Apr 2025 12:04:12 -0600 Subject: [PATCH 04/19] chore: move blockchain module into event-svc crate it will require a db connection so this avoids circular deps, plus it doesn't need to be in the validation crate --- Cargo.lock | 343 ++++++++++++------ Cargo.toml | 1 + event-svc/Cargo.toml | 6 + .../src/blockchain/eth_rpc/http.rs | 1 - .../src/blockchain/eth_rpc/mod.rs | 0 .../src/blockchain/eth_rpc/types.rs | 0 .../src/blockchain/mod.rs | 0 event-svc/src/event/validator/event.rs | 2 +- event-svc/src/event/validator/time.rs | 2 +- event-svc/src/lib.rs | 3 +- validation/Cargo.toml | 5 +- validation/src/lib.rs | 2 - 12 files changed, 239 insertions(+), 126 deletions(-) rename {validation => event-svc}/src/blockchain/eth_rpc/http.rs (99%) rename {validation => event-svc}/src/blockchain/eth_rpc/mod.rs (100%) rename {validation => event-svc}/src/blockchain/eth_rpc/types.rs (100%) rename {validation => event-svc}/src/blockchain/mod.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index f0d240e04..bc64e16d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,7 +75,7 @@ dependencies = [ "once_cell", "serde", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -110,9 +110,9 @@ checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "alloy" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f18703431261fdb54ca49eb949aa1bb8fcb436cf7c1283a0fbbc3d3cd9fadd7" +checksum = "056f2c01b2aed86e15b43c47d109bfc8b82553dc34e66452875e51247ec31ab2" dependencies = [ "alloy-consensus", "alloy-core", @@ -129,19 +129,20 @@ dependencies = [ [[package]] name = "alloy-chains" -version = "0.1.34" +version = "0.1.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8158b4878c67837e5413721cc44298e6a2d88d39203175ea025e51892a16ba4c" +checksum = "28e2652684758b0d9b389d248b209ed9fd9989ef489a550265fe4bb8454fe7eb" dependencies = [ + "alloy-primitives", "num_enum 0.7.3", - "strum", + "strum 0.27.1", ] [[package]] name = "alloy-consensus" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "089cd553dc0b372831f3ce871a31921ba8a851ddd51f312a89fb987a00fb6e2c" +checksum = "705687d5bfd019fee57cf9e206b27b30a9a9617535d5590a02b171e813208f8e" dependencies = [ "alloy-eips", "alloy-primitives", @@ -155,9 +156,9 @@ dependencies = [ [[package]] name = "alloy-core" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce854562e7cafd5049189d0268d6e5cba05fe6c9cb7c6f8126a79b94800629c" +checksum = "9d8bcce99ad10fe02640cfaec1c6bc809b837c783c1d52906aa5af66e2a196f6" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -168,9 +169,9 @@ dependencies = [ [[package]] name = "alloy-dyn-abi" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b499852e1d0e9b8c6db0f24c48998e647c0d5762a01090f955106a7700e4611" +checksum = "eb8e762aefd39a397ff485bc86df673465c4ad3ec8819cc60833a8a3ba5cdc87" dependencies = [ "alloy-json-abi", "alloy-primitives", @@ -180,7 +181,7 @@ dependencies = [ "itoa", "serde", "serde_json", - "winnow 0.6.20", + "winnow 0.7.6", ] [[package]] @@ -208,9 +209,9 @@ dependencies = [ [[package]] name = "alloy-eips" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfe755025743aa133536db98904e6e4b5481c5b2ec864ff56b5b516611a8d34" +checksum = "6ffb906284a1e1f63c4607da2068c8197458a352d0b3e9796e67353d72a9be85" dependencies = [ "alloy-eip2930", "alloy-eip7702", @@ -226,9 +227,9 @@ dependencies = [ [[package]] name = "alloy-genesis" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cbb8b3851cf6e8cc81c697352b4120f34bda362aa279e59e670c3660efa9db0" +checksum = "8429cf4554eed9b40feec7f4451113e76596086447550275e3def933faf47ce3" dependencies = [ "alloy-primitives", "alloy-serde", @@ -237,9 +238,9 @@ dependencies = [ [[package]] name = "alloy-json-abi" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a438d4486b5d525df3b3004188f9d5cd1d65cd30ecc41e5a3ccef6f6342e8af9" +checksum = "fe6beff64ad0aa6ad1019a3db26fef565aefeb011736150ab73ed3366c3cfd1b" dependencies = [ "alloy-primitives", "alloy-sol-type-parser", @@ -249,9 +250,9 @@ dependencies = [ [[package]] name = "alloy-json-rpc" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e136f67d5921b150124e8a76573357005a4ae419ddfe4a2a2b96d21e3a67b9c4" +checksum = "f8fa8a1a3c4cbd221f2b8e3693aeb328fca79a757fe556ed08e47bbbc2a70db7" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -263,9 +264,9 @@ dependencies = [ [[package]] name = "alloy-network" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7050e189311c607064b08383db700787f2ed49b8db4cd9b7e26f6f124729abac" +checksum = "85fa23a6a9d612b52e402c995f2d582c25165ec03ac6edf64c861a76bc5b87cd" dependencies = [ "alloy-consensus", "alloy-eips", @@ -284,9 +285,9 @@ dependencies = [ [[package]] name = "alloy-network-primitives" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915a5f401d05d9d721ca18b0498956c8e771285b3ab211d0695c3c6e8b9121ea" +checksum = "801492711d4392b2ccf5fc0bc69e299fa1aab15167d74dcaa9aab96a54f684bd" dependencies = [ "alloy-consensus", "alloy-eips", @@ -297,17 +298,17 @@ dependencies = [ [[package]] name = "alloy-primitives" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "260d3ff3bff0bb84599f032a2f2c6828180b0ea0cd41fdaf44f39cef3ba41861" +checksum = "8c77490fe91a0ce933a1f219029521f20fc28c2c0ca95d53fa4da9c00b8d9d4e" dependencies = [ "alloy-rlp", "bytes 1.7.2", "cfg-if", "const-hex", - "derive_more 1.0.0", - "hashbrown 0.14.5", - "hex-literal", + "derive_more 2.0.1", + "foldhash", + "hashbrown 0.15.2", "indexmap 2.7.1", "itoa", "k256 0.13.4", @@ -324,9 +325,9 @@ dependencies = [ [[package]] name = "alloy-provider" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed3b9667c337d9d0fa25cb476f4d8af84aee8a25fbcccf9b1aa6d554306885e4" +checksum = "fcfaa4ffec0af04e3555686b8aacbcdf7d13638133a0672749209069750f78a6" dependencies = [ "alloy-chains", "alloy-consensus", @@ -358,9 +359,9 @@ dependencies = [ [[package]] name = "alloy-rlp" -version = "0.3.8" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26154390b1d205a4a7ac7352aa2eb4f81f391399d4e2f546fb81a2f8bb383f62" +checksum = "3d6c1d995bff8d011f7cd6c81820d51825e6e06d6db73914c1630ecf544d83d6" dependencies = [ "alloy-rlp-derive", "arrayvec 0.7.6", @@ -369,9 +370,9 @@ dependencies = [ [[package]] name = "alloy-rlp-derive" -version = "0.3.8" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d0f2d905ebd295e7effec65e5f6868d153936130ae718352771de3e7d03c75c" +checksum = "a40e1ef334153322fd878d07e86af7a529bcb86b2439525920a88eba87bcf943" dependencies = [ "proc-macro2", "quote", @@ -380,9 +381,9 @@ dependencies = [ [[package]] name = "alloy-rpc-client" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a86e548f0e92e2bcaafb0b3a8f4238384798dca780d69f3f1b1eb7ade2812924" +checksum = "370143ed581aace6e663342d21d209c6b2e34ee6142f7d6675adb518deeaf0dc" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -402,9 +403,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7acf60f133ed3308e42ecff81b6ff76be2d4e2a14af2d537413267bcdccd1c49" +checksum = "9ffc534b7919e18f35e3aa1f507b6f3d9d92ec298463a9f6beaac112809d8d06" dependencies = [ "alloy-primitives", "alloy-rpc-types-eth", @@ -414,9 +415,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2f4ebadbae37cfd66c4912a56b1dde9573c2440533d43b90bf0c1740179ee4" +checksum = "413f4aa3ccf2c3e4234a047c5fa4727916d7daf25a89f9b765df0ba09784fd87" dependencies = [ "alloy-consensus", "alloy-eips", @@ -433,9 +434,9 @@ dependencies = [ [[package]] name = "alloy-serde" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54e75aed754756184a2071b0a0f5680b76ae5aaf3faf8006b461811c4c2cac06" +checksum = "9dff0ab1cdd43ca001e324dc27ee0e8606bd2161d6623c63e0e0b8c4dfc13600" dependencies = [ "alloy-primitives", "serde", @@ -444,9 +445,9 @@ dependencies = [ [[package]] name = "alloy-signer" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e204a603220a68d07796d9e12ebf3a6f701c5d26925be3d0d7962901d6e037e" +checksum = "2fd4e0ad79c81a27ca659be5d176ca12399141659fef2bcbfdc848da478f4504" dependencies = [ "alloy-primitives", "async-trait", @@ -458,9 +459,9 @@ dependencies = [ [[package]] name = "alloy-sol-macro" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68e7f6e8fe5b443f82b3f1e15abfa191128f71569148428e49449d01f6f49e8b" +checksum = "e10ae8e9a91d328ae954c22542415303919aabe976fe7a92eb06db1b68fd59f2" dependencies = [ "alloy-sol-macro-expander", "alloy-sol-macro-input", @@ -472,9 +473,9 @@ dependencies = [ [[package]] name = "alloy-sol-macro-expander" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b96ce28d2fde09abb6135f410c41fad670a3a770b6776869bd852f1df102e6f" +checksum = "83ad5da86c127751bc607c174d6c9fe9b85ef0889a9ca0c641735d77d4f98f26" dependencies = [ "alloy-sol-macro-input", "const-hex", @@ -490,13 +491,14 @@ dependencies = [ [[package]] name = "alloy-sol-macro-input" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "906746396a8296537745711630d9185746c0b50c033d5e9d18b0a6eba3d53f90" +checksum = "ba3d30f0d3f9ba3b7686f3ff1de9ee312647aac705604417a2f40c604f409a9e" dependencies = [ "const-hex", "dunce", "heck 0.5.0", + "macro-string", "proc-macro2", "quote", "syn 2.0.98", @@ -505,19 +507,19 @@ dependencies = [ [[package]] name = "alloy-sol-type-parser" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc85178909a49c8827ffccfc9103a7ce1767ae66a801b69bdc326913870bf8e6" +checksum = "6d162f8524adfdfb0e4bd0505c734c985f3e2474eb022af32eef0d52a4f3935c" dependencies = [ "serde", - "winnow 0.6.20", + "winnow 0.7.6", ] [[package]] name = "alloy-sol-types" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d86a533ce22525969661b25dfe296c112d35eb6861f188fd284f8bd4bb3842ae" +checksum = "d43d5e60466a440230c07761aa67671d4719d46f43be8ea6e7ed334d8db4a9ab" dependencies = [ "alloy-json-abi", "alloy-primitives", @@ -528,9 +530,9 @@ dependencies = [ [[package]] name = "alloy-transport" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c68e829f89855f79d9635800cf8d0c23f7faaecdade1dc446716f27304dd08e" +checksum = "2ac3e97dad3d31770db0fc89bd6a63b789fbae78963086733f960cf32c483904" dependencies = [ "alloy-json-rpc", "base64 0.22.1", @@ -547,9 +549,9 @@ dependencies = [ [[package]] name = "alloy-transport-http" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91b27d8c6bed88f19e9c5da656cb9e5c69882d8ee8a457f0546ff12d89c766d1" +checksum = "b367dcccada5b28987c2296717ee04b9a5637aacd78eacb1726ef211678b5212" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -1467,9 +1469,9 @@ dependencies = [ [[package]] name = "auto_impl" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42" +checksum = "ffdcb70bdbc4d478427380519163274ac86e52916e10f0a8889adf0f96d3fee7" dependencies = [ "proc-macro2", "quote", @@ -1852,9 +1854,9 @@ dependencies = [ [[package]] name = "blst" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4378725facc195f1a538864863f6de233b500a8862747e7f165078a419d5e874" +checksum = "47c79a94619fade3c0b887670333513a67ac28a6a7e653eb260bf0d4103db38d" dependencies = [ "cc", "glob", @@ -1930,9 +1932,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "byte-slice-cast" -version = "1.2.2" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3ac9f8b63eca6fd385229b3675f6cc0dc5c8a5c8a54a59d4f52ffd670d87b0c" +checksum = "7575182f7272186991736b70173b0ea045398f984bf5ebbb3804736ce1330c9d" [[package]] name = "bytecount" @@ -2087,9 +2089,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.24" +version = "1.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938" +checksum = "525046617d8376e3db1deffb079e91cef90a89fc3ca5c185bbf8c9ecdd15cd5c" dependencies = [ "jobserver", "libc", @@ -2316,6 +2318,7 @@ dependencies = [ name = "ceramic-event-svc" version = "0.54.2" dependencies = [ + "alloy", "anyhow", "async-trait", "bytes 1.7.2", @@ -2336,19 +2339,23 @@ dependencies = [ "ipld-core", "iroh-bitswap", "itertools 0.13.0", + "lru 0.10.1", "mockall", "multibase 0.9.1", "multihash 0.19.1", "multihash-codetable", "multihash-derive 0.9.0", + "once_cell", "paste", "prettytable-rs", "prometheus-client", "rand 0.8.5", "recon", + "reqwest 0.11.27", "serde", "serde_ipld_dagcbor", "sqlx", + "ssi", "test-log", "thiserror 1.0.64", "tmpdir", @@ -2719,12 +2726,10 @@ dependencies = [ "ed25519-dalek 2.1.1", "ipld-core", "k256 0.13.4", - "lru 0.10.1", "multibase 0.9.1", "multihash-codetable", "once_cell", "p256 0.13.2", - "reqwest 0.11.27", "serde", "serde_ipld_dagcbor", "serde_json", @@ -2960,8 +2965,8 @@ version = "7.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" dependencies = [ - "strum", - "strum_macros", + "strum 0.26.3", + "strum_macros 0.26.4", "unicode-width", ] @@ -3015,9 +3020,9 @@ dependencies = [ [[package]] name = "const-hex" -version = "1.13.1" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0121754e84117e65f9d90648ee6aa4882a6e63110307ab73967a4c5e7e69e586" +checksum = "4b0485bab839b018a8f1723fc5391819fea5f8f0f32288ef8a735fd096b6160c" dependencies = [ "cfg-if", "cpufeatures", @@ -4147,7 +4152,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" dependencies = [ - "derive_more-impl", + "derive_more-impl 1.0.0", +] + +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl 2.0.1", ] [[package]] @@ -4162,6 +4176,18 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", + "unicode-xid", +] + [[package]] name = "did-method-key" version = "0.2.2" @@ -4371,9 +4397,9 @@ dependencies = [ [[package]] name = "either" -version = "1.13.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" dependencies = [ "serde", ] @@ -4597,6 +4623,17 @@ dependencies = [ "bytes 1.7.2", ] +[[package]] +name = "fastrlp" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce8dba4714ef14b8274c371879b175aa55b16b30f269663f19d576f380018dc4" +dependencies = [ + "arrayvec 0.7.6", + "auto_impl", + "bytes 1.7.2", +] + [[package]] name = "ff" version = "0.12.1" @@ -5208,7 +5245,6 @@ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash 0.8.11", "allocator-api2", - "serde", ] [[package]] @@ -5220,6 +5256,7 @@ dependencies = [ "allocator-api2", "equivalent", "foldhash", + "serde", ] [[package]] @@ -5289,12 +5326,6 @@ dependencies = [ "serde", ] -[[package]] -name = "hex-literal" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" - [[package]] name = "hex_fmt" version = "0.3.0" @@ -5897,13 +5928,13 @@ dependencies = [ [[package]] name = "impl-trait-for-tuples" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11d7a9f6330b71fea57921c9b61c47ee6e84f72d394754eff6163ae67e7395eb" +checksum = "a0eb5a3343abf848c0984fe4604b2b105da9539376e24fc0a3b0007411ae4fd9" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.98", ] [[package]] @@ -7624,6 +7655,17 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "macro-string" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b27834086c65ec3f9387b096d66e99f221cf081c2b738042aa252bcd41204e3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -8312,12 +8354,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.1" +version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82881c4be219ab5faaf2ad5e5e5ecdff8c66bd7402ca3160975c93b24961afd1" -dependencies = [ - "portable-atomic", -] +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "oorandom" @@ -8737,12 +8776,12 @@ checksum = "b687ff7b5da449d39e418ad391e5e08da53ec334903ddbb921db208908fc372c" [[package]] name = "pest" -version = "2.7.13" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdbef9d1d47087a895abd220ed25eb4ad973a5e26f6a4367b038c25e28dfc2d9" +checksum = "198db74531d58c70a361c42201efde7e2591e976d518caf7662a47dc5720e7b6" dependencies = [ "memchr", - "thiserror 1.0.64", + "thiserror 2.0.11", "ucd-trie", ] @@ -9033,7 +9072,7 @@ version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -9246,12 +9285,12 @@ dependencies = [ [[package]] name = "proptest" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4c2511913b88df1637da85cc8d96ec8e43a3f8bb8ccb71ee1ac240d6f3df58d" +checksum = "14cae93065090804185d3b75f0bf93b8eeda30c7a9b4a33d3bdb3988d6229e50" dependencies = [ - "bit-set 0.5.3", - "bit-vec 0.6.3", + "bit-set 0.8.0", + "bit-vec 0.8.0", "bitflags 2.6.0", "lazy_static", "num-traits", @@ -9609,6 +9648,17 @@ dependencies = [ "serde", ] +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", + "zerocopy 0.8.24", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -9629,6 +9679,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", +] + [[package]] name = "rand_core" version = "0.5.1" @@ -9647,6 +9707,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.1", +] + [[package]] name = "rand_hc" version = "0.2.0" @@ -10136,21 +10205,24 @@ dependencies = [ [[package]] name = "ruint" -version = "1.12.3" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c3cc4c2511671f327125da14133d0c5c5d137f006a1017a16f557bc85b16286" +checksum = "78a46eb779843b2c4f21fac5773e25d6d5b7c8f0922876c91541790d2ca27eef" dependencies = [ "alloy-rlp", "ark-ff 0.3.0", "ark-ff 0.4.2", "bytes 1.7.2", - "fastrlp", + "fastrlp 0.3.1", + "fastrlp 0.4.0", "num-bigint", + "num-integer", "num-traits", "parity-scale-codec", "primitive-types 0.12.2", "proptest", "rand 0.8.5", + "rand 0.9.0", "rlp", "ruint-macro", "serde", @@ -10172,9 +10244,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustc-hash" -version = "2.0.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" [[package]] name = "rustc-hex" @@ -10528,9 +10600,9 @@ dependencies = [ [[package]] name = "semver-parser" -version = "0.10.2" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" +checksum = "9900206b54a3527fdc7b8a938bffd94a568bac4f4aa8113b209df75a09c0dec2" dependencies = [ "pest", ] @@ -11785,8 +11857,14 @@ name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" + +[[package]] +name = "strum" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32" dependencies = [ - "strum_macros", + "strum_macros 0.27.1", ] [[package]] @@ -11802,6 +11880,19 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "strum_macros" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.98", +] + [[package]] name = "substrait" version = "0.52.3" @@ -11877,9 +11968,9 @@ dependencies = [ [[package]] name = "syn-solidity" -version = "0.8.5" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ab661c8148c2261222a4d641ad5477fd4bea79406a99056096a0b41b35617a5" +checksum = "4560533fbd6914b94a8fb5cc803ed6801c3455668db3b810702c57612bac9412" dependencies = [ "paste", "proc-macro2", @@ -13000,9 +13091,9 @@ checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" [[package]] name = "wait-timeout" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f200f5b12eb75f8c1ed65abd4b2db8a6e1b138a20de009dacee265a2498f3f6" +checksum = "09ac3b126d3914f9849036f826e054cbabdc8519970b8998ddaf3b5bd3c65f11" dependencies = [ "libc", ] @@ -13427,9 +13518,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.20" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +checksum = "63d3fcd9bba44b03821e7d699eeee959f3126dcc4aa8e4ae18ec617c2a5cea10" dependencies = [ "memchr", ] @@ -13621,7 +13712,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" +dependencies = [ + "zerocopy-derive 0.8.24", ] [[package]] @@ -13635,6 +13735,17 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "zerofrom" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index 1619779f0..ced2b3a8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ members = [ # e.g. anyhow's backtrace feature. ahash = "0.8" +alloy = { version = "0.4", features = ["k256", "provider-http", "rpc-types"] } anyhow = { version = "1" } arrow = { version = "54", features = ["prettyprint"] } arrow-array = "54" diff --git a/event-svc/Cargo.toml b/event-svc/Cargo.toml index 321c641f8..6b1a6b929 100644 --- a/event-svc/Cargo.toml +++ b/event-svc/Cargo.toml @@ -8,6 +8,8 @@ repository.workspace = true publish = false [dependencies] +# will probably need contract and signer-local features for self anchoring +alloy.workspace = true anyhow.workspace = true async-trait.workspace = true bytes.workspace = true @@ -29,10 +31,14 @@ itertools.workspace = true multihash-codetable.workspace = true multihash-derive.workspace = true multihash.workspace = true +lru.workspace = true +once_cell.workspace = true prometheus-client.workspace = true recon.workspace = true +reqwest.workspace = true serde.workspace = true serde_ipld_dagcbor.workspace = true +ssi.workspace = true sqlx.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/validation/src/blockchain/eth_rpc/http.rs b/event-svc/src/blockchain/eth_rpc/http.rs similarity index 99% rename from validation/src/blockchain/eth_rpc/http.rs rename to event-svc/src/blockchain/eth_rpc/http.rs index 146b47a31..8283dcaf8 100644 --- a/validation/src/blockchain/eth_rpc/http.rs +++ b/event-svc/src/blockchain/eth_rpc/http.rs @@ -60,7 +60,6 @@ const BLOCK_CACHE_SIZE: usize = 50; type Result = std::result::Result; -#[derive(Debug)] /// Http client to interact with EIP chains pub struct HttpEthRpc { chain_id: caip2::ChainId, diff --git a/validation/src/blockchain/eth_rpc/mod.rs b/event-svc/src/blockchain/eth_rpc/mod.rs similarity index 100% rename from validation/src/blockchain/eth_rpc/mod.rs rename to event-svc/src/blockchain/eth_rpc/mod.rs diff --git a/validation/src/blockchain/eth_rpc/types.rs b/event-svc/src/blockchain/eth_rpc/types.rs similarity index 100% rename from validation/src/blockchain/eth_rpc/types.rs rename to event-svc/src/blockchain/eth_rpc/types.rs diff --git a/validation/src/blockchain/mod.rs b/event-svc/src/blockchain/mod.rs similarity index 100% rename from validation/src/blockchain/mod.rs rename to event-svc/src/blockchain/mod.rs diff --git a/event-svc/src/event/validator/event.rs b/event-svc/src/event/validator/event.rs index d5dfb91d2..a251d0029 100644 --- a/event-svc/src/event/validator/event.rs +++ b/event-svc/src/event/validator/event.rs @@ -2,11 +2,11 @@ use std::sync::Arc; use ceramic_core::{Cid, EventId, NodeId}; use ceramic_event::unvalidated; -use ceramic_validation::eth_rpc; use ipld_core::ipld::Ipld; use recon::ReconItem; use tokio::try_join; +use crate::blockchain::eth_rpc; use crate::event::validator::ChainInclusionProvider; use crate::store::EventAccess; use crate::{ diff --git a/event-svc/src/event/validator/time.rs b/event-svc/src/event/validator/time.rs index 3ebacc043..a4eef1bba 100644 --- a/event-svc/src/event/validator/time.rs +++ b/event-svc/src/event/validator/time.rs @@ -5,7 +5,7 @@ use ceramic_core::ssi::caip2; use ceramic_event::unvalidated; use tracing::warn; -use ceramic_validation::eth_rpc::{self, ChainInclusion, HttpEthRpc}; +use crate::blockchain::eth_rpc::{self, ChainInclusion, HttpEthRpc}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Timestamp(u64); diff --git a/event-svc/src/lib.rs b/event-svc/src/lib.rs index 8d564111e..5b46ce3bc 100644 --- a/event-svc/src/lib.rs +++ b/event-svc/src/lib.rs @@ -1,13 +1,14 @@ //! The Event Service provides an API for ingesting and querying Ceramic Events. #![warn(missing_docs)] +mod blockchain; mod error; mod event; pub mod store; #[cfg(test)] mod tests; -pub use ceramic_validation::eth_rpc; +pub use blockchain::eth_rpc; pub use error::Error; pub use event::ChainInclusionProvider; pub use event::{BlockStore, EventService, UndeliveredEventReview}; diff --git a/validation/Cargo.toml b/validation/Cargo.toml index 2f0a4047b..f09b01fd4 100644 --- a/validation/Cargo.toml +++ b/validation/Cargo.toml @@ -10,6 +10,7 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +alloy.workspace = true anyhow.workspace = true async-trait.workspace = true base64.workspace = true @@ -19,12 +20,10 @@ chrono.workspace = true ed25519-dalek.workspace = true ipld-core.workspace = true k256.workspace = true -lru.workspace = true multibase.workspace = true multihash-codetable.workspace = true once_cell.workspace = true p256.workspace = true -reqwest.workspace = true serde.workspace = true serde_json.workspace = true serde_ipld_dagcbor.workspace = true @@ -33,8 +32,6 @@ ssi.workspace = true tokio.workspace = true tracing.workspace = true unsigned-varint.workspace = true -# will probably need contract and signer-local features for self anchoring -alloy = { version = "0.4", features = ["k256", "provider-http", "rpc-types"] } [dev-dependencies] ceramic-car.workspace = true diff --git a/validation/src/lib.rs b/validation/src/lib.rs index 5bb192cdd..bec3a1821 100644 --- a/validation/src/lib.rs +++ b/validation/src/lib.rs @@ -2,13 +2,11 @@ //! as well as interacting with blockchains for time event validation. #![warn(missing_docs)] -mod blockchain; mod signature; mod siwx_message; #[cfg(test)] mod test; mod verifier; -pub use blockchain::eth_rpc; pub use ceramic_event::unvalidated::signed::cacao; pub use verifier::{cacao_verifier, event_verifier, AtTime, VerifyCacaoOpts, VerifyJwsOpts}; From ea3fa074d802edd9aea0a5d3332800b49c06e4a8 Mon Sep 17 00:00:00 2001 From: David Estes Date: Fri, 11 Apr 2025 09:25:58 -0600 Subject: [PATCH 05/19] feat: store chain timestamp proof data in db and read in pipeline for conclusion events --- CONTRIBUTING.md | 12 +++ event-svc/src/blockchain/eth_rpc/http.rs | 22 ++--- event-svc/src/blockchain/eth_rpc/mod.rs | 5 +- event-svc/src/blockchain/eth_rpc/types.rs | 33 +++++++- event-svc/src/blockchain/mod.rs | 10 +++ event-svc/src/event/service.rs | 81 +++++++++++++++++-- event-svc/src/event/validator/event.rs | 21 +++-- event-svc/src/event/validator/time.rs | 47 ++++++----- event-svc/src/store/mod.rs | 2 +- event-svc/src/store/sql/access/event.rs | 43 +++++++++- .../src/store/sql/entities/chain_proof.rs | 33 ++++++++ event-svc/src/store/sql/entities/mod.rs | 2 + event-svc/src/store/sql/query.rs | 65 +++++++++++++++ event-svc/src/tests/event.rs | 61 +++++++++++++- flight/tests/server.rs | 1 + .../20250410224437_chain_timestamp.down.sql | 2 + .../20250410224437_chain_timestamp.up.sql | 10 +++ .../20250410224445_chain_proof.down.sql | 2 + .../sqlite/20250410224445_chain_proof.up.sql | 12 +++ one/src/migrations.rs | 30 +++++-- pipeline/src/aggregator/mod.rs | 12 ++- pipeline/src/concluder/event.rs | 23 +++--- pipeline/src/concluder/mod.rs | 10 ++- pipeline/src/since/feed.rs | 2 - 24 files changed, 461 insertions(+), 80 deletions(-) create mode 100644 event-svc/src/store/sql/entities/chain_proof.rs create mode 100644 migrations/sqlite/20250410224437_chain_timestamp.down.sql create mode 100644 migrations/sqlite/20250410224437_chain_timestamp.up.sql create mode 100644 migrations/sqlite/20250410224445_chain_proof.down.sql create mode 100644 migrations/sqlite/20250410224445_chain_proof.up.sql diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 70712debc..6c99e66df 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -15,6 +15,18 @@ Using the makefile is not necessary during your development cycle, feel free to However running `make` before publishing a PR will provide a good signal if you PR will pass CI. +### Migrations + +If you need to add to the sqlite database schema, you will need to add a migration using the sqlx CLI. + +```sh +cargo install sqlx-cli +# use the name of the migration and the source directory +sqlx migrate add -r "chain_proof" --source ./migrations/sqlite +``` + +After the up and down files are generated, write the apply/revert SQL in the up/down files. This will be applied automatically at startup. + ### Testing Specific Changes The above `make` targets test changes as a whole. diff --git a/event-svc/src/blockchain/eth_rpc/http.rs b/event-svc/src/blockchain/eth_rpc/http.rs index 8283dcaf8..f51c0100b 100644 --- a/event-svc/src/blockchain/eth_rpc/http.rs +++ b/event-svc/src/blockchain/eth_rpc/http.rs @@ -5,7 +5,9 @@ use std::{ sync::{Arc, Mutex}, }; -use crate::eth_rpc::{ChainInclusion, ChainInclusionProof, Error}; +use crate::eth_rpc::{ + types::ChainProofMetadata, ChainInclusion, ChainInclusionProof, Error, Timestamp, +}; use alloy::{ hex, primitives::{BlockHash, TxHash}, @@ -194,11 +196,6 @@ fn get_root_cid_from_input(input: &str, tx_type: EthProofType) -> anyhow::Result } } -/// Get the expected transaction hash for a given root CID (this is v1 proof type) -fn expected_tx_hash(cid: Cid) -> anyhow::Result { - Ok(TxHash::from_str(&hex::encode(cid.hash().digest()))?) -} - #[async_trait::async_trait] impl ChainInclusion for HttpEthRpc { fn chain_id(&self) -> &caip2::ChainId { @@ -208,7 +205,7 @@ impl ChainInclusion for HttpEthRpc { /// Get the block chain transaction if it exists with the block timestamp information async fn get_chain_inclusion_proof(&self, input: &AnchorProof) -> Result { // transaction to blockHash, blockNumber, input - let tx_hash = expected_tx_hash(input.tx_hash()) + let tx_hash = crate::blockchain::tx_hash_try_from_cid(input.tx_hash()) .map_err(|e| Error::InvalidArgument(format!("invalid transaction hash: {}", e)))?; let tx_hash_res = match self.eth_transaction_by_hash(tx_hash).await? { Some(tx) => tx, @@ -239,7 +236,8 @@ impl ChainInclusion for HttpEthRpc { trace!(?blk_hash_res, "blockByHash response"); let tx_type = EthProofType::from_str(input.tx_type()) .map_err(|e| Error::InvalidProof(e.to_string()))?; - let root_cid = get_root_cid_from_input(&tx_hash_res.input.to_string(), tx_type) + let tx_input = tx_hash_res.input.to_string(); + let root_cid = get_root_cid_from_input(&tx_input, tx_type) .map_err(|e| Error::InvalidProof(e.to_string()))?; if let Some(threshold) = BLOCK_THRESHHOLDS.get(self.chain_id()) { @@ -252,8 +250,14 @@ impl ChainInclusion for HttpEthRpc { } Ok(ChainInclusionProof { - timestamp: blk_hash_res.header.timestamp, + timestamp: Timestamp::from_unix_ts(blk_hash_res.header.timestamp), + block_hash: block_hash.to_string(), root_cid, + meta_data: ChainProofMetadata { + chain_id: self.chain_id.clone(), + tx_hash: tx_hash.to_string(), + tx_input, + }, }) } else { Err(Error::TxNotMined { diff --git a/event-svc/src/blockchain/eth_rpc/mod.rs b/event-svc/src/blockchain/eth_rpc/mod.rs index b345b7f31..a8358032c 100644 --- a/event-svc/src/blockchain/eth_rpc/mod.rs +++ b/event-svc/src/blockchain/eth_rpc/mod.rs @@ -2,4 +2,7 @@ mod http; mod types; pub use http::HttpEthRpc; -pub use types::{BlockHash, ChainInclusion, ChainInclusionProof, Error, EthProofType, TxHash}; +pub use types::{ + BlockHash, ChainInclusion, ChainInclusionProof, ChainProofMetadata, Error, EthProofType, + Timestamp, TxHash, +}; diff --git a/event-svc/src/blockchain/eth_rpc/types.rs b/event-svc/src/blockchain/eth_rpc/types.rs index af4963752..9a4d384c8 100644 --- a/event-svc/src/blockchain/eth_rpc/types.rs +++ b/event-svc/src/blockchain/eth_rpc/types.rs @@ -92,13 +92,44 @@ impl FromStr for EthProofType { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// A timestamp that is able to provide seconds since the unix epoch +pub struct Timestamp(u64); + +impl Timestamp { + /// Create a timestamp from a unix epoch timestamp + pub const fn from_unix_ts(ts: u64) -> Self { + Self(ts) + } + + /// A unix epoch timestamp + pub fn as_unix_ts(&self) -> u64 { + self.0 + } +} + #[derive(Clone, Debug, PartialEq, Eq, Hash)] /// A proof of time derived from state on the blockchain pub struct ChainInclusionProof { /// The timestamp the proof was recorded - pub timestamp: u64, + pub timestamp: Timestamp, + /// The block hash in hex form with '0x' prefix + pub block_hash: String, /// The root CID of the proof pub root_cid: Cid, + /// The metadata about the proof and where it's stored + pub meta_data: ChainProofMetadata, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +/// Metadata about the proof and where it's stored +pub struct ChainProofMetadata { + /// Chain ID of the proof + pub chain_id: ssi::caip2::ChainId, + /// The transaction hash in hex form with '0x' prefix + pub tx_hash: String, + /// The transaction input in hex form with '0x' prefix + pub tx_input: String, } #[async_trait::async_trait] diff --git a/event-svc/src/blockchain/mod.rs b/event-svc/src/blockchain/mod.rs index 18385f4fd..5e35e113c 100644 --- a/event-svc/src/blockchain/mod.rs +++ b/event-svc/src/blockchain/mod.rs @@ -1,2 +1,12 @@ +use std::str::FromStr as _; + +use alloy::primitives::TxHash; +use ceramic_core::Cid; + /// The ethereum RPC provider module pub mod eth_rpc; + +/// Get the expected transaction hash for a given root CID (this is v1 proof type) +pub(crate) fn tx_hash_try_from_cid(cid: Cid) -> anyhow::Result { + Ok(TxHash::from_str(&hex::encode(cid.hash().digest()))?) +} diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs index 9e0783e86..2a11be960 100644 --- a/event-svc/src/event/service.rs +++ b/event-svc/src/event/service.rs @@ -12,7 +12,7 @@ use super::{ }; use async_trait::async_trait; use ceramic_core::{EventId, Network, NodeId, SerializeExt}; -use ceramic_pipeline::{ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime}; +use ceramic_pipeline::{concluder::TimeProof, ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime}; use ceramic_sql::sqlite::SqlitePool; use cid::Cid; use futures::stream::BoxStream; @@ -21,8 +21,8 @@ use itertools::Itertools; use recon::ReconItem; use tracing::{trace, warn}; -use crate::event::validator::ChainInclusionProvider; -use crate::store::{EventAccess, EventInsertable, EventRowDelivered}; +use crate::store::{ChainProof, EventAccess, EventInsertable, EventRowDelivered}; +use crate::{blockchain::tx_hash_try_from_cid, event::validator::ChainInclusionProvider}; use crate::{Error, Result}; /// How many events to select at once to see if they've become deliverable when we have downtime @@ -269,6 +269,7 @@ impl EventService { valid, unvalidated, invalid, + proofs, } = self .event_validator .validate_events(validation_requirement, to_validate) @@ -279,6 +280,7 @@ impl EventService { valid, unvalidated, invalid: invalid_events, + proofs, }) } @@ -293,6 +295,7 @@ impl EventService { valid, unvalidated, mut invalid, + proofs, } = self.validate_events(items, validation_req.as_ref()).await?; let to_insert: Vec = valid @@ -307,6 +310,14 @@ impl EventService { self.track_pending(unvalidated); } + // Someday, we may want to have the validation/proof inclusion logic have knowledge of the database and persist/read + // from it directly, rather than only keeping proofs in memory + RPC calls. But for now, it's simpler to persist everything once here + // and then the pipeline is able to read from this table and use the timestamps for conclusion events etc. + let proofs = proofs.into_iter().map(|p| p.into()).collect::>(); + self.event_access + .persist_chain_inclusion_proofs(&proofs) + .await?; + let (new, existed) = self .persist_events(to_insert, deliverable_req, &mut invalid) .await?; @@ -391,16 +402,30 @@ impl EventService { match event { ceramic_event::unvalidated::Event::Time(time_event) => { + let proof = match self.discover_chain_proof(&time_event).await { + Ok(proof) => Some(proof), + Err(error) => { + tracing::warn!( + ?event_cid, + ?error, + "Failed to discover chain proof for time event" + ); + None + } + }; + Ok(ConclusionEvent::Time(ConclusionTime { event_cid, init, previous: vec![*time_event.prev()], order: delivered as u64, - before: todo!(), - chain_id: todo!(), - tx_hash: todo!(), - tx_type: todo!(), - root: todo!(), + time_proof: proof.map(|p| TimeProof { + before: p + .timestamp + .try_into() + .expect("conclusion timestamp overflow"), + chain_id: p.chain_id, + }), })) } ceramic_event::unvalidated::Event::Signed(signed_event) => { @@ -523,6 +548,40 @@ impl EventService { Ok(()) } } + + /// This is a helper function for migrations to get the chain proof for a given event from the database, + /// or to validate and store it if it doesn't exist. + async fn discover_chain_proof( + &self, + event: &ceramic_event::unvalidated::TimeEvent, + ) -> std::result::Result { + let tx_hash = event.proof().tx_hash(); + let tx_hash = tx_hash_try_from_cid(tx_hash).unwrap().to_string(); + let proof = self + .event_access + .get_chain_proof(event.proof().chain_id(), &tx_hash) + .await + .map_err(|e| crate::eth_rpc::Error::Application(e.into()))?; + + if let Some(proof) = proof { + return Ok(proof); + } + + // try using the RPC provider and store the proof afterward + let proof = self + .event_validator + .time_event_verifier() + .validate_chain_inclusion(event) + .await?; + + let proof = ChainProof::from(proof); + self.event_access + .persist_chain_inclusion_proofs(&[proof.clone()]) + .await + .map_err(|e| crate::eth_rpc::Error::Application(e.into()))?; + + Ok(proof) + } } // Small wrapper container around the data field to hold other mutable metadata for the @@ -579,6 +638,12 @@ pub enum ValidationError { key: EventId, reason: String, }, + /// 'Soft error' -> should not kill recon conversation but should not be persisted + /// A time event could not be validated because no RPC provider was available + SoftError { + key: EventId, + reason: String, + }, } #[derive(Debug, PartialEq, Eq, Default)] diff --git a/event-svc/src/event/validator/event.rs b/event-svc/src/event/validator/event.rs index a251d0029..32fc9090f 100644 --- a/event-svc/src/event/validator/event.rs +++ b/event-svc/src/event/validator/event.rs @@ -6,19 +6,19 @@ use ipld_core::ipld::Ipld; use recon::ReconItem; use tokio::try_join; -use crate::blockchain::eth_rpc; -use crate::event::validator::ChainInclusionProvider; -use crate::store::EventAccess; use crate::{ + blockchain::eth_rpc, + eth_rpc::ChainInclusionProof, event::{ service::{ValidationError, ValidationRequirement}, validator::{ grouped::{GroupedEvents, SignedValidationBatch, TimeValidationBatch}, signed::SignedEventValidator, time::TimeEventValidator, + ChainInclusionProvider, }, }, - store::EventInsertable, + store::{EventAccess, EventInsertable}, Result, }; @@ -31,6 +31,8 @@ pub struct ValidatedEvents { pub unvalidated: Vec, /// Events that failed validation pub invalid: Vec, + /// The proofs for the validated time events that can be stored for future time stamping + pub proofs: Vec, } #[derive(Debug)] @@ -107,6 +109,7 @@ impl ValidatedEvents { valid: Vec::with_capacity(valid), unvalidated: Vec::with_capacity(valid / 4), invalid: Vec::new(), + proofs: Vec::new(), } } @@ -141,6 +144,11 @@ impl EventValidator { }) } + /// Get the time event verifier + pub(crate) fn time_event_verifier(&self) -> &TimeEventValidator { + &self.time_event_verifier + } + /// Validates the events with the given validation requirement /// If the [`ValidationRequirement`] is None, it just returns every event as valid pub(crate) async fn validate_events( @@ -159,6 +167,7 @@ impl EventValidator { .collect(), unvalidated: Vec::new(), invalid: Vec::new(), + proofs: Vec::new(), }); }; @@ -211,8 +220,8 @@ impl EventValidator { .validate_chain_inclusion(time_event.as_time()) .await { - Ok(_t) => { - // TODO(AES-345): Someday, we will use `t.as_unix_ts()` and care about the actual timestamp, but for now we just consider it valid + Ok(t) => { + validated_events.proofs.push(t); validated_events.valid.push(time_event.into()); } Err(err) => { diff --git a/event-svc/src/event/validator/time.rs b/event-svc/src/event/validator/time.rs index a4eef1bba..342d3b299 100644 --- a/event-svc/src/event/validator/time.rs +++ b/event-svc/src/event/validator/time.rs @@ -5,18 +5,10 @@ use ceramic_core::ssi::caip2; use ceramic_event::unvalidated; use tracing::warn; -use crate::blockchain::eth_rpc::{self, ChainInclusion, HttpEthRpc}; - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Timestamp(u64); - -impl Timestamp { - /// A unix epoch timestamp - #[allow(dead_code)] - pub fn as_unix_ts(&self) -> u64 { - self.0 - } -} +use crate::{ + blockchain::eth_rpc::{self, ChainInclusion, HttpEthRpc}, + eth_rpc::ChainInclusionProof, +}; /// Provider for validating chain inclusion of an AnchorProof on a remote blockchain. pub type ChainInclusionProvider = Arc; @@ -25,6 +17,7 @@ pub struct TimeEventValidator { /// we could support multiple providers for each chain (to get around rate limits) /// but we'll just force people to run a light client if they really need the throughput chain_providers: HashMap, + // add a sql connection / block table access } impl std::fmt::Debug for TimeEventValidator { @@ -83,7 +76,7 @@ impl TimeEventValidator { pub async fn validate_chain_inclusion( &self, event: &unvalidated::TimeEvent, - ) -> Result { + ) -> Result { let chain_id = caip2::ChainId::from_str(event.proof().chain_id()) .map_err(|e| eth_rpc::Error::InvalidArgument(format!("invalid chain ID: {}", e)))?; @@ -104,14 +97,17 @@ impl TimeEventValidator { ))); } - Ok(Timestamp(chain_proof.timestamp)) + Ok(chain_proof) } } #[cfg(test)] mod test { + use crate::{ + blockchain::eth_rpc, + eth_rpc::{ChainProofMetadata, Timestamp}, + }; use ceramic_event::unvalidated; - use ceramic_validation::eth_rpc; use cid::Cid; use ipld_core::ipld::Ipld; use mockall::{mock, predicate}; @@ -119,7 +115,7 @@ mod test { use super::*; - const BLOCK_TIMESTAMP: u64 = 1725913338; + const BLOCK_TIMESTAMP: Timestamp = Timestamp::from_unix_ts(1725913338); fn time_event_single_event_batch() -> unvalidated::TimeEvent { unvalidated::Builder::time() @@ -255,10 +251,13 @@ mod test { root_cid: Cid, ) -> TimeEventValidator { let mut mock_provider = MockEthRpcProviderTest::new(); - let chain = + let chain_id = caip2::ChainId::from_str("eip155:11155111").expect("eip155:11155111 is a valid chain"); - mock_provider.expect_chain_id().once().return_const(chain); + mock_provider + .expect_chain_id() + .once() + .return_const(chain_id.clone()); mock_provider .expect_get_chain_inclusion_proof() .once() @@ -267,6 +266,12 @@ mod test { Ok(eth_rpc::ChainInclusionProof { timestamp: BLOCK_TIMESTAMP, root_cid, + block_hash: "0x0".to_string(), + meta_data: ChainProofMetadata { + chain_id: chain_id, + tx_hash: "0x0".to_string(), + tx_input: "0x0".to_string(), + }, }) }); TimeEventValidator::new_with_providers(vec![Arc::new(mock_provider)]) @@ -278,8 +283,8 @@ mod test { let verifier = get_mock_provider(event.proof().clone(), event.proof().root()).await; match verifier.validate_chain_inclusion(&event).await { - Ok(ts) => { - assert_eq!(ts.as_unix_ts(), BLOCK_TIMESTAMP); + Ok(proof) => { + assert_eq!(proof.timestamp, BLOCK_TIMESTAMP); } Err(e) => panic!("should have passed: {:?}", e), } @@ -314,7 +319,7 @@ mod test { match verifier.validate_chain_inclusion(&event).await { Ok(ts) => { - assert_eq!(ts.as_unix_ts(), BLOCK_TIMESTAMP); + assert_eq!(ts.timestamp, BLOCK_TIMESTAMP); } Err(e) => panic!("should have passed: {:?}", e), } diff --git a/event-svc/src/store/mod.rs b/event-svc/src/store/mod.rs index fd68d407e..8331cb5a7 100644 --- a/event-svc/src/store/mod.rs +++ b/event-svc/src/store/mod.rs @@ -5,7 +5,7 @@ mod sql; pub use metrics::{Metrics, StoreMetricsMiddleware}; pub use sql::{ - entities::{BlockHash, EventBlockRaw, EventInsertable}, + entities::{BlockHash, ChainProof, EventBlockRaw, EventInsertable}, BlockAccess, Error, EventAccess, EventBlockAccess, EventRowDelivered, InsertResult, InsertedEvent, Result, SqlitePool, SqliteRootStore, }; diff --git a/event-svc/src/store/sql/access/event.rs b/event-svc/src/store/sql/access/event.rs index 28d51ae08..6458c2dfc 100644 --- a/event-svc/src/store/sql/access/event.rs +++ b/event-svc/src/store/sql/access/event.rs @@ -15,10 +15,10 @@ use recon::{AssociativeHash, HashCount, Key, Sha256a}; use crate::store::{ sql::{ entities::{ - rebuild_car, BlockRow, CountRow, EventInsertable, OrderKey, ReconEventBlockRaw, - ReconHash, + rebuild_car, BlockRow, ChainProof, CountRow, EventInsertable, OrderKey, + ReconEventBlockRaw, ReconHash, }, - query::{EventQuery, ReconQuery, SqlBackend}, + query::{ChainProofQuery, EventQuery, ReconQuery, SqlBackend}, }, BlockAccess, Error, EventBlockAccess, Result, }; @@ -573,4 +573,41 @@ impl EventAccess { .collect::>>()?; Ok(rows) } + + /// Persist chain inclusion proofs + pub async fn persist_chain_inclusion_proofs(&self, proofs: &[ChainProof]) -> Result<()> { + let mut tx = self.pool.writer().begin().await?; + for proof in proofs { + sqlx::query(ChainProofQuery::upsert_timestamp()) + .bind(&proof.chain_id) + .bind(&proof.block_hash) + .bind(&proof.timestamp) + .execute(&mut *tx) + .await?; + + sqlx::query(ChainProofQuery::upsert_proof()) + .bind(&proof.chain_id) + .bind(&proof.block_hash) + .bind(&proof.transaction_hash) + .bind(&proof.transaction_input) + .execute(&mut *tx) + .await?; + } + tx.commit().await?; + Ok(()) + } + + /// Get chain inclusion proof for a transaction hash + pub async fn get_chain_proof( + &self, + chain_id: &str, + tx_hash: &str, + ) -> Result> { + let row: Option = sqlx::query_as(ChainProofQuery::by_chain_id_and_tx_hash()) + .bind(chain_id) + .bind(tx_hash) + .fetch_optional(self.pool.reader()) + .await?; + Ok(row) + } } diff --git a/event-svc/src/store/sql/entities/chain_proof.rs b/event-svc/src/store/sql/entities/chain_proof.rs new file mode 100644 index 000000000..2914f459a --- /dev/null +++ b/event-svc/src/store/sql/entities/chain_proof.rs @@ -0,0 +1,33 @@ +use crate::eth_rpc; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::FromRow)] +/// A chain inclusion proof stored in the database +/// Unique by chain_id and transaction_hash +pub struct ChainProof { + /// The chain ID of the proof + pub chain_id: String, + /// The transaction hash of the proof + pub transaction_hash: String, + /// The transaction input of the proof + pub transaction_input: String, + /// The block hash of the proof + pub block_hash: String, + /// The timestamp of the proof + pub timestamp: i64, +} + +impl From for ChainProof { + fn from(value: eth_rpc::ChainInclusionProof) -> Self { + Self { + chain_id: value.meta_data.chain_id.to_string(), + block_hash: value.block_hash, + transaction_hash: value.meta_data.tx_hash, + transaction_input: value.meta_data.tx_input, + timestamp: value + .timestamp + .as_unix_ts() + .try_into() + .expect("chain proof timestamp overflow"), + } + } +} diff --git a/event-svc/src/store/sql/entities/mod.rs b/event-svc/src/store/sql/entities/mod.rs index 059cee3b0..cc2a9a5f3 100644 --- a/event-svc/src/store/sql/entities/mod.rs +++ b/event-svc/src/store/sql/entities/mod.rs @@ -1,10 +1,12 @@ mod block; +mod chain_proof; mod event; mod event_block; mod hash; mod utils; pub use block::{BlockBytes, BlockRow}; +pub use chain_proof::ChainProof; pub use event::{rebuild_car, EventInsertable}; pub use event_block::{EventBlockRaw, ReconEventBlockRaw}; pub use hash::{BlockHash, ReconHash}; diff --git a/event-svc/src/store/sql/query.rs b/event-svc/src/store/sql/query.rs index fddda2afb..faeae0237 100644 --- a/event-svc/src/store/sql/query.rs +++ b/event-svc/src/store/sql/query.rs @@ -274,3 +274,68 @@ impl ReconQuery { } } } + +/// Represnts access to the ceramic_one_chain_proof and ceramic_one_chain_timestamp tables +/// In the future, transactions could be stored without block information, and retried until we discover something. +/// For now, we only store the transaction when we discover the block information and timestamp. +pub struct ChainProofQuery; + +impl ChainProofQuery { + /// Requires binding 3 parameters + /// $1: chain_id + /// $2: block_hash + /// $3: timestamp + pub fn upsert_timestamp() -> &'static str { + r#" + INSERT INTO ceramic_one_chain_timestamp ( + chain_id, block_hash, timestamp + ) VALUES ( + $1, $2, $3 + ) ON CONFLICT DO UPDATE SET timestamp = $3; + "# + } + + /// Requires binding 4 parameters + /// $1: chain_id + /// $2: block_hash + /// $3: transaction_hash + /// $4: transaction_input + pub fn upsert_proof() -> &'static str { + r#" + INSERT INTO ceramic_one_chain_proof ( + chain_id, block_hash, transaction_hash, transaction_input + ) VALUES ( + $1, $2, $3, $4 + ) ON CONFLICT DO UPDATE SET block_hash = $2; + "# + } + + /// Requires binding 2 parameters + /// $1: chain_id + /// $2: transaction_hash + pub fn by_chain_id_and_tx_hash() -> &'static str { + r#" + SELECT + p.chain_id, + p.transaction_hash, + p.transaction_input, + p.block_hash, + COALESCE( + (SELECT t.timestamp + FROM ceramic_one_chain_timestamp t + WHERE t.chain_id = p.chain_id + AND t.block_hash = p.block_hash), + NULL + ) as timestamp + FROM ceramic_one_chain_proof p + WHERE p.chain_id = $1 + AND p.transaction_hash = $2 + AND EXISTS ( + SELECT 1 + FROM ceramic_one_chain_timestamp t + WHERE t.chain_id = p.chain_id + AND t.block_hash = p.block_hash + ) + "# + } +} diff --git a/event-svc/src/tests/event.rs b/event-svc/src/tests/event.rs index f7ebbdf60..d77f75c54 100644 --- a/event-svc/src/tests/event.rs +++ b/event-svc/src/tests/event.rs @@ -661,11 +661,70 @@ where ); } +mockall::mock! { + #[derive(Debug)] + pub ChainInclusion {} + #[async_trait::async_trait] + impl crate::eth_rpc::ChainInclusion for ChainInclusion { + fn chain_id(&self) -> &ssi::caip2::ChainId; + async fn get_chain_inclusion_proof( + &self, + input: &unvalidated::AnchorProof, + ) -> Result; + } +} + +fn get_mock_chain_provider( + chain_id: &str, + inputs: Vec, +) -> Arc { + let mut mock_provider = MockChainInclusion::new(); + let chain_id = ssi::caip2::ChainId::from_str(chain_id).expect("valid chain"); + mock_provider + .expect_chain_id() + .once() + .return_const(chain_id.clone()); + for input in inputs { + let root_cid = input.root(); + let chain_id = chain_id.clone(); + mock_provider + .expect_get_chain_inclusion_proof() + .times(2) + .with(mockall::predicate::eq(input)) + .returning(move |p| { + Ok(crate::eth_rpc::ChainInclusionProof { + timestamp: crate::eth_rpc::Timestamp::from_unix_ts(1744383131980), + root_cid, + block_hash: format!("0xblock_hash{}", p.tx_hash().to_string()), + meta_data: crate::eth_rpc::ChainProofMetadata { + chain_id: chain_id.clone(), + tx_hash: p.tx_hash().to_string(), + tx_input: format!("0x{}{}", p.tx_type(), p.tx_hash().to_string()), + }, + }) + }); + } + Arc::new(mock_provider) +} + #[test(tokio::test)] async fn test_conclusion_events_since() -> Result<(), Box> { let pool = SqlitePool::connect_in_memory().await?; - let service = EventService::try_new(pool, UndeliveredEventReview::Skip, false, vec![]).await?; + let test_events = generate_chained_events().await; + let proofs: Vec = test_events + .iter() + .filter_map(|(_, e)| match e { + unvalidated::Event::Time(time_event) => Some(time_event.proof().clone()), + _ => None, + }) + .collect(); + let chain_provider = get_mock_chain_provider("test:chain", proofs); + + let providers = vec![chain_provider]; + + let service = + EventService::try_new(pool, UndeliveredEventReview::Skip, false, providers).await?; ceramic_api::EventService::insert_many( &service, diff --git a/flight/tests/server.rs b/flight/tests/server.rs index deb84a857..096e791bb 100644 --- a/flight/tests/server.rs +++ b/flight/tests/server.rs @@ -191,6 +191,7 @@ fn events(start_index: u64, highwater_mark: u64, limit: usize) -> Vec, @@ -117,9 +118,10 @@ pub struct FromIpfsOpts { #[clap(long, env = "CERAMIC_ONE_VALIDATE_SIGNATURES")] validate_signatures: bool, - /// Whether to validate the chain of time events. + /// Whether to validate the chain of time events matches the expected CAIP2 chain id. /// Events with invalid chains will be skipped and counted as errors. - #[clap(long, env = "CERAMIC_ONE_VALIDATE_CHAINS")] + /// For example, on mainnet, events are expected to have a chain id of 'eip155:1'. + #[clap(long, env = "CERAMIC_ONE_VALIDATE_CHAIN")] validate_chain: bool, } @@ -195,17 +197,29 @@ async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> { .collect::>, _>>()?, opts.validate_signatures, opts.validate_chain - .then(|| { - opts.network - .supported_chain_ids() - .map(|chains| chains.into_iter().map(|chain| chain.to_string()).collect()) - }) + .then(|| allowed_chains(opts.network)) .flatten(), ) .await?; Ok(()) } +/// Returns the allowed chain ids (historically) for the given network. +/// [`Network::supported_chain_ids`] is for current chains +fn allowed_chains(network: Network) -> Option> { + match network { + Network::Mainnet => Some(vec!["eip155:1".to_string()]), + Network::TestnetClay => Some(vec![ + "eip155:3".to_string(), + "eip155:5".to_string(), + "eip155:100".to_string(), + ]), + Network::DevUnstable => Some(vec!["eip155:11155111".to_string()]), + Network::Local => None, + Network::InMemory => None, + } +} + struct FSBlockStore { input_ipfs_path: PathBuf, sharded_paths: bool, diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs index ddec2fd5f..950e3e868 100644 --- a/pipeline/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -1187,9 +1187,11 @@ mod tests { use validation::model::{ModelAccountRelationV2, ModelDefinition}; use crate::{ - cid_string::cid_string, concluder::mock::MockConcluder, conclusion_events_to_record_batch, - pipeline_ctx, tests::TestContext, ConclusionData, ConclusionEvent, ConclusionInit, - ConclusionTime, + cid_string::cid_string, + concluder::{mock::MockConcluder, TimeProof}, + conclusion_events_to_record_batch, pipeline_ctx, + tests::TestContext, + ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime, }; async fn init() -> anyhow::Result> { @@ -1393,6 +1395,10 @@ mod tests { "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", ) .unwrap()], + time_proof: Some(TimeProof { + before: 0, + chain_id: String::default(), + }), }), ConclusionEvent::Data(ConclusionData { order: 3, diff --git a/pipeline/src/concluder/event.rs b/pipeline/src/concluder/event.rs index 5e969ef91..2e5054931 100644 --- a/pipeline/src/concluder/event.rs +++ b/pipeline/src/concluder/event.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use arrow::datatypes::{DataType, Field, Int32Type}; -use ceramic_core::ssi::caip2; use ceramic_core::METAMODEL_STREAM_ID; use ceramic_event::{unvalidated, StreamId, StreamIdType}; use cid::Cid; @@ -124,21 +123,17 @@ pub struct ConclusionTime { // // How do we populate that table for existing data? // Likely need some kind of explicit migration command that can be run to populate the table. - // + /// Proof of time event anchoring + pub time_proof: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +/// Time event proof has been validated and resulted in a proof of time event anchoring +pub struct TimeProof { /// It is known that the time event existed before this time as a Unix timestamp in seconds. pub before: u64, - - // Only before should be needed for conflict resolution but it may be nice to preserve these - // other values as well? - // /// Chain ID where this time event was anchored. pub chain_id: String, - /// Transaction hash that anchored this time event. - pub tx_hash: String, - /// Transaction type (TODO: not sure what this is). - pub tx_type: String, - /// Root cid of the proof. - pub root: Cid, } impl<'a> TryFrom<&'a unvalidated::Event> for ConclusionInit { @@ -491,6 +486,10 @@ mod tests { ], }, order: 2, + time_proof: Some(TimeProof { + before: 0, + chain_id: String::default(), + }), }), ConclusionEvent::Data(ConclusionData { event_cid: Cid::from_str( diff --git a/pipeline/src/concluder/mod.rs b/pipeline/src/concluder/mod.rs index f82c62f7b..0b98a9dc1 100644 --- a/pipeline/src/concluder/mod.rs +++ b/pipeline/src/concluder/mod.rs @@ -42,7 +42,7 @@ use crate::{ pub use crate::since::SubscribeSinceMsg; pub use event::{ conclusion_events_to_record_batch, ConclusionData, ConclusionEvent, ConclusionInit, - ConclusionTime, + ConclusionTime, TimeProof, }; pub use table::ConclusionFeed; @@ -363,9 +363,7 @@ mod tests { use test_log::test; use crate::{ - pipeline_ctx, - tests::{MockConclusionFeed, TestContext}, - ConclusionData, ConclusionFeedSource, ConclusionInit, ConclusionTime, Metrics, + concluder::TimeProof, pipeline_ctx, tests::{MockConclusionFeed, TestContext}, ConclusionData, ConclusionFeedSource, ConclusionInit, ConclusionTime, Metrics }; async fn init(feed: MockConclusionFeed) -> anyhow::Result> { @@ -448,6 +446,10 @@ mod tests { "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", ) .unwrap()], + time_proof: Some(TimeProof { + before: 0, + chain_id: String::default(), + }), })]) }); mock_feed diff --git a/pipeline/src/since/feed.rs b/pipeline/src/since/feed.rs index 92c1255b3..90d5b0c9d 100644 --- a/pipeline/src/since/feed.rs +++ b/pipeline/src/since/feed.rs @@ -44,8 +44,6 @@ pub trait FeedTableSource: Clone + std::fmt::Debug + Sync + Send + 'static { } /// A table that when queried produces an unbounded stream of data. -/// It is assumed that the table contains an "conclusion_event_order" column and new data arrives in increasing -/// "conclusion_event_order" order. #[derive(Debug)] pub struct FeedTable { source: S, From 83a88b28fe0ee01671e32a4994dcbbc643299d45 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi Date: Sun, 11 May 2025 09:45:27 -0400 Subject: [PATCH 06/19] feat: validate time events and propagate proofs through aggregator --- event-svc/src/event/service.rs | 32 ++++-------- event-svc/src/event/validator/event.rs | 42 ++++++++-------- event-svc/src/event/validator/grouped.rs | 17 +++++-- one/src/daemon.rs | 58 ++-------------------- one/src/lib.rs | 62 +++++++++++++++++++++--- one/src/migrations.rs | 16 +++++- pipeline/src/aggregator/mod.rs | 28 +++++++++-- pipeline/src/concluder/event.rs | 33 +++++++------ pipeline/src/concluder/mod.rs | 9 ++-- pipeline/src/resolver/mod.rs | 12 ++--- pipeline/src/schemas.rs | 8 ++- 11 files changed, 179 insertions(+), 138 deletions(-) diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs index 2a11be960..630b76354 100644 --- a/event-svc/src/event/service.rs +++ b/event-svc/src/event/service.rs @@ -12,7 +12,9 @@ use super::{ }; use async_trait::async_trait; use ceramic_core::{EventId, Network, NodeId, SerializeExt}; -use ceramic_pipeline::{concluder::TimeProof, ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime}; +use ceramic_pipeline::{ + concluder::TimeProof, ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime, +}; use ceramic_sql::sqlite::SqlitePool; use cid::Cid; use futures::stream::BoxStream; @@ -402,30 +404,22 @@ impl EventService { match event { ceramic_event::unvalidated::Event::Time(time_event) => { - let proof = match self.discover_chain_proof(&time_event).await { - Ok(proof) => Some(proof), - Err(error) => { - tracing::warn!( - ?event_cid, - ?error, - "Failed to discover chain proof for time event" - ); - None - } - }; + let proof = self.discover_chain_proof(&time_event).await.map_err(|e| { + Error::new_app(anyhow::anyhow!("Failed to discover chain proof: {:?}", e)) + })?; Ok(ConclusionEvent::Time(ConclusionTime { event_cid, init, previous: vec![*time_event.prev()], order: delivered as u64, - time_proof: proof.map(|p| TimeProof { - before: p + time_proof: TimeProof { + before: proof .timestamp .try_into() .expect("conclusion timestamp overflow"), - chain_id: p.chain_id, - }), + chain_id: proof.chain_id, + }, })) } ceramic_event::unvalidated::Event::Signed(signed_event) => { @@ -638,12 +632,6 @@ pub enum ValidationError { key: EventId, reason: String, }, - /// 'Soft error' -> should not kill recon conversation but should not be persisted - /// A time event could not be validated because no RPC provider was available - SoftError { - key: EventId, - reason: String, - }, } #[derive(Debug, PartialEq, Eq, Default)] diff --git a/event-svc/src/event/validator/event.rs b/event-svc/src/event/validator/event.rs index 32fc9090f..0802cc520 100644 --- a/event-svc/src/event/validator/event.rs +++ b/event-svc/src/event/validator/event.rs @@ -4,7 +4,6 @@ use ceramic_core::{Cid, EventId, NodeId}; use ceramic_event::unvalidated; use ipld_core::ipld::Ipld; use recon::ReconItem; -use tokio::try_join; use crate::{ blockchain::eth_rpc, @@ -117,6 +116,7 @@ impl ValidatedEvents { self.valid.extend(other.valid); self.invalid.extend(other.invalid); self.unvalidated.extend(other.unvalidated); + self.proofs.extend(other.proofs); } } @@ -150,38 +150,34 @@ impl EventValidator { } /// Validates the events with the given validation requirement - /// If the [`ValidationRequirement`] is None, it just returns every event as valid + /// Regardless of the validation requirement, time events are always validated. + /// If the [`ValidationRequirement`] is None, it just returns every data event as valid. pub(crate) async fn validate_events( &self, validation_req: Option<&ValidationRequirement>, parsed_events: Vec, ) -> Result { - let validation_req = if let Some(req) = validation_req { - req - } else { - // we don't validate so we just return done - return Ok(ValidatedEvents { - valid: parsed_events - .into_iter() - .map(ValidatedEvent::from_unvalidated_unchecked) - .collect(), - unvalidated: Vec::new(), - invalid: Vec::new(), - proofs: Vec::new(), - }); - }; - let mut validated = ValidatedEvents::new_with_expected_valid(parsed_events.len()); - // partition the events by type of validation needed and delegate to validators + + // Partition the events by type of validation needed and delegate to validators let grouped = GroupedEvents::from(parsed_events); - let (validated_signed, validated_time) = try_join!( - self.validate_signed_events(grouped.signed_batch, validation_req), - self.validate_time_events(grouped.time_batch) - )?; - validated.extend_with(validated_signed); + // Time events are always validated + let validated_time = self.validate_time_events(grouped.time_batch).await?; validated.extend_with(validated_time); + if let Some(req) = validation_req { + let validated_signed = self + .validate_signed_events(grouped.signed_batch, req) + .await?; + validated.extend_with(validated_signed); + } else { + // Return all data events as valid + validated + .valid + .extend(Vec::::from(grouped.signed_batch)); + }; + if !validated.invalid.is_empty() { tracing::warn!(count=%validated.invalid.len(), "invalid events discovered"); } diff --git a/event-svc/src/event/validator/grouped.rs b/event-svc/src/event/validator/grouped.rs index 846adfd20..7618f456f 100644 --- a/event-svc/src/event/validator/grouped.rs +++ b/event-svc/src/event/validator/grouped.rs @@ -104,11 +104,22 @@ pub struct TimeValidationBatch(pub(crate) Vec