From 8a9699088d790f7820b1d9f2982f4dc7a42ac61b Mon Sep 17 00:00:00 2001 From: gasp Date: Fri, 3 Apr 2026 00:39:52 +0200 Subject: [PATCH 1/2] fixes --- .../diesel-models/src/query/dead_letter.rs | 16 +++++- .../crates/diesel-models/src/query/pgmq.rs | 20 ++++++- .../src/repositories/dead_letter.rs | 55 ++++++++++++------- modules/meteroid/src/workers/pgmq/outbox.rs | 13 ++++- .../meteroid/src/workers/pgmq/processor.rs | 8 +-- .../meteroid/src/workers/pgmq/webhook_out.rs | 12 ++-- 6 files changed, 86 insertions(+), 38 deletions(-) diff --git a/modules/meteroid/crates/diesel-models/src/query/dead_letter.rs b/modules/meteroid/crates/diesel-models/src/query/dead_letter.rs index 8d76be6e0..bdf54f17a 100644 --- a/modules/meteroid/crates/diesel-models/src/query/dead_letter.rs +++ b/modules/meteroid/crates/diesel-models/src/query/dead_letter.rs @@ -118,6 +118,19 @@ impl DeadLetterMessageRow { .into_db_result() } + pub async fn find_pending_by_ids( + conn: &mut PgConn, + ids: &[Uuid], + ) -> DbResult> { + dead_letter_message::table + .filter(dead_letter_message::id.eq_any(ids)) + .filter(dead_letter_message::status.eq(DeadLetterStatusEnum::Pending)) + .get_results(conn) + .await + .attach("Failed to fetch pending dead letter messages") + .into_db_result() + } + pub async fn batch_update_status( conn: &mut PgConn, ids: &[Uuid], @@ -199,7 +212,8 @@ pub async fn search_organizations( use crate::schema::tenant; use diesel_async::RunQueryDsl; - let pattern = format!("%{query}%"); + let escaped = query.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_"); + let pattern = format!("%{escaped}%"); let orgs: Vec = organization::table .filter( diff --git a/modules/meteroid/crates/diesel-models/src/query/pgmq.rs b/modules/meteroid/crates/diesel-models/src/query/pgmq.rs index 587304ee2..f2f82e3a5 100644 --- a/modules/meteroid/crates/diesel-models/src/query/pgmq.rs +++ b/modules/meteroid/crates/diesel-models/src/query/pgmq.rs @@ -11,6 +11,22 @@ pub async fn send_batch( queue: &str, batch: &[PgmqMessageRowNew], ) -> DbResult<()> { + send_batch_returning_ids(conn, queue, batch) + .await + .map(drop) +} + +pub async fn send_batch_returning_ids( + conn: &mut PgConn, + queue: &str, + batch: &[PgmqMessageRowNew], +) -> DbResult> { + #[derive(diesel::QueryableByName)] + struct SendResult { + #[diesel(sql_type = sql_types::BigInt)] + msg_id: i64, + } + let raw_query = r"SELECT * from pgmq.send_batch($1, $2, $3) as msg_id"; let (messages, headers): (Vec<_>, Vec<_>) = @@ -20,9 +36,9 @@ pub async fn send_batch( .bind::(queue) .bind::>, _>(messages) .bind::>, _>(headers) - .execute(conn) + .get_results::(conn) .await - .map(drop) + .map(|rows| rows.into_iter().map(|r| r.msg_id).collect()) .attach("Error while sending batch of messages to pgmq") .into_db_result() } diff --git a/modules/meteroid/crates/meteroid-store/src/repositories/dead_letter.rs b/modules/meteroid/crates/meteroid-store/src/repositories/dead_letter.rs index 0c5a1bc3d..b7bbd4f28 100644 --- a/modules/meteroid/crates/meteroid-store/src/repositories/dead_letter.rs +++ b/modules/meteroid/crates/meteroid-store/src/repositories/dead_letter.rs @@ -159,16 +159,18 @@ impl DeadLetterInterface for Store { headers: entry.headers.map(Headers), }; - pgmq::send_batch(&mut conn, queue.as_str(), &[msg]) + let new_ids = pgmq::send_batch_returning_ids(&mut conn, queue.as_str(), &[msg]) .await .map_err(Into::>::into)?; + let requeued_pgmq_msg_id = new_ids.into_iter().next(); + DeadLetterMessageRow::update_status( &mut conn, id, diesel_models::enums::DeadLetterStatusEnum::Requeued, resolved_by, - None, + requeued_pgmq_msg_id, ) .await .map(Into::into) @@ -213,28 +215,43 @@ impl DeadLetterInterface for Store { ) -> StoreResult { let mut conn = self.get_conn().await?; - let updated = DeadLetterMessageRow::batch_update_status( - &mut conn, - &ids, - diesel_models::enums::DeadLetterStatusEnum::Requeued, - resolved_by, - ) - .await - .map_err(Into::>::into)?; + let pending = DeadLetterMessageRow::find_pending_by_ids(&mut conn, &ids) + .await + .map_err(Into::>::into)?; - for row in &updated { - if let Ok(queue) = row.queue.parse::() { - let msg = diesel_models::pgmq::PgmqMessageRowNew { - message: row.message.clone().map(Message), - headers: row.headers.clone().map(Headers), - }; - if let Err(e) = pgmq::send_batch(&mut conn, queue.as_str(), &[msg]).await { - log::error!("Failed to requeue dead letter {}: {:?}", row.id, e); + // Enqueue first, collect IDs of successfully enqueued entries + let mut enqueued_ids = Vec::new(); + for row in &pending { + let queue: PgmqQueue = match row.queue.parse() { + Ok(q) => q, + Err(e) => { + log::error!("Unknown queue for dead letter {}: {e}", row.id); + continue; } + }; + let msg = diesel_models::pgmq::PgmqMessageRowNew { + message: row.message.clone().map(Message), + headers: row.headers.clone().map(Headers), + }; + match pgmq::send_batch(&mut conn, queue.as_str(), &[msg]).await { + Ok(()) => enqueued_ids.push(row.id), + Err(e) => log::error!("Failed to requeue dead letter {}: {:?}", row.id, e), } } - Ok(updated.len() as u32) + // Only mark successfully enqueued entries as requeued + if !enqueued_ids.is_empty() { + DeadLetterMessageRow::batch_update_status( + &mut conn, + &enqueued_ids, + diesel_models::enums::DeadLetterStatusEnum::Requeued, + resolved_by, + ) + .await + .map_err(Into::>::into)?; + } + + Ok(enqueued_ids.len() as u32) } async fn batch_discard_dead_letters( diff --git a/modules/meteroid/src/workers/pgmq/outbox.rs b/modules/meteroid/src/workers/pgmq/outbox.rs index c32e1f26c..ac6b3df5d 100644 --- a/modules/meteroid/src/workers/pgmq/outbox.rs +++ b/modules/meteroid/src/workers/pgmq/outbox.rs @@ -9,7 +9,7 @@ use meteroid_store::domain::outbox_event::{EventType, OutboxEvent, OutboxPgmqHea use meteroid_store::domain::pgmq::{ BiAggregationEvent, BiCreditNoteFinalizedEvent, BiInvoiceFinalizedEvent, HubspotSyncRequestEvent, PennylaneSyncInvoice, PennylaneSyncRequestEvent, PgmqMessage, - PgmqMessageNew, PgmqQueue, QuoteConversionRequestEvent, + PgmqMessageNew, PgmqQueue, QuoteConversionRequestEvent, extract_tenant_id_from_headers, }; use meteroid_store::repositories::pgmq::PgmqInterface; use meteroid_store::{Store, StoreResult}; @@ -39,10 +39,13 @@ impl PgmqOutboxDispatch { .try_into() .ok()?; + let tenant_id = + extract_tenant_id_from_headers(&x.headers.as_ref().map(|h| h.0.clone())); + Some(PgmqMessageNew { message: None, headers: Some(headers), - tenant_id: None, + tenant_id, }) }) .collect(); @@ -171,6 +174,10 @@ impl PgmqOutboxDispatch { continue; } + let tenant_id = extract_tenant_id_from_headers( + &msg.headers.as_ref().map(|h| h.0.clone()), + ); + events.push(PgmqMessageNew { message: None, headers: Some( @@ -179,7 +186,7 @@ impl PgmqOutboxDispatch { } .try_into()?, ), - tenant_id: None, + tenant_id, }); } } diff --git a/modules/meteroid/src/workers/pgmq/processor.rs b/modules/meteroid/src/workers/pgmq/processor.rs index 74502bdf5..dbd3e9e46 100644 --- a/modules/meteroid/src/workers/pgmq/processor.rs +++ b/modules/meteroid/src/workers/pgmq/processor.rs @@ -253,12 +253,8 @@ pub(crate) async fn run_once( .change_context(PgmqError::DeleteMessages)?; } - // Succeeded messages — delete or archive (excluding dead-lettered ones, already handled) - let to_ack: Vec = handle_result - .succeeded - .into_iter() - .filter(|id| !exhausted_ids.contains(&id.0) || succeeded_ids.contains(&id.0)) - .collect(); + // Succeeded messages — delete or archive + let to_ack: Vec = handle_result.succeeded; if !to_ack.is_empty() { if delete_processed { diff --git a/modules/meteroid/src/workers/pgmq/webhook_out.rs b/modules/meteroid/src/workers/pgmq/webhook_out.rs index cec6d656a..bd69e2176 100644 --- a/modules/meteroid/src/workers/pgmq/webhook_out.rs +++ b/modules/meteroid/src/workers/pgmq/webhook_out.rs @@ -14,7 +14,6 @@ use crate::workers::pgmq::outbox::to_outbox_events; use crate::workers::pgmq::processor::{HandleResult, PgmqHandler}; use common_domain::pgmq::MessageId; use error_stack::{Report, ResultExt}; -use futures::future::try_join_all; use meteroid_store::domain::outbox_event::OutboxEvent; use meteroid_store::domain::pgmq::PgmqMessage; use std::sync::Arc; @@ -363,17 +362,16 @@ impl PgmqHandler for WebhookOut { }) .collect(); - let results = try_join_all(tasks) - .await - .change_context(PgmqError::HandleMessages)?; + let results = futures::future::join_all(tasks).await; let mut succeeded = Vec::new(); let mut failed = Vec::new(); - for (msg_id, result) in results { + for result in results { match result { - Ok(id) => succeeded.push(id), - Err(e) => failed.push(HandleResult::fail(msg_id, &e)), + Ok((_, Ok(id))) => succeeded.push(id), + Ok((msg_id, Err(e))) => failed.push(HandleResult::fail(msg_id, &e)), + Err(e) => log::error!("Webhook out task panicked: {e:?}"), } } From 4fc6c089efd3ddf372f96e540193b682fe2a3ffb Mon Sep 17 00:00:00 2001 From: gasp Date: Fri, 3 Apr 2026 10:36:42 +0200 Subject: [PATCH 2/2] fmt --- .../meteroid/crates/diesel-models/src/query/dead_letter.rs | 5 ++++- modules/meteroid/crates/diesel-models/src/query/pgmq.rs | 4 +--- modules/meteroid/src/workers/pgmq/outbox.rs | 5 ++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/meteroid/crates/diesel-models/src/query/dead_letter.rs b/modules/meteroid/crates/diesel-models/src/query/dead_letter.rs index bdf54f17a..566cc1c74 100644 --- a/modules/meteroid/crates/diesel-models/src/query/dead_letter.rs +++ b/modules/meteroid/crates/diesel-models/src/query/dead_letter.rs @@ -212,7 +212,10 @@ pub async fn search_organizations( use crate::schema::tenant; use diesel_async::RunQueryDsl; - let escaped = query.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_"); + let escaped = query + .replace('\\', "\\\\") + .replace('%', "\\%") + .replace('_', "\\_"); let pattern = format!("%{escaped}%"); let orgs: Vec = organization::table diff --git a/modules/meteroid/crates/diesel-models/src/query/pgmq.rs b/modules/meteroid/crates/diesel-models/src/query/pgmq.rs index f2f82e3a5..5112f9dee 100644 --- a/modules/meteroid/crates/diesel-models/src/query/pgmq.rs +++ b/modules/meteroid/crates/diesel-models/src/query/pgmq.rs @@ -11,9 +11,7 @@ pub async fn send_batch( queue: &str, batch: &[PgmqMessageRowNew], ) -> DbResult<()> { - send_batch_returning_ids(conn, queue, batch) - .await - .map(drop) + send_batch_returning_ids(conn, queue, batch).await.map(drop) } pub async fn send_batch_returning_ids( diff --git a/modules/meteroid/src/workers/pgmq/outbox.rs b/modules/meteroid/src/workers/pgmq/outbox.rs index ac6b3df5d..6e1883142 100644 --- a/modules/meteroid/src/workers/pgmq/outbox.rs +++ b/modules/meteroid/src/workers/pgmq/outbox.rs @@ -174,9 +174,8 @@ impl PgmqOutboxDispatch { continue; } - let tenant_id = extract_tenant_id_from_headers( - &msg.headers.as_ref().map(|h| h.0.clone()), - ); + let tenant_id = + extract_tenant_id_from_headers(&msg.headers.as_ref().map(|h| h.0.clone())); events.push(PgmqMessageNew { message: None,