Real Time Data Over WebSocket Using Kafka Streams Spring Boot Angular

Web Socket and Apache Kafka

In this tutorial I will show you how to work with Apache Kafka Streams for building Real Time Data Processing with STOMP over Websocket using Spring Boot and Angular. You will see how to build push notifications using Apache Kafka, Spring Boot and Angular. You need to provide some basic things that Kafka Streams requires, such as, the cluster information, application id, the topic to consume, Serdes to use, and so on.

I would not make the content lengthy by explaining about Kafka Streams but you can always find very good documentation about Apache Kafka.

Let’s look at some of these basic things that Kafka Streams requires:

Cluster Information

By default, the binder will try to connect to a cluster that is running on http://localhost:9092. If that is not the case, you can override that by using the available configuration properties.

Application ID

In a Kafka Streams application, application.id is a mandatory field. Without it, you cannot start a Kafka Streams application. By default, the binder will generate an application ID and assign it to the processor. It uses the function bean name as a prefix.

Topic to consume from

You need to provide a topic from where Kafka will consume the stream of messages.

Serialization and Deserialization (Serdes)

Kafka Streams uses a special class called Serde to deal with data marshaling. It is essentially a wrapper around a deserializer on the inbound and a serializer on the outbound. Normally, you have to tell Kafka Streams what Serde to use for each consumer. Binder, however, infers this information by using the parametric types provided as part of Kafka Streams.

Prerequisites

Java 12/19, Spring Boot 2.2.2/3.1.5, Maven 3.6.1/3.8.5, Gradle 5.6, Spring Kafka 2.3.4/3.6.0, Angular 8/17

How to setup and work with Apache Kafka in Windows Environment

How to create new Angular project in Windows

Server Application

Project Setup

Let’s create a project either maven or gradle based in the IDE or Java based tool. The name of the project is spring-apache-kafka-streams-websocket-stomp-server.

For the maven based project you can use the following pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-apache-kafka-streams-websocket-stomp-server</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>19</maven.compiler.source>
		<maven.compiler.target>19</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.1.5</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

If you are creating gradle based project then you can use below build.gradle script:

buildscript {
	ext {
		springBootVersion = '2.2.2.RELEASE'
	}
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

plugins {
    id 'java-library'
    id 'org.springframework.boot' version '2.2.2.RELEASE'
}

sourceCompatibility = 12
targetCompatibility = 12

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-websocket:${springBootVersion}")
    implementation 'org.springframework.kafka:spring-kafka:2.3.4.RELEASE'
    implementation 'org.apache.kafka:kafka-streams:2.4.0'
}

Application Properties

I will create application.properties file under class path directory src/main/resources to configure some basic settings for Kafka.

Producer will produce messages into roytuts-input topic. Kafka stream processor will consume the message from roytuts-input topic and write into roytuts-output topic. Next consumer will consume messages from roytuts-output topic.

Finally SimpMessagingTemplate will write to /topic/greeting.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=roytutsGroup

kafka.input.topic=roytuts-input
kafka.output.topic=roytuts-output

stomp.topic=/topic/greeting

Kafka Topics

I will create two topics in Kafka for consuming and publishing messages.

To create a topic you need to add a bean of type NewTopic. If the topic already exists then this bean is ignored.

The topics will be created during application start up.

package com.roytuts.spring.apache.kafka.streams.websocket.stomp.server.config;

@Configuration
public class KafkaTopicConfig {

	@Value("${kafka.input.topic}")
	private String kafkaInputTopic;

	@Value("${kafka.output.topic}")
	private String kafkaOutputTopic;

	@Bean
	public NewTopic inputTopic() {
		NewTopic newTopic = new NewTopic(kafkaInputTopic, 1, (short) 1);

		return newTopic;
	}

	@Bean
	public NewTopic outputTopic() {
		NewTopic newTopic = new NewTopic(kafkaOutputTopic, 1, (short) 1);

		return newTopic;
	}

}

Kafka Streams

Kafka stream processor will consume from input topic and do some business processing on input data and write to output topic. Though in this example the processor just reads the messages and writes to topic but ideally your application will do some business processing on the input data.

package com.roytuts.spring.apache.kafka.streams.websocket.stomp.server.config;

@Configuration
public class KafkaStreamsConfig {

	@Value("${kafka.input.topic}")
	private String kafkaInputTopic;

	@Value("${kafka.output.topic}")
	private String kafkaOutputTopic;

	@Value("${spring.kafka.bootstrap-servers}")
	private String kafkaBootstrapServer;

	@Bean
	public KStream<String, String> kstream() {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "roytuts-stomp-websocket");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

		StreamsBuilder streamsBuilder = new StreamsBuilder();

		final KStream<String, String> stream = streamsBuilder.stream(kafkaInputTopic,
				Consumed.with(Serdes.String(), Serdes.String()));

		stream.map((key, value) -> KeyValue.pair(key, value)).to(kafkaOutputTopic,
				Produced.with(Serdes.String(), Serdes.String()));

		KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
		streams.start();

		return stream;
	}

}

Configure WebSocket and Stomp

The above WebSocketConfig class is annotated with @Configuration to indicate that it is a Spring configuration class.

The class is also annotated @EnableWebSocketMessageBroker and @EnableWebSocketMessageBroker enables WebSocket message handling, backed by a message broker.

The configureMessageBroker() method overrides the default method in WebSocketMessageBrokerConfigurer interface to configure the message broker.

It starts by calling enableSimpleBroker() to enable a simple memory-based message broker to carry the greeting messages back to the client on destinations prefixed with /topic.

The registerStompEndpoints() method registers the /websocket endpoint, enabling SockJS fallback options so that alternate transports may be used if WebSocket is not available.

The SockJS client will attempt to connect to /websocket and use the best transport available (websocket, xhr-streaming, xhr-polling, etc).

package com.roytuts.spring.apache.kafka.streams.websocket.stomp.server.config;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

	@Override
	public void registerStompEndpoints(StompEndpointRegistry registry) {
		//registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
		registry.addEndpoint("/websocket").setAllowedOrigins("http://localhost:4200").withSockJS();
	}

	@Override
	public void configureMessageBroker(MessageBrokerRegistry config) {
		config.enableSimpleBroker("/topic");
	}

}

Related Posts:

Greeting Service

This class will generate different greet messages depending upon the time of the day. I have also appended random generated string so that we will get different random string appended with actual greet message to differentiate from each other every and it will actually tell us that we are getting every time the new message.

package com.roytuts.spring.apache.kafka.streams.websocket.stomp.server.service;

@Service
public class GreetingService {

	public String greet() {
		Calendar c = Calendar.getInstance();

		int timeOfDay = c.get(Calendar.HOUR_OF_DAY);

		StringBuilder sb = new StringBuilder();

		String message = "Have a Good Day";

		if (timeOfDay >= 0 && timeOfDay < 12) {
			message = "Good Morning";
		} else if (timeOfDay >= 12 && timeOfDay < 16) {
			message = "Good Afternoon";
		} else if (timeOfDay >= 16 && timeOfDay < 21) {
			message = "Good Evening";
		} else if (timeOfDay >= 21 && timeOfDay < 24) {
			message = "Good Night";
		}

		sb.append(message).append(" - ").append(generateString());

		return sb.toString();
	}

	private String generateString() {
		String uuid = UUID.randomUUID().toString();
		return uuid;
	}

}

Send Message

Spring’s KafkaTemplate is auto-configured and it can be autowired directly into bean to send a message.

You will get different overloaded methods of send() and you can choose according to your needs.

In your real application the source of data ideally would be different, such as some feed URL, external web service or anything else but in this example I am pushing data every 3 seconds using scheduler to simulate the data feed.

The data or messages are sent to topic roytuts-input.

package com.roytuts.spring.apache.kafka.streams.websocket.stomp.server.producer;

@Component
@EnableScheduling
public class MessageProducer {

	@Value("${kafka.input.topic}")
	private String kafkaInputTopic;

	@Autowired
	private GreetingService greetingService;

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	@Scheduled(fixedRate = 1000)
	public void produce() {
		String msg = greetingService.greet();

		System.out.println("Greeting Message :: " + msg);

		kafkaTemplate.send(kafkaInputTopic, msg);
	}

}

Consume Message

Now we will consume the messages which were written to roytuts-output topic by Kafka stream processor.

With Apache Kafka infrastructure a bean can be annotated with @KafkaListener to create a listener endpoint on a topic.

Finally we will send to stomp topic /topic/greeting.

package com.roytuts.spring.apache.kafka.streams.websocket.stomp.server.consumer;

@Component
public class MessageConsumer {

	@Value("${stomp.topic}")
	private String stompTopic;

	@Autowired
	private SimpMessagingTemplate messagingTemplate;

	@KafkaListener(topics = "${kafka.output.topic}", groupId = "${spring.kafka.consumer.group-id}")
	public void consumeMessage(String msg) {
		System.out.println("Message received: " + msg);
		messagingTemplate.convertAndSend(stompTopic, msg);
	}

}

Main Class

A main class means a class is having the main method that starts the application. So in this Spring Boot application main class is enough to deploy and run the application.

package com.roytuts.spring.apache.kafka.streams.websocket.stomp.server;

@SpringBootApplication
public class SpringKafkaStreamaRealTimeApp {

	public static void main(String[] args) {
		SpringApplication.run(SpringKafkaStreamaRealTimeApp.class, args);
	}

}

Testing the Kafka Application

Make sure your ZooKeeper server and Kafka broker are running before you run the main class.

Running the main class will produce the stream of messages in the console:

Greeting Message :: Good Afternoon - a6bc90a4-0edf-43ec-9e13-2ede13465fdd
Message received: Good Afternoon - a6bc90a4-0edf-43ec-9e13-2ede13465fdd
Greeting Message :: Good Afternoon - 035ab6dd-9b35-4144-a7a7-10c02bace24c
Message received: Good Afternoon - 035ab6dd-9b35-4144-a7a7-10c02bace24c
Greeting Message :: Good Afternoon - 8dc634ac-a316-4de3-af9d-214ee3c60e66
Message received: Good Afternoon - 8dc634ac-a316-4de3-af9d-214ee3c60e66
Greeting Message :: Good Afternoon - 87d57df9-1a8b-4610-90c2-85a62a7a7e67
Message received: Good Afternoon - 87d57df9-1a8b-4610-90c2-85a62a7a7e67
Greeting Message :: Good Afternoon - d48eeccf-b68d-4fdc-ab08-070207237d1c
... and so on

You are done with the server application on building real time data processing using Apache Kafka Streams.

Now you will see how to create client application in Angular to see push notifications continuously on browser.

Client Application

Project Setup

As I said in prerequisites section how to create Angular project in Windows environment, so first create an Angular project. The name of the project is spring-apache-kafka-streams-websocket-stomp-client-angular.

Installing Required Modules

Install the required modules with the following commands.

For Angular 17 use the following command format:

npm i --save-dev @types/sockjs-client
npm i --save-dev @types/jquery
npm i --save-dev @types/stompjs
npm i net -S

For Angular 8 use the following command format:

npm install stompjs
npm install sockjs-client
npm install jquery
npm i net -S

The stompjs is required to connect over STOMP.

The sockjs-client is required to establish connection with WebSocket server.

The jquery is required to directly access DOM elements in the HTML page.

To avoid net issue we need to install net module.

Update index.html

You need to declare window in the src/index.html file to avoid the below issue:

Uncaught ReferenceError: global is not defined
    at Object../node_modules/sockjs-client/lib/utils/browser-crypto.js

The complete content of src/index.html file is given below:

<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>SpringApacheKafkaStreamsWebsocketStompClientAngular</title>
  <base href="/">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <link rel="icon" type="image/x-icon" href="favicon.ico">
  <script>
	if (global === undefined) {
      var global = window;
    }
  </script>
</head>
<body>
  <app-root></app-root>
</body>
</html>

Update app.component.html

We will update the src/app/app.component.html file to put a div tag where greeting message will be updated.

<div class="msg"></div>

<router-outlet></router-outlet>

Update app.component.ts

We will update src/app/app.component.ts file to consume the message over STOMP.

We set the page title by implementing OnInit interface in the ngOnInit() method.

We establish connection to the WebSocket server, client socket subscribe to the topic /topic/greeting destination, where the server will publish greeting messages and finally we update the div (having a class msg) on HTML page.

For Angular 17, use the following code:

import { OnInit, Component } from '@angular/core';
import { Title } from '@angular/platform-browser';

import { CommonModule } from '@angular/common';
import { RouterOutlet } from '@angular/router';

import Stomp from 'stompjs';
import SockJS from 'sockjs-client';
import $ from 'jquery';

@Component({
  selector: 'app-root',
  standalone: true,
  imports: [CommonModule, RouterOutlet],
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  url = 'http://localhost:8080/websocket'
  client: any;
  
  ngOnInit() {
	this.title.setTitle('Angular Spring Websocket');
  }
  
  constructor(private title: Title){
    this.connection();
  }
  
  connection(){
    let ws = new SockJS(this.url);
    this.client = Stomp.over(ws);
    let that: any = this;
	
    this.client.connect({}, function(frame: any) {
      that.client.subscribe("/topic/greeting", (message: any) => {
        if(message.body) {
		  $(".msg").html(message.body)
        }
      });
    });
  }
  
}

For Angular 8, use the following code:

import { OnInit, Component } from '@angular/core';
import { Title } from '@angular/platform-browser';

import * as Stomp from 'stompjs';
import * as SockJS from 'sockjs-client';
import $ from 'jquery';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  url = 'http://localhost:8080/websocket'
  client: any;
  greeting: string;
  
  ngOnInit() {
	this.title.setTitle('Angular Spring Websocket');
  }
  
  constructor(private title: Title){
    this.connection();
  }
  
  connection(){
    let ws = new SockJS(this.url);
    this.client = Stomp.over(ws);
    let that = this;
	
    this.client.connect({}, function(frame) {
      that.client.subscribe("/topic/greeting", (message) => {
        if(message.body) {
          this.greeting = message.body;
		  //$(".msg").append(this.greeting)
		  $(".msg").html(this.greeting)
		  //alert(this.greeting);
		  //console.log(this.greeting);
        }
      });
    });
  }
}

I am here to just get the message from server as a push notification towards clients.

Testing the Kafka and Angular Application

With your server running now run the client application by executing command ng serve --open.

Your application opens at http://localhost:4200 and you will see the message being updated every 3 seconds.

Apache Kafka Streams for building Real Time Data Processing with STOMP over Websocket using Spring and Angular

In the above image the highlighted random string will be continuously changing.

Source Code

Download

Leave a Reply

Your email address will not be published. Required fields are marked *