Skip to content

Commit d62437b

Browse files
committed
feat: add integration helpers and comprehensive tests for conditional flow logic
- Introduced new helper functions for creating flows and retrieving step states with skip and error info - Added detailed integration tests covering conditional step execution, skipping, cascade behavior, and optional dependencies - Included tests for 'if', 'ifNot', 'whenUnmet', 'whenFailed' conditions, and cascade vs non-cascade skips - Created a new test file for retries exhausted scenarios (placeholder for future tests)
1 parent 208f76f commit d62437b

3 files changed

Lines changed: 1148 additions & 0 deletions

File tree

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import type { postgres } from '../../sql.ts';
2+
import type { Json } from '@pgflow/core';
3+
4+
// ============= Test Helpers for Conditional Flow Integration Tests =============
5+
6+
/**
7+
* Step definition for conditional flows
8+
*
9+
* Note: baseDelay, timeout, and startDelay are in seconds (integers) per
10+
* pgflow.steps schema. The SQL function handles conversion to INTERVAL type.
11+
*/
12+
export interface ConditionalStepDef {
13+
slug: string;
14+
deps: string[];
15+
maxAttempts?: number;
16+
baseDelay?: number;
17+
timeout?: number;
18+
startDelay?: number;
19+
stepType?: 'single' | 'map';
20+
requiredInputPattern?: Json;
21+
forbiddenInputPattern?: Json;
22+
whenUnmet?: 'skip' | 'skip-cascade' | 'fail';
23+
whenFailed?: 'skip' | 'skip-cascade' | 'fail';
24+
}
25+
26+
/**
27+
* Create a flow with conditional steps using full add_step parameters
28+
*/
29+
export const createConditionalFlow = async (
30+
sql: postgres.Sql,
31+
flowSlug: string,
32+
steps: ConditionalStepDef[]
33+
) => {
34+
await sql`select pgflow.create_flow(${flowSlug});`;
35+
for (const step of steps) {
36+
const deps = step.deps.length > 0 ? step.deps : null;
37+
const maxAttempts = step.maxAttempts ?? null;
38+
const baseDelay = step.baseDelay ?? null;
39+
const timeout = step.timeout ?? null;
40+
const startDelay = step.startDelay ?? null;
41+
const stepType = step.stepType ?? 'single';
42+
const requiredPattern = step.requiredInputPattern
43+
? JSON.stringify(step.requiredInputPattern)
44+
: null;
45+
const forbiddenPattern = step.forbiddenInputPattern
46+
? JSON.stringify(step.forbiddenInputPattern)
47+
: null;
48+
const whenUnmet = step.whenUnmet ?? null;
49+
const whenFailed = step.whenFailed ?? null;
50+
51+
await sql`
52+
select pgflow.add_step(
53+
${flowSlug},
54+
${step.slug},
55+
deps_slugs => ${deps}::text[],
56+
max_attempts => ${maxAttempts}::smallint,
57+
base_delay => ${baseDelay}::interval,
58+
timeout => ${timeout}::interval,
59+
start_delay => ${startDelay}::interval,
60+
step_type => ${stepType}::pgflow.step_type,
61+
required_input_pattern => ${requiredPattern}::jsonb,
62+
forbidden_input_pattern => ${forbiddenPattern}::jsonb,
63+
when_unmet => ${whenUnmet}::pgflow.skip_option,
64+
when_failed => ${whenFailed}::pgflow.skip_option
65+
);
66+
`;
67+
}
68+
};
69+
70+
/**
71+
* Extended step state info including skip details
72+
*/
73+
export interface StepStateWithSkip {
74+
step_slug: string;
75+
status: string;
76+
skip_reason: string | null;
77+
skipped_at: string | null;
78+
}
79+
80+
/**
81+
* Get step states with skip information
82+
*/
83+
export const getStepStatesWithSkip = async (
84+
sql: postgres.Sql,
85+
runId: string
86+
): Promise<StepStateWithSkip[]> => {
87+
return await sql<StepStateWithSkip[]>`
88+
SELECT step_slug, status, skip_reason, skipped_at
89+
FROM pgflow.step_states
90+
WHERE run_id = ${runId}
91+
ORDER BY step_slug;
92+
`;
93+
};
94+
95+
/**
96+
* Extended task info including error details
97+
*/
98+
export interface TaskWithError {
99+
step_slug: string;
100+
status: string;
101+
error_message: string | null;
102+
attempts_count: number;
103+
}
104+
105+
/**
106+
* Get step tasks with error information
107+
*/
108+
export const getStepTasksWithError = async (
109+
sql: postgres.Sql,
110+
runId: string
111+
): Promise<TaskWithError[]> => {
112+
return await sql<TaskWithError[]>`
113+
SELECT step_slug, status, error_message, attempts_count
114+
FROM pgflow.step_tasks
115+
WHERE run_id = ${runId}
116+
ORDER BY step_slug, task_index;
117+
`;
118+
};

0 commit comments

Comments
 (0)