66 WorkflowExecutionStatus ,
77} from "@dafthunk/types" ;
88import * as crypto from "crypto" ;
9- import { and , desc , eq , inArray , SQL , sql } from "drizzle-orm" ;
9+ import { and , desc , eq , inArray , lte , SQL , sql } from "drizzle-orm" ;
1010import { alias } from "drizzle-orm/sqlite-core" ;
1111import { v7 as uuidv7 } from "uuid" ;
1212
@@ -1169,11 +1169,8 @@ export async function updateCronTriggerRunTimes(
11691169}
11701170
11711171/**
1172- * Get active due cron triggers with workflow, and deployment data.
1173- *
1174- * @param db Database instance
1175- * @param now Current time
1176- * @returns Array of due cron triggers with workflow data.
1172+ * Get active, due cron triggers together with the workflow and (optionally) a
1173+ * deployment that should be executed.
11771174 */
11781175export async function getDueCronTriggers (
11791176 db : ReturnType < typeof createDatabase > ,
@@ -1207,15 +1204,10 @@ export async function getDueCronTriggers(
12071204 selectedDeployment : selectedDeployment ,
12081205 } )
12091206 . from ( cronTriggers )
1210- . where (
1211- and (
1212- eq ( cronTriggers . active , true ) ,
1213- sql `${ cronTriggers . nextRunAt } <= ${ Math . floor ( now . getTime ( ) / 1000 ) } `
1214- )
1215- )
1207+ . where ( and ( eq ( cronTriggers . active , true ) , lte ( cronTriggers . nextRunAt , now ) ) )
12161208 . innerJoin ( workflows , eq ( workflows . id , cronTriggers . workflowId ) )
1217- . innerJoin ( latestByWorkflow , eq ( latestByWorkflow . workflowId , workflows . id ) )
1218- . innerJoin (
1209+ . leftJoin ( latestByWorkflow , eq ( latestByWorkflow . workflowId , workflows . id ) )
1210+ . leftJoin (
12191211 latestDeployment ,
12201212 and (
12211213 eq ( latestDeployment . workflowId , latestByWorkflow . workflowId ) ,
@@ -1231,14 +1223,21 @@ export async function getDueCronTriggers(
12311223 )
12321224 . all ( ) ;
12331225
1234- return rows . map ( ( r ) => ( {
1235- cronTrigger : r . cronTrigger ,
1236- workflow : r . workflow ,
1237- deployment :
1238- r . cronTrigger . versionAlias === "latest"
1239- ? r . latestDeployment
1240- : r . cronTrigger . versionAlias === "version"
1241- ? r . selectedDeployment
1242- : null ,
1243- } ) ) ;
1226+ return rows
1227+ . filter (
1228+ ( r ) =>
1229+ r . cronTrigger . versionAlias === "dev" ||
1230+ ( r . cronTrigger . versionAlias === "latest" && r . latestDeployment ) ||
1231+ ( r . cronTrigger . versionAlias === "version" && r . selectedDeployment )
1232+ )
1233+ . map ( ( r ) => ( {
1234+ cronTrigger : r . cronTrigger ,
1235+ workflow : r . workflow ,
1236+ deployment :
1237+ r . cronTrigger . versionAlias === "latest"
1238+ ? r . latestDeployment
1239+ : r . cronTrigger . versionAlias === "version"
1240+ ? r . selectedDeployment
1241+ : null ,
1242+ } ) ) ;
12441243}
0 commit comments