Skip to content

Commit 25b1618

Browse files
authored
Rdwallis/messageconverter (#40)
* Inject configured message converters into RqueueEndpointManager * Use singleton MessageConverter instead of List of MessageConverters * Add back getMessageList and deprecate it * Remove @bean default configurations for MessageConverter and SimpleRqueueListenerContainerFactory
1 parent eb342d5 commit 25b1618

19 files changed

Lines changed: 114 additions & 151 deletions

File tree

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,6 @@ hs_err_pid*
2929
.idea/
3030
/.gradle/
3131
/build/
32-
/*/build/
32+
/*/build/
33+
34+
.DS_Store

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ ext {
6767

6868
subprojects {
6969
group = 'com.github.sonus21'
70-
version = '2.1.0-RELEASE'
70+
version = '2.1.1-RELEASE'
7171

7272
dependencies {
7373
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.springframework.util.Assert.notNull;
2121

2222
import com.github.sonus21.rqueue.annotation.RqueueListener;
23+
import com.github.sonus21.rqueue.core.DefaultRqueueMessageConverter;
2324
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
2425
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
2526
import com.github.sonus21.rqueue.core.support.MessageProcessor;
@@ -28,10 +29,12 @@
2829
import com.github.sonus21.rqueue.models.enums.PriorityMode;
2930
import com.github.sonus21.rqueue.utils.Constants;
3031
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
32+
import java.util.Collections;
3133
import java.util.List;
3234
import org.springframework.core.task.AsyncTaskExecutor;
3335
import org.springframework.core.task.TaskExecutor;
3436
import org.springframework.data.redis.connection.RedisConnectionFactory;
37+
import org.springframework.messaging.converter.CompositeMessageConverter;
3538
import org.springframework.messaging.converter.MessageConverter;
3639

3740
/**
@@ -51,8 +54,8 @@ public class SimpleRqueueListenerContainerFactory {
5154
private RedisConnectionFactory redisConnectionFactory;
5255
// Custom requeue message handler
5356
private RqueueMessageHandler rqueueMessageHandler;
54-
// List of message converters to convert messages to/from
55-
private List<MessageConverter> messageConverters;
57+
// The message converter to convert messages to/from
58+
private MessageConverter messageConverter;
5659
// Send message poll time when no messages are available
5760
private long pollingInterval = 200L;
5861
// In case of failure how much time, we should wait for next job
@@ -137,6 +140,9 @@ public void setAutoStartup(boolean autoStartup) {
137140
* @return RqueueMessageHandler object
138141
*/
139142
public RqueueMessageHandler getRqueueMessageHandler() {
143+
if (rqueueMessageHandler == null) {
144+
rqueueMessageHandler = new RqueueMessageHandler(getMessageConverter());
145+
}
140146
return rqueueMessageHandler;
141147
}
142148

@@ -199,20 +205,45 @@ public void setMaxNumWorkers(int maxNumWorkers) {
199205
this.maxNumWorkers = maxNumWorkers;
200206
}
201207

202-
/** @return list of configured message converters */
208+
/** @return the message converters
209+
* @deprecated use {@link #getMessageConverter()}
210+
*/
211+
@Deprecated
203212
public List<MessageConverter> getMessageConverters() {
204-
return messageConverters;
213+
return Collections.singletonList(getMessageConverter());
214+
}
215+
216+
/** @return the message converter */
217+
public MessageConverter getMessageConverter() {
218+
if (messageConverter == null) {
219+
messageConverter = new DefaultRqueueMessageConverter();
220+
}
221+
return messageConverter;
205222
}
206223

207224
/**
208225
* For message (de)serialization we might need one or more message converters, configure those
209226
* message converters
210227
*
211228
* @param messageConverters list of message converters
229+
* @deprecated use {@link #setMessageConverter(MessageConverter)}
212230
*/
231+
@Deprecated
213232
public void setMessageConverters(List<MessageConverter> messageConverters) {
214233
notEmpty(messageConverters, "messageConverters must not be empty");
215-
this.messageConverters = messageConverters;
234+
if (messageConverters.size() == 1) {
235+
setMessageConverter(messageConverters.get(0));
236+
} else {
237+
setMessageConverter(new CompositeMessageConverter(messageConverters));
238+
}
239+
}
240+
241+
/**
242+
* @param messageConverter the message converter
243+
*/
244+
public void setMessageConverter(MessageConverter messageConverter) {
245+
notNull(messageConverter, "message converter must not be null");
246+
this.messageConverter = messageConverter;
216247
}
217248

218249
/** @return get Redis connection factor */
@@ -254,13 +285,13 @@ public void setRqueueMessageTemplate(RqueueMessageTemplate messageTemplate) {
254285
* @return an object of {@link RqueueMessageListenerContainer} object
255286
*/
256287
public RqueueMessageListenerContainer createMessageListenerContainer() {
257-
notNull(rqueueMessageHandler, "rqueueMessageHandler must not be null");
288+
notNull(getRqueueMessageHandler(), "rqueueMessageHandler must not be null");
258289
notNull(redisConnectionFactory, "redisConnectionFactory must not be null");
259290
if (rqueueMessageTemplate == null) {
260291
rqueueMessageTemplate = new RqueueMessageTemplateImpl(redisConnectionFactory);
261292
}
262293
RqueueMessageListenerContainer messageListenerContainer =
263-
new RqueueMessageListenerContainer(rqueueMessageHandler, rqueueMessageTemplate);
294+
new RqueueMessageListenerContainer(getRqueueMessageHandler(), rqueueMessageTemplate);
264295
messageListenerContainer.setAutoStartup(autoStartup);
265296
if (taskExecutor != null) {
266297
messageListenerContainer.setTaskExecutor(taskExecutor);

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/MessageConverterFactory.java renamed to rqueue-core/src/main/java/com/github/sonus21/rqueue/core/DefaultRqueueMessageConverter.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,18 @@
1818

1919
import com.github.sonus21.rqueue.converter.GenericMessageConverter;
2020
import java.util.ArrayList;
21+
import java.util.Collection;
2122
import java.util.List;
23+
import com.google.common.collect.ImmutableList;
24+
import lombok.EqualsAndHashCode;
2225
import org.springframework.messaging.converter.CompositeMessageConverter;
2326
import org.springframework.messaging.converter.MessageConverter;
2427
import org.springframework.messaging.converter.StringMessageConverter;
2528

26-
public final class MessageConverterFactory {
27-
private MessageConverterFactory() {}
29+
@EqualsAndHashCode(callSuper = true)
30+
public final class DefaultRqueueMessageConverter extends CompositeMessageConverter implements MessageConverter{
2831

29-
public static CompositeMessageConverter getMessageConverter(
30-
List<MessageConverter> messageConverterList) {
31-
List<MessageConverter> messageConverters = messageConverterList;
32-
if (messageConverterList == null) {
33-
messageConverters = new ArrayList<>();
34-
}
35-
messageConverters = new ArrayList<>(messageConverters);
36-
// String message converter
37-
StringMessageConverter stringMessageConverter = new StringMessageConverter();
38-
stringMessageConverter.setSerializedPayloadClass(String.class);
39-
messageConverters.add(stringMessageConverter);
40-
// add generic message converter
41-
messageConverters.add(new GenericMessageConverter());
42-
return new CompositeMessageConverter(messageConverters);
32+
public DefaultRqueueMessageConverter() {
33+
super(ImmutableList.of(new GenericMessageConverter(), new StringMessageConverter()));
4334
}
4435
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
2525
import com.github.sonus21.rqueue.config.RqueueConfig;
2626
import com.github.sonus21.rqueue.core.EndpointRegistry;
27-
import com.github.sonus21.rqueue.core.MessageConverterFactory;
2827
import com.github.sonus21.rqueue.core.RqueueMessage;
2928
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
3029
import com.github.sonus21.rqueue.listener.QueueDetail;
@@ -33,32 +32,25 @@
3332
import com.github.sonus21.rqueue.utils.PriorityUtils;
3433
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
3534
import java.time.Duration;
36-
import java.util.Collections;
37-
import java.util.List;
3835
import lombok.extern.slf4j.Slf4j;
3936
import org.springframework.beans.factory.annotation.Autowired;
40-
import org.springframework.messaging.converter.CompositeMessageConverter;
4137
import org.springframework.messaging.converter.MessageConverter;
4238

4339
@Slf4j
4440
@SuppressWarnings("WeakerAccess")
4541
abstract class BaseMessageSender {
46-
protected CompositeMessageConverter messageConverter;
42+
protected MessageConverter messageConverter;
4743
protected RqueueMessageTemplate messageTemplate;
4844
@Autowired protected RqueueRedisTemplate<String> stringRqueueRedisTemplate;
4945
@Autowired protected RqueueConfig rqueueConfig;
5046
@Autowired protected RqueueMessageMetadataService rqueueMessageMetadataService;
5147

52-
BaseMessageSender(RqueueMessageTemplate messageTemplate) {
53-
this(messageTemplate, Collections.emptyList());
54-
}
55-
5648
BaseMessageSender(
57-
RqueueMessageTemplate messageTemplate, List<MessageConverter> messageConverters) {
49+
RqueueMessageTemplate messageTemplate, MessageConverter messageConverter) {
5850
notNull(messageTemplate, "messageTemplate cannot be null");
59-
notNull(messageConverters, "messageConverters cannot be null");
51+
notNull(messageConverter, "messageConverter cannot be null");
6052
this.messageTemplate = messageTemplate;
61-
this.messageConverter = MessageConverterFactory.getMessageConverter(messageConverters);
53+
this.messageConverter = messageConverter;
6254
}
6355

6456
private void storeMessageMetadata(RqueueMessage rqueueMessage, Long delayInMillis) {

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueEndpointManagerImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@
2828
import java.util.HashMap;
2929
import java.util.List;
3030
import java.util.Map;
31+
import org.springframework.messaging.converter.MessageConverter;
3132
import org.springframework.util.CollectionUtils;
3233

3334
public class RqueueEndpointManagerImpl extends BaseMessageSender implements RqueueEndpointManager {
3435

35-
public RqueueEndpointManagerImpl(RqueueMessageTemplate messageTemplate) {
36-
super(messageTemplate);
36+
public RqueueEndpointManagerImpl(RqueueMessageTemplate messageTemplate,
37+
MessageConverter messageConverter) {
38+
super(messageTemplate, messageConverter);
3739
}
3840

3941
@Override

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageEnqueuerImpl.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,9 @@
3434
@Slf4j
3535
public class RqueueMessageEnqueuerImpl extends BaseMessageSender implements RqueueMessageEnqueuer {
3636

37-
public RqueueMessageEnqueuerImpl(RqueueMessageTemplate messageTemplate) {
38-
super(messageTemplate, Collections.emptyList());
39-
}
40-
4137
public RqueueMessageEnqueuerImpl(
42-
RqueueMessageTemplate messageTemplate, List<MessageConverter> messageConverters) {
43-
super(messageTemplate, messageConverters);
38+
RqueueMessageTemplate messageTemplate, MessageConverter messageConverter) {
39+
super(messageTemplate, messageConverter);
4440
}
4541

4642
@Override

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageManagerImpl.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,9 @@
4040
public class RqueueMessageManagerImpl extends BaseMessageSender implements RqueueMessageManager {
4141
@Autowired private RqueueLockManager rqueueLockManager;
4242

43-
public RqueueMessageManagerImpl(RqueueMessageTemplate messageTemplate) {
44-
super(messageTemplate, Collections.emptyList());
45-
}
46-
4743
public RqueueMessageManagerImpl(
48-
RqueueMessageTemplate messageTemplate, List<MessageConverter> messageConverters) {
49-
super(messageTemplate, messageConverters);
44+
RqueueMessageTemplate messageTemplate, MessageConverter messageConverter) {
45+
super(messageTemplate, messageConverter);
5046
}
5147

5248
@Override

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageSenderImpl.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,16 @@
3636
import java.util.ArrayList;
3737
import java.util.Collections;
3838
import java.util.List;
39+
import com.google.common.collect.ImmutableList;
3940
import lombok.extern.slf4j.Slf4j;
4041
import org.springframework.messaging.converter.MessageConverter;
4142

4243
@Slf4j
4344
public class RqueueMessageSenderImpl extends BaseMessageSender implements RqueueMessageSender {
4445

45-
public RqueueMessageSenderImpl(RqueueMessageTemplate messageTemplate) {
46-
this(messageTemplate, Collections.emptyList());
47-
}
48-
4946
public RqueueMessageSenderImpl(
50-
RqueueMessageTemplate messageTemplate, List<MessageConverter> messageConverters) {
51-
super(messageTemplate, messageConverters);
47+
RqueueMessageTemplate messageTemplate, MessageConverter messageConverter) {
48+
super(messageTemplate, messageConverter);
5249
}
5350

5451
@Override
@@ -131,7 +128,7 @@ public MessageConverter getMessageConverter() {
131128

132129
@Override
133130
public List<MessageConverter> getMessageConverters() {
134-
return messageConverter.getConverters();
131+
return ImmutableList.of(messageConverter);
135132
}
136133

137134
@Override

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import static org.springframework.util.Assert.notNull;
2020

2121
import com.github.sonus21.rqueue.annotation.RqueueListener;
22+
import com.github.sonus21.rqueue.core.DefaultRqueueMessageConverter;
2223
import com.github.sonus21.rqueue.core.EndpointRegistry;
23-
import com.github.sonus21.rqueue.core.MessageConverterFactory;
2424
import com.github.sonus21.rqueue.exception.QueueDoesNotExist;
2525
import com.github.sonus21.rqueue.models.Concurrency;
2626
import com.github.sonus21.rqueue.utils.Constants;
@@ -62,13 +62,9 @@ public class RqueueMessageHandler extends AbstractMethodMessageHandler<MappingIn
6262
private final ConversionService conversionService;
6363
private final MessageConverter messageConverter;
6464

65-
public RqueueMessageHandler() {
66-
this(Collections.emptyList());
67-
}
68-
69-
public RqueueMessageHandler(List<MessageConverter> messageConverters) {
70-
notNull(messageConverters, "messageConverters cannot be null");
71-
this.messageConverter = MessageConverterFactory.getMessageConverter(messageConverters);
65+
public RqueueMessageHandler(final MessageConverter messageConverter) {
66+
notNull(messageConverter, "messageConverter cannot be null");
67+
this.messageConverter = messageConverter;
7268
this.conversionService = new DefaultFormattingConversionService();
7369
}
7470

0 commit comments

Comments
 (0)