Setting Priority In JMS Message

Priority Of Message

In this tutorial I explain how to set the priority of messages in JMS (Java Message Service). By setting a priority level on a message before sending it to a destination, you can influence the order in which messages are delivered. The JMS provider then tries to deliver higher priority messages before lower priority ones, although strict ordering by priority is not guaranteed.

The JMS API defines ten message priority levels, from 0 (lowest priority) to 9 (highest priority). If you do not specify a priority level for a message, the default level of 4 is used.

How to set Message Priority

You can use message priority levels to instruct the JMS provider to deliver urgent messages first. In a JMS application, priority can be set in either of the following ways:

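  • By using the setPriority(int value) method of the MessageProducer interface, which applies to every message sent by that producer.
  • By using the overloaded publish() method of TopicPublisher (or the equivalent overloaded send() method of MessageProducer). The third argument is the priority.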

The priority can be set through the publish() method as follows. The arguments are the message, the delivery mode, the priority and the time to live in milliseconds:

topicPublisher.publish(message, DeliveryMode.NON_PERSISTENT, 3, 20000);

ActiveMQ observes three distinct levels of “Priority”:

  • Default (JMSPriority == 4)
  • High (JMSPriority > 4 && <= 9)
  • Low (JMSPriority >= 0 && < 4)
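The three levels above can be expressed as a small classification helper. This is only a sketch: PriorityLevels and its Level enum are hypothetical names that I made up for illustration, they are not part of the JMS or ActiveMQ API, and I am assuming that priority 0 counts as low.

```java
/** Hypothetical helper for illustration; not part of the JMS or ActiveMQ API. */
final class PriorityLevels {

    enum Level { LOW, DEFAULT, HIGH }

    static final int MIN_PRIORITY = 0;
    static final int DEFAULT_PRIORITY = 4; // JMS default priority
    static final int MAX_PRIORITY = 9;

    /** Maps a JMS priority (0 to 9) to one of the three coarse levels. */
    static Level classify(int jmsPriority) {
        if (jmsPriority < MIN_PRIORITY || jmsPriority > MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "JMS priority must be between 0 and 9, got " + jmsPriority);
        }
        if (jmsPriority == DEFAULT_PRIORITY) {
            return Level.DEFAULT;
        }
        // Assumption: 0 is treated as LOW together with 1 to 3.
        return jmsPriority > DEFAULT_PRIORITY ? Level.HIGH : Level.LOW;
    }

    public static void main(String[] args) {
        // Print the level for every valid priority value.
        for (int p = MIN_PRIORITY; p <= MAX_PRIORITY; p++) {
            System.out.println(p + " -> " + classify(p));
        }
    }
}
```

In practice this means that two messages sent with priorities 5 and 9 may both fall into the same "high" level, so you should not rely on a strict order between them.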

JMS Message Expiry

By default a message never expires. In some cases a message becomes obsolete after a certain period of time. In such situations it is desirable to set an expiration time. After the message expires, it will not be delivered.

You can set the expiration time in your program in two ways:

a) Using the setTimeToLive() method of the MessageProducer interface to set the expiry time for all messages sent by that producer.

Example: setTimeToLive(10000)

The above statement sets the expiry time to 10000 milliseconds (10 seconds).

b) Using the overloaded publish() method

Example: topicPublisher.publish(message, DeliveryMode.PERSISTENT, 3, 10000);

The fourth argument gives the expiry time as 10000 milliseconds.
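To make the time-to-live arithmetic concrete, here is a minimal sketch in plain Java. In JMS the expiration of a message is the send time plus the time to live, and a time to live of 0 means the message never expires. The ExpiryModel class and its method names are hypothetical and only model that rule; a real provider computes and checks the expiration for you.

```java
/** Hypothetical model of JMS expiry arithmetic; not a JMS API class. */
final class ExpiryModel {

    /**
     * Returns the expiration timestamp in milliseconds.
     * A time to live of 0 yields 0, which stands for "never expires".
     */
    static long expirationTime(long sendTimeMillis, long timeToLiveMillis) {
        if (timeToLiveMillis < 0) {
            throw new IllegalArgumentException("time to live must not be negative");
        }
        return timeToLiveMillis == 0 ? 0L : sendTimeMillis + timeToLiveMillis;
    }

    /** A message with expiration 0 never expires; otherwise compare with the clock. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiresAt = expirationTime(sentAt, 10000); // 10 seconds, as in the example above
        System.out.println("Expired right after sending? " + isExpired(expiresAt, sentAt));
        System.out.println("Expired 11 seconds later? " + isExpired(expiresAt, sentAt + 11000));
    }
}
```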

A JMS provider tries to deliver higher priority messages before lower priority messages, but it is not required to deliver messages in exact order of priority.

Example On Setting Message Priority

Here I create an example that shows how to set the priority of messages when sending them to a destination.

Prerequisites

Java 19, ActiveMQ 5.18.3, Maven 3.8.5, Apache ActiveMQ Configuration in Windows

JNDI Configuration

Java Naming and Directory Interface (JNDI) configuration is required for the JMS services. JNDI allows Java software clients to discover and look up data and resources, in the form of Java objects, via a name. Create the jndi.properties file below under the src/main/resources folder.

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616?jms.messagePrioritySupported=true
queue.queue/queueName=IN_QUEUE

Note that instead of using the default connection URL for the ActiveMQ broker, I have set the ‘messagePrioritySupported’ option on the URL so that messages are prioritized on the consumer side.
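The queue entry in the file follows the ActiveMQ JNDI convention queue.[JNDI name]=[physical queue name], so the name queue/queueName used in the lookups resolves to the physical queue IN_QUEUE. The following sketch only illustrates that mapping with java.util.Properties. The JndiQueueNames class is a hypothetical helper of mine; the real resolution is done by ActiveMQInitialContextFactory.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper that mimics how queue.* entries map JNDI names to queues. */
final class JndiQueueNames {

    /** Same content as the jndi.properties file shown above. */
    static final String SAMPLE = String.join("\n",
            "java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory",
            "java.naming.provider.url=tcp://localhost:61616?jms.messagePrioritySupported=true",
            "queue.queue/queueName=IN_QUEUE");

    /** Returns a map from JNDI name to physical queue name. */
    static Map<String, String> queueBindings(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String prefix = "queue.";
        Map<String, String> bindings = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                // Strip the "queue." prefix to get the name used in lookup().
                bindings.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return bindings;
    }

    public static void main(String[] args) {
        queueBindings(SAMPLE).forEach((jndiName, physicalName) ->
                System.out.println(jndiName + " -> " + physicalName));
    }
}
```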

Message Producer

The producer class below sends messages to the queue IN_QUEUE. The role of the message producer is to produce messages. The message broker is the intermediary between the message producer and the message consumer, so every message is exchanged through the broker.

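package com.roytuts.jms.message.priority.producer;

// Assumption: the javax.jms API is used; switch to jakarta.jms if you build against the Jakarta client.
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
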
public class Producer {

	public static void produceMessage() {
		InitialContext initialContext = null;
		QueueConnectionFactory connectionFactory;
		QueueConnection connection = null;
		QueueSender sender;
		QueueSession session;
		Queue queue;

		try {
			initialContext = new InitialContext();

			queue = (Queue) initialContext.lookup("queue/queueName");

			connectionFactory = (QueueConnectionFactory) initialContext.lookup("ConnectionFactory");

			connection = connectionFactory.createQueueConnection();

			connection.start();

			session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

			sender = session.createSender(queue);

			// Default priority for all messages sent through this sender
			sender.setPriority(3);

			// Send the first message with the sender's priority of 3
			TextMessage message = session.createTextMessage("This message has a priority of 3");
			sender.send(message);

			// Send the second message with an explicit priority of 9 and no expiry (time to live 0)
			message = session.createTextMessage("This message has a priority of 9");
			sender.send(message, DeliveryMode.PERSISTENT, 9, 0);
			
			Thread.sleep(1000);
		} catch (JMSException | NamingException ex) {
			ex.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (initialContext != null) {
				try {
					initialContext.close();
				} catch (NamingException e) {
					e.printStackTrace();
				}
			}
		}

	}

}

In the above class notice that I am sending two messages. Before sending the first message I set the sender's priority to 3. The second message is then sent with an explicit priority of 9. So the second message has the highest priority and should be delivered to the consumer first.
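To see why the priority 9 message is expected first even though it is sent second, here is a rough in-memory model of a priority-aware queue in plain Java. It is only a sketch: PriorityDeliveryModel is a hypothetical class, it assumes both messages are already waiting in the queue when the consumer starts, and it is not how ActiveMQ is implemented. A real broker makes a best effort only, and messages that were already dispatched to a consumer may still arrive out of priority order.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical in-memory model of priority-ordered delivery; not a JMS class. */
final class PriorityDeliveryModel {

    /** A waiting message with its priority and arrival sequence number. */
    private static final class Pending {
        final String text;
        final int priority;
        final long sequence;

        Pending(String text, int priority, long sequence) {
            this.text = text;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    // Higher priority first; messages with equal priority keep their send order.
    private static final Comparator<Pending> DELIVERY_ORDER = (a, b) ->
            a.priority != b.priority
                    ? Integer.compare(b.priority, a.priority)
                    : Long.compare(a.sequence, b.sequence);

    private final PriorityQueue<Pending> queue = new PriorityQueue<>(DELIVERY_ORDER);
    private long nextSequence = 0;

    void send(String text, int priority) {
        queue.add(new Pending(text, priority, nextSequence++));
    }

    /** Returns the next message text, or null if nothing is waiting. */
    String receive() {
        Pending next = queue.poll();
        return next == null ? null : next.text;
    }

    public static void main(String[] args) {
        PriorityDeliveryModel model = new PriorityDeliveryModel();
        model.send("This message has a priority of 3", 3);
        model.send("This message has a priority of 9", 9);
        System.out.println(model.receive()); // the priority 9 message
        System.out.println(model.receive()); // the priority 3 message
    }
}
```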

Message Consumer

The message consumer consumes messages. Here I create a single consumer that receives the two messages one after the other. In point-to-point communication a producer and a consumer participate in the message exchange.

Here is the source code for consumer:

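package com.roytuts.jms.message.priority.consumer;

import java.time.LocalTime;

// Assumption: the javax.jms API is used; switch to jakarta.jms if you build against the Jakarta client.
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
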
public class Consumer {

	private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

	Session session;
	Connection connection;
	MessageConsumer consumer;
	InitialContext initialContext;

	public void initialContext() {
		try {
			initialContext = new InitialContext();
		} catch (NamingException e) {
			e.printStackTrace();
		}
	}

	public void openConnection() {
		try {
			ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
			connection = connectionFactory.createConnection();
		} catch (NamingException | JMSException e) {
			e.printStackTrace();
		}
	}

	public void closeConnection() {
		try {
			if (connection != null) {
				connection.close();
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void createConsumer() {
		try {
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			Destination queue = (Queue) initialContext.lookup("queue/queueName");

			consumer = session.createConsumer(queue);

			connection.start();
		} catch (NamingException | JMSException e) {
			e.printStackTrace();
		}
	}

	public void receive() {
		Message message;
		try {
			message = consumer.receive();
			if (message != null && message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				LOGGER.info("Consumer received a message '{}' at {}", textMessage.getText(), LocalTime.now());
			} else if (message == null) {
				LOGGER.error("Consumer fails to receive the message sent by the producer.");
			} else {
				throw new JMSException("Message must be a type of TextMessage");
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

Main Class

The following class will start the producer and consumer to start the message communication.

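package com.roytuts.jms.message.priority;

import com.roytuts.jms.message.priority.consumer.Consumer;
import com.roytuts.jms.message.priority.producer.Producer;
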
public class App {

	public static void main(String[] args) {
		// produceMessage() is static; both messages are on the queue before the consumer starts
		Producer.produceMessage();

		Consumer consumer = new Consumer();

		consumer.initialContext();
		consumer.openConnection();

		consumer.createConsumer();

		consumer.receive();
		consumer.receive();

		consumer.closeConnection();
	}

}

Testing Message Priority

Running the above main class will give you the following output:

 INFO | Consumer received a message 'This message has a priority of 9' at 20:03:25.946926800
 INFO | Consumer received a message 'This message has a priority of 3' at 20:03:25.953926300

So, it’s obvious that the high priority message was received by consumer earlier than low priority message.

That’s all about setting message priority in JMS in a point-to-point communication architecture.
