@@ -327,47 +327,39 @@ defmodule Singularity.Workflow.Notifications do
327327 case ensure_queue ( queue_name , repo ) do
328328 :ok ->
329329 # Read messages from PGMQ
330- # PGMQ read_messages signature: read_messages(repo, queue_name, visibility_timeout, limit)
331- case Pgmq . read_messages ( repo , queue_name , visibility_timeout , limit ) do
332- { :ok , messages } when is_list ( messages ) ->
333- formatted_messages =
334- Enum . map ( messages , fn % Pgmq.Message { id: msg_id , body: body } ->
335- decoded_payload =
336- case Jason . decode ( body ) do
337- { :ok , decoded } -> decoded
338- { :error , _ } -> % { "raw" => body }
339- end
340-
341- % {
342- id: Ecto.UUID . generate ( ) ,
343- workflow_id:
344- Map . get ( decoded_payload , "workflow_id" ) ||
345- Map . get ( decoded_payload , :workflow_id ) ||
346- Ecto.UUID . generate ( ) ,
347- queue_name: queue_name ,
348- message_id: Integer . to_string ( msg_id ) ,
349- payload: decoded_payload
350- }
351- end )
352-
353- Logger . debug ( "Received messages from queue" ,
354- queue: queue_name ,
355- count: length ( formatted_messages ) ,
356- limit: limit
357- )
358-
359- { :ok , formatted_messages }
360-
361- { :ok , nil } ->
362- { :ok , [ ] }
363-
364- { :error , reason } ->
365- Logger . error ( "Failed to receive messages from queue" ,
366- queue: queue_name ,
367- error: inspect ( reason )
368- )
369-
370- { :error , reason }
330+ # PGMQ read_messages returns a list directly or nil
331+ messages = Pgmq . read_messages ( repo , queue_name , visibility_timeout , limit )
332+
333+ if is_list ( messages ) and length ( messages ) > 0 do
334+ formatted_messages =
335+ Enum . map ( messages , fn % Pgmq.Message { id: msg_id , body: body } ->
336+ decoded_payload =
337+ case Jason . decode ( body ) do
338+ { :ok , decoded } -> decoded
339+ { :error , _ } -> % { "raw" => body }
340+ end
341+
342+ % {
343+ id: Ecto.UUID . generate ( ) ,
344+ workflow_id:
345+ Map . get ( decoded_payload , "workflow_id" ) ||
346+ Map . get ( decoded_payload , :workflow_id ) ||
347+ Ecto.UUID . generate ( ) ,
348+ queue_name: queue_name ,
349+ message_id: Integer . to_string ( msg_id ) ,
350+ payload: decoded_payload
351+ }
352+ end )
353+
354+ Logger . debug ( "Received messages from queue" ,
355+ queue: queue_name ,
356+ count: length ( formatted_messages ) ,
357+ limit: limit
358+ )
359+
360+ { :ok , formatted_messages }
361+ else
362+ { :ok , [ ] }
371363 end
372364
373365 { :error , reason } ->
@@ -443,24 +435,14 @@ defmodule Singularity.Workflow.Notifications do
443435 end
444436
445437 # Delete the message from PGMQ
446- case Pgmq . delete_messages ( repo , queue_name , [ msg_id_int ] ) do
447- :ok ->
448- Logger . debug ( "Message acknowledged" ,
449- queue: queue_name ,
450- message_id: message_id
451- )
438+ :ok = Pgmq . delete_messages ( repo , queue_name , [ msg_id_int ] )
452439
453- :ok
454-
455- { :error , reason } ->
456- Logger . error ( "Failed to acknowledge message" ,
457- queue: queue_name ,
458- message_id: message_id ,
459- error: inspect ( reason )
460- )
440+ Logger . debug ( "Message acknowledged" ,
441+ queue: queue_name ,
442+ message_id: message_id
443+ )
461444
462- { :error , reason }
463- end
445+ :ok
464446 rescue
465447 error ->
466448 Logger . error ( "Exception while acknowledging message" ,
@@ -486,7 +468,7 @@ defmodule Singularity.Workflow.Notifications do
486468 defp send_pgmq_message ( queue_name , message , repo ) when is_binary ( queue_name ) do
487469 with { :ok , json } <- encode_message ( message ) ,
488470 { :ok , message_id } <- do_send ( queue_name , json , repo ) do
489- { :ok , Integer . to_string ( message_id ) }
471+ { :ok , to_string ( message_id ) }
490472 end
491473 end
492474
@@ -697,9 +679,6 @@ defmodule Singularity.Workflow.Notifications do
697679 end
698680 end
699681
700- defp decode_message_payload ( % { } = msg ) , do: { :ok , msg }
701- defp decode_message_payload ( other ) , do: { :ok , other }
702-
703682 defp cleanup_reply_queue ( false , _queue , _repo ) , do: :ok
704683
705684 defp cleanup_reply_queue ( true , queue , repo ) do
0 commit comments