JMS Message Persistence Example

Applications that use the Java Message Service (JMS) often need reliable delivery of messages between clients. The JMS API supports this through message persistence. By default, a message is persisted until a consumer consumes it, even if the message broker restarts in the meantime.

The JMS API provides two delivery modes:

  1. Persistent delivery mode
  2. Non-Persistent delivery mode

You can go through JMS Concepts – Persistent and Durable to read more on message delivery modes. My other JMS tutorials all use Persistent delivery because, if you do not specify a delivery mode, the default is Persistent.

Prerequisites

Apache ActiveMQ Configuration in Windows, ActiveMQ 5.16.0, Java 1.8 or later, Gradle 6.5.1 or Maven 3.6.3

Project Setup

You can create a Maven or Gradle based project in your favorite IDE or tool. A standalone project is enough for this example. If you are creating a Maven based project, you can use the following pom.xml file.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.roytuts</groupId>
	<artifactId>jms-message-persistence</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<activemq.version>5.16.0</activemq.version>
	</properties>
	
	<dependencies>
		<!-- activemq -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>${activemq.version}</version>
		</dependency>
	</dependencies>
	
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.1</version>
				<configuration>
					<source>${java.version}</source>
					<target>${java.version}</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

If you are creating a Gradle based project, use the following build.gradle script.

plugins {
    id 'java-library'
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.activemq:activemq-all:5.16.0'
}

JNDI Configuration

Java Naming and Directory Interface (JNDI) configuration is required for the JMS services. JNDI lets Java clients discover and look up data and resources, in the form of Java objects, by name. Create the following jndi.properties file under the src/main/resources folder.

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
queue.queue/queueName=IN_QUEUE
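
The last line follows the ActiveMQ JNDI convention queue.<JNDI name>=<physical queue name>: queue/queueName is the name the code looks up, and IN_QUEUE is the queue on the broker. The following is a minimal sketch that reads the same text with plain java.util.Properties to make that split visible. It only illustrates the key format and is not how ActiveMQ resolves names internally; the class JndiQueueEntries and its parse method are hypothetical names made up for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, for illustration only: lists the queue entries of a jndi.properties text.
class JndiQueueEntries {

    private static final String QUEUE_PREFIX = "queue.";

    // Returns a map of JNDI lookup name -> physical queue name.
    static Map<String, String> parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        Map<String, String> queues = new LinkedHashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(QUEUE_PREFIX)) {
                // Everything after "queue." is the JNDI name; the value is the broker queue.
                queues.put(key.substring(QUEUE_PREFIX.length()), props.getProperty(key));
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory",
                "java.naming.provider.url=tcp://localhost:61616",
                "queue.queue/queueName=IN_QUEUE");
        // Prints each JNDI name together with the broker queue it maps to.
        parse(text).forEach((jndiName, physicalName) ->
                System.out.println(jndiName + " -> " + physicalName));
    }
}
```

Running main prints queue/queueName -> IN_QUEUE, which is why the producer and consumer look up "queue/queueName" rather than "IN_QUEUE".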

Message Producer

Create the following producer class, which sends a message to the queue IN_QUEUE. The role of the message producer is to produce the message. The message broker is the intermediary between the message producer and the message consumer, so messages are exchanged through the broker.

package com.roytuts.jms.message.persistence;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Producer {

    public static void main(String[] args) {
        new Producer().produceMessage();
    }

    public void produceMessage() {
        InitialContext initialContext = null;
        QueueConnectionFactory connectionFactory;
        QueueConnection connection = null;
        QueueSender sender;
        QueueSession session;
        Queue queue;

        try {
            // Step 1. Create an initial context to perform the JNDI lookup.
            initialContext = new InitialContext();

            // Step 2. Look-up the JMS queue
            queue = (Queue) initialContext.lookup("queue/queueName");

            // Step 3. Look-up the JMS queue connection factory
            connectionFactory = (QueueConnectionFactory) initialContext.lookup("ConnectionFactory");

            // Step 4. Create a JMS queue connection
            connection = connectionFactory.createQueueConnection();

            // Step 5. Start the connection
            connection.start();

            // Step 6. Create queue session
            session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

            // Step 7. Create queue sender
            sender = session.createSender(queue);

            // Step 8. Use non-persistent delivery for messages sent by this sender
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Step 9. Create a text message
            TextMessage message = session.createTextMessage("This sample message is consumed by consumer");

            // Step 10. Send the text message to the queue
            sender.send(message);
        } catch (JMSException | NamingException ex) {
            ex.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (initialContext != null) {
                try {
                    initialContext.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
        }

    }

}

In the above class I have set the message delivery mode to Non-Persistent:

sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Therefore the broker does not write your message to its persistent store. If the broker stops or restarts after the message is produced but before it is consumed, the message is lost and the consumer never receives it.
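
The difference between the two modes can be pictured with a toy model. The sketch below is not ActiveMQ code and does not use the JMS API; ToyBroker, its Mode enum, and its methods are hypothetical names chosen for illustration. It assumes a broker whose restart keeps only the messages that were sent as persistent.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (hypothetical, not the ActiveMQ implementation) of a single queue on a broker.
class ToyBroker {

    enum Mode { PERSISTENT, NON_PERSISTENT }

    private static final class Entry {
        final String text;
        final Mode mode;

        Entry(String text, Mode mode) {
            this.text = text;
            this.mode = mode;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();

    // The producer side: enqueue a message with the chosen delivery mode.
    void send(String text, Mode mode) {
        queue.addLast(new Entry(text, mode));
    }

    // Simulates stopping and starting the broker: only persistent messages survive.
    void restart() {
        queue.removeIf(entry -> entry.mode == Mode.NON_PERSISTENT);
    }

    // The consumer side: returns the next message text, or null if the queue is empty.
    String receive() {
        Entry entry = queue.pollFirst();
        return entry == null ? null : entry.text;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.send("kept", Mode.PERSISTENT);
        broker.send("dropped", Mode.NON_PERSISTENT);
        broker.restart();
        System.out.println(broker.receive()); // prints "kept"
        System.out.println(broker.receive()); // prints "null", the non-persistent message is gone
    }
}
```

In the tutorial's Producer, the call to setDeliveryMode plays the role of the Mode argument here.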

Message Consumer

The message consumer consumes the available message from the queue IN_QUEUE.

package com.roytuts.jms.message.persistence;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Consumer {

    public static void main(String[] args) {
        new Consumer().consumeMessage();
    }

    public void consumeMessage() {
        InitialContext initialContext = null;
        QueueConnectionFactory connectionFactory;
        QueueConnection connection = null;
        MessageConsumer consumer;
        QueueSession session;
        Queue queue;

        try {
            // Step 1. Create an initial context to perform the JNDI lookup.
            initialContext = new InitialContext();

            // Step 2. Look-up the JMS queue
            queue = (Queue) initialContext.lookup("queue/queueName");

            // Step 3. Look-up the JMS queue connection factory
            connectionFactory = (QueueConnectionFactory) initialContext.lookup("ConnectionFactory");

            // Step 4. Create a JMS queue connection
            connection = connectionFactory.createQueueConnection();

            // Step 5. Start the connection so that message delivery can begin
            connection.start();

            // Step 6. Create queue session
            session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

            // Step 7. Create queue consumer
            consumer = session.createConsumer(queue);

            // Step 8. receive text message
            Message message = consumer.receive();
            if (message != null && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Consumer received a message produced by Producer : " + textMessage.getText());
            } else if (message == null) {
                System.out.println("Consumer fails to receive the message sent by the producer.");
            } else {
                throw new JMSException("Message must be a type of TextMessage");
            }
        } catch (JMSException | NamingException ex) {
            ex.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (initialContext != null) {
                try {
                    initialContext.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
        }

    }

}

Testing the App

Make sure your ActiveMQ broker is running, then run the Consumer and Producer classes. You will see the following output in the console:

Consumer received a message produced by Producer : This sample message is consumed by consumer

You have just tested the application while the message broker was up and running. To test what happens on broker restart, first run the Producer class, then stop the broker and start it again, and finally run the Consumer class. The consumer will not print any output in the console because, with Non-Persistent delivery, the message is not written to the broker's persistent store and is lost when the broker stops.
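
A detail that matters when you run this test: the Consumer class calls consumer.receive() without a timeout, which blocks until a message arrives, so on an empty queue the program simply waits. The JMS MessageConsumer interface also has receive(long timeout), which returns null once the timeout expires. The sketch below shows the same timeout behavior with a java.util.concurrent.BlockingQueue standing in for the JMS queue. It is an analogy that uses only the JDK, not JMS code, and the class ReceiveTimeoutDemo and its method are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a BlockingQueue stands in for the JMS queue.
class ReceiveTimeoutDemo {

    // Waits at most timeoutMillis for a message; returns null if none arrives in time.
    static String receiveWithTimeout(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no message".
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Empty queue: the call gives up after 200 ms instead of waiting forever.
        String message = receiveWithTimeout(queue, 200);
        if (message == null) {
            System.out.println("No message arrived within the timeout.");
        }

        // Once a message is available, the same call returns it immediately.
        queue.add("This sample message is consumed by consumer");
        System.out.println(receiveWithTimeout(queue, 200));
    }
}
```

In the Consumer class the corresponding change would be to call consumer.receive(5000), for example, so that the message == null branch can actually run when the queue is empty.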

But if you comment out the following line, run the producer, and then restart the broker, the consumer will receive the message after the restart.

sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

This time the consumer receives the message because the default delivery mode is Persistent. If you want, you can also set Persistent delivery explicitly using the following line:

sender.setDeliveryMode(DeliveryMode.PERSISTENT);

I hope you now have an idea of how persistence works in JMS.
