Skip to content

Commit 86c9121

Browse files
committed
fix: flush pipeline aggregator cache on shutdown with optional actor shutdown trait
1 parent 5c18247 commit 86c9121

4 files changed

Lines changed: 162 additions & 8 deletions

File tree

actor/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,16 @@ pub trait Actor {
6161
}
6262
}
6363

64+
/// Optional trait for actors that need to perform cleanup operations during shutdown.
65+
/// Only actors that need cleanup should implement this trait.
66+
#[async_trait]
67+
pub trait Shutdown {
68+
/// Called when the actor is shutting down to allow for cleanup operations.
69+
/// This method is called after all pending messages have been processed
70+
/// but before the actor loop exits.
71+
async fn shutdown(&mut self);
72+
}
73+
6474
/// Wrapper of any T with its tracing span context.
6575
#[derive(Debug)]
6676
pub struct Traced<T> {

actor/src/macros.rs

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,75 @@
11
/// Constructs an envelope enumeration that contains all messages for an actor.
22
///
3+
/// # Usage
4+
///
5+
/// Basic usage (no shutdown support):
6+
/// ```ignore
7+
/// actor_envelope! {
8+
/// MyEnvelope,
9+
/// MyActor,
10+
/// MyRecorder,
11+
/// Message1 => Msg1Type,
12+
/// Message2 => Msg2Type,
13+
/// }
14+
/// ```
15+
///
16+
/// With shutdown support (actor must implement `Shutdown` trait):
17+
/// ```ignore
18+
/// actor_envelope! {
19+
/// MyEnvelope,
20+
/// MyActor,
21+
/// MyRecorder,
22+
/// with_shutdown,
23+
/// Message1 => Msg1Type,
24+
/// Message2 => Msg2Type,
25+
/// }
26+
/// ```
27+
///
328
/// The first identifier is the name of the enum.
429
/// The second identifier is the name of a trait specific to the actor.
530
/// The third identifier is the name of a trait for recording message events.
31+
/// Optional `with_shutdown` parameter adds shutdown support for actors that implement the Shutdown trait.
632
/// The remaining pairs are the variants of the envelope indicating the messages the actor handles.
733
///
834
/// The constructed actor trait is a union of the [`crate::Handler`] traits for each message along with the [`crate::Actor`] trait.
935
///
1036
/// The constructed recorder trait is a union of the [`ceramic_metrics::Recorder`] traits for each message.
1137
#[macro_export]
1238
macro_rules! actor_envelope {
39+
// Version with shutdown support
40+
(
41+
$enum_name:ident,
42+
$actor_trait:ident,
43+
$recorder_trait:ident,
44+
with_shutdown,
45+
$(
46+
$variant_name:ident => $message_type:ty,
47+
)*
48+
) => {
49+
$crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, with_shutdown, $($variant_name => $message_type,)*);
50+
};
51+
52+
// Default version without shutdown
53+
(
54+
$enum_name:ident,
55+
$actor_trait:ident,
56+
$recorder_trait:ident,
57+
$(
58+
$variant_name:ident => $message_type:ty,
59+
)*
60+
) => {
61+
$crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, without_shutdown, $($variant_name => $message_type,)*);
62+
};
63+
}
64+
65+
/// Internal implementation macro that handles both with/without shutdown cases
66+
#[macro_export]
67+
macro_rules! actor_envelope_impl {
1368
(
1469
$enum_name:ident,
1570
$actor_trait:ident,
1671
$recorder_trait:ident,
72+
$shutdown_mode:ident,
1773
$(
1874
$variant_name:ident => $message_type:ty,
1975
)*
@@ -51,11 +107,7 @@ macro_rules! actor_envelope {
51107
}
52108
}
53109
}
54-
#[doc = std::stringify!($actor_trait)]
55-
#[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "]
56-
#[doc = stringify!($enum_name)]
57-
#[doc = "."]
58-
pub trait $actor_trait : $crate::Actor<Envelope = $enum_name> $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { }
110+
$crate::actor_trait_def!($actor_trait, $enum_name, $shutdown_mode, $($message_type,)*);
59111

60112
#[doc = std::stringify!($recorder_trait)]
61113
#[doc = " is an [`ceramic_metrics::Recorder`] for each message type in the actor envelope "]
@@ -64,6 +116,7 @@ macro_rules! actor_envelope {
64116
pub trait $recorder_trait : $(::ceramic_metrics::Recorder<$crate::MessageEvent<$message_type>> +)*
65117
::std::fmt::Debug + ::std::marker::Send + ::std::marker::Sync + 'static { }
66118

119+
67120
impl $enum_name {
68121
/// Runs the actor handling messages as they arrive.
69122
pub async fn run<A>(mut actor: A, mut receiver: $crate::Receiver<A::Envelope>, mut shutdown: impl ::std::future::Future<Output=()> + ::std::marker::Send + 'static)
@@ -111,6 +164,7 @@ macro_rules! actor_envelope {
111164
}
112165
}
113166
}
167+
$crate::actor_shutdown_call!($shutdown_mode, actor);
114168
}
115169
}
116170
$(
@@ -132,3 +186,33 @@ macro_rules! actor_envelope {
132186
)*
133187
};
134188
}
189+
190+
/// Helper macro to define actor trait with or without shutdown requirement
191+
#[macro_export]
192+
macro_rules! actor_trait_def {
193+
($actor_trait:ident, $enum_name:ident, with_shutdown, $($message_type:ty,)*) => {
194+
#[doc = std::stringify!($actor_trait)]
195+
#[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "]
196+
#[doc = stringify!($enum_name)]
197+
#[doc = ". This actor must also implement [`ceramic_actor::Shutdown`]."]
198+
pub trait $actor_trait : $crate::Actor<Envelope = $enum_name> $( + $crate::Handler<$message_type> )* + $crate::Shutdown + ::std::marker::Send + 'static { }
199+
};
200+
($actor_trait:ident, $enum_name:ident, without_shutdown, $($message_type:ty,)*) => {
201+
#[doc = std::stringify!($actor_trait)]
202+
#[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "]
203+
#[doc = stringify!($enum_name)]
204+
#[doc = "."]
205+
pub trait $actor_trait : $crate::Actor<Envelope = $enum_name> $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { }
206+
};
207+
}
208+
209+
/// Helper macro to conditionally call shutdown
210+
#[macro_export]
211+
macro_rules! actor_shutdown_call {
212+
(with_shutdown, $actor:ident) => {
213+
$actor.shutdown().await;
214+
};
215+
(without_shutdown, $actor:ident) => {
216+
// No shutdown call for actors that don't implement shutdown
217+
};
218+
}

pipeline/src/aggregator/mock.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Provides a mock implmentation of the aggregator actor.
22
use async_trait::async_trait;
3-
use ceramic_actor::{Actor, Handler, Message};
3+
use ceramic_actor::{Actor, Handler, Message, Shutdown};
44
use mockall::mock;
55
use prometheus_client::registry::Registry;
66

@@ -69,6 +69,13 @@ impl Actor for MockAggregator {
6969
}
7070
impl AggregatorActor for MockAggregator {}
7171

72+
#[async_trait]
73+
impl Shutdown for MockAggregator {
74+
async fn shutdown(&mut self) {
75+
// Mock implementation - no cleanup needed
76+
}
77+
}
78+
7279
impl MockAggregator {
7380
/// Spawn a mock aggregator actor.
7481
pub fn spawn(mock_actor: MockAggregator) -> AggregatorHandle {

pipeline/src/aggregator/mod.rs

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use arrow::{
3535
};
3636
use arrow_schema::{DataType, SchemaRef};
3737
use async_trait::async_trait;
38-
use ceramic_actor::{actor_envelope, Actor, Handler, Message};
38+
use ceramic_actor::{actor_envelope, Actor, Handler, Message, Shutdown as ActorShutdown};
3939
use ceramic_core::{StreamId, StreamIdType};
4040
use cid::Cid;
4141
use datafusion::{
@@ -67,7 +67,7 @@ use model_instance_patch::ModelInstancePatch;
6767
use model_patch::ModelPatch;
6868
use shutdown::{Shutdown, ShutdownSignal};
6969
use tokio::{select, sync::broadcast, task::JoinHandle};
70-
use tracing::{debug, error, instrument};
70+
use tracing::{debug, error, info, instrument};
7171

7272
use crate::{
7373
cache_table::CacheTable,
@@ -888,13 +888,66 @@ actor_envelope! {
888888
AggregatorEnvelope,
889889
AggregatorActor,
890890
AggregatorRecorder,
891+
with_shutdown,
891892
SubscribeSince => SubscribeSinceMsg,
892893
NewConclusionEvents => NewConclusionEventsMsg,
893894
// TODO: Remove this message and use the analogous message on the Resolver.
894895
// This way the canonical stream state is provided via the API
895896
StreamState => StreamStateMsg,
896897
}
897898

899+
#[async_trait]
900+
impl ActorShutdown for Aggregator {
901+
async fn shutdown(&mut self) {
902+
info!("Aggregator shutdown: flushing cached data to persistent storage...");
903+
904+
// Check if there's any cached data that needs to be flushed
905+
let count_result = match self.ctx.table(EVENT_STATES_MEM_TABLE).await {
906+
Ok(table) => table.count().await,
907+
Err(e) => Err(e),
908+
};
909+
910+
match count_result {
911+
Ok(0) => {
912+
debug!("Aggregator shutdown: no cached data to flush");
913+
}
914+
Ok(count) => {
915+
debug!(
916+
"Aggregator shutdown: flushing {} cached rows to persistent storage",
917+
count
918+
);
919+
920+
// Flush cache to persistent storage (same logic as threshold-based flush)
921+
let flush_result = match self.ctx.table(EVENT_STATES_MEM_TABLE).await {
922+
Ok(table) => {
923+
table
924+
.write_table(
925+
EVENT_STATES_PERSISTENT_TABLE,
926+
DataFrameWriteOptions::new()
927+
.with_partition_by(vec!["event_cid_partition".to_owned()]),
928+
)
929+
.await
930+
}
931+
Err(e) => Err(e),
932+
};
933+
934+
match flush_result {
935+
Ok(_) => {
936+
debug!("Aggregator shutdown: successfully flushed {} rows to persistent storage", count);
937+
// Note: We skip clearing the memory cache during shutdown since the aggregator is terminating
938+
}
939+
Err(e) => {
940+
error!("Aggregator shutdown: failed to flush cached data: {}", e);
941+
}
942+
}
943+
}
944+
Err(e) => {
945+
error!("Aggregator shutdown: failed to check cache count: {}", e);
946+
}
947+
}
948+
}
949+
}
950+
898951
#[async_trait]
899952
impl Handler<SubscribeSinceMsg> for Aggregator {
900953
async fn handle(

0 commit comments

Comments
 (0)