diff --git a/CHANGELOG.md b/CHANGELOG.md index a0b82a1..2fcb340 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,8 +6,13 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc ## [0.2.4] - Unreleased +### Added + +- **`df.list_instances_paginated()`:** new monitoring function that lists instances with keyset (cursor) pagination, ordered by `(created_at DESC, id DESC)`. It returns the page rows plus a `total_count` and a `next_cursor` (pass the previous page's `next_cursor` as `after_cursor`, or `NULL` for the first page). A new `idx_instances_created_at_desc_id` index keeps paging an index scan. + ### Changed +- **`df.list_instances()`:** now also returns the `created_at` and `completed_at` timestamps for each instance, and orders by `(created_at DESC, id DESC)` for a stable total order. - **`df.grant_usage()` / `df.revoke_usage()`:** dropped the explicit per-function `EXECUTE` allowlist. Schema `USAGE` on `df` is the real access gate for ordinary `df.*` functions, so the helpers now grant/revoke schema `USAGE`, the table privileges, and `EXECUTE` only on the sensitive functions (`df.http`, `df.grant_usage`, `df.revoke_usage`). Function signatures are unchanged and existing privileges are unaffected (#242). ### Removed diff --git a/USER_GUIDE.md b/USER_GUIDE.md index e67f438..212b722 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -1433,7 +1433,31 @@ SELECT * FROM df.list_instances('failed'); SELECT * FROM df.list_instances(NULL, 10); ``` -**Columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output` +**Columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`, `created_at`, `completed_at` + +Instances are ordered newest-first by `(created_at DESC, id DESC)`. 
+ +### List Instances with Pagination + +For large instance counts, use cursor (keyset) pagination instead of a raw limit: + +```sql +-- First page (no cursor) +SELECT * FROM df.list_instances_paginated(NULL, 100, NULL); + +-- Filter by status, 50 per page +SELECT * FROM df.list_instances_paginated('completed', 50, NULL); + +-- Next page: pass the previous page's next_cursor as after_cursor +SELECT * FROM df.list_instances_paginated(NULL, 100, '2026-06-23 12:00:00+00|a1b2c3d4'); +``` + +**Columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`, `created_at`, `completed_at`, `total_count`, `next_cursor` + +- `total_count` is the number of instances visible to the caller for the given `status_filter`. +- `next_cursor` is the value to pass as `after_cursor` for the next page; it is `NULL` once the last page has been returned. + +Pages are ordered by `(created_at DESC, id DESC)` and backed by the `idx_instances_created_at_desc_id` index, so paging stays efficient regardless of offset. ### Instance Details diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 586e359..90f9725 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -205,6 +205,14 @@ what the upgrade script handles, and any backward compatibility considerations. ### v0.2.3 → v0.2.4 +#### Chronological instance listing — index, timestamp columns, cursor pagination +- **DDL change (df schema):** Adds the `idx_instances_created_at_desc_id` index on `df.instances(created_at DESC, id)`. Fresh installs create it in `src/lib.rs`; the upgrade script `sql/pg_durable--0.2.3--0.2.4.sql` creates it with `CREATE INDEX IF NOT EXISTS`. The resulting `pg_get_indexdef` is identical on both paths. +- **DDL change (df schema):** `df.list_instances(text, integer)` gains two output columns (`created_at`, `completed_at`). 
Because the return `TABLE` shape changed, the upgrade script `DROP`s and re-`CREATE`s the function (a `CREATE OR REPLACE` cannot change output columns). The function carries PostgreSQL's default PUBLIC `EXECUTE` and is referenced by no other object, so drop/recreate restores identical access. The argument signature `df.list_instances(text, integer)` is unchanged. +- **DDL change (df schema):** Adds `df.list_instances_paginated(text, integer, text)` returning the page rows plus `total_count` and `next_cursor`. Fresh installs create it from the generated function SQL (`src/monitoring.rs`); the upgrade script creates the matching binding to `list_instances_paginated_wrapper`. +- **Scenario A considerations:** Fresh-install and upgraded schemas both expose the new index, the extended `df.list_instances` return shape, and `df.list_instances_paginated`. The hand-written upgrade DDL mirrors the pgrx-generated install DDL (same arg types, defaults, and return columns), so the equivalence contract passes. +- **Scenario B1 considerations:** The new `.so` reads only `df.instances` columns (`id`, `label`, `status`, `created_at`, `completed_at`) that exist in all prior schemas in this provider line, so it runs against pre-0.2.4 schemas that have not applied `ALTER EXTENSION UPDATE`. The chronological index is a performance aid, not a correctness dependency — queries fall back to a sort when it is absent. +- **Scenario B2 considerations:** No data migration. Existing instances, nodes, and vars are untouched; the upgrade only adds one index and two function bindings. + #### Simplify `df.grant_usage()` — drop the explicit function allowlist - **DDL change (df schema):** `df.grant_usage()` no longer loops over a hard-coded `func_sigs` array issuing `GRANT EXECUTE` per function. 
Fresh installs (`src/lib.rs`) and the upgrade script (`sql/pg_durable--0.2.3--0.2.4.sql`) both `CREATE OR REPLACE` the function with a body that grants `USAGE ON SCHEMA df` plus the table privileges, and conditionally grants `df.http()` / the admin helpers. The signature `df.grant_usage(text, boolean, boolean)` is unchanged. - **DDL change (df schema):** `df.revoke_usage()` is made symmetric with the new `grant_usage()`. It no longer loops over every `df.*` function in `pg_proc` issuing `REVOKE EXECUTE` (which, post-simplification, only produced "no privileges could be revoked" warnings since ordinary functions are never granted per-function EXECUTE). The new body revokes only what `grant_usage()` grants: schema `USAGE`, EXECUTE on the sensitive functions (`df.http`, `df.grant_usage`, `df.revoke_usage`), and the table privileges. The signature `df.revoke_usage(text)` is unchanged. diff --git a/scripts/run-pgspot.sh b/scripts/run-pgspot.sh index cd15e5d..9a0de67 100755 --- a/scripts/run-pgspot.sh +++ b/scripts/run-pgspot.sh @@ -50,6 +50,15 @@ PGSPOT_ALLOW=( # object, so this is safe. Scoped to these two functions only. '^PS002: Unsafe function creation: df\.grant_usage\(p_role text,include_http boolean,with_grant boolean\) at line [0-9]+$' '^PS002: Unsafe function creation: df\.revoke_usage\(p_role text\) at line [0-9]+$' + # Upgrade scripts create idx_instances_created_at_desc_id on df.instances. As + # with PS002 above, pgspot flags PS014 only because a standalone upgrade script + # has no `CREATE SCHEMA df` to prove df.instances is extension-owned (the + # install SQL does, so the same CREATE INDEX is not flagged there). The index + # is a plain btree on the columns (created_at DESC, id) with the default + # operator class -- it references no user-defined function or operator whose + # resolution could be hijacked via search_path -- so it is safe. Scoped to this + # one index only. 
+ '^PS014: Unsafe index creation: idx_instances_created_at_desc_id at line [0-9]+$' ) # Whole codes to suppress globally (pgspot --ignore). Prefer PGSPOT_ALLOW. Empty. diff --git a/sql/pg_durable--0.2.3--0.2.4.sql b/sql/pg_durable--0.2.3--0.2.4.sql index efaab86..cc6fc23 100644 --- a/sql/pg_durable--0.2.3--0.2.4.sql +++ b/sql/pg_durable--0.2.3--0.2.4.sql @@ -166,3 +166,65 @@ CREATE FUNCTION df."await_instance"( STRICT LANGUAGE c AS 'MODULE_PATHNAME', 'await_instance_wrapper'; + +-- ============================================================================ +-- Chronological instance listing: index + timestamp columns + cursor pagination. +-- +-- Adds a chronological keyset index, extends df.list_instances() with the +-- created_at / completed_at timestamps, and introduces df.list_instances_paginated() +-- for cursor-based paging. Fresh 0.2.4 installs create all three via src/lib.rs +-- (the index) and the generated function SQL (src/monitoring.rs); this section +-- brings pre-existing installs to the same shape (Scenario A). +-- No data migration is required (Scenario B2); the new .so reads the same +-- df.instances columns that already exist in all prior schemas (Scenario B1). +-- ============================================================================ + +-- Index for efficient chronological (keyset) listing of instances. Matches the +-- ORDER BY (created_at DESC, id DESC) used by both listing functions so paging +-- stays an index scan instead of a sort. +CREATE INDEX IF NOT EXISTS idx_instances_created_at_desc_id + ON df.instances(created_at DESC, id); + +-- df.list_instances() gains created_at / completed_at output columns. The return +-- TABLE shape changed, so the function must be dropped and recreated rather than +-- CREATE OR REPLACE'd. It carries PostgreSQL's default PUBLIC EXECUTE and is not +-- referenced by any other object, so the drop/recreate restores identical access. 
+DROP FUNCTION IF EXISTS df."list_instances"(TEXT, INT); +CREATE FUNCTION df."list_instances"( + "status_filter" TEXT DEFAULT NULL, + "limit_count" INT DEFAULT 100 +) RETURNS TABLE ( + "instance_id" TEXT, + "label" TEXT, + "function_name" TEXT, + "status" TEXT, + "execution_count" bigint, + "output" TEXT, + "created_at" timestamp with time zone, + "completed_at" timestamp with time zone +) +LANGUAGE c +AS 'MODULE_PATHNAME', 'list_instances_wrapper'; + +-- df.list_instances_paginated(): keyset (cursor) pagination ordered by +-- (created_at DESC, id DESC), returning the page rows plus total_count and the +-- next_cursor to fetch the following page. Bound to the C symbol +-- list_instances_paginated_wrapper exported by the new .so. +CREATE FUNCTION df."list_instances_paginated"( + "status_filter" TEXT DEFAULT NULL, + "limit_count" INT DEFAULT 100, + "after_cursor" TEXT DEFAULT NULL +) RETURNS TABLE ( + "instance_id" TEXT, + "label" TEXT, + "function_name" TEXT, + "status" TEXT, + "execution_count" bigint, + "output" TEXT, + "created_at" timestamp with time zone, + "completed_at" timestamp with time zone, + "total_count" bigint, + "next_cursor" TEXT +) +LANGUAGE c +AS 'MODULE_PATHNAME', 'list_instances_paginated_wrapper'; diff --git a/src/lib.rs b/src/lib.rs index 76b26a3..8f0bbad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -220,6 +220,9 @@ COMMENT ON COLUMN df.instances.submitted_by IS -- Index for finding pending instances CREATE INDEX idx_instances_status ON df.instances(status); +-- Index for efficient chronological (keyset) listing of instances +CREATE INDEX idx_instances_created_at_desc_id ON df.instances(created_at DESC, id); + -- Index for finding nodes by instance CREATE INDEX idx_nodes_instance ON df.nodes(instance_id); diff --git a/src/monitoring.rs b/src/monitoring.rs index 97c583b..bcf0b1a 100644 --- a/src/monitoring.rs +++ b/src/monitoring.rs @@ -10,6 +10,29 @@ use pgrx::prelude::*; use crate::types::{backend_duroxide_schema, new_backend_provider, 
postgres_connection_string}; +const MAX_LIST_INSTANCES_LIMIT: i32 = 10000; + +fn validate_list_instances_limit(limit_count: i32) { + if limit_count < 1 { + pgrx::error!("limit_count must be at least 1"); + } + if limit_count > MAX_LIST_INSTANCES_LIMIT { + pgrx::error!("limit_count must be at most {}", MAX_LIST_INSTANCES_LIMIT); + } +} + +/// Split a keyset cursor of the form `<created_at>|<id>` into its parts. +fn parse_list_instances_cursor(after_cursor: Option<&str>) -> Option<(String, String)> { + let cursor = after_cursor?; + let (created_at, id) = cursor.rsplit_once('|').unwrap_or_else(|| { + pgrx::error!("after_cursor must be in the format 'YYYY-MM-DD HH:MM:SS+00:00|instance_id'") + }); + if created_at.is_empty() || id.is_empty() { + pgrx::error!("after_cursor must be in the format 'YYYY-MM-DD HH:MM:SS+00:00|instance_id'"); + } + Some((created_at.to_string(), id.to_string())) +} + // ============================================================================ // Monitoring Functions // ============================================================================ @@ -28,12 +51,11 @@ pub fn list_instances( name!(status, String), name!(execution_count, i64), name!(output, Option<String>), + name!(created_at, pgrx::datum::TimestampWithTimeZone), + name!(completed_at, Option<pgrx::datum::TimestampWithTimeZone>), ), > { - if limit_count < 1 { - pgrx::error!("limit_count must be at least 1"); - } - let limit_count = limit_count.min(10000); + validate_list_instances_limit(limit_count); let pg_conn_str = postgres_connection_string(); let provider_schema = backend_duroxide_schema(); @@ -43,17 +65,23 @@ pub fn list_instances( // df.list_instances(), df.instance_info()) share the same authoritative source // for the status column, eliminating the vocabulary mismatch between // df.instances.status ('cancelled') and duroxide executions.status ('Failed'). 
- let user_instances: Vec<(String, Option<String>, String)> = Spi::connect(|client| { + let user_instances: Vec<( + String, + Option<String>, + String, + pgrx::datum::TimestampWithTimeZone, + Option<pgrx::datum::TimestampWithTimeZone>, + )> = Spi::connect(|client| { use pgrx::datum::DatumWithOid; let (sql, args): (&str, Vec<DatumWithOid>) = if let Some(status) = status_filter { ( - "SELECT id, label, status FROM df.instances WHERE status = $1 ORDER BY created_at DESC LIMIT $2", + "SELECT id, label, status, created_at, completed_at FROM df.instances WHERE status = $1 ORDER BY created_at DESC, id DESC LIMIT $2", vec![status.into(), (limit_count as i64).into()], ) } else { ( - "SELECT id, label, status FROM df.instances ORDER BY created_at DESC LIMIT $1", + "SELECT id, label, status, created_at, completed_at FROM df.instances ORDER BY created_at DESC, id DESC LIMIT $1", vec![(limit_count as i64).into()], ) }; @@ -63,7 +91,13 @@ pub fn list_instances( if let Ok(Some(id)) = row.get::<String>(1) { let label: Option<String> = row.get(2).ok().flatten(); let status: String = row.get(3).ok().flatten().unwrap_or_default(); - instances.push((id, label, status)); + let created_at: Option<pgrx::datum::TimestampWithTimeZone> = + row.get(4).ok().flatten(); + let completed_at: Option<pgrx::datum::TimestampWithTimeZone> = + row.get(5).ok().flatten(); + if let Some(created_at) = created_at { + instances.push((id, label, status, created_at, completed_at)); + } } } } @@ -94,7 +128,196 @@ pub fn list_instances( // Only query duroxide for function_name, execution_count, and output. // Status is read from df.instances (already fetched above) to ensure all // monitoring APIs agree on the status value. 
- for (id, label, df_status) in &user_instances { + for (id, label, df_status, created_at, completed_at) in &user_instances { + if let Ok(info) = client.get_instance_info(id).await { + rows.push(( + info.instance_id, + label.clone(), + info.orchestration_name, + df_status.clone(), + info.current_execution_id as i64, + info.output, + *created_at, + *completed_at, + )); + } + } + rows + }); + + TableIterator::new(results) +} + +/// List durable function instances with keyset (cursor) pagination. +/// +/// Pages are ordered by `(created_at DESC, id DESC)` and use the +/// `idx_instances_created_at_desc_id` index for efficient scans. The +/// `after_cursor` is the `next_cursor` returned by the previous page (format +/// `<created_at>|<id>`); pass `NULL` for the first page. `total_count` +/// reflects all rows visible to the caller for the given `status_filter`, and +/// `next_cursor` is `NULL` once the final page has been returned. +#[pg_extern(schema = "df")] +pub fn list_instances_paginated( + status_filter: default!(Option<&str>, "NULL"), + limit_count: default!(i32, "100"), + after_cursor: default!(Option<&str>, "NULL"), +) -> TableIterator< + 'static, + ( + name!(instance_id, String), + name!(label, Option<String>), + name!(function_name, String), + name!(status, String), + name!(execution_count, i64), + name!(output, Option<String>), + name!(created_at, pgrx::datum::TimestampWithTimeZone), + name!(completed_at, Option<pgrx::datum::TimestampWithTimeZone>), + name!(total_count, i64), + name!(next_cursor, Option<String>), + ), +> { + validate_list_instances_limit(limit_count); + let cursor = parse_list_instances_cursor(after_cursor); + // Fetch one extra row so we can tell whether another page exists. 
+ let fetch_limit_plus_one = (limit_count as i64) + 1; + + let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); + + let (total_count, mut user_instances): ( + i64, + Vec<( + String, + Option<String>, + String, + pgrx::datum::TimestampWithTimeZone, + Option<pgrx::datum::TimestampWithTimeZone>, + String, + )>, + ) = Spi::connect(|client| { + use pgrx::datum::DatumWithOid; + + let (count_sql, count_args): (&str, Vec<DatumWithOid>) = if let Some(status) = status_filter + { + ( + "SELECT COUNT(*) FROM df.instances WHERE status = $1", + vec![status.into()], + ) + } else { + ("SELECT COUNT(*) FROM df.instances", vec![]) + }; + + let total_count = client + .select(count_sql, Some(1), &count_args) + .ok() + .and_then(|table| { + table + .into_iter() + .next() + .and_then(|row| row.get::<i64>(1).ok().flatten()) + }) + .unwrap_or(0); + + let (sql, args): (&str, Vec<DatumWithOid>) = match (status_filter, cursor.as_ref()) { + (Some(status), Some((cursor_created_at, cursor_id))) => ( + "SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \ + FROM df.instances \ + WHERE status = $1 \ + AND (created_at < $2::timestamptz OR (created_at = $2::timestamptz AND id < $3)) \ + ORDER BY created_at DESC, id DESC \ + LIMIT $4", + vec![ + status.into(), + cursor_created_at.as_str().into(), + cursor_id.as_str().into(), + fetch_limit_plus_one.into(), + ], + ), + (Some(status), None) => ( + "SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \ + FROM df.instances \ + WHERE status = $1 \ + ORDER BY created_at DESC, id DESC \ + LIMIT $2", + vec![status.into(), fetch_limit_plus_one.into()], + ), + (None, Some((cursor_created_at, cursor_id))) => ( + "SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \ + FROM df.instances \ + WHERE (created_at < $1::timestamptz OR (created_at = $1::timestamptz AND id < $2)) \ + ORDER BY created_at DESC, id DESC \ + LIMIT $3", + vec![ 
+ cursor_created_at.as_str().into(), + cursor_id.as_str().into(), + fetch_limit_plus_one.into(), + ], + ), + (None, None) => ( + "SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \ + FROM df.instances \ + ORDER BY created_at DESC, id DESC \ + LIMIT $1", + vec![fetch_limit_plus_one.into()], + ), + }; + + let mut instances = Vec::new(); + if let Ok(table) = client.select(sql, None, &args) { + for row in table { + if let Ok(Some(id)) = row.get::<String>(1) { + let label: Option<String> = row.get(2).ok().flatten(); + let status: String = row.get(3).ok().flatten().unwrap_or_default(); + let created_at: Option<pgrx::datum::TimestampWithTimeZone> = + row.get(4).ok().flatten(); + let completed_at: Option<pgrx::datum::TimestampWithTimeZone> = + row.get(5).ok().flatten(); + let next_cursor: String = row.get(6).ok().flatten().unwrap_or_default(); + if let Some(created_at) = created_at { + instances.push((id, label, status, created_at, completed_at, next_cursor)); + } + } + } + } + (total_count, instances) + }); + + if user_instances.is_empty() { + return TableIterator::new(vec![]); + } + + let has_more = user_instances.len() > limit_count as usize; + if has_more { + // Drop the lookahead row so the page contains at most `limit_count` rows. + user_instances.pop(); + } + let next_cursor = if has_more { + // After removing the lookahead row, last() is the final visible row. 
+ user_instances + .last() + .map(|(_, _, _, _, _, cursor)| cursor.clone()) + } else { + None + }; + + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(_) => return TableIterator::new(vec![]), + }; + + let results = rt.block_on(async { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { + Ok(s) => s, + Err(_) => return vec![], + }; + + let client = Client::new(store); + + let mut rows = Vec::new(); + for (id, label, df_status, created_at, completed_at, _) in &user_instances { if let Ok(info) = client.get_instance_info(id).await { rows.push(( info.instance_id, @@ -103,6 +326,10 @@ pub fn list_instances( df_status.clone(), info.current_execution_id as i64, info.output, + *created_at, + *completed_at, + total_count, + next_cursor.clone(), )); } } diff --git a/tests/e2e/sql/50_list_instances_paginated.sql b/tests/e2e/sql/50_list_instances_paginated.sql new file mode 100644 index 0000000..f61382e --- /dev/null +++ b/tests/e2e/sql/50_list_instances_paginated.sql @@ -0,0 +1,182 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests: df.list_instances() timestamp columns and df.list_instances_paginated() +-- +-- Verifies: +-- 1. df.list_instances() returns created_at / completed_at, non-null for +-- completed instances. +-- 2. df.list_instances_paginated() pages through every visible instance with +-- a small page size, in the same (created_at DESC, id DESC) order as +-- df.list_instances(), with no gaps or duplicates. +-- 3. total_count matches the number of visible instances and next_cursor is +-- NULL only on the final page. +-- +-- The test is robust to instances created by earlier E2E tests (df_e2e_user is +-- shared across the run): it compares pagination output against the full +-- df.list_instances() result rather than asserting absolute counts. 
+ +SET SESSION AUTHORIZATION df_e2e_user; + +-- =========================================================================== +-- Setup: start a handful of instances and wait for completion. +-- =========================================================================== + +DROP TABLE IF EXISTS _paginate_known; +CREATE TEMP TABLE _paginate_known (instance_id TEXT); + +-- Start the instances as a committed top-level statement so the background +-- worker can see them (df.start + await in the same transaction would deadlock: +-- the worker runs in a separate session and only sees committed rows). +INSERT INTO _paginate_known(instance_id) +SELECT df.start('SELECT ' || g, 'paginate-test-' || g) +FROM generate_series(1, 5) g; + +DO $$ +DECLARE + r RECORD; + status TEXT; +BEGIN + FOR r IN SELECT instance_id FROM _paginate_known LOOP + status := df.await_instance(r.instance_id, 30); + IF lower(status) != 'completed' THEN + RAISE EXCEPTION 'Setup failed: instance % expected completed, got %', r.instance_id, status; + END IF; + END LOOP; +END $$; + +-- =========================================================================== +-- 1. df.list_instances() exposes created_at / completed_at. +-- =========================================================================== + +DO $$ +DECLARE + missing INT; +BEGIN + SELECT count(*) INTO missing + FROM df.list_instances(NULL, 10000) l + JOIN _paginate_known k ON k.instance_id = l.instance_id + WHERE l.created_at IS NULL OR l.completed_at IS NULL; + + IF missing > 0 THEN + RAISE EXCEPTION 'FAILED: % completed instances have NULL created_at/completed_at', missing; + END IF; + + RAISE NOTICE 'PASSED: created_at/completed_at populated for completed instances'; +END $$; + +-- =========================================================================== +-- 2 & 3. Page through every instance and compare against df.list_instances(). 
+-- =========================================================================== + +-- Expected order: df.list_instances() already orders by (created_at DESC, id DESC). +DROP TABLE IF EXISTS _expected_order; +CREATE TEMP TABLE _expected_order AS +SELECT row_number() OVER () AS seq, instance_id +FROM df.list_instances(NULL, 10000); + +DROP TABLE IF EXISTS _collected; +CREATE TEMP TABLE _collected (seq INT, instance_id TEXT); + +DO $$ +DECLARE + v_cursor TEXT := NULL; + v_next TEXT; + v_total BIGINT; + v_seq INT := 0; + v_page_rows INT; + v_iterations INT := 0; + rec RECORD; + v_expected BIGINT; +BEGIN + SELECT count(*) INTO v_expected FROM _expected_order; + + LOOP + v_iterations := v_iterations + 1; + IF v_iterations > 1000 THEN + RAISE EXCEPTION 'FAILED: pagination did not terminate (possible cursor bug)'; + END IF; + + v_page_rows := 0; + v_next := NULL; + FOR rec IN + SELECT * FROM df.list_instances_paginated(NULL, 2, v_cursor) + LOOP + v_seq := v_seq + 1; + v_page_rows := v_page_rows + 1; + INSERT INTO _collected(seq, instance_id) VALUES (v_seq, rec.instance_id); + v_next := rec.next_cursor; + v_total := rec.total_count; + END LOOP; + + -- Empty page ends pagination (e.g. zero visible instances). + EXIT WHEN v_page_rows = 0; + + -- total_count must reflect all visible instances on every page. + IF v_total != v_expected THEN + RAISE EXCEPTION 'FAILED: total_count % does not match visible instance count %', + v_total, v_expected; + END IF; + + -- next_cursor is NULL only on the final page. + EXIT WHEN v_next IS NULL; + v_cursor := v_next; + END LOOP; + + RAISE NOTICE 'PASSED: paginated through % instances in % pages', v_seq, v_iterations; +END $$; + +-- The paginated sequence must exactly match df.list_instances() order. 
+DO $$ +DECLARE + mismatches INT; + collected_cnt INT; + expected_cnt INT; + dup_cnt INT; + known_missing INT; +BEGIN + SELECT count(*) INTO collected_cnt FROM _collected; + SELECT count(*) INTO expected_cnt FROM _expected_order; + + IF collected_cnt != expected_cnt THEN + RAISE EXCEPTION 'FAILED: paginated row count % != df.list_instances() count %', + collected_cnt, expected_cnt; + END IF; + + -- No duplicate instance_ids across pages. + SELECT count(*) INTO dup_cnt FROM ( + SELECT instance_id FROM _collected GROUP BY instance_id HAVING count(*) > 1 + ) d; + IF dup_cnt > 0 THEN + RAISE EXCEPTION 'FAILED: % instance_id(s) appeared on more than one page', dup_cnt; + END IF; + + -- Same order, position by position. + SELECT count(*) INTO mismatches + FROM _collected c + JOIN _expected_order e ON e.seq = c.seq + WHERE c.instance_id != e.instance_id; + IF mismatches > 0 THEN + RAISE EXCEPTION 'FAILED: % positions differ between paginated and list order', mismatches; + END IF; + + -- All known instances appear in the paginated output. + SELECT count(*) INTO known_missing + FROM _paginate_known k + WHERE NOT EXISTS (SELECT 1 FROM _collected c WHERE c.instance_id = k.instance_id); + IF known_missing > 0 THEN + RAISE EXCEPTION 'FAILED: % known instances missing from paginated output', known_missing; + END IF; + + RAISE NOTICE 'PASSED: paginated output matches df.list_instances() exactly'; +END $$; + +-- =========================================================================== +-- Cleanup +-- =========================================================================== + +DROP TABLE _collected; +DROP TABLE _expected_order; +DROP TABLE _paginate_known; + +SELECT 'TEST PASSED: list_instances_paginated' AS result;
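
Reviewer note: the cursor handling added in `src/monitoring.rs` (split the cursor at the last `|`, fetch `limit_count + 1` rows, drop the lookahead row, and derive `next_cursor` from the last visible row) can be exercised without pgrx or a database. The sketch below is only an illustration under that assumption: `parse_cursor` and `finish_page` are hypothetical names that do not exist in this PR, rows are reduced to `(created_at, id)` string pairs, and errors are returned as `Result` rather than raised through `pgrx::error!`.

```rust
/// Hypothetical stand-in for the PR's cursor parser: split a cursor of the
/// form `<created_at>|<id>` at the LAST `|` and reject empty halves.
fn parse_cursor(cursor: &str) -> Result<(&str, &str), String> {
    match cursor.rsplit_once('|') {
        Some((created_at, id)) if !created_at.is_empty() && !id.is_empty() => {
            Ok((created_at, id))
        }
        _ => Err(format!("malformed cursor: {:?}", cursor)),
    }
}

/// Hypothetical helper: `rows` holds `(created_at, id)` pairs that are already
/// sorted newest-first and were fetched with `LIMIT limit + 1`.
/// Returns the visible page and the cursor for the next page, if any.
fn finish_page(
    mut rows: Vec<(String, String)>,
    limit: usize,
) -> (Vec<(String, String)>, Option<String>) {
    // More than `limit` rows means the lookahead row came back.
    let has_more = rows.len() > limit;
    if has_more {
        // Drop the lookahead row so the page has at most `limit` rows.
        rows.truncate(limit);
    }
    // The cursor points at the last visible row, not at the lookahead row.
    let next_cursor = if has_more {
        rows.last()
            .map(|(created_at, id)| format!("{}|{}", created_at, id))
    } else {
        None
    };
    (rows, next_cursor)
}
```

Calling `finish_page` with three rows and `limit = 2` yields a two-row page whose cursor names the second row; calling it with exactly two rows yields no cursor, which is the "final page" signal the E2E test relies on.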