Commit a33d705

WIP: start issue 240 skipped node status implementation
1 parent 3327a60 commit a33d705

10 files changed

Lines changed: 404 additions & 2 deletions

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
@@ -6,6 +6,14 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc
 
 ## [0.2.4] - Unreleased
 
+### Added
+
+- **Skipped node terminal status:** downstream nodes that are not executed after an upstream node failure are now marked `skipped` instead of remaining indefinitely `pending` (#240).
+
+### Changed
+
+- **Failure observability:** on node-level workflow failure, pg_durable now performs a terminal reconciliation pass to mark remaining `pending` nodes as `skipped` when supported by the installed schema.
+
 ## [0.2.3] - 2026-06-17
 
 Provider-line note: v0.2.3 stays in the `duroxide-pg` provider compatibility line started in v0.2.2, so the upgrade source is v0.2.2 (`sql/pg_durable--0.2.2--0.2.3.sql`).

USER_GUIDE.md

Lines changed: 1 addition & 0 deletions
@@ -1300,6 +1300,7 @@ SQL |=> 'a': SELECT 1
 | `✗ Failed` | Node encountered an error |
 | `⏳ Running` | Node currently executing |
 | `○ Pending` | Node waiting to execute |
+| `⊘ Skipped` | Node was not executed because an upstream node failed |
 
 ### Visualizing Complex Structures

Lines changed: 168 additions & 0 deletions
@@ -0,0 +1,168 @@
# Issue 240 Design and Implementation Plan

## Summary

Issue 240 requests a clear way to distinguish nodes that were never executed because the workflow already failed. Today those nodes remain in `pending`, which is ambiguous.

Proposed enhancement:
- Add a terminal node status: `skipped`.
- On workflow failure, convert remaining `pending` nodes to `skipped` when the failure came from node execution (that is, at least one node is `failed`).

This keeps the existing instance-level status model unchanged (`df.instances.status` still ends as `failed`) while making node-level outcomes explicit.

## Current Behavior (Observed)

Live repro on local pg instance:
- Step 1 SQL node: `completed`
- Step 2 SQL node (intentional error): `failed`
- Step 3 SQL node (never executed): `pending`
- Instance: `failed`

This is the ambiguity reported in issue 240.

## Goals

- Make unexecuted downstream nodes observable as `skipped` after terminal workflow failure.
- Preserve backward compatibility of the new binary against old schemas.
- Keep implementation minimal and low-risk (no new tables or public function signatures).

## Non-Goals

- No change to `df.instances.status` vocabulary.
- No new monitoring projection table in this iteration.
- No attempt to classify every failure mode as producing `skipped` (for example, pre-execution policy rejection may remain as-is).

## Proposed Design

### 1. Schema: add `skipped` to allowed node statuses

Update node status check constraints so `skipped` is valid:
- Install DDL in `src/lib.rs`:
  - `nodes_status_chk`: include `skipped`.
- Upgrade DDL in next upgrade script:
  - drop and recreate `nodes_status_chk` (or equivalent alteration) to include `skipped`.

`nodes_result_status_chk` can remain unchanged because `skipped` nodes should not carry `result`.
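
The two check constraints can be read as predicates over a node row. The sketch below models them in plain Rust, assuming the constraint semantics shown in the `src/lib.rs` DDL; the function names `status_allowed` and `result_status_ok` are hypothetical, and the real enforcement stays in PostgreSQL.

```rust
/// Models `nodes_status_chk` after this change: the status must be one of
/// the allowed values, now including `skipped`.
pub fn status_allowed(status: &str) -> bool {
    matches!(
        status,
        "pending" | "running" | "completed" | "failed" | "skipped"
    )
}

/// Models `nodes_result_status_chk`: a row may carry a result only when its
/// status is `completed` or `failed`. A row with no result always passes.
pub fn result_status_ok(result: Option<&str>, status: &str) -> bool {
    result.is_none() || matches!(status, "completed" | "failed")
}
```

Under this reading, a `skipped` row with a NULL result satisfies both constraints, which is why only `nodes_status_chk` needs to change.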

### 2. Runtime: mark pending nodes as skipped at terminal failure

Add an activity that performs one set-based update for a single instance:

- New activity (suggested): `mark_pending_nodes_skipped`.
- SQL behavior:
  - `UPDATE df.nodes`
  - `SET status = 'skipped', updated_at = now()`
  - `WHERE instance_id = $1 AND status = 'pending'`
  - guarded by `EXISTS (SELECT 1 FROM df.nodes WHERE instance_id = $1 AND status = 'failed')`

Guard rationale:
- Avoid changing semantics for failures that occur before any node execution (for example, instance-level rejection paths that currently do not mark node failures).
- Keep behavior aligned with issue wording: downstream steps skipped due to an earlier step failure.
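
The guarded update can be illustrated with an in-memory model. This is a minimal sketch, assuming a slice of statuses stands in for the `df.nodes` rows of one instance; the `NodeStatus` enum and the free function are hypothetical and do not touch a database.

```rust
/// Node statuses from the design, including the proposed `Skipped`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Skipped,
}

/// In-memory stand-in for the guarded UPDATE on one instance's nodes.
/// Returns the number of nodes changed, analogous to a rows-affected count.
pub fn mark_pending_nodes_skipped(nodes: &mut [NodeStatus]) -> usize {
    // Guard: mirrors the EXISTS (... status = 'failed') clause.
    if !nodes.iter().any(|s| *s == NodeStatus::Failed) {
        return 0;
    }
    let mut changed = 0;
    for status in nodes.iter_mut() {
        // Only nodes still pending are touched; all other statuses are kept.
        if *status == NodeStatus::Pending {
            *status = NodeStatus::Skipped;
            changed += 1;
        }
    }
    changed
}
```

Running the function a second time on the same slice changes nothing, because no `Pending` entries remain. That is the idempotence property the plan relies on if the activity is retried.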

### 3. Orchestration integration point

In `execute_function_graph` top-level failure path (when instance is being moved to `failed`):
- After node failure is recorded and before/after instance status update, schedule the new activity once for that instance.
- Make the update idempotent and best-effort (safe if retried).

Why this placement:
- Central place where terminal failure is decided.
- Avoids needing per-node graph traversal logic.
- Handles linear and composite graphs (`THEN`, `IF`, `JOIN`, `RACE`, `LOOP`) uniformly.
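
The best-effort ordering can be sketched without the orchestration runtime. In this hypothetical helper, two closures stand in for the skip activity and the instance status update; the names and the `trace` vector are assumptions made for illustration, not the `duroxide` API.

```rust
/// Hypothetical terminal-failure handler: runs the skip pass best-effort,
/// then always attempts to record the instance failure.
pub fn handle_terminal_failure<M, S>(
    mark_skipped: M,
    set_instance_failed: S,
    trace: &mut Vec<String>,
) -> Result<(), String>
where
    M: FnOnce() -> Result<String, String>,
    S: FnOnce() -> Result<(), String>,
{
    // A failure of the skip pass is logged and otherwise ignored.
    match mark_skipped() {
        Ok(msg) => trace.push(msg),
        Err(e) => trace.push(format!("Failed to mark pending nodes as skipped: {e}")),
    }
    // The instance status update runs regardless of the skip outcome.
    set_instance_failed()
}
```

The point of the shape is that an error from the skip pass never prevents the instance from reaching `failed`; in that case the nodes simply keep the legacy `pending` status.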

### 4. Optional hardening in `update_node_status`

No required behavior change, but add a small guard in plan review:
- Keep allowing transitions to `completed` / `failed` as today.
- Ensure no code path writes result for `skipped`.

## Backward Compatibility and Upgrade Strategy

### Binary backward compatibility (B1)

New `.so` may run against an older schema where `nodes_status_chk` does not include `skipped`.
If runtime writes `skipped` in that state, updates would fail.

Plan:
- Runtime schema detection for `skipped` support before attempting the bulk update.
- If unsupported, no-op and keep legacy behavior (`pending`).

Implementation options:
- Option A (preferred): activity checks `pg_constraint` definition for `nodes_status_chk` containing `skipped`.
- Option B: attempt update inside savepoint-like handling and ignore check-constraint violation.

Option A is clearer and avoids noisy errors.
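
Option A reduces to a substring test on the constraint definition text. The sketch below shows that test as a pure function, assuming the definition has already been fetched (for example via `pg_get_constraintdef`); the helper name is hypothetical.

```rust
/// Hypothetical helper: returns true when a CHECK constraint definition
/// contains `status` as a single-quoted literal.
/// `constraint_def` is `None` when the constraint was not found at all.
pub fn constraint_allows_status(constraint_def: Option<&str>, status: &str) -> bool {
    // Match the quoted literal so that an unquoted occurrence does not count.
    let quoted = format!("'{status}'");
    constraint_def.map_or(false, |def| def.contains(&quoted))
}
```

A missing constraint is treated as "unsupported", which keeps the legacy `pending` behavior. The match is textual, so it assumes the status appears in the definition as a quoted literal.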

### Schema upgrade (A/B2)

- Create the next upgrade script, `sql/pg_durable--0.2.3--0.2.4.sql` (`sql/pg_durable--0.2.2--0.2.3.sql` already belongs to the v0.2.3 release).
- Add DDL to update `nodes_status_chk` to include `skipped`.
- Ensure fresh-install schema (from current `src/lib.rs` extension SQL) matches upgraded schema.

## Test Plan

### Unit / Rust-level

- Activity test: when instance has a failed node plus pending nodes, only pending nodes become `skipped`.
- Activity test: when no failed node exists, no rows are changed.
- Compatibility test hook: when schema does not support `skipped`, activity no-ops without error.

### E2E SQL

Add a new E2E SQL test (for example `tests/e2e/sql/49_failed_downstream_nodes_skipped.sql`):
- Build a 3-step sequence where step 2 fails.
- Wait for terminal instance status `failed`.
- Assert:
  - step 1 node is `completed`
  - step 2 node is `failed`
  - step 3 node is `skipped` (not `pending`)
- Include clear failure messages.

Also verify an instance-level failure path with no node failure (if represented in existing tests) does not force all nodes to `skipped`.

### Upgrade tests

Run:
- `./scripts/test-upgrade.sh`

Focus expectations:
- Scenario A: fresh install vs upgraded schema parity for `nodes_status_chk`.
- Scenario B1: new `.so` still works against old schema; `skipped` behavior degrades safely to legacy (`pending`) until upgrade.
- Scenario B2: data remains accessible post-upgrade.

## Docs Plan

Update user-facing status vocabulary references:
- `USER_GUIDE.md` (node status semantics)
- `docs/api-reference.md` if status values are documented there
- Optional release note entry in `CHANGELOG.md`

## Rollout and Risk

Risks:
- Writing `skipped` against non-upgraded schema (mitigated by runtime check).
- Unexpected interactions with in-flight parallel constructs (mitigated by set-based terminal update and idempotence).

Rollout:
1. Land schema + runtime + tests in one PR.
2. Validate full local matrix: fmt, clippy, unit, e2e, upgrade.
3. Document behavior change as node-level observability enhancement.

## Acceptance Criteria

- Failed workflow with downstream unexecuted nodes shows `skipped` for those nodes (post-upgrade schema).
- No regression to existing instance terminal statuses.
- New binary remains functional against pre-upgrade schema.
- E2E and upgrade tests pass.

## Implementation Checklist

- [ ] Add `skipped` to node status check constraint in install DDL (`src/lib.rs`).
- [ ] Add upgrade script change for `nodes_status_chk`.
- [ ] Add new activity to mark pending nodes as skipped for failed instances.
- [ ] Register activity in `src/registry.rs`.
- [ ] Call activity from orchestration failure path.
- [ ] Add schema-compatibility guard for pre-upgrade schemas.
- [ ] Add E2E SQL coverage.
- [ ] Update docs and changelog notes.
- [ ] Run fmt, clippy, unit, e2e, upgrade tests.

docs/upgrade-testing.md

Lines changed: 8 additions & 0 deletions
@@ -203,6 +203,14 @@ gate, so they never need to be added to the exclude list.
 Each schema-changing PR should add a section here documenting what changed,
 what the upgrade script handles, and any backward compatibility considerations.
 
+### v0.2.3 → v0.2.4
+
+#### #240 node-level `skipped` status for downstream unexecuted steps
+- **DDL change:** `df.nodes.status` constraint (`nodes_status_chk`) now allows `skipped` in addition to `pending`, `running`, `completed`, and `failed`. Upgrade script: `sql/pg_durable--0.2.3--0.2.4.sql`.
+- **Scenario A considerations:** fresh-install and upgraded schemas must agree on the `nodes_status_chk` status vocabulary including `skipped`.
+- **Scenario B1 considerations:** the new `.so` must still run against pre-0.2.4 schemas where `nodes_status_chk` does not allow `skipped`. Runtime logic therefore detects schema support first and no-ops (retains legacy `pending` behavior) when unsupported.
+- **Scenario B2 considerations:** no data migration required. Existing rows are preserved; only new terminal reconciliation on failed runs can mark unexecuted nodes as `skipped` post-upgrade.
+
 ### v0.2.2 → v0.2.3
 
 #### Rename duroxide provider schema to `_duroxide` for fresh installs
Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the PostgreSQL License.

//! MarkPendingNodesSkipped activity - marks unexecuted nodes as skipped
//! after a node-level failure.

use duroxide::ActivityContext;
use sqlx::PgPool;
use std::sync::Arc;

/// Activity name for registration and scheduling
pub const NAME: &str = "pg_durable::activity::mark-pending-nodes-skipped";

/// Mark pending nodes as skipped for a failed instance.
///
/// Behavior:
/// - No-op on schemas that do not support 'skipped' in nodes_status_chk.
/// - No-op unless the instance has at least one failed node.
/// - Updates only nodes still in 'pending'.
pub async fn execute(
    ctx: ActivityContext,
    pool: Arc<PgPool>,
    input_json: String,
) -> Result<String, String> {
    let input: serde_json::Value = serde_json::from_str(&input_json)
        .map_err(|e| format!("Failed to parse skipped-status input: {e}"))?;

    let instance_id = input["instance_id"].as_str().ok_or("Missing instance_id")?;

    let skipped_supported: bool = sqlx::query_scalar(
        "SELECT COALESCE(
            (
                SELECT pg_catalog.pg_get_constraintdef(c.oid)
                FROM pg_catalog.pg_constraint c
                JOIN pg_catalog.pg_class t ON t.oid = c.conrelid
                JOIN pg_catalog.pg_namespace n ON n.oid = t.relnamespace
                WHERE n.nspname = 'df'
                  AND t.relname = 'nodes'
                  AND c.conname = 'nodes_status_chk'
                LIMIT 1
            ) LIKE '%''skipped''%',
            false
        )",
    )
    .fetch_one(pool.as_ref())
    .await
    .map_err(|e| format!("Failed to detect skipped status support: {e}"))?;

    if !skipped_supported {
        ctx.trace_info(format!(
            "Schema does not support node status 'skipped'; leaving pending nodes unchanged for instance {instance_id}"
        ));
        return Ok("Skipped status unsupported on schema; no-op".to_string());
    }

    let rows_affected = sqlx::query(
        "UPDATE df.nodes n
         SET status = 'skipped', updated_at = now()
         WHERE n.instance_id = $1
           AND n.status = 'pending'
           AND EXISTS (
               SELECT 1
               FROM df.nodes f
               WHERE f.instance_id = $1
                 AND f.status = 'failed'
           )",
    )
    .bind(instance_id)
    .execute(pool.as_ref())
    .await
    .map_err(|e| format!("Failed to mark pending nodes as skipped: {e}"))?
    .rows_affected();

    let msg = format!("Marked {rows_affected} pending nodes as skipped for instance {instance_id}");
    ctx.trace_info(&msg);
    Ok(msg)
}

src/activities/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -9,5 +9,6 @@
 pub mod execute_http;
 pub mod execute_sql;
 pub mod load_function_graph;
+pub mod mark_pending_nodes_skipped;
 pub mod update_instance_status;
 pub mod update_node_status;

src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -248,7 +248,7 @@ ALTER TABLE df.nodes
     ADD CONSTRAINT nodes_result_name_chk
         CHECK (result_name IS NULL OR result_name OPERATOR(pg_catalog.~) '^[A-Za-z_][A-Za-z0-9_]*$') NOT VALID,
     ADD CONSTRAINT nodes_status_chk
-        CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed'])) NOT VALID,
+        CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed', 'skipped'])) NOT VALID,
     ADD CONSTRAINT nodes_result_status_chk
         CHECK (result IS NULL OR status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed'])) NOT VALID,
     ADD CONSTRAINT nodes_structure_chk

src/orchestrations/execute_function_graph.rs

Lines changed: 21 additions & 1 deletion
@@ -216,8 +216,28 @@ pub async fn execute(ctx: OrchestrationContext, input_json: String) -> Result<St
         }
         Err(err) => {
             ctx.trace_info(format!("Function failed with error: {err}"));
+            let instance_id = input.instance_id.clone();
+
+            // If this was a node-level failure, mark any remaining pending nodes
+            // as skipped for clearer terminal observability.
+            let skipped_input = serde_json::json!({
+                "instance_id": instance_id,
+            });
+            if let Err(e) = ctx
+                .schedule_activity(
+                    activities::mark_pending_nodes_skipped::NAME,
+                    skipped_input.to_string(),
+                )
+                .await
+            {
+                ctx.trace_info(format!(
+                    "Failed to mark pending nodes as skipped for instance {}: {}",
+                    input.instance_id, e
+                ));
+            }
+
             let status_input = serde_json::json!({
-                "instance_id": input.instance_id,
+                "instance_id": instance_id,
                 "status": "failed"
             });
             let _ = ctx

src/registry.rs

Lines changed: 5 additions & 0 deletions
@@ -18,6 +18,7 @@ pub fn create_activity_registry(pool: Arc<PgPool>, semaphore: Arc<Semaphore>) ->
     let graph_pool = pool.clone();
     let status_pool = pool.clone();
     let node_status_pool = pool.clone();
+    let skipped_pool = pool.clone();
     let http_pool = pool.clone();
 
     ActivityRegistry::builder()
@@ -37,6 +38,10 @@ pub fn create_activity_registry(pool: Arc<PgPool>, semaphore: Arc<Semaphore>) ->
             let pool = node_status_pool.clone();
             async move { activities::update_node_status::execute(ctx, pool, input_json).await }
         })
+        .register(activities::mark_pending_nodes_skipped::NAME, move |ctx: ActivityContext, input_json: String| {
+            let pool = skipped_pool.clone();
+            async move { activities::mark_pending_nodes_skipped::execute(ctx, pool, input_json).await }
+        })
         .register(activities::execute_http::NAME, move |ctx: ActivityContext, config_json: String| {
             let pool = http_pool.clone();
             async move { activities::execute_http::execute(ctx, pool, config_json).await }
