Skip to content

Commit 9a50acb

Browse files
committed
fix(rivetkit): wire CancellationToken through waitForNamesAvailable to eliminate KV busy-polling
1 parent 7b1acff commit 9a50acb

5 files changed

Lines changed: 16 additions & 53 deletions

File tree

rivetkit-typescript/packages/rivetkit-napi/src/queue.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,10 @@ impl Queue {
162162
&self,
163163
names: Vec<String>,
164164
options: Option<JsQueueWaitOptions>,
165+
signal: Option<&CancellationToken>,
165166
) -> napi::Result<()> {
166167
self.inner
167-
.wait_for_names_available(names, queue_wait_opts(options, None)?)
168+
.wait_for_names_available(names, queue_wait_opts(options, signal)?)
168169
.await
169170
.map_err(napi_anyhow_error)
170171
}

rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,10 +677,11 @@ export class NapiCoreRuntime implements CoreRuntime {
677677
ctx: ActorContextHandle,
678678
names: string[],
679679
options?: RuntimeQueueWaitOptions | undefined | null,
680+
signal?: CancellationTokenHandle | undefined | null,
680681
): Promise<void> {
681682
await asNativeActorContext(ctx)
682683
.queue()
683-
.waitForNamesAvailable(names, options);
684+
.waitForNamesAvailable(names, options, signal);
684685
}
685686

686687
async actorQueueEnqueueAndWait(

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 9 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,65 +1675,24 @@ class NativeQueueAdapter {
16751675
signal?: AbortSignal;
16761676
},
16771677
) {
1678-
if (!options?.signal) {
1678+
const { token, cleanup } = await createCancellationTokenHandle(
1679+
this.#runtime,
1680+
options?.signal,
1681+
);
1682+
1683+
try {
16791684
await callNative(() =>
16801685
this.#runtime.actorQueueWaitForNamesAvailable(
16811686
this.#ctx,
16821687
[...names],
16831688
{
16841689
timeoutMs: options?.timeout,
16851690
},
1691+
token,
16861692
),
16871693
);
1688-
return;
1689-
}
1690-
1691-
const deadline =
1692-
options.timeout === undefined
1693-
? undefined
1694-
: Date.now() + options.timeout;
1695-
1696-
for (;;) {
1697-
if (options.signal.aborted) {
1698-
throw actorAbortedError();
1699-
}
1700-
1701-
const remainingTimeout =
1702-
deadline === undefined
1703-
? undefined
1704-
: Math.max(0, deadline - Date.now());
1705-
const sliceTimeout =
1706-
remainingTimeout === undefined
1707-
? 100
1708-
: Math.min(remainingTimeout, 100);
1709-
1710-
try {
1711-
await callNative(() =>
1712-
this.#runtime.actorQueueWaitForNamesAvailable(
1713-
this.#ctx,
1714-
[...names],
1715-
{
1716-
timeoutMs: sliceTimeout,
1717-
},
1718-
),
1719-
);
1720-
return;
1721-
} catch (error) {
1722-
if (
1723-
(error as { group?: string; code?: string }).group ===
1724-
"queue" &&
1725-
(error as { group?: string; code?: string }).code ===
1726-
"timed_out"
1727-
) {
1728-
if (
1729-
remainingTimeout === undefined ||
1730-
remainingTimeout > 100
1731-
) {
1732-
continue;
1733-
}
1734-
}
1735-
throw error;
1736-
}
1694+
} finally {
1695+
cleanup?.();
17371696
}
17381697
}
17391698

rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,7 @@ export interface CoreRuntime {
491491
ctx: ActorContextHandle,
492492
names: string[],
493493
options?: RuntimeQueueWaitOptions | undefined | null,
494+
signal?: CancellationTokenHandle | undefined | null,
494495
): Promise<void>;
495496
actorQueueEnqueueAndWait(
496497
ctx: ActorContextHandle,

rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,9 +764,10 @@ export class WasmCoreRuntime implements CoreRuntime {
764764
ctx: ActorContextHandle,
765765
names: string[],
766766
options?: RuntimeQueueWaitOptions | undefined | null,
767+
signal?: CancellationTokenHandle | undefined | null,
767768
): Promise<void> {
768769
const queue = childHandle(asWasmActorContext(ctx), "queue");
769-
await callHandleAsync(queue, "waitForNamesAvailable", names, options);
770+
await callHandleAsync(queue, "waitForNamesAvailable", names, options, signal);
770771
}
771772

772773
async actorQueueEnqueueAndWait(

0 commit comments

Comments
 (0)