diff --git a/actor/src/lib.rs b/actor/src/lib.rs index 8c69663cb..31c228a8b 100644 --- a/actor/src/lib.rs +++ b/actor/src/lib.rs @@ -61,6 +61,16 @@ pub trait Actor { } } +/// Optional trait for actors that need to perform cleanup operations during shutdown. +/// Only actors that need cleanup should implement this trait. +#[async_trait] +pub trait Shutdown { + /// Called when the actor is shutting down to allow for cleanup operations. + /// This method is called after all pending messages have been processed + /// but before the actor loop exits. + async fn shutdown(&mut self); +} + /// Wrapper of any T with its tracing span context. #[derive(Debug)] pub struct Traced { diff --git a/actor/src/macros.rs b/actor/src/macros.rs index e5a68f8b8..eb706b4e3 100644 --- a/actor/src/macros.rs +++ b/actor/src/macros.rs @@ -1,8 +1,34 @@ /// Constructs an envelope enumeration that contains all messages for an actor. /// +/// # Usage +/// +/// Basic usage (no shutdown support): +/// ```ignore +/// actor_envelope! { +/// MyEnvelope, +/// MyActor, +/// MyRecorder, +/// Message1 => Msg1Type, +/// Message2 => Msg2Type, +/// } +/// ``` +/// +/// With shutdown support (actor must implement `Shutdown` trait): +/// ```ignore +/// actor_envelope! { +/// MyEnvelope, +/// MyActor, +/// MyRecorder, +/// with_shutdown, +/// Message1 => Msg1Type, +/// Message2 => Msg2Type, +/// } +/// ``` +/// /// The first identifier is the name of the enum. /// The second identifier is the name of a trait specific to the actor. /// The third identifier is the name of a trait for recording message events. +/// Optional `with_shutdown` parameter adds shutdown support for actors that implement the Shutdown trait. /// The remaining pairs are the variants of the envelope indicating the messages the actor handles. /// /// The constructed actor trait is a union of the [`crate::Handler`] traits for each message along with the [`crate::Actor`] trait. @@ -10,10 +36,40 @@ /// The constructed recorder trait is a union of the [`ceramic_metrics::Recorder`] traits for each message. #[macro_export] macro_rules! actor_envelope { + // Version with shutdown support + ( + $enum_name:ident, + $actor_trait:ident, + $recorder_trait:ident, + with_shutdown, + $( + $variant_name:ident => $message_type:ty, + )* + ) => { + $crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, with_shutdown, $($variant_name => $message_type,)*); + }; + + // Default version without shutdown + ( + $enum_name:ident, + $actor_trait:ident, + $recorder_trait:ident, + $( + $variant_name:ident => $message_type:ty, + )* + ) => { + $crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, without_shutdown, $($variant_name => $message_type,)*); + }; +} + +/// Internal implementation macro that handles both with/without shutdown cases +#[macro_export] +macro_rules! actor_envelope_impl { ( $enum_name:ident, $actor_trait:ident, $recorder_trait:ident, + $shutdown_mode:ident, $( $variant_name:ident => $message_type:ty, )* @@ -51,11 +107,7 @@ macro_rules! actor_envelope { } } } - #[doc = std::stringify!($actor_trait)] - #[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "] - #[doc = stringify!($enum_name)] - #[doc = "."] - pub trait $actor_trait : $crate::Actor $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { } + $crate::actor_trait_def!($actor_trait, $enum_name, $shutdown_mode, $($message_type,)*); #[doc = std::stringify!($recorder_trait)] #[doc = " is an [`ceramic_metrics::Recorder`] for each message type in the actor envelope "] @@ -64,6 +116,7 @@ macro_rules! actor_envelope { pub trait $recorder_trait : $(::ceramic_metrics::Recorder<$crate::MessageEvent<$message_type>> +)* ::std::fmt::Debug + ::std::marker::Send + ::std::marker::Sync + 'static { } + impl $enum_name { /// Runs the actor handling messages as they arrive. pub async fn run(mut actor: A, mut receiver: $crate::Receiver, mut shutdown: impl ::std::future::Future + ::std::marker::Send + 'static) @@ -111,6 +164,7 @@ macro_rules! actor_envelope { } } } + $crate::actor_shutdown_call!($shutdown_mode, actor); } } $( @@ -132,3 +186,33 @@ macro_rules! actor_envelope { )* }; } + +/// Helper macro to define actor trait with or without shutdown requirement +#[macro_export] +macro_rules! actor_trait_def { + ($actor_trait:ident, $enum_name:ident, with_shutdown, $($message_type:ty,)*) => { + #[doc = std::stringify!($actor_trait)] + #[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "] + #[doc = stringify!($enum_name)] + #[doc = ". This actor must also implement [`ceramic_actor::Shutdown`]."] + pub trait $actor_trait : $crate::Actor $( + $crate::Handler<$message_type> )* + $crate::Shutdown + ::std::marker::Send + 'static { } + }; + ($actor_trait:ident, $enum_name:ident, without_shutdown, $($message_type:ty,)*) => { + #[doc = std::stringify!($actor_trait)] + #[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "] + #[doc = stringify!($enum_name)] + #[doc = "."] + pub trait $actor_trait : $crate::Actor $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { } + }; +} + +/// Helper macro to conditionally call shutdown +#[macro_export] +macro_rules! actor_shutdown_call { + (with_shutdown, $actor:ident) => { + $actor.shutdown().await; + }; + (without_shutdown, $actor:ident) => { + // No shutdown call for actors that don't implement shutdown + }; +} diff --git a/pipeline/src/aggregator/mock.rs b/pipeline/src/aggregator/mock.rs index e18f5b851..c73159c69 100644 --- a/pipeline/src/aggregator/mock.rs +++ b/pipeline/src/aggregator/mock.rs @@ -1,6 +1,6 @@ //! Provides a mock implmentation of the aggregator actor. use async_trait::async_trait; -use ceramic_actor::{Actor, Handler, Message}; +use ceramic_actor::{Actor, Handler, Message, Shutdown}; use mockall::mock; use prometheus_client::registry::Registry; @@ -69,6 +69,13 @@ impl Actor for MockAggregator { } impl AggregatorActor for MockAggregator {} +#[async_trait] +impl Shutdown for MockAggregator { + async fn shutdown(&mut self) { + // Mock implementation - no cleanup needed + } +} + impl MockAggregator { /// Spawn a mock aggregator actor. pub fn spawn(mock_actor: MockAggregator) -> AggregatorHandle { diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs index f8a0d7dfd..d3d11510f 100644 --- a/pipeline/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -35,7 +35,7 @@ use arrow::{ }; use arrow_schema::{DataType, SchemaRef}; use async_trait::async_trait; -use ceramic_actor::{actor_envelope, Actor, Handler, Message}; +use ceramic_actor::{actor_envelope, Actor, Handler, Message, Shutdown as ActorShutdown}; use ceramic_core::{StreamId, StreamIdType}; use cid::Cid; use datafusion::{ @@ -67,7 +67,7 @@ use model_instance_patch::ModelInstancePatch; use model_patch::ModelPatch; use shutdown::{Shutdown, ShutdownSignal}; use tokio::{select, sync::broadcast, task::JoinHandle}; -use tracing::{debug, error, instrument}; +use tracing::{debug, error, info, instrument}; use crate::{ cache_table::CacheTable, @@ -802,38 +802,47 @@ impl Aggregator { .await .context("writing to mem table data")?; - let count = self - .ctx - .table(EVENT_STATES_MEM_TABLE) - .await? - .count() - .await - .context("count mem table")?; + let count = self.count_cache().await?; // If we have enough data cached in memory write it out to persistent store if count >= self.max_cached_rows { - self.ctx - .table(EVENT_STATES_MEM_TABLE) - .await? - .write_table( - EVENT_STATES_PERSISTENT_TABLE, - DataFrameWriteOptions::new() - .with_partition_by(vec!["event_cid_partition".to_owned()]), - ) - .await - .context("writing to persistent table")?; - // Clear all data in the memory batch, by writing an empty batch - self.ctx - .read_batch(RecordBatch::new_empty(schemas::event_states_partitioned())) - .context("reading empty batch")? - .write_table( - EVENT_STATES_MEM_TABLE, - DataFrameWriteOptions::new().with_insert_operation(InsertOp::Overwrite), - ) - .await - .context("clearing mem table")?; + self.flush_cache().await?; } ordered.collect().await.context("collecting ordered events") } + + async fn count_cache(&self) -> Result { + self.ctx + .table(EVENT_STATES_MEM_TABLE) + .await? + .count() + .await + .context("count mem table") + } + + async fn flush_cache(&self) -> Result { + let cnt = self.count_cache().await?; + self.ctx + .table(EVENT_STATES_MEM_TABLE) + .await? + .write_table( + EVENT_STATES_PERSISTENT_TABLE, + DataFrameWriteOptions::new() + .with_partition_by(vec!["event_cid_partition".to_owned()]), + ) + .await + .context("writing to persistent table")?; + // Clear all data in the memory batch, by writing an empty batch + self.ctx + .read_batch(RecordBatch::new_empty(schemas::event_states_partitioned())) + .context("reading empty batch")? + .write_table( + EVENT_STATES_MEM_TABLE, + DataFrameWriteOptions::new().with_insert_operation(InsertOp::Overwrite), + ) + .await + .context("clearing mem table")?; + Ok(cnt) + } } // Construct a column for the field in the UNNAMED_TABLE. @@ -888,6 +897,7 @@ actor_envelope! { AggregatorEnvelope, AggregatorActor, AggregatorRecorder, + with_shutdown, SubscribeSince => SubscribeSinceMsg, NewConclusionEvents => NewConclusionEventsMsg, // TODO: Remove this message and use the analogous message on the Resolver. @@ -895,6 +905,25 @@ actor_envelope! { StreamState => StreamStateMsg, } +#[async_trait] +impl ActorShutdown for Aggregator { + async fn shutdown(&mut self) { + info!("Aggregator shutdown: flushing cached data to persistent storage..."); + + match self.flush_cache().await { + Ok(rows) => { + debug!( + "Aggregator shutdown: successfully flushed {} rows to persistent storage", + rows + ); + } + Err(e) => { + error!("Aggregator shutdown: failed to flush cached data: {}", e); + } + } + } +} + #[async_trait] impl Handler for Aggregator { async fn handle( @@ -3265,4 +3294,213 @@ mod tests { | 1 | baeabeibrydyefwuzizf6s32lbuu27rpjibrbtsziqukscbnuo3y52pgmha | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeibrydyefwuzizf6s32lbuu27rpjibrbtsziqukscbnuo3y52pgmha | 0 | 0 | {"content":{"accountRelation":{"fields":[],"type":"set"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{}} | [Account relation of type Set must include at least one field] | | | +-------------------+-------------------------------------------------------------+-------------+-------------+-----------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------+--------+----------+"#]].assert_eq(&event_states.to_string()); } + + #[tokio::test] + async fn cache_flush_on_shutdown() { + use object_store::memory::InMemory; + let object_store = Arc::new(InMemory::new()); + + // Create test events (just 2 events - will stay in cache with threshold of 5) + let events = &model_and_mids_events()[0..2]; + let conclusion_events = conclusion_events_to_record_batch(events).unwrap(); + + let phase1_result = { + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .times(1) + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::empty(), + ))) + }); + + let ctx = init_with_object_store( + mock_concluder, + object_store.clone(), + Some(5), // Cache threshold of 5 - our 2 events will stay in memory + ) + .await + .unwrap(); + + let result = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events.clone()), + ) + .await + .unwrap(); + + ctx.shutdown().await.unwrap(); + result + }; + + // Phase 2: Restart aggregator and query for existing data + let phase2_result = { + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .times(1) + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::empty(), + ))) + }); + + let ctx = init_with_object_store(mock_concluder, object_store.clone(), Some(5)) + .await + .unwrap(); + + // Query for existing data using direct subscription - finds nothing due to cache loss bug + let mut subscription = ctx + .actor_handle + .send(SubscribeSinceMsg { + projection: None, + filters: None, + limit: None, + }) + .await + .unwrap() + .unwrap(); + + // DO NOT send new events - we want to see what persisted from Phase 1 + // Wait briefly for any persisted data, then timeout + let result = tokio::time::timeout( + std::time::Duration::from_millis(100), + subscription.try_next(), + ) + .await; + + let event_states = match result { + Ok(Ok(Some(batch))) => pretty_event_states(vec![batch]).await.unwrap().to_string(), + _ => "No persisted data found".to_string(), + }; + + ctx.shutdown().await.unwrap(); + event_states + }; + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+"# + ]].assert_eq(&phase1_result.to_string()); + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+"# + ]].assert_eq(&phase2_result.to_string()); + } + + #[tokio::test] + async fn cache_flush_on_threshold() { + use object_store::memory::InMemory; + let object_store = Arc::new(InMemory::new()); + + // Create test events - we'll use 4 events with threshold of 3 + let events = &model_and_mids_events()[0..4]; // Get first 4 events + let conclusion_events = conclusion_events_to_record_batch(events).unwrap(); + + // Phase 1: Process events that will exceed cache threshold + let phase1_result = { + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .times(1) + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::empty(), + ))) + }); + + let ctx = init_with_object_store( + mock_concluder, + object_store.clone(), + Some(3), // Cache threshold of 3 - our 4 events should trigger flush + ) + .await + .unwrap(); + + let result = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events.clone()), + ) + .await + .unwrap(); + + ctx.shutdown().await.unwrap(); + result + }; + + // Phase 2: Restart aggregator and query for persisted data + let phase2_result = { + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .times(1) + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::empty(), + ))) + }); + + let ctx = init_with_object_store(mock_concluder, object_store.clone(), Some(3)) + .await + .unwrap(); + + // Query for existing data using direct subscription - should find persisted data + let mut subscription = ctx + .actor_handle + .send(SubscribeSinceMsg { + projection: None, + filters: None, + limit: None, + }) + .await + .unwrap() + .unwrap(); + + // DO NOT send new events - we want to see what persisted from Phase 1 + // The data should be available immediately since it was flushed to persistent storage + let batch = subscription.try_next().await.unwrap().unwrap(); + let event_states = pretty_event_states(vec![batch]).await.unwrap(); + + ctx.shutdown().await.unwrap(); + event_states + }; + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 3 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | 1 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | 1744383131980 | test:chain | + | 4 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeibrtuyyqwd6y4aa62qxaimjhafielf7fc22fa5b7i7vptcu5263em | 0 | 2 | {"metadata":{"foo":2,"shouldIndex":true},"content":{"blue":255,"creator":"alice","green":255,"red":0}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"# + ]].assert_eq(&phase1_result.to_string()); + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 3 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | 1 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | 1744383131980 | test:chain | + | 4 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeibrtuyyqwd6y4aa62qxaimjhafielf7fc22fa5b7i7vptcu5263em | 0 | 2 | {"metadata":{"foo":2,"shouldIndex":true},"content":{"blue":255,"creator":"alice","green":255,"red":0}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"# + ]].assert_eq(&phase2_result.to_string()); + } }