Skip to content

Commit 6b6c414

Browse files
nathanielcdav1do
authored andcommitted
wip: adds scaffold for resolver actor
1 parent 6206208 commit 6b6c414

7 files changed

Lines changed: 870 additions & 13 deletions

File tree

pipeline/src/aggregator/mod.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,8 @@ actor_envelope! {
870870
AggregatorRecorder,
871871
SubscribeSince => SubscribeSinceMsg,
872872
NewConclusionEvents => NewConclusionEventsMsg,
873+
// TODO: Remove this message and use the analogous message on the Resolver.
874+
// This way the canonical stream state is provided via the API
873875
StreamState => StreamStateMsg,
874876
}
875877

@@ -903,13 +905,6 @@ impl Handler<SubscribeSinceMsg> for Aggregator {
903905
// Execute query to get initial (historical) results
904906
let query_stream = df.execute_stream().await?;
905907

906-
// Create subscription stream
907-
let subscription_stream = RecordBatchStreamAdapter::new(
908-
schemas::event_states(),
909-
tokio_stream::wrappers::BroadcastStream::new(subscription)
910-
.map_err(|err| exec_datafusion_err!("{err}")),
911-
);
912-
913908
// Merge query results with subscription updates
914909
rows_since(RowsSinceInput {
915910
session_context: &ctx,
@@ -918,7 +913,11 @@ impl Handler<SubscribeSinceMsg> for Aggregator {
918913
projection: message.projection,
919914
filters: message.filters,
920915
limit: message.limit,
921-
subscription: Box::pin(subscription_stream),
916+
subscription: Box::pin(RecordBatchStreamAdapter::new(
917+
schemas::event_states(),
918+
tokio_stream::wrappers::BroadcastStream::new(subscription)
919+
.map_err(|err| exec_datafusion_err!("{err}")),
920+
)),
922921
since: query_stream,
923922
})
924923
}

pipeline/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ mod cache_table;
1414
pub mod cid_part;
1515
pub mod cid_string;
1616
pub mod concluder;
17+
pub mod resolver;
1718
mod config;
1819
pub mod dimension_extract;
1920
mod metrics;

pipeline/src/metrics.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,24 @@ use prometheus_client::{
1111
pub struct Metrics {
1212
pub(crate) message_count: Family<MessageLabels, Counter>,
1313

14+
pub(crate) concluder_poll_new_events_loop_count: Counter,
15+
1416
pub(crate) aggregator_new_conclusion_events_count: Counter,
1517

16-
pub(crate) concluder_poll_new_events_loop_count: Counter,
18+
pub(crate) resolver_new_event_states_count: Counter,
1719
}
1820
impl Metrics {
1921
/// Register and construct Metrics
2022
pub fn register(registry: &mut Registry) -> Self {
2123
let sub_registry = registry.sub_registry_with_prefix("pipeline");
2224

25+
register!(
26+
concluder_poll_new_events_loop_count,
27+
"Number of times the loop to poll new conclusion events has run",
28+
Counter::default(),
29+
sub_registry
30+
);
31+
2332
register!(
2433
message_count,
2534
"Number of messages delivered to actors",
@@ -32,18 +41,18 @@ impl Metrics {
3241
Counter::default(),
3342
sub_registry
3443
);
35-
3644
register!(
37-
concluder_poll_new_events_loop_count,
38-
"Number of times the loop to poll new conclusion events has run",
45+
resolver_new_event_states_count,
46+
"Number of new event states delivered to the resolver",
3947
Counter::default(),
4048
sub_registry
4149
);
4250

4351
Self {
4452
message_count,
45-
aggregator_new_conclusion_events_count,
4653
concluder_poll_new_events_loop_count,
54+
aggregator_new_conclusion_events_count,
55+
resolver_new_event_states_count,
4756
}
4857
}
4958
}

pipeline/src/resolver/metrics.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use ceramic_actor::MessageEvent;
2+
use ceramic_metrics::Recorder;
3+
4+
use crate::metrics::{MessageLabels, Metrics};
5+
6+
use super::{NewEventStatesMsg, ResolverRecorder, StreamStateMsg};
7+
8+
impl Recorder<MessageEvent<NewEventStatesMsg>> for Metrics {
9+
fn record(&self, event: &MessageEvent<NewEventStatesMsg>) {
10+
self.message_count
11+
.get_or_create(&MessageLabels::from(event))
12+
.inc();
13+
self.resolver_new_event_states_count
14+
.inc_by(event.message.events.num_rows() as u64);
15+
}
16+
}
17+
impl Recorder<MessageEvent<StreamStateMsg>> for Metrics {
18+
fn record(&self, event: &MessageEvent<StreamStateMsg>) {
19+
self.message_count
20+
.get_or_create(&MessageLabels::from(event))
21+
.inc();
22+
}
23+
}
24+
impl ResolverRecorder for Metrics {}

pipeline/src/resolver/mock.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//! Provides a mock implmentation of the aggregator actor.
2+
use async_trait::async_trait;
3+
use ceramic_actor::{Actor, Handler, Message};
4+
use mockall::mock;
5+
use prometheus_client::registry::Registry;
6+
7+
use crate::metrics::Metrics;
8+
9+
use super::{
10+
NewEventStatesMsg, Resolver, ResolverActor, ResolverEnvelope, ResolverHandle, StreamStateMsg,
11+
SubscribeSinceMsg,
12+
};
13+
14+
mock! {
15+
// mockall does not support multiple methods on the struct with the same name.
16+
// This arises when implementing multiple traits that have methods with the same name as is
17+
// the case with the [`ceramic_actor::Handler`] trait.
18+
//
19+
// We add a layer of indirection to get around this limitation.
20+
pub Resolver {
21+
#[allow(missing_docs)]
22+
pub fn handle_subscribe_since(
23+
&mut self,
24+
message: SubscribeSinceMsg,
25+
) -> <SubscribeSinceMsg as Message>::Result;
26+
#[allow(missing_docs)]
27+
pub fn handle_new_event_states(
28+
&mut self,
29+
message: NewEventStatesMsg,
30+
) -> <NewEventStatesMsg as Message>::Result;
31+
#[allow(missing_docs)]
32+
pub fn handle_stream_state(
33+
&mut self,
34+
message: StreamStateMsg,
35+
) -> <StreamStateMsg
36+
as Message>::Result;
37+
}
38+
}
39+
40+
#[async_trait]
41+
impl Handler<SubscribeSinceMsg> for MockResolver {
42+
async fn handle(
43+
&mut self,
44+
message: SubscribeSinceMsg,
45+
) -> <SubscribeSinceMsg as Message>::Result {
46+
self.handle_subscribe_since(message)
47+
}
48+
}
49+
50+
#[async_trait]
51+
impl Handler<NewEventStatesMsg> for MockResolver {
52+
async fn handle(
53+
&mut self,
54+
message: NewEventStatesMsg,
55+
) -> <NewEventStatesMsg as Message>::Result {
56+
self.handle_new_event_states(message)
57+
}
58+
}
59+
60+
#[async_trait]
61+
impl Handler<StreamStateMsg> for MockResolver {
62+
async fn handle(&mut self, message: StreamStateMsg) -> <StreamStateMsg as Message>::Result {
63+
self.handle_stream_state(message)
64+
}
65+
}
66+
67+
impl Actor for MockResolver {
68+
type Envelope = ResolverEnvelope;
69+
}
70+
impl ResolverActor for MockResolver {}
71+
72+
impl MockResolver {
73+
/// Spawn a mock aggregator actor.
74+
pub fn spawn(mock_actor: MockResolver) -> ResolverHandle {
75+
let metrics = Metrics::register(&mut Registry::default());
76+
let (handle, _task_handle) =
77+
Resolver::spawn(1_000, mock_actor, metrics, std::future::pending());
78+
handle
79+
}
80+
}

0 commit comments

Comments
 (0)