Skip to content

Commit 95dd3b3

Browse files
committed
feat(inspector): insert and clear actor queue
1 parent 9dc7e20 commit 95dd3b3

8 files changed

Lines changed: 111 additions & 9 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,46 @@ impl ActorContext {
535535
self.config().max_queue_size
536536
}
537537

538+
/// Removes all messages from the queue and resets metadata.
539+
pub async fn reset(&self) -> Result<()> {
540+
self.ensure_initialized().await?;
541+
542+
// List and delete all message keys
543+
let entries = self.list_message_entries().await?;
544+
if !entries.is_empty() {
545+
let keys: Vec<Vec<u8>> = entries.iter().map(|(k, _)| k.clone()).collect();
546+
let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
547+
self.0
548+
.kv
549+
.batch_delete(&key_refs)
550+
.await
551+
.context("delete all queue messages")?;
552+
}
553+
554+
// Reset metadata
555+
let metadata = QueueMetadata {
556+
next_id: 0,
557+
size: 0,
558+
};
559+
let encoded_metadata =
560+
encode_queue_metadata(&metadata).context("encode reset queue metadata")?;
561+
self.0
562+
.kv
563+
.put(&QUEUE_METADATA_KEY, &encoded_metadata)
564+
.await
565+
.context("persist reset queue metadata")?;
566+
*self.0.queue_metadata.lock().await = metadata;
567+
568+
// Drop all completion waiters
569+
self.0.queue_completion_waiters.clear_async().await;
570+
571+
// Update metrics and notify inspector
572+
self.0.metrics.set_queue_depth(0);
573+
self.notify_inspector_update(0);
574+
575+
Ok(())
576+
}
577+
538578
pub(crate) fn configure_queue(&self, config: ActorConfig) {
539579
*self.0.queue_config.lock() = config;
540580
}

rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::dispatch::*;
22
use super::http::*;
33
use super::*;
4-
use crate::error::{client_error_message, ProtocolError};
4+
use crate::error::{ProtocolError, client_error_message};
55
use ::http;
66

77
#[derive(rivet_error::RivetError, serde::Serialize)]
@@ -127,6 +127,30 @@ impl RegistryDispatcher {
127127
};
128128
json_http_response(StatusCode::OK, &payload)
129129
}
130+
(http::Method::DELETE, "/inspector/queue") => {
131+
match instance.ctx.queue().reset().await {
132+
Ok(_) => json_http_response(StatusCode::OK, &json!({})),
133+
Err(error) => Err(error).context("reset inspector queue"),
134+
}
135+
}
136+
(http::Method::POST, "/inspector/queue") => {
137+
let body: InspectorEnqueueBody = match parse_json_body(request) {
138+
Ok(body) => body,
139+
Err(response) => return Ok(Some(response)),
140+
};
141+
let cbor_body = encode_json_as_cbor(&body.body.unwrap_or(serde_json::Value::Null))?;
142+
match instance.ctx.queue().send(&body.name, &cbor_body).await {
143+
Ok(message) => json_http_response(
144+
StatusCode::OK,
145+
&json!({
146+
"id": message.id,
147+
"name": message.name,
148+
"createdAtMs": message.created_at,
149+
}),
150+
),
151+
Err(error) => Err(error).context("enqueue inspector queue message"),
152+
}
153+
}
130154
(http::Method::GET, "/inspector/workflow-history") => self
131155
.inspector_workflow_history(instance)
132156
.await
@@ -346,9 +370,7 @@ impl RegistryDispatcher {
346370
) -> Result<(bool, Option<Vec<u8>>)> {
347371
let result = instance
348372
.ctx
349-
.internal_keep_awake(dispatch_workflow_history_through_task(
350-
&instance.dispatch,
351-
))
373+
.internal_keep_awake(dispatch_workflow_history_through_task(&instance.dispatch))
352374
.await
353375
.context("load inspector workflow history");
354376

rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,13 @@ struct InspectorWorkflowReplayBody {
212212
entry_id: Option<String>,
213213
}
214214

215+
#[derive(Debug, Default, Deserialize)]
216+
#[serde(default)]
217+
struct InspectorEnqueueBody {
218+
name: String,
219+
body: Option<JsonValue>,
220+
}
221+
215222
#[derive(Debug, Serialize)]
216223
#[serde(rename_all = "camelCase")]
217224
struct InspectorQueueMessageJson {
@@ -625,11 +632,8 @@ impl RegistryDispatcher {
625632

626633
let (start_tx, start_rx) = oneshot::channel();
627634
let result: Result<Arc<ActorTaskHandle>> = async {
628-
try_send_lifecycle_command(
629-
&lifecycle_tx,
630-
LifecycleCommand::Start { reply: start_tx },
631-
)
632-
.context("send actor task start command")?;
635+
try_send_lifecycle_command(&lifecycle_tx, LifecycleCommand::Start { reply: start_tx })
636+
.context("send actor task start command")?;
633637
start_rx
634638
.await
635639
.context("receive actor task start reply")?

rivetkit-typescript/packages/rivetkit-napi/src/queue.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,11 @@ impl Queue {
216216
self.inner.max_size()
217217
}
218218

219+
#[napi]
220+
pub async fn reset(&self) -> napi::Result<()> {
221+
self.inner.reset().await.map_err(napi_anyhow_error)
222+
}
223+
219224
#[napi]
220225
pub async fn inspect_messages(&self) -> napi::Result<Vec<JsQueueInspectMessage>> {
221226
self.inner

rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,10 @@ export class NapiCoreRuntime implements CoreRuntime {
687687
return await asNativeActorContext(ctx).queue().inspectMessages();
688688
}
689689

690+
async actorQueueReset(ctx: ActorContextHandle): Promise<void> {
691+
await asNativeActorContext(ctx).queue().reset();
692+
}
693+
690694
actorScheduleAfter(
691695
ctx: ActorContextHandle,
692696
durationMs: number,

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3461,6 +3461,27 @@ export function buildNativeFactory(
34613461
messages,
34623462
});
34633463
}
3464+
if (
3465+
url.pathname === "/inspector/queue" &&
3466+
jsRequest.method === "DELETE"
3467+
) {
3468+
await runtime.actorQueueReset(ctx);
3469+
return jsonResponse({});
3470+
}
3471+
if (
3472+
url.pathname === "/inspector/queue" &&
3473+
jsRequest.method === "POST"
3474+
) {
3475+
const body = await jsRequest.json() as { name?: string; body?: unknown };
3476+
const name = body.name ?? "";
3477+
const cbor = encodeCborCompat((body.body ?? null) as JsonCompatValue);
3478+
const message = await runtime.actorQueueSend(ctx, name, cbor);
3479+
return jsonResponse({
3480+
id: Number(message.id()),
3481+
name: message.name(),
3482+
createdAtMs: message.createdAt(),
3483+
});
3484+
}
34643485
if (
34653486
url.pathname === "/inspector/traces" &&
34663487
jsRequest.method === "GET"

rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ export interface CoreRuntime {
498498
actorQueueInspectMessages(
499499
ctx: ActorContextHandle,
500500
): Promise<RuntimeQueueInspectMessage[]>;
501+
actorQueueReset(ctx: ActorContextHandle): Promise<void>;
501502

502503
actorScheduleAfter(
503504
ctx: ActorContextHandle,

rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,11 @@ export class WasmCoreRuntime implements CoreRuntime {
803803
return await callHandleAsync(queue, "inspectMessages");
804804
}
805805

806+
async actorQueueReset(ctx: ActorContextHandle): Promise<void> {
807+
const queue = childHandle(asWasmActorContext(ctx), "queue");
808+
await callHandleAsync(queue, "reset");
809+
}
810+
806811
actorScheduleAfter(
807812
ctx: ActorContextHandle,
808813
durationMs: number | bigint,

0 commit comments

Comments
 (0)