2 changes: 1 addition & 1 deletion docs/spec-security-model.md
@@ -184,7 +184,7 @@ SELECT df.start(

**Threat**: User queries `df.instances` or `df.nodes` to see other users' durable functions.

**Mitigation (implemented)**: RLS policies on `df.instances` and `df.nodes` enforce per-user visibility using `submitted_by = current_user::regrole`. Auto-grants provide SELECT+INSERT on both tables and column-level `UPDATE (status, updated_at)` on instances (no DELETE). Ownership checks in `df.cancel()` and `df.signal()` prevent cross-user operations via the duroxide client. Monitoring functions (`df.list_instances()`, `df.instance_info()`, etc.) also enforce ownership.
**Mitigation (implemented)**: RLS policies on `df.instances` and `df.nodes` enforce per-user visibility using `submitted_by = current_user::regrole`. Auto-grants provide SELECT+INSERT on both tables and column-level `UPDATE` on runtime status columns for instances (no DELETE). Ownership checks in `df.cancel()` and `df.signal()` prevent cross-user operations via the duroxide client. Monitoring functions (`df.list_instances()`, `df.instance_info()`, etc.) also enforce ownership.

See [rls.md](rls.md) for the full design, policy definitions, grant strategy, and decisions.

7 changes: 7 additions & 0 deletions docs/upgrade-testing.md
@@ -205,6 +205,13 @@ what the upgrade script handles, and any backward compatibility considerations.

### v0.2.3 → v0.2.4

#### #239 Expose signal waits on `df.instances`
- **DDL change (df schema):** Adds nullable `df.instances.blocked_on_signal TEXT`. Fresh installs (`src/lib.rs`) create it directly; the upgrade script adds the column and its comment. Existing rows need no backfill: non-waiting instances remain `NULL`, and future SIGNAL node execution sets the marker when the node enters `running`.
- **Runtime behavior:** The `update-node-status` activity recomputes `blocked_on_signal` from the instance's currently running SIGNAL nodes whenever a SIGNAL node changes status. `update-instance-status` clears the column on terminal transitions (`completed`, `failed`, `cancelled`), and `df.cancel()` clears it with the cancellation status update.
- **Privilege behavior:** `df.grant_usage()` now grants `UPDATE (status, updated_at, blocked_on_signal)` on `df.instances`; `df.revoke_usage()` revokes the same column set. During upgrade, roles that already had column-level UPDATE on `status` or `updated_at` receive UPDATE on `blocked_on_signal`, preserving `WITH GRANT OPTION` only when all existing relevant runtime UPDATE grants were grantable.
- **Scenario B1 considerations:** New Rust code probes `pg_catalog.pg_attribute` before referencing `blocked_on_signal`, so the new `.so` remains valid against pre-upgrade schemas that do not yet have the column. `df.cancel()` similarly falls back to the old two-column UPDATE until the column exists.
- **Scenario B2 considerations:** The migration is additive and nullable. Existing instances, nodes, and vars are untouched; active SIGNAL nodes set the marker on their next node-status transition after the new schema is present.
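
The grant-option rule in the privilege bullet above can be sketched outside the database. This is a minimal illustration in Rust, not the upgrade script itself: `ColumnGrant`, `quote_ident`, and `upgrade_grants` are hypothetical names, and the sketch quotes every role name unconditionally, which is stricter than `format('%I', ...)`. It groups the existing column-level UPDATE grants by grantee and keeps `WITH GRANT OPTION` only when every grant for that role was grantable, mirroring `bool_and(is_grantable)`.

```rust
use std::collections::BTreeMap;

/// One column-level UPDATE grant found on `status` or `updated_at`.
/// `grantee` is `None` for PUBLIC. (Hypothetical type, for illustration only.)
struct ColumnGrant {
    grantee: Option<String>,
    is_grantable: bool,
}

/// Quote an identifier unconditionally, doubling embedded double quotes.
/// (Assumption: simpler than `%I`, which quotes only when needed.)
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Build one GRANT statement per grantee. A role keeps WITH GRANT OPTION only
/// if every one of its existing grants was grantable (the `bool_and` rule).
fn upgrade_grants(grants: &[ColumnGrant]) -> Vec<String> {
    let mut all_grantable: BTreeMap<Option<&str>, bool> = BTreeMap::new();
    for g in grants {
        let entry = all_grantable.entry(g.grantee.as_deref()).or_insert(true);
        *entry = *entry && g.is_grantable;
    }

    all_grantable
        .into_iter()
        .map(|(grantee, grantable)| {
            let base = "GRANT UPDATE (blocked_on_signal) ON df.instances TO";
            match grantee {
                // PUBLIC never receives WITH GRANT OPTION.
                None => format!("{base} PUBLIC"),
                Some(role) if grantable => {
                    format!("{base} {} WITH GRANT OPTION", quote_ident(role))
                }
                Some(role) => format!("{base} {}", quote_ident(role)),
            }
        })
        .collect()
}
```

A role with one grantable and one non-grantable grant ends up with a plain grant, so the upgrade never widens what the role could already delegate.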

#### Simplify `df.grant_usage()` — drop the explicit function allowlist
- **DDL change (df schema):** `df.grant_usage()` no longer loops over a hard-coded `func_sigs` array issuing `GRANT EXECUTE` per function. Fresh installs (`src/lib.rs`) and the upgrade script (`sql/pg_durable--0.2.3--0.2.4.sql`) both `CREATE OR REPLACE` the function with a body that grants `USAGE ON SCHEMA df` plus the table privileges, and conditionally grants `df.http()` / the admin helpers. The signature `df.grant_usage(text, boolean, boolean)` is unchanged.
- **DDL change (df schema):** `df.revoke_usage()` is made symmetric with the new `grant_usage()`. It no longer loops over every `df.*` function in `pg_proc` issuing `REVOKE EXECUTE` (which, post-simplification, only produced "no privileges could be revoked" warnings since ordinary functions are never granted per-function EXECUTE). The new body revokes only what `grant_usage()` grants: schema `USAGE`, EXECUTE on the sensitive functions (`df.http`, `df.grant_usage`, `df.revoke_usage`), and the table privileges. The signature `df.revoke_usage(text)` is unchanged.
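
One way to keep grant and revoke symmetric, sketched here in Rust purely for illustration, is to derive both statement lists from a single privilege table. The names `TablePrivilege`, `TABLE_PRIVILEGES`, `grant_statements`, and `revoke_statements` are hypothetical, the table lists only a subset of the real privileges, and the caller is assumed to pass an already quoted role name. The actual functions are PL/pgSQL and are not structured this way.

```rust
/// A table privilege handed out by the grant helper. (Hypothetical type.)
struct TablePrivilege {
    privilege: &'static str,
    columns: &'static [&'static str], // empty slice means the whole table
    table: &'static str,
}

/// Illustrative subset of the privileges; both directions read this one list.
const TABLE_PRIVILEGES: &[TablePrivilege] = &[
    TablePrivilege { privilege: "SELECT", columns: &[], table: "df.instances" },
    TablePrivilege {
        privilege: "UPDATE",
        columns: &["status", "updated_at", "blocked_on_signal"],
        table: "df.instances",
    },
    TablePrivilege { privilege: "SELECT", columns: &[], table: "df.nodes" },
];

/// Render `PRIVILEGE [(columns)] ON table`.
fn privilege_clause(p: &TablePrivilege) -> String {
    if p.columns.is_empty() {
        format!("{} ON {}", p.privilege, p.table)
    } else {
        format!("{} ({}) ON {}", p.privilege, p.columns.join(", "), p.table)
    }
}

/// GRANT statements in table order.
fn grant_statements(quoted_role: &str) -> Vec<String> {
    TABLE_PRIVILEGES
        .iter()
        .map(|p| format!("GRANT {} TO {}", privilege_clause(p), quoted_role))
        .collect()
}

/// REVOKE statements in reverse order, covering exactly what was granted.
fn revoke_statements(quoted_role: &str) -> Vec<String> {
    TABLE_PRIVILEGES
        .iter()
        .rev()
        .map(|p| format!("REVOKE {} FROM {} CASCADE", privilege_clause(p), quoted_role))
        .collect()
}
```

With one source list, adding a column such as `blocked_on_signal` changes both directions at once, which is the property the notes above describe.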
64 changes: 62 additions & 2 deletions sql/pg_durable--0.2.3--0.2.4.sql
@@ -6,6 +6,66 @@
-- See docs/upgrade-testing.md for the upgrade-script and backward-compatibility
-- requirements (Scenario A / B1 / B2).

-- ============================================================================
-- Expose signal waits at the instance level (issue #239).
--
-- `df.instances.blocked_on_signal` is set to the signal name while the instance
-- is parked on a SIGNAL node, and cleared when no SIGNAL wait remains or the
-- instance reaches a terminal state. Backfill is unnecessary: existing terminal
-- rows remain NULL, and newly executing SIGNAL nodes set the column when their
-- node status transitions to `running`.
--
-- Preserve existing delegated permissions by granting UPDATE on the new column
-- to every role that already had UPDATE on df.instances.status or updated_at.
-- ============================================================================
ALTER TABLE df.instances ADD COLUMN blocked_on_signal TEXT;

COMMENT ON COLUMN df.instances.blocked_on_signal IS
'Signal name while the instance is parked on a SIGNAL node; NULL otherwise';

DO $do$
DECLARE
r RECORD;
BEGIN
FOR r IN
SELECT role_ident, pg_catalog.bool_and(is_grantable) AS all_grantable
FROM (
SELECT
CASE
WHEN acl.grantee OPERATOR(pg_catalog.=) 0::oid THEN 'PUBLIC'
ELSE grantee_role.rolname
END AS role_ident,
acl.is_grantable
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace
JOIN pg_catalog.pg_attribute a ON a.attrelid OPERATOR(pg_catalog.=) c.oid
CROSS JOIN LATERAL pg_catalog.aclexplode(a.attacl) acl
LEFT JOIN pg_catalog.pg_roles grantee_role ON grantee_role.oid OPERATOR(pg_catalog.=) acl.grantee
WHERE n.nspname OPERATOR(pg_catalog.=) 'df'
AND c.relname OPERATOR(pg_catalog.=) 'instances'
AND a.attname OPERATOR(pg_catalog.=) ANY (ARRAY['status', 'updated_at'])
AND acl.privilege_type OPERATOR(pg_catalog.=) 'UPDATE'
) grants
WHERE role_ident IS NOT NULL
GROUP BY role_ident
LOOP
IF r.role_ident OPERATOR(pg_catalog.=) 'PUBLIC' THEN
EXECUTE 'GRANT UPDATE (blocked_on_signal) ON df.instances TO PUBLIC';
ELSIF r.all_grantable THEN
EXECUTE pg_catalog.format(
'GRANT UPDATE (blocked_on_signal) ON df.instances TO %I WITH GRANT OPTION',
r.role_ident
);
ELSE
EXECUTE pg_catalog.format(
'GRANT UPDATE (blocked_on_signal) ON df.instances TO %I',
r.role_ident
);
END IF;
END LOOP;
END;
$do$;

-- ============================================================================
-- Remove df.debug_connection() (issue #110, reclassified non-security cleanup).
--
@@ -87,7 +147,7 @@ BEGIN

-- Table privileges
EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt;
EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt;
EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at, blocked_on_signal) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt;
EXECUTE pg_catalog.format('GRANT SELECT ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt;
EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt;
EXECUTE pg_catalog.format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt;
@@ -155,7 +215,7 @@ BEGIN
EXECUTE pg_catalog.format('REVOKE INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes FROM %I CASCADE', p_role);
EXECUTE pg_catalog.format('REVOKE SELECT ON df.nodes FROM %I CASCADE', p_role);
EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database) ON df.instances FROM %I CASCADE', p_role);
EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at) ON df.instances FROM %I CASCADE', p_role);
EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at, blocked_on_signal) ON df.instances FROM %I CASCADE', p_role);
EXECUTE pg_catalog.format('REVOKE SELECT ON df.instances FROM %I CASCADE', p_role);

-- Schema access — the access gate for all ordinary df.* functions.
43 changes: 42 additions & 1 deletion src/activities/update_instance_status.rs
@@ -10,6 +10,26 @@ use std::sync::Arc;
/// Activity name for registration and scheduling
pub const NAME: &str = "pg_durable::activity::update-instance-status";

/// New binaries can briefly run against an old extension schema before
/// ALTER EXTENSION UPDATE adds this column.
pub async fn instances_have_blocked_on_signal(pool: &PgPool) -> Result<bool, String> {
sqlx::query_scalar::<_, bool>(
"SELECT EXISTS (
SELECT 1
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON c.oid = a.attrelid
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'df'
AND c.relname = 'instances'
AND a.attname = 'blocked_on_signal'
AND NOT a.attisdropped
)",
)
.fetch_one(pool)
.await
.map_err(|e| format!("Failed to inspect df.instances columns: {e}"))
}

/// Update the status of an instance in df.instances
pub async fn execute(
ctx: ActivityContext,
@@ -26,17 +46,38 @@ pub async fn execute(
"Updating instance {instance_id} status to {status}"
));

let has_blocked_on_signal = instances_have_blocked_on_signal(pool.as_ref()).await?;

// Never overwrite a terminal state ('completed', 'failed', 'cancelled') with any status.
// This prevents a race where an in-flight activity (scheduled just before cancel was
// processed) tries to flip the status back from 'cancelled' to 'running'/'completed'.
let query = if status == "completed" {
let clears_blocked_on_signal =
has_blocked_on_signal && matches!(status, "completed" | "failed" | "cancelled");

let query = if status == "completed" && clears_blocked_on_signal {
sqlx::query(
"UPDATE df.instances
SET status = $1, completed_at = now(), updated_at = now(), blocked_on_signal = NULL
WHERE id = $2 AND status NOT IN ('completed', 'failed', 'cancelled')",
)
.bind(status)
.bind(instance_id)
} else if status == "completed" {
sqlx::query(
"UPDATE df.instances
SET status = $1, completed_at = now(), updated_at = now()
WHERE id = $2 AND status NOT IN ('completed', 'failed', 'cancelled')",
)
.bind(status)
.bind(instance_id)
} else if clears_blocked_on_signal {
sqlx::query(
"UPDATE df.instances
SET status = $1, updated_at = now(), blocked_on_signal = NULL
WHERE id = $2 AND status NOT IN ('completed', 'failed', 'cancelled')",
)
.bind(status)
.bind(instance_id)
} else {
sqlx::query(
"UPDATE df.instances
110 changes: 110 additions & 0 deletions src/activities/update_node_status.rs
@@ -5,8 +5,11 @@

use duroxide::ActivityContext;
use sqlx::PgPool;
use sqlx::Row;
use std::sync::Arc;

use crate::activities::update_instance_status::instances_have_blocked_on_signal;

/// Activity name for registration and scheduling
pub const NAME: &str = "pg_durable::activity::update-node-status";

@@ -75,6 +78,11 @@ pub async fn execute(
Ok(done) => {
let rows = done.rows_affected();
if rows == 1 {
if let Err(e) = sync_instance_signal_wait(pool.as_ref(), instance_id, node_id).await
{
ctx.trace_info(&e);
return Err(e);
}
Ok("Node status updated".to_string())
} else {
// Exactly one row must match (instance_id, id). Anything else
@@ -96,3 +104,105 @@ pub async fn execute(
}
}
}

async fn sync_instance_signal_wait(
pool: &PgPool,
instance_id: &str,
node_id: &str,
) -> Result<(), String> {
if !instances_have_blocked_on_signal(pool).await? {
return Ok(());
}

let row = sqlx::query(
"SELECT node_type
FROM df.nodes
WHERE id = $1 AND instance_id = $2",
)
.bind(node_id)
.bind(instance_id)
.fetch_optional(pool)
.await
.map_err(|e| format!("Failed to load node for signal wait sync: {e}"))?;

let Some(row) = row else {
return Err(format!(
"signal wait sync found no node {node_id} in instance {instance_id}"
));
};

let node_type: String = row
.try_get("node_type")
.map_err(|e| format!("Failed to read node_type for signal wait sync: {e}"))?;
if node_type != "SIGNAL" {
return Ok(());
}

let mut tx = pool
.begin()
.await
.map_err(|e| format!("Failed to start signal wait sync transaction: {e}"))?;

let instance_row = sqlx::query(
"SELECT 1
FROM df.instances
WHERE id = $1 AND status NOT IN ('completed', 'failed', 'cancelled')
FOR UPDATE",
)
.bind(instance_id)
.fetch_optional(&mut *tx)
.await
.map_err(|e| format!("Failed to lock instance for signal wait sync: {e}"))?;

if instance_row.is_none() {
tx.commit()
.await
.map_err(|e| format!("Failed to finish signal wait sync transaction: {e}"))?;
return Ok(());
}

// Parallel branches can have overlapping SIGNAL waits. Recompute from the
// current running SIGNAL nodes instead of blindly clearing this node's name.
let running_signal_query = sqlx::query_scalar::<_, Option<String>>(
"SELECT query
FROM df.nodes
WHERE instance_id = $1
AND node_type = 'SIGNAL'
AND status = 'running'
ORDER BY updated_at DESC, id
LIMIT 1",
)
.bind(instance_id)
.fetch_optional(&mut *tx)
.await
.map_err(|e| format!("Failed to find running SIGNAL node: {e}"))?
.flatten();

let blocked_on_signal = running_signal_query.and_then(|config_str| {
serde_json::from_str::<serde_json::Value>(&config_str)
.ok()
.and_then(|config| {
config
.get("signal_name")
.and_then(|value| value.as_str())
.map(str::to_string)
})
});

sqlx::query(
"UPDATE df.instances
SET blocked_on_signal = $1, updated_at = now()
WHERE id = $2 AND status NOT IN ('completed', 'failed', 'cancelled')",
)
.bind(blocked_on_signal)
.bind(instance_id)
.execute(&mut *tx)
.await
.map_err(|e| format!("Failed to sync instance signal wait marker: {e}"))?;

tx.commit()
.await
.map_err(|e| format!("Failed to finish signal wait sync transaction: {e}"))?;

Ok(())
}
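
The recompute step in `sync_instance_signal_wait` can be illustrated without a database. The sketch below is an in-memory model under stated assumptions: `Node` and `recompute_blocked_on_signal` are hypothetical, `updated_at` is a plain integer standing in for the timestamp column, and `signal_name` is assumed to be already extracted from the node's JSON config. It applies the same selection as the query above (`ORDER BY updated_at DESC, id LIMIT 1` over running SIGNAL nodes).

```rust
/// Minimal in-memory stand-in for a row of `df.nodes`. (Hypothetical type.)
struct Node {
    id: &'static str,
    node_type: &'static str,
    status: &'static str,
    updated_at: u64,                   // stand-in for the timestamp column
    signal_name: Option<&'static str>, // assumed already parsed from the config
}

/// Recompute the instance-level marker from every SIGNAL node still running,
/// preferring the most recently updated one and breaking ties by id.
fn recompute_blocked_on_signal(nodes: &[Node]) -> Option<&'static str> {
    nodes
        .iter()
        .filter(|n| n.node_type == "SIGNAL" && n.status == "running")
        // Smallest under (updated_at DESC, id ASC) is the row LIMIT 1 would return.
        .min_by(|a, b| {
            b.updated_at
                .cmp(&a.updated_at)
                .then_with(|| a.id.cmp(b.id))
        })
        .and_then(|n| n.signal_name)
}
```

When one of two parallel SIGNAL nodes completes, the marker falls back to the other node's signal name instead of becoming `NULL`, which is why the code recomputes rather than clearing by name.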
45 changes: 38 additions & 7 deletions src/dsl.rs
@@ -129,6 +129,24 @@ fn legacy_login_role_schema() -> bool {
!owner_scoped_vars_enabled()
}

fn instances_have_blocked_on_signal() -> bool {
Spi::get_one::<bool>(
"SELECT EXISTS (
SELECT 1
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON c.oid = a.attrelid
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'df'
AND c.relname = 'instances'
AND a.attname = 'blocked_on_signal'
AND NOT a.attisdropped
)",
)
.ok()
.flatten()
.unwrap_or(false)
}

/// Sets a workflow variable. Must be called BEFORE df.start(), not inside a workflow.
/// Variables are captured at df.start() and remain immutable during execution.
/// Each user has their own variable namespace (owner = current_user).
Expand Down Expand Up @@ -1101,13 +1119,26 @@ pub fn cancel(instance_id: &str, reason: default!(&str, "'Cancelled by user'"))
// 1. Overwriting a 'completed' or 'failed' instance that finished before the cancel
// signal was processed by duroxide.
// 2. Calling df.cancel twice in a row (idempotent by guard).
// User has column-level UPDATE on (status, updated_at) with RLS restricting to own rows.
Spi::run_with_args(
"UPDATE df.instances SET status = 'cancelled', updated_at = pg_catalog.now() \
WHERE id = $1 AND status NOT IN ('completed', 'failed', 'cancelled')",
&[instance_id.into()],
)
.unwrap_or_else(|e| warning!("Failed to update instance status: {e}"));
// User has column-level UPDATE on runtime status columns with RLS restricting to own rows.
if instances_have_blocked_on_signal() {
Spi::run_with_args(
"UPDATE df.instances
SET status = 'cancelled',
blocked_on_signal = NULL,
updated_at = pg_catalog.now()
WHERE id = $1 AND status NOT IN ('completed', 'failed', 'cancelled')",
&[instance_id.into()],
)
.unwrap_or_else(|e| warning!("Failed to update instance status: {e}"));
} else {
Spi::run_with_args(
"UPDATE df.instances
SET status = 'cancelled', updated_at = pg_catalog.now()
WHERE id = $1 AND status NOT IN ('completed', 'failed', 'cancelled')",
&[instance_id.into()],
)
.unwrap_or_else(|e| warning!("Failed to update instance status: {e}"));
}

format!("Instance {instance_id} cancelled: {reason}")
}
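
The two branches in `cancel()` differ only in the SET clause. As a sketch of the same fallback, assuming a hypothetical helper named `cancel_update_sql` that is not part of this change, the statement could be chosen from the probe result like this:

```rust
/// Pick the UPDATE statement that matches the schema actually installed.
/// `has_blocked_on_signal` is the result of the pg_attribute probe.
fn cancel_update_sql(has_blocked_on_signal: bool) -> String {
    let set_clause = if has_blocked_on_signal {
        // New schema: clear the signal marker together with the status change.
        "status = 'cancelled', blocked_on_signal = NULL, updated_at = pg_catalog.now()"
    } else {
        // Pre-upgrade schema: the column does not exist yet, so do not reference it.
        "status = 'cancelled', updated_at = pg_catalog.now()"
    };
    format!(
        "UPDATE df.instances SET {set_clause} \
         WHERE id = $1 AND status NOT IN ('completed', 'failed', 'cancelled')"
    )
}
```

Both variants keep the terminal-state guard in the WHERE clause, so the fallback changes only which columns are written.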