diff --git a/Cargo.lock b/Cargo.lock index 55e3e3d..500d612 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -689,8 +689,7 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" [[package]] name = "duroxide" version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9377f5bf81d9a8ce56a13f913f60ca0e0ba8a9464349c407e7d11c326488bf0c" +source = "git+https://github.com/microsoft/duroxide.git?branch=pinodeca%2Fcontinue-parent-link#6650410626110f5e29c32269bec0156b15d6ed7c" dependencies = [ "async-trait", "futures", @@ -702,6 +701,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 45a400a..fffe89c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,14 @@ serde_json = { version = "1.0", features = ["preserve_order"] } uuid = { version = "1.0", features = ["v4", "serde"] } # duroxide integration -duroxide = "=0.1.29" +# Using git dependency on pinodeca/continue-parent-link branch which preserves the +# parent link when a sub-orchestration calls continue_as_new, unblocking the +# sub-orchestration approach for df.loop. +# Compatibility: duroxide-pg 0.1.34 has been verified to compile and run correctly +# against this branch (same public API as 0.1.29 — PR #31 is a runtime-only change). +# Once the branch is merged and a new duroxide release is published, revert both +# entries back to crates.io version pins as a compatible pair. +duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" } duroxide-pg = "=0.1.34" tokio = { version = "1", features = ["rt-multi-thread", "sync", "time"] } @@ -55,6 +62,11 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } [dev-dependencies] pgrx-tests = "=0.16.1" +# Override the crates.io duroxide with the git branch for both the direct dependency +# and duroxide-pg's transitive dependency on duroxide. +[patch.crates-io] +duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" } + [profile.dev] panic = "unwind" diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 12efe7d..2d9e9b6 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -946,6 +946,12 @@ SELECT df.start( Use `@>` operator or `df.loop()` to create functions that run forever. Each iteration creates a new execution with fresh state (via continue-as-new). +> **Note:** Each `df.loop()` runs as its own child orchestration. The loop's iterations +> (the continue-as-new generations) are scoped to that child, so any nodes *before* the +> loop in the graph run exactly once and any nodes *after* the loop run once the loop +> exits. When inspecting `df.instances`, a looping function therefore shows a parent +> instance plus a separate child instance for the loop. + ```sql -- Simple heartbeat every 30 seconds (using @> operator) SELECT df.start( diff --git a/docs/exec-id-plan.md b/docs/exec-id-plan.md new file mode 100644 index 0000000..e1cac55 --- /dev/null +++ b/docs/exec-id-plan.md @@ -0,0 +1,134 @@ +# Plan: execution_id node state (TEMPORARY — do not commit) + +> This plan file is scratch. Delete `docs/exec-id-plan.md` before the feature +> branch is finalized (last step below). It must not land in history. + +## Goal + +Replace the `node-state-model` PR's stored `status_reason` approach with an +`execution_id`-based model where **physical** statuses are written by the owning +orchestration and **implicit** statuses (`skipped`, `cancelled`) are *derived at +read time* in `df.instance_nodes` from each node's ancestors — with no duroxide +dependency in the read path. + +--- + +## Step 1 — Schema: `status_details JSONB` on `df.nodes` + +- Add `status_details JSONB` (nullable) to `df.nodes` in [src/lib.rs](src/lib.rs) + `create_tables` DDL. +- Stores the writer's `execution_id` (the ordered path; see Step 4) plus room + for small read-path payloads we already need (the IF decision; the RACE + winner — see Step 6). +- Incidental cleanup agreed earlier: **drop the dead `error` column** and stop + overloading `result` for non-result data. +- Keep `nodes_status_chk` limited to the **physical** statuses + (`pending`/`running`/`completed`/`failed`). `skipped`/`cancelled` are never + stored, so they must NOT be added to the check constraint. +- Grants: add `status_details` to nothing in the user INSERT column list (worker + writes it); remove `error` from any column lists in `df.grant_usage` / + `df.revoke_usage`. +- **Upgrade script** `sql/pg_durable--0.2.4--0.2.5.sql` (next version): `ADD + COLUMN status_details`, `DROP COLUMN error`, constraint + grant adjustments. +- **Binary backward-compat (B1):** the new `.so` must still run against older + schemas that lack `status_details`/still have `error`. The write path (Step 2) + and read path (Step 3) must degrade gracefully when the column is absent + (runtime column detection, or a guarded query) — verify with + `scripts/test-upgrade.sh`. + +## Step 2 — Write path: fence status updates on `execution_id` + +- In [src/activities/update_node_status.rs](src/activities/update_node_status.rs): + accept the caller's `execution_id`, write it to `status_details`, and make the + `UPDATE` **conditional**: apply only when the incoming `execution_id` is **not + superseded** by the stored one (`incoming >= stored`, equal allowed for + running→terminal within one execution). +- Ordering: JSONB does not compare the way we need. Store/compare via a + **comparable form** (e.g. an `int[]` of the execution numbers) so Postgres' + native lexicographic array order gives "first differing segment, larger number + wins". The JSONB can carry the human-readable path; the gate compares the + array form (column, generated column, or extracted in the `WHERE`). +- **Fire-and-forget / determinism:** the activity returns `Ok` regardless of + whether the row was updated; the orchestration must never branch on the + outcome (keep `let _ = ctx.schedule_activity(...)`). + +## Step 3 — Read path: `df.instance_nodes` infers status from `df.nodes` only + +- Rewrite [src/monitoring.rs](src/monitoring.rs) `instance_nodes` to: + 1. **Drop the duroxide dependency** — read solely from `df.nodes` (one + `SELECT` of the instance's rows). Remove `list_executions` / the fabricated + `execution_id` cross join. + 2. For each node, **leave `status` and the stored `execution_id` untouched** + and instead **add inferred fields to the returned `status_details`**: + - `inferred_status` — the *effective* state at read time (`skipped`, + `cancelled`, `pending`, or the node's own physical status when no + ancestor overrides it). + - `inferred_status_from_ancestor_id` — the node whose state *determined* + `inferred_status` (the first ancestor whose `execution_id` is not a + prefix of this node's). Makes the derivation explainable per row. + - We deliberately do NOT add `inferred_status_execution_id`: it is just the + `execution_id` of `inferred_status_from_ancestor_id`'s row, so a reader + can join to that node if needed. + - Rationale: the read path **must not compete with the write path**. By + augmenting (not overwriting) `status_details`, the helper never races the + orchestration's fenced writes. (If we later materialize this at the DB + level it would compete — defer that decision.) + 3. **Derivation (leaf→root climb):** find the first ancestor whose + `execution_id` is not a prefix of the node's; that ancestor's state sets + `inferred_status`. Ancestor `failed` ⇒ `skipped`; `running`/newer execution + ⇒ `pending`; `completed` but branch/iteration not taken ⇒ `skipped`; RACE + loser ⇒ `cancelled`. Use the IF decision and RACE winner recorded in + `status_details` (Step 6) to know which branch/iteration is "taken". + 4. Children are reachable via `left_node`/`right_node`; the tree is immutable + after `df.start()`, so no parent column is required (optional safe + denormalization only). +- Output columns: `status` and `status_details` keep their stored values; the + inferred fields live inside the returned `status_details` JSONB only, never in + the base table. + +## Step 4 — Document the `execution_id` + +- Fold the structure from [docs/execution-id.md](docs/execution-id.md) into the + permanent doc (see Step 7): ordered path `root:n₀ : seg₁:n₁ : … : segₖ:nₖ`; + segment added only at sub-orchestration boundaries (loop iteration, join/race + branch); branch numbers = 1, loop numbers = 1..N; same node across executions + shares segment identities+length, differing only in numbers ⇒ total order. + +## Step 5 — Document the state-transition graph + helper + +- Add a diagram of node states with the **physical** transitions + (`pending→running→completed|failed`) drawn solid and the **implicit, never + materialized** ones (`→skipped`, `→cancelled`, re-entry `→pending` on a new + execution) drawn dashed, with a clear note that the dashed ones exist only in + `df.instance_nodes` output. +- Document the inference helper used by `instance_nodes` (the ancestor-climb / + derivation function): inputs, the prefix rule, each case, and that it emits + `inferred_status` + `inferred_status_from_ancestor_id` into `status_details` + rather than mutating the stored `status`/`execution_id`. + +## Step 6 — RACE winner recording (small behavioral add) + +- The root→leaves / branch-taken inference needs the RACE node to record its + **winning branch** in `status_details`. Add this write in the race + orchestration if not already present. + +## Step 7 — Docs cleanup + +- Delete [docs/execution-id.md](docs/execution-id.md) and + [docs/node-state-model.md](docs/node-state-model.md); their content is folded + into the permanent doc/diagram (Steps 4–5). +- Delete this plan file `docs/exec-id-plan.md` (must not be committed on the + feature branch). + +--- + +## Open dependency you may be assuming implicitly + +**Loop boundary (prerequisite, not in your list).** Clean per-iteration +`execution_id` segments require a nested loop to run as a **sub-orchestration +rooted at the loop node**. Today the loop uses `ctx.continue_as_new` from +`graph.root_node_id` (the only `continue_as_new` caller), which both produces the +existing nested-loop "restarts wrong root" bug and prevents a nested loop from +owning its own segment. This must be addressed (or explicitly deferred with +top-level-loops-only scope) for Steps 2–3 to be correct. Track it as a separate +work item. diff --git a/docs/execution-id.md b/docs/execution-id.md new file mode 100644 index 0000000..024649a --- /dev/null +++ b/docs/execution-id.md @@ -0,0 +1,42 @@ +# Execution IDs for Node State + +## Principles + +Each node row is written by a **single writer** (the orchestration that owns it). We keep changes **minimal**, and we **never materialize** implicit states (`skipped`, `cancelled`): they are *derived* at read time. This avoids extra writes, write contention, and conflict resolution. + +## 1. `execution_id` + +An `execution_id` is the **sub-orchestration path** from the root to the orchestration that last wrote a node, with an **execution number** per segment: + +``` +root:3:loopA:4:branchC:1 +``` + +- A segment is added **only at a sub-orchestration boundary** — a loop iteration, or a join/race branch. Plain nodes (SQL, IF, sequencing, …) inherit their owning orchestration's `execution_id`; the path is therefore *coarser* than the full graph. +- Execution numbers: **1** for join/race branches; **1..N** for loop iterations (one per `continue_as_new`); the root counts root-level loop iterations. +- Two `execution_id`s that can ever write the **same** node share identical segment *identities* and length — they differ only in the **numbers**. This makes them **totally ordered**: at the first segment whose number differs, the larger number is more recent (“supersedes”). + +## 2. `status_detail` column + +Add `status_detail` to `df.nodes`. On every status write, the owning orchestration stamps the node's current `execution_id` there. + +*(Incidental cleanup, not core to this proposal: `status_detail` lets us drop the unused `error` column and stop overloading `result` for non-result data.)* + +## 3. Most-recent-execution wins + +The status update becomes a **fenced conditional write**: apply only if the incoming `execution_id` is **not superseded** by the one already stored (`incoming >= stored`). A stale writer — e.g. a race loser still draining, or a previous loop iteration — cannot clobber a newer execution's state, and the row converges to the most-recent execution regardless of arrival order. The write stays single-writer-*effective* and order-independent; the orchestration never branches on whether the write landed. + +## 4. Monitoring: it's enough for both tree walks + +With `status` + `status_detail`, plus what we already store (the IF decision in `result`, and the race winner on the race node), a single `SELECT` of an instance's nodes supports both directions: + +- **Leaf → root:** climb a node's ancestors; the first ancestor whose `execution_id` is not a prefix of the node's tells you the node's *effective* state (ancestor failed ⇒ skipped; running ⇒ pending in a new execution; completed ⇒ branch/iteration not taken ⇒ skipped). +- **Root → leaves:** descend carrying the live lineage; decisions at IF/RACE/LOOP paint whole sub-graphs at once (not-taken ⇒ skipped, race loser ⇒ cancelled, under-failure ⇒ skipped, unreached ⇒ pending). + +Intuitively: structure tells you *what could run*, and `execution_id` tells you *which execution each node's status belongs to* — together enough to color the current state of **every sub-graph**, with no materialized transitions. + +## Out of scope / dependencies + +- **Loop boundary**: requires loops to run as a sub-orchestration rooted at the loop node so iterations get a clean `execution_id` segment — see separate note. +- **Race winner**: the root→leaves walk needs the race node to record its winning branch (small add if not already present). +- **Representation & upgrade**: `execution_id` ordering can be stored in a comparable form (implementation detail); `status_detail` is a new nullable column and the fenced `UPDATE` is compatible with pre-existing rows. diff --git a/docs/node-state-model.md b/docs/node-state-model.md new file mode 100644 index 0000000..4726db6 --- /dev/null +++ b/docs/node-state-model.md @@ -0,0 +1,416 @@ +# Node & Instance State Model + +Status: **Design proposal** (consolidates issues [#240] and [#171]). + +This document defines the lifecycle states that `pg_durable` workflow **instances** +and **nodes** move through, the legal transitions between them, and the reason a +node ends in a non-executed terminal state. It exists so that operators inspecting +`df.instances` / `df.nodes` (and the `df.status()` / `df.instance_nodes` views) can +unambiguously tell *what happened* to every step of a run. + +## Proposal summary + +This is the proposed transition graph for node states (states in **bold** are +terminal): + +```mermaid +stateDiagram-v2 + [*] --> pending + pending --> running: scheduled + pending --> skipped: upstream_failed / branch_not_taken + pending --> cancelled: race_lost (never started) + running --> completed + running --> failed + running --> cancelled: race_lost (abandoned) + completed --> pending: loop boundary reset + failed --> pending: loop boundary reset + skipped --> pending: loop boundary reset + cancelled --> pending: loop boundary reset + + classDef terminal font-weight:bold; + class completed,failed,skipped,cancelled terminal +``` + +**Loop reset:** a `LOOP` body node has one row but runs once per iteration, so at each `continue_as_new` boundary the loop-body subgraph is reset to `pending`, preventing stale terminal marks from leaking into the next iteration (see [Loops and node-state reuse](#loops-and-node-state-reuse-iteration-scoping)). + +**Key open decision** — how `df.nodes` projects looped nodes: + +- **Latest-only row + loop reset** (this proposal): one mutable row per node; simplest, but no per-iteration history in SQL. +- **Per-execution rows** (`df.node_executions` keyed by `(node_id, iteration)`, à la Airflow): full per-iteration history and no reset needed, at the cost of a heavier schema. + +--- + +## Reference model + +We align with **Apache Airflow's `TaskInstance` state model**, which is the closest +well-known analog: a DAG of steps with branching, short-circuiting, and parallelism. +The decisions below also stay consistent with **Temporal** (durable-execution +semantics, since `pg_durable` runs on duroxide) at the instance level, and borrow +**BPMN 2.0's** notion of a *withdrawn* activity for race-losers. + +| Concept here | Airflow | Temporal | BPMN 2.0 | AWS Step Functions | +|---|---|---|---|---| +| Step ran OK | `success` | `Completed` | `Completed` | `SUCCEEDED` | +| Step errored | `failed` | `Failed` | `Failed` | `FAILED` | +| Not run — upstream failed | `upstream_failed` | (n/a) | `Terminated` | `ABORTED` | +| Not run — branch not taken | `skipped` | (n/a) | `Withdrawn` | (n/a) | +| Abandoned — lost a race | (cancelled) | `Canceled` | `Withdrawn` | `ABORTED` | +| Run still in flight, not reached | `scheduled`/`none` | `Scheduled` | `Ready` | `RUNNING` | + +Citing this lets us say: *"node states follow Airflow's task-instance model, with +BPMN 'withdrawn' semantics for race-losers and Temporal alignment at the instance +level."* + +The crucial lesson from Airflow: **"not executed" is more than one state.** Airflow +deliberately separates `skipped` (a branch deliberately not taken) from +`upstream_failed` (blocked by a failure). We adopt that distinction — but capture +the nuance in a **reason** rather than an ever-growing status enum (see below). + +## Design decision: coarse status + reason + +We keep a **small, stable terminal status set** and add a nullable **`status_reason`** +that explains *why* a node reached a non-obvious terminal state. + +Rationale: + +- `pg_durable` is a PostgreSQL extension with a strict schema-upgrade contract (see + [docs/upgrade-testing.md](upgrade-testing.md)). Every new status value is a + `CHECK` constraint migration plus a binary backward-compatibility concern. Adding + statuses should be rare. +- A reason column captures arbitrarily fine nuance (`upstream_failed`, + `branch_not_taken`, `race_lost`, `loop_not_entered`, …) without schema churn, and + is exactly the "note or reason" operators want when debugging. +- It keeps dashboards simple: filter on a handful of statuses, drill into the reason. + +### Node status set + +| Status | Terminal | Meaning | +|---|---|---| +| `pending` | no | Materialized by `df.start()`, not yet started. | +| `running` | no | Currently executing. | +| `completed` | yes | Finished successfully (may carry `result`). | +| `failed` | yes | Raised an error (carries error in `result`). | +| `skipped` | yes | Eligible work that was never started because the run already terminated, or a branch the run chose not to take. **(#240, new in 0.2.4)** | +| `cancelled` | yes | Work that had started (`running`) or was queued and was abandoned because its enclosing scope (a decided `RACE`) no longer needs it. **(#171, proposed)** | + +> Current state of the code: `pending`, `running`, `completed`, `failed`, and +> `skipped` exist; `skipped` was added in 0.2.4 for issue #240 (PR #249). +> `cancelled` for **nodes** is proposed here for #171 (the `df.instances` table +> already has a `cancelled` status, but `df.nodes` does not yet). + +### `status_reason` vocabulary (proposed) + +`status_reason` is `NULL` for the self-explanatory terminal states (`completed`, +`failed` carrying their own error). It is populated for `skipped` / `cancelled`: + +| Reason | Applies to | Set when | +|---|---|---| +| `upstream_failed` | `skipped` | A prior/sibling step failed and the run is terminating ([#240]). | +| `branch_not_taken` | `skipped` | An `IF`/`LOOP` condition chose another path, so this branch never runs. | +| `race_lost` | `cancelled` | This node is in the losing branch of a resolved `RACE` ([#171]). | +| `scope_cancelled` | `cancelled` | A reserved, more general form for future cancellation scopes. | + +## Instance state model + +Instance statuses are **unchanged** by this work (it is a non-goal to alter instance +vocabulary). They are: `pending`, `running`, `completed`, `failed`, `cancelled` +(see `instances_status_chk` in [src/lib.rs](../src/lib.rs)). + +```mermaid +stateDiagram-v2 + [*] --> pending + pending --> running: worker picks up + running --> completed: root node completed + running --> failed: root node / orchestration failed + running --> cancelled: instance cancelled + pending --> cancelled: cancelled before start + completed --> [*] + failed --> [*] + cancelled --> [*] +``` + +A node-level failure drives the instance to `failed`; node-level `skipped` / +`cancelled` reconciliation happens *underneath* a `failed` (or `completed`, for race +winners) instance and does not change the instance status. + +## Node state model + +```mermaid +stateDiagram-v2 + [*] --> pending + pending --> running: scheduled by orchestration + pending --> skipped: upstream_failed / branch_not_taken + pending --> cancelled: race_lost (never started) + running --> completed + running --> failed + running --> cancelled: race_lost (abandoned mid-flight) + completed --> pending: loop iteration boundary (reset) + failed --> pending: loop iteration boundary (reset) + skipped --> pending: loop iteration boundary (reset) + cancelled --> pending: loop iteration boundary (reset) + completed --> [*] + failed --> [*] + skipped --> [*] + cancelled --> [*] +``` + +Invariants: + +- `result` may only be non-NULL when status is `completed` or `failed` + (enforced today by `nodes_result_status_chk`). `skipped` and `cancelled` carry no + `result`; the explanation lives in `status_reason`. +- **Within a single loop iteration (or a non-looping run), terminal is final:** a + reconciliation pass never moves an already-terminal node to a different terminal + state. The *only* terminal → non-terminal transition is the explicit + **loop iteration-boundary reset** (terminal → `pending`), which clears the body + subgraph between iterations (see [Loops and node-state reuse](#loops-and-node-state-reuse-iteration-scoping)). + Outside a loop, that reset never fires, so terminal states are truly final. + +## Per-construct outcomes + +How each construct's children may end. "Children" = the nodes referenced by +`left_node` / `right_node` (and `extra_nodes` for `JOIN`/`RACE` of arity > 2). + +| Construct | On success | On failure of a child | Notes | +|---|---|---|---| +| `SQL` / `HTTP` / `SLEEP` / `WAIT_SCHEDULE` / `SIGNAL` | self `completed` | self `failed` | Leaf nodes; no children. | +| `THEN` (sequence) | both children `completed` | failing child `failed`; **later children `skipped` (`upstream_failed`)** | The #240 case. Right branch is never reached after a left failure. | +| `IF` | taken branch runs; **untaken branch `skipped` (`branch_not_taken`)** | condition failure ⇒ IF `failed`, **both branches `skipped` (`upstream_failed`)** | Today the untaken branch is left `pending`; it should be `skipped/branch_not_taken`. | +| `LOOP` | body runs each iteration; on exit, condition path resolved | body/condition failure ⇒ LOOP `failed`, unreached body nodes `skipped` | A loop that never enters its body leaves the body `skipped/branch_not_taken`. | +| `BREAK` | self `completed` (control-flow value) | n/a | Unwinds to enclosing loop; not a failure. | +| `JOIN` (all branches) | all branches `completed` | any branch fails ⇒ JOIN `failed`; other branches run to their natural end | Fan-out/fan-in; siblings are *not* cancelled today. | +| `RACE` (first wins) | winner `completed`; **loser branch `cancelled` (`race_lost`)** | if the *winner* errors, RACE `failed` | The #171 case. Today the loser's started nodes stay `running` and unreached nodes stay `pending`. | + +## Loops and node-state reuse (iteration scoping) + +This is the subtlest part of the model, and the one the single-status-per-row +shape gets wrong if treated naively. + +`df.start()` materializes **one** `df.nodes` row per node, with a stable `id`. A +`LOOP` re-executes the *same* subgraph (same node ids) every iteration via +`continue_as_new`. Therefore a node inside a loop body is executed **once per +iteration**, but it has only **one** status row. That row can only ever show the +*latest* iteration's outcome — it is iteration-scoped state, not run-scoped state. + +### Current behavior (the gap) + +There is no reset logic today. Within an iteration, a visited node is re-marked +`running` on entry (overwriting its prior-iteration terminal status), then +`completed`/`failed` on exit. That means: + +- Nodes that **are** re-visited every iteration self-correct (they get re-marked). +- Nodes that are **not** visited this iteration keep their **stale** status from a + previous iteration. The textbook case is a `RACE` (or `IF`) inside a `LOOP`: in + iteration 1 the left branch wins and the right (loser) nodes are marked terminal; + in iteration 2 the right branch wins — but the left branch's nodes still carry + iteration-1 statuses, and the iteration-1 loser marks were never cleared. + +So any per-iteration reconciliation (race-loser cancellation, branch-not-taken +skip) is only correct *for the current iteration* and must not leak across the +`continue_as_new` boundary. + +### Rule: reset the loop-body subgraph at each iteration boundary + +The two halves of the user-identified distinction become explicit rules: + +1. **Within one execution (same iteration):** losing-`RACE` nodes and untaken + branches transition to their terminal reconciliation state + (`cancelled/race_lost`, `skipped/branch_not_taken`) as described above. These + marks describe *this* iteration only. +2. **At the iteration boundary (`continue_as_new` fires):** every node in the loop + body subgraph is reset to `pending` (clearing `status`, `result`, and + `status_reason`) before the next iteration runs, so the new iteration starts from + a clean slate and no stale terminal status survives. + +Concretely, reset happens at the **start of each loop body execution** (covering +iteration 1 too, harmlessly, since those nodes are already `pending`), scoped to the +loop-body subtree — a recursive walk over `left_node`/`right_node` from the loop's +`left_node` (body) root. This is the same subtree-scoping primitive used for +`RACE`-loser cancellation, just applied to "the whole body" instead of "the losing +branch." + +```mermaid +sequenceDiagram + participant L as LOOP node + participant B as Body subgraph + Note over L,B: iteration N + L->>B: reset body subgraph → pending (clear result/reason) + B->>B: run body, RACE/IF reconcile losers/untaken → terminal (this iteration) + L->>L: evaluate condition + alt continue + L->>L: continue_as_new (iteration N+1) + Note over L,B: iteration N+1 — body reset again, prior marks cleared + else exit + Note over L,B: last iteration's marks are the final observable state + end +``` + +When the loop finally exits (via `df.break()` or a false condition), the **last** +iteration's node states are preserved as the final, correct observable result — +nothing resets them after the loop ends. + +### Determinism + +The reset is performed by a **scheduled activity** (a set-based `UPDATE` over the +body subtree), so the orchestration stays deterministic. Within a single execution, +replay reads the reset's result from history. Across `continue_as_new`, the next +iteration is a fresh execution whose history records the reset once. Both are +replay-safe. As with all reconciliation activities, the reset must no-op gracefully +against schemas that predate the relevant columns (`status_reason`). + +### Why not per-iteration history in `df.nodes` + +Resetting means `df.nodes` shows only the *current/last* iteration — it deliberately +does **not** retain a per-iteration audit trail. That is the right trade-off for the +operational question these tables answer ("what is the run doing / where did it +stop"). Full per-iteration history already lives in the duroxide event history; if a +first-class per-execution projection is ever wanted, model node *executions* +separately (e.g. a `df.node_executions` table keyed by `(node_id, iteration)`) and +keep `df.nodes` as the static definition + latest status. That is explicitly out of +scope here and noted as a future option. + +### Prior art: how Airflow and Temporal handle loop/iteration state + +Both reference systems avoid the stale-iteration problem the same way — by giving +each iteration its **own identity** rather than overwriting one mutable status. The +contrast is instructive because it frames the projection decision below. + +**Apache Airflow.** A DAG is *acyclic* — there are no in-graph loops. "Iteration" is +modeled by minting new identities, never by overwriting: + +- TaskInstance identity is `(dag_id, task_id, run_id, map_index)`. The repeated-run + unit is the **DagRun** (one per schedule/trigger); each gets a fresh set of + TaskInstance rows keyed by `run_id`, so re-running a task is a *different row*. +- **Dynamic Task Mapping** (`map_index`) models for-each/fan-out inside one run: + a task expands at runtime into N mapped TaskInstances, each its own row. Fan-out is + more rows, not mutated state. +- Even retries are non-destructive: a TaskInstance tracks `try_number` and recent + versions persist prior attempts in a `TaskInstanceHistory` table. +- True while-loops aren't first-class; you self-trigger a new DagRun (a new + `run_id`). + +→ Airflow's projection is *per-iteration by construction*; "loop body state for +iteration 3" is just `WHERE run_id = …` / `map_index = …`. + +**Temporal** (same durable-execution family as duroxide). Loops are ordinary code +loops; state is an **append-only Event History**, not a mutable table: + +- Each activity invocation emits distinct events (`ActivityTaskScheduled` → + `Started` → `Completed/Failed`) with unique, increasing `eventId`s. Looping five + times yields five event triples — nothing is overwritten; the activity's identity + is its sequence number. +- **`ContinueAsNew`** (the direct analog of duroxide's `continue_as_new` and the + pg_durable loop boundary) closes the current execution and starts a fresh one with + a new `RunId` and empty history, carrying state forward as input. Segments chain + via `continuedExecutionRunId`/`firstExecutionRunId`, preserving a per-segment audit + trail. +- Parallel/raced branches are **child workflows** with their own execution, history, + and terminal status. A race-loser is explicitly **`Canceled`** (a terminal event), + never a lingering `running`. + +→ Temporal's "node state" is an immutable log keyed by `eventId`; loops add events, +`ContinueAsNew` segments runs by `RunId`, branch state is a child's terminal status. + +**Implication for pg_durable.** The difference is that `df.nodes.status` is a single +**mutable** row updated in place — a shape neither system uses. Crucially, pg_durable +*already has* the Temporal-style answer underneath: duroxide keeps an append-only +event history and segments loops via `continue_as_new`. `df.nodes` is just a +denormalized SQL **projection** on top of that log for operator queries. So the real +fork is the projection shape, not the state tracking: + +| Projection option | Analog | Buys | Cost | +|---|---|---|---| +| **Latest-only row + reset on `continue_as_new`** (this proposal) | neither — pragmatic simplification | Simple "what is the run doing now" view | No per-iteration history in SQL (still in duroxide log) | +| **Per-execution rows** — `df.node_executions` keyed by `(node_id, iteration)` | Airflow's `(task_id, run_id, map_index)` | Full per-iteration history in SQL; *eliminates* the stale-state bug class | New table, more writes, larger schema change | +| **Expose duroxide history directly** | Temporal event history | No duplication; single source of truth | Not relational/queryable like `df.nodes` | + +The proposal takes the **pragmatic middle**: keep the single-row projection, accept +"latest iteration only," and delegate per-iteration history to duroxide's log. The +**per-execution-rows** direction is the more future-proof, Airflow-style design — it +makes the iteration-boundary reset *unnecessary* (nothing is overwritten, so nothing +goes stale) and expresses "race-loser in iteration 2" as just another execution row — +at the cost of a heavier schema. This is the key long-term design fork to decide +before committing to the reset mechanism. + +## How this resolves the two issues + +### #240 — downstream steps after a failure + +When a run terminates with a node-level failure, remaining un-started nodes should +become `skipped` with reason `upstream_failed`. The `skipped` status (#240, shipped +in PR #249) already implements the set-based sweep (`pending → skipped` guarded by +"an instance node is `failed`"). Under this model that sweep should additionally +stamp `status_reason = 'upstream_failed'`. + +Open refinement: the coarse sweep also catches `IF`/`LOOP` branches that were +*deliberately* not taken. With the reason column those should ideally be classified +`branch_not_taken` at the point the branch decision is made (in `execute_if_node` / +`execute_loop_node`), rather than lumped as `upstream_failed` by the terminal sweep. + +### #171 — race-loser nodes left running/pending + +`RACE` runs each branch as a **sub-orchestration** (`schedule_sub_orchestration`) and +`select2` keeps only the winner. The loser's sub-orchestration nodes are never moved +to a terminal state. The fix is a **reconciliation at race resolution**: from the +losing branch's root node, mark every descendant still in `pending`/`running` as +`cancelled` with reason `race_lost`. This is a distinct code path from #240 because: + +- it triggers on race resolution, not on terminal instance failure; +- it must handle `running → cancelled`, not just `pending → skipped`; +- it needs subtree scoping (a recursive walk over `left_node`/`right_node` from the + losing root), not an instance-wide sweep. + +The same subtree-cancellation primitive generalizes to any future explicit +cancellation scope (`scope_cancelled`). + +When the `RACE` is **inside a `LOOP`**, the loser marks apply only to the current +iteration and are wiped by the iteration-boundary reset before the next iteration +re-runs the race (see [Loops and node-state reuse](#loops-and-node-state-reuse-iteration-scoping)). +Only the final iteration's loser marks survive as the observable result. + +## Schema & implementation implications + +- **Status set:** `skipped` is already added (done in PR #249 for #240); add + `cancelled` to `nodes_status_chk` in both the install DDL + ([src/lib.rs](../src/lib.rs)) and a new upgrade script, per the upgrade contract. +- **Reason column:** add `status_reason TEXT` to `df.nodes` (nullable, optionally a + `CHECK` against the reason vocabulary). New column ⇒ install DDL + upgrade DDL; + binary backward-compat is straightforward (new `.so` only writes it when the + column exists, mirroring the existing schema-probe pattern in + `mark_pending_nodes_skipped`). +- **Reconciliation points:** + - terminal failure sweep → `skipped/upstream_failed` (#240), + - `IF`/`LOOP` branch decision → `skipped/branch_not_taken`, + - `RACE` resolution → subtree `cancelled/race_lost` (#171). +- **Backward compatibility:** every reconciliation must no-op gracefully on schemas + that predate the relevant status/column (probe first, as the current activity + already does for `skipped`). + +## Backward-compatibility note (history determinism) + +Reconciliation work is performed by scheduling **activities** from the orchestration, +which keeps orchestration code deterministic. Adding such a scheduled activity to an +existing failure/resolution path changes the recorded history for any orchestration +that crosses that path *during* a binary upgrade — a narrow window, but worth calling +out when each reconciliation is introduced. + +## Open questions + +1. Should a `RACE` loser that had already `completed` a node keep that node + `completed`, or also be rolled to `cancelled`? (Proposed: keep terminal nodes as-is; + only `pending`/`running` loser nodes become `cancelled`.) +2. Do we expose `status_reason` in `df.status()` / `df.instance_nodes` output and the + `USER_GUIDE.md` legend? (Proposed: yes, as an optional column + a legend row.) +3. Should `JOIN` cancel still-running siblings when one branch fails, or let them + finish? (Today they finish; Step Functions would abort them. Out of scope here but + the `cancelled/scope_cancelled` primitive would cover it.) +4. Loop-body reset granularity: reset the **entire** body subgraph at each iteration + start (proposed — simplest and always-correct), or reset lazily/only the nodes + that won't be re-visited? The eager whole-subtree reset is preferred because the + set of re-visited nodes is not known ahead of a `RACE`/`IF` decision. +5. Do we want a first-class per-iteration projection (`df.node_executions`) for audit, + or is "latest iteration in `df.nodes` + duroxide history" sufficient? (Proposed: + the latter, for now.) + +[#240]: https://github.com/microsoft/pg_durable/issues/240 +[#171]: https://github.com/microsoft/pg_durable/issues/171 diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index fd17722..ca7c687 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -526,32 +526,109 @@ async fn execute_wait_schedule_node( /// deficit so an empty-bodied loop can't busy-spin via continue_as_new. const LOOP_MIN_ITER_DURATION: Duration = Duration::from_secs(1); -async fn execute_loop_node( - ctx: &OrchestrationContext, - graph: &FunctionGraph, - node: &FunctionNode, - node_id: &str, - results: &mut HashMap, - exec_ctx: &ExecutionContext, -) -> NodeResult { +/// Orchestration name for the loop sub-orchestration. +/// +/// Each `df.loop()` node spawns a child orchestration under this name. The +/// child handles all iterations via `continue_as_new`; when the loop exits it +/// returns a `SubtreeEnvelope` to the parent. The parent link is preserved +/// across `continue_as_new` generations by duroxide (see duroxide PR #31), so +/// the parent orchestration is notified when the loop finally completes. +pub const LOOP_NAME: &str = "pg_durable::orchestration::execute-loop"; + +/// Build the `SubtreeEnvelope` a loop returns to its parent on exit. +/// +/// A loop always exits with a *normal* result: a `df.break()` inside the body is the loop's +/// own terminator (caught here as `NodeError::Break`), not a break that should unwind past +/// the loop, so the envelope is always tagged `Normal`. `execute_loop_node` merges `results` +/// back into the parent map via `parse_subtree_envelope`. +fn loop_exit_envelope(result: String, results: HashMap) -> Result { + let envelope = SubtreeEnvelope { + control: Some(SubtreeControl::Normal), + result, + results, + }; + serde_json::to_string(&envelope).map_err(|e| format!("Failed to serialize loop envelope: {e}")) +} + +/// Sub-orchestration that runs a single loop iteration and either returns or +/// calls `continue_as_new` for the next iteration. +/// +/// Input JSON: +/// ```json +/// { "instance_id": "...", "loop_node_id": "...", +/// "results": "", "vars": "", "label": "..." } +/// ``` +/// +/// `load_function_graph` is called at the start of **every** generation +/// (including after `continue_as_new`) so that cross-iteration security +/// tampering is caught and the instance is failed — the same guarantee the +/// main `execute()` orchestration provides at its generation boundary. +/// +/// On loop exit the function returns a `SubtreeEnvelope` containing the final +/// result and any named results accumulated during the loop. +pub async fn execute_loop(ctx: OrchestrationContext, input_json: String) -> Result { + let input: serde_json::Value = serde_json::from_str(&input_json) + .map_err(|e| format!("Failed to parse ExecuteLoop input: {e}"))?; + + let instance_id = input["instance_id"] + .as_str() + .ok_or("Missing instance_id in ExecuteLoop input")? + .to_string(); + let loop_node_id = input["loop_node_id"] + .as_str() + .ok_or("Missing loop_node_id in ExecuteLoop input")? + .to_string(); + let results_json = input["results"] + .as_str() + .ok_or("Missing results in ExecuteLoop input")?; + + // re-load the graph from the database on every generation — this re-validates + // submitted_by and catches cross-iteration security tampering. + let graph_json = ctx + .schedule_activity(activities::load_function_graph::NAME, instance_id.clone()) + .await?; + let graph: FunctionGraph = serde_json::from_str(&graph_json) + .map_err(|e| format!("Failed to parse graph in ExecuteLoop: {e}"))?; + + let mut results: HashMap = serde_json::from_str(results_json) + .map_err(|e| format!("Failed to parse results in ExecuteLoop: {e}"))?; + + let vars: HashMap = if let Some(vars_str) = input["vars"].as_str() { + serde_json::from_str(vars_str) + .map_err(|e| format!("Failed to parse vars in ExecuteLoop: {e}"))? + } else { + HashMap::new() + }; + let label: Option = input["label"].as_str().map(|s| s.to_string()); + + let exec_ctx = ExecutionContext { vars, label }; + + let node = graph + .nodes + .get(&loop_node_id) + .ok_or_else(|| format!("Loop node not found: {loop_node_id}"))?; + let body_id = node .left_node .as_ref() - .ok_or_else(|| format!("LOOP node {node_id} has no body"))?; + .ok_or_else(|| format!("LOOP node {loop_node_id} has no body"))? + .clone(); - // Capture the iteration start time so we can rate-limit `continue_as_new` - // below. `utc_now()` is duroxide's deterministic clock (recorded in - // history and replayed verbatim), so this remains replay-safe. + // Capture iteration start time for rate-limiting continue_as_new. let iter_started = ctx.utc_now().await.ok(); ctx.trace_info("Executing loop iteration"); - // The loop is the only place that catches `NodeError::Break`: a break unwinds through - // every compound node in the body via `?` and is converted here into a normal loop exit. - // A `Failure` still propagates out of the loop unchanged. - let body_result = match Box::pin(execute_function_node_with_vars( - ctx, graph, body_id, results, exec_ctx, - )) + // The loop is where `NodeError::Break` is caught: a break unwinds through the body via + // `?` and is converted here into the loop's normal exit value. A `Failure` propagates + // out of the sub-orchestration unchanged. + let body_result = match execute_function_node_with_vars( + &ctx, + &graph, + &body_id, + &mut results, + &exec_ctx, + ) .await { Ok(v) => v, @@ -559,27 +636,34 @@ async fn execute_loop_node( ctx.trace_info(format!( "Loop terminated by break with value: {break_value}" )); - store_named_result(ctx, node, &break_value, results, "LOOP"); - return Ok(break_value); + store_named_result(&ctx, node, &break_value, &mut results, "LOOP"); + return loop_exit_envelope(break_value, results); } - Err(e @ NodeError::Failure(_)) => return Err(e), + Err(NodeError::Failure(e)) => return Err(e), }; - // Check while-condition if present + // While-condition: if present and false, exit the loop. if let Some(ref config_str) = node.query { if let Ok(config) = serde_json::from_str::(config_str) { if let Some(condition_node_id) = config["condition_node"].as_str() { ctx.trace_info("Evaluating loop condition"); - let condition_result = Box::pin(execute_function_node_with_vars( - ctx, - graph, + let condition_result = match execute_function_node_with_vars( + &ctx, + &graph, condition_node_id, - results, - exec_ctx, - )) - .await?; + &mut results, + &exec_ctx, + ) + .await + { + Ok(v) => v, + Err(NodeError::Break(break_value)) => { + store_named_result(&ctx, node, &break_value, &mut results, "LOOP"); + return loop_exit_envelope(break_value, results); + } + Err(NodeError::Failure(e)) => return Err(e), + }; - // Parse condition result to check truthiness (uses evaluate_condition to extract boolean from SQL result) let should_continue = evaluate_condition(&condition_result).unwrap_or(false); ctx.trace_info(format!( "Loop condition evaluated to: {condition_result} (continue={should_continue})" @@ -587,20 +671,14 @@ async fn execute_loop_node( if !should_continue { ctx.trace_info("Loop condition false, exiting loop"); - store_named_result(ctx, node, &body_result, results, "LOOP"); - return Ok(body_result); + store_named_result(&ctx, node, &body_result, &mut results, "LOOP"); + return loop_exit_envelope(body_result, results); } } } } - ctx.trace_info("Continuing as new for next loop iteration"); - - // Enforce a minimum per-iteration wall-clock duration to prevent - // busy-looping (e.g. `df.loop(df.sleep(0))`). Compute the elapsed time - // from the deterministic clock; if the iteration finished faster than - // LOOP_MIN_ITER_DURATION, schedule a timer for the deficit so the next - // continue_as_new is gated by at least that much real-clock time. + // Enforce a minimum per-iteration wall-clock duration to prevent busy-looping. if let Some(started) = iter_started { if let Ok(now) = ctx.utc_now().await { let elapsed = now.duration_since(started).unwrap_or(Duration::ZERO); @@ -615,19 +693,83 @@ async fn execute_loop_node( } } - // Preserve vars in continue_as_new input - let new_input = FunctionInput { - instance_id: graph.instance_id.clone(), - label: exec_ctx.label.clone(), - vars: exec_ctx.vars.clone(), - }; + // Another iteration needed: continue_as_new within this sub-orchestration. + // The parent orchestration keeps its awaiting handle because duroxide preserves + // the parent link across continue_as_new (duroxide PR #31). + ctx.trace_info(format!( + "Loop continuing with continue_as_new at node {loop_node_id}" + )); + let new_results_json = serde_json::to_string(&results) + .map_err(|e| format!("Failed to serialize updated results: {e}"))?; + let mut new_input = input.clone(); + new_input["results"] = serde_json::Value::String(new_results_json); + let new_input_json = serde_json::to_string(&new_input) + .map_err(|e| format!("Failed to serialize loop input: {e}"))?; + ctx.continue_as_new(new_input_json) + .await + .map(|_| String::new()) + .map_err(|e| format!("continue_as_new failed: {e:?}")) +} + +/// Build a deterministic, generation-qualified child instance ID for a sub-orchestration. +/// +/// Duroxide's auto-generated child IDs (`{parent}::sub::{event_id}`) reset their event +/// counter across `continue_as_new` generations. When a loop body itself spawns +/// sub-orchestrations (a nested `df.loop`, or a parallel/race branch), each loop +/// generation would otherwise re-derive the *same* child ID, colliding with the previous +/// (now terminal) generation's child and stalling forever. Embedding the current +/// execution (generation) ID plus the spawning node ID makes the child ID unique per +/// generation while staying deterministic across replays of the same generation. +fn child_instance_id(ctx: &OrchestrationContext, tag: &str, node_id: &str) -> String { + format!( + "{}::e{}::{tag}::{node_id}", + ctx.instance_id(), + ctx.execution_id() + ) +} + +async fn execute_loop_node( + ctx: &OrchestrationContext, + graph: &FunctionGraph, + node: &FunctionNode, + node_id: &str, + results: &mut HashMap, + exec_ctx: &ExecutionContext, +) -> NodeResult { + // Validate that the loop has a body before spawning the sub-orchestration. + node.left_node + .as_ref() + .ok_or_else(|| format!("LOOP node {node_id} has no body"))?; + + let results_json = + serde_json::to_string(results).map_err(|e| format!("Failed to serialize results: {e}"))?; + let vars_json = serde_json::to_string(&exec_ctx.vars) + .map_err(|e| format!("Failed to serialize vars: {e}"))?; + + let loop_input = serde_json::json!({ + "instance_id": graph.instance_id, + "loop_node_id": node_id, + "results": results_json, + "vars": vars_json, + "label": exec_ctx.label, + }) + .to_string(); - // duroxide 0.1.1: continue_as_new returns an awaitable future - return it directly - return ctx - .continue_as_new(serde_json::to_string(&new_input).unwrap_or(graph.instance_id.clone())) + ctx.trace_info(format!( + "Spawning loop sub-orchestration for node {node_id}" + )); + + let child_id = child_instance_id(ctx, "loop", node_id); + let raw = ctx + .schedule_sub_orchestration_with_id(LOOP_NAME, child_id, loop_input) .await - .map(|_| body_result) - .map_err(|e| NodeError::Failure(format!("continue_as_new failed: {e:?}"))); + .map_err(|e| format!("Loop sub-orchestration failed: {e}"))?; + + // Merge named results from the loop sub-orchestration back into the parent map and + // return the loop's final result. The loop always returns a `Normal` envelope (a break + // inside the body is the loop's own terminator), so `parse_subtree_envelope` will not + // re-raise a `NodeError::Break` here. + parse_subtree_envelope(&raw, "LOOP", results) } async fn execute_break_node( @@ -853,8 +995,11 @@ async fn execute_join_node( }) .to_string(); - // Build list of branch inputs - let mut branch_inputs = vec![left_input, right_input]; + // Build list of (branch node id, branch input) pairs. + let mut branch_inputs: Vec<(String, String)> = vec![ + (left_id.clone(), left_input), + (right_id.clone(), right_input), + ]; // Check for extra nodes (join3) if let Some(config_str) = &node.query { @@ -870,17 +1015,20 @@ async fn execute_join_node( "label": exec_ctx.label }) .to_string(); - branch_inputs.push(extra_input); + branch_inputs.push((extra_id.to_string(), extra_input)); } } } } } - // Schedule sub-orchestrations and collect DurableFutures + // Schedule sub-orchestrations and collect DurableFutures. Use explicit, + // generation-qualified child IDs so a JOIN inside a loop body does not collide + // across continue_as_new generations. let mut durable_futures = Vec::new(); - for input in branch_inputs { - let fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, input); + for (branch_node_id, input) in branch_inputs { + let child_id = child_instance_id(ctx, "subtree", &branch_node_id); + let fut = ctx.schedule_sub_orchestration_with_id(SUBTREE_NAME, child_id, input); durable_futures.push(fut); } @@ -972,9 +1120,18 @@ async fn execute_race_node( }) .to_string(); - // Schedule sub-orchestrations - let left_fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, left_input); - let right_fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, right_input); + // Schedule sub-orchestrations with explicit, generation-qualified child IDs so a + // RACE inside a loop body does not collide across continue_as_new generations. + let left_fut = ctx.schedule_sub_orchestration_with_id( + SUBTREE_NAME, + child_instance_id(ctx, "subtree", left_id), + left_input, + ); + let right_fut = ctx.schedule_sub_orchestration_with_id( + SUBTREE_NAME, + child_instance_id(ctx, "subtree", right_id), + right_input, + ); // Use ctx.select2() - first to complete wins // select2 now returns Either2 instead of (winner_idx, DurableOutput) diff --git a/src/registry.rs b/src/registry.rs index 03e5c73..f20d128 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -55,5 +55,9 @@ pub fn create_orchestration_registry() -> OrchestrationRegistry { orchestrations::execute_function_graph::SUBTREE_NAME, orchestrations::execute_function_graph::execute_subtree, ) + .register( + orchestrations::execute_function_graph::LOOP_NAME, + orchestrations::execute_function_graph::execute_loop, + ) .build() } diff --git a/tests/e2e/sql/24_nonroot_loop.sql b/tests/e2e/sql/24_nonroot_loop.sql new file mode 100644 index 0000000..4aca198 --- /dev/null +++ b/tests/e2e/sql/24_nonroot_loop.sql @@ -0,0 +1,409 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Regression tests for: df.loop continue_as_new restarts from root when loop is not root +-- (https://github.com/microsoft/pg_durable/issues/227) +-- +-- When a loop is not the root node of a function graph (i.e., there are prefix or suffix +-- nodes), continue_as_new must NOT re-execute the prefix or skip the suffix. Loops are +-- now executed as a scoped sub-orchestration so continue_as_new is scoped to the loop +-- child, leaving the parent graph orchestration parked. + +SET SESSION AUTHORIZATION df_e2e_user; + +-- === Test 1: Non-root loop — prefix runs once, body runs N times === +-- +-- Graph: INSERT into prefix_table ~> df.loop(INSERT into body_table ~> break after 3) +-- Expected: prefix_table has exactly 1 row after completion, body_table has exactly 3 rows. + +DROP TABLE IF EXISTS test_nonroot_prefix; +DROP TABLE IF EXISTS test_nonroot_body; +CREATE TABLE test_nonroot_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nonroot_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t1 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_nonroot_prefix DEFAULT VALUES', + df.loop( + 'INSERT INTO test_nonroot_body DEFAULT VALUES' + ~> ( + 'SELECT COUNT(*) >= 3 FROM test_nonroot_body' + ?> df.break() + !> df.sleep(1) + ) + ) + ), + 'test-nonroot-loop-prefix' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_body INT; +BEGIN + SELECT instance_id INTO v_id FROM _t1; + RAISE NOTICE 'Test 1 - non-root loop prefix: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-prefix]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_nonroot_prefix; + SELECT COUNT(*) INTO v_body FROM test_nonroot_body; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-prefix]: prefix ran % time(s) (expected 1); ' + 'prefix nodes must not be re-executed on each loop iteration', v_prefix; + END IF; + + IF v_body != 3 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-prefix]: body ran % time(s) (expected 3)', v_body; + END IF; + + RAISE NOTICE 'PASSED: non-root loop prefix — prefix ran once, body ran 3 times'; +END $$; + +DROP TABLE _t1; +DROP TABLE test_nonroot_prefix; +DROP TABLE test_nonroot_body; + +-- === Test 2: Non-root loop — prefix once, body N times, suffix once === +-- +-- Graph: INSERT prefix ~> df.loop(body, break after 2) ~> INSERT suffix +-- Expected: prefix_table = 1 row, body_table = 2 rows, suffix_table = 1 row. + +DROP TABLE IF EXISTS test_nonroot2_prefix; +DROP TABLE IF EXISTS test_nonroot2_body; +DROP TABLE IF EXISTS test_nonroot2_suffix; +CREATE TABLE test_nonroot2_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nonroot2_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nonroot2_suffix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t2 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_nonroot2_prefix DEFAULT VALUES', + df.seq( + df.loop( + 'INSERT INTO test_nonroot2_body DEFAULT VALUES' + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_nonroot2_body' + ?> df.break() + !> df.sleep(1) + ) + ), + 'INSERT INTO test_nonroot2_suffix DEFAULT VALUES' + ) + ), + 'test-nonroot-loop-prefix-suffix' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_body INT; + v_suffix INT; +BEGIN + SELECT instance_id INTO v_id FROM _t2; + RAISE NOTICE 'Test 2 - non-root loop prefix+suffix: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_nonroot2_prefix; + SELECT COUNT(*) INTO v_body FROM test_nonroot2_body; + SELECT COUNT(*) INTO v_suffix FROM test_nonroot2_suffix; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: prefix ran % time(s) (expected 1)', v_prefix; + END IF; + + IF v_body != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: body ran % time(s) (expected 2)', v_body; + END IF; + + IF v_suffix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: suffix ran % time(s) (expected 1)', v_suffix; + END IF; + + RAISE NOTICE 'PASSED: non-root loop prefix+suffix — prefix once, body twice, suffix once'; +END $$; + +DROP TABLE _t2; +DROP TABLE test_nonroot2_prefix; +DROP TABLE test_nonroot2_body; +DROP TABLE test_nonroot2_suffix; + +-- === Test 3: Non-root loop — named result from prefix available inside loop body === +-- +-- Verify that named results accumulated before the loop are still accessible +-- inside the loop body after continue_as_new (they are preserved in the loop +-- sub-orchestration's input). + +DROP TABLE IF EXISTS test_nonroot3_log; +CREATE TABLE test_nonroot3_log (id SERIAL, val TEXT, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t3 AS +SELECT df.start( + df.seq( + ('SELECT ''hello'' AS greeting' |=> 'prefix_result'), + df.loop( + ($$INSERT INTO test_nonroot3_log (val) + VALUES ($prefix_result) + RETURNING val$$ + |=> 'last_val') + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_nonroot3_log' + ?> df.break() + !> df.sleep(1) + ) + ) + ), + 'test-nonroot-loop-named-result' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_cnt INT; + v_val TEXT; +BEGIN + SELECT instance_id INTO v_id FROM _t3; + RAISE NOTICE 'Test 3 - non-root loop uses prefix named result: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-named]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_cnt FROM test_nonroot3_log; + IF v_cnt != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-named]: expected 2 rows, got %', v_cnt; + END IF; + + SELECT val INTO v_val FROM test_nonroot3_log ORDER BY id LIMIT 1; + IF v_val != 'hello' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-named]: expected ''hello'', got ''%''', v_val; + END IF; + + RAISE NOTICE 'PASSED: non-root loop uses prefix named result across iterations'; +END $$; + +DROP TABLE _t3; +DROP TABLE test_nonroot3_log; + +-- === Test 4: Nested loop — a loop body that itself contains a loop === +-- +-- Each df.loop() spawns an execute_loop sub-orchestration, so a nested loop spawns a +-- child execute_loop from *within* another execute_loop generation. This verifies that +-- continue_as_new in the outer loop does not disturb the inner loop and vice versa. +-- +-- Outer loop runs 2 iterations (break when outer_marker has 2 rows); each outer iteration +-- runs an inner loop that inserts exactly one row and breaks immediately. +-- Expected: outer_marker = 2 rows, inner_table = 2 rows. + +DROP TABLE IF EXISTS test_nested_outer; +DROP TABLE IF EXISTS test_nested_inner; +CREATE TABLE test_nested_outer (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nested_inner (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t4 AS +SELECT df.start( + df.loop( + 'INSERT INTO test_nested_outer DEFAULT VALUES' + ~> df.loop( + 'INSERT INTO test_nested_inner DEFAULT VALUES' + ~> df.break() + ) + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_nested_outer' + ?> df.break() + !> df.sleep(1) + ) + ), + 'test-nested-loop' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_outer INT; + v_inner INT; +BEGIN + SELECT instance_id INTO v_id FROM _t4; + RAISE NOTICE 'Test 4 - nested loop: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nested]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_outer FROM test_nested_outer; + SELECT COUNT(*) INTO v_inner FROM test_nested_inner; + + IF v_outer != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nested]: outer ran % time(s) (expected 2)', v_outer; + END IF; + + IF v_inner != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nested]: inner ran % time(s) (expected 2)', v_inner; + END IF; + + RAISE NOTICE 'PASSED: nested loop — outer ran twice, inner ran once per outer iteration'; +END $$; + +DROP TABLE _t4; +DROP TABLE test_nested_outer; +DROP TABLE test_nested_inner; + +-- === Test 5: Loop inside a JOIN branch === +-- +-- JOIN branches execute as execute_subtree sub-orchestrations, so a loop in a branch +-- spawns an execute_loop child from *within* execute_subtree. This verifies the loop +-- sub-orchestration nests correctly under a parallel branch and that the JOIN still +-- completes once both branches finish. +-- +-- Left branch inserts one row; right branch loops until join_loop has 2 rows. +-- Expected: join_left = 1 row, join_loop = 2 rows. + +DROP TABLE IF EXISTS test_join_left; +DROP TABLE IF EXISTS test_join_loop; +CREATE TABLE test_join_left (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_join_loop (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t5 AS +SELECT df.start( + 'INSERT INTO test_join_left DEFAULT VALUES' + & df.loop( + 'INSERT INTO test_join_loop DEFAULT VALUES' + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_join_loop' + ?> df.break() + !> df.sleep(1) + ) + ), + 'test-loop-in-join-branch' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_left INT; + v_loop INT; +BEGIN + SELECT instance_id INTO v_id FROM _t5; + RAISE NOTICE 'Test 5 - loop in JOIN branch: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [loop-in-join]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_left FROM test_join_left; + SELECT COUNT(*) INTO v_loop FROM test_join_loop; + + IF v_left != 1 THEN + RAISE EXCEPTION 'TEST FAILED [loop-in-join]: left branch ran % time(s) (expected 1)', v_left; + END IF; + + IF v_loop != 2 THEN + RAISE EXCEPTION 'TEST FAILED [loop-in-join]: loop branch body ran % time(s) (expected 2)', v_loop; + END IF; + + RAISE NOTICE 'PASSED: loop in JOIN branch — left ran once, loop body ran twice'; +END $$; + +DROP TABLE _t5; +DROP TABLE test_join_left; +DROP TABLE test_join_loop; + +-- === Test 6: Non-root while-loop — prefix once, while-condition exit, suffix once === +-- +-- The earlier tests exit via df.break(); this one exits via a false while-condition +-- (df.loop(body, condition)). The condition node also runs inside the loop +-- sub-orchestration, so this exercises the while-false exit path across generations. +-- +-- Graph: INSERT prefix ~> df.loop(body, 'COUNT < 3') ~> INSERT suffix +-- Expected: prefix = 1 row, body = 3 rows (loop stops when count reaches 3), suffix = 1 row. + +DROP TABLE IF EXISTS test_while_prefix; +DROP TABLE IF EXISTS test_while_body; +DROP TABLE IF EXISTS test_while_suffix; +CREATE TABLE test_while_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_while_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_while_suffix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t6 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_while_prefix DEFAULT VALUES', + df.seq( + df.loop( + 'INSERT INTO test_while_body DEFAULT VALUES' ~> df.sleep(1), + 'SELECT COUNT(*) < 3 FROM test_while_body' + ), + 'INSERT INTO test_while_suffix DEFAULT VALUES' + ) + ), + 'test-nonroot-while-loop' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_body INT; + v_suffix INT; +BEGIN + SELECT instance_id INTO v_id FROM _t6; + RAISE NOTICE 'Test 6 - non-root while loop: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-while]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_while_prefix; + SELECT COUNT(*) INTO v_body FROM test_while_body; + SELECT COUNT(*) INTO v_suffix FROM test_while_suffix; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-while]: prefix ran % time(s) (expected 1)', v_prefix; + END IF; + + IF v_body != 3 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-while]: body ran % time(s) (expected 3)', v_body; + END IF; + + IF v_suffix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-while]: suffix ran % time(s) (expected 1)', v_suffix; + END IF; + + RAISE NOTICE 'PASSED: non-root while loop — prefix once, body 3x via while-condition, suffix once'; +END $$; + +DROP TABLE _t6; +DROP TABLE test_while_prefix; +DROP TABLE test_while_body; +DROP TABLE test_while_suffix; + +RESET SESSION AUTHORIZATION; +SELECT 'TEST PASSED' AS result;