Skip to content
This repository was archived by the owner on Jul 30, 2020. It is now read-only.

Commit ca29b17

Browse files
committed
Merge branch 'integration-module' of https://github.com/pablo-parra/devon into pablo-parra-integration-module4
2 parents 39c045a + 8892054 commit ca29b17

26 files changed

Lines changed: 366 additions & 327 deletions

modules/integration/src/main/java/com/capgemini/devonfw/module/integration/common/api/Handler.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

modules/integration/src/main/java/com/capgemini/devonfw/module/integration/common/api/Integration.java

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import org.apache.activemq.broker.region.Queue;
77
import org.springframework.messaging.Message;
8-
import org.springframework.messaging.MessageHandler;
98
import org.springframework.messaging.MessageHeaders;
109

1110
/**
@@ -73,27 +72,26 @@ public interface Integration {
7372
* Subscribes to the default simple channel. The subscriber will be listening to the channel and polling it for new
7473
* messages in an interval of time configured with the property "integration.one-direction.poller.rate"
7574
*
76-
* @param messageHandler the {@link MessageHandler} in charge of managing each received {@link Message}
75+
* @param messageHandler the {@link SubscriptionHandler} in charge of managing each received {@link Message}
7776
*/
78-
void subscribe(MessageHandler messageHandler);
77+
void subscribe(SubscriptionHandler messageHandler);
7978

8079
/**
8180
* Subscribes to the default asynchronous request-reply channel. The subscriber will be listening to the channel and
82-
* will generate a response with the {@link IntegrationHandler}.
81+
* will generate a response with the {@link RequestAsyncHandler}.
8382
*
84-
* @param handler the {@link IntegrationHandler} in charge of managing each received {@link Message} and provide an
83+
* @param handler the {@link RequestAsyncHandler} in charge of managing each received {@link Message} and provide an
8584
* asynchronous response
8685
*/
87-
void subscribeAsync(IntegrationHandler handler);
86+
void subscribeAsync(RequestAsyncHandler handler);
8887

8988
/**
9089
* Subscribes to the default request-reply channel. The subscriber will be listening to the channel and will generate
91-
* a response with the {@link IntegrationHandler}.
90+
* a response with the {@link RequestHandler}.
9291
*
93-
* @param handler the {@link IntegrationHandler} in charge of managing each received {@link Message} and return a
94-
* response
92+
* @param handler the {@link RequestHandler} in charge of managing each received {@link Message} and return a response
9593
*/
96-
void subscribeAndReply(IntegrationHandler handler);
94+
void subscribeAndReply(RequestHandler handler);
9795

9896
// Implementation for new created Channels
9997

@@ -103,42 +101,41 @@ public interface Integration {
103101
*
104102
* @param channelName the {@link IntegrationChannel} to be subscribed to
105103
* @param queueName the {@link Queue} to be listening to
106-
* @param messageHandler the {@link MessageHandler} handler to manage each received message
104+
* @param messageHandler the {@link SubscriptionHandler} handler to manage each received message
107105
*/
108-
void subscribeTo(String channelName, String queueName, MessageHandler messageHandler);
106+
void subscribeTo(String channelName, String queueName, SubscriptionHandler messageHandler);
109107

110108
/**
111109
* Subscribes to a new simple channel, no response is sent. The subscriber will be listening to the channel and
112110
* polling it for new messages.
113111
*
114112
* @param channelName the {@link IntegrationChannel} to be subscribed to
115113
* @param queueName the {@link Queue} to be listening to
116-
* @param messageHandler the {@link MessageHandler} handler to manage each received message
114+
* @param messageHandler the {@link SubscriptionHandler} handler to manage each received message
117115
* @param pollRate the time interval for making the poll to the message broker to check new messages
118116
*/
119-
void subscribeTo(String channelName, String queueName, MessageHandler messageHandler, long pollRate);
117+
void subscribeTo(String channelName, String queueName, SubscriptionHandler messageHandler, long pollRate);
120118

121119
/**
122120
* Subscribes to a new request-reply channel. The subscriber will be listening to the channel for new messages and
123121
* creating a response for each one with the {@link IntegrationHandler}.
124122
*
125123
* @param channelName channelName the {@link IntegrationChannel} to be subscribed to
126124
* @param queueName queueName the {@link Queue} to be listening to
127-
* @param handler the {@link IntegrationHandler} in charge of managing each received {@link Message} and return a
128-
* response
125+
* @param handler the {@link RequestHandler} in charge of managing each received {@link Message} and return a response
129126
*/
130-
void subscribeAndReplyTo(String channelName, String queueName, IntegrationHandler handler);
127+
void subscribeAndReplyTo(String channelName, String queueName, RequestHandler handler);
131128

132129
/**
133130
* Subscribes to a new asynchronous request-reply channel. The subscriber will be listening to the channel for new
134131
* messages and creating a response for each one with the {@link IntegrationHandler}.
135132
*
136133
* @param channelName channelName the {@link IntegrationChannel} to be subscribed to
137134
* @param queueName queueName the {@link Queue} to be listening to
138-
* @param handler the {@link IntegrationHandler} in charge of managing each received {@link Message} and return a
135+
* @param handler the {@link RequestAsyncHandler} in charge of managing each received {@link Message} and return a
139136
* response
140137
*/
141-
void subscribeAndReplyAsyncTo(String channelName, String queueName, IntegrationHandler handler);
138+
void subscribeAndReplyAsyncTo(String channelName, String queueName, RequestAsyncHandler handler);
142139

143140
/**
144141
* Creates a new simple {@link IntegrationChannel}
@@ -155,46 +152,46 @@ public interface Integration {
155152
*
156153
* @param channelName name for the new {@link IntegrationChannel}
157154
* @param queueName name for the new {@link Queue}
158-
* @param messageHandler the {@link MessageHandler} handler to manage each received response
155+
* @param responseHandler the {@link ResponseHandler} handler to manage each received response
159156
* @return the new {@link IntegrationChannel}
160157
*/
161-
IntegrationChannel createRequestReplyChannel(String channelName, String queueName, MessageHandler messageHandler);
158+
IntegrationChannel createRequestReplyChannel(String channelName, String queueName, ResponseHandler responseHandler);
162159

163160
/**
164161
* Creates a new request-reply {@link IntegrationChannel}.
165162
*
166163
* @param channelName name for the new {@link IntegrationChannel}
167164
* @param queueName name for the new {@link Queue}
168-
* @param messageHandler the {@link MessageHandler} handler to manage each received response
165+
* @param responseHandler the {@link ResponseHandler} handler to manage each received response
169166
* @param receiveTimeout the waiting time for a response after sending a {@link Message}
170167
* @return the new {@link IntegrationChannel}
171168
*/
172-
IntegrationChannel createRequestReplyChannel(String channelName, String queueName, MessageHandler messageHandler,
169+
IntegrationChannel createRequestReplyChannel(String channelName, String queueName, ResponseHandler responseHandler,
173170
long receiveTimeout);
174171

175172
/**
176173
* Creates a new asynchronous request-reply {@link IntegrationChannel}. The core pool size for the asynchronous task
177174
* executor is configured with the property "integration.default.poolsize".
178-
*
175+
*
179176
* @param channelName name for the new {@link IntegrationChannel}
180177
* @param queueName name for the new {@link Queue}
181-
* @param messageHandler the {@link MessageHandler} handler to manage each received response
178+
* @param responseHandler the {@link ResponseHandler} handler to manage each received response
182179
* @return the new {@link IntegrationChannel}
183180
*/
184181
IntegrationChannel createAsyncRequestReplyChannel(String channelName, String queueName,
185-
MessageHandler messageHandler);
182+
ResponseHandler responseHandler);
186183

187184
/**
188185
* Creates a new asynchronous request-reply {@link IntegrationChannel}.
189186
*
190187
* @param channelName name for the new {@link IntegrationChannel}
191188
* @param queueName name for the new {@link Queue}
192-
* @param messageHandler the {@link MessageHandler} handler to manage each received response
189+
* @param responseHandler the {@link ResponseHandler} handler to manage each received response
193190
* @param poolSize ThreadPoolExecutor's core pool size
194191
* @param receiveTimeout the waiting time for a response after sending a {@link Message}
195192
* @return the new {@link IntegrationChannel}
196193
*/
197-
IntegrationChannel createAsyncRequestReplyChannel(String channelName, String queueName, MessageHandler messageHandler,
198-
int poolSize, long receiveTimeout);
194+
IntegrationChannel createAsyncRequestReplyChannel(String channelName, String queueName,
195+
ResponseHandler responseHandler, int poolSize, long receiveTimeout);
199196

200197
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.capgemini.devonfw.module.integration.common.api;
2+
3+
/**
4+
* @author pparrado
5+
*
6+
*/
7+
public interface RequestAsyncHandler extends IntegrationHandler {
8+
9+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.capgemini.devonfw.module.integration.common.api;
2+
3+
/**
4+
* @author pparrado
5+
*
6+
*/
7+
public interface RequestHandler extends IntegrationHandler {
8+
9+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.capgemini.devonfw.module.integration.common.api;
2+
3+
import org.springframework.messaging.MessageHandler;
4+
5+
/**
6+
* @author pparrado
7+
*
8+
*/
9+
public interface ResponseHandler extends MessageHandler {
10+
11+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.capgemini.devonfw.module.integration.common.api;
2+
3+
import org.springframework.messaging.MessageHandler;
4+
5+
/**
6+
* @author pparrado
7+
*
8+
*/
9+
public interface SubscriptionHandler extends MessageHandler {
10+
11+
}

modules/integration/src/main/java/com/capgemini/devonfw/module/integration/common/config/IntegrationConfig.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.springframework.messaging.support.GenericMessage;
2626

2727
import com.capgemini.devonfw.module.integration.common.api.IntegrationHandler;
28+
import com.capgemini.devonfw.module.integration.common.api.RequestAsyncHandler;
29+
import com.capgemini.devonfw.module.integration.common.api.RequestHandler;
30+
import com.capgemini.devonfw.module.integration.common.api.SubscriptionHandler;
2831

2932
/**
3033
* @author pparrado
@@ -39,31 +42,31 @@ public class IntegrationConfig {
3942
@Inject
4043
private ConnectionFactory connectionFactory;
4144

42-
@Value("${integration.one-direction.channelname}")
45+
@Value("${devonfw.integration.one-direction.channelname}")
4346
private String channel_1d;
4447

45-
@Value("${integration.one-direction.queuename}")
48+
@Value("${devonfw.integration.one-direction.queuename}")
4649
private String queue_1d;
4750

48-
@Value("${integration.request-reply.channelname}")
51+
@Value("${devonfw.integration.request-reply.channelname}")
4952
private String channel_rr;
5053

51-
@Value("${integration.request-reply.queuename}")
54+
@Value("${devonfw.integration.request-reply.queuename}")
5255
private String queue_rr;
5356

54-
@Value("${integration.request-reply-async.channelname}")
57+
@Value("${devonfw.integration.request-reply-async.channelname}")
5558
private String channel_async;
5659

57-
@Value("${integration.request-reply-async.queuename}")
60+
@Value("${devonfw.integration.request-reply-async.queuename}")
5861
private String queue_async;
5962

60-
@Value("${integration.one-direction.poller.rate}")
63+
@Value("${devonfw.integration.one-direction.poller.rate}")
6164
private int rate;
6265

63-
@Value("${integration.request-reply.receivetimeout}")
66+
@Value("${devonfw.integration.request-reply.receivetimeout}")
6467
private long rr_timeout;
6568

66-
@Value("${integration.request-reply-async.receivetimeout}")
69+
@Value("${devonfw.integration.request-reply-async.receivetimeout}")
6770
private long rra_timeout;
6871

6972
// PRECONFIGURED GATEWAYS - - - - - - - - - - - - - - - - - - - - - -
@@ -127,7 +130,7 @@ public interface AsyncGateway {
127130
* @return the created {@link IntegrationFlow}
128131
*/
129132
@Bean
130-
@ConditionalOnProperty(prefix = "integration.one-direction", name = "emitter", havingValue = "true")
133+
@ConditionalOnProperty(prefix = "devonfw.integration.one-direction", name = "emitter", havingValue = "true")
131134
IntegrationFlow outFlow() {
132135

133136
return IntegrationFlows.from(this.channel_1d)
@@ -142,7 +145,7 @@ IntegrationFlow outFlow() {
142145
* @return the created {@link IntegrationFlow}
143146
*/
144147
@Bean
145-
@ConditionalOnProperty(prefix = "integration.request-reply", name = "emitter", havingValue = "true")
148+
@ConditionalOnProperty(prefix = "devonfw.integration.request-reply", name = "emitter", havingValue = "true")
146149
public IntegrationFlow outAndInFlow() {
147150

148151
return IntegrationFlows.from(this.channel_rr).handle(
@@ -157,7 +160,7 @@ public IntegrationFlow outAndInFlow() {
157160
* @return the created {@link IntegrationFlow}
158161
*/
159162
@Bean
160-
@ConditionalOnProperty(prefix = "integration.request-reply-async", name = "emitter", havingValue = "true")
163+
@ConditionalOnProperty(prefix = "devonfw.integration.request-reply-async", name = "emitter", havingValue = "true")
161164
public IntegrationFlow asyncOutboundFlow() {
162165

163166
return IntegrationFlows.from(this.channel_async).handle(Jms.outboundGateway(this.connectionFactory)
@@ -167,63 +170,63 @@ public IntegrationFlow asyncOutboundFlow() {
167170
// in
168171

169172
/**
170-
* If the property "integration.one-direction.listener" has value "true" this Bean will be loaded and will create a
173+
* If the property "integration.one-direction.subscriber" has value "true" this Bean will be loaded and will create a
171174
* simple flow for receiving messages.
172175
*
173176
* @param handler the {@link MessageHandler} that will manage each message received.
174177
* @return the created {@link IntegrationFlow}
175178
*/
176179
@Bean
177-
@ConditionalOnProperty(prefix = "integration.one-direction", name = "listener", havingValue = "true")
178-
public IntegrationFlow inFlow(MessageHandler handler) {
180+
@ConditionalOnProperty(prefix = "devonfw.integration.one-direction", name = "subscriber", havingValue = "true")
181+
public IntegrationFlow inFlow(SubscriptionHandler handler) {
179182

180183
return IntegrationFlows.from(Jms.inboundAdapter(this.connectionFactory).destination(this.queue_1d),
181184
c -> c.poller(Pollers.fixedRate(this.rate, TimeUnit.MILLISECONDS))).handle(m -> {
182185
try {
183186
handler.handleMessage(m);
184187
} catch (Exception e) {
185-
LOG.error(String.format("MessageHandler threw an error: %s", e.getMessage()), e);
188+
LOG.error(String.format("SubscriptionHandler threw an error: %s", e.getMessage()), e);
186189
}
187190
}).get();
188191
}
189192

190193
/**
191-
* If the property "integration.request-reply.listener" has value "true" this Bean will be loaded and will create a
194+
* If the property "integration.request-reply.subscriber" has value "true" this Bean will be loaded and will create a
192195
* request-reply flow for receiving and replying to messages.
193196
*
194197
* @param handler the {@link IntegrationHandler} to manage the messages and send back the response
195198
* @return the created {@link IntegrationFlow}
196199
*/
197200
@Bean
198-
@ConditionalOnProperty(prefix = "integration.request-reply", name = "listener", havingValue = "true")
199-
public IntegrationFlow inAndOutFlow(IntegrationHandler handler) {
201+
@ConditionalOnProperty(prefix = "devonfw.integration.request-reply", name = "subscriber", havingValue = "true")
202+
public IntegrationFlow inAndOutFlow(RequestHandler handler) {
200203

201204
return IntegrationFlows.from(Jms.inboundGateway(this.connectionFactory).destination(this.queue_rr))
202-
.wireTap(flow -> flow.handle(System.out::println)).handle(new GenericHandler<String>() {
205+
.wireTap(flow -> flow.handle(m -> LOG.info(m.getPayload().toString()))).handle(new GenericHandler<String>() {
203206

204207
@Override
205208
public Object handle(String payload, Map<String, Object> headers) {
206209

207210
try {
208211
return handler.handleMessage(new GenericMessage<>(payload, headers));
209212
} catch (Exception e) {
210-
LOG.error(String.format("IntegrationHandler threw an error: %s", e.getMessage()), e);
213+
LOG.error(String.format("RequestHandler threw an error: %s", e.getMessage()), e);
211214
return null;
212215
}
213216
}
214217
}).get();
215218
}
216219

217220
/**
218-
* If the property "integration.request-reply-async.listener" has value "true" this Bean will be loaded and will
221+
* If the property "integration.request-reply-async.subscriber" has value "true" this Bean will be loaded and will
219222
* create a request-reply asynchronous flow for sending messages and receiving responses.
220223
*
221224
* @param handler the {@link IntegrationHandler} to manage the messages and send back the response
222225
* @return the created {@link IntegrationFlow}
223226
*/
224227
@Bean
225-
@ConditionalOnProperty(prefix = "integration.request-reply-async", name = "listener", havingValue = "true")
226-
public IntegrationFlow asyncInAndOutFlow(IntegrationHandler handler) {
228+
@ConditionalOnProperty(prefix = "devonfw.integration.request-reply-async", name = "subscriber", havingValue = "true")
229+
public IntegrationFlow asyncInAndOutFlow(RequestAsyncHandler handler) {
227230

228231
return IntegrationFlows.from(Jms.inboundGateway(this.connectionFactory).destination(this.queue_async))
229232
.wireTap(flow -> flow.handle(System.out::println)).handle(new GenericHandler<String>() {
@@ -233,7 +236,7 @@ public Object handle(String payload, Map<String, Object> headers) {
233236
try {
234237
return handler.handleMessage(new GenericMessage<>(payload, headers));
235238
} catch (Exception e) {
236-
LOG.error(String.format("IntegrationHandler threw an error: %s", e.getMessage()), e);
239+
LOG.error(String.format("RequestAsyncHandler threw an error: %s", e.getMessage()), e);
237240
return null;
238241
}
239242
}

0 commit comments

Comments
 (0)