Skip to content

Commit f088c81

Browse files
Copilotpinodeca
andcommitted
Progress update: metrics fix and regression test validated
Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com>
1 parent 902a88e commit f088c81

2 files changed

Lines changed: 125 additions & 13 deletions

File tree

src/monitoring.rs

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,6 @@ pub fn metrics() -> TableIterator<
304304
),
305305
> {
306306
let pg_conn_str = postgres_connection_string();
307-
let provider_schema = backend_duroxide_schema();
308307

309308
let rt = match tokio::runtime::Builder::new_current_thread()
310309
.enable_all()
@@ -315,21 +314,60 @@ pub fn metrics() -> TableIterator<
315314
};
316315

317316
let results = rt.block_on(async {
318-
let store = match new_backend_provider(&pg_conn_str, provider_schema).await {
319-
Ok(s) => s,
317+
let pool = match sqlx::postgres::PgPoolOptions::new()
318+
.max_connections(1)
319+
.connect(&pg_conn_str)
320+
.await
321+
{
322+
Ok(pool) => pool,
320323
Err(_) => return vec![],
321324
};
322325

323-
let client = Client::new(store);
324-
325-
match client.get_system_metrics().await {
326-
Ok(m) => vec![(
327-
m.total_instances as i64,
328-
m.running_instances as i64,
329-
m.completed_instances as i64,
330-
m.failed_instances as i64,
331-
m.total_executions as i64,
332-
m.total_events as i64,
326+
let row: Result<(i64, i64, i64, i64, i64, i64), sqlx::Error> = sqlx::query_as(
327+
r#"
328+
SELECT
329+
i.total_instances,
330+
i.running_instances,
331+
i.completed_instances,
332+
i.failed_instances,
333+
e.total_executions,
334+
h.total_events
335+
FROM (
336+
SELECT
337+
COUNT(*)::BIGINT AS total_instances,
338+
COUNT(*) FILTER (WHERE status = 'running')::BIGINT AS running_instances,
339+
COUNT(*) FILTER (WHERE status = 'completed')::BIGINT AS completed_instances,
340+
COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_instances
341+
FROM df.instances
342+
) i
343+
CROSS JOIN (
344+
SELECT COUNT(*)::BIGINT AS total_executions
345+
FROM duroxide.executions
346+
) e
347+
CROSS JOIN (
348+
SELECT COUNT(*)::BIGINT AS total_events
349+
FROM duroxide.history
350+
) h
351+
"#,
352+
)
353+
.fetch_one(&pool)
354+
.await;
355+
356+
match row {
357+
Ok((
358+
total_instances,
359+
running_instances,
360+
completed_instances,
361+
failed_instances,
362+
total_executions,
363+
total_events,
364+
)) => vec![(
365+
total_instances,
366+
running_instances,
367+
completed_instances,
368+
failed_instances,
369+
total_executions,
370+
total_events,
333371
)],
334372
Err(_) => vec![],
335373
}

tests/e2e/sql/05_monitoring_and_explain.sql

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,80 @@ END $$;
5757

5858
DROP TABLE _test_state;
5959

60+
-- Regression: rolled-back df.start() should not inflate failed_instances in df.metrics()
61+
DO $$
62+
DECLARE
63+
total_metrics BIGINT;
64+
running_metrics BIGINT;
65+
completed_metrics BIGINT;
66+
failed_metrics BIGINT;
67+
previous_failed_metrics BIGINT := -1;
68+
stable_checks INT := 0;
69+
attempts INT := 0;
70+
total_instances BIGINT;
71+
running_instances BIGINT;
72+
completed_instances BIGINT;
73+
failed_instances BIGINT;
74+
BEGIN
75+
BEGIN
76+
PERFORM df.start('SELECT 1', 'rollback-metrics-probe');
77+
RAISE EXCEPTION 'force rollback';
78+
EXCEPTION
79+
WHEN OTHERS THEN NULL;
80+
END;
81+
82+
-- Worker waits up to 5s for an instance row after dequeue. Poll until
83+
-- failed_instances stabilizes for 3 checks after at least ~6s.
84+
LOOP
85+
SELECT m.failed_instances INTO failed_metrics FROM df.metrics() m;
86+
87+
IF failed_metrics = previous_failed_metrics THEN
88+
stable_checks := stable_checks + 1;
89+
ELSE
90+
stable_checks := 0;
91+
previous_failed_metrics := failed_metrics;
92+
END IF;
93+
94+
EXIT WHEN (attempts >= 12 AND stable_checks >= 3) OR attempts >= 60;
95+
PERFORM pg_sleep(0.5);
96+
attempts := attempts + 1;
97+
END LOOP;
98+
99+
SELECT m.total_instances, m.running_instances, m.completed_instances, m.failed_instances
100+
INTO total_metrics, running_metrics, completed_metrics, failed_metrics
101+
FROM df.metrics() m;
102+
103+
SELECT
104+
COUNT(*)::BIGINT,
105+
COUNT(*) FILTER (WHERE lower(status) = 'running')::BIGINT,
106+
COUNT(*) FILTER (WHERE lower(status) = 'completed')::BIGINT,
107+
COUNT(*) FILTER (WHERE lower(status) = 'failed')::BIGINT
108+
INTO total_instances, running_instances, completed_instances, failed_instances
109+
FROM df.instances;
110+
111+
IF total_metrics != total_instances THEN
112+
RAISE EXCEPTION 'TEST FAILED: metrics total_instances=% does not match df.instances=%',
113+
total_metrics, total_instances;
114+
END IF;
115+
116+
IF running_metrics != running_instances THEN
117+
RAISE EXCEPTION 'TEST FAILED: metrics running_instances=% does not match df.instances=%',
118+
running_metrics, running_instances;
119+
END IF;
120+
121+
IF completed_metrics != completed_instances THEN
122+
RAISE EXCEPTION 'TEST FAILED: metrics completed_instances=% does not match df.instances=%',
123+
completed_metrics, completed_instances;
124+
END IF;
125+
126+
IF failed_metrics != failed_instances THEN
127+
RAISE EXCEPTION 'TEST FAILED: metrics failed_instances=% does not match df.instances=%',
128+
failed_metrics, failed_instances;
129+
END IF;
130+
131+
RAISE NOTICE 'TEST PASSED: rollback metrics consistency';
132+
END $$;
133+
60134
-- === Test: 10_explain ===
61135

62136
-- Test dry-run explain (use $body$ to avoid conflict with inner $$)

0 commit comments

Comments
 (0)