Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ pub struct ActionDefinition {
pub struct ActorConfig {
pub name: Option<String>,
pub icon: Option<String>,
/// Whether the user declared a SQLite database for this actor (`db({...})`
/// on the TS side). Gates the inspector database tab.
pub has_database: bool,
/// Whether the user declared actor state (`state: ...` or `createState`).
/// Gates the inspector state tab and state-subscription messages.
pub has_state: bool,
pub can_hibernate_websocket: CanHibernateWebSocket,
pub state_save_interval: Duration,
pub create_vars_timeout: Duration,
Expand Down Expand Up @@ -96,6 +102,8 @@ pub struct ActorConfig {
pub struct ActorConfigInput {
pub name: Option<String>,
pub icon: Option<String>,
pub has_database: Option<bool>,
pub has_state: Option<bool>,
pub can_hibernate_websocket: Option<bool>,
pub state_save_interval_ms: Option<u32>,
pub create_vars_timeout_ms: Option<u32>,
Expand Down Expand Up @@ -124,6 +132,8 @@ impl ActorConfig {
let mut actor_config = Self {
name: config.name,
icon: config.icon,
has_database: config.has_database.unwrap_or(false),
has_state: config.has_state.unwrap_or(false),
..Self::default()
};
if let Some(can_hibernate_websocket) = config.can_hibernate_websocket {
Expand Down Expand Up @@ -227,6 +237,8 @@ impl Default for ActorConfig {
Self {
name: None,
icon: None,
has_database: false,
has_state: false,
can_hibernate_websocket: CanHibernateWebSocket::default(),
state_save_interval: DEFAULT_STATE_SAVE_INTERVAL,
create_vars_timeout: DEFAULT_CREATE_VARS_TIMEOUT,
Expand Down
4 changes: 4 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,10 @@ impl ActorContext {
&self.0.region
}

pub fn has_state(&self) -> bool {
self.0.connection_config.read().has_state
}

#[doc(hidden)]
pub fn record_startup_create_state(&self, duration: Duration) {
self.0.metrics.observe_create_state(duration);
Expand Down
4 changes: 2 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,12 +530,12 @@ impl ActorContext {
})
}

pub(crate) async fn inspect_messages(&self) -> Result<Vec<QueueMessage>> {
pub async fn inspect_messages(&self) -> Result<Vec<QueueMessage>> {
self.ensure_initialized().await?;
self.list_messages().await
}

pub(crate) fn max_size(&self) -> u32 {
pub fn max_size(&self) -> u32 {
self.config().max_queue_size
}

Expand Down
10 changes: 10 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ pub struct SqliteDb {
handle: Option<EnvoyHandle>,
actor_id: Option<String>,
startup_data: Option<protocol::SqliteStartupData>,
/// Mirrors the user's actor-config `db({...})` declaration. The envoy
/// always sets up sqlite storage under the hood, so handle/actor_id are
/// not a reliable signal for whether the user opted in; this flag is.
enabled: bool,
#[cfg(feature = "sqlite")]
// Forced-sync: native SQLite handles are used inside spawn_blocking and
// synchronous diagnostic accessors.
Expand All @@ -89,16 +93,22 @@ impl SqliteDb {
handle: EnvoyHandle,
actor_id: impl Into<String>,
startup_data: Option<protocol::SqliteStartupData>,
enabled: bool,
) -> Self {
Self {
handle: Some(handle),
actor_id: Some(actor_id.into()),
startup_data,
enabled,
#[cfg(feature = "sqlite")]
db: Default::default(),
}
}

pub fn is_enabled(&self) -> bool {
self.enabled
}

pub async fn get_pages(
&self,
request: protocol::SqliteGetPagesRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl RegistryDispatcher {
StatusCode::OK,
&json!({
"state": decode_cbor_json_or_null(&instance.ctx.state()),
"isStateEnabled": true,
"isStateEnabled": instance.ctx.has_state(),
}),
),
(http::Method::PATCH, "/inspector/state") => {
Expand Down Expand Up @@ -277,11 +277,11 @@ impl RegistryDispatcher {
.context("load inspector workflow summary")?;
Ok(InspectorSummaryJson {
state: decode_cbor_json_or_null(&instance.ctx.state()),
is_state_enabled: true,
is_state_enabled: instance.ctx.has_state(),
connections: inspector_connections(&instance.ctx),
rpcs: inspector_rpcs(instance),
queue_size: queue_messages.len().try_into().unwrap_or(u32::MAX),
is_database_enabled: instance.ctx.sql().runtime_config().is_ok(),
is_database_enabled: instance.ctx.sql().is_enabled(),
workflow_supported,
workflow_history,
})
Expand Down
25 changes: 17 additions & 8 deletions rivetkit-rust/packages/rivetkit-core/src/registry/inspector_ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ use super::inspector::*;
use super::websocket::{closing_websocket_handler, websocket_inspector_token};
use super::*;

/// Aborts the wrapped task on drop. Ensures the overlay task cannot outlive
/// the websocket handler even if `on_close` never fires (for example when the
/// handler is dropped due to actor teardown rather than a clean close frame).
struct AbortOnDropTask(JoinHandle<()>);

impl Drop for AbortOnDropTask {
fn drop(&mut self) {
self.0.abort();
}
}

impl RegistryDispatcher {
pub(super) async fn handle_inspector_websocket(
self: &Arc<Self>,
Expand Down Expand Up @@ -32,7 +43,7 @@ impl RegistryDispatcher {
// Forced-sync: inspector websocket slots are filled/cleared inside
// synchronous callback setup/teardown and moved out before awaiting.
let subscription_slot = Arc::new(Mutex::new(None::<InspectorSubscription>));
let overlay_task_slot = Arc::new(Mutex::new(None::<JoinHandle<()>>));
let overlay_task_slot = Arc::new(Mutex::new(None::<AbortOnDropTask>));
let attach_guard_slot = Arc::new(Mutex::new(None::<InspectorAttachGuard>));
let on_open_instance = instance.clone();
let on_open_dispatcher = dispatcher.clone();
Expand Down Expand Up @@ -64,9 +75,7 @@ impl RegistryDispatcher {
let mut guard = slot.lock();
guard.take();
let mut overlay_guard = overlay_slot.lock();
if let Some(task) = overlay_guard.take() {
task.abort();
}
overlay_guard.take();
let mut attach_guard = attach_slot.lock();
attach_guard.take();
})
Expand Down Expand Up @@ -146,7 +155,7 @@ impl RegistryDispatcher {
}
});
let mut overlay_guard = on_open_overlay_slot.lock();
*overlay_guard = Some(overlay_task);
*overlay_guard = Some(AbortOnDropTask(overlay_task));

let listener_dispatcher = on_open_dispatcher.clone();
let listener_instance = on_open_instance.clone();
Expand Down Expand Up @@ -356,9 +365,9 @@ impl RegistryDispatcher {
inspector_protocol::InitMessage {
connections: inspector_wire_connections(&instance.ctx),
state: Some(instance.ctx.state()),
is_state_enabled: true,
is_state_enabled: instance.ctx.has_state(),
rpcs: inspector_rpcs(instance),
is_database_enabled: instance.ctx.sql().runtime_config().is_ok(),
is_database_enabled: instance.ctx.sql().is_enabled(),
queue_size: serde_bare::Uint(queue_size),
workflow_history,
is_workflow_enabled: workflow_supported,
Expand All @@ -374,7 +383,7 @@ impl RegistryDispatcher {
inspector_protocol::StateResponse {
rid,
state: Some(instance.ctx.state()),
is_state_enabled: true,
is_state_enabled: instance.ctx.has_state(),
}
}

Expand Down
7 changes: 6 additions & 1 deletion rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,12 @@ impl RegistryDispatcher {
self.region.clone(),
factory.config().clone(),
Kv::new(handle.clone(), actor_id.to_owned()),
SqliteDb::new(handle.clone(), actor_id.to_owned(), sqlite_startup_data),
SqliteDb::new(
handle.clone(),
actor_id.to_owned(),
sqlite_startup_data,
factory.config().has_database,
),
);
ctx.configure_envoy(handle, Some(generation));
ctx
Expand Down
12 changes: 12 additions & 0 deletions rivetkit-typescript/packages/rivetkit-napi/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* tslint:disable */

Check failure on line 1 in rivetkit-typescript/packages/rivetkit-napi/index.d.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
/* eslint-disable */

/* auto-generated by NAPI-RS */
Expand Down Expand Up @@ -51,6 +51,7 @@
name?: string
icon?: string
hasDatabase?: boolean
hasState?: boolean
canHibernateWebsocket?: boolean
stateSaveIntervalMs?: number
createStateTimeoutMs?: number
Expand Down Expand Up @@ -127,6 +128,15 @@
count?: number
completable?: boolean
}
export interface JsQueueInspectMessage {
/**
* Queue message id. Stored as the raw u64 reinterpreted as i64 so JS
* sees a plain number; ids are monotonic and fit comfortably in i64.
*/
id: number
name: string
createdAtMs: number
}
export interface JsServeConfig {
version: number
endpoint: string
Expand Down Expand Up @@ -265,6 +275,8 @@
enqueueAndWait(name: string, body: Buffer, options?: JsQueueEnqueueAndWaitOptions | undefined | null, signal?: CancellationToken | undefined | null): Promise<Buffer | null>
tryNext(options?: JsQueueTryNextOptions | undefined | null): QueueMessage | null
tryNextBatch(options?: JsQueueTryNextBatchOptions | undefined | null): Array<QueueMessage>
maxSize(): number
inspectMessages(): Promise<Array<JsQueueInspectMessage>>
}
export declare class QueueMessage {
id(): bigint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub struct JsActorConfig {
pub name: Option<String>,
pub icon: Option<String>,
pub has_database: Option<bool>,
pub has_state: Option<bool>,
pub can_hibernate_websocket: Option<bool>,
pub state_save_interval_ms: Option<u32>,
pub create_state_timeout_ms: Option<u32>,
Expand Down Expand Up @@ -1036,6 +1037,7 @@ impl From<JsActorConfig> for ActorConfigInput {
name: value.name,
icon: value.icon,
has_database: value.has_database,
has_state: value.has_state,
can_hibernate_websocket: value.can_hibernate_websocket,
state_save_interval_ms: value.state_save_interval_ms,
create_vars_timeout_ms: value.create_vars_timeout_ms,
Expand Down
36 changes: 36 additions & 0 deletions rivetkit-typescript/packages/rivetkit-napi/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,42 @@ impl Queue {
.map(|messages| messages.into_iter().map(QueueMessage::from_core).collect())
.map_err(napi_anyhow_error)
}

#[napi]
pub fn max_size(&self) -> u32 {
self.inner.max_size()
}

#[napi]
pub async fn inspect_messages(&self) -> napi::Result<Vec<JsQueueInspectMessage>> {
self.inner
.inspect_messages()
.await
.map(|messages| {
messages
.into_iter()
.map(|m| JsQueueInspectMessage {
id: u64_to_i64(m.id),
name: m.name,
created_at_ms: m.created_at,
})
.collect()
})
.map_err(napi_anyhow_error)
}
}

#[napi(object)]
pub struct JsQueueInspectMessage {
/// Queue message id. Stored as the raw u64 reinterpreted as i64 so JS
/// sees a plain number; ids are monotonic and fit comfortably in i64.
pub id: i64,
pub name: String,
pub created_at_ms: i64,
}

fn u64_to_i64(value: u64) -> i64 {
i64::try_from(value).unwrap_or(i64::MAX)
}

#[napi]
Expand Down
32 changes: 23 additions & 9 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3047,6 +3047,9 @@ function buildActorConfig(
name: options.name as string | undefined,
icon: options.icon as string | undefined,
hasDatabase: config.db !== undefined,
hasState:
config.state !== undefined ||
typeof config.createState === "function",
canHibernateWebsocket:
typeof canHibernate === "boolean" ? canHibernate : undefined,
stateSaveIntervalMs: options.stateSaveInterval as number | undefined,
Expand Down Expand Up @@ -3289,16 +3292,27 @@ export function buildNativeFactory(
url.pathname === "/inspector/queue" &&
jsRequest.method === "GET"
) {
const inspectorSnapshot = callNativeSync(() =>
ctx.inspectorSnapshot(),
);
const limitParam = url.searchParams.get("limit");
const parsedLimit = limitParam ? Number(limitParam) : 100;
const limit =
Number.isFinite(parsedLimit) && parsedLimit > 0
? Math.floor(parsedLimit)
: 100;
const queue = ctx.queue();
const allMessages = await queue.inspectMessages();
const truncated = allMessages.length > limit;
const messages = allMessages
.slice(0, limit)
.map((m) => ({
id: m.id,
name: m.name,
createdAtMs: m.createdAtMs,
}));
return jsonResponse({
size: inspectorSnapshot.queueSize,
maxSize:
(config.options.maxQueueSize as number | undefined) ??
1000,
truncated: false,
messages: [],
size: allMessages.length,
maxSize: queue.maxSize(),
truncated,
messages,
});
}
if (
Expand Down
Loading