Priority Of Message
In this tutorial I am going to tell you how to set priority of messages in JMS (Java Messaging Service). So it is possible to control the order of the message before sending to a destination by setting a priority level. It forces the JMS provider to deliver high priority messages first.
JMS API has message priority levels from 0 (lowest priority) to 9 (highest priority). The default priority level is 4 if you do not specify any priority level for a message.
How to set Message Priority
You can use message priority levels to instruct the JMS provider to deliver urgent messages first. In a JMS application, priority can be set in either of the following ways:
- By using
setPriority(int value)
method ofMessageProducer
interface. - By using the overloaded
publish()
method. The third argument will be the priority.
The priority using the publish() method can be set in the following way:
topicPublisher.publish(message, DeliveryMode.NON_PERSISTENT, 3, 20000)
ActiveMQ observes three distinct levels of “Priority”:
- Default (JMSPriority == 4)
- High (JMSPriority > 4 && <= 9)
- Low (JMSPriority > 0 && < 4)
JMS Message Expiry
By default a message never expires. In some cases message will become obsolete after a particular time period. In such situation it is desirable to set expiration time. After the message expires, it will not be delivered.
You can set the expiration time in program in two ways:
a) Using setTimeToLive()
method of MessageProducer
interface to set the expiry time of messages from that producer.
Example: setTimeToLive(10000)
The above statement sets the expiry time 10000 milliseconds (10 seconds).
b) Using the overridden method publish()
Example : topPublisher.publish(message, DeliveryMode.PERSISTENT, 3, 10000)
The fourth argument gives the expiry time as 10000 milliseconds.
A JMS provider tries to deliver higher priority messages before lower priority messages but not necessarily to be delivered in exact order of priority.
Example On Setting Message Priority
Here I am going to create an example on how to set priority to messages while sending to the destination.
Prerequisites
Java 19, ActiveMQ 5.18.3, Maven 3.8.5, Apache ActiveMQ Configuration in Windows
JNDI Configuration
Java Naming Directory Interface (JNDI) configuration is required for the JMS services. JNDI allows Java software clients to discover and look up data and resources, in the form of Java objects, via a name. Create below jndi.properties file under src/main/resources folder.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616?jms.messagePrioritySupported=true
queue.queue/queueName=IN_QUEUE
Note that instead of using the default connection to the ActiveMQ broker, I have specified the ‘messagePrioritySupported’ property in order to assure the message is prioritized on the consumer side.
Message Producer
The below producer class sends message to a queue IN_QUEUE
. The role of the message producer is to produce the message. The message broker is the intermediary that communicates between message producer and message consumer. So the message is exchanged through message broker.
package com.roytuts.jms.message.priority.producer;
public class Producer {
public static void produceMessage() {
InitialContext initialContext = null;
QueueConnectionFactory connectionFactory;
QueueConnection connection = null;
QueueSender sender;
QueueSession session;
Queue queue;
try {
initialContext = new InitialContext();
queue = (Queue) initialContext.lookup("queue/queueName");
connectionFactory = (QueueConnectionFactory) initialContext.lookup("ConnectionFactory");
connection = connectionFactory.createQueueConnection();
connection.start();
session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
sender = session.createSender(queue);
sender.setPriority(3);
// Send message with priority 3
TextMessage message = session.createTextMessage("This message has a priority of 3");
sender.send(message);
// Send another message with priority 9
message = session.createTextMessage("This message has a priority of 9");
sender.send(message, DeliveryMode.PERSISTENT, 9, 0);
Thread.sleep(1000);
} catch (JMSException | NamingException ex) {
ex.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (initialContext != null) {
try {
initialContext.close();
} catch (NamingException e) {
e.printStackTrace();
}
}
}
}
}
In the above class notice that I am sending two messages and before sending the first message I have set the priority 3. Later before sending the second message I have set message priority to 9. So, the second message the highest priority and the second message should be received by broker first.
Message Consumer
The message consumer consumes messages. Here I am creating two consumers to consume two messages. In point to point communication a producer and a consumer participate in message exchange.
Here is the source code for consumer:
package com.roytuts.jms.message.priority.consumer;
public class Consumer {
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
Session session;
Connection connection;
MessageConsumer consumer;
InitialContext initialContext;
public void initialContext() {
try {
initialContext = new InitialContext();
} catch (NamingException e) {
e.printStackTrace();
}
}
public void openConnection() {
try {
ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
connection = connectionFactory.createConnection();
} catch (NamingException | JMSException e) {
e.printStackTrace();
}
}
public void closeConnection() {
try {
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public void createConsumer() {
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = (Queue) initialContext.lookup("queue/queueName");
consumer = session.createConsumer(queue);
connection.start();
} catch (NamingException | JMSException e) {
e.printStackTrace();
}
}
public void receive() {
Message message;
try {
message = consumer.receive();
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
LOGGER.info("Consumer received a message '{}' at {}", textMessage.getText(), LocalTime.now());
} else if (message == null) {
LOGGER.error("Consumer fails to receive the message sent by the producer.");
} else {
throw new JMSException("Message must be a type of TextMessage");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Main Class
The following class will start the producer and consumer to start the message communication.
package com.roytuts.jms.message.priority;
public class App {
public static void main(String[] args) {
Producer producer = new Producer();
producer.produceMessage();
Consumer consumer = new Consumer();
consumer.initialContext();
consumer.openConnection();
consumer.createConsumer();
consumer.receive();
consumer.receive();
consumer.closeConnection();
}
}
Testing Message Priority
Running the above main class will give you the following output:
INFO | Consumer received a message 'This message has a priority of 9' at 20:03:25.946926800
INFO | Consumer received a message 'This message has a priority of 3' at 20:03:25.953926300
So, it’s obvious that the high priority message was received by consumer earlier than low priority message.
That’s all about setting message priority in JMS in point to point communication architecture in JMS.
need Program , Where we will impliment it.