Skip to content

Commit 5e99ca7

Browse files
committed
fix(rivetkit): wire CancellationToken through waitForNamesAvailable to eliminate KV busy-polling
1 parent 50b18ec commit 5e99ca7

5 files changed

Lines changed: 16 additions & 53 deletions

File tree

rivetkit-typescript/packages/rivetkit-napi/src/queue.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,10 @@ impl Queue {
162162
&self,
163163
names: Vec<String>,
164164
options: Option<JsQueueWaitOptions>,
165+
signal: Option<&CancellationToken>,
165166
) -> napi::Result<()> {
166167
self.inner
167-
.wait_for_names_available(names, queue_wait_opts(options, None)?)
168+
.wait_for_names_available(names, queue_wait_opts(options, signal)?)
168169
.await
169170
.map_err(napi_anyhow_error)
170171
}

rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,10 +645,11 @@ export class NapiCoreRuntime implements CoreRuntime {
645645
ctx: ActorContextHandle,
646646
names: string[],
647647
options?: RuntimeQueueWaitOptions | undefined | null,
648+
signal?: CancellationTokenHandle | undefined | null,
648649
): Promise<void> {
649650
await asNativeActorContext(ctx)
650651
.queue()
651-
.waitForNamesAvailable(names, options);
652+
.waitForNamesAvailable(names, options, signal);
652653
}
653654

654655
async actorQueueEnqueueAndWait(

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 9 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,65 +1649,24 @@ class NativeQueueAdapter {
16491649
signal?: AbortSignal;
16501650
},
16511651
) {
1652-
if (!options?.signal) {
1652+
const { token, cleanup } = await createCancellationTokenHandle(
1653+
this.#runtime,
1654+
options?.signal,
1655+
);
1656+
1657+
try {
16531658
await callNative(() =>
16541659
this.#runtime.actorQueueWaitForNamesAvailable(
16551660
this.#ctx,
16561661
[...names],
16571662
{
16581663
timeoutMs: options?.timeout,
16591664
},
1665+
token,
16601666
),
16611667
);
1662-
return;
1663-
}
1664-
1665-
const deadline =
1666-
options.timeout === undefined
1667-
? undefined
1668-
: Date.now() + options.timeout;
1669-
1670-
for (;;) {
1671-
if (options.signal.aborted) {
1672-
throw actorAbortedError();
1673-
}
1674-
1675-
const remainingTimeout =
1676-
deadline === undefined
1677-
? undefined
1678-
: Math.max(0, deadline - Date.now());
1679-
const sliceTimeout =
1680-
remainingTimeout === undefined
1681-
? 100
1682-
: Math.min(remainingTimeout, 100);
1683-
1684-
try {
1685-
await callNative(() =>
1686-
this.#runtime.actorQueueWaitForNamesAvailable(
1687-
this.#ctx,
1688-
[...names],
1689-
{
1690-
timeoutMs: sliceTimeout,
1691-
},
1692-
),
1693-
);
1694-
return;
1695-
} catch (error) {
1696-
if (
1697-
(error as { group?: string; code?: string }).group ===
1698-
"queue" &&
1699-
(error as { group?: string; code?: string }).code ===
1700-
"timed_out"
1701-
) {
1702-
if (
1703-
remainingTimeout === undefined ||
1704-
remainingTimeout > 100
1705-
) {
1706-
continue;
1707-
}
1708-
}
1709-
throw error;
1710-
}
1668+
} finally {
1669+
cleanup?.();
17111670
}
17121671
}
17131672

rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ export interface CoreRuntime {
481481
ctx: ActorContextHandle,
482482
names: string[],
483483
options?: RuntimeQueueWaitOptions | undefined | null,
484+
signal?: CancellationTokenHandle | undefined | null,
484485
): Promise<void>;
485486
actorQueueEnqueueAndWait(
486487
ctx: ActorContextHandle,

rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -753,9 +753,10 @@ export class WasmCoreRuntime implements CoreRuntime {
753753
ctx: ActorContextHandle,
754754
names: string[],
755755
options?: RuntimeQueueWaitOptions | undefined | null,
756+
signal?: CancellationTokenHandle | undefined | null,
756757
): Promise<void> {
757758
const queue = childHandle(asWasmActorContext(ctx), "queue");
758-
await callHandleAsync(queue, "waitForNamesAvailable", names, options);
759+
await callHandleAsync(queue, "waitForNamesAvailable", names, options, signal);
759760
}
760761

761762
async actorQueueEnqueueAndWait(

0 commit comments

Comments
 (0)