Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion modules/meteroid/crates/diesel-models/src/query/dead_letter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ impl DeadLetterMessageRow {
.into_db_result()
}

pub async fn find_pending_by_ids(
conn: &mut PgConn,
ids: &[Uuid],
) -> DbResult<Vec<DeadLetterMessageRow>> {
dead_letter_message::table
.filter(dead_letter_message::id.eq_any(ids))
.filter(dead_letter_message::status.eq(DeadLetterStatusEnum::Pending))
.get_results(conn)
.await
.attach("Failed to fetch pending dead letter messages")
.into_db_result()
}

pub async fn batch_update_status(
conn: &mut PgConn,
ids: &[Uuid],
Expand Down Expand Up @@ -199,7 +212,11 @@ pub async fn search_organizations(
use crate::schema::tenant;
use diesel_async::RunQueryDsl;

let pattern = format!("%{query}%");
let escaped = query
.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_");
let pattern = format!("%{escaped}%");

let orgs: Vec<OrganizationRow> = organization::table
.filter(
Expand Down
18 changes: 16 additions & 2 deletions modules/meteroid/crates/diesel-models/src/query/pgmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ pub async fn send_batch(
queue: &str,
batch: &[PgmqMessageRowNew],
) -> DbResult<()> {
send_batch_returning_ids(conn, queue, batch).await.map(drop)
}

pub async fn send_batch_returning_ids(
conn: &mut PgConn,
queue: &str,
batch: &[PgmqMessageRowNew],
) -> DbResult<Vec<i64>> {
#[derive(diesel::QueryableByName)]
struct SendResult {
#[diesel(sql_type = sql_types::BigInt)]
msg_id: i64,
}

let raw_query = r"SELECT * from pgmq.send_batch($1, $2, $3) as msg_id";

let (messages, headers): (Vec<_>, Vec<_>) =
Expand All @@ -20,9 +34,9 @@ pub async fn send_batch(
.bind::<sql_types::Text, _>(queue)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Jsonb>>, _>(messages)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Jsonb>>, _>(headers)
.execute(conn)
.get_results::<SendResult>(conn)
.await
.map(drop)
.map(|rows| rows.into_iter().map(|r| r.msg_id).collect())
.attach("Error while sending batch of messages to pgmq")
.into_db_result()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,18 @@ impl DeadLetterInterface for Store {
headers: entry.headers.map(Headers),
};

pgmq::send_batch(&mut conn, queue.as_str(), &[msg])
let new_ids = pgmq::send_batch_returning_ids(&mut conn, queue.as_str(), &[msg])
.await
.map_err(Into::<Report<StoreError>>::into)?;

let requeued_pgmq_msg_id = new_ids.into_iter().next();

DeadLetterMessageRow::update_status(
&mut conn,
id,
diesel_models::enums::DeadLetterStatusEnum::Requeued,
resolved_by,
None,
requeued_pgmq_msg_id,
)
.await
.map(Into::into)
Expand Down Expand Up @@ -213,28 +215,43 @@ impl DeadLetterInterface for Store {
) -> StoreResult<u32> {
let mut conn = self.get_conn().await?;

let updated = DeadLetterMessageRow::batch_update_status(
&mut conn,
&ids,
diesel_models::enums::DeadLetterStatusEnum::Requeued,
resolved_by,
)
.await
.map_err(Into::<Report<StoreError>>::into)?;
let pending = DeadLetterMessageRow::find_pending_by_ids(&mut conn, &ids)
.await
.map_err(Into::<Report<StoreError>>::into)?;

for row in &updated {
if let Ok(queue) = row.queue.parse::<PgmqQueue>() {
let msg = diesel_models::pgmq::PgmqMessageRowNew {
message: row.message.clone().map(Message),
headers: row.headers.clone().map(Headers),
};
if let Err(e) = pgmq::send_batch(&mut conn, queue.as_str(), &[msg]).await {
log::error!("Failed to requeue dead letter {}: {:?}", row.id, e);
// Enqueue first, collect IDs of successfully enqueued entries
let mut enqueued_ids = Vec::new();
for row in &pending {
let queue: PgmqQueue = match row.queue.parse() {
Ok(q) => q,
Err(e) => {
log::error!("Unknown queue for dead letter {}: {e}", row.id);
continue;
}
};
let msg = diesel_models::pgmq::PgmqMessageRowNew {
message: row.message.clone().map(Message),
headers: row.headers.clone().map(Headers),
};
match pgmq::send_batch(&mut conn, queue.as_str(), &[msg]).await {
Ok(()) => enqueued_ids.push(row.id),
Err(e) => log::error!("Failed to requeue dead letter {}: {:?}", row.id, e),
}
}

Ok(updated.len() as u32)
// Only mark successfully enqueued entries as requeued
if !enqueued_ids.is_empty() {
DeadLetterMessageRow::batch_update_status(
&mut conn,
&enqueued_ids,
diesel_models::enums::DeadLetterStatusEnum::Requeued,
resolved_by,
)
.await
.map_err(Into::<Report<StoreError>>::into)?;
}

Ok(enqueued_ids.len() as u32)
}

async fn batch_discard_dead_letters(
Expand Down
12 changes: 9 additions & 3 deletions modules/meteroid/src/workers/pgmq/outbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use meteroid_store::domain::outbox_event::{EventType, OutboxEvent, OutboxPgmqHea
use meteroid_store::domain::pgmq::{
BiAggregationEvent, BiCreditNoteFinalizedEvent, BiInvoiceFinalizedEvent,
HubspotSyncRequestEvent, PennylaneSyncInvoice, PennylaneSyncRequestEvent, PgmqMessage,
PgmqMessageNew, PgmqQueue, QuoteConversionRequestEvent,
PgmqMessageNew, PgmqQueue, QuoteConversionRequestEvent, extract_tenant_id_from_headers,
};
use meteroid_store::repositories::pgmq::PgmqInterface;
use meteroid_store::{Store, StoreResult};
Expand Down Expand Up @@ -39,10 +39,13 @@ impl PgmqOutboxDispatch {
.try_into()
.ok()?;

let tenant_id =
extract_tenant_id_from_headers(&x.headers.as_ref().map(|h| h.0.clone()));

Some(PgmqMessageNew {
message: None,
headers: Some(headers),
tenant_id: None,
tenant_id,
})
})
.collect();
Expand Down Expand Up @@ -171,6 +174,9 @@ impl PgmqOutboxDispatch {
continue;
}

let tenant_id =
extract_tenant_id_from_headers(&msg.headers.as_ref().map(|h| h.0.clone()));

events.push(PgmqMessageNew {
message: None,
headers: Some(
Expand All @@ -179,7 +185,7 @@ impl PgmqOutboxDispatch {
}
.try_into()?,
),
tenant_id: None,
tenant_id,
});
}
}
Expand Down
8 changes: 2 additions & 6 deletions modules/meteroid/src/workers/pgmq/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,8 @@ pub(crate) async fn run_once(
.change_context(PgmqError::DeleteMessages)?;
}

// Succeeded messages — delete or archive (excluding dead-lettered ones, already handled)
let to_ack: Vec<MessageId> = handle_result
.succeeded
.into_iter()
.filter(|id| !exhausted_ids.contains(&id.0) || succeeded_ids.contains(&id.0))
.collect();
// Succeeded messages — delete or archive
let to_ack: Vec<MessageId> = handle_result.succeeded;

if !to_ack.is_empty() {
if delete_processed {
Expand Down
12 changes: 5 additions & 7 deletions modules/meteroid/src/workers/pgmq/webhook_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::workers::pgmq::outbox::to_outbox_events;
use crate::workers::pgmq::processor::{HandleResult, PgmqHandler};
use common_domain::pgmq::MessageId;
use error_stack::{Report, ResultExt};
use futures::future::try_join_all;
use meteroid_store::domain::outbox_event::OutboxEvent;
use meteroid_store::domain::pgmq::PgmqMessage;
use std::sync::Arc;
Expand Down Expand Up @@ -363,17 +362,16 @@ impl PgmqHandler for WebhookOut {
})
.collect();

let results = try_join_all(tasks)
.await
.change_context(PgmqError::HandleMessages)?;
let results = futures::future::join_all(tasks).await;

let mut succeeded = Vec::new();
let mut failed = Vec::new();

for (msg_id, result) in results {
for result in results {
match result {
Ok(id) => succeeded.push(id),
Err(e) => failed.push(HandleResult::fail(msg_id, &e)),
Ok((_, Ok(id))) => succeeded.push(id),
Ok((msg_id, Err(e))) => failed.push(HandleResult::fail(msg_id, &e)),
Err(e) => log::error!("Webhook out task panicked: {e:?}"),
}
}

Expand Down
Loading