Skip to content

Commit d1095c3

Browse files
committed
fix(rivetkit): inspector reports actual config state + real queue messages
1 parent 32b9abd commit d1095c3

11 files changed

Lines changed: 127 additions & 23 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ pub struct ActionDefinition {
6262
pub struct ActorConfig {
6363
pub name: Option<String>,
6464
pub icon: Option<String>,
65+
/// Whether the user declared a SQLite database for this actor (`db({...})`
66+
/// on the TS side). Gates the inspector database tab.
67+
pub has_database: bool,
68+
/// Whether the user declared actor state (`state: ...` or `createState`).
69+
/// Gates the inspector state tab and state-subscription messages.
70+
pub has_state: bool,
6571
pub can_hibernate_websocket: CanHibernateWebSocket,
6672
pub state_save_interval: Duration,
6773
pub create_vars_timeout: Duration,
@@ -96,6 +102,8 @@ pub struct ActorConfig {
96102
pub struct ActorConfigInput {
97103
pub name: Option<String>,
98104
pub icon: Option<String>,
105+
pub has_database: Option<bool>,
106+
pub has_state: Option<bool>,
99107
pub can_hibernate_websocket: Option<bool>,
100108
pub state_save_interval_ms: Option<u32>,
101109
pub create_vars_timeout_ms: Option<u32>,
@@ -124,6 +132,8 @@ impl ActorConfig {
124132
let mut actor_config = Self {
125133
name: config.name,
126134
icon: config.icon,
135+
has_database: config.has_database.unwrap_or(false),
136+
has_state: config.has_state.unwrap_or(false),
127137
..Self::default()
128138
};
129139
if let Some(can_hibernate_websocket) = config.can_hibernate_websocket {
@@ -227,6 +237,8 @@ impl Default for ActorConfig {
227237
Self {
228238
name: None,
229239
icon: None,
240+
has_database: false,
241+
has_state: false,
230242
can_hibernate_websocket: CanHibernateWebSocket::default(),
231243
state_save_interval: DEFAULT_STATE_SAVE_INTERVAL,
232244
create_vars_timeout: DEFAULT_CREATE_VARS_TIMEOUT,

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,10 @@ impl ActorContext {
521521
&self.0.region
522522
}
523523

524+
pub fn has_state(&self) -> bool {
525+
self.0.connection_config.read().has_state
526+
}
527+
524528
#[doc(hidden)]
525529
pub fn record_startup_create_state(&self, duration: Duration) {
526530
self.0.metrics.observe_create_state(duration);

rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,12 +530,12 @@ impl ActorContext {
530530
})
531531
}
532532

533-
pub(crate) async fn inspect_messages(&self) -> Result<Vec<QueueMessage>> {
533+
pub async fn inspect_messages(&self) -> Result<Vec<QueueMessage>> {
534534
self.ensure_initialized().await?;
535535
self.list_messages().await
536536
}
537537

538-
pub(crate) fn max_size(&self) -> u32 {
538+
pub fn max_size(&self) -> u32 {
539539
self.config().max_queue_size
540540
}
541541

rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ pub struct SqliteDb {
7878
handle: Option<EnvoyHandle>,
7979
actor_id: Option<String>,
8080
startup_data: Option<protocol::SqliteStartupData>,
81+
/// Mirrors the user's actor-config `db({...})` declaration. The envoy
82+
/// always sets up sqlite storage under the hood, so handle/actor_id are
83+
/// not a reliable signal for whether the user opted in; this flag is.
84+
enabled: bool,
8185
#[cfg(feature = "sqlite")]
8286
// Forced-sync: native SQLite handles are used inside spawn_blocking and
8387
// synchronous diagnostic accessors.
@@ -89,16 +93,22 @@ impl SqliteDb {
8993
handle: EnvoyHandle,
9094
actor_id: impl Into<String>,
9195
startup_data: Option<protocol::SqliteStartupData>,
96+
enabled: bool,
9297
) -> Self {
9398
Self {
9499
handle: Some(handle),
95100
actor_id: Some(actor_id.into()),
96101
startup_data,
102+
enabled,
97103
#[cfg(feature = "sqlite")]
98104
db: Default::default(),
99105
}
100106
}
101107

108+
pub fn is_enabled(&self) -> bool {
109+
self.enabled
110+
}
111+
102112
pub async fn get_pages(
103113
&self,
104114
request: protocol::SqliteGetPagesRequest,

rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl RegistryDispatcher {
4444
StatusCode::OK,
4545
&json!({
4646
"state": decode_cbor_json_or_null(&instance.ctx.state()),
47-
"isStateEnabled": true,
47+
"isStateEnabled": instance.ctx.has_state(),
4848
}),
4949
),
5050
(http::Method::PATCH, "/inspector/state") => {
@@ -277,11 +277,11 @@ impl RegistryDispatcher {
277277
.context("load inspector workflow summary")?;
278278
Ok(InspectorSummaryJson {
279279
state: decode_cbor_json_or_null(&instance.ctx.state()),
280-
is_state_enabled: true,
280+
is_state_enabled: instance.ctx.has_state(),
281281
connections: inspector_connections(&instance.ctx),
282282
rpcs: inspector_rpcs(instance),
283283
queue_size: queue_messages.len().try_into().unwrap_or(u32::MAX),
284-
is_database_enabled: instance.ctx.sql().runtime_config().is_ok(),
284+
is_database_enabled: instance.ctx.sql().is_enabled(),
285285
workflow_supported,
286286
workflow_history,
287287
})

rivetkit-rust/packages/rivetkit-core/src/registry/inspector_ws.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ use super::inspector::*;
44
use super::websocket::{closing_websocket_handler, websocket_inspector_token};
55
use super::*;
66

7+
/// Aborts the wrapped task on drop. Ensures the overlay task cannot outlive
8+
/// the websocket handler even if `on_close` never fires (for example when the
9+
/// handler is dropped due to actor teardown rather than a clean close frame).
10+
struct AbortOnDropTask(JoinHandle<()>);
11+
12+
impl Drop for AbortOnDropTask {
13+
fn drop(&mut self) {
14+
self.0.abort();
15+
}
16+
}
17+
718
impl RegistryDispatcher {
819
pub(super) async fn handle_inspector_websocket(
920
self: &Arc<Self>,
@@ -32,7 +43,7 @@ impl RegistryDispatcher {
3243
// Forced-sync: inspector websocket slots are filled/cleared inside
3344
// synchronous callback setup/teardown and moved out before awaiting.
3445
let subscription_slot = Arc::new(Mutex::new(None::<InspectorSubscription>));
35-
let overlay_task_slot = Arc::new(Mutex::new(None::<JoinHandle<()>>));
46+
let overlay_task_slot = Arc::new(Mutex::new(None::<AbortOnDropTask>));
3647
let attach_guard_slot = Arc::new(Mutex::new(None::<InspectorAttachGuard>));
3748
let on_open_instance = instance.clone();
3849
let on_open_dispatcher = dispatcher.clone();
@@ -64,9 +75,7 @@ impl RegistryDispatcher {
6475
let mut guard = slot.lock();
6576
guard.take();
6677
let mut overlay_guard = overlay_slot.lock();
67-
if let Some(task) = overlay_guard.take() {
68-
task.abort();
69-
}
78+
overlay_guard.take();
7079
let mut attach_guard = attach_slot.lock();
7180
attach_guard.take();
7281
})
@@ -146,7 +155,7 @@ impl RegistryDispatcher {
146155
}
147156
});
148157
let mut overlay_guard = on_open_overlay_slot.lock();
149-
*overlay_guard = Some(overlay_task);
158+
*overlay_guard = Some(AbortOnDropTask(overlay_task));
150159

151160
let listener_dispatcher = on_open_dispatcher.clone();
152161
let listener_instance = on_open_instance.clone();
@@ -356,9 +365,9 @@ impl RegistryDispatcher {
356365
inspector_protocol::InitMessage {
357366
connections: inspector_wire_connections(&instance.ctx),
358367
state: Some(instance.ctx.state()),
359-
is_state_enabled: true,
368+
is_state_enabled: instance.ctx.has_state(),
360369
rpcs: inspector_rpcs(instance),
361-
is_database_enabled: instance.ctx.sql().runtime_config().is_ok(),
370+
is_database_enabled: instance.ctx.sql().is_enabled(),
362371
queue_size: serde_bare::Uint(queue_size),
363372
workflow_history,
364373
is_workflow_enabled: workflow_supported,
@@ -374,7 +383,7 @@ impl RegistryDispatcher {
374383
inspector_protocol::StateResponse {
375384
rid,
376385
state: Some(instance.ctx.state()),
377-
is_state_enabled: true,
386+
is_state_enabled: instance.ctx.has_state(),
378387
}
379388
}
380389

rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -907,7 +907,12 @@ impl RegistryDispatcher {
907907
self.region.clone(),
908908
factory.config().clone(),
909909
Kv::new(handle.clone(), actor_id.to_owned()),
910-
SqliteDb::new(handle.clone(), actor_id.to_owned(), sqlite_startup_data),
910+
SqliteDb::new(
911+
handle.clone(),
912+
actor_id.to_owned(),
913+
sqlite_startup_data,
914+
factory.config().has_database,
915+
),
911916
);
912917
ctx.configure_envoy(handle, Some(generation));
913918
ctx

rivetkit-typescript/packages/rivetkit-napi/index.d.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ export interface JsActorConfig {
5151
name?: string
5252
icon?: string
5353
hasDatabase?: boolean
54+
hasState?: boolean
5455
canHibernateWebsocket?: boolean
5556
stateSaveIntervalMs?: number
5657
createStateTimeoutMs?: number
@@ -127,6 +128,15 @@ export interface JsQueueTryNextBatchOptions {
127128
count?: number
128129
completable?: boolean
129130
}
131+
export interface JsQueueInspectMessage {
132+
/**
133+
* Queue message id. Stored as the raw u64 reinterpreted as i64 so JS
134+
* sees a plain number; ids are monotonic and fit comfortably in i64.
135+
*/
136+
id: number
137+
name: string
138+
createdAtMs: number
139+
}
130140
export interface JsServeConfig {
131141
version: number
132142
endpoint: string
@@ -265,6 +275,8 @@ export declare class Queue {
265275
enqueueAndWait(name: string, body: Buffer, options?: JsQueueEnqueueAndWaitOptions | undefined | null, signal?: CancellationToken | undefined | null): Promise<Buffer | null>
266276
tryNext(options?: JsQueueTryNextOptions | undefined | null): QueueMessage | null
267277
tryNextBatch(options?: JsQueueTryNextBatchOptions | undefined | null): Array<QueueMessage>
278+
maxSize(): number
279+
inspectMessages(): Promise<Array<JsQueueInspectMessage>>
268280
}
269281
export declare class QueueMessage {
270282
id(): bigint

rivetkit-typescript/packages/rivetkit-napi/src/actor_factory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ pub struct JsActorConfig {
6666
pub name: Option<String>,
6767
pub icon: Option<String>,
6868
pub has_database: Option<bool>,
69+
pub has_state: Option<bool>,
6970
pub can_hibernate_websocket: Option<bool>,
7071
pub state_save_interval_ms: Option<u32>,
7172
pub create_state_timeout_ms: Option<u32>,
@@ -1036,6 +1037,7 @@ impl From<JsActorConfig> for ActorConfigInput {
10361037
name: value.name,
10371038
icon: value.icon,
10381039
has_database: value.has_database,
1040+
has_state: value.has_state,
10391041
can_hibernate_websocket: value.can_hibernate_websocket,
10401042
state_save_interval_ms: value.state_save_interval_ms,
10411043
create_vars_timeout_ms: value.create_vars_timeout_ms,

rivetkit-typescript/packages/rivetkit-napi/src/queue.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,42 @@ impl Queue {
209209
.map(|messages| messages.into_iter().map(QueueMessage::from_core).collect())
210210
.map_err(napi_anyhow_error)
211211
}
212+
213+
#[napi]
214+
pub fn max_size(&self) -> u32 {
215+
self.inner.max_size()
216+
}
217+
218+
#[napi]
219+
pub async fn inspect_messages(&self) -> napi::Result<Vec<JsQueueInspectMessage>> {
220+
self.inner
221+
.inspect_messages()
222+
.await
223+
.map(|messages| {
224+
messages
225+
.into_iter()
226+
.map(|m| JsQueueInspectMessage {
227+
id: u64_to_i64(m.id),
228+
name: m.name,
229+
created_at_ms: m.created_at,
230+
})
231+
.collect()
232+
})
233+
.map_err(napi_anyhow_error)
234+
}
235+
}
236+
237+
#[napi(object)]
238+
pub struct JsQueueInspectMessage {
239+
/// Queue message id. Stored as the raw u64 reinterpreted as i64 so JS
240+
/// sees a plain number; ids are monotonic and fit comfortably in i64.
241+
pub id: i64,
242+
pub name: String,
243+
pub created_at_ms: i64,
244+
}
245+
246+
fn u64_to_i64(value: u64) -> i64 {
247+
i64::try_from(value).unwrap_or(i64::MAX)
212248
}
213249

214250
#[napi]

0 commit comments

Comments
 (0)