Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,46 @@ impl ActorContext {
self.config().max_queue_size
}

/// Removes all messages from the queue and resets metadata.
pub async fn reset(&self) -> Result<()> {
self.ensure_initialized().await?;

// List and delete all message keys
let entries = self.list_message_entries().await?;
if !entries.is_empty() {
let keys: Vec<Vec<u8>> = entries.iter().map(|(k, _)| k.clone()).collect();
let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
self.0
.kv
.batch_delete(&key_refs)
.await
.context("delete all queue messages")?;
}

// Reset metadata
let metadata = QueueMetadata {
next_id: 0,
size: 0,
};
let encoded_metadata =
encode_queue_metadata(&metadata).context("encode reset queue metadata")?;
self.0
.kv
.put(&QUEUE_METADATA_KEY, &encoded_metadata)
.await
.context("persist reset queue metadata")?;
*self.0.queue_metadata.lock().await = metadata;

// Drop all completion waiters
self.0.queue_completion_waiters.clear_async().await;

// Update metrics and notify inspector
self.0.metrics.set_queue_depth(0);
self.notify_inspector_update(0);

Ok(())
}

pub(crate) fn configure_queue(&self, config: ActorConfig) {
*self.0.queue_config.lock() = config;
}
Expand Down
30 changes: 26 additions & 4 deletions rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::dispatch::*;
use super::http::*;
use super::*;
use crate::error::{client_error_message, ProtocolError};
use crate::error::{ProtocolError, client_error_message};
use ::http;

#[derive(rivet_error::RivetError, serde::Serialize)]
Expand Down Expand Up @@ -127,6 +127,30 @@ impl RegistryDispatcher {
};
json_http_response(StatusCode::OK, &payload)
}
(http::Method::DELETE, "/inspector/queue") => {
match instance.ctx.queue().reset().await {
Ok(_) => json_http_response(StatusCode::OK, &json!({})),
Err(error) => Err(error).context("reset inspector queue"),
}
}
(http::Method::POST, "/inspector/queue") => {
let body: InspectorEnqueueBody = match parse_json_body(request) {
Ok(body) => body,
Err(response) => return Ok(Some(response)),
};
let cbor_body = encode_json_as_cbor(&body.body.unwrap_or(serde_json::Value::Null))?;
match instance.ctx.queue().send(&body.name, &cbor_body).await {
Ok(message) => json_http_response(
StatusCode::OK,
&json!({
"id": message.id,
"name": message.name,
"createdAtMs": message.created_at,
}),
),
Err(error) => Err(error).context("enqueue inspector queue message"),
}
}
(http::Method::GET, "/inspector/workflow-history") => self
.inspector_workflow_history(instance)
.await
Expand Down Expand Up @@ -346,9 +370,7 @@ impl RegistryDispatcher {
) -> Result<(bool, Option<Vec<u8>>)> {
let result = instance
.ctx
.internal_keep_awake(dispatch_workflow_history_through_task(
&instance.dispatch,
))
.internal_keep_awake(dispatch_workflow_history_through_task(&instance.dispatch))
.await
.context("load inspector workflow history");

Expand Down
7 changes: 7 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ struct InspectorWorkflowReplayBody {
entry_id: Option<String>,
}

#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct InspectorEnqueueBody {
name: String,
body: Option<JsonValue>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct InspectorQueueMessageJson {
Expand Down
5 changes: 5 additions & 0 deletions rivetkit-typescript/packages/rivetkit-napi/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ impl Queue {
self.inner.max_size()
}

#[napi]
pub async fn reset(&self) -> napi::Result<()> {
self.inner.reset().await.map_err(napi_anyhow_error)
}

#[napi]
pub async fn inspect_messages(&self) -> napi::Result<Vec<JsQueueInspectMessage>> {
self.inner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,10 @@ export class NapiCoreRuntime implements CoreRuntime {
return await asNativeActorContext(ctx).queue().inspectMessages();
}

async actorQueueReset(ctx: ActorContextHandle): Promise<void> {
await asNativeActorContext(ctx).queue().reset();
}

actorScheduleAfter(
ctx: ActorContextHandle,
durationMs: number,
Expand Down
21 changes: 21 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3461,6 +3461,27 @@ export function buildNativeFactory(
messages,
});
}
if (
url.pathname === "/inspector/queue" &&
jsRequest.method === "DELETE"
) {
await runtime.actorQueueReset(ctx);
return jsonResponse({});
}
if (
url.pathname === "/inspector/queue" &&
jsRequest.method === "POST"
) {
const body = await jsRequest.json() as { name?: string; body?: unknown };
const name = body.name ?? "";
const cbor = encodeCborCompat((body.body ?? null) as JsonCompatValue);
const message = await runtime.actorQueueSend(ctx, name, cbor);
return jsonResponse({
id: Number(message.id()),
name: message.name(),
createdAtMs: message.createdAt(),
});
}
if (
url.pathname === "/inspector/traces" &&
jsRequest.method === "GET"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ export interface CoreRuntime {
actorQueueInspectMessages(
ctx: ActorContextHandle,
): Promise<RuntimeQueueInspectMessage[]>;
actorQueueReset(ctx: ActorContextHandle): Promise<void>;

actorScheduleAfter(
ctx: ActorContextHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,11 @@ export class WasmCoreRuntime implements CoreRuntime {
return await callHandleAsync(queue, "inspectMessages");
}

async actorQueueReset(ctx: ActorContextHandle): Promise<void> {
const queue = childHandle(asWasmActorContext(ctx), "queue");
await callHandleAsync(queue, "reset");
}

actorScheduleAfter(
ctx: ActorContextHandle,
durationMs: number | bigint,
Expand Down
Loading