This section provides a comprehensive guide to developing message producers and consumers using the Red Hat AMQ Core Protocol JMS client library (built upon Apache ActiveMQ Artemis JMS client). You will gain a deep understanding of the fundamental components involved and learn how to implement them in practical, hands-on Java applications to interact with an AMQ Broker deployed on OpenShift.
The Red Hat AMQ Core Protocol JMS client enables Java applications to communicate with the AMQ Broker using its native, high-performance wire format. At the heart of message-oriented middleware (MOM) lies the interaction between entities that send messages (producers) and entities that receive them (consumers).
To effectively develop a Java Core Protocol client, it’s crucial to understand the purpose and interaction of its key components:
-
ConnectionFactory: As the initial point of contact, the
ConnectionFactoryis responsible for creating connections to the AMQ Broker. It encapsulates connection configuration details, such as the broker’s network address (host and port) and SSL/TLS settings. For the Core Protocol JMS client, you typically use theActiveMQJMSClient.createConnectionFactory()method to instantiate it. -
Connection: Represents an active, live connection between the client application and the AMQ Broker. Establishing a connection is a relatively resource-intensive operation, so in well-designed applications,
Connectionobjects are typically created once and then reused throughout the application’s lifecycle. -
Session: A
Sessionprovides a single-threaded context for producing and consuming messages. It acts as a factory forMessageProducerandMessageConsumerobjects and can also support transactional operations. Multiple sessions can be created from a singleConnection, allowing for concurrent message processing within the same application. -
Destination: This abstract concept refers to the named location where messages are sent or from which they are received. In JMS,
Destinationcan be specialized into: -
Queue: Represents a point-to-point messaging destination, where each message sent to the queue is typically delivered to only one consumer. Messages are stored in order until consumed.
-
Topic: Represents a publish/subscribe messaging destination, where each message sent to the topic can be delivered to all active subscribers interested in that topic.
-
MessageProducer: An object created by a
Sessionthat an application uses to send messages to a specificDestination(either aQueueor aTopic). -
MessageConsumer: An object created by a
Sessionthat an application uses to receive messages from a specificDestination. Consumers can be configured to filter messages or receive them asynchronously. -
Message: This is the actual payload or application-specific data that is transmitted between the producer and consumer. JMS supports various message types, including
TextMessage(for string data),BytesMessage(for raw byte arrays),MapMessage,ObjectMessage, andStreamMessage, offering flexibility for different data formats.
|
Note
|
As highlighted in the provided context, the Core Protocol JMS client library allows for advanced features like flow control. You can configure ConnectionFactory properties such as setConsumerWindowSize() and setProducerWindowSize() to regulate the flow of messages and prevent resource exhaustion, especially in scenarios with varying producer and consumer speeds. For basic examples, default settings are often sufficient, but for performance-critical or high-volume applications, understanding and tuning these settings is crucial.
|
In this activity, you will set up a basic Maven project, incorporate the necessary AMQ Core Protocol JMS client dependency, and then develop separate Java applications to act as a message producer and a message consumer. These applications will demonstrate how to send and receive messages from an AMQ Broker deployed on OpenShift.
Before proceeding with this lab, ensure you have the following in place:
-
A running AMQ Broker instance on OpenShift. Refer to the "Deploying a Basic AMQ Broker Instance" lab if you need to set one up.
-
External access configured for your AMQ Broker, specifically for the Core Protocol, and you have the accessible hostname and port of its OpenShift Route.
-
Java Development Kit (JDK) 11 or later installed on your development machine.
-
Apache Maven installed.
-
A Java Integrated Development Environment (IDE) such as VS Code, IntelliJ IDEA, or Eclipse (optional, but recommended).
Begin by setting up a new Maven project structure for your client applications.
-
Open your terminal or command prompt and create a new directory for your project:
mkdir amq-core-java-client cd amq-core-java-client -
Initialize a basic Maven project using the
mvn archetype:generatecommand:mvn archetype:generate -DgroupId=com.redhat.amq.core -DartifactId=amq-core-java-client -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
This command creates a standard Maven project structure within the
amq-core-java-clientdirectory. -
Navigate into the newly created project directory:
cd amq-core-java-client
Next, you need to add the required client library dependency to your project’s pom.xml file.
-
Open the
amq-core-java-client/pom.xmlfile in your preferred text editor or IDE. -
Locate the
<dependencies>section and add the following<dependency>entry. Also, add a<properties>section to manage the AMQ client version andmaven-compiler-pluginconfiguration for better project management. Finally, add the Red Hat Maven repository andexec-maven-pluginconfiguration to easily run your main classes.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.redhat.amq.core</groupId> <artifactId>amq-core-java-client</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>amq-core-java-client</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>11</maven.compiler.source> // (1) <maven.compiler.target>11</maven.compiler.target> // (1) <amq.version>2.31.0.redhat-00001</amq.version> // (2) </properties> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-jms-client-all</artifactId> <version>${amq.version}</version> </dependency> <!-- Keep existing test dependencies or add others as needed --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> </configuration> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>3.0.0</version> <executions> <execution> <goals> <goal>java</goal> </goals> </execution> </executions> <configuration> <mainClass>${main.class}</mainClass> // (3) </configuration> </plugin> </plugins> </build> <repositories> // (4) <repository> <id>redhat-ga</id> <name>Red Hat General Availability Repository</name> <url>https://maven.repository.redhat.com/ga/</url> </repository> </repositories> </project>
-
Sets the Java compiler source and target versions to 11. Adjust if you are using a different JDK version.
-
Specifies the version of the
artemis-jms-client-alldependency. This version (e.g.,2.31.0.redhat-00001) is typically compatible with AMQ Broker 7.12+. Ensure this matches the version recommended for your AMQ Broker deployment. -
Configures the
exec-maven-plugin, allowing you to conveniently run your Javamainclasses directly from the command line usingmvn exec:java -Dmain.class=YourClassName. -
Adds the Red Hat General Availability Maven repository, which is necessary to resolve Red Hat-specific AMQ client artifacts.
-
-
Save the
pom.xmlfile.
Now, create a Java class that will instantiate the necessary JMS components to connect to the broker and send a text message to a queue.
-
Create a new file named
Producer.javain thesrc/main/java/com/redhat/amq/core/directory with the following content:package com.redhat.amq.core; import org.apache.activemq.artemis.jms.client.ActiveMQJMSClient; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class Producer { // Default connection parameters, override with system properties for your environment private static final String BROKER_HOST = System.getProperty("broker.host", "broker-amq-broker-core-protocol-external-route-amq-broker.apps.cluster.example.com"); // (1) private static final int BROKER_PORT = Integer.parseInt(System.getProperty("broker.port", "443")); // (2) private static final String BROKER_USER = System.getProperty("broker.user", "developer"); // (3) private static final String BROKER_PASSWORD = System.getProperty("broker.password", "password"); // (4) private static final String QUEUE_NAME = "exampleQueue"; // (5) public static void main(String[] args) throws JMSException { Connection connection = null; Session session = null; try { // Step 1: Create a ConnectionFactory // For Core Protocol, especially with OpenShift Routes, we use TransportConfiguration // to specify connection details including SSL/TLS settings. TransportConfiguration transportConfiguration = new TransportConfiguration( NettyConnectorFactory.class.getName(), new java.util.HashMap<String, Object>() {{ put("host", BROKER_HOST); put("port", BROKER_PORT); put("sslEnabled", true); // (6) }} ); ConnectionFactory connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(transportConfiguration); // If you are connecting to a non-SSL/TLS endpoint (less common for external OpenShift routes), // you might use a URI string: // ConnectionFactory connectionFactory = ActiveMQJMSClient.createConnectionFactory("tcp://" + BROKER_HOST + ":" + BROKER_PORT); // Step 2: Create a Connection to the Broker System.out.println("Connecting to broker at " + BROKER_HOST + ":" + BROKER_PORT); connection = connectionFactory.createConnection(BROKER_USER, BROKER_PASSWORD); connection.start(); // Connections must be started before use // Step 3: Create a Session // Arguments: transacted (false = no explicit transaction), acknowledgeMode (AUTO_ACKNOWLEDGE = broker acknowledges messages automatically) session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Step 4: Create a Destination (Queue) Destination queue = session.createQueue(QUEUE_NAME); // Step 5: Create a MessageProducer for the destination MessageProducer producer = session.createProducer(queue); // Step 6: Create and send a Message TextMessage message = session.createTextMessage("Hello from AMQ Core Producer! Timestamp: " + System.currentTimeMillis()); producer.send(message); // Send the message to the queue System.out.println("Successfully sent message: '" + message.getText() + "' to queue: " + QUEUE_NAME); } catch (Exception e) { System.err.println("Error encountered while sending message: " + e.getMessage()); e.printStackTrace(); } finally { // Step 7: Ensure resources are closed in a finally block if (session != null) { try { session.close(); } catch (JMSException e) { System.err.println("Error closing session: " + e.getMessage()); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { System.err.println("Error closing connection: " + e.getMessage()); } } System.out.println("Producer application finished."); } } }
-
IMPORTANT: Replace
broker-amq-broker-core-protocol-external-route-amq-broker.apps.cluster.example.comwith the actual hostname of your AMQ Broker’s external route for Core Protocol. You can obtain this from your OpenShift console or by usingoc get route -n <your-namespace>. -
Use
443for HTTPS/SSL-enabled routes, which is the common practice for external OpenShift routes. Adjust if your route exposes a different port. -
Replace
developerwith the username configured for accessing your AMQ Broker. -
Replace
passwordwith the password associated with the specified user. -
The name of the queue that both the producer and consumer will use. Ensure this queue exists or is configured to be auto-created on the broker.
-
Set to
trueif your OpenShift route for Core Protocol uses SSL/TLS (HTTPS), which is the default and recommended for external access.
-
-
Save the
Producer.javafile.
Next, create a Java class that will establish a connection to the broker and then receive a message from the same queue.
-
Create a new file named
Consumer.javain thesrc/main/java/com/redhat/amq/core/directory with the following content:package com.redhat.amq.core; import org.apache.activemq.artemis.jms.client.ActiveMQJMSClient; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; public class Consumer { // Default connection parameters, override with system properties for your environment private static final String BROKER_HOST = System.getProperty("broker.host", "broker-amq-broker-core-protocol-external-route-amq-broker.apps.cluster.example.com"); private static final int BROKER_PORT = Integer.parseInt(System.getProperty("broker.port", "443")); private static final String BROKER_USER = System.getProperty("broker.user", "developer"); private static final String BROKER_PASSWORD = System.getProperty("broker.password", "password"); private static final String QUEUE_NAME = "exampleQueue"; public static void main(String[] args) throws JMSException, InterruptedException { Connection connection = null; Session session = null; try { // Step 1: Create a ConnectionFactory (same as producer) TransportConfiguration transportConfiguration = new TransportConfiguration( NettyConnectorFactory.class.getName(), new java.util.HashMap<String, Object>() {{ put("host", BROKER_HOST); put("port", BROKER_PORT); put("sslEnabled", true); }} ); ConnectionFactory connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(transportConfiguration); // Step 2: Create a Connection System.out.println("Connecting to broker at " + BROKER_HOST + ":" + BROKER_PORT); connection = connectionFactory.createConnection(BROKER_USER, BROKER_PASSWORD); connection.start(); // Connections must be started before use // Step 3: Create a Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Step 4: Create a Destination (Queue) Destination queue = session.createQueue(QUEUE_NAME); // Step 5: Create a MessageConsumer for the destination MessageConsumer consumer = session.createConsumer(queue); System.out.println("Waiting for messages on queue: " + QUEUE_NAME + " (timeout: 10 seconds)"); // Step 6: Receive a Message // consumer.receive(timeout) blocks until a message arrives or the timeout expires. // Other options: receive() blocks indefinitely, receiveNoWait() returns immediately. Message message = consumer.receive(10000); // Wait up to 10 seconds for a message if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; System.out.println("Received message: '" + textMessage.getText() + "'"); } else if (message == null) { System.out.println("No message received within the specified timeout."); } else { System.out.println("Received a non-text message of type: " + message.getClass().getName()); } } catch (Exception e) { System.err.println("Error encountered while receiving message: " + e.getMessage()); e.printStackTrace(); } finally { // Step 7: Ensure resources are closed if (session != null) { try { session.close(); } catch (JMSException e) { System.err.println("Error closing session: " + e.getMessage()); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { System.err.println("Error closing connection: " + e.getMessage()); } } System.out.println("Consumer application finished."); } } }
-
Save the
Consumer.javafile.
Now, you will compile your Java project and then run the producer and consumer applications from separate terminal windows to observe message exchange.
-
Open your terminal in the
amq-core-java-clientproject root and compile your Java project:mvn clean install
-
Open two separate terminal windows (or tabs). One will be for the consumer, and the other for the producer.
-
In the first terminal window, run the
Consumerapplication:mvn exec:java -Dmain.class="com.redhat.amq.core.Consumer" \ -Dbroker.host="<YOUR_BROKER_CORE_PROTOCOL_ROUTE_HOSTNAME>" \ -Dbroker.port="443" \ -Dbroker.user="developer" \ -Dbroker.password="password"
Remember to replace
<YOUR_BROKER_CORE_PROTOCOL_ROUTE_HOSTNAME>with the actual hostname of your AMQ Broker’s Core Protocol route. The consumer application will start, connect to the broker, and then wait for a message on theexampleQueue. -
In the second terminal window, run the
Producerapplication:mvn exec:java -Dmain.class="com.redhat.amq.core.Producer" \ -Dbroker.host="<YOUR_BROKER_CORE_PROTOCOL_ROUTE_HOSTNAME>" \ -Dbroker.port="443" \ -Dbroker.user="developer" \ -Dbroker.password="password"
Again, replace
<YOUR_BROKER_CORE_PROTOCOL_ROUTE_HOSTNAME>with your broker’s actual Core Protocol route hostname. The producer application will connect, send a message toexampleQueue, and then exit. -
Observe the output in both terminals:
The Producer terminal should display output similar to this:
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ amq-core-java-client --- Connecting to broker at broker-amq-broker-core-protocol-external-route-amq-broker.apps.cluster.example.com:443 Successfully sent message: 'Hello from AMQ Core Producer! Timestamp: 1678886400000' to queue: exampleQueue Producer application finished. [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------
+ The Consumer terminal should then show:
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ amq-core-java-client ---
Connecting to broker at broker-amq-broker-core-protocol-external-route-amq-broker.apps.cluster.example.com:443
Waiting for messages on queue: exampleQueue (timeout: 10 seconds)
Received message: 'Hello from AMQ Core Producer! Timestamp: 1678886400000'
Consumer application finished.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------Congratulations! You have successfully developed, compiled, and executed Java Core Protocol producer and consumer applications, demonstrating fundamental message sending and receiving capabilities with an AMQ Broker deployed on OpenShift.
Developing robust messaging applications often involves more than just the basic send/receive pattern. Here are some expert insights and common troubleshooting tips for working with AMQ Core Protocol JMS clients:
-
Connection URL vs. TransportConfiguration: While simpler URI strings like
tcp://host:portcan be used withActiveMQJMSClient.createConnectionFactory(), usingTransportConfigurationdirectly (as demonstrated in the lab) offers finer-grained control, especially when dealing with advanced network settings, SSL/TLS, or custom connector parameters in an OpenShift environment. -
SSL/TLS Handshake Failures (
javax.net.ssl.SSLHandshakeException): -
Verify
sslEnabled: Ensure thatsslEnabledis set totruein yourTransportConfigurationif your OpenShift route for Core Protocol is serving over HTTPS/TLS. -
Certificate Trust: For routes using self-signed certificates (common in development environments) or certificates from a custom issuer, your Java client’s trust store might not inherently trust the broker’s certificate. You might need to import the Certificate Authority (CA) certificate into your client’s Java Key Store (
cacerts) or configure a custom trust store for your JVM. -
Hostname Verification: The hostname used in
BROKER_HOSTmust match the Common Name (CN) or a Subject Alternative Name (SAN) in the broker’s SSL/TLS certificate to prevent hostname verification errors. -
Authentication and Authorization Issues:
-
Credentials: Double-check the
BROKER_USERandBROKER_PASSWORDvalues. These must be valid credentials configured on your AMQ Broker instance. -
Permissions: Ensure the authenticated user has the necessary permissions (defined via RBAC on the broker) to connect, create sessions, and send/receive messages from the specified
QUEUE_NAME. -
Resource Management: Always close
Connection,Session,MessageProducer, andMessageConsumerobjects in afinallyblock to guarantee that resources are released, even if exceptions occur. Failing to do so can lead to resource leaks and instability. -
Asynchronous Message Consumption: For production-grade applications that need to process messages continuously without blocking the main thread, implement an asynchronous message consumer using
MessageListener. TheMessageListenerinterface allows you to define a callback method (onMessage()) that is invoked automatically when a message arrives. -
Message Acknowledgement: In our example,
Session.AUTO_ACKNOWLEDGEwas used. Other acknowledgement modes likeCLIENT_ACKNOWLEDGE(client is responsible for explicitly acknowledging messages) orDUPS_OK_ACKNOWLEDGE(lazy acknowledgement, potentially accepting duplicates) offer different trade-offs in terms of reliability and performance. -
Flow Control (
ConsumerWindowSize,ProducerWindowSize): As noted earlier, these settings are crucial for managing message flow between the client and broker. If a producer is significantly faster than a consumer, the broker or the consumer can become overwhelmed. Configuring appropriate window sizes on theConnectionFactorycan mitigate this by throttling producers or consumers. Refer to the "Flow Control" chapter in the officialRed Hat AMQ Core Protocol JMSdocumentation for detailed guidance. -
Error Handling and Retries: Real-world applications should incorporate robust error handling, including retry mechanisms with exponential backoff for transient network issues or temporary broker unavailability.
