Skip to content

Commit 66e108d

Browse files
authored
Bug fixes introduced by #40 (#41)
Bug fixes introduced by #40
1 parent 25b1618 commit 66e108d

55 files changed

Lines changed: 297 additions & 227 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ ext {
6767

6868
subprojects {
6969
group = 'com.github.sonus21'
70-
version = '2.1.1-RELEASE'
70+
version = '2.1.0-RELEASE'
7171

7272
dependencies {
7373
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/application/CustomMessageConverterApplication.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.springframework.context.annotation.Bean;
2424
import org.springframework.data.redis.connection.RedisConnectionFactory;
2525
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
26-
import org.springframework.messaging.converter.ByteArrayMessageConverter;
26+
import org.springframework.messaging.MessageHeaders;
27+
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
28+
import org.springframework.util.MimeType;
2729

2830
public abstract class CustomMessageConverterApplication extends ApplicationBasicConfiguration {
2931
@PostConstruct
@@ -48,7 +50,12 @@ public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory
4850
new SimpleRqueueListenerContainerFactory();
4951
simpleRqueueListenerContainerFactory.setRedisConnectionFactory(redisConnectionFactory);
5052
simpleRqueueListenerContainerFactory.setMessageConverters(
51-
Collections.singletonList(new ByteArrayMessageConverter()));
53+
Collections.singletonList(new MappingJackson2MessageConverter()));
54+
MessageHeaders messageHeaders =
55+
new MessageHeaders(
56+
Collections.singletonMap(
57+
MessageHeaders.CONTENT_TYPE, new MimeType("application", "json")));
58+
simpleRqueueListenerContainerFactory.setMessageHeaders(messageHeaders);
5259
return simpleRqueueListenerContainerFactory;
5360
}
5461
}

rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/common/SpringTestBase.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import com.github.sonus21.rqueue.core.RqueueMessageManager;
2727
import com.github.sonus21.rqueue.core.RqueueMessageSender;
2828
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
29-
import com.github.sonus21.rqueue.core.support.RqueueMessageFactory;
29+
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
3030
import com.github.sonus21.rqueue.listener.QueueDetail;
3131
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
3232
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
@@ -103,17 +103,17 @@ public abstract class SpringTestBase extends TestBase {
103103

104104
protected void enqueue(Object message, String queueName) {
105105
RqueueMessage rqueueMessage =
106-
RqueueMessageFactory.buildMessage(
107-
rqueueMessageManager.getMessageConverter(), message, queueName, null, null);
106+
RqueueMessageUtils.buildMessage(
107+
rqueueMessageManager.getMessageConverter(), message, queueName, null, null, null);
108108
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
109109
}
110110

111111
protected void enqueue(String queueName, Factory factory, int n) {
112112
for (int i = 0; i < n; i++) {
113113
Object object = factory.next(i);
114114
RqueueMessage rqueueMessage =
115-
RqueueMessageFactory.buildMessage(
116-
rqueueMessageManager.getMessageConverter(), object, queueName, null, null);
115+
RqueueMessageUtils.buildMessage(
116+
rqueueMessageManager.getMessageConverter(), object, queueName, null, null, null);
117117
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
118118
}
119119
}
@@ -123,16 +123,16 @@ protected void enqueueIn(String zsetName, Factory factory, Delay delay, int n) {
123123
Object object = factory.next(i);
124124
long score = delay.getDelay(i);
125125
RqueueMessage rqueueMessage =
126-
RqueueMessageFactory.buildMessage(
127-
rqueueMessageManager.getMessageConverter(), object, zsetName, null, score);
126+
RqueueMessageUtils.buildMessage(
127+
rqueueMessageManager.getMessageConverter(), object, zsetName, null, score, null);
128128
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
129129
}
130130
}
131131

132132
protected void enqueueIn(Object message, String zsetName, long delay) {
133133
RqueueMessage rqueueMessage =
134-
RqueueMessageFactory.buildMessage(
135-
rqueueMessageManager.getMessageConverter(), message, zsetName, null, delay);
134+
RqueueMessageUtils.buildMessage(
135+
rqueueMessageManager.getMessageConverter(), message, zsetName, null, delay, null);
136136
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
137137
}
138138

@@ -181,6 +181,10 @@ protected void printQueueStats(List<String> queueNames) {
181181
}
182182
}
183183

184+
protected void printQueueStats(String queueName) {
185+
printQueueStats(Collections.singletonList(queueName));
186+
}
187+
184188
protected void cleanQueue(String queue) {
185189
QueueDetail queueDetail = EndpointRegistry.get(queue);
186190
stringRqueueRedisTemplate.delete(queueDetail.getQueueName());

rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/tests/MessageChannelTests.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
@Slf4j
3535
public abstract class MessageChannelTests extends SpringTestBase {
36-
private final int messageCount = 200;
3736
/**
3837
* This test verifies whether any pending message in the delayed queue are moved or not whenever a
3938
* delayed message is pushed. During enqueue of delayed message we check whether there are any
@@ -42,18 +41,23 @@ public abstract class MessageChannelTests extends SpringTestBase {
4241
*/
4342
protected void verifyPublishMessageIsTriggeredOnMessageAddition() throws TimedOutException {
4443
String delayedQueueName = rqueueConfig.getDelayedQueueName(emailQueue);
45-
enqueueIn(delayedQueueName, i -> Email.newInstance(), i -> -1000L, messageCount);
44+
enqueueIn(delayedQueueName, i -> Email.newInstance(), i -> -1000L, 200);
4645
Email email = Email.newInstance();
4746
log.info("adding new message {}", email);
4847
enqueueIn(emailQueue, email, Duration.ofMillis(1000));
4948
waitFor(
5049
() -> stringRqueueRedisTemplate.getZsetSize(delayedQueueName) <= 1,
5150
"one or zero messages in zset");
5251
assertTrue(
53-
stringRqueueRedisTemplate.getListSize(rqueueConfig.getQueueName(emailQueue))
54-
>= messageCount,
52+
stringRqueueRedisTemplate.getListSize(rqueueConfig.getQueueName(emailQueue)) >= 200,
5553
"Messages are correctly moved");
56-
assertEquals(messageCount + 1L, getMessageCount(emailQueue));
54+
assertEquals(
55+
200 + 1L,
56+
getMessageCount(emailQueue),
57+
() -> {
58+
printQueueStats(emailQueue);
59+
return "message count is not correct";
60+
});
5761
}
5862

5963
/**
@@ -67,7 +71,7 @@ protected void verifyPublishMessageIsTriggeredOnMessageRemoval() throws TimedOut
6771
List<String> ids = new ArrayList<>();
6872
int maxDelay = 2000;
6973
String processingQueue = rqueueConfig.getProcessingQueueName(jobQueue);
70-
for (int i = 0; i < messageCount; i++) {
74+
for (int i = 0; i < 200; i++) {
7175
Job job = Job.newInstance();
7276
jobs.add(job);
7377
ids.add(job.getId());
@@ -79,11 +83,9 @@ protected void verifyPublishMessageIsTriggeredOnMessageRemoval() throws TimedOut
7983
}
8084
TimeoutUtils.sleep(maxDelay);
8185
waitFor(
82-
() -> 0 == getMessageCount(jobQueue),
83-
30 * Constants.ONE_MILLI,
84-
"messages to be consumed");
86+
() -> 0 == getMessageCount(jobQueue), 30 * Constants.ONE_MILLI, "messages to be consumed");
8587
waitFor(
86-
() -> messageCount == consumedMessageService.getMessages(ids, Job.class).size(),
88+
() -> 200 == consumedMessageService.getMessages(ids, Job.class).size(),
8789
30 * Constants.ONE_MILLI,
8890
"message count to be matched");
8991
waitFor(
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
spring.redis.host=localhost
2+
spring.redis.port=8012
3+
job.queue.name=job-queue
4+
job.queue.active=true
5+
notification.queue.name=notification-queue
6+
notification.queue.delay=1000
7+
notification.queue.retry.count=3
8+
notification.queue.active=true
9+
email.queue.name=email-queue
10+
email.dead.letter.queue.name=email-dlq
11+
email.queue.retry.count=3
12+
email.queue.active=true
13+
mysql.db.name=test
14+
rqueue.metrics.tags.rqueue=test
15+
email.execution.time=15*60*1000
16+
sms.queue=sms
17+
sms.queue.active=false
18+
sms.queue.group=
19+
sms.queue.priority=critical:10, high:6, medium:4, low:2
20+
sms.queue.concurrency=-1
21+
chat.indexing.queue=chat-indexing
22+
chat.indexing.queue.active=false
23+
chat.indexing.queue.priority=30
24+
chat.indexing.queue.group=test-group
25+
chat.indexing.queue.concurrency=-1
26+
feed.generation.queue=feed-generation
27+
feed.generation.queue.active=false
28+
feed.generation.queue.priority=20
29+
feed.generation.queue.group=test-group
30+
feed.generation.queue.concurrency=-1
31+
reservation.queue=reservation
32+
reservation.queue.active=false
33+
reservation.queue.priority=10
34+
reservation.queue.group=test-group
35+
reservation.queue.concurrency=-1
36+
reservation.request.queue.name=reservation-request
37+
reservation.request.dead.letter.queue.name=reservation-request-dlq
38+
reservation.request.queue.retry.count=2
39+
reservation.request.dead.letter.consumer.enabled=false
40+
reservation.request.active=false
41+
reservation.request.dead.letter.queue.retry.count=1
42+
list.email.queue.name=email-list-queue
43+
list.email.queue.enabled=false
44+
rqueue.system.mode=PRODUCER
45+
46+

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.core.task.AsyncTaskExecutor;
3535
import org.springframework.core.task.TaskExecutor;
3636
import org.springframework.data.redis.connection.RedisConnectionFactory;
37+
import org.springframework.messaging.MessageHeaders;
3738
import org.springframework.messaging.converter.CompositeMessageConverter;
3839
import org.springframework.messaging.converter.MessageConverter;
3940

@@ -55,7 +56,7 @@ public class SimpleRqueueListenerContainerFactory {
5556
// Custom requeue message handler
5657
private RqueueMessageHandler rqueueMessageHandler;
5758
// The message converter to convert messages to/from
58-
private MessageConverter messageConverter;
59+
private MessageConverter messageConverter = new DefaultRqueueMessageConverter();
5960
// Send message poll time when no messages are available
6061
private long pollingInterval = 200L;
6162
// In case of failure how much time, we should wait for next job
@@ -79,6 +80,9 @@ public class SimpleRqueueListenerContainerFactory {
7980
// Any custom message requeue message template.
8081
private RqueueMessageTemplate rqueueMessageTemplate;
8182

83+
// Any message headers that should be set, only used for message serialization
84+
private MessageHeaders messageHeaders;
85+
8286
// Set priority mode for the pollers
8387
private PriorityMode priorityMode;
8488

@@ -205,7 +209,8 @@ public void setMaxNumWorkers(int maxNumWorkers) {
205209
this.maxNumWorkers = maxNumWorkers;
206210
}
207211

208-
/** @return the message converters
212+
/**
213+
* @return the message converters
209214
* @deprecated use {@link #getMessageConverter()}
210215
*/
211216
@Deprecated
@@ -215,9 +220,6 @@ public List<MessageConverter> getMessageConverters() {
215220

216221
/** @return the message converter */
217222
public MessageConverter getMessageConverter() {
218-
if (messageConverter == null) {
219-
messageConverter = new DefaultRqueueMessageConverter();
220-
}
221223
return messageConverter;
222224
}
223225

@@ -239,6 +241,10 @@ public void setMessageConverters(List<MessageConverter> messageConverters) {
239241
}
240242

241243
/**
244+
* A default message converter {@link DefaultRqueueMessageConverter} is added that can handle all
245+
* type of data serialization/deserialization and all data is serialized to and from JSON. You can
246+
* use other mechanism to serialize/deserialize class object like MessagePack or any other format.
247+
*
242248
* @param messageConverter the message converter
243249
*/
244250
public void setMessageConverter(MessageConverter messageConverter) {
@@ -452,4 +458,13 @@ public PriorityMode getPriorityMode() {
452458
public void setPriorityMode(PriorityMode priorityMode) {
453459
this.priorityMode = priorityMode;
454460
}
461+
462+
public MessageHeaders getMessageHeaders() {
463+
return messageHeaders;
464+
}
465+
466+
public void setMessageHeaders(MessageHeaders messageHeaders) {
467+
notEmpty(messageHeaders, "messageHeaders can not be empty");
468+
this.messageHeaders = messageHeaders;
469+
}
455470
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/DefaultRqueueMessageConverter.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,13 @@
1717
package com.github.sonus21.rqueue.core;
1818

1919
import com.github.sonus21.rqueue.converter.GenericMessageConverter;
20-
import java.util.ArrayList;
21-
import java.util.Collection;
22-
import java.util.List;
2320
import com.google.common.collect.ImmutableList;
2421
import lombok.EqualsAndHashCode;
2522
import org.springframework.messaging.converter.CompositeMessageConverter;
26-
import org.springframework.messaging.converter.MessageConverter;
2723
import org.springframework.messaging.converter.StringMessageConverter;
2824

2925
@EqualsAndHashCode(callSuper = true)
30-
public final class DefaultRqueueMessageConverter extends CompositeMessageConverter implements MessageConverter{
26+
public final class DefaultRqueueMessageConverter extends CompositeMessageConverter {
3127

3228
public DefaultRqueueMessageConverter() {
3329
super(ImmutableList.of(new GenericMessageConverter(), new StringMessageConverter()));

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package com.github.sonus21.rqueue.core.impl;
1818

19+
import static com.github.sonus21.rqueue.core.support.RqueueMessageUtils.buildMessage;
1920
import static com.github.sonus21.rqueue.utils.Constants.MIN_DELAY;
20-
import static com.github.sonus21.rqueue.utils.MessageUtils.buildMessage;
2121
import static com.github.sonus21.rqueue.utils.Validator.validateQueue;
2222
import static org.springframework.util.Assert.notNull;
2323

@@ -34,6 +34,7 @@
3434
import java.time.Duration;
3535
import lombok.extern.slf4j.Slf4j;
3636
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.messaging.MessageHeaders;
3738
import org.springframework.messaging.converter.MessageConverter;
3839

3940
@Slf4j
@@ -44,13 +45,17 @@ abstract class BaseMessageSender {
4445
@Autowired protected RqueueRedisTemplate<String> stringRqueueRedisTemplate;
4546
@Autowired protected RqueueConfig rqueueConfig;
4647
@Autowired protected RqueueMessageMetadataService rqueueMessageMetadataService;
48+
protected final MessageHeaders messageHeaders;
4749

4850
BaseMessageSender(
49-
RqueueMessageTemplate messageTemplate, MessageConverter messageConverter) {
51+
RqueueMessageTemplate messageTemplate,
52+
MessageConverter messageConverter,
53+
MessageHeaders messageHeaders) {
5054
notNull(messageTemplate, "messageTemplate cannot be null");
5155
notNull(messageConverter, "messageConverter cannot be null");
5256
this.messageTemplate = messageTemplate;
5357
this.messageConverter = messageConverter;
58+
this.messageHeaders = messageHeaders;
5459
}
5560

5661
private void storeMessageMetadata(RqueueMessage rqueueMessage, Long delayInMillis) {
@@ -75,7 +80,8 @@ private RqueueMessage constructMessage(
7580
Integer retryCount,
7681
Long delayInMilliSecs) {
7782
RqueueMessage rqueueMessage =
78-
buildMessage(messageConverter, queueName, message, retryCount, delayInMilliSecs);
83+
buildMessage(
84+
messageConverter, message, queueName, retryCount, delayInMilliSecs, messageHeaders);
7985
if (messageId != null) {
8086
rqueueMessage.setId(messageId);
8187
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueEndpointManagerImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,17 @@
2828
import java.util.HashMap;
2929
import java.util.List;
3030
import java.util.Map;
31+
import org.springframework.messaging.MessageHeaders;
3132
import org.springframework.messaging.converter.MessageConverter;
3233
import org.springframework.util.CollectionUtils;
3334

3435
public class RqueueEndpointManagerImpl extends BaseMessageSender implements RqueueEndpointManager {
3536

36-
public RqueueEndpointManagerImpl(RqueueMessageTemplate messageTemplate,
37-
MessageConverter messageConverter) {
38-
super(messageTemplate, messageConverter);
37+
public RqueueEndpointManagerImpl(
38+
RqueueMessageTemplate messageTemplate,
39+
MessageConverter messageConverter,
40+
MessageHeaders messageHeaders) {
41+
super(messageTemplate, messageConverter, messageHeaders);
3942
}
4043

4144
@Override

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageEnqueuerImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@
2626
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
2727
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
2828
import com.github.sonus21.rqueue.utils.PriorityUtils;
29-
import java.util.Collections;
30-
import java.util.List;
3129
import lombok.extern.slf4j.Slf4j;
30+
import org.springframework.messaging.MessageHeaders;
3231
import org.springframework.messaging.converter.MessageConverter;
3332

3433
@Slf4j
3534
public class RqueueMessageEnqueuerImpl extends BaseMessageSender implements RqueueMessageEnqueuer {
3635

3736
public RqueueMessageEnqueuerImpl(
38-
RqueueMessageTemplate messageTemplate, MessageConverter messageConverter) {
39-
super(messageTemplate, messageConverter);
37+
RqueueMessageTemplate messageTemplate,
38+
MessageConverter messageConverter,
39+
MessageHeaders messageHeaders) {
40+
super(messageTemplate, messageConverter, messageHeaders);
4041
}
4142

4243
@Override

0 commit comments

Comments
 (0)