Skip to content

Commit 6e92ff6

Browse files
committed
fix: flush pipeline aggregator cache on shutdown with optional actor shutdown trait
1 parent 5c18247 commit 6e92ff6

4 files changed

Lines changed: 165 additions & 35 deletions

File tree

actor/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,16 @@ pub trait Actor {
6161
}
6262
}
6363

64+
/// Optional trait for actors that need to perform cleanup operations during shutdown.
65+
/// Only actors that need cleanup should implement this trait.
66+
#[async_trait]
67+
pub trait Shutdown {
68+
/// Called when the actor is shutting down to allow for cleanup operations.
69+
/// This method is called after all pending messages have been processed
70+
/// but before the actor loop exits.
71+
async fn shutdown(&mut self);
72+
}
73+
6474
/// Wrapper of any T with its tracing span context.
6575
#[derive(Debug)]
6676
pub struct Traced<T> {

actor/src/macros.rs

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,75 @@
11
/// Constructs an envelope enumeration that contains all messages for an actor.
22
///
3+
/// # Usage
4+
///
5+
/// Basic usage (no shutdown support):
6+
/// ```ignore
7+
/// actor_envelope! {
8+
/// MyEnvelope,
9+
/// MyActor,
10+
/// MyRecorder,
11+
/// Message1 => Msg1Type,
12+
/// Message2 => Msg2Type,
13+
/// }
14+
/// ```
15+
///
16+
/// With shutdown support (actor must implement `Shutdown` trait):
17+
/// ```ignore
18+
/// actor_envelope! {
19+
/// MyEnvelope,
20+
/// MyActor,
21+
/// MyRecorder,
22+
/// with_shutdown,
23+
/// Message1 => Msg1Type,
24+
/// Message2 => Msg2Type,
25+
/// }
26+
/// ```
27+
///
328
/// The first identifier is the name of the enum.
429
/// The second identifier is the name of a trait specific to the actor.
530
/// The third identifier is the name of a trait for recording message events.
31+
/// Optional `with_shutdown` parameter adds shutdown support for actors that implement the Shutdown trait.
632
/// The remaining pairs are the variants of the envelope indicating the messages the actor handles.
733
///
834
/// The constructed actor trait is a union of the [`crate::Handler`] traits for each message along with the [`crate::Actor`] trait.
935
///
1036
/// The constructed recorder trait is a union of the [`ceramic_metrics::Recorder`] traits for each message.
1137
#[macro_export]
1238
macro_rules! actor_envelope {
39+
// Version with shutdown support
40+
(
41+
$enum_name:ident,
42+
$actor_trait:ident,
43+
$recorder_trait:ident,
44+
with_shutdown,
45+
$(
46+
$variant_name:ident => $message_type:ty,
47+
)*
48+
) => {
49+
$crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, with_shutdown, $($variant_name => $message_type,)*);
50+
};
51+
52+
// Default version without shutdown
53+
(
54+
$enum_name:ident,
55+
$actor_trait:ident,
56+
$recorder_trait:ident,
57+
$(
58+
$variant_name:ident => $message_type:ty,
59+
)*
60+
) => {
61+
$crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, without_shutdown, $($variant_name => $message_type,)*);
62+
};
63+
}
64+
65+
/// Internal implementation macro that handles both with/without shutdown cases
66+
#[macro_export]
67+
macro_rules! actor_envelope_impl {
1368
(
1469
$enum_name:ident,
1570
$actor_trait:ident,
1671
$recorder_trait:ident,
72+
$shutdown_mode:ident,
1773
$(
1874
$variant_name:ident => $message_type:ty,
1975
)*
@@ -51,11 +107,7 @@ macro_rules! actor_envelope {
51107
}
52108
}
53109
}
54-
#[doc = std::stringify!($actor_trait)]
55-
#[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "]
56-
#[doc = stringify!($enum_name)]
57-
#[doc = "."]
58-
pub trait $actor_trait : $crate::Actor<Envelope = $enum_name> $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { }
110+
$crate::actor_trait_def!($actor_trait, $enum_name, $shutdown_mode, $($message_type,)*);
59111

60112
#[doc = std::stringify!($recorder_trait)]
61113
#[doc = " is an [`ceramic_metrics::Recorder`] for each message type in the actor envelope "]
@@ -64,6 +116,7 @@ macro_rules! actor_envelope {
64116
pub trait $recorder_trait : $(::ceramic_metrics::Recorder<$crate::MessageEvent<$message_type>> +)*
65117
::std::fmt::Debug + ::std::marker::Send + ::std::marker::Sync + 'static { }
66118

119+
67120
impl $enum_name {
68121
/// Runs the actor handling messages as they arrive.
69122
pub async fn run<A>(mut actor: A, mut receiver: $crate::Receiver<A::Envelope>, mut shutdown: impl ::std::future::Future<Output=()> + ::std::marker::Send + 'static)
@@ -111,6 +164,7 @@ macro_rules! actor_envelope {
111164
}
112165
}
113166
}
167+
$crate::actor_shutdown_call!($shutdown_mode, actor);
114168
}
115169
}
116170
$(
@@ -132,3 +186,33 @@ macro_rules! actor_envelope {
132186
)*
133187
};
134188
}
189+
190+
/// Helper macro to define actor trait with or without shutdown requirement
191+
#[macro_export]
192+
macro_rules! actor_trait_def {
193+
($actor_trait:ident, $enum_name:ident, with_shutdown, $($message_type:ty,)*) => {
194+
#[doc = std::stringify!($actor_trait)]
195+
#[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "]
196+
#[doc = stringify!($enum_name)]
197+
#[doc = ". This actor must also implement [`ceramic_actor::Shutdown`]."]
198+
pub trait $actor_trait : $crate::Actor<Envelope = $enum_name> $( + $crate::Handler<$message_type> )* + $crate::Shutdown + ::std::marker::Send + 'static { }
199+
};
200+
($actor_trait:ident, $enum_name:ident, without_shutdown, $($message_type:ty,)*) => {
201+
#[doc = std::stringify!($actor_trait)]
202+
#[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "]
203+
#[doc = stringify!($enum_name)]
204+
#[doc = "."]
205+
pub trait $actor_trait : $crate::Actor<Envelope = $enum_name> $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { }
206+
};
207+
}
208+
209+
/// Helper macro to conditionally call shutdown
210+
#[macro_export]
211+
macro_rules! actor_shutdown_call {
212+
(with_shutdown, $actor:ident) => {
213+
$actor.shutdown().await;
214+
};
215+
(without_shutdown, $actor:ident) => {
216+
// No shutdown call for actors that don't implement shutdown
217+
};
218+
}

pipeline/src/aggregator/mock.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Provides a mock implmentation of the aggregator actor.
22
use async_trait::async_trait;
3-
use ceramic_actor::{Actor, Handler, Message};
3+
use ceramic_actor::{Actor, Handler, Message, Shutdown};
44
use mockall::mock;
55
use prometheus_client::registry::Registry;
66

@@ -69,6 +69,13 @@ impl Actor for MockAggregator {
6969
}
7070
impl AggregatorActor for MockAggregator {}
7171

72+
#[async_trait]
73+
impl Shutdown for MockAggregator {
74+
async fn shutdown(&mut self) {
75+
// Mock implementation - no cleanup needed
76+
}
77+
}
78+
7279
impl MockAggregator {
7380
/// Spawn a mock aggregator actor.
7481
pub fn spawn(mock_actor: MockAggregator) -> AggregatorHandle {

pipeline/src/aggregator/mod.rs

Lines changed: 58 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use arrow::{
3535
};
3636
use arrow_schema::{DataType, SchemaRef};
3737
use async_trait::async_trait;
38-
use ceramic_actor::{actor_envelope, Actor, Handler, Message};
38+
use ceramic_actor::{actor_envelope, Actor, Handler, Message, Shutdown as ActorShutdown};
3939
use ceramic_core::{StreamId, StreamIdType};
4040
use cid::Cid;
4141
use datafusion::{
@@ -67,7 +67,7 @@ use model_instance_patch::ModelInstancePatch;
6767
use model_patch::ModelPatch;
6868
use shutdown::{Shutdown, ShutdownSignal};
6969
use tokio::{select, sync::broadcast, task::JoinHandle};
70-
use tracing::{debug, error, instrument};
70+
use tracing::{debug, error, info, instrument};
7171

7272
use crate::{
7373
cache_table::CacheTable,
@@ -802,38 +802,47 @@ impl Aggregator {
802802
.await
803803
.context("writing to mem table data")?;
804804

805-
let count = self
806-
.ctx
807-
.table(EVENT_STATES_MEM_TABLE)
808-
.await?
809-
.count()
810-
.await
811-
.context("count mem table")?;
805+
let count = self.count_cache().await?;
812806
// If we have enough data cached in memory write it out to persistent store
813807
if count >= self.max_cached_rows {
814-
self.ctx
815-
.table(EVENT_STATES_MEM_TABLE)
816-
.await?
817-
.write_table(
818-
EVENT_STATES_PERSISTENT_TABLE,
819-
DataFrameWriteOptions::new()
820-
.with_partition_by(vec!["event_cid_partition".to_owned()]),
821-
)
822-
.await
823-
.context("writing to persistent table")?;
824-
// Clear all data in the memory batch, by writing an empty batch
825-
self.ctx
826-
.read_batch(RecordBatch::new_empty(schemas::event_states_partitioned()))
827-
.context("reading empty batch")?
828-
.write_table(
829-
EVENT_STATES_MEM_TABLE,
830-
DataFrameWriteOptions::new().with_insert_operation(InsertOp::Overwrite),
831-
)
832-
.await
833-
.context("clearing mem table")?;
808+
self.flush_cache().await?;
834809
}
835810
ordered.collect().await.context("collecting ordered events")
836811
}
812+
813+
async fn count_cache(&self) -> Result<usize> {
814+
self.ctx
815+
.table(EVENT_STATES_MEM_TABLE)
816+
.await?
817+
.count()
818+
.await
819+
.context("count mem table")
820+
}
821+
822+
async fn flush_cache(&self) -> Result<usize> {
823+
let cnt = self.count_cache().await?;
824+
self.ctx
825+
.table(EVENT_STATES_MEM_TABLE)
826+
.await?
827+
.write_table(
828+
EVENT_STATES_PERSISTENT_TABLE,
829+
DataFrameWriteOptions::new()
830+
.with_partition_by(vec!["event_cid_partition".to_owned()]),
831+
)
832+
.await
833+
.context("writing to persistent table")?;
834+
// Clear all data in the memory batch, by writing an empty batch
835+
self.ctx
836+
.read_batch(RecordBatch::new_empty(schemas::event_states_partitioned()))
837+
.context("reading empty batch")?
838+
.write_table(
839+
EVENT_STATES_MEM_TABLE,
840+
DataFrameWriteOptions::new().with_insert_operation(InsertOp::Overwrite),
841+
)
842+
.await
843+
.context("clearing mem table")?;
844+
Ok(cnt)
845+
}
837846
}
838847

839848
// Construct a column for the field in the UNNAMED_TABLE.
@@ -888,13 +897,33 @@ actor_envelope! {
888897
AggregatorEnvelope,
889898
AggregatorActor,
890899
AggregatorRecorder,
900+
with_shutdown,
891901
SubscribeSince => SubscribeSinceMsg,
892902
NewConclusionEvents => NewConclusionEventsMsg,
893903
// TODO: Remove this message and use the analogous message on the Resolver.
894904
// This way the canonical stream state is provided via the API
895905
StreamState => StreamStateMsg,
896906
}
897907

908+
#[async_trait]
909+
impl ActorShutdown for Aggregator {
910+
async fn shutdown(&mut self) {
911+
info!("Aggregator shutdown: flushing cached data to persistent storage...");
912+
913+
match self.flush_cache().await {
914+
Ok(rows) => {
915+
debug!(
916+
"Aggregator shutdown: successfully flushed {} rows to persistent storage",
917+
rows
918+
);
919+
}
920+
Err(e) => {
921+
error!("Aggregator shutdown: failed to flush cached data: {}", e);
922+
}
923+
}
924+
}
925+
}
926+
898927
#[async_trait]
899928
impl Handler<SubscribeSinceMsg> for Aggregator {
900929
async fn handle(

0 commit comments

Comments
 (0)