Skip to content

Commit 9082405

Browse files
committed
feat: Implement outbox pattern with SQS integration for message processing
1 parent 6e1c57b commit 9082405

6 files changed

Lines changed: 253 additions & 56 deletions

File tree

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package dev.matheuscruz.infra.outbox;
2+
3+
import jakarta.persistence.Column;
4+
import jakarta.persistence.Entity;
5+
import jakarta.persistence.EnumType;
6+
import jakarta.persistence.Enumerated;
7+
import jakarta.persistence.GeneratedValue;
8+
import jakarta.persistence.GenerationType;
9+
import jakarta.persistence.Id;
10+
import jakarta.persistence.Table;
11+
import java.time.Instant;
12+
import java.util.UUID;
13+
14+
@Entity
15+
@Table(name = "outbox_messages")
16+
public class OutboxMessage {
17+
18+
@Id
19+
@GeneratedValue(strategy = GenerationType.UUID)
20+
private UUID id;
21+
22+
@Column(nullable = false, columnDefinition = "TEXT")
23+
private String payload;
24+
25+
@Column(name = "queue_url", nullable = false)
26+
private String queueUrl;
27+
28+
@Column(name = "message_group_id", nullable = false)
29+
private String messageGroupId;
30+
31+
@Enumerated(EnumType.STRING)
32+
@Column(nullable = false)
33+
private OutboxStatus status;
34+
35+
@Column(name = "created_at", nullable = false)
36+
private Instant createdAt;
37+
38+
@Column(name = "processed_at")
39+
private Instant processedAt;
40+
41+
@Column(name = "retry_count", nullable = false)
42+
private int retryCount;
43+
44+
protected OutboxMessage() {
45+
}
46+
47+
public OutboxMessage(String payload, String queueUrl, String messageGroupId) {
48+
this.payload = payload;
49+
this.queueUrl = queueUrl;
50+
this.messageGroupId = messageGroupId;
51+
this.status = OutboxStatus.PENDING;
52+
this.createdAt = Instant.now();
53+
this.retryCount = 0;
54+
}
55+
56+
public UUID getId() {
57+
return id;
58+
}
59+
60+
public String getPayload() {
61+
return payload;
62+
}
63+
64+
public String getQueueUrl() {
65+
return queueUrl;
66+
}
67+
68+
public String getMessageGroupId() {
69+
return messageGroupId;
70+
}
71+
72+
public OutboxStatus getStatus() {
73+
return status;
74+
}
75+
76+
public Instant getCreatedAt() {
77+
return createdAt;
78+
}
79+
80+
public Instant getProcessedAt() {
81+
return processedAt;
82+
}
83+
84+
public int getRetryCount() {
85+
return retryCount;
86+
}
87+
88+
public void markAsSent() {
89+
this.status = OutboxStatus.SENT;
90+
this.processedAt = Instant.now();
91+
}
92+
93+
public void markAsFailed() {
94+
this.status = OutboxStatus.FAILED;
95+
this.processedAt = Instant.now();
96+
}
97+
98+
public void incrementRetryCount() {
99+
this.retryCount++;
100+
}
101+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package dev.matheuscruz.infra.outbox;
2+
3+
import io.quarkus.narayana.jta.QuarkusTransaction;
4+
import io.quarkus.scheduler.Scheduled;
5+
import jakarta.enterprise.context.ApplicationScoped;
6+
import java.util.List;
7+
import org.jboss.logging.Logger;
8+
import software.amazon.awssdk.services.sqs.SqsClient;
9+
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
10+
11+
@ApplicationScoped
12+
public class OutboxMessageRelay {
13+
14+
private static final int MAX_RETRIES = 10;
15+
16+
private final OutboxMessageRepository outboxMessageRepository;
17+
private final SqsClient sqsClient;
18+
private final Logger logger = Logger.getLogger(OutboxMessageRelay.class);
19+
20+
public OutboxMessageRelay(OutboxMessageRepository outboxMessageRepository, SqsClient sqsClient) {
21+
this.outboxMessageRepository = outboxMessageRepository;
22+
this.sqsClient = sqsClient;
23+
}
24+
25+
@Scheduled(every = "5s")
26+
public void processOutbox() {
27+
List<OutboxMessage> pending = outboxMessageRepository.findPendingMessages();
28+
29+
for (OutboxMessage message : pending) {
30+
if (message.getRetryCount() >= MAX_RETRIES) {
31+
failMessage(message);
32+
continue;
33+
}
34+
35+
try {
36+
sqsClient.sendMessage(SendMessageRequest.builder().queueUrl(message.getQueueUrl())
37+
.messageBody(message.getPayload()).messageGroupId(message.getMessageGroupId()).build());
38+
39+
updateMessageStatus(message, true);
40+
} catch (Exception e) {
41+
logger.errorf(e, "Failed to send outbox message %s to queue %s", message.getId(),
42+
message.getQueueUrl());
43+
updateMessageStatus(message, false);
44+
}
45+
}
46+
}
47+
48+
private void updateMessageStatus(OutboxMessage message, boolean success) {
49+
QuarkusTransaction.requiringNew().run(() -> {
50+
OutboxMessage managed = outboxMessageRepository.findById(message.getId());
51+
if (managed != null) {
52+
if (success) {
53+
managed.markAsSent();
54+
} else {
55+
managed.incrementRetryCount();
56+
}
57+
}
58+
});
59+
}
60+
61+
private void failMessage(OutboxMessage message) {
62+
QuarkusTransaction.requiringNew().run(() -> {
63+
OutboxMessage managed = outboxMessageRepository.findById(message.getId());
64+
if (managed != null) {
65+
managed.markAsFailed();
66+
logger.errorf("Outbox message %s has exceeded max retries, marking as FAILED", message.getId());
67+
}
68+
});
69+
}
70+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package dev.matheuscruz.infra.outbox;
2+
3+
import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
4+
import jakarta.enterprise.context.ApplicationScoped;
5+
import java.util.List;
6+
import java.util.UUID;
7+
8+
@ApplicationScoped
9+
public class OutboxMessageRepository implements PanacheRepositoryBase<OutboxMessage, UUID> {
10+
11+
private static final int MAX_RETRIES = 10;
12+
private static final int BATCH_SIZE = 20;
13+
14+
public List<OutboxMessage> findPendingMessages() {
15+
return find("status = ?1 and retryCount < ?2 order by createdAt", OutboxStatus.PENDING, MAX_RETRIES)
16+
.range(0, BATCH_SIZE - 1).list();
17+
}
18+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package dev.matheuscruz.infra.outbox;
2+
3+
public enum OutboxStatus {
4+
PENDING, SENT, FAILED
5+
}

timeless-api/src/main/java/dev/matheuscruz/infra/queue/SQS.java

Lines changed: 59 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -13,120 +13,126 @@
1313
import dev.matheuscruz.infra.ai.data.RecognizedOperation;
1414
import dev.matheuscruz.infra.ai.data.RecognizedTransaction;
1515
import dev.matheuscruz.infra.ai.data.SimpleMessage;
16+
import dev.matheuscruz.infra.outbox.OutboxMessage;
17+
import dev.matheuscruz.infra.outbox.OutboxMessageRepository;
1618
import io.quarkus.narayana.jta.QuarkusTransaction;
17-
import io.smallrye.mutiny.Multi;
1819
import jakarta.enterprise.context.ApplicationScoped;
19-
import java.util.ArrayList;
20-
import java.util.List;
2120
import java.util.Optional;
21+
import java.util.concurrent.CompletionStage;
22+
import org.eclipse.microprofile.config.inject.ConfigProperty;
2223
import org.eclipse.microprofile.reactive.messaging.Incoming;
2324
import org.eclipse.microprofile.reactive.messaging.Message;
24-
import org.eclipse.microprofile.reactive.messaging.Outgoing;
2525
import org.jboss.logging.Logger;
26-
import software.amazon.awssdk.services.sqs.SqsClient;
2726

2827
@ApplicationScoped
2928
public class SQS {
3029

31-
final SqsClient sqs;
3230
final ObjectMapper objectMapper;
3331
final TextAiService aiService;
3432
final RecordRepository recordRepository;
3533
final UserRepository userRepository;
34+
final OutboxMessageRepository outboxMessageRepository;
35+
final String recognizedQueueUrl;
3636
final Logger logger = Logger.getLogger(SQS.class);
3737

3838
private static final ObjectReader INCOMING_MESSAGE_READER = new ObjectMapper().readerFor(IncomingMessage.class);
3939

40-
private static final ObjectReader AI_RESPONSE_READER = new ObjectMapper().readerFor(RecognizedOperation.class);
40+
public SQS(ObjectMapper objectMapper, TextAiService aiService, RecordRepository recordRepository,
41+
UserRepository userRepository, OutboxMessageRepository outboxMessageRepository,
42+
@ConfigProperty(name = "whatsapp.recognized-message.queue-url") String recognizedQueueUrl) {
4143

42-
public SQS(SqsClient sqs, ObjectMapper objectMapper, TextAiService aiService, RecordRepository recordRepository,
43-
UserRepository userRepository) {
44-
45-
this.sqs = sqs;
4644
this.objectMapper = objectMapper;
4745
this.aiService = aiService;
4846
this.recordRepository = recordRepository;
4947
this.userRepository = userRepository;
48+
this.outboxMessageRepository = outboxMessageRepository;
49+
this.recognizedQueueUrl = recognizedQueueUrl;
5050
}
5151

5252
@Incoming("whatsapp-incoming")
53-
@Outgoing("whatsapp-recognized")
54-
public Multi<Message<String>> receiveMessages(Message<String> message) {
53+
public CompletionStage<Void> receiveMessages(Message<String> message) {
5554
String body = message.getPayload();
5655
IncomingMessage incomingMessage = parseIncomingMessage(body);
5756

5857
if (!MessageKind.TEXT.equals(incomingMessage.kind())) {
59-
return Multi.createFrom().item(message);
58+
return message.ack();
6059
}
6160

6261
Optional<User> user = this.userRepository.findByPhoneNumber(incomingMessage.sender());
6362

6463
if (user.isEmpty()) {
6564
logger.error("User not found.");
66-
return Multi.createFrom().empty();
65+
return message.nack(new RuntimeException("User not found for phone: " + incomingMessage.sender()));
6766
}
6867

69-
return Multi.createFrom().iterable(handleUserMessage(user.get(), incomingMessage)).map(processedMessage -> {
70-
try {
71-
String processedBody = objectMapper.writeValueAsString(processedMessage);
72-
return Message.of(processedBody).withAck(() -> message.ack())
73-
.withNack(throwable -> message.nack(throwable));
74-
} catch (JsonProcessingException e) {
75-
logger.error("Failed to serialize message", e);
76-
throw new RuntimeException(e);
77-
}
78-
});
68+
try {
69+
handleUserMessage(user.get(), incomingMessage);
70+
return message.ack();
71+
} catch (Exception e) {
72+
logger.error("Failed to process message: " + incomingMessage.messageId(), e);
73+
return message.nack(e);
74+
}
7975
}
8076

81-
private List<Object> handleUserMessage(User user, IncomingMessage message) {
82-
List<Object> results = new ArrayList<>();
83-
try {
84-
AllRecognizedOperations allRecognizedOperations = aiService.handleMessage(message.messageBody(),
85-
user.getId());
86-
87-
for (RecognizedOperation recognizedOperation : allRecognizedOperations.all()) {
88-
switch (recognizedOperation.operation()) {
89-
case AiOperations.ADD_TRANSACTION ->
90-
results.add(processAddTransactionMessage(user, message, recognizedOperation));
91-
case AiOperations.GET_BALANCE -> {
92-
logger.info("Processing GET_BALANCE operation" + recognizedOperation.recognizedTransaction());
93-
results.add(processSimpleMessage(user, message, recognizedOperation));
94-
}
95-
default -> logger.warnf("Unknown operation type: %s", recognizedOperation.operation());
77+
private void handleUserMessage(User user, IncomingMessage message) {
78+
AllRecognizedOperations allRecognizedOperations = aiService.handleMessage(message.messageBody(), user.getId());
79+
80+
for (RecognizedOperation recognizedOperation : allRecognizedOperations.all()) {
81+
switch (recognizedOperation.operation()) {
82+
case AiOperations.ADD_TRANSACTION -> processAddTransactionMessage(user, message, recognizedOperation);
83+
case AiOperations.GET_BALANCE -> {
84+
logger.info("Processing GET_BALANCE operation" + recognizedOperation.recognizedTransaction());
85+
processSimpleMessage(user, message, recognizedOperation);
9686
}
87+
default -> logger.warnf("Unknown operation type: %s", recognizedOperation.operation());
9788
}
98-
} catch (Exception e) {
99-
logger.error("Failed to process message: " + message.messageId(), e);
10089
}
101-
return results;
10290
}
10391

104-
private TransactionMessageProcessed processAddTransactionMessage(User user, IncomingMessage message,
92+
private void processAddTransactionMessage(User user, IncomingMessage message,
10593
RecognizedOperation recognizedOperation) {
10694
RecognizedTransaction recognizedTransaction = recognizedOperation.recognizedTransaction();
10795

10896
Record record = new Record.Builder().userId(user.getId()).amount(recognizedTransaction.amount())
10997
.description(recognizedTransaction.description()).transaction(recognizedTransaction.type())
11098
.category(recognizedTransaction.category()).build();
11199

112-
QuarkusTransaction.requiringNew().run(() -> recordRepository.persist(record));
100+
TransactionMessageProcessed processed = new TransactionMessageProcessed(
101+
AiOperations.ADD_TRANSACTION.commandName(), message.messageId(), MessageStatus.PROCESSED,
102+
user.getPhoneNumber(), recognizedTransaction.withError(), recognizedTransaction);
113103

114-
logger.infof("Message %s processed as ADD_TRANSACTION", message.messageId());
104+
OutboxMessage outboxMessage = new OutboxMessage(serialize(processed), recognizedQueueUrl,
105+
user.getPhoneNumber());
115106

116-
return new TransactionMessageProcessed(AiOperations.ADD_TRANSACTION.commandName(), message.messageId(),
117-
MessageStatus.PROCESSED, user.getPhoneNumber(), recognizedTransaction.withError(),
118-
recognizedTransaction);
107+
QuarkusTransaction.requiringNew().run(() -> {
108+
recordRepository.persist(record);
109+
outboxMessageRepository.persist(outboxMessage);
110+
});
111+
112+
logger.infof("Message %s processed as ADD_TRANSACTION", message.messageId());
119113
}
120114

121-
private SimpleMessageProcessed processSimpleMessage(User user, IncomingMessage message,
122-
RecognizedOperation recognizedOperation) {
115+
private void processSimpleMessage(User user, IncomingMessage message, RecognizedOperation recognizedOperation) {
123116
logger.infof("Processing simple message for user %s", recognizedOperation.recognizedTransaction());
124117
SimpleMessage response = new SimpleMessage(recognizedOperation.recognizedTransaction().description());
125118

119+
SimpleMessageProcessed processed = new SimpleMessageProcessed(AiOperations.GET_BALANCE.commandName(),
120+
message.messageId(), MessageStatus.PROCESSED, user.getPhoneNumber(), response);
121+
122+
OutboxMessage outboxMessage = new OutboxMessage(serialize(processed), recognizedQueueUrl,
123+
user.getPhoneNumber());
124+
125+
QuarkusTransaction.requiringNew().run(() -> outboxMessageRepository.persist(outboxMessage));
126+
126127
logger.infof("Message %s processed as GET_BALANCE", message.messageId());
128+
}
127129

128-
return new SimpleMessageProcessed(AiOperations.GET_BALANCE.commandName(), message.messageId(),
129-
MessageStatus.PROCESSED, user.getPhoneNumber(), response);
130+
private String serialize(Object message) {
131+
try {
132+
return objectMapper.writeValueAsString(message);
133+
} catch (JsonProcessingException e) {
134+
throw new RuntimeException("Failed to serialize message", e);
135+
}
130136
}
131137

132138
private IncomingMessage parseIncomingMessage(String messageBody) {

timeless-api/src/main/resources/application.properties

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@ mp.messaging.incoming.whatsapp-incoming.connector=smallrye-sqs
1010
mp.messaging.incoming.whatsapp-incoming.queue=${INCOMING_MESSAGE_FIFO_URL}
1111
mp.messaging.incoming.whatsapp-incoming.visibility-timeout=30
1212

13-
mp.messaging.outgoing.whatsapp-recognized.connector=smallrye-sqs
14-
mp.messaging.outgoing.whatsapp-recognized.queue=${RECOGNIZED_MESSAGE_FIFO_URL}
15-
1613
# aws sqs
1714
quarkus.sqs.devservices.enabled=false
1815

0 commit comments

Comments
 (0)