diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs b/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs index d210232f1f..f75927f0a7 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs @@ -162,9 +162,10 @@ impl Queue { &self, names: Vec, options: Option, + signal: Option<&CancellationToken>, ) -> napi::Result<()> { self.inner - .wait_for_names_available(names, queue_wait_opts(options, None)?) + .wait_for_names_available(names, queue_wait_opts(options, signal)?) .await .map_err(napi_anyhow_error) } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts index 73ec7abbcb..c95c9db95c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts @@ -677,10 +677,11 @@ export class NapiCoreRuntime implements CoreRuntime { ctx: ActorContextHandle, names: string[], options?: RuntimeQueueWaitOptions | undefined | null, + signal?: CancellationTokenHandle | undefined | null, ): Promise { await asNativeActorContext(ctx) .queue() - .waitForNamesAvailable(names, options); + .waitForNamesAvailable(names, options, signal); } async actorQueueEnqueueAndWait( diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index 9f38f4b240..bc8036324e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -1657,7 +1657,12 @@ class NativeQueueAdapter { signal?: AbortSignal; }, ) { - if (!options?.signal) { + const { token, cleanup } = await createCancellationTokenHandle( + this.#runtime, + options?.signal, + ); + + try { await callNative(() => this.#runtime.actorQueueWaitForNamesAvailable( this.#ctx, @@ -1665,57 +1670,11 @@ class NativeQueueAdapter { { timeoutMs: options?.timeout, }, + token, ), ); - return; - } - - const deadline = - options.timeout === undefined - ? undefined - : Date.now() + options.timeout; - - for (;;) { - if (options.signal.aborted) { - throw actorAbortedError(); - } - - const remainingTimeout = - deadline === undefined - ? undefined - : Math.max(0, deadline - Date.now()); - const sliceTimeout = - remainingTimeout === undefined - ? 100 - : Math.min(remainingTimeout, 100); - - try { - await callNative(() => - this.#runtime.actorQueueWaitForNamesAvailable( - this.#ctx, - [...names], - { - timeoutMs: sliceTimeout, - }, - ), - ); - return; - } catch (error) { - if ( - (error as { group?: string; code?: string }).group === - "queue" && - (error as { group?: string; code?: string }).code === - "timed_out" - ) { - if ( - remainingTimeout === undefined || - remainingTimeout > 100 - ) { - continue; - } - } - throw error; - } + } finally { + cleanup?.(); } } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts index 378dbae1ff..67f6b6a633 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts @@ -488,6 +488,7 @@ export interface CoreRuntime { ctx: ActorContextHandle, names: string[], options?: RuntimeQueueWaitOptions | undefined | null, + signal?: CancellationTokenHandle | undefined | null, ): Promise; actorQueueEnqueueAndWait( ctx: ActorContextHandle, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts index 0b02f2c500..5601532364 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts @@ -764,9 +764,10 @@ export class WasmCoreRuntime implements CoreRuntime { ctx: ActorContextHandle, names: string[], options?: RuntimeQueueWaitOptions | undefined | null, + signal?: CancellationTokenHandle | undefined | null, ): Promise { const queue = childHandle(asWasmActorContext(ctx), "queue"); - await callHandleAsync(queue, "waitForNamesAvailable", names, options); + await callHandleAsync(queue, "waitForNamesAvailable", names, options, signal); } async actorQueueEnqueueAndWait(