Spring JMS and ActiveMQ Integration – publish/subscribe domain

Introduction

This Spring JMS and ActiveMQ integration tutorial shows how to send a message to a topic using the publish/subscribe messaging model in Apache ActiveMQ. We will create both an annotation based Spring Boot application and a standalone Spring application with XML configuration.

For more information on the publish/subscribe messaging system, please read the tutorial at https://www.roytuts.com/configure-jms-client-using-glassfish-3/

Prerequisites

Java 8 or 12, Eclipse 4.12, Gradle 5.6, ActiveMQ 5.15.10, Spring Boot 2.2.1, Spring 4.1.5

Make sure you have configured ActiveMQ in your Windows environment by following the tutorial https://www.roytuts.com/apache-activemq-configuration-in-windows/.

Let’s move on to the following steps to implement the publish/subscribe messaging system using Spring and ActiveMQ.

Creating Project

For Spring Boot we will create a Gradle based project in Eclipse. The name of the project is spring-jms-activemq-publish-subscribe.

Updating Build Script

Update the build.gradle script to include the required dependencies for the application.

buildscript {
	ext {
		springBootVersion = '2.2.1.RELEASE'
	}
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

plugins {
    id 'java-library'
    id 'org.springframework.boot' version '2.2.1.RELEASE'
}

sourceCompatibility = 12
targetCompatibility = 12

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-activemq:${springBootVersion}")
    implementation('org.apache.activemq:activemq-broker:5.15.11')
}

If you are using a Maven based project with Spring 4.1.5 and Java 8, you can use the pom.xml file below, or you can upgrade the Spring version:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.roytuts</groupId>
	<artifactId>spring-jms-activemq-publish-subscribe</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<jdk.version>1.8</jdk.version>
		<junit.version>4.11</junit.version>
		<slf4j.version>1.7.5</slf4j.version>
		<activemq.version>5.11.1</activemq.version>
		<spring.version>4.1.5.RELEASE</spring.version>
	</properties>
	
	<dependencies>
		<!-- activemq -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>${activemq.version}</version>
		</dependency>
		<!-- Spring -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<!-- junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
	
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>${jdk.version}</source>
					<target>${jdk.version}</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Let’s move on to the Java code example…

Property File

For the Spring Boot application we will create an application.properties file under the classpath directory src/main/resources with the following content:

JMS.BROKER.URL=tcp://localhost:61616
JMS.TOPIC.NAME=IN_TOPIC

For the XML based configuration we will create activemq-jms-spring-properties.xml under the classpath directory src/main/resources.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
	<bean id="jmsSpringProperties"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
		name="jmsSpringProperties">
		<property name="order" value="99999" />
		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
		<property name="ignoreUnresolvablePlaceholders" value="true" />
		<property name="properties">
			<value>
				<!-- JMS -->
				JMS.BROKER.URL=tcp://localhost:61616
				JMS.TOPIC.NAME=IN_TOPIC
			</value>
		</property>
	</bean>
</beans>

The property file just declares ActiveMQ broker URL and topic name as key/value pairs that will be used for messaging system.
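
As a rough illustration of how these key/value pairs resolve, here is a minimal sketch that parses the same two entries with plain java.util.Properties. This is only an assumption made for demonstration: the class name JmsPropertiesSketch and its parse() helper are hypothetical, and the applications in this tutorial read the values through Spring (the Environment or ${...} placeholders), not through this code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: uses plain java.util.Properties, not Spring's Environment.
class JmsPropertiesSketch {

    // Parses text written in the standard .properties key=value format.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException ex) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(ex);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("JMS.BROKER.URL=tcp://localhost:61616\nJMS.TOPIC.NAME=IN_TOPIC\n");
        // Everything after the first '=' is the value, so the colons in the URL are kept.
        System.out.println("Broker URL: " + props.getProperty("JMS.BROKER.URL"));
        System.out.println("Topic name: " + props.getProperty("JMS.TOPIC.NAME"));
    }
}
```

Note that the key ends at the first separator, so the colons inside the broker URL stay part of the value.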

Create Message Publisher

We need a message publisher that will produce a message and publish it to the topic. Every subscriber that is subscribed to this topic will then receive the message.
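
To make the fan-out behaviour concrete before we wire up ActiveMQ, here is a minimal in-memory sketch of the publish/subscribe idea. It does not use JMS at all: InMemoryTopic and PubSubSketch are hypothetical names invented for this illustration, and a real broker adds networking, persistence and asynchronous delivery on top of this basic idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a topic; not part of JMS or ActiveMQ.
class InMemoryTopic {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Registers a subscriber callback on this topic.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Hands the same message to every registered subscriber
    // and returns the number of deliveries that were made.
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }
}

class PubSubSketch {

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> receivedByOne = new ArrayList<>();
        List<String> receivedByTwo = new ArrayList<>();

        topic.subscribe(receivedByOne::add);
        topic.subscribe(receivedByTwo::add);

        // One publish call results in one delivery per subscriber.
        int deliveries = topic.publish("This is a message that will be posted into Topic.");

        System.out.println("Deliveries: " + deliveries); // prints "Deliveries: 2"
        System.out.println("Subscriber one got: " + receivedByOne);
        System.out.println("Subscriber two got: " + receivedByTwo);
    }
}
```

This contrasts with a queue, where each message would go to only one consumer.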

Here we use the JmsTemplate API provided by the Spring framework to send the message to the topic.

Please note you don’t need @Component annotation for XML based Spring application.

package com.roytuts.spring.jms.activemq.publish.subscribe.publisher;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessagePublisher {

	@Autowired
	private JmsTemplate jmsTemplate;

	public void sendMessage(final String message) {
		jmsTemplate.convertAndSend(message);
	}

}

Create Message Subscribers

We will create two subscribers to receive message from topic. Each subscriber will receive the same message from the topic.

Each subscriber accepts only text messages from the topic; for any other message type it throws an exception.

We mark each subscriber with the @Component annotation so that we do not have to create the bean ourselves.

Please note you don’t need @Component annotation for XML based Spring application.

MessageSubscriberOne

package com.roytuts.spring.jms.activemq.publish.subscribe.subscriber;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component
public class MessageSubscriberOne implements MessageListener {

	@Override
	public void onMessage(Message message) {
		if (message instanceof TextMessage) {
			try {
				String msg = ((TextMessage) message).getText();
				System.out.println("Message consumed by MessageSubscriber1 : " + msg);
			} catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		} else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}

}

MessageSubscriberTwo

package com.roytuts.spring.jms.activemq.publish.subscribe.subscriber;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component
public class MessageSubscriberTwo implements MessageListener {

	@Override
	public void onMessage(Message message) {
		if (message instanceof TextMessage) {
			try {
				String msg = ((TextMessage) message).getText();
				System.out.println("Message consumed by MessageSubscriber2 : " + msg);
			} catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		} else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}

}

Create Configuration

Next we need to configure our application to create the various beans such as the connection factory, topic, message publisher and so on.

We will show both the XML based and the annotation based configurations.

Create an XML configuration file called activemq-jms-spring-jms.xml that contains the JMS related configuration for the application. Put this file under src/main/resources/jms.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd">
	
	<!-- Activemq connection factory -->
	<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<constructor-arg index="0" value="${JMS.BROKER.URL}" />
	</bean>
	
	<!-- ConnectionFactory Definition -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<constructor-arg ref="amqConnectionFactory" />
	</bean>
	
	<!-- Destination Topic -->
	<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg index="0" value="${JMS.TOPIC.NAME}" />
	</bean>
	
	<!-- JmsTemplate Definition -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="defaultDestination" ref="destinationTopic" />
		<property name="pubSubDomain" value="true" />
	</bean>
	
	<!-- Message Publisher -->
	<bean id="messagePublisher" class="com.roytuts.spring.jms.activemq.publish.subscribe.publisher.MessagePublisher" />
	
	<!-- Message Subscriber1 -->
	<bean id="messageSubscriber1" class="com.roytuts.spring.jms.activemq.publish.subscribe.subscriber.MessageSubscriberOne" />
	
	<!-- Message Subscriber2 -->
	<bean id="messageSubscriber2" class="com.roytuts.spring.jms.activemq.publish.subscribe.subscriber.MessageSubscriberTwo" />
	
	<!-- Message Subscriber1 Container -->	
	<bean
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destinationName" value="${JMS.TOPIC.NAME}" />
		<property name="messageListener" ref="messageSubscriber1" />
		<property name="pubSubDomain" value="true" />
	</bean>
	
	<!-- Message Subscriber2 Container -->
	<bean
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destinationName" value="${JMS.TOPIC.NAME}" />
		<property name="messageListener" ref="messageSubscriber2" />
		<property name="pubSubDomain" value="true" />
	</bean>
</beans>

For the Spring Boot application, create the config class below:

package com.roytuts.spring.jms.activemq.publish.subscribe.config;

import javax.jms.ConnectionFactory;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.MessageListenerContainer;

import com.roytuts.spring.jms.activemq.publish.subscribe.subscriber.MessageSubscriberOne;
import com.roytuts.spring.jms.activemq.publish.subscribe.subscriber.MessageSubscriberTwo;

@Configuration
@PropertySource(value = "classpath:application.properties")
public class JmsConfig {

	@Autowired
	private Environment env;

	@Autowired
	private MessageSubscriberOne messageSubscriberOne;

	@Autowired
	private MessageSubscriberTwo messageSubscriberTwo;

	@Bean
	public ConnectionFactory connectionFactory() {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(env.getProperty("JMS.BROKER.URL"));
		return connectionFactory;
	}

	@Bean
	public CachingConnectionFactory cachingConnectionFactory() {
		CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory());
		return cachingConnectionFactory;
	}

	@Bean
	public Topic topic() {
		Topic topic = new ActiveMQTopic(env.getProperty("JMS.TOPIC.NAME"));
		return topic;
	}

	@Bean
	public JmsTemplate jmsTemplate() {
		JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());

		jmsTemplate.setDefaultDestination(topic());
		jmsTemplate.setPubSubDomain(true);

		return jmsTemplate;
	}

	@Bean(name = "messageListenerContainerOne")
	public MessageListenerContainer messageListenerContainerOne() {
		DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();

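		// No explicit setPubSubDomain(true) is needed here: setDestination() is given a Topic,
		// so the container switches to the publish/subscribe domain on its own.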
		messageListenerContainer.setDestination(topic());
		messageListenerContainer.setMessageListener(messageSubscriberOne);
		messageListenerContainer.setConnectionFactory(connectionFactory());

		return messageListenerContainer;
	}

	@Bean(name = "messageListenerContainerTwo")
	public MessageListenerContainer messageListenerContainerTwo() {
		DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();

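		// No explicit setPubSubDomain(true) is needed here: setDestination() is given a Topic,
		// so the container switches to the publish/subscribe domain on its own.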
		messageListenerContainer.setDestination(topic());
		messageListenerContainer.setMessageListener(messageSubscriberTwo);
		messageListenerContainer.setConnectionFactory(connectionFactory());

		return messageListenerContainer;
	}

}

For the XML configuration, we will import all the XML configuration files into a single XML file. Therefore, create an XML file called activemq-jms-spring-context.xml that imports the other resources and enables annotation support in the application. Put this file under src/main/resources/spring.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
	<context:annotation-config></context:annotation-config>
	<import resource="classpath:activemq-jms-spring-properties.xml" />
	<import resource="classpath:jms/activemq-jms-spring-jms.xml" />
</beans>

Create Main Class

We will create two separate main classes: one for the Spring XML based configuration and another for the Spring Boot application.

For the XML based configuration, use the class below:

package com.roytuts.spring.jms.activemq.publish.subscribe;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.roytuts.spring.jms.activemq.publish.subscribe.publisher.MessagePublisher;

public class SpringJmsActiveMqPubSubApp {
	
	public static void main(String[] args) {
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/activemq-jms-spring-context.xml");
		
		MessagePublisher messagePublisher = (MessagePublisher) applicationContext.getBean("messagePublisher");
		
		messagePublisher.sendMessage("This is a message that will be posted into Topic.");
	}
	
}

For the Spring Boot application, use the class below:

package com.roytuts.spring.jms.activemq.publish.subscribe;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.roytuts.spring.jms.activemq.publish.subscribe.publisher.MessagePublisher;

@SpringBootApplication(scanBasePackages = "com.roytuts.spring.jms.activemq.publish.subscribe")
public class SpringJmsActiveMqPubSubApp implements CommandLineRunner {

	@Autowired
	private MessagePublisher messagePublisher;

	public static void main(String[] args) {
		SpringApplication.run(SpringJmsActiveMqPubSubApp.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		messagePublisher.sendMessage("This is a message that will be posted into Topic.");
	}

}

Testing the Application

When you run the main class, you will get the following output in the console:

Message consumed by MessageSubscriber1 : This is a message that will be posted into Topic.
Message consumed by MessageSubscriber2 : This is a message that will be posted into Topic.

Verify in ActiveMQ Web Console

Click on Topics in the ActiveMQ Web Console. You will not see any pending message in IN_TOPIC because, as soon as MessagePublisher sent the message, it was consumed by MessageSubscriberOne and MessageSubscriberTwo.

You will also notice that one message was enqueued but two messages have been dequeued, because there were two message subscribers.

We do not need to start MessageSubscriberOne or MessageSubscriberTwo ourselves because this is an asynchronous messaging system and each subscriber is already registered with one of Spring’s DefaultMessageListenerContainer instances. So when a message arrives at the topic, it is automatically passed to each subscriber’s onMessage() method.
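
The idea of a listener being called back automatically can be sketched in plain Java as follows. This is a simplified illustration under our own assumptions, not how DefaultMessageListenerContainer is implemented: SimpleListenerContainer and AsyncListenerSketch are hypothetical names, and the sketch uses a single background thread that waits for messages and invokes the listener.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, heavily simplified stand-in for a message listener container.
class SimpleListenerContainer {

    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private final Thread worker;

    SimpleListenerContainer(Consumer<String> listener) {
        // The worker thread blocks until a message arrives, then calls the listener.
        worker = new Thread(() -> {
            try {
                while (true) {
                    listener.accept(incoming.take());
                }
            } catch (InterruptedException ex) {
                // stop() was called; restore the flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        }, "listener-container");
        worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    // Simulates a message arriving from the broker.
    void deliver(String message) {
        incoming.add(message);
    }

    void stop() {
        worker.interrupt();
    }
}

class AsyncListenerSketch {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch received = new CountDownLatch(1);

        SimpleListenerContainer container = new SimpleListenerContainer(message -> {
            System.out.println("onMessage called on thread "
                    + Thread.currentThread().getName() + " : " + message);
            received.countDown();
        });
        container.start();

        // The main thread only delivers the message; it never calls the listener itself.
        container.deliver("This is a message that will be posted into Topic.");

        // Wait briefly so the background thread has time to run the listener.
        boolean consumed = received.await(2, TimeUnit.SECONDS);
        System.out.println("Message consumed: " + consumed);
        container.stop();
    }
}
```

The main thread never invokes the listener directly, which mirrors why our main classes only publish a message and leave consumption to the registered subscribers.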

[Image: ActiveMQ Spring JMS publish subscribe]

Source Code

Download Spring Boot App

That’s all. Thank you for reading.
