Skip to content

Commit ec042e7

Browse files
authored
Backport workflow suspension failure fixes (#6196)
1 parent 5f468d8 commit ec042e7

4 files changed

Lines changed: 143 additions & 18 deletions

File tree

.changeset/short-cameras-watch.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/workflow": patch
3+
---
4+
5+
Fix workflow suspension handling so mixed failures preserve non-interrupt causes and DurableDeferred.into isolates inner suspension state.

packages/workflow/src/DurableDeferred.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
import type { NonEmptyReadonlyArray } from "effect/Array"
55
import type * as Brand from "effect/Brand"
6-
import type * as Cause from "effect/Cause"
6+
import * as Cause from "effect/Cause"
77
import * as Context from "effect/Context"
88
import * as Effect from "effect/Effect"
99
import * as Encoding from "effect/Encoding"
@@ -159,12 +159,23 @@ export const into: {
159159
> =>
160160
Effect.contextWithEffect((context: Context.Context<WorkflowEngine | WorkflowInstance>) => {
161161
const engine = Context.get(context, EngineTag)
162-
const instance = Context.get(context, InstanceTag)
163-
return Effect.onExit(effect, (exit) => {
164-
if (instance.suspended) return Effect.void
162+
const parentInstance = Context.get(context, InstanceTag)
163+
const instance = { ...parentInstance }
164+
return Effect.onExit(Effect.provideService(effect, InstanceTag, instance), (exit) => {
165+
if (Exit.isFailure(exit) && Cause.isInterrupted(exit.cause)) {
166+
const isInterruptedOnly = Cause.isInterruptedOnly(exit.cause)
167+
if (isInterruptedOnly && instance.suspended) {
168+
parentInstance.suspended = true
169+
return Effect.void
170+
} else if (!isInterruptedOnly) {
171+
exit = Exit.failCause(
172+
Cause.filter(exit.cause, (cause) => !Cause.isInterruptType(cause))
173+
)
174+
}
175+
}
165176
return engine.deferredDone(self, {
166-
workflowName: instance.workflow.name,
167-
executionId: instance.executionId,
177+
workflowName: parentInstance.workflow.name,
178+
executionId: parentInstance.executionId,
168179
deferredName: self.name,
169180
exit
170181
})

packages/workflow/src/Workflow.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -532,12 +532,15 @@ export const intoResult = <A, E, R>(
532532
Effect.scoped,
533533
Effect.matchCauseEffect({
534534
onSuccess: (value) => Effect.succeed(new Complete({ exit: Exit.succeed(value) })),
535-
onFailure: (cause): Effect.Effect<Result<A, E>> =>
536-
instance.suspended
535+
onFailure(cause): Effect.Effect<Result<A, E>> {
536+
const isInterruptedOnly = Cause.isInterruptedOnly(cause)
537+
const filtered = isInterruptedOnly ? cause : withoutInterrupts(cause)
538+
return instance.suspended && isInterruptedOnly
537539
? Effect.succeed(new Suspended({ cause: instance.cause }))
538-
: (!instance.interrupted && Cause.isInterruptedOnly(cause)) || (!captureDefects && Cause.isDie(cause))
539-
? Effect.failCause(cause as Cause.Cause<never>)
540-
: Effect.succeed(new Complete({ exit: Exit.failCause(cause) }))
540+
: (!instance.interrupted && isInterruptedOnly) || (!captureDefects && Cause.isDie(cause))
541+
? Effect.failCause(filtered as Cause.Cause<never>)
542+
: Effect.succeed(new Complete({ exit: Exit.failCause(filtered) }))
543+
}
541544
}),
542545
Effect.onExit((exit) => {
543546
if (Exit.isFailure(exit)) {
@@ -551,6 +554,11 @@ export const intoResult = <A, E, R>(
551554
)
552555
})
553556

557+
const withoutInterrupts = <E>(cause: Cause.Cause<E>): Cause.Cause<E> =>
558+
Cause.isInterrupted(cause)
559+
? Cause.filter(cause, (cause) => !Cause.isInterruptType(cause))
560+
: cause
561+
554562
/**
555563
* @since 1.0.0
556564
* @category Result
@@ -562,11 +570,6 @@ export const wrapActivityResult = <A, E, R>(
562570
Effect.contextWithEffect((context: Context.Context<WorkflowInstance>) => {
563571
const instance = Context.get(context, InstanceTag)
564572
const state = instance.activityState
565-
if (instance.suspended) {
566-
return waitForZero(instance).pipe(
567-
Effect.andThen(suspend(instance))
568-
)
569-
}
570573
if (state.count === 0) state.latch.unsafeClose()
571574
state.count++
572575
return Effect.onExit(effect, (exit) => {

packages/workflow/test/WorkflowEngine.test.ts

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
import { describe, expect, it } from "@effect/vitest"
2-
import { DurableClock, Workflow, WorkflowEngine } from "@effect/workflow"
1+
import { assert, describe, expect, it } from "@effect/vitest"
2+
import { DurableClock, DurableDeferred, Workflow, WorkflowEngine } from "@effect/workflow"
3+
import * as Cause from "effect/Cause"
34
import * as Effect from "effect/Effect"
45
import * as Exit from "effect/Exit"
6+
import * as FiberId from "effect/FiberId"
57
import * as Layer from "effect/Layer"
68
import * as Schema from "effect/Schema"
79
import * as TestClock from "effect/TestClock"
@@ -36,8 +38,112 @@ describe("WorkflowEngine", () => {
3638
)
3739
)
3840
))
41+
42+
it.effect("does not squash workflow failures after suspension", () =>
43+
Effect.gen(function*() {
44+
const instance = WorkflowEngine.WorkflowInstance.initial(TestWorkflow, "workflow-failure")
45+
instance.suspended = true
46+
47+
const result = yield* Workflow.intoResult(Effect.fail("boom")).pipe(
48+
Effect.provideService(WorkflowEngine.WorkflowInstance, instance)
49+
)
50+
51+
assert.deepStrictEqual(result, new Workflow.Complete({ exit: Exit.fail("boom") }))
52+
}))
53+
54+
it.effect("removes suspension interrupts from mixed workflow failures", () =>
55+
Effect.gen(function*() {
56+
const instance = WorkflowEngine.WorkflowInstance.initial(TestWorkflow, "mixed-workflow-failure")
57+
const cause = Cause.parallel(Cause.fail("boom"), Cause.interrupt(FiberId.none))
58+
59+
const result = yield* Workflow.intoResult(Effect.failCause(cause)).pipe(
60+
Effect.provideService(WorkflowEngine.WorkflowInstance, instance)
61+
)
62+
63+
assert.deepStrictEqual(result, new Workflow.Complete({ exit: Exit.fail("boom") }))
64+
}))
65+
66+
it.effect("DurableDeferred.into isolates inner suspension from failure recording", () =>
67+
Effect.gen(function*() {
68+
const exits: Array<Exit.Exit<number, string>> = []
69+
const instance = WorkflowEngine.WorkflowInstance.initial(TestWorkflow, "deferred-failure")
70+
const deferred = DurableDeferred.make("deferred-failure", {
71+
success: Schema.Number,
72+
error: Schema.String
73+
})
74+
75+
yield* DurableDeferred.into(
76+
Effect.flatMap(WorkflowEngine.WorkflowInstance, (instance) =>
77+
Effect.zipRight(
78+
Effect.sync(() => {
79+
instance.suspended = true
80+
}),
81+
Effect.fail("boom")
82+
)),
83+
deferred
84+
).pipe(
85+
Effect.exit,
86+
Effect.provideService(WorkflowEngine.WorkflowInstance, instance),
87+
Effect.provideService(WorkflowEngine.WorkflowEngine, makeDeferredEngine(exits))
88+
)
89+
90+
assert.isFalse(instance.suspended)
91+
assert.deepStrictEqual(exits, [Exit.fail("boom")])
92+
}))
93+
94+
it.effect("DurableDeferred.into propagates interrupt-only suspension to the parent", () =>
95+
Effect.gen(function*() {
96+
const exits: Array<Exit.Exit<number, string>> = []
97+
const instance = WorkflowEngine.WorkflowInstance.initial(TestWorkflow, "deferred-suspended")
98+
const deferred = DurableDeferred.make("deferred-suspended", {
99+
success: Schema.Number,
100+
error: Schema.String
101+
})
102+
103+
yield* DurableDeferred.into(
104+
Effect.flatMap(WorkflowEngine.WorkflowInstance, (instance) =>
105+
Effect.zipRight(
106+
Effect.sync(() => {
107+
instance.suspended = true
108+
}),
109+
Effect.interrupt
110+
)),
111+
deferred
112+
).pipe(
113+
Effect.exit,
114+
Effect.provideService(WorkflowEngine.WorkflowInstance, instance),
115+
Effect.provideService(WorkflowEngine.WorkflowEngine, makeDeferredEngine(exits))
116+
)
117+
118+
assert.isTrue(instance.suspended)
119+
assert.deepStrictEqual(exits, [])
120+
}))
39121
})
40122

123+
const TestWorkflow = Workflow.make({
124+
name: "TestWorkflow",
125+
payload: {},
126+
idempotencyKey: () => "test",
127+
success: Schema.Number,
128+
error: Schema.String
129+
})
130+
131+
const makeDeferredEngine = (exits: Array<Exit.Exit<number, string>>): WorkflowEngine.WorkflowEngine["Type"] =>
132+
WorkflowEngine.WorkflowEngine.of({
133+
register: () => Effect.void,
134+
execute: () => Effect.die("not implemented"),
135+
poll: () => Effect.succeed(undefined),
136+
interrupt: () => Effect.void,
137+
resume: () => Effect.void,
138+
activityExecute: () => Effect.die("not implemented"),
139+
deferredResult: () => Effect.succeed(undefined),
140+
deferredDone: (_deferred: DurableDeferred.Any, options: { readonly exit: Exit.Exit<unknown, unknown> }) =>
141+
Effect.sync(() => {
142+
exits.push(options.exit as Exit.Exit<number, string>)
143+
}),
144+
scheduleClock: () => Effect.void
145+
} as any)
146+
41147
const LongWorkflow = Workflow.make({
42148
name: "LongWorkflow",
43149
payload: {

0 commit comments

Comments
 (0)