-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Expand file tree
/
Copy pathActiveMQMessageProducerSupport.java
More file actions
406 lines (372 loc) · 17.5 KB
/
ActiveMQMessageProducerSupport.java
File metadata and controls
406 lines (372 loc) · 17.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.Set;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.IllegalStateException;
import jakarta.jms.IllegalStateRuntimeException;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageFormatRuntimeException;
import jakarta.jms.MessageProducer;
/**
* A useful base class for implementing a {@link MessageProducer}
*
*
*/
public abstract class ActiveMQMessageProducerSupport implements MessageProducer, Closeable {
protected ActiveMQSession session;
protected boolean disableMessageID;
protected boolean disableMessageTimestamp;
protected int defaultDeliveryMode;
protected int defaultPriority;
protected long defaultTimeToLive;
protected int sendTimeout=0;
public ActiveMQMessageProducerSupport(ActiveMQSession session) {
this.session = session;
disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
}
/**
* Gets the delivery delay associated with this <CODE>MessageProducer</CODE>.
*
* @return this producer's <CODE>DeliveryDely/ <CODE>
* @throws JMSException if the JMS provider fails to close the producer due to
* some internal error.
* @since 2.0
*/
@Override
public void setDeliveryDelay(long deliveryDelay) throws JMSException {
throw new UnsupportedOperationException("setDeliveryDelay() is not supported");
}
/**
* Gets the delivery delay value for this <CODE>MessageProducer</CODE>.
*
* @return the delivery delay for this messageProducer
* @throws jakarta.jms.JMSException if the JMS provider fails to determine if deliver delay is
* disabled due to some internal error.
*/
@Override
public long getDeliveryDelay() throws JMSException {
throw new UnsupportedOperationException("getDeliveryDelay() is not supported");
}
/**
* Sets whether message IDs are disabled.
* <P>
* Since message IDs take some effort to create and increase a message's
* size, some JMS providers may be able to optimize message overhead if
* they are given a hint that the message ID is not used by an application.
* By calling the <CODE>setDisableMessageID</CODE> method on this message
* producer, a JMS client enables this potential optimization for all
* messages sent by this message producer. If the JMS provider accepts this
* hint, these messages must have the message ID set to null; if the
* provider ignores the hint, the message ID must be set to its normal
* unique value.
* <P>
* Message IDs are enabled by default.
*
* @param disableMessageID indicates if message IDs are disabled
* @throws jakarta.jms.JMSException if the JMS provider fails to close the producer due to
* some internal error.
*/
public void setDisableMessageID(boolean disableMessageID) throws JMSException {
checkClosed();
this.disableMessageID = disableMessageID;
}
/**
* Gets an indication of whether message IDs are disabled.
*
* @return an indication of whether message IDs are disabled
* @throws jakarta.jms.JMSException if the JMS provider fails to determine if message IDs are
* disabled due to some internal error.
*/
public boolean getDisableMessageID() throws JMSException {
checkClosed();
return this.disableMessageID;
}
/**
* Sets whether message timestamps are disabled.
* <P>
* Since timestamps take some effort to create and increase a message's
* size, some JMS providers may be able to optimize message overhead if
* they are given a hint that the timestamp is not used by an application.
* By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
* message producer, a JMS client enables this potential optimization for
* all messages sent by this message producer. If the JMS provider accepts
* this hint, these messages must have the timestamp set to zero; if the
* provider ignores the hint, the timestamp must be set to its normal
* value.
* <P>
* Message timestamps are enabled by default.
*
* @param disableMessageTimestamp indicates if message timestamps are disabled
* @throws jakarta.jms.JMSException if the JMS provider fails to close the producer due to
* some internal error.
*/
public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
checkClosed();
this.disableMessageTimestamp = disableMessageTimestamp;
}
/**
* Gets an indication of whether message timestamps are disabled.
*
* @return an indication of whether message timestamps are disabled
* @throws jakarta.jms.JMSException if the JMS provider fails to close the producer due to
* some internal error.
*/
public boolean getDisableMessageTimestamp() throws JMSException {
checkClosed();
return this.disableMessageTimestamp;
}
/**
* Sets the producer's default delivery mode.
* <P>
* Delivery mode is set to <CODE>PERSISTENT</CODE> by default.
*
* @param newDeliveryMode the message delivery mode for this message producer; legal
* values are <code>DeliveryMode.NON_PERSISTENT</code> and
* <code>DeliveryMode.PERSISTENT</code>
* @throws jakarta.jms.JMSException if the JMS provider fails to set the delivery mode due to
* some internal error.
* @see jakarta.jms.MessageProducer#getDeliveryMode
* @see jakarta.jms.DeliveryMode#NON_PERSISTENT
* @see jakarta.jms.DeliveryMode#PERSISTENT
* @see jakarta.jms.Message#DEFAULT_DELIVERY_MODE
*/
public void setDeliveryMode(int newDeliveryMode) throws JMSException {
if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
throw new jakarta.jms.IllegalStateException("unknown delivery mode: " + newDeliveryMode);
}
checkClosed();
this.defaultDeliveryMode = newDeliveryMode;
}
/**
* Gets the producer's default delivery mode.
*
* @return the message delivery mode for this message producer
* @throws jakarta.jms.JMSException if the JMS provider fails to close the producer due to
* some internal error.
*/
public int getDeliveryMode() throws JMSException {
checkClosed();
return this.defaultDeliveryMode;
}
/**
* Sets the producer's default priority.
* <P>
* The JMS API defines ten levels of priority value, with 0 as the lowest
* priority and 9 as the highest. Clients should consider priorities 0-4 as
* gradations of normal priority and priorities 5-9 as gradations of
* expedited priority. Priority is set to 4 by default.
*
* @param newDefaultPriority the message priority for this message producer; must be a
* value between 0 and 9
* @throws jakarta.jms.JMSException if the JMS provider fails to set the delivery mode due to
* some internal error.
* @see jakarta.jms.MessageProducer#getPriority
* @see jakarta.jms.Message#DEFAULT_PRIORITY
*/
public void setPriority(int newDefaultPriority) throws JMSException {
if (newDefaultPriority < 0 || newDefaultPriority > 9) {
throw new IllegalStateException("default priority must be a value between 0 and 9");
}
checkClosed();
this.defaultPriority = newDefaultPriority;
}
/**
* Gets the producer's default priority.
*
* @return the message priority for this message producer
* @throws jakarta.jms.JMSException if the JMS provider fails to close the producer due to
* some internal error.
* @see jakarta.jms.MessageProducer#setPriority
*/
public int getPriority() throws JMSException {
checkClosed();
return this.defaultPriority;
}
/**
* Sets the default length of time in milliseconds from its dispatch time
* that a produced message should be retained by the message system.
* <P>
* Time to live is set to zero by default.
*
* @param timeToLive the message time to live in milliseconds; zero is unlimited
* @throws jakarta.jms.JMSException if the JMS provider fails to set the time to live due to
* some internal error.
* @see jakarta.jms.MessageProducer#getTimeToLive
* @see jakarta.jms.Message#DEFAULT_TIME_TO_LIVE
*/
public void setTimeToLive(long timeToLive) throws JMSException {
if (timeToLive < 0L) {
throw new IllegalStateException("cannot set a negative timeToLive");
}
checkClosed();
this.defaultTimeToLive = timeToLive;
}
/**
* Gets the default length of time in milliseconds from its dispatch time
* that a produced message should be retained by the message system.
*
* @return the message time to live in milliseconds; zero is unlimited
* @throws jakarta.jms.JMSException if the JMS provider fails to get the time to live due to
* some internal error.
* @see jakarta.jms.MessageProducer#setTimeToLive
*/
public long getTimeToLive() throws JMSException {
checkClosed();
return this.defaultTimeToLive;
}
/**
* Sends a message using the <CODE>MessageProducer</CODE>'s default
* delivery mode, priority, and time to live.
*
* @param message the message to send
* @throws jakarta.jms.JMSException if the JMS provider fails to send the message due to some
* internal error.
* @throws jakarta.jms.MessageFormatException if an invalid message is specified.
* @throws jakarta.jms.InvalidDestinationException if a client uses this method with a <CODE>
* MessageProducer</CODE> with an invalid destination.
* @throws UnsupportedOperationException
* if a client uses this method with a <CODE>
* MessageProducer</CODE> that did not specify a
* destination at creation time.
* @see jakarta.jms.Session#createProducer
* @see jakarta.jms.MessageProducer
* @since 1.1
*/
public void send(Message message) throws JMSException {
this.send(this.getDestination(),
message,
this.defaultDeliveryMode,
this.defaultPriority,
this.defaultTimeToLive);
}
/**
* Sends a message to the destination, specifying delivery mode, priority,
* and time to live.
*
* @param message the message to send
* @param deliveryMode the delivery mode to use
* @param priority the priority for this message
* @param timeToLive the message's lifetime (in milliseconds)
* @throws jakarta.jms.JMSException if the JMS provider fails to send the message due to some
* internal error.
* @throws jakarta.jms.MessageFormatException if an invalid message is specified.
* @throws jakarta.jms.InvalidDestinationException if a client uses this method with a <CODE>
* MessageProducer</CODE> with an invalid destination.
* @throws UnsupportedOperationException
* if a client uses this method with a <CODE>
* MessageProducer</CODE> that did not specify a
* destination at creation time.
* @see jakarta.jms.Session#createProducer
* @since 1.1
*/
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
this.send(this.getDestination(),
message,
deliveryMode,
priority,
timeToLive);
}
/**
* Sends a message to a destination for an unidentified message producer.
* Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
* priority, and time to live.
* <P>
* Typically, a message producer is assigned a destination at creation
* time; however, the JMS API also supports unidentified message producers,
* which require that the destination be supplied every time a message is
* sent.
*
* @param destination the destination to send this message to
* @param message the message to send
* @throws jakarta.jms.JMSException if the JMS provider fails to send the message due to some
* internal error.
* @throws jakarta.jms.MessageFormatException if an invalid message is specified.
* @throws jakarta.jms.InvalidDestinationException if a client uses this method with an invalid destination.
* @throws UnsupportedOperationException
* if a client uses this method with a <CODE>
* MessageProducer</CODE> that specified a destination at
* creation time.
* @see jakarta.jms.Session#createProducer
* @see jakarta.jms.MessageProducer
*/
public void send(Destination destination, Message message) throws JMSException {
this.send(destination,
message,
this.defaultDeliveryMode,
this.defaultPriority,
this.defaultTimeToLive);
}
protected abstract void checkClosed() throws IllegalStateException;
protected static void validateDeliveryMode(final int deliveryMode) throws JMSException {
if (deliveryMode != DeliveryMode.PERSISTENT && deliveryMode != DeliveryMode.NON_PERSISTENT) {
throw new JMSException("Invalid delivery mode: " + deliveryMode);
}
}
protected static void validatePriority(final int priority) throws JMSException {
if (priority < 0 || priority > 9) {
throw new JMSException("Invalid priority: " + priority + " (must be 0-9)");
}
}
/**
* @return the sendTimeout
*/
public int getSendTimeout() {
return sendTimeout;
}
/**
* @param sendTimeout the sendTimeout to set
*/
public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}
// See JMS 2.0 spec sections 3.5.1 and 3.8.1.1
public static final Set<String> JMS_PROPERTY_NAMES_DISALLOWED = Set.of("JMSDeliveryMode", "JMSPriority", "JMSMessageID", "JMSTimestamp", "JMSCorrelationID", "JMSType", "NULL", "TRUE", "FALSE", "NOT", "AND", "OR", "BETWEEN", "LIKE", "IN", "IS", "ESCAPE");
public static void validateValidPropertyName(String propertyName) throws IllegalStateRuntimeException {
if(propertyName == null || propertyName.length() == 0) {
throw new IllegalArgumentException("Invalid JMS property name must not be null or empty");
}
if(JMS_PROPERTY_NAMES_DISALLOWED.contains(propertyName)) {
throw new IllegalArgumentException("Invalid JMS property: " + propertyName + " name is in disallowed list");
}
char first = propertyName.charAt(0);
if(!(Character.isJavaIdentifierStart(first))) {
throw new IllegalArgumentException("Invalid JMS property: " + propertyName + " name starts with invalid character: " + first);
}
for (int i=1; i < propertyName.length(); i++) {
char c = propertyName.charAt(i);
if (!(Character.isJavaIdentifierPart(c))) {
throw new IllegalArgumentException("Invalid JMS property: " + propertyName + " name contains invalid character: " + c);
}
}
}
public static void validateValidPropertyValue(String propertyName, Object propertyValue) throws IllegalStateRuntimeException {
boolean invalid = true;
if(propertyValue == null || propertyValue instanceof String ||
propertyValue instanceof Integer || propertyValue instanceof Short ||
propertyValue instanceof Float || propertyValue instanceof Long ||
propertyValue instanceof Boolean || propertyValue instanceof Byte ||
propertyValue instanceof Character || propertyValue instanceof Double) {
invalid = false;
}
if(invalid) {
throw new MessageFormatRuntimeException("Invalid JMS property: " + propertyName + " value class: " + propertyValue.getClass().getName() + " is not permitted by specification");
}
}
}