@@ -6,10 +6,11 @@ import { createDatabase, ExecutionStatusType } from "./db";
66import {
77 getDueCronTriggers ,
88 getOrganizationComputeCredits ,
9- saveExecution ,
109 updateCronTriggerRunTimes ,
1110} from "./db/queries" ;
12- import { ObjectStore } from "./runtime/object-store" ;
11+ import { DeploymentStore } from "./stores/deployment-store" ;
12+ import { ExecutionStore } from "./stores/execution-store" ;
13+ import { WorkflowStore } from "./stores/workflow-store" ;
1314
1415// This function will now handle the actual execution triggering and initial record saving
1516async function executeWorkflow (
@@ -24,7 +25,7 @@ async function executeWorkflow(
2425 db : ReturnType < typeof createDatabase > ,
2526 env : Bindings ,
2627 _ctx : ExecutionContext ,
27- objectStore : ObjectStore
28+ executionStore : ExecutionStore
2829) : Promise < void > {
2930 console . log ( `Attempting to execute workflow ${ workflowInfo . id } via cron.` ) ;
3031
@@ -66,7 +67,7 @@ async function executeWorkflow(
6667 status : "idle" as const ,
6768 } ) ) ;
6869
69- const initialExecution = await saveExecution ( db , {
70+ await executionStore . save ( {
7071 id : executionId ,
7172 workflowId : workflowInfo . id ,
7273 deploymentId : deploymentId ,
@@ -79,13 +80,6 @@ async function executeWorkflow(
7980 startedAt : new Date ( ) ,
8081 } ) ;
8182 console . log ( `Initial execution record saved for ${ executionId } ` ) ;
82-
83- // Save execution data to R2
84- try {
85- await objectStore . writeExecution ( initialExecution ) ;
86- } catch ( error ) {
87- console . error ( `Failed to save execution to R2: ${ executionId } ` , error ) ;
88- }
8983 } catch ( execError ) {
9084 console . error (
9185 `Error executing workflow ${ workflowInfo . id } or saving initial record:` ,
@@ -101,7 +95,9 @@ export async function handleCronTriggers(
10195) : Promise < void > {
10296 console . log ( `Cron event triggered at: ${ new Date ( event . scheduledTime ) } ` ) ;
10397 const db = createDatabase ( env . DB ) ;
104- const objectStore = new ObjectStore ( env . RESSOURCES ) ;
98+ const executionStore = new ExecutionStore ( env . DB , env . RESSOURCES ) ;
99+ const workflowStore = new WorkflowStore ( env . DB , env . RESSOURCES ) ;
100+ const deploymentStore = new DeploymentStore ( env . DB , env . RESSOURCES ) ;
105101 const now = new Date ( ) ;
106102
107103 try {
@@ -129,7 +125,17 @@ export async function handleCronTriggers(
129125 if ( versionAlias === "dev" ) {
130126 // Load workflow data from R2
131127 try {
132- workflowToExecute = await objectStore . readWorkflow ( workflow . id ) ;
128+ const workflowWithData = await workflowStore . getWithData (
129+ workflow . id ,
130+ workflow . organizationId
131+ ) ;
132+ if ( ! workflowWithData ) {
133+ console . error (
134+ `Failed to load workflow data for ${ workflow . id } : not found`
135+ ) ;
136+ continue ;
137+ }
138+ workflowToExecute = workflowWithData . data ;
133139 } catch ( error ) {
134140 console . error (
135141 `Failed to load workflow data from R2 for ${ workflow . id } :` ,
@@ -140,7 +146,7 @@ export async function handleCronTriggers(
140146 } else if ( deployment ) {
141147 // Load deployment workflow snapshot from R2
142148 try {
143- workflowToExecute = await objectStore . readDeploymentWorkflow (
149+ workflowToExecute = await deploymentStore . readWorkflowSnapshot (
144150 deployment . id
145151 ) ;
146152 } catch ( error ) {
@@ -173,7 +179,7 @@ export async function handleCronTriggers(
173179 db ,
174180 env ,
175181 ctx ,
176- objectStore
182+ executionStore
177183 ) ;
178184 } else {
179185 // This case should ideally not be reached due to the checks above,
0 commit comments