Skip to content

Commit 6a43586

Browse files
committed
Introduce RqueueSerDes and RqueueTypeFactory as universal serialization abstractions
Replace all direct ObjectMapper usages with RqueueSerDes and RqueueTypeFactory: - GenericMessageConverter.SmartMessageSerDes now takes RqueueSerDes + RqueueTypeFactory; getTargetType returns TypeEnvelop instead of Jackson-specific JavaType - JsonMessageConverter, RqueueWorkerRegistryImpl, HttpUtils, RqueueInternalPubSubChannel all use RqueueSerDes; SmartMessageSerDes removed from RqueueInternalPubSubChannel - RqueuePubSubEvent.messageAs signature updated to accept RqueueSerDes - SerializationUtils exposes static serDes and typeFactory singletons for non-Spring use - RqueueListenerBaseConfig registers @bean for both with MissingRqueueSerDes / MissingRqueueTypeFactory conditions so user-provided beans take precedence - Msg.msg changed from String to byte[] with Utf8BytesSerializer/Deserializer to keep the same wire format and preserve backward compatibility with stored messages - backward compatibility test added to GenericMessageConverterTest Assisted-By: Claude Code
1 parent 0e518b5 commit 6a43586

42 files changed

Lines changed: 665 additions & 245 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,13 @@
2323
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
2424
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
2525
import com.github.sonus21.rqueue.core.impl.UuidV4RqueueMessageIdGenerator;
26+
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
27+
import com.github.sonus21.rqueue.serdes.RqueueTypeFactory;
28+
import com.github.sonus21.rqueue.serdes.SerializationUtils;
2629
import com.github.sonus21.rqueue.utils.RedisUtils;
2730
import com.github.sonus21.rqueue.utils.condition.MissingRqueueMessageIdGenerator;
31+
import com.github.sonus21.rqueue.utils.condition.MissingRqueueSerDes;
32+
import com.github.sonus21.rqueue.utils.condition.MissingRqueueTypeFactory;
2833
import org.springframework.beans.factory.annotation.Autowired;
2934
import org.springframework.beans.factory.annotation.Value;
3035
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -180,6 +185,18 @@ protected RqueueMessageTemplate getMessageTemplate(RqueueConfig rqueueConfig) {
180185
return simpleRqueueListenerContainerFactory.getRqueueMessageTemplate();
181186
}
182187

188+
@Bean
189+
@Conditional(MissingRqueueSerDes.class)
190+
public RqueueSerDes rqueueSerDes() {
191+
return SerializationUtils.getSerDes();
192+
}
193+
194+
@Bean
195+
@Conditional(MissingRqueueTypeFactory.class)
196+
public RqueueTypeFactory rqueueTypeFactory() {
197+
return SerializationUtils.getTypeFactory();
198+
}
199+
183200
@Bean
184201
public RqueueBeanProvider rqueueBeanProvider() {
185202
return new RqueueBeanProvider();

rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818

1919
import static org.springframework.util.Assert.notNull;
2020

21-
import com.github.sonus21.rqueue.utils.SerializationUtils;
21+
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
22+
import com.github.sonus21.rqueue.serdes.RqueueTypeFactory;
23+
import com.github.sonus21.rqueue.serdes.SerializationUtils;
24+
import com.github.sonus21.rqueue.serdes.TypeEnvelop;
2225
import java.lang.reflect.Field;
2326
import java.lang.reflect.TypeVariable;
27+
import java.nio.charset.StandardCharsets;
2428
import java.util.Collection;
2529
import java.util.List;
2630
import lombok.AllArgsConstructor;
@@ -33,8 +37,14 @@
3337
import org.springframework.messaging.converter.SmartMessageConverter;
3438
import org.springframework.messaging.support.GenericMessage;
3539
import tools.jackson.core.JacksonException;
36-
import tools.jackson.databind.JavaType;
37-
import tools.jackson.databind.ObjectMapper;
40+
import tools.jackson.core.JsonGenerator;
41+
import tools.jackson.core.JsonParser;
42+
import tools.jackson.databind.DeserializationContext;
43+
import tools.jackson.databind.SerializationContext;
44+
import tools.jackson.databind.annotation.JsonDeserialize;
45+
import tools.jackson.databind.annotation.JsonSerialize;
46+
import tools.jackson.databind.deser.std.StdDeserializer;
47+
import tools.jackson.databind.ser.std.StdSerializer;
3848

3949
/**
4050
* A converter to turn the payload of a {@link Message} from serialized form to a typed String and
@@ -47,13 +57,14 @@ public class GenericMessageConverter implements SmartMessageConverter {
4757
private final SmartMessageSerDes smartMessageSerDes;
4858

4959
public GenericMessageConverter() {
50-
ObjectMapper mapper = SerializationUtils.createObjectMapper();
51-
this.smartMessageSerDes = new SmartMessageSerDes(mapper);
60+
this.smartMessageSerDes = new SmartMessageSerDes(
61+
SerializationUtils.getSerDes(), SerializationUtils.getTypeFactory());
5262
}
5363

54-
public GenericMessageConverter(ObjectMapper objectMapper) {
55-
notNull(objectMapper, "objectMapper cannot be null");
56-
this.smartMessageSerDes = new SmartMessageSerDes(objectMapper);
64+
public GenericMessageConverter(RqueueSerDes serDes, RqueueTypeFactory typeFactory) {
65+
notNull(serDes, "serDes cannot be null");
66+
notNull(typeFactory, "typeFactory cannot be null");
67+
this.smartMessageSerDes = new SmartMessageSerDes(serDes, typeFactory);
5768
}
5869

5970
/**
@@ -117,16 +128,49 @@ public Object fromMessage(Message<?> message, Class<?> targetClass) {
117128
@AllArgsConstructor
118129
private static class Msg {
119130

120-
private String msg;
131+
@JsonSerialize(using = Utf8BytesSerializer.class)
132+
@JsonDeserialize(using = Utf8BytesDeserializer.class)
133+
private byte[] msg;
121134
private String name;
122135
}
123136

137+
private static class Utf8BytesSerializer extends StdSerializer<byte[]> {
138+
139+
Utf8BytesSerializer() {
140+
super(byte[].class);
141+
}
142+
143+
@Override
144+
public void serialize(byte[] value, JsonGenerator gen, SerializationContext ctx)
145+
throws JacksonException {
146+
gen.writeString(new String(value, StandardCharsets.UTF_8));
147+
}
148+
}
149+
150+
private static class Utf8BytesDeserializer extends StdDeserializer<byte[]> {
151+
152+
Utf8BytesDeserializer() {
153+
super(byte[].class);
154+
}
155+
156+
@Override
157+
public byte[] deserialize(JsonParser p, DeserializationContext ctx) throws JacksonException {
158+
String text = p.getString();
159+
if (text == null) {
160+
return null;
161+
}
162+
return text.getBytes(StandardCharsets.UTF_8);
163+
}
164+
}
165+
124166
public static class SmartMessageSerDes {
125167

126-
private final ObjectMapper objectMapper;
168+
private final RqueueSerDes serDes;
169+
private final RqueueTypeFactory typeFactory;
127170

128-
public SmartMessageSerDes(ObjectMapper objectMapper) {
129-
this.objectMapper = objectMapper;
171+
public SmartMessageSerDes(RqueueSerDes serDes, RqueueTypeFactory typeFactory) {
172+
this.serDes = serDes;
173+
this.typeFactory = typeFactory;
130174
}
131175

132176
private String[] splitClassNames(String name) {
@@ -197,27 +241,27 @@ private String getClassName(Object payload) {
197241
return getGenericFieldBasedClassName(payloadClass, payload);
198242
}
199243

200-
private JavaType getTargetType(Msg msg) throws ClassNotFoundException {
244+
private TypeEnvelop getTargetType(Msg msg) throws ClassNotFoundException {
201245
String[] classNames = splitClassNames(msg.getName());
202246
if (classNames.length == 1) {
203247
Class<?> c = Thread.currentThread().getContextClassLoader().loadClass(msg.getName());
204-
return objectMapper.getTypeFactory().constructType(c);
248+
return typeFactory.create(c);
205249
}
206250
Class<?> envelopeClass =
207251
Thread.currentThread().getContextClassLoader().loadClass(classNames[0]);
208252
Class<?>[] classes = new Class<?>[classNames.length - 1];
209253
for (int i = 1; i < classNames.length; i++) {
210254
classes[i - 1] = Thread.currentThread().getContextClassLoader().loadClass(classNames[i]);
211255
}
212-
return objectMapper.getTypeFactory().constructParametricType(envelopeClass, classes);
256+
return typeFactory.create(envelopeClass, classes);
213257
}
214258

215259
public Object deserialize(String payload) {
216260
try {
217261
if (SerializationUtils.isJson(payload)) {
218-
Msg msg = objectMapper.readValue(payload, Msg.class);
219-
JavaType type = getTargetType(msg);
220-
return objectMapper.readValue(msg.msg, type);
262+
Msg msg = serDes.deserialize(payload, Msg.class);
263+
TypeEnvelop type = getTargetType(msg);
264+
return serDes.deserialize(msg.msg, type);
221265
}
222266
} catch (Exception e) {
223267
log.debug("Deserialization of message {} failed", payload, e);
@@ -230,7 +274,7 @@ public <T> T deserialize(byte[] payload, Class<T> clazz) {
230274
return null;
231275
}
232276
try {
233-
return objectMapper.readValue(payload, clazz);
277+
return serDes.deserialize(payload, clazz);
234278
} catch (Exception e) {
235279
log.debug("Deserialization of message {} failed", new String(payload), e);
236280
}
@@ -243,10 +287,10 @@ public String serialize(Object payload) {
243287
return null;
244288
}
245289
try {
246-
String msg = objectMapper.writeValueAsString(payload);
290+
byte[] msg = serDes.serialize(payload);
247291
Msg message = new Msg(msg, name);
248-
return objectMapper.writeValueAsString(message);
249-
} catch (JacksonException e) {
292+
return serDes.serializeAsString(message);
293+
} catch (Exception e) {
250294
log.debug("Serialisation failed", e);
251295
return null;
252296
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/JsonMessageConverter.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@
1818

1919
import static org.springframework.util.Assert.notNull;
2020

21-
import com.github.sonus21.rqueue.utils.SerializationUtils;
21+
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
22+
import com.github.sonus21.rqueue.serdes.SerializationUtils;
2223
import lombok.extern.slf4j.Slf4j;
2324
import org.springframework.messaging.Message;
2425
import org.springframework.messaging.MessageHeaders;
2526
import org.springframework.messaging.converter.MessageConverter;
2627
import org.springframework.messaging.support.GenericMessage;
27-
import tools.jackson.core.JacksonException;
28-
import tools.jackson.databind.ObjectMapper;
2928

3029
/**
3130
* JsonMessageConverter tries to convert to JSON and from JSON to object.
@@ -41,15 +40,15 @@
4140
@Slf4j
4241
public class JsonMessageConverter implements MessageConverter {
4342

44-
private final ObjectMapper objectMapper;
43+
private final RqueueSerDes serDes;
4544

4645
public JsonMessageConverter() {
47-
this.objectMapper = SerializationUtils.createObjectMapper();
46+
this.serDes = SerializationUtils.getSerDes();
4847
}
4948

50-
public JsonMessageConverter(ObjectMapper objectMapper) {
51-
notNull(objectMapper, "objectMapper cannot be null");
52-
this.objectMapper = objectMapper;
49+
public JsonMessageConverter(RqueueSerDes serDes) {
50+
notNull(serDes, "serDes cannot be null");
51+
this.serDes = serDes;
5352
}
5453

5554
@Override
@@ -61,10 +60,10 @@ public Object fromMessage(Message<?> message, Class<?> targetClass) {
6160
return null;
6261
}
6362
if (SerializationUtils.isJson(payload)) {
64-
return objectMapper.readValue(payload, targetClass);
63+
return serDes.deserialize(payload, targetClass);
6564
}
6665
return null;
67-
} catch (JacksonException | ClassCastException e) {
66+
} catch (Exception e) {
6867
log.debug("Deserialization of message {} failed", message, e);
6968
return null;
7069
}
@@ -74,9 +73,8 @@ public Object fromMessage(Message<?> message, Class<?> targetClass) {
7473
public Message<?> toMessage(Object payload, MessageHeaders headers) {
7574
log.trace("Payload: {} Headers: {}", payload, headers);
7675
try {
77-
String msg = objectMapper.writeValueAsString(payload);
78-
return new GenericMessage<>(msg);
79-
} catch (JacksonException e) {
76+
return new GenericMessage<>(serDes.serializeAsString(payload));
77+
} catch (Exception e) {
8078
log.debug("Serialisation failed, Payload: {}", payload, e);
8179
return null;
8280
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/MessageConverterProvider.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.springframework.messaging.Message;
2121
import org.springframework.messaging.converter.CompositeMessageConverter;
2222
import org.springframework.messaging.converter.JacksonJsonMessageConverter;
23-
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
2423
import org.springframework.messaging.converter.MessageConverter;
2524
import org.springframework.messaging.converter.SmartMessageConverter;
2625
import org.springframework.messaging.converter.StringMessageConverter;
@@ -42,7 +41,6 @@
4241
* @see SmartMessageConverter
4342
* @see DefaultRqueueMessageConverter
4443
* @see JacksonJsonMessageConverter
45-
* @see MappingJackson2MessageConverter
4644
*/
4745
public interface MessageConverterProvider {
4846

rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.github.sonus21.rqueue.converter;
1818

1919
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
20-
import com.github.sonus21.rqueue.utils.SerializationUtils;
20+
import com.github.sonus21.rqueue.serdes.SerializationUtils;
2121
import lombok.extern.slf4j.Slf4j;
2222
import org.springframework.cache.support.NullValue;
2323
import org.springframework.data.redis.serializer.RedisSerializer;
@@ -64,11 +64,10 @@ public Object deserialize(byte[] bytes) throws SerializationException {
6464

6565
// adapted from spring-data-redis
6666
private static class RqueueRedisSerDes implements RedisSerializer<Object> {
67-
68-
private ObjectMapper mapper;
67+
private final ObjectMapper mapper;
6968

7069
RqueueRedisSerDes() {
71-
this.mapper = SerializationUtils.createObjectMapper()
70+
this.mapper = SerializationUtils.getObjectMapper()
7271
.rebuild()
7372
.addModule(new SimpleModule().addSerializer(new NullValueSerializer()))
7473
.activateDefaultTyping(

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818

1919
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
2020
import com.github.sonus21.rqueue.config.RqueueConfig;
21-
import com.github.sonus21.rqueue.converter.GenericMessageConverter.SmartMessageSerDes;
2221
import com.github.sonus21.rqueue.converter.RqueueRedisSerializer;
2322
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
2423
import com.github.sonus21.rqueue.models.enums.PubSubType;
2524
import com.github.sonus21.rqueue.models.event.RqueuePubSubEvent;
2625
import com.github.sonus21.rqueue.models.request.PauseUnpauseQueueRequest;
26+
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
27+
import com.github.sonus21.rqueue.serdes.SerializationUtils;
2728
import com.github.sonus21.rqueue.utils.Constants;
28-
import com.github.sonus21.rqueue.utils.SerializationUtils;
2929
import com.github.sonus21.rqueue.utils.StringUtils;
3030
import java.time.Duration;
3131
import java.util.UUID;
@@ -44,7 +44,7 @@ public class RqueueInternalPubSubChannel implements InitializingBean {
4444
private final RqueueRedisTemplate<String> stringRqueueRedisTemplate;
4545
private final RqueueRedisSerializer rqueueRedisSerializer;
4646
private final RqueueBeanProvider rqueueBeanProvider;
47-
private SmartMessageSerDes smartMessageSerDes;
47+
private final RqueueSerDes serDes = SerializationUtils.getSerDes();
4848

4949
public RqueueInternalPubSubChannel(
5050
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
@@ -63,10 +63,8 @@ public RqueueInternalPubSubChannel(
6363
@Override
6464
public void afterPropertiesSet() throws Exception {
6565
String channel = rqueueConfig.getInternalCommChannelName();
66-
InternalMessageListener messageListener = new InternalMessageListener();
6766
rqueueRedisListenerContainerFactory.addMessageListener(
68-
messageListener, new ChannelTopic(channel));
69-
this.smartMessageSerDes = new SmartMessageSerDes(SerializationUtils.createObjectMapper());
67+
new InternalMessageListener(), new ChannelTopic(channel));
7068
}
7169

7270
public void emitPauseUnpauseQueueEvent(PauseUnpauseQueueRequest pauseUnpauseQueueRequest) {
@@ -103,24 +101,31 @@ public void onMessage(Message message, byte[] pattern) {
103101

104102
private void processEvent(byte[] body) {
105103
log.debug("Message on internal channel {}", new String(body));
106-
RqueuePubSubEvent rqueuePubSubEvent =
107-
smartMessageSerDes.deserialize(body, RqueuePubSubEvent.class);
104+
RqueuePubSubEvent rqueuePubSubEvent;
105+
try {
106+
rqueuePubSubEvent = serDes.deserialize(body, RqueuePubSubEvent.class);
107+
} catch (Exception e) {
108+
log.error("Invalid message on pub-sub channel {}", new String(body), e);
109+
return;
110+
}
108111
if (rqueuePubSubEvent == null) {
109112
log.error("Invalid message on pub-sub channel {}", new String(body));
110113
return;
111114
}
112-
switch (rqueuePubSubEvent.getType()) {
113-
case PAUSE_QUEUE:
114-
PauseUnpauseQueueRequest request =
115-
rqueuePubSubEvent.messageAs(smartMessageSerDes, PauseUnpauseQueueRequest.class);
116-
handlePauseEvent(request);
117-
break;
118-
case QUEUE_CRUD:
119-
String queue = rqueuePubSubEvent.messageAs(smartMessageSerDes, String.class);
120-
rqueueBeanProvider.getRqueueSystemConfigDao().clearCacheByName(queue);
121-
break;
122-
default:
123-
log.error("Unknown event type {}", rqueuePubSubEvent);
115+
try {
116+
switch (rqueuePubSubEvent.getType()) {
117+
case PAUSE_QUEUE:
118+
handlePauseEvent(rqueuePubSubEvent.messageAs(serDes, PauseUnpauseQueueRequest.class));
119+
break;
120+
case QUEUE_CRUD:
121+
rqueueBeanProvider.getRqueueSystemConfigDao()
122+
.clearCacheByName(rqueuePubSubEvent.messageAs(serDes, String.class));
123+
break;
124+
default:
125+
log.error("Unknown event type {}", rqueuePubSubEvent);
126+
}
127+
} catch (Exception e) {
128+
log.error("Failed to process pub-sub event {}", rqueuePubSubEvent, e);
124129
}
125130
}
126131

0 commit comments

Comments
 (0)