1414import dev .matheuscruz .infra .ai .data .RecognizedTransaction ;
1515import dev .matheuscruz .infra .ai .data .SimpleMessage ;
1616import io .quarkus .narayana .jta .QuarkusTransaction ;
17- import io .quarkus . scheduler . Scheduled ;
17+ import io .smallrye . mutiny . Multi ;
1818import jakarta .enterprise .context .ApplicationScoped ;
19- import java .io .IOException ;
19+ import java .util .ArrayList ;
20+ import java .util .List ;
2021import java .util .Optional ;
21- import java .util .UUID ;
22- import org .eclipse .microprofile .config .inject .ConfigProperty ;
22+ import org .eclipse .microprofile .reactive .messaging .Incoming ;
23+ import org .eclipse .microprofile .reactive .messaging .Message ;
24+ import org .eclipse .microprofile .reactive .messaging .Outgoing ;
2325import org .jboss .logging .Logger ;
2426import software .amazon .awssdk .services .sqs .SqsClient ;
2527
2628@ ApplicationScoped
2729public class SQS {
2830
29- final String incomingMessagesUrl ;
30- final String processedMessagesUrl ;
3131 final SqsClient sqs ;
3232 final ObjectMapper objectMapper ;
3333 final TextAiService aiService ;
@@ -39,96 +39,94 @@ public class SQS {
3939
4040 private static final ObjectReader AI_RESPONSE_READER = new ObjectMapper ().readerFor (RecognizedOperation .class );
4141
42- public SQS (SqsClient sqs , @ ConfigProperty (name = "whatsapp.incoming-message.queue-url" ) String incomingMessagesUrl ,
43- @ ConfigProperty (name = "whatsapp.recognized-message.queue-url" ) String messagesProcessedUrl ,
44- ObjectMapper objectMapper , TextAiService aiService , RecordRepository recordRepository ,
42+ public SQS (SqsClient sqs , ObjectMapper objectMapper , TextAiService aiService , RecordRepository recordRepository ,
4543 UserRepository userRepository ) {
4644
4745 this .sqs = sqs ;
48- this .incomingMessagesUrl = incomingMessagesUrl ;
49- this .processedMessagesUrl = messagesProcessedUrl ;
5046 this .objectMapper = objectMapper ;
5147 this .aiService = aiService ;
5248 this .recordRepository = recordRepository ;
5349 this .userRepository = userRepository ;
5450 }
5551
56- @ Scheduled (every = "5s" )
57- public void receiveMessages () {
58- sqs .receiveMessage (req -> req .maxNumberOfMessages (10 ).queueUrl (incomingMessagesUrl )).messages ()
59- .forEach (message -> processMessage (message .body (), message .receiptHandle ()));
60- }
61-
62- private void processMessage (String body , String receiptHandle ) {
52+ @ Incoming ("whatsapp-incoming" )
53+ @ Outgoing ("whatsapp-recognized" )
54+ public Multi <Message <String >> receiveMessages (Message <String > message ) {
55+ String body = message .getPayload ();
6356 IncomingMessage incomingMessage = parseIncomingMessage (body );
64- if (!MessageKind .TEXT .equals (incomingMessage .kind ()))
65- return ;
57+
58+ if (!MessageKind .TEXT .equals (incomingMessage .kind ())) {
59+ return Multi .createFrom ().item (message );
60+ }
6661
6762 Optional <User > user = this .userRepository .findByPhoneNumber (incomingMessage .sender ());
6863
6964 if (user .isEmpty ()) {
70- logger .error ("User not found. Deleting message from queue." );
71- deleteMessageUsing (receiptHandle );
72- return ;
65+ logger .error ("User not found." );
66+ return Multi .createFrom ().item (message );
7367 }
7468
75- handleUserMessage (user .get (), incomingMessage , receiptHandle );
69+ return Multi .createFrom ().iterable (handleUserMessage (user .get (), incomingMessage )).map (processedMessage -> {
70+ try {
71+ String processedBody = objectMapper .writeValueAsString (processedMessage );
72+ return Message .of (processedBody ).withAck (() -> message .ack ())
73+ .withNack (throwable -> message .nack (throwable ));
74+ } catch (JsonProcessingException e ) {
75+ logger .error ("Failed to serialize message" , e );
76+ throw new RuntimeException (e );
77+ }
78+ });
7679 }
7780
78- private void handleUserMessage (User user , IncomingMessage message , String receiptHandle ) {
81+ private List <Object > handleUserMessage (User user , IncomingMessage message ) {
82+ List <Object > results = new ArrayList <>();
7983 try {
8084 AllRecognizedOperations allRecognizedOperations = aiService .handleMessage (message .messageBody (),
8185 user .getId ());
8286
8387 for (RecognizedOperation recognizedOperation : allRecognizedOperations .all ()) {
8488 switch (recognizedOperation .operation ()) {
8589 case AiOperations .ADD_TRANSACTION ->
86- processAddTransactionMessage (user , message , receiptHandle , recognizedOperation );
90+ results . add ( processAddTransactionMessage (user , message , recognizedOperation ) );
8791 case AiOperations .GET_BALANCE -> {
8892 logger .info ("Processing GET_BALANCE operation" + recognizedOperation .recognizedTransaction ());
89- processSimpleMessage (user , message , receiptHandle , recognizedOperation );
93+ results . add ( processSimpleMessage (user , message , recognizedOperation ) );
9094 }
9195 default -> logger .warnf ("Unknown operation type: %s" , recognizedOperation .operation ());
9296 }
9397 }
94-
9598 } catch (Exception e ) {
9699 logger .error ("Failed to process message: " + message .messageId (), e );
97100 }
101+ return results ;
98102 }
99103
100- private void processAddTransactionMessage (User user , IncomingMessage message , String receiptHandle ,
101- RecognizedOperation recognizedOperation ) throws IOException {
104+ private TransactionMessageProcessed processAddTransactionMessage (User user , IncomingMessage message ,
105+ RecognizedOperation recognizedOperation ) {
102106 RecognizedTransaction recognizedTransaction = recognizedOperation .recognizedTransaction ();
103- sendProcessedMessage (new TransactionMessageProcessed (AiOperations .ADD_TRANSACTION .commandName (),
104- message .messageId (), MessageStatus .PROCESSED , user .getPhoneNumber (), recognizedTransaction .withError (),
105- recognizedTransaction ));
106107
107108 Record record = new Record .Builder ().userId (user .getId ()).amount (recognizedTransaction .amount ())
108109 .description (recognizedTransaction .description ()).transaction (recognizedTransaction .type ())
109110 .category (recognizedTransaction .category ()).build ();
110111
111112 QuarkusTransaction .requiringNew ().run (() -> recordRepository .persist (record ));
112113
113- deleteMessageUsing (receiptHandle );
114-
115114 logger .infof ("Message %s processed as ADD_TRANSACTION" , message .messageId ());
115+
116+ return new TransactionMessageProcessed (AiOperations .ADD_TRANSACTION .commandName (), message .messageId (),
117+ MessageStatus .PROCESSED , user .getPhoneNumber (), recognizedTransaction .withError (),
118+ recognizedTransaction );
116119 }
117120
118- private void processSimpleMessage (User user , IncomingMessage message , String receiptHandle ,
119- RecognizedOperation recognizedOperation ) throws IOException {
121+ private SimpleMessageProcessed processSimpleMessage (User user , IncomingMessage message ,
122+ RecognizedOperation recognizedOperation ) {
120123 logger .infof ("Processing simple message for user %s" , recognizedOperation .recognizedTransaction ());
121124 SimpleMessage response = new SimpleMessage (recognizedOperation .recognizedTransaction ().description ());
122- sendProcessedMessage (new SimpleMessageProcessed (AiOperations .GET_BALANCE .commandName (), message .messageId (),
123- MessageStatus .PROCESSED , user .getPhoneNumber (), response ));
124- deleteMessageUsing (receiptHandle );
125+
125126 logger .infof ("Message %s processed as GET_BALANCE" , message .messageId ());
126- }
127127
128- private void sendProcessedMessage (Object processedMessage ) throws JsonProcessingException {
129- String messageBody = objectMapper .writeValueAsString (processedMessage );
130- sqs .sendMessage (req -> req .messageBody (messageBody ).messageGroupId ("ProcessedMessages" )
131- .messageDeduplicationId (UUID .randomUUID ().toString ()).queueUrl (processedMessagesUrl ));
128+ return new SimpleMessageProcessed (AiOperations .GET_BALANCE .commandName (), message .messageId (),
129+ MessageStatus .PROCESSED , user .getPhoneNumber (), response );
132130 }
133131
134132 private IncomingMessage parseIncomingMessage (String messageBody ) {
@@ -139,10 +137,6 @@ private IncomingMessage parseIncomingMessage(String messageBody) {
139137 }
140138 }
141139
142- private void deleteMessageUsing (String receiptHandle ) {
143- sqs .deleteMessage (req -> req .queueUrl (incomingMessagesUrl ).receiptHandle (receiptHandle ));
144- }
145-
146140 public record TransactionMessageProcessed (String kind , String messageId , MessageStatus status , String user ,
147141 Boolean withError , RecognizedTransaction content ) {
148142 }
0 commit comments