Skip to content

Commit cc59f53

Browse files
committed
feat(api): implement cron trigger management for workflows, including upsert and retrieval functionality
1 parent f1680ff commit cc59f53

22 files changed

Lines changed: 1998 additions & 91 deletions

apps/api/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"@hono/zod-validator": "^0.5.0",
4747
"@sendgrid/mail": "^8.1.5",
4848
"cloudflare": "^4.2.0",
49+
"cron-parser": "^5.2.0",
4950
"drizzle-orm": "0.43.1",
5051
"exifreader": "^4.31.0",
5152
"hono": "^4.7.8",

apps/api/src/cron.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import CronParser from "cron-parser"; // Or your chosen cron library
2+
import { and, eq, lte } from "drizzle-orm";
3+
import { drizzle } from "drizzle-orm/d1";
4+
5+
import { cronTriggers } from "./db/schema";
6+
7+
// This is a placeholder. You need to implement how workflows are triggered.
8+
// If executeWorkflow becomes more complex or used elsewhere, it could also be in its own file.
9+
async function executeWorkflow(
10+
workflowId: string,
11+
_env: any,
12+
_ctx: any
13+
): Promise<void> {
14+
console.log(`Executing workflow ${workflowId}`);
15+
// Example: await fetch(`https://your-service.com/api/execute-workflow/${workflowId}`, { method: 'POST' });
16+
// Or: directly call a function if workflow logic is bundled
17+
// Remember to handle errors appropriately.
18+
await Promise.resolve(); // Replace with actual workflow execution logic
19+
}
20+
21+
export async function handleCronTriggers(
22+
event: ScheduledEvent,
23+
env: { DB: D1Database },
24+
ctx: ExecutionContext
25+
): Promise<void> {
26+
console.log(`Cron event triggered at: ${new Date(event.scheduledTime)}`);
27+
const db = drizzle(env.DB, { schema: { cronTriggers } });
28+
29+
const now = new Date();
30+
31+
try {
32+
const dueTriggers = await db
33+
.select()
34+
.from(cronTriggers)
35+
.where(
36+
and(eq(cronTriggers.active, true), lte(cronTriggers.nextRunAt, now))
37+
)
38+
.all();
39+
40+
if (dueTriggers.length === 0) {
41+
console.log("No due cron triggers found.");
42+
return;
43+
}
44+
45+
console.log(`Found ${dueTriggers.length} due cron triggers.`);
46+
47+
for (const trigger of dueTriggers) {
48+
console.log(`Processing trigger for workflow: ${trigger.workflowId}`);
49+
try {
50+
// 1. Execute the workflow
51+
await executeWorkflow(trigger.workflowId, env, ctx);
52+
53+
// 2. Calculate next run time
54+
const interval = CronParser.parse(trigger.cronExpression, {
55+
currentDate: now,
56+
});
57+
const nextRun = interval.next().toDate();
58+
59+
// 3. Update the trigger in the database
60+
await db
61+
.update(cronTriggers)
62+
.set({
63+
lastRun: now,
64+
nextRunAt: nextRun,
65+
})
66+
.where(eq(cronTriggers.workflowId, trigger.workflowId))
67+
.execute();
68+
69+
console.log(
70+
`Workflow ${trigger.workflowId} executed. Next run at: ${nextRun.toISOString()}`
71+
);
72+
} catch (err) {
73+
console.error(
74+
`Error processing trigger for workflow ${trigger.workflowId}:`,
75+
err
76+
);
77+
// Optionally, you might want to implement retry logic or disable the trigger after too many failures.
78+
}
79+
}
80+
} catch (dbError) {
81+
console.error("Database error during scheduled task:", dbError);
82+
}
83+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
CREATE TABLE `cron_triggers` (
2+
`workflow_id` text PRIMARY KEY NOT NULL,
3+
`cron_expression` text NOT NULL,
4+
`active` integer DEFAULT true NOT NULL,
5+
`last_run` integer,
6+
`next_run_at` integer,
7+
`created_at` integer DEFAULT CURRENT_TIMESTAMP NOT NULL,
8+
`updated_at` integer DEFAULT CURRENT_TIMESTAMP NOT NULL,
9+
FOREIGN KEY (`workflow_id`) REFERENCES `workflows`(`id`) ON UPDATE no action ON DELETE cascade
10+
);
11+
--> statement-breakpoint
12+
CREATE INDEX `cron_triggers_workflow_id_idx` ON `cron_triggers` (`workflow_id`);--> statement-breakpoint
13+
CREATE INDEX `cron_triggers_active_idx` ON `cron_triggers` (`active`);--> statement-breakpoint
14+
CREATE INDEX `cron_triggers_last_run_idx` ON `cron_triggers` (`last_run`);--> statement-breakpoint
15+
CREATE INDEX `cron_triggers_next_run_at_idx` ON `cron_triggers` (`next_run_at`);--> statement-breakpoint
16+
CREATE INDEX `cron_triggers_active_next_run_at_idx` ON `cron_triggers` (`active`,`next_run_at`);--> statement-breakpoint
17+
CREATE INDEX `cron_triggers_created_at_idx` ON `cron_triggers` (`created_at`);--> statement-breakpoint
18+
CREATE INDEX `cron_triggers_updated_at_idx` ON `cron_triggers` (`updated_at`);

0 commit comments

Comments
 (0)