Skip to content

Commit 27bc474

Browse files
authored
feat: add support for skipped steps in client (#601)
# Add support for skipped steps in the client This PR adds support for handling skipped steps in the PgFlow client. Skipped steps can occur in three scenarios: - When a step's condition evaluates to false (`condition_unmet`) - When a dependency was skipped, causing cascading skips (`dependency_skipped`) - When a handler fails during evaluation (`handler_failed`) The implementation: - Adds a new `Skipped` status to the `FlowStepStatus` enum - Introduces a `SkipReason` type to track why a step was skipped - Extends `FlowStep` with `skipped_at` and `skip_reason` properties - Updates event handling to process skipped step events - Treats skipped as a terminal state (like completed and failed) - Adds support for `waitForStatus(FlowStepStatus.Skipped)` Comprehensive tests verify the client correctly: - Handles skipped step state from database snapshots - Processes skipped broadcast events - Maintains skipped as a terminal state - Resolves `waitForStatus(Skipped)` correctly - Handles all skip reasons appropriately
1 parent 84c189c commit 27bc474

File tree

12 files changed

+840
-186
lines changed

12 files changed

+840
-186
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import { describe, it, expect } from 'vitest';
2+
import { withPgNoTransaction } from '../helpers/db.js';
3+
import { createTestSupabaseClient } from '../helpers/setup.js';
4+
import { createTestFlow } from '../helpers/fixtures.js';
5+
import { grantMinimalPgflowPermissions } from '../helpers/permissions.js';
6+
import { PgflowClient } from '../../src/lib/PgflowClient.js';
7+
import { FlowStepStatus } from '../../src/lib/types.js';
8+
import { cleanupFlow } from '../helpers/cleanup.js';
9+
import { createEventTracker } from '../helpers/test-utils.js';
10+
import { skipStep } from '../helpers/skip-step.js';
11+
12+
/**
13+
* Tests for skipped step event handling in the client.
14+
*
15+
* Skipped steps can occur when:
16+
* - A step's condition evaluates to false (condition_unmet)
17+
* - A dependency was skipped, causing cascading skips (dependency_skipped)
18+
* - A handler fails during evaluation (handler_failed)
19+
*
20+
* These tests verify the client correctly:
21+
* - Receives and processes skipped broadcast events
22+
* - Updates step state with skipped_at and skip_reason
23+
* - Treats skipped as a terminal state
24+
* - Handles waitForStatus(Skipped) correctly
25+
*/
26+
describe('Skipped Step Handling', () => {
27+
it(
28+
'client handles skipped step state from database snapshot',
29+
withPgNoTransaction(async (sql) => {
30+
// This test verifies the client correctly handles skipped step state
31+
// when fetched from the database (e.g., on reconnect or late join)
32+
33+
const testFlow = createTestFlow('skip_snap');
34+
await cleanupFlow(sql, testFlow.slug);
35+
await grantMinimalPgflowPermissions(sql);
36+
37+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
38+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'will_skip_step')`;
39+
40+
const supabaseClient = createTestSupabaseClient();
41+
const pgflowClient = new PgflowClient(supabaseClient, {
42+
realtimeStabilizationDelayMs: 1000,
43+
});
44+
45+
// Start the flow
46+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
47+
const step = run.step('will_skip_step');
48+
49+
// Verify initial state is Started (root step)
50+
expect(step.status).toBe(FlowStepStatus.Started);
51+
52+
// Directly call pgflow.skip_step to simulate the step being skipped
53+
// This mimics what would happen when a condition evaluates to false
54+
await skipStep(sql, run.run_id, 'will_skip_step', 'condition_unmet');
55+
56+
// Wait for the skipped event to be received
57+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
58+
59+
// Verify skipped state
60+
expect(step.status).toBe(FlowStepStatus.Skipped);
61+
expect(step.skipped_at).toBeInstanceOf(Date);
62+
expect(step.skip_reason).toBe('condition_unmet');
63+
64+
// Verify output is null for skipped steps (per design decision Q1)
65+
expect(step.output).toBeNull();
66+
67+
await supabaseClient.removeAllChannels();
68+
}),
69+
{ timeout: 15000 }
70+
);
71+
72+
it(
73+
'receives skipped broadcast event and updates step state',
74+
withPgNoTransaction(async (sql) => {
75+
// This test verifies the client receives and processes skipped events
76+
// broadcast via Supabase realtime
77+
78+
const testFlow = createTestFlow('skip_broadcast');
79+
await cleanupFlow(sql, testFlow.slug);
80+
await grantMinimalPgflowPermissions(sql);
81+
82+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
83+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'skipped_step')`;
84+
85+
const supabaseClient = createTestSupabaseClient();
86+
const pgflowClient = new PgflowClient(supabaseClient, {
87+
realtimeStabilizationDelayMs: 1000,
88+
});
89+
90+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
91+
const step = run.step('skipped_step');
92+
93+
// Set up event tracking BEFORE the skip happens
94+
const tracker = createEventTracker();
95+
step.on('*', tracker.callback);
96+
97+
// Skip the step
98+
await skipStep(sql, run.run_id, 'skipped_step', 'handler_failed');
99+
100+
// Wait for the skipped status
101+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
102+
103+
// Verify we received the skipped event
104+
expect(tracker).toHaveReceivedEvent('step:skipped');
105+
expect(tracker).toHaveReceivedEvent('step:skipped', {
106+
run_id: run.run_id,
107+
step_slug: 'skipped_step',
108+
status: FlowStepStatus.Skipped,
109+
skip_reason: 'handler_failed',
110+
});
111+
112+
// Verify step state
113+
expect(step.status).toBe(FlowStepStatus.Skipped);
114+
expect(step.skip_reason).toBe('handler_failed');
115+
116+
await supabaseClient.removeAllChannels();
117+
}),
118+
{ timeout: 15000 }
119+
);
120+
121+
it(
122+
'waitForStatus(Skipped) resolves when step is skipped',
123+
withPgNoTransaction(async (sql) => {
124+
// Verify waitForStatus works correctly with Skipped status
125+
126+
const testFlow = createTestFlow('wait_skip');
127+
await cleanupFlow(sql, testFlow.slug);
128+
await grantMinimalPgflowPermissions(sql);
129+
130+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
131+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'wait_step')`;
132+
133+
const supabaseClient = createTestSupabaseClient();
134+
const pgflowClient = new PgflowClient(supabaseClient, {
135+
realtimeStabilizationDelayMs: 1000,
136+
});
137+
138+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
139+
const step = run.step('wait_step');
140+
141+
// Start waiting for skipped BEFORE the skip happens
142+
const waitPromise = step.waitForStatus(FlowStepStatus.Skipped, {
143+
timeoutMs: 10000,
144+
});
145+
146+
// Skip the step after a small delay
147+
setTimeout(async () => {
148+
await skipStep(sql, run.run_id, 'wait_step', 'condition_unmet');
149+
}, 100);
150+
151+
// Wait should resolve with the step
152+
const result = await waitPromise;
153+
expect(result).toBe(step);
154+
expect(result.status).toBe(FlowStepStatus.Skipped);
155+
expect(result.skip_reason).toBe('condition_unmet');
156+
157+
await supabaseClient.removeAllChannels();
158+
}),
159+
{ timeout: 15000 }
160+
);
161+
162+
it(
163+
'handles all skip reasons correctly',
164+
withPgNoTransaction(async (sql) => {
165+
// Verify all three skip reasons are handled correctly
166+
167+
const skipReasons = [
168+
'condition_unmet',
169+
'handler_failed',
170+
'dependency_skipped',
171+
] as const;
172+
173+
for (const skipReason of skipReasons) {
174+
const testFlow = createTestFlow(`skip_${skipReason}`);
175+
await cleanupFlow(sql, testFlow.slug);
176+
await grantMinimalPgflowPermissions(sql);
177+
178+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
179+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'reason_step')`;
180+
181+
const supabaseClient = createTestSupabaseClient();
182+
const pgflowClient = new PgflowClient(supabaseClient, {
183+
realtimeStabilizationDelayMs: 1000,
184+
});
185+
186+
const run = await pgflowClient.startFlow(testFlow.slug, {
187+
test: 'data',
188+
});
189+
const step = run.step('reason_step');
190+
191+
// Skip with specific reason
192+
await skipStep(sql, run.run_id, 'reason_step', skipReason);
193+
194+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
195+
196+
// Verify the skip reason was captured correctly
197+
expect(step.status).toBe(FlowStepStatus.Skipped);
198+
expect(step.skip_reason).toBe(skipReason);
199+
200+
await supabaseClient.removeAllChannels();
201+
}
202+
}),
203+
{ timeout: 45000 }
204+
);
205+
});

pkgs/client/__tests__/e2e/supabase-setup.test.ts

Lines changed: 0 additions & 77 deletions
This file was deleted.

pkgs/client/__tests__/helpers/event-factories.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
BroadcastStepStartedEvent,
88
BroadcastStepCompletedEvent,
99
BroadcastStepFailedEvent,
10+
BroadcastStepSkippedEvent,
1011
} from '../../src/lib/types';
1112

1213
/**
@@ -98,3 +99,17 @@ export function createStepFailedEvent(
9899
...overrides,
99100
};
100101
}
102+
103+
export function createStepSkippedEvent(
104+
overrides: Partial<BroadcastStepSkippedEvent> = {}
105+
): BroadcastStepSkippedEvent {
106+
return {
107+
event_type: 'step:skipped',
108+
run_id: '123e4567-e89b-12d3-a456-426614174000',
109+
step_slug: 'test-step',
110+
status: FlowStepStatus.Skipped,
111+
skipped_at: new Date().toISOString(),
112+
skip_reason: 'condition_unmet',
113+
...overrides,
114+
};
115+
}

pkgs/client/__tests__/helpers/fixtures.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ export function createTestFlow(flowSlug?: string) {
44
const uniqueSuffix = `${Date.now()}_${Math.random()
55
.toString(36)
66
.substr(2, 5)}`;
7+
8+
const maxBaseLength = 48 - uniqueSuffix.length - 1;
9+
const baseSlug = flowSlug ? flowSlug.slice(0, maxBaseLength) : 'test_flow';
10+
711
return {
8-
slug: flowSlug
9-
? `${flowSlug}_${uniqueSuffix}`
10-
: `test_flow_${uniqueSuffix}`,
12+
slug: `${baseSlug}_${uniqueSuffix}`,
1113
options: {},
1214
};
1315
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import type postgres from 'postgres';
2+
3+
/**
4+
* Skip a step using the internal _cascade_force_skip_steps function.
5+
* This is a test helper that wraps the internal function.
6+
* If pgflow.skip_step() is exposed publicly later, swap implementation here.
7+
*/
8+
export async function skipStep(
9+
sql: postgres.Sql,
10+
runId: string,
11+
stepSlug: string,
12+
skipReason: 'condition_unmet' | 'handler_failed' | 'dependency_skipped'
13+
): Promise<void> {
14+
await sql`SELECT pgflow._cascade_force_skip_steps(
15+
${runId}::uuid,
16+
${stepSlug}::text,
17+
${skipReason}::text
18+
)`;
19+
}

0 commit comments

Comments
 (0)