Skip to content

Commit 9fec46e

Browse files
authored
check duplicate message before enqueue (#260)
* check duplicate message before enqueue * add duplicate check in unique in as well * add copyright * update changelog file * version update * doc update * reuse the method * fix: duplicate message test * wip * throw error when duplicate message is saved * test compilation issue * fix: misuse of mockito argument checker * increase timeout * test: revert test properties * fix: the enque in should not use push periodic message * delete test data * fix: do not use system redis * do not retry test * fix: reactive message enqueue * chore: change BiFunction to Function * chore: update changelog * chore: update changelog
1 parent 251b3fa commit 9fec46e

File tree

24 files changed

+394
-350
lines changed

24 files changed

+394
-350
lines changed

README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,7 @@ well, where all services code is in Spring.
6262

6363
### Dependency
6464

65-
Snapshot
66-
Version: [https://s01.oss.sonatype.org/content/repositories/snapshots/com/github/sonus21/](https://s01.oss.sonatype.org/content/repositories/snapshots/com/github/sonus21/)
67-
<br/>Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.sonus21)
65+
Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.sonus21)
6866

6967
#### Spring Boot
7068

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ ext {
6969

7070
subprojects {
7171
group = "com.github.sonus21"
72-
version = "3.3.0-RELEASE"
72+
version = "3.4.0-RELEASE"
7373

7474
dependencies {
7575
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

docs/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,16 @@ layout: default
88

99
All notable user-facing changes to this project are documented in this file.
1010

11+
## Release [3.4.0] 22-July-2025
12+
### Fixes
13+
* Fixed unique enqueue message to reject the message upfront instead of identifying it later #259
14+
15+
16+
## Release [3.3.0] 29-June-2025
17+
### Fixes
18+
# Custom Message Converter was ignored #256
19+
# LockKey prefix was not used #239
20+
1121
## Release [3.2.0] 10-July-2024
1222
### Fixes
1323
* Fixed typo #218

publish.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env bash
22
#
3-
# Copyright (c) 2022-2023 Sonu Kumar
3+
# Copyright (c) 2022-2025 Sonu Kumar
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# You may not use this file except in compliance with the License.
@@ -15,6 +15,6 @@
1515
#
1616
#
1717

18-
./gradlew rqueue-core:publish
19-
./gradlew rqueue-spring:publish
20-
./gradlew rqueue-spring-boot-starter:publish
18+
./gradlew rqueue-core:publishToMavenCentral
19+
./gradlew rqueue-spring:publishToMavenCentral
20+
./gradlew rqueue-spring-boot-starter:publishToMavenCentral

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2020-2023 Sonu Kumar
2+
* Copyright (c) 2020-2025 Sonu Kumar
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* You may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
3030
import com.github.sonus21.rqueue.core.impl.MessageSweeper.MessageDeleteRequest;
3131
import com.github.sonus21.rqueue.dao.RqueueStringDao;
32+
import com.github.sonus21.rqueue.exception.DuplicateMessageException;
3233
import com.github.sonus21.rqueue.listener.QueueDetail;
3334
import com.github.sonus21.rqueue.models.db.MessageMetadata;
3435
import com.github.sonus21.rqueue.models.enums.MessageStatus;
@@ -50,12 +51,9 @@ abstract class BaseMessageSender {
5051
protected final MessageHeaders messageHeaders;
5152
protected final MessageConverter messageConverter;
5253
protected final RqueueMessageTemplate messageTemplate;
53-
@Autowired
54-
protected RqueueStringDao rqueueStringDao;
55-
@Autowired
56-
protected RqueueConfig rqueueConfig;
57-
@Autowired
58-
protected RqueueMessageMetadataService rqueueMessageMetadataService;
54+
@Autowired protected RqueueStringDao rqueueStringDao;
55+
@Autowired protected RqueueConfig rqueueConfig;
56+
@Autowired protected RqueueMessageMetadataService rqueueMessageMetadataService;
5957

6058
BaseMessageSender(
6159
RqueueMessageTemplate messageTemplate,
@@ -69,13 +67,13 @@ abstract class BaseMessageSender {
6967
}
7068

7169
protected Object storeMessageMetadata(
72-
RqueueMessage rqueueMessage, Long delayInMillis, boolean reactive) {
70+
RqueueMessage rqueueMessage, Long delayInMillis, boolean reactive, boolean isUnique) {
7371
MessageMetadata messageMetadata = new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED);
7472
Duration duration = rqueueConfig.getMessageDurability(delayInMillis);
7573
if (reactive) {
76-
return rqueueMessageMetadataService.saveReactive(messageMetadata, duration);
74+
return rqueueMessageMetadataService.saveReactive(messageMetadata, duration, isUnique);
7775
} else {
78-
rqueueMessageMetadataService.save(messageMetadata, duration);
76+
rqueueMessageMetadataService.save(messageMetadata, duration, isUnique);
7977
}
8078
return null;
8179
}
@@ -112,7 +110,8 @@ protected String pushMessage(
112110
String messageId,
113111
Object message,
114112
Integer retryCount,
115-
Long delayInMilliSecs) {
113+
Long delayInMilliSecs,
114+
boolean isUnique) {
116115
QueueDetail queueDetail = EndpointRegistry.get(queueName);
117116
RqueueMessage rqueueMessage =
118117
buildMessage(
@@ -124,17 +123,26 @@ protected String pushMessage(
124123
delayInMilliSecs,
125124
messageHeaders);
126125
try {
126+
storeMessageMetadata(rqueueMessage, delayInMilliSecs, false, isUnique);
127127
enqueue(queueDetail, rqueueMessage, delayInMilliSecs, false);
128-
storeMessageMetadata(rqueueMessage, delayInMilliSecs, false);
128+
} catch (DuplicateMessageException e) {
129+
log.warn(
130+
"Duplicate message enqueue attempted queue: {}, messageId: {}",
131+
queueName,
132+
rqueueMessage.getId());
133+
return null;
129134
} catch (Exception e) {
130-
log.error("Queue: {} Message {} could not be pushed {}", queueName, rqueueMessage, e);
135+
log.error("Queue: {} Message {} could not be pushed", queueName, rqueueMessage.getId(), e);
131136
return null;
132137
}
133138
return rqueueMessage.getId();
134139
}
135140

136141
protected String pushPeriodicMessage(
137-
String queueName, String messageId, Object message, long periodInMilliSeconds) {
142+
String queueName,
143+
String messageId,
144+
Object message,
145+
long periodInMilliSeconds) {
138146
QueueDetail queueDetail = EndpointRegistry.get(queueName);
139147
RqueueMessage rqueueMessage =
140148
buildPeriodicMessage(
@@ -146,13 +154,13 @@ protected String pushPeriodicMessage(
146154
periodInMilliSeconds,
147155
messageHeaders);
148156
try {
157+
storeMessageMetadata(rqueueMessage, periodInMilliSeconds, false, false);
149158
enqueue(queueDetail, rqueueMessage, periodInMilliSeconds, false);
150-
storeMessageMetadata(rqueueMessage, periodInMilliSeconds, false);
159+
return rqueueMessage.getId();
151160
} catch (Exception e) {
152-
log.error("Queue: {} Message {} could not be pushed {}", queueName, rqueueMessage, e);
161+
log.error("Queue: {} Message {} could not be pushed", queueName, rqueueMessage, e);
153162
return null;
154163
}
155-
return rqueueMessage.getId();
156164
}
157165

158166
protected Object deleteAllMessages(QueueDetail queueDetail) {

0 commit comments

Comments
 (0)