Skip to content

Commit 573f1e9

Browse files
authored
Add DSL for Redis Queue channel adapters
Related to: #8108 Also add respective Java DSL and Java usage in redis.adoc. Additionally, add missing MessageConverter sample for channel adapters in redis.adoc. * Fix sample in redis.adoc. * Add an overload method based on function to determine queue name. Refine Javadoc and update redis.adoc documentation * Add minor description in Javadoc. revise the length of code lines, less than 120 symbols per line. Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com>
1 parent 0ee1435 commit 573f1e9

7 files changed

Lines changed: 460 additions & 10 deletions

File tree

spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/Redis.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616

1717
package org.springframework.integration.redis.dsl;
1818

19+
import java.util.function.Function;
20+
1921
import org.springframework.data.redis.connection.RedisConnectionFactory;
22+
import org.springframework.expression.Expression;
23+
import org.springframework.integration.expression.FunctionExpression;
24+
import org.springframework.messaging.Message;
2025

2126
/**
2227
* Factory class for Redis components.
@@ -45,6 +50,54 @@ public static RedisOutboundChannelAdapterSpec outboundChannelAdapter(RedisConnec
4550
return new RedisOutboundChannelAdapterSpec(connectionFactory);
4651
}
4752

53+
/**
54+
* The factory to produce a {@link RedisQueueInboundChannelAdapterSpec}.
55+
* @param queueName The queueName of the Redis list to build on
56+
* @param connectionFactory the {@link RedisConnectionFactory} to build on
57+
* @return the {@link RedisQueueInboundChannelAdapterSpec} instance
58+
*/
59+
public static RedisQueueInboundChannelAdapterSpec queueInboundChannelAdapter(String queueName,
60+
RedisConnectionFactory connectionFactory) {
61+
62+
return new RedisQueueInboundChannelAdapterSpec(queueName, connectionFactory);
63+
}
64+
65+
/**
66+
* The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}.
67+
* @param queueName The queueName of the Redis list to build on
68+
* @param connectionFactory the {@link RedisConnectionFactory} to build on
69+
* @return the {@link RedisQueueOutboundChannelAdapterSpec} instance
70+
*/
71+
public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(String queueName,
72+
RedisConnectionFactory connectionFactory) {
73+
74+
return new RedisQueueOutboundChannelAdapterSpec(queueName, connectionFactory);
75+
}
76+
77+
/**
78+
* The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}.
79+
* @param queueExpression The queueExpression of the Redis list to build on
80+
* @param connectionFactory the {@link RedisConnectionFactory} to build on
81+
* @return the {@link RedisQueueOutboundChannelAdapterSpec} instance
82+
*/
83+
public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(Expression queueExpression,
84+
RedisConnectionFactory connectionFactory) {
85+
86+
return new RedisQueueOutboundChannelAdapterSpec(queueExpression, connectionFactory);
87+
}
88+
89+
/**
90+
* The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}.
91+
* @param queueFunction The queueExpression of the Redis list to build on
92+
* @param connectionFactory the {@link RedisConnectionFactory} to build on
93+
* @return the {@link RedisQueueOutboundChannelAdapterSpec} instance
94+
*/
95+
public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(
96+
Function<Message<?>, String> queueFunction, RedisConnectionFactory connectionFactory) {
97+
98+
return new RedisQueueOutboundChannelAdapterSpec(new FunctionExpression<>(queueFunction), connectionFactory);
99+
}
100+
48101
private Redis() {
49102
}
50103

spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisInboundChannelAdapterSpec.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,21 @@
2525
import org.springframework.messaging.converter.MessageConverter;
2626

2727
/**
28-
* A {@link MessageProducerSpec} for a {@link RedisInboundChannelAdapterSpec}.
28+
* A {@link MessageProducerSpec} for a {@link RedisInboundChannelAdapter}.
2929
*
3030
* @author Jiandong Ma
3131
*
3232
* @since 7.1
3333
*/
34-
public class RedisInboundChannelAdapterSpec extends MessageProducerSpec<RedisInboundChannelAdapterSpec, RedisInboundChannelAdapter> {
34+
public class RedisInboundChannelAdapterSpec extends
35+
MessageProducerSpec<RedisInboundChannelAdapterSpec, RedisInboundChannelAdapter> {
3536

3637
protected RedisInboundChannelAdapterSpec(RedisConnectionFactory connectionFactory) {
3738
this.target = new RedisInboundChannelAdapter(connectionFactory);
3839
}
3940

4041
/**
42+
* Specify the RedisSerializer to deserialize the body of Redis messages.
4143
* @param serializer the serializer
4244
* @return the spec
4345
* @see RedisInboundChannelAdapter#setSerializer(RedisSerializer)
@@ -48,6 +50,7 @@ public RedisInboundChannelAdapterSpec serializer(RedisSerializer<?> serializer)
4850
}
4951

5052
/**
53+
* Specify the topics to subscribe.
5154
* @param topics the topics
5255
* @return the spec
5356
* @see RedisInboundChannelAdapter#setTopics(String...)
@@ -58,6 +61,7 @@ public RedisInboundChannelAdapterSpec topics(String... topics) {
5861
}
5962

6063
/**
64+
* Specify the topicPatterns to subscribe.
6165
* @param topicPatterns the topicPatterns
6266
* @return the spec
6367
* @see RedisInboundChannelAdapter#setTopicPatterns(String...)
@@ -68,6 +72,7 @@ public RedisInboundChannelAdapterSpec topicPatterns(String... topicPatterns) {
6872
}
6973

7074
/**
75+
* Specify the messageConverter to convert between Redis messages and Spring message payloads.
7176
* @param messageConverter the messageConverter
7277
* @return the spec
7378
* @see RedisInboundChannelAdapter#setMessageConverter(MessageConverter)
@@ -78,6 +83,7 @@ public RedisInboundChannelAdapterSpec messageConverter(MessageConverter messageC
7883
}
7984

8085
/**
86+
* Specify an {@link Executor} for running the message listeners when messages are received.
8187
* @param taskExecutor the taskExecutor
8288
* @return the spec
8389
* @see RedisInboundChannelAdapter#setTaskExecutor(Executor)

spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisOutboundChannelAdapterSpec.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,21 @@
2828
import org.springframework.messaging.converter.MessageConverter;
2929

3030
/**
31-
* A {@link MessageHandlerSpec} for a {@link RedisOutboundChannelAdapterSpec}.
31+
* A {@link MessageHandlerSpec} for a {@link RedisPublishingMessageHandler}.
3232
*
3333
* @author Jiandong Ma
3434
*
3535
* @since 7.1
3636
*/
37-
public class RedisOutboundChannelAdapterSpec extends MessageHandlerSpec<RedisOutboundChannelAdapterSpec, RedisPublishingMessageHandler> {
37+
public class RedisOutboundChannelAdapterSpec extends
38+
MessageHandlerSpec<RedisOutboundChannelAdapterSpec, RedisPublishingMessageHandler> {
3839

3940
protected RedisOutboundChannelAdapterSpec(RedisConnectionFactory connectionFactory) {
4041
this.target = new RedisPublishingMessageHandler(connectionFactory);
4142
}
4243

4344
/**
45+
* Specify the RedisSerializer to serialize data before sending to the Redis.
4446
* @param serializer the serializer
4547
* @return the spec
4648
* @see RedisPublishingMessageHandler#setSerializer(RedisSerializer)
@@ -51,6 +53,7 @@ public RedisOutboundChannelAdapterSpec serializer(RedisSerializer<?> serializer)
5153
}
5254

5355
/**
56+
* Specify the messageConverter to convert between Redis messages and Spring message payloads.
5457
* @param messageConverter the messageConverter
5558
* @return the spec
5659
* @see RedisPublishingMessageHandler#setMessageConverter(MessageConverter)
@@ -61,6 +64,7 @@ public RedisOutboundChannelAdapterSpec messageConverter(MessageConverter message
6164
}
6265

6366
/**
67+
* Specify the topic to publish messages.
6468
* @param topic the topic
6569
* @return the spec
6670
* @see RedisPublishingMessageHandler#setTopic(String)
@@ -71,6 +75,7 @@ public RedisOutboundChannelAdapterSpec topic(String topic) {
7175
}
7276

7377
/**
78+
* Specify the topicExpression to determine the topic.
7479
* @param topicExpression the topicExpression
7580
* @return the spec
7681
* @see RedisPublishingMessageHandler#setTopicExpression(Expression)
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.redis.dsl;
18+
19+
import java.time.Duration;
20+
import java.util.concurrent.Executor;
21+
22+
import org.springframework.data.redis.connection.RedisConnectionFactory;
23+
import org.springframework.data.redis.serializer.RedisSerializer;
24+
import org.springframework.integration.dsl.MessageProducerSpec;
25+
import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint;
26+
27+
/**
28+
* A {@link MessageProducerSpec} for a {@link RedisQueueMessageDrivenEndpoint}.
29+
*
30+
* @author Jiandong Ma
31+
*
32+
* @since 7.1
33+
*/
34+
public class RedisQueueInboundChannelAdapterSpec extends
35+
MessageProducerSpec<RedisQueueInboundChannelAdapterSpec, RedisQueueMessageDrivenEndpoint> {
36+
37+
protected RedisQueueInboundChannelAdapterSpec(String queueName, RedisConnectionFactory connectionFactory) {
38+
this.target = new RedisQueueMessageDrivenEndpoint(queueName, connectionFactory);
39+
}
40+
41+
/**
42+
* Specify the RedisSerializer to deserialize the body of Redis messages.
43+
* @param serializer the serializer
44+
* @return the spec
45+
* @see RedisQueueMessageDrivenEndpoint#setSerializer(RedisSerializer)
46+
*/
47+
public RedisQueueInboundChannelAdapterSpec serializer(RedisSerializer<?> serializer) {
48+
this.target.setSerializer(serializer);
49+
return this;
50+
}
51+
52+
/**
53+
* Specify whether expects data from the Redis queue to contain entire Message instances.
54+
* @param expectMessage the expectMessage
55+
* @return the spec
56+
* @see RedisQueueMessageDrivenEndpoint#setExpectMessage(boolean)
57+
*/
58+
public RedisQueueInboundChannelAdapterSpec expectMessage(boolean expectMessage) {
59+
this.target.setExpectMessage(expectMessage);
60+
return this;
61+
}
62+
63+
/**
64+
* Specify the timeout for 'pop' operation to wait for a Redis message from the Redis Queue.
65+
* @param receiveTimeout the receiveTimeout
66+
* @return the spec
67+
* @see RedisQueueMessageDrivenEndpoint#setReceiveDuration(Duration)
68+
*/
69+
public RedisQueueInboundChannelAdapterSpec receiveDuration(Duration receiveTimeout) {
70+
this.target.setReceiveDuration(receiveTimeout);
71+
return this;
72+
}
73+
74+
/**
75+
* Specify the timeout for 'pop' operation to wait for a Redis message from the Redis Queue.
76+
* @param receiveTimeout the receiveTimeout
77+
* @return the spec
78+
* @see RedisQueueMessageDrivenEndpoint#setReceiveTimeout(long)
79+
*/
80+
public RedisQueueInboundChannelAdapterSpec receiveTimeout(long receiveTimeout) {
81+
this.target.setReceiveTimeout(receiveTimeout);
82+
return this;
83+
}
84+
85+
/**
86+
* Specify an {@link Executor} for the underlying listening task.
87+
* @param taskExecutor the taskExecutor
88+
* @return the spec
89+
* @see RedisQueueMessageDrivenEndpoint#setTaskExecutor(Executor)
90+
*/
91+
public RedisQueueInboundChannelAdapterSpec taskExecutor(Executor taskExecutor) {
92+
this.target.setTaskExecutor(taskExecutor);
93+
return this;
94+
}
95+
96+
/**
97+
* Specify the time in milliseconds for the underlying listening task should sleep
98+
* after exceptions on the 'pop' operation.
99+
* @param recoveryInterval the recoveryInterval
100+
* @return the spec
101+
* @see RedisQueueMessageDrivenEndpoint#setRecoveryInterval(long)
102+
*/
103+
public RedisQueueInboundChannelAdapterSpec recoveryInterval(long recoveryInterval) {
104+
this.target.setRecoveryInterval(recoveryInterval);
105+
return this;
106+
}
107+
108+
/**
109+
* Specify use "right pop" or "left pop" to read messages from the Redis Queue.
110+
* @param rightPop the rightPop
111+
* @return the spec
112+
* @see RedisQueueMessageDrivenEndpoint#setRightPop(boolean)
113+
*/
114+
public RedisQueueInboundChannelAdapterSpec rightPop(boolean rightPop) {
115+
this.target.setRightPop(rightPop);
116+
return this;
117+
}
118+
119+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.redis.dsl;
18+
19+
import org.springframework.data.redis.connection.RedisConnectionFactory;
20+
import org.springframework.data.redis.serializer.RedisSerializer;
21+
import org.springframework.expression.Expression;
22+
import org.springframework.integration.dsl.MessageHandlerSpec;
23+
import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter;
24+
25+
/**
26+
* A {@link MessageHandlerSpec} for a {@link RedisQueueOutboundChannelAdapter}.
27+
*
28+
* @author Jiandong Ma
29+
*
30+
* @since 7.1
31+
*/
32+
public class RedisQueueOutboundChannelAdapterSpec extends
33+
MessageHandlerSpec<RedisQueueOutboundChannelAdapterSpec, RedisQueueOutboundChannelAdapter> {
34+
35+
protected RedisQueueOutboundChannelAdapterSpec(String queueName, RedisConnectionFactory connectionFactory) {
36+
this.target = new RedisQueueOutboundChannelAdapter(queueName, connectionFactory);
37+
}
38+
39+
protected RedisQueueOutboundChannelAdapterSpec(Expression queueExpression, RedisConnectionFactory connectionFactory) {
40+
this.target = new RedisQueueOutboundChannelAdapter(queueExpression, connectionFactory);
41+
}
42+
43+
/**
44+
* Specify send only the payload or the entire Message to the Redis queue.
45+
* @param extractPayload the extractPayload
46+
* @return the spec
47+
* @see RedisQueueOutboundChannelAdapter#setExtractPayload(boolean)
48+
*/
49+
public RedisQueueOutboundChannelAdapterSpec extractPayload(boolean extractPayload) {
50+
this.target.setExtractPayload(extractPayload);
51+
return this;
52+
}
53+
54+
/**
55+
* Specify the RedisSerializer to serialize data before sending to the Redis Queue.
56+
* @param serializer the serializer
57+
* @return the spec
58+
* @see RedisQueueOutboundChannelAdapter#setSerializer(RedisSerializer)
59+
*/
60+
public RedisQueueOutboundChannelAdapterSpec serializer(RedisSerializer<?> serializer) {
61+
this.target.setSerializer(serializer);
62+
return this;
63+
}
64+
65+
/**
66+
* Specify use "left push" or "right push" to write messages to the Redis Queue.
67+
* @param leftPush the leftPush
68+
* @return the spec
69+
* @see RedisQueueOutboundChannelAdapter#setLeftPush(boolean)
70+
*/
71+
public RedisQueueOutboundChannelAdapterSpec leftPush(boolean leftPush) {
72+
this.target.setLeftPush(leftPush);
73+
return this;
74+
}
75+
76+
}

0 commit comments

Comments
 (0)