diff --git a/actor/src/lib.rs b/actor/src/lib.rs index 8c69663cb..31c228a8b 100644 --- a/actor/src/lib.rs +++ b/actor/src/lib.rs @@ -61,6 +61,16 @@ pub trait Actor { } } +/// Optional trait for actors that need to perform cleanup operations during shutdown. +/// Only actors that need cleanup should implement this trait. +#[async_trait] +pub trait Shutdown { + /// Called when the actor is shutting down to allow for cleanup operations. + /// This method is called after all pending messages have been processed + /// but before the actor loop exits. + async fn shutdown(&mut self); +} + /// Wrapper of any T with its tracing span context. #[derive(Debug)] pub struct Traced { diff --git a/actor/src/macros.rs b/actor/src/macros.rs index e5a68f8b8..eb706b4e3 100644 --- a/actor/src/macros.rs +++ b/actor/src/macros.rs @@ -1,8 +1,34 @@ /// Constructs an envelope enumeration that contains all messages for an actor. /// +/// # Usage +/// +/// Basic usage (no shutdown support): +/// ```ignore +/// actor_envelope! { +/// MyEnvelope, +/// MyActor, +/// MyRecorder, +/// Message1 => Msg1Type, +/// Message2 => Msg2Type, +/// } +/// ``` +/// +/// With shutdown support (actor must implement `Shutdown` trait): +/// ```ignore +/// actor_envelope! { +/// MyEnvelope, +/// MyActor, +/// MyRecorder, +/// with_shutdown, +/// Message1 => Msg1Type, +/// Message2 => Msg2Type, +/// } +/// ``` +/// /// The first identifier is the name of the enum. /// The second identifier is the name of a trait specific to the actor. /// The third identifier is the name of a trait for recording message events. +/// Optional `with_shutdown` parameter adds shutdown support for actors that implement the Shutdown trait. /// The remaining pairs are the variants of the envelope indicating the messages the actor handles. /// /// The constructed actor trait is a union of the [`crate::Handler`] traits for each message along with the [`crate::Actor`] trait. @@ -10,10 +36,40 @@ /// The constructed recorder trait is a union of the [`ceramic_metrics::Recorder`] traits for each message. #[macro_export] macro_rules! actor_envelope { + // Version with shutdown support + ( + $enum_name:ident, + $actor_trait:ident, + $recorder_trait:ident, + with_shutdown, + $( + $variant_name:ident => $message_type:ty, + )* + ) => { + $crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, with_shutdown, $($variant_name => $message_type,)*); + }; + + // Default version without shutdown + ( + $enum_name:ident, + $actor_trait:ident, + $recorder_trait:ident, + $( + $variant_name:ident => $message_type:ty, + )* + ) => { + $crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, without_shutdown, $($variant_name => $message_type,)*); + }; +} + +/// Internal implementation macro that handles both with/without shutdown cases +#[macro_export] +macro_rules! actor_envelope_impl { ( $enum_name:ident, $actor_trait:ident, $recorder_trait:ident, + $shutdown_mode:ident, $( $variant_name:ident => $message_type:ty, )* @@ -51,11 +107,7 @@ macro_rules! actor_envelope { } } } - #[doc = std::stringify!($actor_trait)] - #[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "] - #[doc = stringify!($enum_name)] - #[doc = "."] - pub trait $actor_trait : $crate::Actor $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { } + $crate::actor_trait_def!($actor_trait, $enum_name, $shutdown_mode, $($message_type,)*); #[doc = std::stringify!($recorder_trait)] #[doc = " is an [`ceramic_metrics::Recorder`] for each message type in the actor envelope "] @@ -64,6 +116,7 @@ macro_rules! actor_envelope { pub trait $recorder_trait : $(::ceramic_metrics::Recorder<$crate::MessageEvent<$message_type>> +)* ::std::fmt::Debug + ::std::marker::Send + ::std::marker::Sync + 'static { } + impl $enum_name { /// Runs the actor handling messages as they arrive. pub async fn run(mut actor: A, mut receiver: $crate::Receiver, mut shutdown: impl ::std::future::Future + ::std::marker::Send + 'static) @@ -111,6 +164,7 @@ macro_rules! actor_envelope { } } } + $crate::actor_shutdown_call!($shutdown_mode, actor); } } $( @@ -132,3 +186,33 @@ macro_rules! actor_envelope { )* }; } + +/// Helper macro to define actor trait with or without shutdown requirement +#[macro_export] +macro_rules! actor_trait_def { + ($actor_trait:ident, $enum_name:ident, with_shutdown, $($message_type:ty,)*) => { + #[doc = std::stringify!($actor_trait)] + #[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "] + #[doc = stringify!($enum_name)] + #[doc = ". This actor must also implement [`ceramic_actor::Shutdown`]."] + pub trait $actor_trait : $crate::Actor $( + $crate::Handler<$message_type> )* + $crate::Shutdown + ::std::marker::Send + 'static { } + }; + ($actor_trait:ident, $enum_name:ident, without_shutdown, $($message_type:ty,)*) => { + #[doc = std::stringify!($actor_trait)] + #[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "] + #[doc = stringify!($enum_name)] + #[doc = "."] + pub trait $actor_trait : $crate::Actor $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { } + }; +} + +/// Helper macro to conditionally call shutdown +#[macro_export] +macro_rules! actor_shutdown_call { + (with_shutdown, $actor:ident) => { + $actor.shutdown().await; + }; + (without_shutdown, $actor:ident) => { + // No shutdown call for actors that don't implement shutdown + }; +} diff --git a/one/src/daemon.rs b/one/src/daemon.rs index efebe38b2..2bfcd87f9 100644 --- a/one/src/daemon.rs +++ b/one/src/daemon.rs @@ -1,7 +1,7 @@ use std::{path::PathBuf, time::Duration}; use crate::{ - default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, DBOpts, + default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, startup, DBOpts, DBOptsExperimental, Info, LogOpts, Network, }; use anyhow::{anyhow, bail, Result}; @@ -274,6 +274,16 @@ pub struct DaemonOpts { default_value = "file://./pipeline" )] object_store_url: url::Url, + + /// Comma-separated list of stream IDs or model IDs to register as initial interests on startup. + /// The node will track these streams and sync their data. + #[arg( + long, + use_value_delimiter = true, + value_delimiter = ',', + env = "CERAMIC_ONE_INITIAL_INTERESTS" + )] + initial_interests: Vec, } fn spawn_database_optimizer( @@ -447,6 +457,10 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { let node_key = NodeKey::try_from_dir(opts.p2p_key_dir).await?; let node_id = node_key.id(); + // Process initial interests if provided + startup::process_initial_interests(&opts.initial_interests, &interest_svc, &network, &node_id) + .await?; + // Register metrics for all components let recon_metrics = MetricsHandle::register(recon::Metrics::register); let peer_svc_store_metrics = diff --git a/one/src/lib.rs b/one/src/lib.rs index 2dad1d9ed..ef2d7a572 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -8,6 +8,7 @@ mod metrics; mod migrations; mod network; mod query; +mod startup; use anyhow::{anyhow, bail, Result}; use ceramic_core::ssi::caip2::ChainId; diff --git a/one/src/startup.rs b/one/src/startup.rs new file mode 100644 index 000000000..90e9c8a9b --- /dev/null +++ b/one/src/startup.rs @@ -0,0 +1,186 @@ +//! Startup utilities for the Ceramic One daemon. + +use anyhow::{anyhow, Result}; +use ceramic_api::InterestService as InterestServiceTrait; +use ceramic_core::{EventId, Interest, NodeId, StreamId}; +use ceramic_interest_svc::InterestService; +use std::str::FromStr; +use std::sync::Arc; +use tracing::{debug, info, warn}; + +/// Process initial interests by registering them with the interest service +pub async fn process_initial_interests( + initial_interests: &[String], + interest_svc: &Arc, + network: &ceramic_core::Network, + node_id: &NodeId, +) -> Result<()> { + if initial_interests.is_empty() { + return Ok(()); + } + + info!("Processing {} initial interests", initial_interests.len()); + + for stream_id_str in initial_interests { + let stream_id_str = stream_id_str.trim(); + if stream_id_str.is_empty() { + continue; + } + + // Validate that the model stream ID is parseable + let _stream_id = StreamId::from_str(stream_id_str) + .map_err(|e| anyhow!("Invalid model ID '{}': {}", stream_id_str, e))?; + + // Create an interest for the "model" separator key covering the full range for this stream + // This follows the same pattern as the API endpoint /ceramic/interests/model/{stream_id} + let stream_id_bytes = multibase::decode(stream_id_str) + .map_err(|e| anyhow!("Failed to decode stream ID '{}': {}", stream_id_str, e))? + .1; + let start = EventId::builder() + .with_network(network) + .with_sep("model", &stream_id_bytes) + .with_min_controller() + .with_min_init() + .with_min_event() + .build_fencepost(); + let stop = EventId::builder() + .with_network(network) + .with_sep("model", &stream_id_bytes) + .with_max_controller() + .with_max_init() + .with_max_event() + .build_fencepost(); + + let interest = Interest::builder() + .with_sep_key("model") + .with_peer_id(&node_id.peer_id()) + .with_range((start.as_slice(), stop.as_slice())) + .with_not_after(0) + .build(); + + match interest_svc.insert(interest).await { + Ok(was_inserted) => { + if was_inserted { + info!( + "Successfully registered initial interest for stream: {}", + stream_id_str + ); + } else { + debug!( + "Interest for stream {} was already registered", + stream_id_str + ); + } + } + Err(e) => { + warn!( + "Failed to register initial interest for stream {}: {}", + stream_id_str, e + ); + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use ceramic_core::Network; + use ceramic_sql::sqlite::SqlitePool; + use std::sync::Arc; + + #[tokio::test] + async fn test_process_initial_interests_empty() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let interest_svc = Arc::new(InterestService::new(pool)); + let network = Network::InMemory; + let node_key = ceramic_core::NodeKey::random(); + let node_id = node_key.id(); + + // Test with empty interests + let result = process_initial_interests(&[], &interest_svc, &network, &node_id).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_process_initial_interests_invalid_stream_id() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let interest_svc = Arc::new(InterestService::new(pool)); + let network = Network::InMemory; + let node_key = ceramic_core::NodeKey::random(); + let node_id = node_key.id(); + + // Invalid stream ID - tests daemon-specific stream ID parsing and error handling + let stream_ids = vec!["invalid-stream-id".to_string()]; + + let result = + process_initial_interests(&stream_ids, &interest_svc, &network, &node_id).await; + assert!(result.is_err()); + + // Verify the error message contains information about the invalid stream ID + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("invalid-stream-id")); + } + + #[tokio::test] + async fn test_process_initial_interests_mixed_valid_invalid() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let interest_svc = Arc::new(InterestService::new(pool)); + let network = Network::InMemory; + let node_key = ceramic_core::NodeKey::random(); + let node_id = node_key.id(); + + // Mix of valid and invalid stream IDs - tests early failure behavior + let stream_ids = vec![ + "k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9".to_string(), // Valid + "invalid-stream-id".to_string(), // Invalid + ]; + + let result = + process_initial_interests(&stream_ids, &interest_svc, &network, &node_id).await; + // Should fail on the first invalid ID + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_process_initial_interests_whitespace_handling() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let interest_svc = Arc::new(InterestService::new(pool)); + let network = Network::InMemory; + let node_key = ceramic_core::NodeKey::random(); + let node_id = node_key.id(); + + // Test daemon-specific input sanitization logic + let stream_ids = vec![ + " k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9 ".to_string(), // Valid with whitespace + "".to_string(), // Empty string + " ".to_string(), // Only whitespace + ]; + + let result = + process_initial_interests(&stream_ids, &interest_svc, &network, &node_id).await; + // Should succeed and handle whitespace correctly + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_process_initial_interests_multibase_decoding() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let interest_svc = Arc::new(InterestService::new(pool)); + let network = Network::InMemory; + let node_key = ceramic_core::NodeKey::random(); + let node_id = node_key.id(); + + // Test daemon-specific multibase decoding logic + let stream_ids = vec![ + "k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9".to_string(), // Valid multibase-encoded stream ID + ]; + + let result = + process_initial_interests(&stream_ids, &interest_svc, &network, &node_id).await; + // Should succeed with proper multibase decoding + assert!(result.is_ok()); + } +} diff --git a/pipeline/src/aggregator/mock.rs b/pipeline/src/aggregator/mock.rs index e18f5b851..c73159c69 100644 --- a/pipeline/src/aggregator/mock.rs +++ b/pipeline/src/aggregator/mock.rs @@ -1,6 +1,6 @@ //! Provides a mock implmentation of the aggregator actor. use async_trait::async_trait; -use ceramic_actor::{Actor, Handler, Message}; +use ceramic_actor::{Actor, Handler, Message, Shutdown}; use mockall::mock; use prometheus_client::registry::Registry; @@ -69,6 +69,13 @@ impl Actor for MockAggregator { } impl AggregatorActor for MockAggregator {} +#[async_trait] +impl Shutdown for MockAggregator { + async fn shutdown(&mut self) { + // Mock implementation - no cleanup needed + } +} + impl MockAggregator { /// Spawn a mock aggregator actor. pub fn spawn(mock_actor: MockAggregator) -> AggregatorHandle { diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs index f8a0d7dfd..d3d11510f 100644 --- a/pipeline/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -35,7 +35,7 @@ use arrow::{ }; use arrow_schema::{DataType, SchemaRef}; use async_trait::async_trait; -use ceramic_actor::{actor_envelope, Actor, Handler, Message}; +use ceramic_actor::{actor_envelope, Actor, Handler, Message, Shutdown as ActorShutdown}; use ceramic_core::{StreamId, StreamIdType}; use cid::Cid; use datafusion::{ @@ -67,7 +67,7 @@ use model_instance_patch::ModelInstancePatch; use model_patch::ModelPatch; use shutdown::{Shutdown, ShutdownSignal}; use tokio::{select, sync::broadcast, task::JoinHandle}; -use tracing::{debug, error, instrument}; +use tracing::{debug, error, info, instrument}; use crate::{ cache_table::CacheTable, @@ -802,38 +802,47 @@ impl Aggregator { .await .context("writing to mem table data")?; - let count = self - .ctx - .table(EVENT_STATES_MEM_TABLE) - .await? - .count() - .await - .context("count mem table")?; + let count = self.count_cache().await?; // If we have enough data cached in memory write it out to persistent store if count >= self.max_cached_rows { - self.ctx - .table(EVENT_STATES_MEM_TABLE) - .await? - .write_table( - EVENT_STATES_PERSISTENT_TABLE, - DataFrameWriteOptions::new() - .with_partition_by(vec!["event_cid_partition".to_owned()]), - ) - .await - .context("writing to persistent table")?; - // Clear all data in the memory batch, by writing an empty batch - self.ctx - .read_batch(RecordBatch::new_empty(schemas::event_states_partitioned())) - .context("reading empty batch")? - .write_table( - EVENT_STATES_MEM_TABLE, - DataFrameWriteOptions::new().with_insert_operation(InsertOp::Overwrite), - ) - .await - .context("clearing mem table")?; + self.flush_cache().await?; } ordered.collect().await.context("collecting ordered events") } + + async fn count_cache(&self) -> Result { + self.ctx + .table(EVENT_STATES_MEM_TABLE) + .await? + .count() + .await + .context("count mem table") + } + + async fn flush_cache(&self) -> Result { + let cnt = self.count_cache().await?; + self.ctx + .table(EVENT_STATES_MEM_TABLE) + .await? + .write_table( + EVENT_STATES_PERSISTENT_TABLE, + DataFrameWriteOptions::new() + .with_partition_by(vec!["event_cid_partition".to_owned()]), + ) + .await + .context("writing to persistent table")?; + // Clear all data in the memory batch, by writing an empty batch + self.ctx + .read_batch(RecordBatch::new_empty(schemas::event_states_partitioned())) + .context("reading empty batch")? + .write_table( + EVENT_STATES_MEM_TABLE, + DataFrameWriteOptions::new().with_insert_operation(InsertOp::Overwrite), + ) + .await + .context("clearing mem table")?; + Ok(cnt) + } } // Construct a column for the field in the UNNAMED_TABLE. @@ -888,6 +897,7 @@ actor_envelope! { AggregatorEnvelope, AggregatorActor, AggregatorRecorder, + with_shutdown, SubscribeSince => SubscribeSinceMsg, NewConclusionEvents => NewConclusionEventsMsg, // TODO: Remove this message and use the analogous message on the Resolver. @@ -895,6 +905,25 @@ actor_envelope! { StreamState => StreamStateMsg, } +#[async_trait] +impl ActorShutdown for Aggregator { + async fn shutdown(&mut self) { + info!("Aggregator shutdown: flushing cached data to persistent storage..."); + + match self.flush_cache().await { + Ok(rows) => { + debug!( + "Aggregator shutdown: successfully flushed {} rows to persistent storage", + rows + ); + } + Err(e) => { + error!("Aggregator shutdown: failed to flush cached data: {}", e); + } + } + } +} + #[async_trait] impl Handler for Aggregator { async fn handle( @@ -3265,4 +3294,213 @@ mod tests { | 1 | baeabeibrydyefwuzizf6s32lbuu27rpjibrbtsziqukscbnuo3y52pgmha | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeibrydyefwuzizf6s32lbuu27rpjibrbtsziqukscbnuo3y52pgmha | 0 | 0 | {"content":{"accountRelation":{"fields":[],"type":"set"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{}} | [Account relation of type Set must include at least one field] | | | +-------------------+-------------------------------------------------------------+-------------+-------------+-----------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------+--------+----------+"#]].assert_eq(&event_states.to_string()); } + + #[tokio::test] + async fn cache_flush_on_shutdown() { + use object_store::memory::InMemory; + let object_store = Arc::new(InMemory::new()); + + // Create test events (just 2 events - will stay in cache with threshold of 5) + let events = &model_and_mids_events()[0..2]; + let conclusion_events = conclusion_events_to_record_batch(events).unwrap(); + + let phase1_result = { + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .times(1) + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::empty(), + ))) + }); + + let ctx = init_with_object_store( + mock_concluder, + object_store.clone(), + Some(5), // Cache threshold of 5 - our 2 events will stay in memory + ) + .await + .unwrap(); + + let result = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events.clone()), + ) + .await + .unwrap(); + + ctx.shutdown().await.unwrap(); + result + }; + + // Phase 2: Restart aggregator and query for existing data + let phase2_result = { + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .times(1) + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::empty(), + ))) + }); + + let ctx = init_with_object_store(mock_concluder, object_store.clone(), Some(5)) + .await + .unwrap(); + + // Query for existing data using direct subscription - finds nothing due to cache loss bug + let mut subscription = ctx + .actor_handle + .send(SubscribeSinceMsg { + projection: None, + filters: None, + limit: None, + }) + .await + .unwrap() + .unwrap(); + + // DO NOT send new events - we want to see what persisted from Phase 1 + // Wait briefly for any persisted data, then timeout + let result = tokio::time::timeout( + std::time::Duration::from_millis(100), + subscription.try_next(), + ) + .await; + + let event_states = match result { + Ok(Ok(Some(batch))) => pretty_event_states(vec![batch]).await.unwrap().to_string(), + _ => "No persisted data found".to_string(), + }; + + ctx.shutdown().await.unwrap(); + event_states + }; + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+"# + ]].assert_eq(&phase1_result.to_string()); + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+"# + ]].assert_eq(&phase2_result.to_string()); + } + + #[tokio::test] + async fn cache_flush_on_threshold() { + use object_store::memory::InMemory; + let object_store = Arc::new(InMemory::new()); + + // Create test events - we'll use 4 events with threshold of 3 + let events = &model_and_mids_events()[0..4]; // Get first 4 events + let conclusion_events = conclusion_events_to_record_batch(events).unwrap(); + + // Phase 1: Process events that will exceed cache threshold + let phase1_result = { + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .times(1) + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::empty(), + ))) + }); + + let ctx = init_with_object_store( + mock_concluder, + object_store.clone(), + Some(3), // Cache threshold of 3 - our 4 events should trigger flush + ) + .await + .unwrap(); + + let result = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events.clone()), + ) + .await + .unwrap(); + + ctx.shutdown().await.unwrap(); + result + }; + + // Phase 2: Restart aggregator and query for persisted data + let phase2_result = { + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .times(1) + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::empty(), + ))) + }); + + let ctx = init_with_object_store(mock_concluder, object_store.clone(), Some(3)) + .await + .unwrap(); + + // Query for existing data using direct subscription - should find persisted data + let mut subscription = ctx + .actor_handle + .send(SubscribeSinceMsg { + projection: None, + filters: None, + limit: None, + }) + .await + .unwrap() + .unwrap(); + + // DO NOT send new events - we want to see what persisted from Phase 1 + // The data should be available immediately since it was flushed to persistent storage + let batch = subscription.try_next().await.unwrap().unwrap(); + let event_states = pretty_event_states(vec![batch]).await.unwrap(); + + ctx.shutdown().await.unwrap(); + event_states + }; + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 3 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | 1 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | 1744383131980 | test:chain | + | 4 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeibrtuyyqwd6y4aa62qxaimjhafielf7fc22fa5b7i7vptcu5263em | 0 | 2 | {"metadata":{"foo":2,"shouldIndex":true},"content":{"blue":255,"creator":"alice","green":255,"red":0}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"# + ]].assert_eq(&phase1_result.to_string()); + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 3 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | 1 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | 1744383131980 | test:chain | + | 4 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeibrtuyyqwd6y4aa62qxaimjhafielf7fc22fa5b7i7vptcu5263em | 0 | 2 | {"metadata":{"foo":2,"shouldIndex":true},"content":{"blue":255,"creator":"alice","green":255,"red":0}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"# + ]].assert_eq(&phase2_result.to_string()); + } }