Skip to content

Commit 0295ef9

Browse files
committed
feat(inspector): insert and clear actor queue
1 parent a7e1162 commit 0295ef9

8 files changed

Lines changed: 109 additions & 4 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,46 @@ impl ActorContext {
535535
self.config().max_queue_size
536536
}
537537

538+
/// Removes all messages from the queue and resets metadata.
539+
pub async fn reset(&self) -> Result<()> {
540+
self.ensure_initialized().await?;
541+
542+
// List and delete all message keys
543+
let entries = self.list_message_entries().await?;
544+
if !entries.is_empty() {
545+
let keys: Vec<Vec<u8>> = entries.iter().map(|(k, _)| k.clone()).collect();
546+
let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
547+
self.0
548+
.kv
549+
.batch_delete(&key_refs)
550+
.await
551+
.context("delete all queue messages")?;
552+
}
553+
554+
// Reset metadata
555+
let metadata = QueueMetadata {
556+
next_id: 0,
557+
size: 0,
558+
};
559+
let encoded_metadata =
560+
encode_queue_metadata(&metadata).context("encode reset queue metadata")?;
561+
self.0
562+
.kv
563+
.put(&QUEUE_METADATA_KEY, &encoded_metadata)
564+
.await
565+
.context("persist reset queue metadata")?;
566+
*self.0.queue_metadata.lock().await = metadata;
567+
568+
// Drop all completion waiters
569+
self.0.queue_completion_waiters.clear_async().await;
570+
571+
// Update metrics and notify inspector
572+
self.0.metrics.set_queue_depth(0);
573+
self.notify_inspector_update(0);
574+
575+
Ok(())
576+
}
577+
538578
pub(crate) fn configure_queue(&self, config: ActorConfig) {
539579
*self.0.queue_config.lock() = config;
540580
}

rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::dispatch::*;
22
use super::http::*;
33
use super::*;
4-
use crate::error::{client_error_message, ProtocolError};
4+
use crate::error::{ProtocolError, client_error_message};
55
use ::http;
66

77
#[derive(rivet_error::RivetError, serde::Serialize)]
@@ -127,6 +127,30 @@ impl RegistryDispatcher {
127127
};
128128
json_http_response(StatusCode::OK, &payload)
129129
}
130+
(http::Method::DELETE, "/inspector/queue") => {
131+
match instance.ctx.queue().reset().await {
132+
Ok(_) => json_http_response(StatusCode::OK, &json!({})),
133+
Err(error) => Err(error).context("reset inspector queue"),
134+
}
135+
}
136+
(http::Method::POST, "/inspector/queue") => {
137+
let body: InspectorEnqueueBody = match parse_json_body(request) {
138+
Ok(body) => body,
139+
Err(response) => return Ok(Some(response)),
140+
};
141+
let cbor_body = encode_json_as_cbor(&body.body.unwrap_or(serde_json::Value::Null))?;
142+
match instance.ctx.queue().send(&body.name, &cbor_body).await {
143+
Ok(message) => json_http_response(
144+
StatusCode::OK,
145+
&json!({
146+
"id": message.id,
147+
"name": message.name,
148+
"createdAtMs": message.created_at,
149+
}),
150+
),
151+
Err(error) => Err(error).context("enqueue inspector queue message"),
152+
}
153+
}
130154
(http::Method::GET, "/inspector/workflow-history") => self
131155
.inspector_workflow_history(instance)
132156
.await
@@ -346,9 +370,7 @@ impl RegistryDispatcher {
346370
) -> Result<(bool, Option<Vec<u8>>)> {
347371
let result = instance
348372
.ctx
349-
.internal_keep_awake(dispatch_workflow_history_through_task(
350-
&instance.dispatch,
351-
))
373+
.internal_keep_awake(dispatch_workflow_history_through_task(&instance.dispatch))
352374
.await
353375
.context("load inspector workflow history");
354376

rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,13 @@ struct InspectorWorkflowReplayBody {
241241
entry_id: Option<String>,
242242
}
243243

244+
#[derive(Debug, Default, Deserialize)]
245+
#[serde(default)]
246+
struct InspectorEnqueueBody {
247+
name: String,
248+
body: Option<JsonValue>,
249+
}
250+
244251
#[derive(Debug, Serialize)]
245252
#[serde(rename_all = "camelCase")]
246253
struct InspectorQueueMessageJson {

rivetkit-typescript/packages/rivetkit-napi/src/queue.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,11 @@ impl Queue {
216216
self.inner.max_size()
217217
}
218218

219+
#[napi]
220+
pub async fn reset(&self) -> napi::Result<()> {
221+
self.inner.reset().await.map_err(napi_anyhow_error)
222+
}
223+
219224
#[napi]
220225
pub async fn inspect_messages(&self) -> napi::Result<Vec<JsQueueInspectMessage>> {
221226
self.inner

rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,10 @@ export class NapiCoreRuntime implements CoreRuntime {
719719
return await asNativeActorContext(ctx).queue().inspectMessages();
720720
}
721721

722+
async actorQueueReset(ctx: ActorContextHandle): Promise<void> {
723+
await asNativeActorContext(ctx).queue().reset();
724+
}
725+
722726
actorScheduleAfter(
723727
ctx: ActorContextHandle,
724728
durationMs: number,

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3424,6 +3424,27 @@ export function buildNativeFactory(
34243424
messages,
34253425
});
34263426
}
3427+
if (
3428+
url.pathname === "/inspector/queue" &&
3429+
jsRequest.method === "DELETE"
3430+
) {
3431+
await runtime.actorQueueReset(ctx);
3432+
return jsonResponse({});
3433+
}
3434+
if (
3435+
url.pathname === "/inspector/queue" &&
3436+
jsRequest.method === "POST"
3437+
) {
3438+
const body = await jsRequest.json() as { name?: string; body?: unknown };
3439+
const name = body.name ?? "";
3440+
const cbor = encodeCborCompat((body.body ?? null) as JsonCompatValue);
3441+
const message = await runtime.actorQueueSend(ctx, name, cbor);
3442+
return jsonResponse({
3443+
id: Number(message.id()),
3444+
name: message.name(),
3445+
createdAtMs: message.createdAt(),
3446+
});
3447+
}
34273448
if (
34283449
url.pathname === "/inspector/traces" &&
34293450
jsRequest.method === "GET"

rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ export interface CoreRuntime {
508508
actorQueueInspectMessages(
509509
ctx: ActorContextHandle,
510510
): Promise<RuntimeQueueInspectMessage[]>;
511+
actorQueueReset(ctx: ActorContextHandle): Promise<void>;
511512

512513
actorScheduleAfter(
513514
ctx: ActorContextHandle,

rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,11 @@ export class WasmCoreRuntime implements CoreRuntime {
814814
return await callHandleAsync(queue, "inspectMessages");
815815
}
816816

817+
async actorQueueReset(ctx: ActorContextHandle): Promise<void> {
818+
const queue = childHandle(asWasmActorContext(ctx), "queue");
819+
await callHandleAsync(queue, "reset");
820+
}
821+
817822
actorScheduleAfter(
818823
ctx: ActorContextHandle,
819824
durationMs: number | bigint,

0 commit comments

Comments
 (0)