Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions actor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ pub trait Actor {
}
}

/// Optional trait for actors that need to perform cleanup operations during shutdown.
/// Only actors that need cleanup should implement this trait.
#[async_trait]
pub trait Shutdown {
/// Called when the actor is shutting down to allow for cleanup operations.
/// This method is called after all pending messages have been processed
/// but before the actor loop exits.
async fn shutdown(&mut self);
}

/// Wrapper of any T with its tracing span context.
#[derive(Debug)]
pub struct Traced<T> {
Expand Down
94 changes: 89 additions & 5 deletions actor/src/macros.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,75 @@
/// Constructs an envelope enumeration that contains all messages for an actor.
///
/// # Usage
///
/// Basic usage (no shutdown support):
/// ```ignore
/// actor_envelope! {
/// MyEnvelope,
/// MyActor,
/// MyRecorder,
/// Message1 => Msg1Type,
/// Message2 => Msg2Type,
/// }
/// ```
///
/// With shutdown support (actor must implement `Shutdown` trait):
/// ```ignore
/// actor_envelope! {
/// MyEnvelope,
/// MyActor,
/// MyRecorder,
/// with_shutdown,
/// Message1 => Msg1Type,
/// Message2 => Msg2Type,
/// }
/// ```
///
/// The first identifier is the name of the enum.
/// The second identifier is the name of a trait specific to the actor.
/// The third identifier is the name of a trait for recording message events.
/// Optional `with_shutdown` parameter adds shutdown support for actors that implement the Shutdown trait.
/// The remaining pairs are the variants of the envelope indicating the messages the actor handles.
///
/// The constructed actor trait is a union of the [`crate::Handler`] traits for each message along with the [`crate::Actor`] trait.
///
/// The constructed recorder trait is a union of the [`ceramic_metrics::Recorder`] traits for each message.
#[macro_export]
macro_rules! actor_envelope {
// Version with shutdown support
(
$enum_name:ident,
$actor_trait:ident,
$recorder_trait:ident,
with_shutdown,
$(
$variant_name:ident => $message_type:ty,
)*
) => {
$crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, with_shutdown, $($variant_name => $message_type,)*);
};

// Default version without shutdown
(
$enum_name:ident,
$actor_trait:ident,
$recorder_trait:ident,
$(
$variant_name:ident => $message_type:ty,
)*
) => {
$crate::actor_envelope_impl!($enum_name, $actor_trait, $recorder_trait, without_shutdown, $($variant_name => $message_type,)*);
};
}

/// Internal implementation macro that handles both with/without shutdown cases
#[macro_export]
macro_rules! actor_envelope_impl {
(
$enum_name:ident,
$actor_trait:ident,
$recorder_trait:ident,
$shutdown_mode:ident,
$(
$variant_name:ident => $message_type:ty,
)*
Expand Down Expand Up @@ -51,11 +107,7 @@ macro_rules! actor_envelope {
}
}
}
#[doc = std::stringify!($actor_trait)]
#[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "]
#[doc = stringify!($enum_name)]
#[doc = "."]
pub trait $actor_trait : $crate::Actor<Envelope = $enum_name> $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { }
$crate::actor_trait_def!($actor_trait, $enum_name, $shutdown_mode, $($message_type,)*);

#[doc = std::stringify!($recorder_trait)]
#[doc = " is an [`ceramic_metrics::Recorder`] for each message type in the actor envelope "]
Expand All @@ -64,6 +116,7 @@ macro_rules! actor_envelope {
pub trait $recorder_trait : $(::ceramic_metrics::Recorder<$crate::MessageEvent<$message_type>> +)*
::std::fmt::Debug + ::std::marker::Send + ::std::marker::Sync + 'static { }


impl $enum_name {
/// Runs the actor handling messages as they arrive.
pub async fn run<A>(mut actor: A, mut receiver: $crate::Receiver<A::Envelope>, mut shutdown: impl ::std::future::Future<Output=()> + ::std::marker::Send + 'static)
Expand Down Expand Up @@ -111,6 +164,7 @@ macro_rules! actor_envelope {
}
}
}
$crate::actor_shutdown_call!($shutdown_mode, actor);
}
}
$(
Expand All @@ -132,3 +186,33 @@ macro_rules! actor_envelope {
)*
};
}

/// Helper macro to define actor trait with or without shutdown requirement
#[macro_export]
macro_rules! actor_trait_def {
($actor_trait:ident, $enum_name:ident, with_shutdown, $($message_type:ty,)*) => {
#[doc = std::stringify!($actor_trait)]
#[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "]
#[doc = stringify!($enum_name)]
#[doc = ". This actor must also implement [`ceramic_actor::Shutdown`]."]
pub trait $actor_trait : $crate::Actor<Envelope = $enum_name> $( + $crate::Handler<$message_type> )* + $crate::Shutdown + ::std::marker::Send + 'static { }
};
($actor_trait:ident, $enum_name:ident, without_shutdown, $($message_type:ty,)*) => {
#[doc = std::stringify!($actor_trait)]
#[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "]
#[doc = stringify!($enum_name)]
#[doc = "."]
pub trait $actor_trait : $crate::Actor<Envelope = $enum_name> $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { }
};
}

/// Helper macro to conditionally call shutdown
#[macro_export]
macro_rules! actor_shutdown_call {
(with_shutdown, $actor:ident) => {
$actor.shutdown().await;
};
(without_shutdown, $actor:ident) => {
// No shutdown call for actors that don't implement shutdown
};
}
9 changes: 8 additions & 1 deletion pipeline/src/aggregator/mock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Provides a mock implmentation of the aggregator actor.
use async_trait::async_trait;
use ceramic_actor::{Actor, Handler, Message};
use ceramic_actor::{Actor, Handler, Message, Shutdown};
use mockall::mock;
use prometheus_client::registry::Registry;

Expand Down Expand Up @@ -69,6 +69,13 @@ impl Actor for MockAggregator {
}
impl AggregatorActor for MockAggregator {}

#[async_trait]
impl Shutdown for MockAggregator {
async fn shutdown(&mut self) {
// Mock implementation - no cleanup needed
}
}

impl MockAggregator {
/// Spawn a mock aggregator actor.
pub fn spawn(mock_actor: MockAggregator) -> AggregatorHandle {
Expand Down
Loading
Loading