Skip to content

Commit 5f9ebd0

Browse files
committed
Transferred documentation over to some of the main classes. Resolved a bug that resulted in a silent crash when the ActiveMQ reported null messages in the queue. Added an ErrorHandler method that allows you to tie into exceptions that can occur during the ecs control loop
1 parent ee6e1ec commit 5f9ebd0

4 files changed

Lines changed: 345 additions & 62 deletions

File tree

src/main/java/com/ProActiveQueue/ProActiveQueueClient/Connection/ActiveMQMessageConsumerImpl.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,13 @@ public void closeConsumers() throws JMSException {
7373
}
7474

7575
/**
76-
* ASYNC Message Receive
77-
* @param onMessage
78-
* @param clazz
79-
* @param properties
80-
* @param <T>
76+
* <b>ASYNC Message Receive</b>
77+
* <p>This consumer operates completely within a transaction and will rollback all messages if there is an
78+
* Exception during message processing <i>(While executing the onMessage method</i></p>
79+
* @param onMessage a consumer method (typically lambda) that will process a message when received
80+
* @param clazz The class of the message that will be received, for deserialization
81+
* @param properties a MessageProperties object that is used to filter the requested messages
82+
* @param <T> The message class
8183
*/
8284
public <T> void onMessageReceived(Consumer<T> onMessage, Class<T> clazz, MessageProperties properties) throws JMSException {
8385
Session session = getSession(true, Session.CLIENT_ACKNOWLEDGE, true);
@@ -120,12 +122,16 @@ public void onMessage(Message message) {
120122
}
121123

122124
/**
123-
* SYNCHRONOUS Message Receive
124-
* @param onMessage
125+
* <b>
126+
* SYNCHRONOUS Message Receive
127+
* </b>
128+
* <p>This consumer operates completely within a transaction and will rollback all messages if there is an
129+
* Exception during message processing <i>(While executing the onMessage method</i></p>
130+
* @param onMessage a consumer method (typically lambda) that will process the list of messages when received
125131
* @param numberOfMessages
126-
* @param clazz
127-
* @param props
128-
* @param <T>
132+
* @param clazz The class of the message that will be received, for deserialization
133+
* @param props a MessageProperties object that is used to filter the requested messages
134+
* @param <T> The message class
129135
* @throws JMSException
130136
* @throws JsonProcessingException
131137
*/

src/main/java/com/ProActiveQueue/ProActiveQueueClient/Connection/MessageClient.java

Lines changed: 183 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,32 @@
1515
import java.util.function.Function;
1616

1717
/**
18+
* <p>
1819
* This class is a mid-level abstraction over the activeMQ message library. It allows for Topic or Queue
1920
* connection, filtering using MessageProperties, and async or sync message operations.
21+
* </p>
2022
*
21-
* Low Level: ActiveMQMessageConsumerImpl, ActiveMQMessageProducer
22-
* Mid Level: MessageClient
23-
* High Level: A Destination specific class that extends MessageClient and overrides methods
23+
* <ul>
24+
* <li>
25+
* <b>Low Level</b>: ActiveMQMessageConsumerImpl, ActiveMQMessageProducer
26+
* </li>
27+
* <li>
28+
* <b>Mid Level</b>: MessageClient
29+
* </li>
30+
* <li>
31+
* <b>High Level</b>: A Destination specific class that extends MessageClient and overrides methods
32+
* </li>
33+
* </ul>
2434
*
35+
* <p>
36+
* In addition to providing get and send message requests, you can also send batched messages or request a
37+
* certain number of batched messages.
38+
* </p>
39+
*
40+
* <p>
2541
* As a reminder, if you do not extend MessageClient, do to generics, you will need to provide
2642
* the class of the message object that will be sent in the queue/topic.
43+
* </p>
2744
*
2845
*
2946
* @param <P> the class of the message object that will be sent in the queue/topic.
@@ -80,14 +97,22 @@ public MessageClient(Destination destination, ActiveMQConnectionFactory factory,
8097
this.clazz = clazz;
8198
}
8299

100+
/**
101+
* This method can be called to close down the low level ActiveMQ consumer and producer
102+
* @throws JMSException
103+
*/
83104
public void close() throws JMSException {
84105
this.messageConsumer.closeConsumers();
85106
this.messageProducer.close();
86107
}
87108

88109
/**
89-
* ASYNC Message Receiver
90-
* This method allows you to register an Asynchronous message listener to this queue.
110+
* <b>
111+
* ASYNC Message Receiver
112+
* </b>
113+
* <p>
114+
* This method allows you to register an Asynchronous message listener to this queue.
115+
* </p>
91116
*
92117
* @param onMessage a lambda function to consume messages as they become available
93118
*/
@@ -96,9 +121,13 @@ public void onMessageReceived(Consumer<P> onMessage) throws JMSException {
96121
}
97122

98123
/**
99-
* ASYNC Message Receiver
124+
* <b>
125+
* ASYNC Message Receiver
126+
* </b>
127+
* <p>
100128
* This method allows you to register an Asynchronous message listener to this queue. Additionally, you can provide
101-
* MessageProperties in order to filter the messages that would be received
129+
* MessageProperties in order to filter the messages that would be received.
130+
* </p>
102131
*
103132
* @param onMessage a lambda function to consume messages as they become available
104133
* @param props an Object that holds message filters that are applied to the returned messages
@@ -107,10 +136,42 @@ public void onMessageReceived(Consumer<P> onMessage, MessageProperties props) th
107136
messageConsumer.onMessageReceived(onMessage,clazz,props);
108137
}
109138

139+
/**
140+
* <b>
141+
* SYNCHRONOUS Message Receiver
142+
* </b>
143+
* <p>
144+
* This method allows you to synchronously request a single message, transactional safe. Plain and simple
145+
* </p>
146+
* <p>
147+
* If your consumer method throws an error while processing, the message will be rolled back to the
148+
* message broker for another consumer to use
149+
* </p>
150+
* @param onMessage a method that will act on a recieved message if there is one available
151+
* @throws IOException
152+
* @throws JMSException
153+
*/
110154
public void getMessage(Consumer<P> onMessage) throws IOException, JMSException {
111155
getMessage(onMessage,null);
112156
}
113157

158+
/**
159+
* <b>
160+
* SYNCHRONOUS Message Receiver
161+
* </b>
162+
* <p>
163+
* This method allows you to synchronously request a single message, based on one or more filtering options
164+
* based on the provided MessageProperties object, in a transaction safe environment.
165+
* </p>
166+
* <p>
167+
* If your consumer method throws an error while processing, the message will be rolled back to the
168+
* message broker for another consumer to use
169+
* </p>
170+
* @param onMessage a method that will act on a recieved message if there is one available that fits the filter
171+
* @param props a <b>MessageProperties</b> object that holds one or more filters for the requested message
172+
* @throws IOException
173+
* @throws JMSException
174+
*/
114175
public void getMessage(Consumer<P> onMessage, MessageProperties props) throws IOException, JMSException {
115176
messageConsumer.getMessages((onMessages) -> {
116177
try{
@@ -129,8 +190,14 @@ public void getMessage(Consumer<P> onMessage, MessageProperties props) throws IO
129190
* This method allows you to get a transactional bound list of messages, equivalent to messageCount in length, if available.
130191
* If that amount is not available, it will return a smaller amount, according to availability.
131192
* The method handles the acknowledgment and commit/rollback of the transaction in case things go bad.
193+
* <p>
194+
* If your consumer method throws an error while processing, the message will be rolled back to the
195+
* message broker for another consumer to use
196+
* </p>
197+
* <p>
198+
* <i>Use getMessages(onMessage, messageCount, props) to apply filters to the messages</i>
199+
* </p>
132200
*
133-
* Use getMessages(onMessage, messageCount, props) to apply filters to the messages
134201
* @param onMessage A Consumer lambda function to run on the returned List of <P> objects
135202
* @param messageCount the number of messages to try to return. If the queue doesn't have that many messages, it returns
136203
* all the messages in the queue
@@ -144,15 +211,31 @@ public void getMessages(Consumer<List<P>> onMessage, Integer messageCount) throw
144211
* If that amount is not available, it will return a smaller amount, according to availability.
145212
* The method handles the acknowledgment and commit/rollback of the transaction in case things go bad.
146213
*
214+
* <p>
215+
* If your consumer method throws an error while processing, the message will be rolled back to the
216+
* message broker for another consumer to use
217+
* </p>
218+
*
219+
* <p>
147220
* Additionally, this method allows you to add filters to the messages you are requesting. These filters are
148221
* identified in the MessageProperties class which should be accessed through the builder() method. Some available
149222
* selectors are as follows:
150-
* MATCHES (make sure all messages have the same value for a given property)
151-
* EQUALS (make sure all messages have a given value for a given property)
152-
* GREATER_THAN (make sure all messages have a greater numeric value than given for a given property)
153-
* LESS_THAN (make sure all messages have a lesser numeric value than given for a given property)
223+
* </p>
224+
* <ul>
225+
* <li>
226+
* <b>MATCHES</b> (make sure all messages have the same value for a given property)
227+
* </li>
228+
* <li>
229+
* <b>EQUALS</b> (make sure all messages have a given value for a given property)
230+
* </li>
231+
* <li>
232+
* <b>GREATER_THAN</b> (make sure all messages have a greater numeric value than given for a given property)
233+
* </li>
234+
* <li>
235+
* <b>LESS_THAN</b> (make sure all messages have a lesser numeric value than given for a given property)
236+
* </li>
237+
* </ul>
154238
*
155-
* Use getMessages(onMessage, messageCount, props) to apply filters to the messages
156239
* @param onMessage A Consumer lambda function to run on the returned List of <P> objects
157240
* @param messageCount the number of messages to try to return. If the queue doesn't have that many messages, it returns
158241
* all the messages in the queue
@@ -162,30 +245,105 @@ public void getMessages(Consumer<List<P>> onMessage, Integer messageCount, Messa
162245
messageConsumer.getMessages(onMessage,messageCount, clazz,props);
163246
}
164247

165-
public void send(P host) throws JsonProcessingException, JMSException {
166-
send(host, null);
248+
/**
249+
* <p>
250+
* This method allows you to send a single message to the broker and ensure that it is received
251+
* </p>
252+
* <p>
253+
* <i>If you want to add filterable message properties to the message, use send(message,props)</i>
254+
* </p>
255+
* @param message an object of type <b>{@literal <}P{@literal >}</b> to send
256+
* @throws JsonProcessingException
257+
* @throws JMSException
258+
*/
259+
public void send(P message) throws JsonProcessingException, JMSException {
260+
send(message, null);
167261
}
168262

169-
public void send(P host, Properties props) throws JsonProcessingException, JMSException {
263+
/**
264+
* <p>
265+
* This method allows you to send a single message to the broker and ensure that it is received. Additionally,
266+
* it allows you to attach properties to the message that can be filtered on later by a consumer
267+
* </p>
268+
* @param message an object of type <b>{@literal <}P{@literal >}</b> to send
269+
* @param props a Properties object that contains filterable information about the message.
270+
* @throws JsonProcessingException
271+
* @throws JMSException
272+
*/
273+
public void send(P message, Properties props) throws JsonProcessingException, JMSException {
170274
if(props != null){
171-
messageProducer.send(host, props);
275+
messageProducer.send(message, props);
172276
}else{
173-
messageProducer.send(host);
277+
messageProducer.send(message);
174278
}
175279
messageProducer.commit();
176280
}
177281

178-
public void sendAll(List<P> hosts) throws JsonProcessingException, JMSException {
179-
sendAll(hosts,null);
282+
/**
283+
* <p>
284+
* This method allows you to send a list of messages to the broker and ensure that they all are received
285+
* </p>
286+
* <p>
287+
* <i>If you want to add filterable message properties to the messages, use sendAll(message,getProperty)</i>
288+
* </p>
289+
* @param messages a list of Objects of type <b>{@literal <}P{@literal >}</b> to send
290+
* @throws JsonProcessingException
291+
* @throws JMSException
292+
*/
293+
public void sendAll(List<P> messages) throws JsonProcessingException, JMSException {
294+
sendAll(messages,null);
180295
}
181296

182-
public void sendAll(List<P> hosts, Function<P, Properties> getProperty) throws JsonProcessingException, JMSException {
183-
for(P host : hosts){
184-
Properties props = Optional.ofNullable(getProperty).map(getProp -> getProp.apply(host)).orElse(null);
297+
/**
298+
* <p>
299+
* This method allows you to send a list of messages to the broker and ensure that they all are received.
300+
* Additionally, it allows you to attach properties to these message objects that can be filtered on
301+
* later by a consumer. Because a list of messages is being provided, you must provide a Function
302+
* that can be used to get the property for each message object.
303+
* </p>
304+
* <p>
305+
* The <b>getProperty</b> method is typically provided as a lambda function that takes the message
306+
* object as input and outputs a Properties object with any desired properties attached.
307+
* </p>
308+
* <p>
309+
* <b>Example:</b>
310+
* <ul>
311+
* <li>
312+
* message has a method <i>getScanType()</i> that returns a string
313+
* </li>
314+
* <li>
315+
* NmapScanType.class.getSimpleName() is the name of the property that will be filtered on later
316+
* </li>
317+
* </ul>
318+
* </p>
319+
* <p></p>
320+
* <pre>
321+
* (message) -> {
322+
* if(message.getScanType() != null){
323+
* Properties props = new Properties();
324+
* props.setProperty(NmapScanType.class.getSimpleName(),
325+
* message.getScanType().toString());
326+
* return props;
327+
* }else{
328+
* return null;
329+
* }
330+
* }
331+
* </pre>
332+
* <p></p>
333+
* <p>If desired, you can add as many Properties for that messsage as you would like</p>
334+
*
335+
* @param messages a list of Objects of type <b>{@literal <}P{@literal >}</b> to send
336+
* @param getProperty a Funcion that takes a message object and returns a Properties object. See above for Example
337+
* @throws JsonProcessingException
338+
* @throws JMSException
339+
*/
340+
public void sendAll(List<P> messages, Function<P, Properties> getProperty) throws JsonProcessingException, JMSException {
341+
for(P message : messages){
342+
Properties props = Optional.ofNullable(getProperty).map(getProp -> getProp.apply(message)).orElse(null);
185343
if(props != null){
186-
messageProducer.send(host, props);
344+
messageProducer.send(message, props);
187345
}else{
188-
messageProducer.send(host);
346+
messageProducer.send(message);
189347
}
190348
}
191349
messageProducer.commit();

0 commit comments

Comments
 (0)