ActiveMQ point-to-point messaging domain

This tutorial shows how to send a message to a Queue using the point-to-point messaging model in Apache ActiveMQ. For more information on the point-to-point messaging model, please read the tutorial https://roytuts.com/configure-jms-client-using-glassfish-3/

Before you move forward, please read the tutorial https://roytuts.com/apache-activemq-configuration-in-windows/ to configure ActiveMQ, but do not create any Queue.

Now we will go through the following steps to implement a point-to-point messaging system.

1. Create a class called MessageProducer that will produce a message and send it to the destination – a Queue.

package com.roytuts.jms.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.roytuts.jms.constants.JmsConstants;
/**
 * Sends a single text message to the queue named by
 * {@link JmsConstants#QUEUE_NAME} on the default ActiveMQ broker.
 *
 * <p>Every call to {@link #sendMessage(String)} opens its own connection,
 * session and producer and releases them again before returning.
 */
public class MessageProducer {
	private ConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private javax.jms.MessageProducer messageProducer;

	/**
	 * Sends {@code message} as a {@link TextMessage} to the queue.
	 *
	 * <p>A {@link JMSException} raised while connecting or sending is not
	 * propagated; its stack trace is printed instead.
	 *
	 * @param message the text to place in the body of the message
	 */
	public void sendMessage(final String message) {
		try {
			// get the ConnectionFactory
			// default broker URL is tcp://localhost:61616
			connectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_BROKER_URL);
			// create a Connection object
			connection = connectionFactory.createConnection();
			// start the connection
			connection.start();
			// create a Session object
			// transaction false
			// auto acknowledgment sent when sending or receiving a message
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// create a destination - Queue or Topic - where message will be
			// sent or received from
			Destination destination = session
					.createQueue(JmsConstants.QUEUE_NAME);
			// create a Producer who will send the message
			messageProducer = session.createProducer(destination);
			// create a TextMessage, there are many more message types
			TextMessage textMessage = session.createTextMessage(message);
			// send the message
			messageProducer.send(textMessage);
			System.out.println("Sending Message : " + textMessage);
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			closeResources();
		}
	}

	/**
	 * Closes the producer, session and connection, in that order.
	 *
	 * <p>Each resource is null-checked because setup may have failed before
	 * it was created (for example when the broker is unreachable), and each
	 * is closed in its own try block so that a failure closing one does not
	 * prevent the others from being closed.
	 */
	private void closeResources() {
		try {
			if (messageProducer != null) {
				messageProducer.close();
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			messageProducer = null;
		}
		try {
			if (session != null) {
				session.close();
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			session = null;
		}
		try {
			if (connection != null) {
				connection.close();
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			connection = null;
		}
	}
}

2. Create a class called MessageConsumer that will receive a message from the destination – a Queue.

package com.roytuts.jms.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.roytuts.jms.constants.JmsConstants;
/**
 * Receives a single message from the queue named by
 * {@link JmsConstants#QUEUE_NAME} on the default ActiveMQ broker.
 *
 * <p>Every call to {@link #consumeMessage()} opens its own connection,
 * session and consumer and releases them again before returning.
 */
public class MessageConsumer {
	private ConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private javax.jms.MessageConsumer messageConsumer;

	/**
	 * Waits for one message on the queue and prints it.
	 *
	 * <p>The call blocks in {@code receive()} until a message is available.
	 * A message that is not a {@link TextMessage} is reported as a
	 * {@link JMSException}. No {@link JMSException} is propagated to the
	 * caller; its stack trace is printed instead.
	 */
	public void consumeMessage() {
		try {
			// get the ConnectionFactory
			// default broker URL is tcp://localhost:61616
			connectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_BROKER_URL);
			// create a Connection object
			connection = connectionFactory.createConnection();
			// start the connection
			connection.start();
			// create a Session object
			// transaction false
			// auto acknowledgment sent when sending or receiving a message
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// create a destination - Queue or Topic - where message will be
			// sent or received from
			Destination destination = session
					.createQueue(JmsConstants.QUEUE_NAME);
			// create a Consumer who will receive the message
			messageConsumer = session.createConsumer(destination);
			// consume message sent by Producer
			Message message = messageConsumer.receive();
			// check whether it is an instance of TextMessage
			// because we have let Producer send TextMessage
			// there may be other message type
			if (message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				System.out.println("Got message sent by Producer : "
						+ textMessage);
			} else {
				throw new JMSException("Message must be a type of TextMessage");
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			closeResources();
		}
	}

	/**
	 * Closes the consumer, session and connection, in that order.
	 *
	 * <p>Each resource is null-checked because setup may have failed before
	 * it was created (for example when the broker is unreachable), and each
	 * is closed in its own try block so that a failure closing one does not
	 * prevent the others from being closed.
	 */
	private void closeResources() {
		try {
			if (messageConsumer != null) {
				messageConsumer.close();
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			messageConsumer = null;
		}
		try {
			if (session != null) {
				session.close();
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			session = null;
		}
		try {
			if (connection != null) {
				connection.close();
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			connection = null;
		}
	}
}

3. Create a class called JmsConstants that will hold all constants related to JMS.

package com.roytuts.jms.constants;
/**
 * Central holder for the JMS-related constants used by the examples.
 */
public class JmsConstants {

	/** Name of the queue that messages are sent to and received from. */
	public static final String QUEUE_NAME = "IN_QUEUE";

	// Constants-only class: the private constructor prevents instantiation.
	private JmsConstants() {
	}
}

4. Create a main class that will run the MessageProducer.

package com.roytuts.jms.producer;
/**
 * Command-line entry point that sends one fixed text message through
 * {@link MessageProducer}.
 */
public class MessageProducerTest {

	/**
	 * Creates a producer and sends a single message.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		final String payload = "This message is coming from Message Producer.";
		MessageProducer producer = new MessageProducer();
		producer.sendMessage(payload);
	}
}

5. Create a main class that will run the MessageConsumer.

package com.roytuts.jms.consumer;
/**
 * Command-line entry point that receives one message through
 * {@link MessageConsumer}.
 */
public class MessageConsumerTest {

	/**
	 * Creates a consumer and asks it to consume a single message.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		MessageConsumer consumer = new MessageConsumer();
		consumer.consumeMessage();
	}
}

6. Run the MessageProducer.
Output

Sending Message : ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:admin-PC-50423-1429331013443-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://IN_QUEUE, transactionId = null, expiration = 0, timestamp = 1429331013666, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = This message is coming from Message Producer.}

7. Go to the ActiveMQ Web Console and click on Queues. You will see that one message is enqueued in IN_QUEUE and is pending consumption, because at this moment the MessageConsumer is not up and running.

ActiveMQ MessageProducer sends a message
8. Run the MessageConsumer.
Output

Got message sent by Producer : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:admin-PC-50413-1429330725391-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:admin-PC-50413-1429330725391-1:1:1:1, destination = queue://IN_QUEUE, transactionId = null, expiration = 0, timestamp = 1429330725957, arrival = 0, brokerInTime = 1429330725960, brokerOutTime = 1429330834373, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6824be, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = This message is coming from Message Producer.}

9. Click on Queues in the ActiveMQ Web Console again. You will no longer see a pending message in IN_QUEUE: as soon as you ran the MessageConsumer, it consumed the message, so the console now shows that one message has been dequeued.

ActiveMQ no message pending in IN_QUEUE

Note: if you run the MessageConsumer first and then the MessageProducer, you will not see any message pending in IN_QUEUE, because the consumer is already waiting in receive() and takes the message as soon as it arrives.

Here is the pom.xml

<!-- Maven build descriptor for the ActiveMQ point-to-point example; packaged as a plain jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.roytuts</groupId>
	<artifactId>apache-activemq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>apache-activemq</name>
	<url>http://maven.apache.org</url>
	<!-- Versions and build settings referenced below through ${...} placeholders. -->
	<!-- NOTE(review): slf4j.version and spring.version are declared but not referenced anywhere in this POM. -->
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<jdk.version>1.8</jdk.version>
		<junit.version>4.11</junit.version>
		<slf4j.version>1.7.5</slf4j.version>
		<activemq.version>5.11.1</activemq.version>
		<spring.version>4.1.5.RELEASE</spring.version>
	</properties>
	<dependencies>
		<!-- activemq -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>${activemq.version}</version>
		</dependency>
		<!-- junit (test scope only) -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<!-- Compile sources for, and emit bytecode targeting, the Java level given by jdk.version. -->
			<!-- NOTE(review): no <version> is given for this plugin here; consider pinning one for reproducible builds. -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>${jdk.version}</source>
					<target>${jdk.version}</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

 
That’s all. Thank you for reading.

Leave a Reply

Your email address will not be published. Required fields are marked *