Skip to content

Commit 7ba876b

Browse files
authored
GH-8108: Add DSL for Redis Queue Gateways
Related to: #8108 doc fix: there are no properties named `order` and `expect-message` in queue-inbound-gateway, replaced with `phase` and `extract-payload`, modify the properties explanation respectively. * Combine queueInboundGateway test and queueOutboundGateway test into one single test. put reply_channel into the message header for a simpler flow definition. doc format, one sentence per line. * Remove Spec suffix from all methods in `Redis.java`, and respective tests and docs. * Make ReplyChannel as a local variable to be more focused on request-reply pattern. Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com>
1 parent 210eb46 commit 7ba876b

File tree

5 files changed

+391
-31
lines changed

5 files changed

+391
-31
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/Redis.java

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(
108108
* @param key The key of the Redis collection to build on
109109
* @return the {@link RedisStoreInboundChannelAdapterSpec} instance
110110
*/
111-
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec(
111+
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapter(
112112
RedisConnectionFactory connectionFactory, String key) {
113113

114-
return storeInboundChannelAdapterSpec(connectionFactory, new LiteralExpression(key));
114+
return storeInboundChannelAdapter(connectionFactory, new LiteralExpression(key));
115115
}
116116

117117
/**
@@ -120,7 +120,7 @@ public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec
120120
* @param keyExpression The keyExpression of the Redis collection to build on
121121
* @return the {@link RedisStoreInboundChannelAdapterSpec} instance
122122
*/
123-
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec(
123+
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapter(
124124
RedisConnectionFactory connectionFactory, Expression keyExpression) {
125125

126126
return new RedisStoreInboundChannelAdapterSpec(connectionFactory, keyExpression);
@@ -132,10 +132,10 @@ public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec
132132
* @param keySupplier The keySupplier of the Redis collection to build on
133133
* @return the {@link RedisStoreInboundChannelAdapterSpec} instance
134134
*/
135-
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec(
135+
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapter(
136136
RedisConnectionFactory connectionFactory, Supplier<Message<?>> keySupplier) {
137137

138-
return storeInboundChannelAdapterSpec(connectionFactory, new SupplierExpression<>(keySupplier));
138+
return storeInboundChannelAdapter(connectionFactory, new SupplierExpression<>(keySupplier));
139139
}
140140

141141
/**
@@ -144,10 +144,10 @@ public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec
144144
* @param key The key of the Redis collection to build on
145145
* @return the {@link RedisStoreInboundChannelAdapterSpec} instance
146146
*/
147-
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec(
147+
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapter(
148148
RedisTemplate<String, ?> redisTemplate, String key) {
149149

150-
return storeInboundChannelAdapterSpec(redisTemplate, new LiteralExpression(key));
150+
return storeInboundChannelAdapter(redisTemplate, new LiteralExpression(key));
151151
}
152152

153153
/**
@@ -156,7 +156,7 @@ public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec
156156
* @param keyExpression The keyExpression of the Redis collection to build on
157157
* @return the {@link RedisStoreInboundChannelAdapterSpec} instance
158158
*/
159-
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec(
159+
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapter(
160160
RedisTemplate<String, ?> redisTemplate, Expression keyExpression) {
161161

162162
return new RedisStoreInboundChannelAdapterSpec(redisTemplate, keyExpression);
@@ -168,18 +168,18 @@ public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec
168168
* @param keySupplier The keySupplier of the Redis collection to build on
169169
* @return the {@link RedisStoreInboundChannelAdapterSpec} instance
170170
*/
171-
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapterSpec(
171+
public static RedisStoreInboundChannelAdapterSpec storeInboundChannelAdapter(
172172
RedisTemplate<String, ?> redisTemplate, Supplier<Message<?>> keySupplier) {
173173

174-
return storeInboundChannelAdapterSpec(redisTemplate, new SupplierExpression<>(keySupplier));
174+
return storeInboundChannelAdapter(redisTemplate, new SupplierExpression<>(keySupplier));
175175
}
176176

177177
/**
178178
* The factory to produce a {@link RedisStoreOutboundChannelAdapterSpec}.
179179
* @param connectionFactory the {@link RedisConnectionFactory} to build on
180180
* @return the {@link RedisStoreOutboundChannelAdapterSpec} instance
181181
*/
182-
public static RedisStoreOutboundChannelAdapterSpec storeOutboundChannelAdapterSpec(
182+
public static RedisStoreOutboundChannelAdapterSpec storeOutboundChannelAdapter(
183183
RedisConnectionFactory connectionFactory) {
184184

185185
return new RedisStoreOutboundChannelAdapterSpec(connectionFactory);
@@ -190,7 +190,7 @@ public static RedisStoreOutboundChannelAdapterSpec storeOutboundChannelAdapterSp
190190
* @param redisTemplate the {@link RedisTemplate} to build on
191191
* @return the {@link RedisStoreOutboundChannelAdapterSpec} instance
192192
*/
193-
public static RedisStoreOutboundChannelAdapterSpec storeOutboundChannelAdapterSpec(
193+
public static RedisStoreOutboundChannelAdapterSpec storeOutboundChannelAdapter(
194194
RedisTemplate<String, ?> redisTemplate) {
195195

196196
return new RedisStoreOutboundChannelAdapterSpec(redisTemplate);
@@ -201,7 +201,7 @@ public static RedisStoreOutboundChannelAdapterSpec storeOutboundChannelAdapterSp
201201
* @param redisTemplate the {@link RedisTemplate} to build on
202202
* @return the {@link RedisOutboundGatewaySpec} instance
203203
*/
204-
public static RedisOutboundGatewaySpec outboundGatewaySpec(RedisTemplate<String, ?> redisTemplate) {
204+
public static RedisOutboundGatewaySpec outboundGateway(RedisTemplate<String, ?> redisTemplate) {
205205
return new RedisOutboundGatewaySpec(redisTemplate);
206206
}
207207

@@ -210,10 +210,34 @@ public static RedisOutboundGatewaySpec outboundGatewaySpec(RedisTemplate<String,
210210
* @param connectionFactory the {@link RedisConnectionFactory} to build on
211211
* @return the {@link RedisOutboundGatewaySpec} instance
212212
*/
213-
public static RedisOutboundGatewaySpec outboundGatewaySpec(RedisConnectionFactory connectionFactory) {
213+
public static RedisOutboundGatewaySpec outboundGateway(RedisConnectionFactory connectionFactory) {
214214
return new RedisOutboundGatewaySpec(connectionFactory);
215215
}
216216

217+
/**
218+
* The factory to produce a {@link RedisQueueOutboundGatewaySpec}.
219+
* @param queueName The queueName of the Redis list to build on
220+
* @param connectionFactory the {@link RedisConnectionFactory} to build on
221+
* @return the {@link RedisQueueOutboundGatewaySpec} instance
222+
*/
223+
public static RedisQueueOutboundGatewaySpec queueOutboundGateway(String queueName,
224+
RedisConnectionFactory connectionFactory) {
225+
226+
return new RedisQueueOutboundGatewaySpec(queueName, connectionFactory);
227+
}
228+
229+
/**
230+
* The factory to produce a {@link RedisQueueInboundGatewaySpec}.
231+
* @param queueName The queueName of the Redis list to build on
232+
* @param connectionFactory the {@link RedisConnectionFactory} to build on
233+
* @return the {@link RedisQueueInboundGatewaySpec} instance
234+
*/
235+
public static RedisQueueInboundGatewaySpec queueInboundGateway(String queueName,
236+
RedisConnectionFactory connectionFactory) {
237+
238+
return new RedisQueueInboundGatewaySpec(queueName, connectionFactory);
239+
}
240+
217241
private Redis() {
218242
}
219243

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.redis.dsl;
18+
19+
import java.time.Duration;
20+
import java.util.concurrent.Executor;
21+
22+
import org.springframework.data.redis.connection.RedisConnectionFactory;
23+
import org.springframework.data.redis.serializer.RedisSerializer;
24+
import org.springframework.integration.dsl.MessagingGatewaySpec;
25+
import org.springframework.integration.redis.inbound.RedisQueueInboundGateway;
26+
27+
/**
28+
* A {@link MessagingGatewaySpec} for a {@link RedisQueueInboundGateway}.
29+
*
30+
* @author Jiandong Ma
31+
*
32+
* @since 7.1
33+
*/
34+
public class RedisQueueInboundGatewaySpec extends MessagingGatewaySpec<RedisQueueInboundGatewaySpec, RedisQueueInboundGateway> {
35+
36+
protected RedisQueueInboundGatewaySpec(String queueName, RedisConnectionFactory connectionFactory) {
37+
super(new RedisQueueInboundGateway(queueName, connectionFactory));
38+
}
39+
40+
/**
41+
* Specify whether extract payload.
42+
* @param extractPayload the extractPayload
43+
* @return the spec
44+
* @see RedisQueueInboundGateway#setExtractPayload(boolean)
45+
*/
46+
public RedisQueueInboundGatewaySpec extractPayload(boolean extractPayload) {
47+
this.target.setExtractPayload(extractPayload);
48+
return this;
49+
}
50+
51+
/**
52+
* Specify the redis serializer.
53+
* @param serializer the serializer
54+
* @return the spec
55+
* @see RedisQueueInboundGateway#setSerializer(RedisSerializer)
56+
*/
57+
public RedisQueueInboundGatewaySpec serializer(RedisSerializer<?> serializer) {
58+
this.target.setSerializer(serializer);
59+
return this;
60+
}
61+
62+
/**
63+
* Specify the receiveTimeout.
64+
* @param receiveTimeout the receiveTimeout
65+
* @return the spec
66+
* @see RedisQueueInboundGateway#setReceiveDuration(Duration)
67+
*/
68+
public RedisQueueInboundGatewaySpec receiveTimeout(Duration receiveTimeout) {
69+
this.target.setReceiveDuration(receiveTimeout);
70+
return this;
71+
}
72+
73+
/**
74+
* Specify the receiveTimeout.
75+
* @param receiveTimeout the receiveTimeout
76+
* @return the spec
77+
* @see RedisQueueInboundGateway#setReceiveTimeout(long)
78+
*/
79+
public RedisQueueInboundGatewaySpec receiveTimeout(long receiveTimeout) {
80+
this.target.setReceiveTimeout(receiveTimeout);
81+
return this;
82+
}
83+
84+
/**
85+
* Specify an {@link Executor} for the underlying listening task.
86+
* @param taskExecutor the taskExecutor
87+
* @return the spec
88+
* @see RedisQueueInboundGateway#setTaskExecutor(Executor)
89+
*/
90+
public RedisQueueInboundGatewaySpec taskExecutor(Executor taskExecutor) {
91+
this.target.setTaskExecutor(taskExecutor);
92+
return this;
93+
}
94+
95+
/**
96+
* Specify the time (in milliseconds) for the listener task should sleep after exceptions on
97+
* the "right pop" operation before restarting the listener task.
98+
* @param recoveryInterval the recoveryInterval
99+
* @return the spec
100+
* @see RedisQueueInboundGateway#setRecoveryInterval(long)
101+
*/
102+
public RedisQueueInboundGatewaySpec recoveryInterval(long recoveryInterval) {
103+
this.target.setRecoveryInterval(recoveryInterval);
104+
return this;
105+
}
106+
107+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.redis.dsl;
18+
19+
import java.time.Duration;
20+
21+
import org.springframework.data.redis.connection.RedisConnectionFactory;
22+
import org.springframework.data.redis.serializer.RedisSerializer;
23+
import org.springframework.integration.dsl.MessageHandlerSpec;
24+
import org.springframework.integration.redis.outbound.RedisQueueOutboundGateway;
25+
26+
/**
27+
* A {@link MessageHandlerSpec} for a {@link RedisQueueOutboundGateway}.
28+
*
29+
* @author Jiandong Ma
30+
*
31+
* @since 7.1
32+
*/
33+
public class RedisQueueOutboundGatewaySpec extends
34+
MessageHandlerSpec<RedisQueueOutboundGatewaySpec, RedisQueueOutboundGateway> {
35+
36+
protected RedisQueueOutboundGatewaySpec(String queueName, RedisConnectionFactory connectionFactory) {
37+
this.target = new RedisQueueOutboundGateway(queueName, connectionFactory);
38+
}
39+
40+
/**
41+
* Specify the receiveTimeout.
42+
* @param receiveTimeout the receiveTimeout
43+
* @return the spec
44+
* @see RedisQueueOutboundGateway#setReceiveDuration(Duration)
45+
*/
46+
public RedisQueueOutboundGatewaySpec receiveTimeout(Duration receiveTimeout) {
47+
this.target.setReceiveDuration(receiveTimeout);
48+
return this;
49+
}
50+
51+
/**
52+
* Specify the receiveTimeout.
53+
* @param receiveTimeout the receiveTimeout
54+
* @return the spec
55+
* @see RedisQueueOutboundGateway#setReceiveTimeout(long)
56+
*/
57+
public RedisQueueOutboundGatewaySpec receiveTimeout(long receiveTimeout) {
58+
this.target.setReceiveTimeout(receiveTimeout);
59+
return this;
60+
}
61+
62+
/**
63+
* Specify whether extract payload.
64+
* @param extractPayload the extractPayload
65+
* @return the spec
66+
* @see RedisQueueOutboundGateway#setExtractPayload(boolean)
67+
*/
68+
public RedisQueueOutboundGatewaySpec extractPayload(boolean extractPayload) {
69+
this.target.setExtractPayload(extractPayload);
70+
return this;
71+
}
72+
73+
/**
74+
* Specify the redis serializer.
75+
* @param serializer the serializer
76+
* @return the spec
77+
* @see RedisQueueOutboundGateway#setSerializer(RedisSerializer)
78+
*/
79+
public RedisQueueOutboundGatewaySpec serializer(RedisSerializer<?> serializer) {
80+
this.target.setSerializer(serializer);
81+
return this;
82+
}
83+
84+
}

0 commit comments

Comments
 (0)