Skip to content

Commit 04296e9

Browse files
committed
feat: [US-053] - [Inspector: Workflow bridge via NAPI callbacks]
1 parent 054891b commit 04296e9

12 files changed

Lines changed: 1023 additions & 15 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/callbacks.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,14 @@ pub type ActionHandler =
204204
pub type BeforeActionResponseCallback = Box<
205205
dyn Fn(OnBeforeActionResponseRequest) -> BoxFuture<'static, Result<Vec<u8>>> + Send + Sync,
206206
>;
207+
pub type GetWorkflowHistoryCallback = Box<
208+
dyn Fn(GetWorkflowHistoryRequest) -> BoxFuture<'static, Result<Option<Vec<u8>>>>
209+
+ Send
210+
+ Sync,
211+
>;
212+
pub type ReplayWorkflowCallback = Box<
213+
dyn Fn(ReplayWorkflowRequest) -> BoxFuture<'static, Result<Option<Vec<u8>>>> + Send + Sync,
214+
>;
207215

208216
#[derive(Clone, Debug)]
209217
pub struct OnWakeRequest {
@@ -283,6 +291,17 @@ pub struct RunRequest {
283291
pub ctx: ActorContext,
284292
}
285293

294+
#[derive(Clone, Debug)]
295+
pub struct GetWorkflowHistoryRequest {
296+
pub ctx: ActorContext,
297+
}
298+
299+
#[derive(Clone, Debug)]
300+
pub struct ReplayWorkflowRequest {
301+
pub ctx: ActorContext,
302+
pub entry_id: Option<String>,
303+
}
304+
286305
#[derive(Default)]
287306
pub struct ActorInstanceCallbacks {
288307
pub on_migrate: Option<LifecycleCallback<OnMigrateRequest>>,
@@ -298,6 +317,8 @@ pub struct ActorInstanceCallbacks {
298317
pub actions: HashMap<String, ActionHandler>,
299318
pub on_before_action_response: Option<BeforeActionResponseCallback>,
300319
pub run: Option<LifecycleCallback<RunRequest>>,
320+
pub get_workflow_history: Option<GetWorkflowHistoryCallback>,
321+
pub replay_workflow: Option<ReplayWorkflowCallback>,
301322
}
302323

303324
impl fmt::Debug for ActorInstanceCallbacks {
@@ -319,6 +340,8 @@ impl fmt::Debug for ActorInstanceCallbacks {
319340
&self.on_before_action_response.is_some(),
320341
)
321342
.field("run", &self.run.is_some())
343+
.field("get_workflow_history", &self.get_workflow_history.is_some())
344+
.field("replay_workflow", &self.replay_workflow.is_some())
322345
.finish()
323346
}
324347
}

rivetkit-rust/packages/rivetkit-core/src/inspector/mod.rs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,51 @@
1+
use anyhow::Result;
2+
use futures::future::BoxFuture;
13
use std::sync::Arc;
24
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
35

6+
type WorkflowHistoryCallback =
7+
Arc<dyn Fn() -> BoxFuture<'static, Result<Option<Vec<u8>>>> + Send + Sync>;
8+
type WorkflowReplayCallback = Arc<
9+
dyn Fn(Option<String>) -> BoxFuture<'static, Result<Option<Vec<u8>>>> + Send + Sync,
10+
>;
11+
412
#[derive(Clone, Debug, Default)]
513
pub struct Inspector(Arc<InspectorInner>);
614

7-
#[derive(Debug, Default)]
15+
#[derive(Default)]
816
struct InspectorInner {
917
state_revision: AtomicU64,
1018
connections_revision: AtomicU64,
1119
queue_revision: AtomicU64,
1220
active_connections: AtomicU32,
1321
queue_size: AtomicU32,
1422
connected_clients: AtomicUsize,
23+
get_workflow_history: Option<WorkflowHistoryCallback>,
24+
replay_workflow: Option<WorkflowReplayCallback>,
25+
}
26+
27+
impl std::fmt::Debug for InspectorInner {
28+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29+
f.debug_struct("InspectorInner")
30+
.field("state_revision", &self.state_revision.load(Ordering::SeqCst))
31+
.field(
32+
"connections_revision",
33+
&self.connections_revision.load(Ordering::SeqCst),
34+
)
35+
.field("queue_revision", &self.queue_revision.load(Ordering::SeqCst))
36+
.field(
37+
"active_connections",
38+
&self.active_connections.load(Ordering::SeqCst),
39+
)
40+
.field("queue_size", &self.queue_size.load(Ordering::SeqCst))
41+
.field(
42+
"connected_clients",
43+
&self.connected_clients.load(Ordering::SeqCst),
44+
)
45+
.field("get_workflow_history", &self.get_workflow_history.is_some())
46+
.field("replay_workflow", &self.replay_workflow.is_some())
47+
.finish()
48+
}
1549
}
1650

1751
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
@@ -29,6 +63,17 @@ impl Inspector {
2963
Self::default()
3064
}
3165

66+
pub fn with_workflow_callbacks(
67+
get_workflow_history: Option<WorkflowHistoryCallback>,
68+
replay_workflow: Option<WorkflowReplayCallback>,
69+
) -> Self {
70+
Self(Arc::new(InspectorInner {
71+
get_workflow_history,
72+
replay_workflow,
73+
..InspectorInner::default()
74+
}))
75+
}
76+
3277
pub fn snapshot(&self) -> InspectorSnapshot {
3378
InspectorSnapshot {
3479
state_revision: self.0.state_revision.load(Ordering::SeqCst),
@@ -40,6 +85,24 @@ impl Inspector {
4085
}
4186
}
4287

88+
pub fn is_workflow_enabled(&self) -> bool {
89+
self.0.get_workflow_history.is_some()
90+
}
91+
92+
pub async fn get_workflow_history(&self) -> Result<Option<Vec<u8>>> {
93+
let Some(callback) = &self.0.get_workflow_history else {
94+
return Ok(None);
95+
};
96+
callback().await
97+
}
98+
99+
pub async fn replay_workflow(&self, entry_id: Option<String>) -> Result<Option<Vec<u8>>> {
100+
let Some(callback) = &self.0.replay_workflow else {
101+
return Ok(None);
102+
};
103+
callback(entry_id).await
104+
}
105+
43106
pub(crate) fn record_state_updated(&self) {
44107
self.0.state_revision.fetch_add(1, Ordering::SeqCst);
45108
}

rivetkit-rust/packages/rivetkit-core/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ pub use actor::callbacks::{
1111
ActionRequest, ActorInstanceCallbacks, OnBeforeActionResponseRequest,
1212
OnBeforeConnectRequest, OnConnectRequest, OnDestroyRequest, OnDisconnectRequest,
1313
OnMigrateRequest, OnRequestRequest, OnSleepRequest, OnStateChangeRequest,
14-
OnWakeRequest, OnWebSocketRequest, Request, Response, RunRequest,
14+
OnWakeRequest, OnWebSocketRequest, ReplayWorkflowRequest, Request, Response,
15+
RunRequest, GetWorkflowHistoryRequest,
1516
};
1617
pub use actor::config::{
1718
ActorConfig, ActorConfigOverrides, CanHibernateWebSocket, FlatActorConfig,

rivetkit-rust/packages/rivetkit-core/src/registry.rs

Lines changed: 141 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@ use uuid::Uuid;
3131

3232
use crate::actor::action::{ActionDispatchError, ActionInvoker};
3333
use crate::actor::callbacks::{ActionRequest, OnRequestRequest, OnWebSocketRequest, Request, Response};
34+
use crate::actor::callbacks::{GetWorkflowHistoryRequest, ReplayWorkflowRequest};
3435
use crate::actor::config::CanHibernateWebSocket;
3536
use crate::actor::context::ActorContext;
3637
use crate::actor::factory::ActorFactory;
3738
use crate::actor::lifecycle::{ActorLifecycle, StartupOptions};
3839
use crate::actor::state::{PERSIST_DATA_KEY, PersistedActor, decode_persisted_actor};
40+
use crate::inspector::Inspector;
3941
use crate::kv::Kv;
4042
use crate::sqlite::SqliteDb;
4143
use crate::types::{ActorKey, ActorKeySegment, SaveStateOpts};
@@ -53,6 +55,7 @@ struct ActiveActorInstance {
5355
ctx: ActorContext,
5456
factory: Arc<ActorFactory>,
5557
callbacks: Arc<crate::actor::callbacks::ActorInstanceCallbacks>,
58+
inspector: Inspector,
5659
}
5760

5861
struct RegistryDispatcher {
@@ -130,6 +133,12 @@ struct InspectorDatabaseExecuteBody {
130133
properties: Option<JsonValue>,
131134
}
132135

136+
#[derive(Debug, Default, Deserialize)]
137+
#[serde(default, rename_all = "camelCase")]
138+
struct InspectorWorkflowReplayBody {
139+
entry_id: Option<String>,
140+
}
141+
133142
#[derive(Debug, Serialize)]
134143
#[serde(rename_all = "camelCase")]
135144
struct InspectorQueueMessageJson {
@@ -262,13 +271,17 @@ impl RegistryDispatcher {
262271
.await
263272
.map_err(|error| error.into_source())
264273
.with_context(|| format!("start actor `{}`", request.actor_id))?;
274+
let inspector =
275+
build_actor_inspector(request.ctx.clone(), outcome.callbacks.clone());
276+
request.ctx.configure_inspector(Some(inspector.clone()));
265277

266278
let instance = ActiveActorInstance {
267279
actor_name: request.actor_name,
268280
generation: request.generation,
269281
ctx: request.ctx,
270282
factory,
271283
callbacks: outcome.callbacks,
284+
inspector,
272285
};
273286
let _ = self
274287
.active_instances
@@ -482,6 +495,36 @@ impl RegistryDispatcher {
482495
};
483496
json_http_response(StatusCode::OK, &payload)
484497
}
498+
(http::Method::GET, "/inspector/workflow-history") => self
499+
.inspector_workflow_history(instance)
500+
.await
501+
.and_then(|(is_workflow_enabled, history)| {
502+
json_http_response(
503+
StatusCode::OK,
504+
&json!({
505+
"history": history,
506+
"isWorkflowEnabled": is_workflow_enabled,
507+
}),
508+
)
509+
}),
510+
(http::Method::POST, "/inspector/workflow/replay") => {
511+
let body: InspectorWorkflowReplayBody = match parse_json_body(request) {
512+
Ok(body) => body,
513+
Err(response) => return Ok(Some(response)),
514+
};
515+
self
516+
.inspector_replay_workflow(instance, body.entry_id)
517+
.await
518+
.and_then(|(is_workflow_enabled, history)| {
519+
json_http_response(
520+
StatusCode::OK,
521+
&json!({
522+
"history": history,
523+
"isWorkflowEnabled": is_workflow_enabled,
524+
}),
525+
)
526+
})
527+
}
485528
(http::Method::GET, "/inspector/traces") => json_http_response(
486529
StatusCode::OK,
487530
&json!({
@@ -594,18 +637,63 @@ impl RegistryDispatcher {
594637
.inspect_messages()
595638
.await
596639
.context("list queue messages for inspector summary")?;
640+
let (is_workflow_enabled, workflow_history) = self
641+
.inspector_workflow_history(instance)
642+
.await
643+
.context("load inspector workflow summary")?;
597644
Ok(InspectorSummaryJson {
598645
state: decode_cbor_json_or_null(&instance.ctx.state()),
599646
is_state_enabled: true,
600647
connections: inspector_connections(&instance.ctx),
601648
rpcs: inspector_rpcs(instance),
602649
queue_size: queue_messages.len().try_into().unwrap_or(u32::MAX),
603650
is_database_enabled: instance.ctx.sql().runtime_config().is_ok(),
604-
is_workflow_enabled: false,
605-
workflow_history: None,
651+
is_workflow_enabled,
652+
workflow_history,
606653
})
607654
}
608655

656+
async fn inspector_workflow_history(
657+
&self,
658+
instance: &ActiveActorInstance,
659+
) -> Result<(bool, Option<JsonValue>)> {
660+
let is_workflow_enabled = instance.inspector.is_workflow_enabled();
661+
if !is_workflow_enabled {
662+
return Ok((false, None));
663+
}
664+
665+
let history = instance
666+
.inspector
667+
.get_workflow_history()
668+
.await
669+
.context("load inspector workflow history")?
670+
.map(|payload| decode_cbor_json_or_null(&payload))
671+
.filter(|value| !value.is_null());
672+
673+
Ok((true, history))
674+
}
675+
676+
async fn inspector_replay_workflow(
677+
&self,
678+
instance: &ActiveActorInstance,
679+
entry_id: Option<String>,
680+
) -> Result<(bool, Option<JsonValue>)> {
681+
let is_workflow_enabled = instance.inspector.is_workflow_enabled();
682+
if !is_workflow_enabled {
683+
return Ok((false, None));
684+
}
685+
686+
let history = instance
687+
.inspector
688+
.replay_workflow(entry_id)
689+
.await
690+
.context("replay inspector workflow history")?
691+
.map(|payload| decode_cbor_json_or_null(&payload))
692+
.filter(|value| !value.is_null());
693+
694+
Ok((true, history))
695+
}
696+
609697
async fn inspector_database_schema(&self, ctx: &ActorContext) -> Result<JsonValue> {
610698
let tables = decode_cbor_json_or_null(
611699
&ctx
@@ -1257,6 +1345,57 @@ fn inspector_connections(ctx: &ActorContext) -> Vec<InspectorConnectionJson> {
12571345
.collect()
12581346
}
12591347

1348+
fn build_actor_inspector(
1349+
ctx: ActorContext,
1350+
callbacks: Arc<crate::actor::callbacks::ActorInstanceCallbacks>,
1351+
) -> Inspector {
1352+
let get_workflow_history = callbacks.get_workflow_history.as_ref().map(|_| {
1353+
let callbacks = callbacks.clone();
1354+
let ctx = ctx.clone();
1355+
Arc::new(move || -> futures::future::BoxFuture<'static, Result<Option<Vec<u8>>>> {
1356+
let callbacks = callbacks.clone();
1357+
let ctx = ctx.clone();
1358+
Box::pin(async move {
1359+
let Some(callback) = callbacks.get_workflow_history.as_ref() else {
1360+
return Ok(None);
1361+
};
1362+
callback(GetWorkflowHistoryRequest { ctx }).await
1363+
})
1364+
}) as Arc<
1365+
dyn Fn() -> futures::future::BoxFuture<'static, Result<Option<Vec<u8>>>>
1366+
+ Send
1367+
+ Sync,
1368+
>
1369+
});
1370+
let replay_workflow = callbacks.replay_workflow.as_ref().map(|_| {
1371+
let callbacks = callbacks.clone();
1372+
let ctx = ctx.clone();
1373+
Arc::new(
1374+
move |entry_id: Option<String>| -> futures::future::BoxFuture<
1375+
'static,
1376+
Result<Option<Vec<u8>>>,
1377+
> {
1378+
let callbacks = callbacks.clone();
1379+
let ctx = ctx.clone();
1380+
Box::pin(async move {
1381+
let Some(callback) = callbacks.replay_workflow.as_ref() else {
1382+
return Ok(None);
1383+
};
1384+
callback(ReplayWorkflowRequest { ctx, entry_id }).await
1385+
})
1386+
},
1387+
) as Arc<
1388+
dyn Fn(
1389+
Option<String>,
1390+
) -> futures::future::BoxFuture<'static, Result<Option<Vec<u8>>>>
1391+
+ Send
1392+
+ Sync,
1393+
>
1394+
});
1395+
1396+
Inspector::with_workflow_callbacks(get_workflow_history, replay_workflow)
1397+
}
1398+
12601399
fn inspector_rpcs(instance: &ActiveActorInstance) -> Vec<String> {
12611400
let mut rpcs: Vec<String> = instance.callbacks.actions.keys().cloned().collect();
12621401
rpcs.sort();

0 commit comments

Comments
 (0)