Skip to content

Commit bc229b1

Browse files
committed
feat(api): enhance cron job execution with workflow data handling and improved trigger management
1 parent cc59f53 commit bc229b1

2 files changed

Lines changed: 147 additions & 55 deletions

File tree

apps/api/src/cron.ts

Lines changed: 83 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,87 @@
11
import CronParser from "cron-parser"; // Or your chosen cron library
2-
import { and, eq, lte } from "drizzle-orm";
32
import { drizzle } from "drizzle-orm/d1";
3+
import { Node, Workflow as WorkflowType } from "@dafthunk/types"; // Using Workflow as WorkflowType from types
4+
import { ExecutionContext } from "@cloudflare/workers-types";
45

5-
import { cronTriggers } from "./db/schema";
6+
import * as schema from "./db/schema";
7+
import { WorkflowRow } from "./db/schema";
8+
import { getDueCronTriggers, updateCronTriggerRunTimes, saveExecution } from "./db/queries";
9+
import { createDatabase, ExecutionStatusType } from "./db"; // Added ExecutionStatusType
10+
import { Bindings } from "./context"; // For the full env type
611

7-
// This is a placeholder. You need to implement how workflows are triggered.
8-
// If executeWorkflow becomes more complex or used elsewhere, it could also be in its own file.
12+
// This function will now handle the actual execution triggering and initial record saving
913
async function executeWorkflow(
10-
workflowId: string,
11-
_env: any,
12-
_ctx: any
14+
workflowRow: WorkflowRow,
15+
db: ReturnType<typeof createDatabase>,
16+
env: Bindings,
17+
_ctx: ExecutionContext
1318
): Promise<void> {
14-
console.log(`Executing workflow ${workflowId}`);
15-
// Example: await fetch(`https://your-service.com/api/execute-workflow/${workflowId}`, { method: 'POST' });
16-
// Or: directly call a function if workflow logic is bundled
17-
// Remember to handle errors appropriately.
18-
await Promise.resolve(); // Replace with actual workflow execution logic
19+
console.log(`Attempting to execute workflow ${workflowRow.id} via cron.`);
20+
21+
// Assuming workflowRow.data is of type WorkflowType from @dafthunk/types
22+
// which includes type, nodes, and edges.
23+
const workflowData: WorkflowType = workflowRow.data;
24+
25+
try {
26+
const executionInstance = await env.EXECUTE.create({
27+
params: {
28+
userId: "cron_trigger",
29+
organizationId: workflowRow.organizationId,
30+
workflow: {
31+
id: workflowRow.id,
32+
name: workflowRow.name,
33+
handle: workflowRow.handle,
34+
type: workflowData.type, // Reinstated: type from workflow data
35+
nodes: workflowData.nodes, // Reinstated: nodes from workflow data
36+
edges: workflowData.edges, // Reinstated: edges from workflow data
37+
},
38+
monitorProgress: false,
39+
deploymentId: undefined,
40+
},
41+
});
42+
43+
const executionId = executionInstance.id;
44+
console.log(`Workflow ${workflowRow.id} started with execution ID: ${executionId}`);
45+
46+
const nodeExecutions = workflowData.nodes.map((node: Node) => ({
47+
nodeId: node.id,
48+
status: "idle" as const,
49+
}));
50+
51+
await saveExecution(db, {
52+
id: executionId,
53+
workflowId: workflowRow.id,
54+
deploymentId: undefined,
55+
userId: "cron_trigger",
56+
organizationId: workflowRow.organizationId,
57+
status: "executing" as ExecutionStatusType,
58+
visibility: "private",
59+
nodeExecutions,
60+
createdAt: new Date(),
61+
updatedAt: new Date(),
62+
startedAt: new Date(),
63+
});
64+
console.log(`Initial execution record saved for ${executionId}`);
65+
66+
} catch (execError) {
67+
console.error(`Error executing workflow ${workflowRow.id} or saving initial record:`, execError);
68+
}
1969
}
2070

2171
export async function handleCronTriggers(
2272
event: ScheduledEvent,
23-
env: { DB: D1Database },
73+
env: Bindings, // Updated to use the full Bindings type
2474
ctx: ExecutionContext
2575
): Promise<void> {
2676
console.log(`Cron event triggered at: ${new Date(event.scheduledTime)}`);
27-
const db = drizzle(env.DB, { schema: { cronTriggers } });
77+
// Ensure db is initialized correctly if createDatabase is the standard way
78+
// drizzle can also be initialized with just env.DB and schema
79+
const db = createDatabase(env.DB);
2880

2981
const now = new Date();
3082

3183
try {
32-
const dueTriggers = await db
33-
.select()
34-
.from(cronTriggers)
35-
.where(
36-
and(eq(cronTriggers.active, true), lte(cronTriggers.nextRunAt, now))
37-
)
38-
.all();
84+
const dueTriggers = await getDueCronTriggers(db, now);
3985

4086
if (dueTriggers.length === 0) {
4187
console.log("No due cron triggers found.");
@@ -44,34 +90,35 @@ export async function handleCronTriggers(
4490

4591
console.log(`Found ${dueTriggers.length} due cron triggers.`);
4692

47-
for (const trigger of dueTriggers) {
48-
console.log(`Processing trigger for workflow: ${trigger.workflowId}`);
93+
for (const item of dueTriggers) {
94+
// item contains { cronTrigger: CronTriggerRow, workflow: WorkflowRow }
95+
console.log(`Processing trigger for workflow: ${item.workflow.id}`);
4996
try {
50-
// 1. Execute the workflow
51-
await executeWorkflow(trigger.workflowId, env, ctx);
97+
// 1. Execute the workflow (asynchronously, no await here if we don't wait for termination)
98+
// However, executeWorkflow itself is async due to EXECUTE.create and saveExecution.
99+
// We should await it to ensure the initial saveExecution completes before updating run times.
100+
await executeWorkflow(item.workflow, db, env, ctx);
52101

53102
// 2. Calculate next run time
54-
const interval = CronParser.parse(trigger.cronExpression, {
103+
const interval = CronParser.parse(item.cronTrigger.cronExpression, {
55104
currentDate: now,
56105
});
57106
const nextRun = interval.next().toDate();
58107

59108
// 3. Update the trigger in the database
60-
await db
61-
.update(cronTriggers)
62-
.set({
63-
lastRun: now,
64-
nextRunAt: nextRun,
65-
})
66-
.where(eq(cronTriggers.workflowId, trigger.workflowId))
67-
.execute();
109+
await updateCronTriggerRunTimes(
110+
db,
111+
item.cronTrigger.workflowId,
112+
nextRun,
113+
now
114+
);
68115

69116
console.log(
70-
`Workflow ${trigger.workflowId} executed. Next run at: ${nextRun.toISOString()}`
117+
`Workflow ${item.workflow.id} processing initiated. Next run at: ${nextRun.toISOString()}`
71118
);
72119
} catch (err) {
73120
console.error(
74-
`Error processing trigger for workflow ${trigger.workflowId}:`,
121+
`Error processing trigger for workflow ${item.workflow.id}:`,
75122
err
76123
);
77124
// Optionally, you might want to implement retry logic or disable the trigger after too many failures.

apps/api/src/db/queries.ts

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
WorkflowExecutionStatus,
77
} from "@dafthunk/types";
88
import * as crypto from "crypto";
9-
import { and, desc, eq, inArray, SQL, sql } from "drizzle-orm";
9+
import { and, desc, eq, inArray, SQL, sql, lte } from "drizzle-orm";
1010
import { alias } from "drizzle-orm/sqlite-core";
1111
import { v7 as uuidv7 } from "uuid";
1212

@@ -1080,12 +1080,12 @@ export async function getExecutionWithVisibility(
10801080
}
10811081

10821082
/**
1083-
* Get a cron trigger for a workflow
1083+
* Get a cron trigger.
10841084
*
10851085
* @param db Database instance
10861086
* @param workflowId Workflow ID
10871087
* @param organizationIdOrHandle Organization ID or handle
1088-
* @returns The cron trigger record or undefined if not found
1088+
* @returns Cron trigger record or undefined.
10891089
*/
10901090
export async function getCronTrigger(
10911091
db: ReturnType<typeof createDatabase>,
@@ -1110,34 +1110,79 @@ export async function getCronTrigger(
11101110
}
11111111

11121112
/**
1113-
* Upsert a cron trigger for a workflow
1113+
* Create or update a cron trigger.
11141114
*
11151115
* @param db Database instance
1116-
* @param values Values to insert or update
1117-
* @returns The upserted cron trigger record
1116+
* @param values Cron trigger data.
1117+
* @returns The created or updated cron trigger record.
11181118
*/
11191119
export async function upsertCronTrigger(
11201120
db: ReturnType<typeof createDatabase>,
1121-
values: CronTriggerInsert & { updatedAt?: Date } // Allow updatedAt for explicit update
1121+
values: CronTriggerInsert
11221122
): Promise<CronTriggerRow> {
1123-
// Separate updatedAt for the 'set' part of onConflictDoUpdate
1124-
const { updatedAt, ...insertValues } = values;
1125-
1126-
const updateValues: Partial<CronTriggerInsert> & { updatedAt: Date } = {
1127-
cronExpression: values.cronExpression,
1128-
active: values.active,
1129-
nextRunAt: values.nextRunAt,
1130-
updatedAt: updatedAt || new Date(), // Use provided updatedAt or current time
1123+
const updateSet = {
1124+
...values,
1125+
updatedAt: new Date(),
11311126
};
1132-
11331127
const [upsertedCron] = await db
11341128
.insert(cronTriggers)
1135-
.values(insertValues) // createdAt will use default from schema
1129+
.values(values)
11361130
.onConflictDoUpdate({
1137-
target: cronTriggers.workflowId, // Primary key for conflict
1138-
set: updateValues,
1131+
target: cronTriggers.workflowId,
1132+
set: updateSet,
11391133
})
11401134
.returning();
11411135

11421136
return upsertedCron;
11431137
}
1138+
1139+
/**
1140+
* Update run times for a cron trigger.
1141+
*
1142+
* @param db Database instance
1143+
* @param workflowId Workflow ID
1144+
* @param nextRunAt New next run time
1145+
* @param lastRun Current execution time
1146+
* @returns Updated cron trigger record or undefined.
1147+
*/
1148+
export async function updateCronTriggerRunTimes(
1149+
db: ReturnType<typeof createDatabase>,
1150+
workflowId: string,
1151+
nextRunAt: Date,
1152+
lastRun: Date
1153+
): Promise<CronTriggerRow | undefined> {
1154+
const [updatedTrigger] = await db
1155+
.update(cronTriggers)
1156+
.set({
1157+
lastRun: lastRun,
1158+
nextRunAt: nextRunAt,
1159+
updatedAt: new Date(),
1160+
})
1161+
.where(eq(cronTriggers.workflowId, workflowId))
1162+
.returning();
1163+
return updatedTrigger;
1164+
}
1165+
1166+
/**
1167+
* Get due cron triggers with workflow data.
1168+
*
1169+
* @param db Database instance
1170+
* @param now Current time
1171+
* @returns Array of due cron triggers with workflow data.
1172+
*/
1173+
export async function getDueCronTriggers(
1174+
db: ReturnType<typeof createDatabase>,
1175+
now: Date
1176+
): Promise<{ cronTrigger: CronTriggerRow; workflow: WorkflowRow }[]> {
1177+
return db
1178+
.select({
1179+
cronTrigger: cronTriggers,
1180+
workflow: workflows
1181+
})
1182+
.from(cronTriggers)
1183+
.innerJoin(workflows, eq(cronTriggers.workflowId, workflows.id))
1184+
.where(
1185+
and(eq(cronTriggers.active, true), lte(cronTriggers.nextRunAt, now))
1186+
)
1187+
.all();
1188+
}

0 commit comments

Comments
 (0)