Skip to content

Commit f83acdc

Browse files
committed
fix(rivetkit): wire CancellationToken through waitForNamesAvailable to eliminate KV busy-polling
1 parent 2c864a8 commit f83acdc

5 files changed

Lines changed: 16 additions & 53 deletions

File tree

rivetkit-typescript/packages/rivetkit-napi/src/queue.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,10 @@ impl Queue {
162162
&self,
163163
names: Vec<String>,
164164
options: Option<JsQueueWaitOptions>,
165+
signal: Option<&CancellationToken>,
165166
) -> napi::Result<()> {
166167
self.inner
167-
.wait_for_names_available(names, queue_wait_opts(options, None)?)
168+
.wait_for_names_available(names, queue_wait_opts(options, signal)?)
168169
.await
169170
.map_err(napi_anyhow_error)
170171
}

rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,10 +677,11 @@ export class NapiCoreRuntime implements CoreRuntime {
677677
ctx: ActorContextHandle,
678678
names: string[],
679679
options?: RuntimeQueueWaitOptions | undefined | null,
680+
signal?: CancellationTokenHandle | undefined | null,
680681
): Promise<void> {
681682
await asNativeActorContext(ctx)
682683
.queue()
683-
.waitForNamesAvailable(names, options);
684+
.waitForNamesAvailable(names, options, signal);
684685
}
685686

686687
async actorQueueEnqueueAndWait(

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 9 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,65 +1657,24 @@ class NativeQueueAdapter {
16571657
signal?: AbortSignal;
16581658
},
16591659
) {
1660-
if (!options?.signal) {
1660+
const { token, cleanup } = await createCancellationTokenHandle(
1661+
this.#runtime,
1662+
options?.signal,
1663+
);
1664+
1665+
try {
16611666
await callNative(() =>
16621667
this.#runtime.actorQueueWaitForNamesAvailable(
16631668
this.#ctx,
16641669
[...names],
16651670
{
16661671
timeoutMs: options?.timeout,
16671672
},
1673+
token,
16681674
),
16691675
);
1670-
return;
1671-
}
1672-
1673-
const deadline =
1674-
options.timeout === undefined
1675-
? undefined
1676-
: Date.now() + options.timeout;
1677-
1678-
for (;;) {
1679-
if (options.signal.aborted) {
1680-
throw actorAbortedError();
1681-
}
1682-
1683-
const remainingTimeout =
1684-
deadline === undefined
1685-
? undefined
1686-
: Math.max(0, deadline - Date.now());
1687-
const sliceTimeout =
1688-
remainingTimeout === undefined
1689-
? 100
1690-
: Math.min(remainingTimeout, 100);
1691-
1692-
try {
1693-
await callNative(() =>
1694-
this.#runtime.actorQueueWaitForNamesAvailable(
1695-
this.#ctx,
1696-
[...names],
1697-
{
1698-
timeoutMs: sliceTimeout,
1699-
},
1700-
),
1701-
);
1702-
return;
1703-
} catch (error) {
1704-
if (
1705-
(error as { group?: string; code?: string }).group ===
1706-
"queue" &&
1707-
(error as { group?: string; code?: string }).code ===
1708-
"timed_out"
1709-
) {
1710-
if (
1711-
remainingTimeout === undefined ||
1712-
remainingTimeout > 100
1713-
) {
1714-
continue;
1715-
}
1716-
}
1717-
throw error;
1718-
}
1676+
} finally {
1677+
cleanup?.();
17191678
}
17201679
}
17211680

rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ export interface CoreRuntime {
488488
ctx: ActorContextHandle,
489489
names: string[],
490490
options?: RuntimeQueueWaitOptions | undefined | null,
491+
signal?: CancellationTokenHandle | undefined | null,
491492
): Promise<void>;
492493
actorQueueEnqueueAndWait(
493494
ctx: ActorContextHandle,

rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,9 +764,10 @@ export class WasmCoreRuntime implements CoreRuntime {
764764
ctx: ActorContextHandle,
765765
names: string[],
766766
options?: RuntimeQueueWaitOptions | undefined | null,
767+
signal?: CancellationTokenHandle | undefined | null,
767768
): Promise<void> {
768769
const queue = childHandle(asWasmActorContext(ctx), "queue");
769-
await callHandleAsync(queue, "waitForNamesAvailable", names, options);
770+
await callHandleAsync(queue, "waitForNamesAvailable", names, options, signal);
770771
}
771772

772773
async actorQueueEnqueueAndWait(

0 commit comments

Comments
 (0)