Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions examples/playground/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { DurableObject } from "cloudflare:workers"
import { Actor, handler, Entrypoint, ActorState } from '../../../packages/core/src'
import { Storage } from '../../../packages/storage/src'
import { Alarms } from "../../../packages/alarms/src";
import { Queue } from "../../../packages/queue/src";

/**
* ------------
Expand Down Expand Up @@ -171,6 +172,32 @@ export class MyAlarmActor extends Actor<Env> {
// export default handler(MyAlarmActor);


// ---------------------------
// Example Actor with a queue
// ---------------------------
export class MyQueueActor extends Actor<Env> {
async fetch(request: Request): Promise<Response> {
this.queue.enqueue('operationA', []);
this.queue.enqueue('operationB', []);
return new Response('Operations queued')
}

public async operationA(payload: any): Promise<number> {
// Wait for 5 seconds before returning
await new Promise(resolve => setTimeout(resolve, 5000));
console.log('Operation A completing')
return 1;
}

public async operationB(payload: any): Promise<number> {
await new Promise(resolve => setTimeout(resolve, 10000));
console.log('Operation B completing')
return 2;
}
}
// export default handler(MyQueueActor);


// -----------------------------------------------------------
// Example Durable Object using the Storage & Alarms classes
// -----------------------------------------------------------
Expand All @@ -179,11 +206,13 @@ export class MyAlarmActor extends Actor<Env> {
export class MyDurableObject extends DurableObject<Env> {
storage: Storage;
alarms: Alarms<this>;
queue: Queue<this>;

constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env)
this.storage = new Storage(ctx.storage);
this.alarms = new Alarms(ctx, this);
this.queue = new Queue(ctx, this);
}

async fetch(request: Request): Promise<Response> {
Expand Down
7 changes: 6 additions & 1 deletion examples/playground/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"MyRPCActor",
"MyStorageActor",
"MyAlarmActor",
"MyDurableObject"
"MyDurableObject",
"MyQueueActor"
],
"tag": "v1"
}
Expand All @@ -32,6 +33,10 @@
{
"class_name": "MyDurableObject",
"name": "MyDurableObject"
},
{
"class_name": "MyQueueActor",
"name": "MyQueueActor"
}
]
},
Expand Down
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
"exports": {
".": "./dist/core/src/index.js",
"./storage": "./dist/storage/src/index.js",
"./alarms": "./dist/alarms/src/index.js"
"./alarms": "./dist/alarms/src/index.js",
"./queue": "./dist/queue/src/index.js"
},
"types": "./dist/core/src/index.d.ts",
"typesVersions": {
"*": {
".": ["./dist/core/src/index.d.ts"],
"storage": ["./dist/storage/src/index.d.ts"],
"alarms": ["./dist/alarms/src/index.d.ts"]
"alarms": ["./dist/alarms/src/index.d.ts"],
"queue": ["./dist/queue/src/index.d.ts"]
}
},
"files": [
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { env, DurableObject, WorkerEntrypoint } from "cloudflare:workers";
import { Storage } from "../../storage/src/index";
import { Alarms } from "../../alarms/src/index";
import { Queue } from "../../queue/src/index";

/**
* Alias type for DurableObjectState to match the adopted Actor nomenclature.
Expand Down Expand Up @@ -38,6 +39,7 @@ export abstract class Actor<E> extends DurableObject<E> {
public identifier?: string;
public storage: Storage;
public alarms: Alarms<this>;
public queue: Queue<this>;

public __studio(_: any) {
return this.storage.__studio(_);
Expand Down Expand Up @@ -86,11 +88,13 @@ export abstract class Actor<E> extends DurableObject<E> {
super(ctx, env);
this.storage = new Storage(ctx.storage);
this.alarms = new Alarms(ctx, this);
this.queue = new Queue(ctx, this);
} else {
// @ts-ignore - This is handled internally by the framework
super();
this.storage = new Storage(undefined);
this.alarms = new Alarms(undefined, this);
this.queue = new Queue(undefined, this);
}

// Set a default identifier if none exists
Expand Down
2 changes: 1 addition & 1 deletion packages/core/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
"composite": true
},
"include": ["**/*.ts"],
"references": [{ "path": "../storage" }, { "path": "../alarms" }]
"references": [{ "path": "../storage" }, { "path": "../alarms" }, { "path": "../queue" }]
}
19 changes: 19 additions & 0 deletions packages/queue/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "actor-queue",
"private": true,
"version": "0.0.0",
"main": "../../dist/queue/src/index.js",
"types": "../../dist/queue/src/index.d.ts",
"scripts": {
"build": "tsc -p ."
},
"peerDependencies": {
"nanoid": "^5.1.5"
},
"devDependencies": {
"@cloudflare/vite-plugin": "1.2.2",
"typescript": "^5.5.2",
"vite": "^6.3.5",
"wrangler": "^4.16.0"
}
}
181 changes: 181 additions & 0 deletions packages/queue/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import { nanoid } from "nanoid";
import { DurableObject } from "cloudflare:workers";
import type { DurableObjectStorage } from "@cloudflare/workers-types";


export type QueueItem<T = string> = {
id: string;
payload: T;
callback: keyof DurableObject<unknown>;
created_at: number;
};

export class Queue<P extends DurableObject<any>> {
private parent: P;
public storage: DurableObjectStorage | undefined;
private isProcessing: boolean = false;

constructor(ctx: DurableObjectState | undefined, parent: P) {
this.storage = ctx?.storage;
this.parent = parent;
}

/**
* Queue a task to be executed in the future
* @param payload Payload to pass to the callback
* @param callback Name of the method to call
* @returns The ID of the queued task
*/
async enqueue<T = unknown>(callback: keyof P, payload: T): Promise<string> {
const id = nanoid(9);
if (typeof callback !== "string") {
throw new Error("Callback must be a string");
}

if (typeof this.parent[callback] !== "function") {
throw new Error(`this.${callback} is not a function`);
}

this.sql`
CREATE TABLE IF NOT EXISTS _actor_queues (id TEXT PRIMARY KEY, payload TEXT, callback TEXT, created_at INTEGER)
`;

this.sql`
INSERT OR REPLACE INTO _actor_queues (id, payload, callback, created_at)
VALUES (${id}, ${JSON.stringify(payload)}, ${callback}, ${Date.now()})
`;

// Only start a new flush if one isn't already running
if (!this.isProcessing) {
void this._flushQueue().catch((e) => {
console.error("Error flushing queue:", e);
});
}

return id;
}

private async _flushQueue() {
// If already processing, don't start another processing cycle
if (this.isProcessing) {
return;
}

this.isProcessing = true;

try {
while (true) {
const result = this.sql<QueueItem<string>>`
SELECT * FROM _actor_queues
ORDER BY created_at ASC
LIMIT 1
`;

if (!result || result.length === 0) {
break;
}

for (const row of result) {
const callback = this.parent[row.callback as keyof P];
if (!callback) {
console.error(`callback ${row.callback} not found`);
// Remove invalid callbacks from the queue
await this.dequeue(row.id);
continue;
}

// TODO: Add retries and backoff
try {
await (
callback as (
payload: unknown,
queueItem: QueueItem<string>
) => Promise<void>
).bind(this.parent)(JSON.parse(row.payload as string), row);

// Dequeue the task after successful execution
await this.dequeue(row.id);
} catch (e) {
console.error(`error executing callback "${row.callback}"`, e);
// Optionally: You could implement retry logic here instead of removing failed tasks
// For now, we'll remove failed tasks to prevent infinite retry loops
await this.dequeue(row.id);
}
}
}
} catch (error) {
console.error("Error in queue processing:", error);
} finally {
// Reset the processing flag when done
this.isProcessing = false;
}
}

/**
* Dequeue a task by ID
* @param id ID of the task to dequeue
*/
async dequeue(id: string) {
this.sql`DELETE FROM _actor_queues WHERE id = ${id}`;
}

/**
* Dequeue all tasks
*/
async dequeueAll() {
this.sql`DELETE FROM _actor_queues`;
}

/**
* Dequeue all tasks by callback
* @param callback Name of the callback to dequeue
*/
async dequeueAllByCallback(callback: string) {
this.sql`DELETE FROM _actor_queues WHERE callback = ${callback}`;
}

/**
* Get a queued task by ID
* @param id ID of the task to get
* @returns The task or undefined if not found
*/
async getQueue(id: string): Promise<QueueItem<string> | undefined> {
const result = this.sql<QueueItem<string>>`
SELECT * FROM _actor_queues WHERE id = ${id}
`;
return result
? { ...result[0], payload: JSON.parse(result[0].payload) }
: undefined;
}

/**
* Execute SQL queries against the Agent's database
* @template T Type of the returned rows
* @param strings SQL query template strings
* @param values Values to be inserted into the query
* @returns Array of query results
*/
sql<T = Record<string, string | number | boolean | null>>(
strings: TemplateStringsArray,
...values: (string | number | boolean | null)[]
) {
let query = "";
try {
// Construct the SQL query with placeholders
query = strings.reduce(
(acc, str, i) => acc + str + (i < values.length ? "?" : ""),
""
);

if (!this.storage) {
throw new Error("Storage not initialized");
}

// Execute the SQL query with the provided values
return [...this.storage.sql.exec(query, ...values)] as T[];
} catch (e) {
console.error(`failed to execute sql query: ${query}`, e);
throw e;
}
}
}
10 changes: 10 additions & 0 deletions packages/queue/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "../../dist/queue",
"rootDir": ".",
"composite": true
},
"include": ["**/*.ts"],
"references": [{ "path": "../core" }]
}