Skip to content

Commit cffae03

Browse files
Copilotpinodeca
andauthored
Add indexed paginated instance listing APIs
Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com>
1 parent 001bcb8 commit cffae03

4 files changed

Lines changed: 239 additions & 10 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc
99

1010
- Behavior change (bug fix): `df.join` / `df.join3` results are now a proper JSON array of objects instead of an array of double-encoded JSON strings. Consumers that previously unescaped each element (e.g. `(elem #>> '{}')::jsonb`) must now read the element directly (#143)
1111

12+
- Monitoring: add `idx_instances_created_at_desc_id` on `df.instances(created_at DESC, id)` to support efficient chronological listing; extend `df.list_instances` to return `created_at` and `completed_at`; add `df.list_instances_paginated(status_filter, limit_count, after_cursor)` with keyset cursor pagination and `total_count` / `next_cursor`.
13+
1214
## v0.2.1 (Released)
1315

1416
- Dependency: upgrade duroxide `0.1.26→0.1.28` and duroxide-pg-opt `v0.1.23→v0.1.26`; adds cached-plan retryability, instance stats API, and error propagation fixes; switches TLS backend to `native-tls`, removing the `ring` crate entirely (#116)

USER_GUIDE.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1391,9 +1391,17 @@ SELECT * FROM df.list_instances('Failed');
13911391

13921392
-- With limit
13931393
SELECT * FROM df.list_instances(NULL, 10);
1394+
1395+
-- Cursor-based pagination (first page)
1396+
SELECT * FROM df.list_instances_paginated(NULL, 20, NULL);
1397+
1398+
-- Cursor-based pagination (next page)
1399+
SELECT * FROM df.list_instances_paginated(NULL, 20, '2026-01-01 12:00:00+00:00|a1b2c3d4');
13941400
```
13951401

1396-
**Columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`
1402+
**df.list_instances columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`, `created_at`, `completed_at`
1403+
1404+
**df.list_instances_paginated columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`, `created_at`, `completed_at`, `total_count`, `next_cursor`
13971405

13981406
### Instance Details
13991407

@@ -1626,6 +1634,7 @@ GRANT EXECUTE ON FUNCTION df.cancel(text, text) TO app_role;
16261634
GRANT EXECUTE ON FUNCTION df.wait_for_completion(text, integer) TO app_role;
16271635
GRANT EXECUTE ON FUNCTION df.run(text) TO app_role;
16281636
GRANT EXECUTE ON FUNCTION df.list_instances(text, integer) TO app_role;
1637+
GRANT EXECUTE ON FUNCTION df.list_instances_paginated(text, integer, text) TO app_role;
16291638
GRANT EXECUTE ON FUNCTION df.instance_info(text) TO app_role;
16301639
GRANT EXECUTE ON FUNCTION df.instance_nodes(text, integer) TO app_role;
16311640
GRANT EXECUTE ON FUNCTION df.instance_executions(text, integer) TO app_role;

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ COMMENT ON COLUMN df.instances.submitted_by IS
188188
189189
-- Index for finding pending instances
190190
CREATE INDEX IF NOT EXISTS idx_instances_status ON df.instances(status);
191+
CREATE INDEX IF NOT EXISTS idx_instances_created_at_desc_id ON df.instances(created_at DESC, id);
191192
192193
-- Index for finding nodes by instance
193194
CREATE INDEX IF NOT EXISTS idx_nodes_instance ON df.nodes(instance_id);
@@ -392,6 +393,7 @@ DECLARE
392393
'df.wait_for_completion(text, integer)',
393394
'df.run(text)',
394395
'df.list_instances(text, integer)',
396+
'df.list_instances_paginated(text, integer, text)',
395397
'df.instance_info(text)',
396398
'df.instance_nodes(text, integer)',
397399
'df.instance_executions(text, integer)',

src/monitoring.rs

Lines changed: 225 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,28 @@ use pgrx::prelude::*;
77

88
use crate::types::{new_backend_provider, postgres_connection_string};
99

10+
const MAX_LIST_INSTANCES_LIMIT: i32 = 10000;
11+
12+
fn validate_list_instances_limit(limit_count: i32) {
13+
if limit_count < 1 {
14+
pgrx::error!("limit_count must be at least 1");
15+
}
16+
if limit_count > MAX_LIST_INSTANCES_LIMIT {
17+
pgrx::error!("limit_count must be at most {}", MAX_LIST_INSTANCES_LIMIT);
18+
}
19+
}
20+
21+
fn parse_list_instances_cursor(after_cursor: Option<&str>) -> Option<(String, String)> {
22+
let cursor = after_cursor?;
23+
let (created_at, id) = cursor.rsplit_once('|').unwrap_or_else(|| {
24+
pgrx::error!("after_cursor must be in the format 'YYYY-MM-DD HH:MM:SS+00:00|instance_id'")
25+
});
26+
if created_at.is_empty() || id.is_empty() {
27+
pgrx::error!("after_cursor must be in the format 'YYYY-MM-DD HH:MM:SS+00:00|instance_id'");
28+
}
29+
Some((created_at.to_string(), id.to_string()))
30+
}
31+
1032
// ============================================================================
1133
// Monitoring Functions
1234
// ============================================================================
@@ -25,12 +47,11 @@ pub fn list_instances(
2547
name!(status, String),
2648
name!(execution_count, i64),
2749
name!(output, Option<String>),
50+
name!(created_at, pgrx::datum::TimestampWithTimeZone),
51+
name!(completed_at, Option<pgrx::datum::TimestampWithTimeZone>),
2852
),
2953
> {
30-
if limit_count < 1 {
31-
pgrx::error!("limit_count must be at least 1");
32-
}
33-
let limit_count = limit_count.min(10000);
54+
validate_list_instances_limit(limit_count);
3455

3556
let pg_conn_str = postgres_connection_string();
3657

@@ -39,17 +60,23 @@ pub fn list_instances(
3960
// df.list_instances(), df.instance_info()) share the same authoritative source
4061
// for the status column, eliminating the vocabulary mismatch between
4162
// df.instances.status ('cancelled') and duroxide executions.status ('Failed').
42-
let user_instances: Vec<(String, Option<String>, String)> = Spi::connect(|client| {
63+
let user_instances: Vec<(
64+
String,
65+
Option<String>,
66+
String,
67+
pgrx::datum::TimestampWithTimeZone,
68+
Option<pgrx::datum::TimestampWithTimeZone>,
69+
)> = Spi::connect(|client| {
4370
use pgrx::datum::DatumWithOid;
4471

4572
let (sql, args): (&str, Vec<DatumWithOid>) = if let Some(status) = status_filter {
4673
(
47-
"SELECT id, label, status FROM df.instances WHERE status = $1 ORDER BY created_at DESC LIMIT $2",
74+
"SELECT id, label, status, created_at, completed_at FROM df.instances WHERE status = $1 ORDER BY created_at DESC, id DESC LIMIT $2",
4875
vec![status.into(), (limit_count as i64).into()],
4976
)
5077
} else {
5178
(
52-
"SELECT id, label, status FROM df.instances ORDER BY created_at DESC LIMIT $1",
79+
"SELECT id, label, status, created_at, completed_at FROM df.instances ORDER BY created_at DESC, id DESC LIMIT $1",
5380
vec![(limit_count as i64).into()],
5481
)
5582
};
@@ -59,7 +86,13 @@ pub fn list_instances(
5986
if let Ok(Some(id)) = row.get::<String>(1) {
6087
let label: Option<String> = row.get(2).ok().flatten();
6188
let status: String = row.get(3).ok().flatten().unwrap_or_default();
62-
instances.push((id, label, status));
89+
let created_at: Option<pgrx::datum::TimestampWithTimeZone> =
90+
row.get(4).ok().flatten();
91+
let completed_at: Option<pgrx::datum::TimestampWithTimeZone> =
92+
row.get(5).ok().flatten();
93+
if let Some(created_at) = created_at {
94+
instances.push((id, label, status, created_at, completed_at));
95+
}
6396
}
6497
}
6598
}
@@ -90,7 +123,186 @@ pub fn list_instances(
90123
// Only query duroxide for function_name, execution_count, and output.
91124
// Status is read from df.instances (already fetched above) to ensure all
92125
// monitoring APIs agree on the status value.
93-
for (id, label, df_status) in &user_instances {
126+
for (id, label, df_status, created_at, completed_at) in &user_instances {
127+
if let Ok(info) = client.get_instance_info(id).await {
128+
rows.push((
129+
info.instance_id,
130+
label.clone(),
131+
info.orchestration_name,
132+
df_status.clone(),
133+
info.current_execution_id as i64,
134+
info.output,
135+
*created_at,
136+
*completed_at,
137+
));
138+
}
139+
}
140+
rows
141+
});
142+
143+
TableIterator::new(results)
144+
}
145+
146+
/// List durable function instances with keyset pagination.
147+
#[pg_extern(schema = "df")]
148+
pub fn list_instances_paginated(
149+
status_filter: default!(Option<&str>, "NULL"),
150+
limit_count: default!(i32, "100"),
151+
after_cursor: default!(Option<&str>, "NULL"),
152+
) -> TableIterator<
153+
'static,
154+
(
155+
name!(instance_id, String),
156+
name!(label, Option<String>),
157+
name!(function_name, String),
158+
name!(status, String),
159+
name!(execution_count, i64),
160+
name!(output, Option<String>),
161+
name!(created_at, pgrx::datum::TimestampWithTimeZone),
162+
name!(completed_at, Option<pgrx::datum::TimestampWithTimeZone>),
163+
name!(total_count, i64),
164+
name!(next_cursor, Option<String>),
165+
),
166+
> {
167+
validate_list_instances_limit(limit_count);
168+
let cursor = parse_list_instances_cursor(after_cursor);
169+
let fetch_limit_plus_one = (limit_count as i64) + 1;
170+
let pg_conn_str = postgres_connection_string();
171+
172+
let (total_count, mut user_instances): (
173+
i64,
174+
Vec<(
175+
String,
176+
Option<String>,
177+
String,
178+
pgrx::datum::TimestampWithTimeZone,
179+
Option<pgrx::datum::TimestampWithTimeZone>,
180+
String,
181+
)>,
182+
) = Spi::connect(|client| {
183+
use pgrx::datum::DatumWithOid;
184+
185+
let (count_sql, count_args): (&str, Vec<DatumWithOid>) = if let Some(status) = status_filter
186+
{
187+
(
188+
"SELECT COUNT(*) FROM df.instances WHERE status = $1",
189+
vec![status.into()],
190+
)
191+
} else {
192+
("SELECT COUNT(*) FROM df.instances", vec![])
193+
};
194+
195+
let total_count = client
196+
.select(count_sql, Some(1), &count_args)
197+
.ok()
198+
.and_then(|table| {
199+
table
200+
.into_iter()
201+
.next()
202+
.and_then(|row| row.get::<i64>(1).ok().flatten())
203+
})
204+
.unwrap_or(0);
205+
206+
let (sql, args): (&str, Vec<DatumWithOid>) = match (status_filter, cursor.as_ref()) {
207+
(Some(status), Some((cursor_created_at, cursor_id))) => (
208+
"SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \
209+
FROM df.instances \
210+
WHERE status = $1 \
211+
AND (created_at < $2::timestamptz OR (created_at = $2::timestamptz AND id < $3)) \
212+
ORDER BY created_at DESC, id DESC \
213+
LIMIT $4",
214+
vec![
215+
status.into(),
216+
cursor_created_at.as_str().into(),
217+
cursor_id.as_str().into(),
218+
fetch_limit_plus_one.into(),
219+
],
220+
),
221+
(Some(status), None) => (
222+
"SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \
223+
FROM df.instances \
224+
WHERE status = $1 \
225+
ORDER BY created_at DESC, id DESC \
226+
LIMIT $2",
227+
vec![status.into(), fetch_limit_plus_one.into()],
228+
),
229+
(None, Some((cursor_created_at, cursor_id))) => (
230+
"SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \
231+
FROM df.instances \
232+
WHERE (created_at < $1::timestamptz OR (created_at = $1::timestamptz AND id < $2)) \
233+
ORDER BY created_at DESC, id DESC \
234+
LIMIT $3",
235+
vec![
236+
cursor_created_at.as_str().into(),
237+
cursor_id.as_str().into(),
238+
fetch_limit_plus_one.into(),
239+
],
240+
),
241+
(None, None) => (
242+
"SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \
243+
FROM df.instances \
244+
ORDER BY created_at DESC, id DESC \
245+
LIMIT $1",
246+
vec![fetch_limit_plus_one.into()],
247+
),
248+
};
249+
250+
let mut instances = Vec::new();
251+
if let Ok(table) = client.select(sql, None, &args) {
252+
for row in table {
253+
if let Ok(Some(id)) = row.get::<String>(1) {
254+
let label: Option<String> = row.get(2).ok().flatten();
255+
let status: String = row.get(3).ok().flatten().unwrap_or_default();
256+
let created_at: Option<pgrx::datum::TimestampWithTimeZone> =
257+
row.get(4).ok().flatten();
258+
let completed_at: Option<pgrx::datum::TimestampWithTimeZone> =
259+
row.get(5).ok().flatten();
260+
let next_cursor: String = row.get(6).ok().flatten().unwrap_or_default();
261+
if let Some(created_at) = created_at {
262+
instances.push((id, label, status, created_at, completed_at, next_cursor));
263+
}
264+
}
265+
}
266+
}
267+
(total_count, instances)
268+
});
269+
270+
if user_instances.is_empty() {
271+
return TableIterator::new(vec![]);
272+
}
273+
274+
// Fetching one extra row (limit + 1) lets us detect whether another page exists.
275+
let has_more = user_instances.len() > limit_count as usize;
276+
if has_more {
277+
user_instances.pop();
278+
}
279+
let next_cursor = if has_more {
280+
// After removing the lookahead row, last() is the final visible row.
281+
user_instances
282+
.last()
283+
.map(|(_, _, _, _, _, cursor)| cursor.clone())
284+
} else {
285+
None
286+
};
287+
288+
let rt = match tokio::runtime::Builder::new_current_thread()
289+
.enable_all()
290+
.build()
291+
{
292+
Ok(rt) => rt,
293+
Err(_) => return TableIterator::new(vec![]),
294+
};
295+
296+
let results = rt.block_on(async {
297+
let store = match new_backend_provider(&pg_conn_str).await {
298+
Ok(s) => s,
299+
Err(_) => return vec![],
300+
};
301+
302+
let client = Client::new(store);
303+
304+
let mut rows = Vec::new();
305+
for (id, label, df_status, created_at, completed_at, _) in &user_instances {
94306
if let Ok(info) = client.get_instance_info(id).await {
95307
rows.push((
96308
info.instance_id,
@@ -99,6 +311,10 @@ pub fn list_instances(
99311
df_status.clone(),
100312
info.current_execution_id as i64,
101313
info.output,
314+
*created_at,
315+
*completed_at,
316+
total_count,
317+
next_cursor.clone(),
102318
));
103319
}
104320
}

0 commit comments

Comments
 (0)