Skip to content

Commit 7aab36b

Browse files
committed
fix false-positive unconsumed event in hook loop replay (#1778)
Signed-off-by: Nathan Rajlich <n@n8.io>
1 parent 5432160 commit 7aab36b

3 files changed

Lines changed: 219 additions & 17 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@workflow/core': patch
3+
---
4+
5+
Fix false-positive "unconsumed event" errors for `step_created` during replay, which occurred when a resumed `for await` hook loop appended more async work after the first promise-queue drain.

packages/core/src/events-consumer.ts

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -114,23 +114,32 @@ export class EventsConsumer {
114114
// is still unconsumed after the queue drains, it's truly orphaned.
115115
if (currentEvent !== null) {
116116
const checkVersion = ++this.unconsumedCheckVersion;
117-
this.pendingUnconsumedCheck = this.getPromiseQueue().then(() => {
118-
// Use a delayed setTimeout after the queue drains. The delay must be
119-
// long enough for promise chains to propagate across the VM boundary
120-
// (from resolve() in the host context through to the workflow code
121-
// calling subscribe() in the VM context). Node.js does not guarantee
122-
// that setTimeout(0) fires after all cross-context microtasks settle,
123-
// so we use a small but non-zero delay. Any subscribe() call that
124-
// arrives during this window will cancel the check via version
125-
// invalidation + clearTimeout.
126-
this.pendingUnconsumedTimeout = setTimeout(() => {
127-
this.pendingUnconsumedTimeout = null;
128-
if (this.unconsumedCheckVersion === checkVersion) {
129-
this.pendingUnconsumedCheck = null;
130-
this.onUnconsumedEvent(currentEvent);
131-
}
132-
}, 100);
133-
});
117+
this.pendingUnconsumedCheck = this.getPromiseQueue()
118+
.then(
119+
// Yield once after the first queue drain so promise chains resumed by
120+
// that drain can run across the VM boundary and append any follow-up
121+
// async work (for example: step_completed resolves -> for-await loop
122+
// resumes -> the next hook payload starts hydrating).
123+
() => new Promise<void>((resolve) => setTimeout(resolve, 0))
124+
)
125+
.then(() => this.getPromiseQueue())
126+
.then(() => {
127+
// Use a delayed setTimeout after the second queue drain. The delay must be
128+
// long enough for promise chains to propagate across the VM boundary
129+
// (from resolve() in the host context through to the workflow code
130+
// calling subscribe() in the VM context). Node.js does not guarantee
131+
// that setTimeout(0) fires after all cross-context microtasks settle,
132+
// so we use a small but non-zero delay. Any subscribe() call that
133+
// arrives during this window will cancel the check via version
134+
// invalidation + clearTimeout.
135+
this.pendingUnconsumedTimeout = setTimeout(() => {
136+
this.pendingUnconsumedTimeout = null;
137+
if (this.unconsumedCheckVersion === checkVersion) {
138+
this.pendingUnconsumedCheck = null;
139+
this.onUnconsumedEvent(currentEvent);
140+
}
141+
}, 100);
142+
});
134143
}
135144
};
136145
}

packages/core/src/workflow.test.ts

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4145,6 +4145,194 @@ describe('runWorkflow', () => {
41454145
}
41464146
});
41474147

4148+
it('should not orphan the second step_created in a for-await hook loop when the next payload hydration is delayed', async () => {
4149+
const ops: Promise<any>[] = [];
4150+
const workflowRunId = 'wrun_123';
4151+
const workflowRun: WorkflowRun = {
4152+
runId: workflowRunId,
4153+
workflowName: 'workflow',
4154+
status: 'running',
4155+
input: await dehydrateWorkflowArguments(
4156+
[],
4157+
workflowRunId,
4158+
noEncryptionKey,
4159+
ops
4160+
),
4161+
createdAt: new Date('2024-01-01T00:00:00.000Z'),
4162+
updatedAt: new Date('2024-01-01T00:00:00.000Z'),
4163+
startedAt: new Date('2024-01-01T00:00:00.000Z'),
4164+
deploymentId: 'test-deployment',
4165+
};
4166+
4167+
const payload1 = await dehydrateStepReturnValue(
4168+
{ type: 'subscribe', id: 1 },
4169+
workflowRunId,
4170+
noEncryptionKey,
4171+
ops
4172+
);
4173+
const payload2 = await dehydrateStepReturnValue(
4174+
{ type: 'done', done: true },
4175+
workflowRunId,
4176+
noEncryptionKey,
4177+
ops
4178+
);
4179+
const stepResult1 = await dehydrateStepReturnValue(
4180+
{ processed: true, type: 'subscribe', id: 1 },
4181+
workflowRunId,
4182+
noEncryptionKey,
4183+
ops
4184+
);
4185+
const stepResult2 = await dehydrateStepReturnValue(
4186+
{ processed: true, type: 'done' },
4187+
workflowRunId,
4188+
noEncryptionKey,
4189+
ops
4190+
);
4191+
4192+
const events: Event[] = [
4193+
{
4194+
eventId: 'evnt-run-created',
4195+
runId: workflowRunId,
4196+
eventType: 'run_created',
4197+
createdAt: new Date('2024-01-01T00:00:00.000Z'),
4198+
},
4199+
{
4200+
eventId: 'evnt-run-started',
4201+
runId: workflowRunId,
4202+
eventType: 'run_started',
4203+
createdAt: new Date('2024-01-01T00:00:00.100Z'),
4204+
},
4205+
{
4206+
eventId: 'evnt-hook-created',
4207+
runId: workflowRunId,
4208+
eventType: 'hook_created',
4209+
correlationId: 'hook_01HK153X00SP082GGA0AAJC6PJ',
4210+
eventData: { token: 'test-token' },
4211+
createdAt: new Date('2024-01-01T00:00:00.200Z'),
4212+
},
4213+
{
4214+
eventId: 'evnt-wait-created',
4215+
runId: workflowRunId,
4216+
eventType: 'wait_created',
4217+
correlationId: 'wait_01HK153X00SP082GGA0AAJC6PK',
4218+
eventData: { resumeAt: new Date('2024-01-02T00:00:00.000Z') },
4219+
createdAt: new Date('2024-01-01T00:00:00.300Z'),
4220+
},
4221+
{
4222+
eventId: 'evnt-hook-1',
4223+
runId: workflowRunId,
4224+
eventType: 'hook_received',
4225+
correlationId: 'hook_01HK153X00SP082GGA0AAJC6PJ',
4226+
eventData: { payload: payload1 },
4227+
createdAt: new Date('2024-01-01T00:00:01.000Z'),
4228+
},
4229+
{
4230+
eventId: 'evnt-step-1-created',
4231+
runId: workflowRunId,
4232+
eventType: 'step_created',
4233+
correlationId: 'step_01HK153X00SP082GGA0AAJC6PM',
4234+
eventData: { stepName: 'processPayload', input: payload1 },
4235+
createdAt: new Date('2024-01-01T00:00:01.100Z'),
4236+
},
4237+
{
4238+
eventId: 'evnt-step-1-started',
4239+
runId: workflowRunId,
4240+
eventType: 'step_started',
4241+
correlationId: 'step_01HK153X00SP082GGA0AAJC6PM',
4242+
createdAt: new Date('2024-01-01T00:00:01.200Z'),
4243+
},
4244+
{
4245+
eventId: 'evnt-step-1-completed',
4246+
runId: workflowRunId,
4247+
eventType: 'step_completed',
4248+
correlationId: 'step_01HK153X00SP082GGA0AAJC6PM',
4249+
eventData: { result: stepResult1 },
4250+
createdAt: new Date('2024-01-01T00:00:01.300Z'),
4251+
},
4252+
{
4253+
eventId: 'evnt-hook-2',
4254+
runId: workflowRunId,
4255+
eventType: 'hook_received',
4256+
correlationId: 'hook_01HK153X00SP082GGA0AAJC6PJ',
4257+
eventData: { payload: payload2 },
4258+
createdAt: new Date('2024-01-01T00:00:02.000Z'),
4259+
},
4260+
{
4261+
eventId: 'evnt-step-2-created',
4262+
runId: workflowRunId,
4263+
eventType: 'step_created',
4264+
correlationId: 'step_01HK153X00SP082GGA0AAJC6PN',
4265+
eventData: { stepName: 'processPayload', input: payload2 },
4266+
createdAt: new Date('2024-01-01T00:00:02.100Z'),
4267+
},
4268+
{
4269+
eventId: 'evnt-step-2-started',
4270+
runId: workflowRunId,
4271+
eventType: 'step_started',
4272+
correlationId: 'step_01HK153X00SP082GGA0AAJC6PN',
4273+
createdAt: new Date('2024-01-01T00:00:02.200Z'),
4274+
},
4275+
{
4276+
eventId: 'evnt-step-2-completed',
4277+
runId: workflowRunId,
4278+
eventType: 'step_completed',
4279+
correlationId: 'step_01HK153X00SP082GGA0AAJC6PN',
4280+
eventData: { result: stepResult2 },
4281+
createdAt: new Date('2024-01-01T00:00:02.300Z'),
4282+
},
4283+
];
4284+
4285+
const serialization = await import('./serialization.js');
4286+
const originalHydrate = serialization.hydrateStepReturnValue;
4287+
let callCount = 0;
4288+
const spy = vi
4289+
.spyOn(serialization, 'hydrateStepReturnValue')
4290+
.mockImplementation(async (...args) => {
4291+
callCount++;
4292+
const delay = [5, 5, 150, 5][callCount - 1] ?? 5;
4293+
await new Promise((resolve) => setTimeout(resolve, delay));
4294+
return originalHydrate(...args);
4295+
});
4296+
4297+
try {
4298+
const result = await runWorkflow(
4299+
`const createHook = globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")];
4300+
const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")];
4301+
const processPayload = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("processPayload");
4302+
async function workflow() {
4303+
const hook = createHook({ token: 'test-token' });
4304+
void sleep('1d');
4305+
const results = [];
4306+
for await (const payload of hook) {
4307+
const processed = await processPayload(payload);
4308+
results.push(processed);
4309+
if (payload.done) {
4310+
break;
4311+
}
4312+
}
4313+
return results;
4314+
}${getWorkflowTransformCode('workflow')}`,
4315+
workflowRun,
4316+
events,
4317+
noEncryptionKey
4318+
);
4319+
4320+
expect(result).not.toBeInstanceOf(Error);
4321+
const hydrated = await hydrateWorkflowReturnValue(
4322+
result,
4323+
workflowRunId,
4324+
noEncryptionKey,
4325+
ops
4326+
);
4327+
expect(hydrated).toEqual([
4328+
{ processed: true, type: 'subscribe', id: 1 },
4329+
{ processed: true, type: 'done' },
4330+
]);
4331+
} finally {
4332+
spy.mockRestore();
4333+
}
4334+
});
4335+
41484336
it('should not trigger unconsumed event error for step_created with 3 sequential steps', async () => {
41494337
// Extended version: 3 sequential steps to increase the chance of
41504338
// the timing race manifesting. Each step_created immediately follows

0 commit comments

Comments
 (0)