Skip to content

Commit 8e7fa36

Browse files
authored
feat: add allowDataLoss parameter to flow compilation (#545)
# Add `allow_data_loss` Parameter to Flow Compilation

This PR adds a new `allow_data_loss` parameter to the `pgflow.ensure_flow_compiled` function, enabling destructive flow recompilation in production environments when explicitly enabled.

## Key Changes

- Added an optional `allow_data_loss` boolean parameter (default: false) to the `pgflow.ensure_flow_compiled` function
- Modified the recompilation logic to check `v_is_local OR allow_data_loss` before allowing destructive operations
- Updated the edge worker API to support a new `compilation` configuration object that can include `allowDataLoss: true`
- Replaced the deprecated `ensureCompiledOnStartup` boolean with the more flexible `compilation` config
- Added comprehensive tests for the new parameter in both SQL and TypeScript

## Benefits

- Provides a safe way to iterate on flow designs in production environments when needed
- Maintains the default safe behavior (no data loss in production) while adding an explicit opt-in for destructive changes
- Improves developer experience by allowing controlled recompilation without environment switching

## Usage

```typescript
// Allow destructive recompilation in any environment
const worker = createFlowWorker(MyFlow, {
  compilation: { allowDataLoss: true },
  // other config...
});

// Skip compilation entirely (pre-compiled flows)
const worker = createFlowWorker(MyFlow, {
  compilation: false,
  // other config...
});
```
1 parent 92e72cb commit 8e7fa36

16 files changed

Lines changed: 589 additions & 52 deletions

pkgs/core/schemas/0100_function_ensure_flow_compiled.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
-- Returns: { status: 'compiled' | 'verified' | 'recompiled' | 'mismatch', differences: text[] }
44
create or replace function pgflow.ensure_flow_compiled(
55
flow_slug text,
6-
shape jsonb
6+
shape jsonb,
7+
allow_data_loss boolean default false
78
)
89
returns jsonb
910
language plpgsql
@@ -48,7 +49,7 @@ BEGIN
4849
-- 6. Shapes differ - auto-detect environment via is_local()
4950
v_is_local := pgflow.is_local();
5051

51-
IF v_is_local THEN
52+
IF v_is_local OR allow_data_loss THEN
5253
-- Recompile in local/dev: full deletion + fresh compile
5354
PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug);
5455
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);

pkgs/core/src/database-types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ export type Database = {
485485
Returns: undefined
486486
}
487487
ensure_flow_compiled: {
488-
Args: { flow_slug: string; shape: Json }
488+
Args: { allow_data_loss?: boolean; flow_slug: string; shape: Json }
489489
Returns: Json
490490
}
491491
ensure_workers: {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
-- Drop the old 2-parameter overload before installing the 3-parameter one,
-- so only a single signature of the function exists.
DROP FUNCTION IF EXISTS "pgflow"."ensure_flow_compiled" (text, jsonb);

-- ensure_flow_compiled: verify or (re)compile a flow definition in the DB.
-- Returns jsonb: { status: 'compiled' | 'verified' | 'recompiled' | 'mismatch',
--                  differences: jsonb array of text descriptions }
CREATE FUNCTION "pgflow"."ensure_flow_compiled" (
  "flow_slug" text,
  "shape" jsonb,
  "allow_data_loss" boolean DEFAULT false
) RETURNS jsonb
LANGUAGE plpgsql
SET "search_path" = ''
AS $$
DECLARE
  v_lock_key     int;      -- advisory-lock key derived from flow_slug
  v_flow_exists  boolean;  -- whether the flow is already present in pgflow.flows
  v_db_shape     jsonb;    -- shape currently stored in the database
  v_differences  text[];   -- diffs between the submitted and stored shapes
  v_is_local     boolean;  -- environment probe via pgflow.is_local()
BEGIN
  -- Deterministic hash of the slug; the transaction-level advisory lock
  -- serializes concurrent compilation attempts for the same flow.
  v_lock_key := hashtext(ensure_flow_compiled.flow_slug);
  PERFORM pg_advisory_xact_lock(1, v_lock_key);

  -- 1. Check if the flow exists.
  SELECT EXISTS(
    SELECT 1 FROM pgflow.flows AS flow
    WHERE flow.flow_slug = ensure_flow_compiled.flow_slug
  ) INTO v_flow_exists;

  -- 2. Flow missing: compile it (same path in every environment).
  IF NOT v_flow_exists THEN
    PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
    RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
  END IF;

  -- 3/4. Compare the submitted shape against what is stored in the DB.
  v_db_shape := pgflow._get_flow_shape(ensure_flow_compiled.flow_slug);
  v_differences := pgflow._compare_flow_shapes(ensure_flow_compiled.shape, v_db_shape);

  -- 5. No differences (NULL/empty diff array): flow is already up to date.
  IF coalesce(array_length(v_differences, 1), 0) = 0 THEN
    RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
  END IF;

  -- 6. Shapes differ - auto-detect environment via is_local().
  v_is_local := pgflow.is_local();

  IF v_is_local OR allow_data_loss THEN
    -- Destructive path: drop the flow together with all of its data, then
    -- compile fresh. Taken automatically in local/dev, or anywhere when the
    -- caller explicitly opted in via allow_data_loss => true.
    PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug);
    PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
    RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
  ELSE
    -- Safe path (production default): report the mismatch, modify nothing.
    RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
  END IF;
END;
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:s6n1PLk8xdvwsEbFcEbLUMaRj0zxGmAfBcaVkk3GKak=
1+
h1:yUEa4QnI0kEuEtxWNiNgRgf9GENvG1FkUmzwlu0vLm0=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -13,3 +13,4 @@ h1:s6n1PLk8xdvwsEbFcEbLUMaRj0zxGmAfBcaVkk3GKak=
1313
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
1414
20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
1515
20251209074533_pgflow_worker_management.sql h1:ozFkYM1EvEH7dGvW+1pWgpzbwdWlwuQoddqLomL1GXw=
16+
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
begin;
select plan(3);
select pgflow_tests.reset_db();

-- Setup: simulate a production environment (not local) via the jwt_secret
-- setting that pgflow's environment detection reads.
select set_config('app.settings.jwt_secret', 'production-jwt-secret-that-differs-from-local', true);

-- Setup: seed a compiled flow whose shape will not match the one we submit.
select pgflow.create_flow('default_loss_flow');
select pgflow.add_step('default_loss_flow', 'old_step');

-- Test 1: with allow_data_loss=false (the default, passed explicitly here)
-- a shape mismatch in production must be reported, not acted on.
select is(
  (
    select result->>'status'
    from pgflow.ensure_flow_compiled(
      'default_loss_flow',
      '{"steps": [{"slug": "new_step", "stepType": "single", "dependencies": []}]}'::jsonb,
      false -- allow_data_loss = false (explicit)
    ) as result
  ),
  'mismatch',
  'Should return mismatch when allow_data_loss=false in production'
);

-- Test 2: the mismatch result must carry a non-empty differences array.
select ok(
  (
    select jsonb_array_length(result->'differences') > 0
    from pgflow.ensure_flow_compiled(
      'default_loss_flow',
      '{"steps": [{"slug": "new_step", "stepType": "single", "dependencies": []}]}'::jsonb,
      false
    ) as result
  ),
  'Should return differences when allow_data_loss=false'
);

-- Test 3: the stored flow must be untouched - the original step remains.
select is(
  (select step_slug from pgflow.steps where flow_slug = 'default_loss_flow'),
  'old_step',
  'Database should not be modified when allow_data_loss=false'
);

select finish();
rollback;
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
begin;
select plan(3);
select pgflow_tests.reset_db();

-- Setup: simulate a production environment (not local) via the jwt_secret
-- setting that pgflow's environment detection reads.
select set_config('app.settings.jwt_secret', 'production-jwt-secret-that-differs-from-local', true);

-- Setup: seed a compiled flow whose shape will not match the one we submit.
select pgflow.create_flow('allow_loss_flow');
select pgflow.add_step('allow_loss_flow', 'old_step');

-- Test 1: with allow_data_loss=true a shape mismatch must trigger a
-- destructive recompile even though is_local() reports false.
select is(
  (
    select result->>'status'
    from pgflow.ensure_flow_compiled(
      'allow_loss_flow',
      '{"steps": [{"slug": "new_step", "stepType": "single", "dependencies": []}]}'::jsonb,
      true -- allow_data_loss = true
    ) as result
  ),
  'recompiled',
  'Should recompile when allow_data_loss=true even in production'
);

-- Test 2: the previous step must have been deleted by the recompile.
select is(
  (select count(*)::int from pgflow.steps where flow_slug = 'allow_loss_flow' and step_slug = 'old_step'),
  0,
  'Old step should be deleted when allow_data_loss=true'
);

-- Test 3: the step from the submitted shape must now exist.
select is(
  (select count(*)::int from pgflow.steps where flow_slug = 'allow_loss_flow' and step_slug = 'new_step'),
  1,
  'New step should be created when allow_data_loss=true'
);

select finish();
rollback;

pkgs/edge-worker/src/core/Queries.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ export class Queries {
5454

5555
async ensureFlowCompiled(
5656
flowSlug: string,
57-
shape: FlowShape
57+
shape: FlowShape,
58+
allowDataLoss = false
5859
): Promise<EnsureFlowCompiledResult> {
5960
// SAFETY: FlowShape is JSON-compatible by construction (only strings, numbers,
6061
// arrays, and plain objects), but TypeScript can't prove this because FlowShape
@@ -68,7 +69,8 @@ export class Queries {
6869
const [result] = await this.sql<{ result: EnsureFlowCompiledResult }[]>`
6970
SELECT pgflow.ensure_flow_compiled(
7071
${flowSlug},
71-
${shapeJson}::jsonb
72+
${shapeJson}::jsonb,
73+
${allowDataLoss}
7274
) as result
7375
`;
7476
return result.result;

pkgs/edge-worker/src/core/workerConfigTypes.ts

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,17 +140,31 @@ export type QueueWorkerConfig = {
140140
*/
141141
export type ResolvedQueueWorkerConfig = Required<Omit<QueueWorkerConfig, 'retryDelay' | 'retryLimit'>>;
142142

143+
/**
144+
* Compilation configuration for flow workers.
145+
*/
146+
export interface CompilationConfig {
147+
/**
148+
* Allow destructive recompilation on shape mismatch.
149+
* When true, ALL flow data (runs, tasks, history) will be deleted
150+
* if the flow shape changes. Use only during iteration phases (e.g., Lovable).
151+
* When false (default), recompilation only happens in local environment.
152+
* @default false
153+
*/
154+
allowDataLoss?: boolean;
155+
}
156+
143157
/**
144158
* Configuration for the flow worker with two-phase polling
145159
*/
146160
export type FlowWorkerConfig = {
147161
/**
148-
* Whether to verify/compile flow at worker startup.
149-
* When true (default), worker calls pgflow.ensure_flow_compiled() before polling.
150-
* Set to false to skip compilation check (useful if flows are pre-compiled via CLI).
151-
* @default true
162+
* Compilation behavior at worker startup.
163+
* - undefined or {}: Auto-detect (local=recompile, prod=fail on mismatch)
164+
* - false: Skip compilation check (flows pre-compiled via CLI)
165+
* - { allowDataLoss: true }: Allow destructive recompile anywhere
152166
*/
153-
ensureCompiledOnStartup?: boolean;
167+
compilation?: false | CompilationConfig;
154168

155169
/**
156170
* How many tasks are processed at the same time
@@ -209,7 +223,7 @@ export type FlowWorkerConfig = {
209223
/**
210224
* Resolved flow configuration with all defaults applied
211225
*/
212-
export type ResolvedFlowWorkerConfig = Required<Omit<FlowWorkerConfig, 'connectionString' | 'env' | 'ensureCompiledOnStartup'>> & {
226+
export type ResolvedFlowWorkerConfig = Required<Omit<FlowWorkerConfig, 'connectionString' | 'env' | 'compilation'>> & {
213227
connectionString: string | undefined;
214228
env: Record<string, string | undefined>;
215229
};

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import type { AnyFlow } from '@pgflow/dsl';
66
import { extractFlowShape } from '@pgflow/dsl';
77
import { FlowShapeMismatchError } from './errors.js';
88

9+
import type { CompilationConfig } from '../core/workerConfigTypes.js';
10+
911
export interface FlowLifecycleConfig {
1012
heartbeatInterval?: number;
11-
ensureCompiledOnStartup?: boolean;
13+
compilation?: false | CompilationConfig;
1214
}
1315

1416
/**
@@ -30,15 +32,16 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
3032
private _edgeFunctionName?: string;
3133
private heartbeatInterval: number;
3234
private lastHeartbeat = 0;
33-
private ensureCompiledOnStartup: boolean;
35+
private compilation: false | CompilationConfig;
3436

3537
constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
3638
this.queries = queries;
3739
this.flow = flow;
3840
this.logger = logger;
3941
this.workerState = new WorkerState(logger);
4042
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
41-
this.ensureCompiledOnStartup = config?.ensureCompiledOnStartup ?? true;
43+
// Default to {} (enable compilation with default settings) if not specified
44+
this.compilation = config?.compilation ?? {};
4245
}
4346

4447
async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
@@ -53,10 +56,11 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
5356

5457
// Compile/verify flow as part of Starting (before registering worker)
5558
let compilationStatus: CompilationStatus = 'verified';
56-
if (this.ensureCompiledOnStartup) {
57-
compilationStatus = await this.ensureFlowCompiled();
59+
if (this.compilation !== false) {
60+
const allowDataLoss = this.compilation.allowDataLoss ?? false;
61+
compilationStatus = await this.ensureFlowCompiled(allowDataLoss);
5862
} else {
59-
this.logger.info(`Skipping compilation check for flow '${this.flow.slug}' (ensureCompiledOnStartup=false)`);
63+
this.logger.info(`Skipping compilation check for flow '${this.flow.slug}' (compilation: false)`);
6064
}
6165

6266
// Log startup banner with compilation status
@@ -71,12 +75,13 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
7175
this.workerState.transitionTo(States.Running);
7276
}
7377

74-
private async ensureFlowCompiled(): Promise<CompilationStatus> {
78+
private async ensureFlowCompiled(allowDataLoss: boolean): Promise<CompilationStatus> {
7579
const shape = extractFlowShape(this.flow);
7680

7781
const result = await this.queries.ensureFlowCompiled(
7882
this.flow.slug,
79-
shape
83+
shape,
84+
allowDataLoss
8085
);
8186

8287
if (result.status === 'mismatch') {

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
9191
flow,
9292
createLogger('FlowWorkerLifecycle'),
9393
{
94-
ensureCompiledOnStartup: config.ensureCompiledOnStartup ?? true
94+
compilation: config.compilation
9595
}
9696
);
9797

0 commit comments

Comments
 (0)