Skip to content

Commit 97c76b4

Browse files
committed
feat: Optimize credit exhaustion handling and execution state persistence in BaseRuntime
1 parent e55735d commit 97c76b4

1 file changed

Lines changed: 52 additions & 90 deletions

File tree

apps/api/src/runtime/base-runtime.ts

Lines changed: 52 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -191,30 +191,12 @@ export class BaseRuntime extends WorkflowEntrypoint<Bindings, RuntimeParams> {
191191

192192
await this.monitoringService.sendUpdate(workflowSessionId, executionRecord);
193193

194-
// Check for credit exhaustion
195-
const exhaustedRecord = await this.handleCreditExhaustion(
196-
step,
197-
event.payload,
198-
instanceId,
199-
executionState,
200-
workflowSessionId
201-
);
202-
if (exhaustedRecord) {
203-
return exhaustedRecord;
204-
}
205-
206-
// Declare context outside try block for finally access
194+
// Declare context and exhaustion flag outside try block for finally access
207195
let executionContext: WorkflowExecutionContext | undefined;
196+
let isExhausted = false;
208197

209198
try {
210-
// Preload organization resources (secrets + integrations)
211-
await step.do(
212-
"preload organization resources",
213-
BaseRuntime.defaultStepConfig,
214-
async () => this.resourceProvider.initialize(organizationId)
215-
);
216-
217-
// Initialize workflow (validation + ordering)
199+
// Initialize workflow first (validation + ordering) to create context
218200
// @ts-expect-error - TS2589: Type instantiation depth limitation with Cloudflare Workflows step.do
219201
const { context, state } = await step.do(
220202
"initialise workflow",
@@ -230,6 +212,38 @@ export class BaseRuntime extends WorkflowEntrypoint<Bindings, RuntimeParams> {
230212

231213
executionContext = context;
232214
executionState = state;
215+
216+
// Check for credit exhaustion early (before resource loading)
217+
const computeCost = this.getNodesComputeCost(
218+
event.payload.workflow.nodes
219+
);
220+
if (
221+
!(await this.hasEnoughComputeCredits(
222+
organizationId,
223+
event.payload.computeCredits,
224+
computeCost
225+
))
226+
) {
227+
isExhausted = true;
228+
executionRecord.startedAt = new Date();
229+
executionRecord.status = "exhausted" as any;
230+
executionRecord.error = "Insufficient compute credits";
231+
this.logTransition("submitted", "exhausted");
232+
await this.monitoringService.sendUpdate(
233+
workflowSessionId,
234+
executionRecord
235+
);
236+
// Skip to finally block to save
237+
return executionRecord;
238+
}
239+
240+
// Preload organization resources (secrets + integrations)
241+
await step.do(
242+
"preload organization resources",
243+
BaseRuntime.defaultStepConfig,
244+
async () => this.resourceProvider.initialize(organizationId)
245+
);
246+
233247
executionRecord.startedAt = new Date();
234248
executionRecord.status = getExecutionStatus(
235249
executionContext,
@@ -276,6 +290,7 @@ export class BaseRuntime extends WorkflowEntrypoint<Bindings, RuntimeParams> {
276290
} finally {
277291
executionRecord.endedAt = new Date();
278292

293+
// Save to execution store only once, at the very end
279294
if (executionContext) {
280295
executionRecord = await this.persistFinalState(
281296
step,
@@ -284,7 +299,8 @@ export class BaseRuntime extends WorkflowEntrypoint<Bindings, RuntimeParams> {
284299
executionRecord,
285300
userId,
286301
organizationId,
287-
instanceId
302+
instanceId,
303+
isExhausted
288304
);
289305
}
290306

@@ -758,70 +774,9 @@ export class BaseRuntime extends WorkflowEntrypoint<Bindings, RuntimeParams> {
758774
}, 0);
759775
}
760776

761-
/**
762-
* Handles the case where organization has insufficient compute credits.
763-
* Returns execution record if exhausted, null otherwise.
764-
*/
765-
private async handleCreditExhaustion(
766-
step: WorkflowStep,
767-
params: RuntimeParams,
768-
instanceId: string,
769-
executionState: ExecutionState,
770-
workflowSessionId?: string
771-
): Promise<WorkflowExecution | null> {
772-
const { workflow, userId, organizationId, computeCredits } = params;
773-
774-
if (
775-
await this.hasEnoughComputeCredits(
776-
organizationId,
777-
computeCredits,
778-
this.getNodesComputeCost(workflow.nodes)
779-
)
780-
) {
781-
return null; // Sufficient credits, continue execution
782-
}
783-
784-
// Create a minimal context for status computation
785-
const minimalContext: WorkflowExecutionContext = {
786-
workflow,
787-
orderedNodeIds: workflow.nodes.map((n) => n.id),
788-
workflowId: workflow.id,
789-
organizationId,
790-
executionId: instanceId,
791-
};
792-
793-
this.logTransition(
794-
getExecutionStatus(minimalContext, executionState),
795-
"exhausted"
796-
);
797-
798-
const executionRecord = await step.do(
799-
"persist exhausted execution state",
800-
BaseRuntime.defaultStepConfig,
801-
async () =>
802-
this.executionStore.save({
803-
id: instanceId,
804-
workflowId: workflow.id,
805-
userId,
806-
organizationId,
807-
status: "exhausted" as any,
808-
nodeExecutions: this.buildNodeExecutions(
809-
workflow,
810-
minimalContext,
811-
executionState
812-
),
813-
error: this.createErrorReport(executionState),
814-
startedAt: new Date(),
815-
endedAt: new Date(),
816-
})
817-
);
818-
819-
await this.monitoringService.sendUpdate(workflowSessionId, executionRecord);
820-
return executionRecord;
821-
}
822-
823777
/**
824778
* Persists the final execution state with credit updates.
779+
* This is the ONLY place where executionStore.save() should be called.
825780
*/
826781
private async persistFinalState(
827782
step: WorkflowStep,
@@ -830,14 +785,20 @@ export class BaseRuntime extends WorkflowEntrypoint<Bindings, RuntimeParams> {
830785
executionRecord: WorkflowExecution,
831786
userId: string,
832787
organizationId: string,
833-
instanceId: string
788+
instanceId: string,
789+
isExhausted: boolean
834790
): Promise<WorkflowExecution> {
835791
return await step.do(
836792
"persist final execution record",
837793
BaseRuntime.defaultStepConfig,
838794
async () => {
839-
// Update compute credits for executed nodes (skip in development)
840-
if (this.env.CLOUDFLARE_ENV !== "development") {
795+
// Compute final status
796+
const finalStatus = isExhausted
797+
? ("exhausted" as any)
798+
: getExecutionStatus(context, state);
799+
800+
// Update compute credits for executed nodes (skip in development and exhausted cases)
801+
if (!isExhausted && this.env.CLOUDFLARE_ENV !== "development") {
841802
await updateOrganizationComputeUsage(
842803
this.env.KV,
843804
organizationId,
@@ -849,18 +810,19 @@ export class BaseRuntime extends WorkflowEntrypoint<Bindings, RuntimeParams> {
849810
);
850811
}
851812

813+
// Save to execution store - this happens exactly once per execution
852814
return this.executionStore.save({
853815
id: instanceId,
854816
workflowId: context.workflowId,
855817
userId,
856818
organizationId,
857-
status: getExecutionStatus(context, state) as any,
819+
status: finalStatus,
858820
nodeExecutions: this.buildNodeExecutions(
859821
context.workflow,
860822
context,
861823
state
862824
),
863-
error: this.createErrorReport(state),
825+
error: this.createErrorReport(state) ?? executionRecord.error,
864826
startedAt: executionRecord.startedAt,
865827
endedAt: executionRecord.endedAt,
866828
});
@@ -987,7 +949,7 @@ export class BaseRuntime extends WorkflowEntrypoint<Bindings, RuntimeParams> {
987949
return {
988950
nodeId: node.id,
989951
status: "skipped" as const,
990-
outputs: null,
952+
outputs: undefined,
991953
...skipInfo,
992954
};
993955
}

0 commit comments

Comments
 (0)