1515import dev .matheuscruz .infra .ai .data .SimpleMessage ;
1616import io .quarkus .narayana .jta .QuarkusTransaction ;
1717import io .smallrye .mutiny .Multi ;
18+ import io .smallrye .mutiny .Multi ;
1819import jakarta .enterprise .context .ApplicationScoped ;
1920import java .util .ArrayList ;
2021import java .util .List ;
22+ import java .util .ArrayList ;
23+ import java .util .List ;
2124import java .util .Optional ;
2225import org .eclipse .microprofile .reactive .messaging .Incoming ;
2326import org .eclipse .microprofile .reactive .messaging .Message ;
2427import org .eclipse .microprofile .reactive .messaging .Outgoing ;
28+ import org .eclipse .microprofile .reactive .messaging .Incoming ;
29+ import org .eclipse .microprofile .reactive .messaging .Message ;
30+ import org .eclipse .microprofile .reactive .messaging .Outgoing ;
2531import org .jboss .logging .Logger ;
2632import software .amazon .awssdk .services .sqs .SqsClient ;
2733
@@ -49,6 +55,10 @@ public SQS(SqsClient sqs, ObjectMapper objectMapper, TextAiService aiService, Re
4955 this .userRepository = userRepository ;
5056 }
5157
58+ @ Incoming ("whatsapp-incoming" )
59+ @ Outgoing ("whatsapp-recognized" )
60+ public Multi <Message <String >> receiveMessages (Message <String > message ) {
61+ String body = message .getPayload ();
5262 @ Incoming ("whatsapp-incoming" )
5363 @ Outgoing ("whatsapp-recognized" )
5464 public Multi <Message <String >> receiveMessages (Message <String > message ) {
@@ -59,13 +69,31 @@ public Multi<Message<String>> receiveMessages(Message<String> message) {
5969 return Multi .createFrom ().item (message );
6070 }
6171
72+ if (!MessageKind .TEXT .equals (incomingMessage .kind ())) {
73+ return Multi .createFrom ().item (message );
74+ }
75+
6276 Optional <User > user = this .userRepository .findByPhoneNumber (incomingMessage .sender ());
6377
6478 if (user .isEmpty ()) {
6579 logger .error ("User not found." );
6680 return Multi .createFrom ().item (message );
6781 }
6882
83+ return Multi .createFrom ().iterable (handleUserMessage (user .get (), incomingMessage )).map (processedMessage -> {
84+ try {
85+ String processedBody = objectMapper .writeValueAsString (processedMessage );
86+ return Message .of (processedBody ).withAck (() -> message .ack ())
87+ .withNack (throwable -> message .nack (throwable ));
88+ } catch (JsonProcessingException e ) {
89+ logger .error ("Failed to serialize message" , e );
90+ throw new RuntimeException (e );
91+ }
92+ });
93+ logger .error ("User not found." );
94+ return Multi .createFrom ().item (message );
95+ }
96+
6997 return Multi .createFrom ().iterable (handleUserMessage (user .get (), incomingMessage )).map (processedMessage -> {
7098 try {
7199 String processedBody = objectMapper .writeValueAsString (processedMessage );
@@ -87,7 +115,7 @@ private List<Object> handleUserMessage(User user, IncomingMessage message) {
87115 for (RecognizedOperation recognizedOperation : allRecognizedOperations .all ()) {
88116 switch (recognizedOperation .operation ()) {
89117 case AiOperations .ADD_TRANSACTION ->
90- results .add (processAddTransactionMessage (user , message , recognizedOperation ));
118+ results .add (processAddTransactionMessage (user , message , recognizedOperation ));
91119 case AiOperations .GET_BALANCE -> {
92120 logger .info ("Processing GET_BALANCE operation" + recognizedOperation .recognizedTransaction ());
93121 results .add (processSimpleMessage (user , message , recognizedOperation ));
0 commit comments