Publish Subscribe Message onto MQTT using Spring Boot

Introduction

In this example we will see how to publish messages to and subscribe to messages from an MQTT broker (Mosquitto) using the Spring Boot framework. In the previous example we saw how to publish and subscribe to messages using the MQTT.fx client. Here we are going to integrate a Spring application with the MQTT broker through the Spring Integration framework, though the style of programming is not typical Spring Integration programming.

I discussed what Mosquitto (an MQTT broker) is and how it is useful in the introduction section of the tutorial where I showed how to install the Mosquitto server on Windows.

I also discussed QoS (Quality of Service) in the QoS section of the tutorial where I showed how to publish and subscribe to messages using the MQTT.fx tool.

We will also see how the MQTT.fx client receives a message from the MQTT broker once the Spring Boot application has published it to the broker.

Prerequisites

Eclipse 4.9.0, Java 1.8, Gradle 4.10.2, Spring Boot 2.1.8, Mosquitto

Project Configuration

To create the Spring Boot application we first need to set up a project in Eclipse. We will create a Gradle-based project named spring-integration-mqtt-publisher-subscriber.

Build Script

We need to add the required dependencies for Spring Boot and for the MQTT client. The spring-integration-mqtt dependency brings in the Eclipse Paho client library.

buildscript {
    ext {
        springBootVersion = '2.1.8.RELEASE'
    }

    repositories {
        mavenLocal()
        mavenCentral()
    }

    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'org.springframework.boot'

sourceCompatibility = 8
targetCompatibility = 8

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter:${springBootVersion}")
    implementation('org.springframework.integration:spring-integration-mqtt:5.2.0.RELEASE')
}

MQTT Client Configuration

In order to send messages to and/or receive messages from the MQTT broker, the first thing to do is obtain an implementation of the IMqttClient interface.

This interface contains all the methods an application needs to establish a connection to the server and to send and receive messages.

The Eclipse Paho library provides a synchronous client, MqttClient, which implements IMqttClient, and an asynchronous client, MqttAsyncClient, which implements the separate IMqttAsyncClient interface.

For our example we will focus on the synchronous one, MqttClient.

Creating MqttClient Instance

The following code snippet shows how to create an MqttClient instance.

It uses the simplest constructor in the library, which takes the endpoint address of the broker and a client identifier.

IMqttClient mqttClient = new MqttClient("tcp://" + hostname + ":" + port, clientId);

We will use the computer name, followed by -MQTT, as the client identifier.
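As a small illustration of the two constructor arguments, the sketch below builds the broker URI and derives a client identifier from the computer name. The class BrokerAddress, its method names, and the fallback to HOSTNAME and then to "localhost" are assumptions made for this example (COMPUTERNAME is normally set on Windows only); none of them come from the Paho library.

```java
import java.util.Map;

/** Hypothetical helper (not part of Paho) for the two MqttClient constructor arguments. */
final class BrokerAddress {

    private BrokerAddress() {
    }

    /** Builds the broker endpoint, for example tcp://127.0.0.1:1883. */
    static String serverUri(String hostname, int port) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Invalid port: " + port);
        }
        return "tcp://" + hostname + ":" + port;
    }

    /** Derives a client identifier from the computer name found in the given environment. */
    static String clientId(Map<String, String> env) {
        String name = env.get("COMPUTERNAME");   // set on Windows
        if (name == null || name.isEmpty()) {
            name = env.get("HOSTNAME");          // assumption: exported by some Unix shells
        }
        if (name == null || name.isEmpty()) {
            name = "localhost";                  // assumption: last-resort default
        }
        return name + "-MQTT";
    }

    public static void main(String[] args) {
        System.out.println(serverUri("127.0.0.1", 1883));
        System.out.println(clientId(System.getenv()));
    }
}
```

Passing the environment in as a Map keeps the helper easy to try with different values, and the fallback avoids an unresolved name on machines where COMPUTERNAME is not defined.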

Connecting to MQTT Server

The above code snippet only creates an instance of the MqttClient class; it does not connect to the server. To connect, we need to call the connect() method, optionally passing connection parameters.

mqttClient.connect(mqttConnectOptions());

The complete method for creating the client and connecting it to the MQTT server is given below:

@Bean
public IMqttClient mqttClient(@Value("${mqtt.clientId}") String clientId,
		@Value("${mqtt.hostname}") String hostname, @Value("${mqtt.port}") int port) throws MqttException {

	IMqttClient mqttClient = new MqttClient("tcp://" + hostname + ":" + port, clientId);

	mqttClient.connect(mqttConnectOptions());

	return mqttClient;
}

Connection Parameters Configurations

Connection parameters are configured through a bean as shown below:

@Bean
@ConfigurationProperties(prefix = "mqtt")
public MqttConnectOptions mqttConnectOptions() {
	return new MqttConnectOptions();
}

Connection Parameters

The above bean takes its values from the application.properties file under the src/main/resources folder.

Notice how the MqttConnectOptions bean is configured using the @ConfigurationProperties annotation with prefix = "mqtt".

Only the properties whose names match attributes of MqttConnectOptions (automaticReconnect, cleanSession and connectionTimeout) are bound to it. The remaining properties (clientId, hostname and port) are used to create the MqttClient instance, as shown in the Connecting to MQTT Server section.

mqtt.automaticReconnect=true
mqtt.cleanSession=true
mqtt.connectionTimeout=10
mqtt.clientId=${COMPUTERNAME}-MQTT
mqtt.hostname=127.0.0.1
mqtt.port=1883
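To make the split between the two groups of properties visible, here is a rough sketch using only the JDK. It is not how Spring's binder is implemented: the class MqttPropertySplit and its methods are hypothetical, and the set of bindable names is hard-coded to the three properties from the file above that have matching setters on MqttConnectOptions.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;

/** Illustrative only: mimics the effect of prefix binding, not Spring's actual binder. */
final class MqttPropertySplit {

    private static final String PREFIX = "mqtt.";

    // Assumption: only the names from this tutorial's file that match MqttConnectOptions setters.
    private static final Set<String> CONNECT_OPTION_NAMES = new HashSet<>(
            Arrays.asList("automaticReconnect", "cleanSession", "connectionTimeout"));

    /** Loads the same entries as the application.properties file shown above. */
    static Properties sample() {
        String text = String.join("\n",
                "mqtt.automaticReconnect=true",
                "mqtt.cleanSession=true",
                "mqtt.connectionTimeout=10",
                "mqtt.clientId=${COMPUTERNAME}-MQTT",
                "mqtt.hostname=127.0.0.1",
                "mqtt.port=1883");
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the prefixed entries that match the options bean (true) or the ones left over (false). */
    static Map<String, String> select(Properties props, boolean boundToOptions) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) {
                continue;
            }
            String name = key.substring(PREFIX.length());
            if (CONNECT_OPTION_NAMES.contains(name) == boundToOptions) {
                result.put(name, props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = sample();
        System.out.println("Bound to MqttConnectOptions: " + select(props, true));
        System.out.println("Left for the MqttClient bean: " + select(props, false));
    }
}
```

Running main prints automaticReconnect, cleanSession and connectionTimeout in the first group, and clientId, hostname and port in the second.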

Publishing and Receiving Messages

Now we will see how to publish a message to a topic and how to subscribe to a topic. The subscriber will receive the published message and print it to the console.

Publishing Message

Publishing a message to the MQTT broker using the MqttClient instance is straightforward.

We can use one of the publish() method variants to publish a message. Another variant is shown commented out in the following code snippet.

The method below takes four parameters: the name of the topic, the message (payload) to publish to the topic, the QoS (Quality of Service) level, and the retained flag. The retained flag tells the broker whether to keep this message as the last known message on the topic and deliver it to clients that subscribe later.

Here I call the disconnect() method once the message has been published. In a long-running application you would normally keep the connection open.

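public void publish(final String topic, final String payload, int qos, boolean retained)
		throws MqttPersistenceException, MqttException {
	// Wrap the payload together with its QoS level and retained flag
	MqttMessage mqttMessage = new MqttMessage();
	mqttMessage.setPayload(payload.getBytes());
	mqttMessage.setQos(qos);
	mqttMessage.setRetained(retained);

	// Send the message to the given topic
	mqttClient.publish(topic, mqttMessage);
	
	// Alternative variant that takes the raw payload, QoS and retained flag directly
	//mqttClient.publish(topic, payload.getBytes(), qos, retained);

	// Disconnect once publishing is done
	mqttClient.disconnect();
}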
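To make the retained flag more concrete, here is a toy in-memory model of how a broker treats retained and non-retained messages. It is only a sketch for illustration: ToyBroker is a hypothetical class, it supports exact topic names only, and it says nothing about how Mosquitto is implemented internally.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of retained-message behaviour; not a real broker. */
final class ToyBroker {

    private final Map<String, byte[]> retainedByTopic = new HashMap<>();
    private final Map<String, List<Consumer<byte[]>>> subscribers = new HashMap<>();

    /** Delivers the payload to current subscribers and, if retained, stores it for later ones. */
    void publish(String topic, byte[] payload, boolean retained) {
        List<Consumer<byte[]>> current =
                subscribers.getOrDefault(topic, Collections.<Consumer<byte[]>>emptyList());
        for (Consumer<byte[]> listener : current) {
            listener.accept(payload);
        }
        if (retained) {
            if (payload.length == 0) {
                retainedByTopic.remove(topic);       // an empty retained payload clears the stored message
            } else {
                retainedByTopic.put(topic, payload); // only the latest retained message is kept
            }
        }
    }

    /** Registers a listener and immediately hands it the retained message, if there is one. */
    void subscribe(String topic, Consumer<byte[]> listener) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
        byte[] retained = retainedByTopic.get(topic);
        if (retained != null) {
            listener.accept(retained);
        }
    }

    /** Publishes before anyone subscribes, then returns what a late subscriber receives. */
    static List<String> demo() {
        ToyBroker broker = new ToyBroker();
        List<String> received = new ArrayList<>();
        String topic = "roytuts/topic/event";
        broker.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), true);
        broker.publish(topic, "transient".getBytes(StandardCharsets.UTF_8), false); // no subscriber yet, so it is lost
        broker.subscribe(topic, p -> received.add("late: " + new String(p, StandardCharsets.UTF_8)));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the late subscriber receives only the retained message; the non-retained one was published while nobody was subscribed and is gone.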

Receiving Message

In order to receive messages from the MQTT broker, we need to use one of the subscribe() method variants.

The following method shows how to add a message listener to an existing MqttClient instance to receive messages from a given topic.

The subscribeWithResponse() method used below takes an IMqttMessageListener instance as its second argument.

public void subscribe(final String topic) throws MqttException, InterruptedException {
	System.out.println("Messages received:");

	mqttClient.subscribeWithResponse(topic, (tpic, msg) -> {
		System.out.println(msg.getId() + " -> " + new String(msg.getPayload()));
	});
}
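One detail worth noting: payload.getBytes() in the publisher and new String(msg.getPayload()) in the listener both rely on the JVM's default charset. A minimal sketch of the same encoding, decoding and output formatting with an explicit UTF-8 charset could look like this; PayloadCodec and its method names are hypothetical and exist only for this example.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that encodes and decodes MQTT payloads with an explicit charset. */
final class PayloadCodec {

    private PayloadCodec() {
    }

    /** Turns the text into the byte array that would be set as the message payload. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Turns a received payload back into text. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    /** Formats a message the same way the listener above prints it: id -> payload. */
    static String format(int messageId, byte[] payload) {
        return messageId + " -> " + decode(payload);
    }

    public static void main(String[] args) {
        byte[] payload = encode("Hi\nThis is a sample message published to topic roytuts/topic/event");
        System.out.println(format(0, payload));
    }
}
```

Using the same explicit charset on both sides keeps non-ASCII text intact even when publisher and subscriber run on machines with different default encodings.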

Creating Main Class

As we know, a main class annotated with @SpringBootApplication and containing a main method is enough to run the application. Here we create a standalone Spring Boot project and run it using a CommandLineRunner.

So in the run() method we call the above publish() and subscribe() methods as shown below:

@Override
public void run(String... args) throws Exception {
	final String topic = "roytuts/topic/event";

	messagingService.subscribe(topic);

	messagingService.publish(topic, "Hi\nThis is a sample message published to topic roytuts/topic/event", 0, true);

	context.close();
}

Finally we close the context and terminate the application.
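Because a message listener is invoked asynchronously, shutting down right after publishing can, in principle, race with the delivery of the message. One common way to guard against this is to wait on a CountDownLatch with a timeout. The sketch below uses only the JDK and simulates the delivery with a background thread; AwaitDelivery, its method name and the delay values are assumptions for illustration, not part of the original project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: wait for an asynchronous delivery before shutting down. */
final class AwaitDelivery {

    private AwaitDelivery() {
    }

    /**
     * Simulates a message that arrives after deliveryDelayMillis and waits at most
     * timeoutMillis for it. Returns true if the message arrived in time.
     */
    static boolean publishAndAwait(long deliveryDelayMillis, long timeoutMillis) {
        CountDownLatch received = new CountDownLatch(1);
        // Stand-in for the thread on which a client library would call the listener
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        try {
            callbackThread.execute(() -> {
                try {
                    Thread.sleep(deliveryDelayMillis); // pretend network latency
                    received.countDown();              // what the message listener would do
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Block the caller until the listener has fired or the timeout expires
            return received.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            callbackThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Arrived in time: " + publishAndAwait(10, 2000));
        System.out.println("Arrived in time: " + publishAndAwait(2000, 50));
    }
}
```

In the real application the countDown() call would go inside the message listener, and the await() call would go in run() just before the context is closed.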

Testing the Application

Run the main class. The application will publish the message to the broker on the topic, and the subscriber will receive it.

Console Output

On the Eclipse console you will see the message below, which is received by the subscriber:

Messages received:
0 -> Hi
This is a sample message published to topic roytuts/topic/event

Using MQTT Client

If the MQTT.fx client is subscribed to the same topic, it also receives the same message, as shown in the image below:

[Image: publish subscribe message onto MQTT using Spring Boot]

That’s all. Hope you got an idea of how to publish and subscribe to messages on MQTT using Spring Boot.


Thanks for reading.
