Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ pub struct ActorConfigOverrides {
pub wait_until_timeout: Option<Duration>,
}

#[derive(Clone, Debug)]
pub struct ActionDefinition {
pub name: String,
}

#[derive(Clone, Debug)]
pub struct ActorConfig {
pub name: Option<String>,
Expand Down Expand Up @@ -83,6 +88,7 @@ pub struct ActorConfig {
pub preload_max_workflow_bytes: Option<u64>,
pub preload_max_connections_bytes: Option<u64>,
pub overrides: Option<ActorConfigOverrides>,
pub actions: Vec<ActionDefinition>,
}

/// Sparse, serialization-friendly actor configuration. All fields are optional with millisecond integers instead of Duration. Used at runtime boundaries (NAPI, config files). Convert to ActorConfig via ActorConfig::from_input().
Expand Down Expand Up @@ -110,6 +116,7 @@ pub struct ActorConfigInput {
pub max_outgoing_message_size: Option<u32>,
pub preload_max_workflow_bytes: Option<f64>,
pub preload_max_connections_bytes: Option<f64>,
pub actions: Option<Vec<ActionDefinition>>,
}

impl ActorConfig {
Expand Down Expand Up @@ -180,6 +187,9 @@ impl ActorConfig {
actor_config.preload_max_connections_bytes = config
.preload_max_connections_bytes
.map(|value| value as u64);
if let Some(actions) = config.actions {
actor_config.actions = actions;
}

actor_config
}
Expand Down Expand Up @@ -243,6 +253,7 @@ impl Default for ActorConfig {
preload_max_workflow_bytes: None,
preload_max_connections_bytes: None,
overrides: None,
actions: Vec::new(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion rivetkit-rust/packages/rivetkit-core/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod task_types;
pub(crate) mod work_registry;

pub use action::ActionDispatchError;
pub use config::{ActorConfig, ActorConfigOverrides, CanHibernateWebSocket};
pub use config::{ActionDefinition, ActorConfig, ActorConfigOverrides, CanHibernateWebSocket};
pub use connection::ConnHandle;
pub use context::{ActorContext, WebSocketCallbackRegion};
pub use factory::{ActorEntryFn, ActorFactory};
Expand Down
2 changes: 1 addition & 1 deletion rivetkit-rust/packages/rivetkit-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use actor::{kv, sqlite};

pub use actor::action::ActionDispatchError;
pub use actor::config::{
ActorConfig, ActorConfigInput, ActorConfigOverrides, CanHibernateWebSocket,
ActionDefinition, ActorConfig, ActorConfigInput, ActorConfigOverrides, CanHibernateWebSocket,
};
pub use actor::connection::ConnHandle;
pub use actor::context::{ActorContext, WebSocketCallbackRegion};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,13 @@ pub(super) fn build_actor_inspector() -> Inspector {
}

pub(super) fn inspector_rpcs(instance: &ActorTaskHandle) -> Vec<String> {
let _ = instance;
Vec::new()
instance
.factory
.config()
.actions
.iter()
.map(|action| action.name.clone())
.collect()
}

pub(super) fn inspector_request_url(request: &Request) -> Result<Url> {
Expand Down
4 changes: 4 additions & 0 deletions rivetkit-typescript/packages/rivetkit-napi/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* tslint:disable */

Check failure on line 1 in rivetkit-typescript/packages/rivetkit-napi/index.d.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
/* eslint-disable */

/* auto-generated by NAPI-RS */
Expand Down Expand Up @@ -44,6 +44,9 @@
status: string
response?: Buffer
}
export interface JsActionDefinition {
name: string
}
export interface JsActorConfig {
name?: string
icon?: string
Expand Down Expand Up @@ -72,6 +75,7 @@
maxOutgoingMessageSize?: number
preloadMaxWorkflowBytes?: number
preloadMaxConnectionsBytes?: number
actions?: Array<JsActionDefinition>
}
export interface JsBindParam {
kind: string
Expand Down
15 changes: 14 additions & 1 deletion rivetkit-typescript/packages/rivetkit-napi/src/actor_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use napi::{Env, JsFunction, JsObject};
use napi_derive::napi;
use rivet_error::{MacroMarker, RivetError, RivetErrorSchema};
use rivetkit_core::{
ActorConfig, ActorConfigInput, ActorContext as CoreActorContext,
ActionDefinition, ActorConfig, ActorConfigInput, ActorContext as CoreActorContext,
ActorFactory as CoreActorFactory, ConnHandle as CoreConnHandle, Request, Response,
WebSocket as CoreWebSocket,
};
Expand Down Expand Up @@ -54,6 +54,12 @@ pub struct JsQueueSendResult {
pub response: Option<Buffer>,
}

#[napi(object)]
#[derive(Clone, Default)]
pub struct JsActionDefinition {
pub name: String,
}

#[napi(object)]
#[derive(Clone, Default)]
pub struct JsActorConfig {
Expand Down Expand Up @@ -84,6 +90,7 @@ pub struct JsActorConfig {
pub max_outgoing_message_size: Option<u32>,
pub preload_max_workflow_bytes: Option<f64>,
pub preload_max_connections_bytes: Option<f64>,
pub actions: Option<Vec<JsActionDefinition>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -1048,6 +1055,12 @@ impl From<JsActorConfig> for ActorConfigInput {
max_outgoing_message_size: value.max_outgoing_message_size,
preload_max_workflow_bytes: value.preload_max_workflow_bytes,
preload_max_connections_bytes: value.preload_max_connections_bytes,
actions: value.actions.map(|actions| {
actions
.into_iter()
.map(|action| ActionDefinition { name: action.name })
.collect()
}),
}
}
}
Expand Down
130 changes: 22 additions & 108 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3083,6 +3083,11 @@ function buildActorConfig(
preloadMaxConnectionsBytes: options.preloadMaxConnectionsBytes as
| number
| undefined,
actions: Object.keys(
(config.actions ?? {}) as Record<string, unknown>,
)
.sort()
.map((name) => ({ name })),
};
}

Expand Down Expand Up @@ -3230,106 +3235,6 @@ export function buildNativeFactory(
);
const workflowState = async () =>
(await getNativeWorkflowInspector(ctx)?.getState?.()) ?? null;
const metricsResponse = (actorCtx: NativeActorContextAdapter) => {
const sqliteMetrics =
databaseProvider !== undefined
? (actorCtx.sql.getSqliteVfsMetrics?.() ?? null)
: null;
const commitCount =
databaseProvider === undefined
? 0
: Math.max(sqliteMetrics?.commitCount ?? 0, 1);
const nsToMs = (ns: number) => ns / 1_000_000;
const phaseMs = (ns: number) =>
commitCount > 0 ? Math.max(nsToMs(ns), 0.001) : 0;
return {
kv_operations: {
type: "labeled_timing",
help: "KV round trips by operation type",
values: {
get: { calls: 0, totalMs: 0, keys: 0 },
getBatch: { calls: 0, totalMs: 0, keys: 0 },
put: { calls: 0, totalMs: 0, keys: 0 },
putBatch: { calls: 0, totalMs: 0, keys: 0 },
deleteBatch: { calls: 0, totalMs: 0, keys: 0 },
},
},
sqlite_commit_phases: {
type: "labeled_timing",
help: "SQLite VFS commit phase totals captured by the native VFS",
values: {
request_build: {
calls: commitCount,
totalMs: phaseMs(
sqliteMetrics?.requestBuildNs ?? 0,
),
keys: 0,
},
serialize: {
calls: commitCount,
totalMs: phaseMs(sqliteMetrics?.serializeNs ?? 0),
keys: 0,
},
transport: {
calls: commitCount,
totalMs: phaseMs(sqliteMetrics?.transportNs ?? 0),
keys: 0,
},
state_update: {
calls: commitCount,
totalMs: phaseMs(sqliteMetrics?.stateUpdateNs ?? 0),
keys: 0,
},
},
},
startup_total_ms: {
type: "gauge",
help: "Total actor startup time in milliseconds",
value: 1,
},
startup_kv_round_trips: {
type: "gauge",
help: "KV round-trips during startup",
value: 0,
},
startup_is_new: {
type: "gauge",
help: "1 if new actor, 0 if existing",
value: 0,
},
startup_internal_load_state_ms: {
type: "gauge",
help: "Time to load persisted state",
value: 0,
},
startup_internal_init_queue_ms: {
type: "gauge",
help: "Time to initialize queue state",
value: 0,
},
startup_internal_init_inspector_token_ms: {
type: "gauge",
help: "Time to initialize inspector token state",
value: 0,
},
startup_user_create_vars_ms: {
type: "gauge",
help: "Time spent running createVars",
value: 0,
},
startup_user_on_wake_ms: {
type: "gauge",
help: "Time spent running onWake",
value: 0,
},
startup_user_create_state_ms: {
type: "gauge",
help: "Time spent running createState",
value: 0,
},
};
};

const actorCtx = makeActorCtx(ctx, jsRequest);
try {
if (
Expand Down Expand Up @@ -3591,12 +3496,6 @@ export function buildNativeFactory(
workflowHistory: workflowHistory(),
});
}
if (
url.pathname === "/inspector/metrics" &&
jsRequest.method === "GET"
) {
return jsonResponse(metricsResponse(actorCtx));
}
if (
jsRequest.method === "POST" &&
url.pathname.startsWith("/inspector/action/")
Expand Down Expand Up @@ -3662,9 +3561,14 @@ export function buildNativeFactory(
const actorCtx = makeActorCtx(ctx);
try {
const decodedInput = decodeValue(input);
const startedAt = performance.now();
const state = hasStaticState
? structuredClone(config.state)
: await config.createState(actorCtx, decodedInput);
logger().debug({
msg: "perf user: createStateMs",
durationMs: performance.now() - startedAt,
});
actorCtx.initializeState(state);
return encodeValue(state);
} finally {
Expand Down Expand Up @@ -3703,9 +3607,14 @@ export function buildNativeFactory(
const { ctx } = unwrapTsfnPayload(error, payload);
const actorCtx = makeActorCtx(ctx);
try {
const startedAt = performance.now();
const vars = hasStaticVars
? structuredClone(config.vars)
: await config.createVars(actorCtx, undefined);
logger().debug({
msg: "perf user: createVarsMs",
durationMs: performance.now() - startedAt,
});
actorCtx.vars = vars;
} finally {
await actorCtx.dispose();
Expand Down Expand Up @@ -3756,7 +3665,12 @@ export function buildNativeFactory(
const { ctx } = unwrapTsfnPayload(error, payload);
const actorCtx = makeActorCtx(ctx);
try {
const startedAt = performance.now();
await config.onWake(actorCtx);
logger().debug({
msg: "perf user: onWakeMs",
durationMs: performance.now() - startedAt,
});
} finally {
await actorCtx.dispose();
}
Expand Down Expand Up @@ -4475,9 +4389,9 @@ export async function buildNativeRegistry(config: RegistryConfig): Promise<{
}> {
if (
config.test?.enabled &&
process.env.RIVET_INSPECTOR_TOKEN === undefined
process.env._RIVET_TEST_INSPECTOR_TOKEN === undefined
) {
process.env.RIVET_INSPECTOR_TOKEN = "token";
process.env._RIVET_TEST_INSPECTOR_TOKEN = "token";
}

const bindings = await loadNativeBindings();
Expand Down
Loading