Skip to content

Commit 6f2be89

Browse files
feat(aggregation-mode): handle processing tasks that have failed in the aggregation process (#2210)
1 parent 51c0b7d commit 6f2be89

File tree

7 files changed

+74
-20
lines changed

7 files changed

+74
-20
lines changed

aggregation_mode/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aggregation_mode/db/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ edition = "2021"
55

66
[dependencies]
77
tokio = { version = "1"}
8-
# TODO: enable tls
9-
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] }
8+
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate", "chrono" ] }
109

1110

1211
[[bin]]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE tasks add COLUMN status_updated_at TIMESTAMPTZ DEFAULT now();

aggregation_mode/db/src/types.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use sqlx::{
22
prelude::FromRow,
3-
types::{BigDecimal, Uuid},
3+
types::{
4+
chrono::{DateTime, Utc},
5+
BigDecimal, Uuid,
6+
},
47
Type,
58
};
69

@@ -21,6 +24,7 @@ pub struct Task {
2124
pub program_commitment: Vec<u8>,
2225
pub merkle_path: Option<Vec<u8>>,
2326
pub status: TaskStatus,
27+
pub status_updated_at: DateTime<Utc>,
2428
}
2529

2630
#[derive(Debug, Clone, FromRow)]

aggregation_mode/proof_aggregator/src/backend/db.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,18 @@ impl Db {
2323
Ok(Self { pool })
2424
}
2525

26-
pub async fn get_pending_tasks_and_mark_them_as_processing(
26+
/// Fetches tasks that are ready to be processed and atomically updates their status.
27+
///
28+
/// This function selects up to `limit` tasks for the given `proving_system_id` that are
29+
/// either:
30+
/// - in `pending` status, or
31+
/// - in `processing` status but whose `status_updated_at` timestamp is older than 12 hours
32+
/// (to recover tasks that may have been abandoned or stalled).
33+
///
34+
/// The selected rows are locked using `FOR UPDATE SKIP LOCKED` to ensure safe concurrent
35+
/// processing by multiple workers. All selected tasks have their status set to
36+
/// `processing` and their `status_updated_at` updated to `now()` before being returned.
37+
pub async fn get_tasks_to_process_and_update_their_status(
2738
&self,
2839
proving_system_id: i32,
2940
limit: i64,
@@ -32,12 +43,19 @@ impl Db {
3243
"WITH selected AS (
3344
SELECT task_id
3445
FROM tasks
35-
WHERE proving_system_id = $1 AND status = 'pending'
46+
WHERE proving_system_id = $1
47+
AND (
48+
status = 'pending'
49+
OR (
50+
status = 'processing'
51+
AND status_updated_at <= now() - interval '12 hours'
52+
)
53+
)
3654
LIMIT $2
3755
FOR UPDATE SKIP LOCKED
3856
)
3957
UPDATE tasks t
40-
SET status = 'processing'
58+
SET status = 'processing', status_updated_at = now()
4159
FROM selected s
4260
WHERE t.task_id = s.task_id
4361
RETURNING t.*;",
@@ -61,7 +79,7 @@ impl Db {
6179

6280
for (task_id, merkle_path) in updates {
6381
if let Err(e) = sqlx::query(
64-
"UPDATE tasks SET merkle_path = $1, status = 'verified', proof = NULL WHERE task_id = $2",
82+
"UPDATE tasks SET merkle_path = $1, status = 'verified', status_updated_at = now(), proof = NULL WHERE task_id = $2",
6583
)
6684
.bind(merkle_path)
6785
.bind(task_id)
@@ -83,6 +101,20 @@ impl Db {
83101
Ok(())
84102
}
85103

86-
// TODO: this should be used when rolling back processing proofs on unexpected errors
87-
pub async fn mark_tasks_as_pending(&self) {}
104+
pub async fn mark_tasks_as_pending(&self, tasks_id: &[Uuid]) -> Result<(), DbError> {
105+
if tasks_id.is_empty() {
106+
return Ok(());
107+
}
108+
109+
sqlx::query(
110+
"UPDATE tasks SET status = 'pending', status_updated_at = now()
111+
WHERE task_id = ANY($1) AND status = 'processing'",
112+
)
113+
.bind(tasks_id)
114+
.execute(&self.pool)
115+
.await
116+
.map_err(|e| DbError::Query(e.to_string()))?;
117+
118+
Ok(())
119+
}
88120
}

aggregation_mode/proof_aggregator/src/backend/fetcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl ProofsFetcher {
3030
) -> Result<(Vec<AlignedProof>, Vec<Uuid>), ProofsFetcherError> {
3131
let tasks = self
3232
.db
33-
.get_pending_tasks_and_mark_them_as_processing(engine.proving_system_id() as i32, limit)
33+
.get_tasks_to_process_and_update_their_status(engine.proving_system_id() as i32, limit)
3434
.await
3535
.map_err(ProofsFetcherError::Query)?;
3636

aggregation_mode/proof_aggregator/src/backend/mod.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,28 +119,42 @@ impl ProofAggregator {
119119
info!("Starting proof aggregator service");
120120

121121
info!("About to aggregate and submit proof to be verified on chain");
122-
let res = self.aggregate_and_submit_proofs_on_chain().await;
122+
123+
let (proofs, tasks_id) = match self
124+
.fetcher
125+
.fetch_pending_proofs(self.engine.clone(), self.config.total_proofs_limit as i64)
126+
.await
127+
.map_err(AggregatedProofSubmissionError::FetchingProofs)
128+
{
129+
Ok(res) => res,
130+
Err(e) => {
131+
error!("Error while aggregating and submitting proofs: {:?}", e);
132+
return;
133+
}
134+
};
135+
136+
let res = self
137+
.aggregate_and_submit_proofs_on_chain((proofs, &tasks_id))
138+
.await;
123139

124140
match res {
125141
Ok(()) => {
126142
info!("Process finished successfully");
127143
}
128144
Err(err) => {
129145
error!("Error while aggregating and submitting proofs: {:?}", err);
146+
warn!("Marking tasks back to pending after failure");
147+
if let Err(e) = self.db.mark_tasks_as_pending(&tasks_id).await {
148+
error!("Error while marking proofs to pending again: {:?}", e);
149+
};
130150
}
131151
}
132152
}
133153

134-
// TODO: on failure, mark proofs as pending again
135154
async fn aggregate_and_submit_proofs_on_chain(
136155
&mut self,
156+
(proofs, tasks_id): (Vec<AlignedProof>, &[Uuid]),
137157
) -> Result<(), AggregatedProofSubmissionError> {
138-
let (proofs, tasks_id) = self
139-
.fetcher
140-
.fetch_pending_proofs(self.engine.clone(), self.config.total_proofs_limit as i64)
141-
.await
142-
.map_err(AggregatedProofSubmissionError::FetchingProofs)?;
143-
144158
if proofs.is_empty() {
145159
warn!("No proofs collected, skipping aggregation...");
146160
return Ok(());
@@ -215,7 +229,7 @@ impl ProofAggregator {
215229

216230
info!("Storing merkle paths for each task...",);
217231
let mut merkle_paths_for_tasks: Vec<(Uuid, Vec<u8>)> = vec![];
218-
for (idx, task_id) in tasks_id.into_iter().enumerate() {
232+
for (idx, task_id) in tasks_id.iter().enumerate() {
219233
let Some(proof) = merkle_tree.get_proof_by_pos(idx) else {
220234
warn!("Proof not found for task id {task_id}");
221235
continue;
@@ -226,7 +240,7 @@ impl ProofAggregator {
226240
.flat_map(|e| e.to_vec())
227241
.collect::<Vec<_>>();
228242

229-
merkle_paths_for_tasks.push((task_id, proof_bytes))
243+
merkle_paths_for_tasks.push((*task_id, proof_bytes))
230244
}
231245
self.db
232246
.insert_tasks_merkle_path_and_mark_them_as_verified(merkle_paths_for_tasks)

0 commit comments

Comments
 (0)