In this tutorial we will walk through an example of event-driven streaming using Spring Cloud Stream and the Apache Kafka streaming platform. How do we deal with events such as a new user registering on a portal, an order being placed, or a file being uploaded? Say that when an order is placed we need one call to process the payment, one call to reserve inventory, and one call to start picking, packaging and shipping the product.
For a few orders this is not a big deal: we can call the few backend services directly without introducing much overhead. But suppose we have to handle 100 such orders per second; then we have to make 3 × 100 = 300 calls to backend services per second. If we add another service, for example reporting for sales tracking, we have to make 400 calls to our backend services per second. That overhead grows with every order and every service we add.
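The arithmetic above is simply the number of orders per second multiplied by the number of backend services called for each order. A minimal sketch, where the class and method names are hypothetical and exist only for illustration:

```java
// Hypothetical helper illustrating the call fan-out described above.
final class FanOutCost {

    // Direct calls per second = orders per second x services called per order.
    static long callsPerSecond(long ordersPerSecond, int servicesPerOrder) {
        return ordersPerSecond * servicesPerOrder;
    }

    public static void main(String[] args) {
        // Payment, inventory and shipping: 3 services per order.
        System.out.println(callsPerSecond(100, 3)); // 300
        // Adding a reporting service makes it 4 services per order.
        System.out.println(callsPerSecond(100, 4)); // 400
    }
}
```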
What if, instead, we could simply have our website alert our whole architecture at once? It can yell, “Hey! I made a sale” to our whole stack, and any component that’s interested can take the appropriate action. This means we don’t need to update our frontend as we add additional services, and our new services just need to know what to listen for.
The above is an example of an event-driven architecture: instead of reaching out to each service one by one, our services emit a change of state. This is where Spring Cloud Stream comes into the picture.
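To make the idea concrete, here is a toy in-memory sketch of publish and subscribe in plain Java. It is not how Kafka or Spring Cloud Stream work internally; the SimpleEventBus class and the event name are assumptions made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// A toy in-memory event bus, for illustration only.
final class SimpleEventBus {

    private final List<Consumer<String>> listeners = new ArrayList<>();

    // Any interested component registers itself once.
    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
    }

    // The publisher emits one event and does not know who is listening.
    // Returns the number of listeners that were notified.
    int publish(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
        return listeners.size();
    }

    public static void main(String[] args) {
        SimpleEventBus bus = new SimpleEventBus();
        bus.subscribe(e -> System.out.println("payment handles " + e));
        bus.subscribe(e -> System.out.println("inventory handles " + e));
        bus.subscribe(e -> System.out.println("shipping handles " + e));
        bus.publish("order-placed");
    }
}
```

Adding a fourth listener, such as reporting, requires no change to the code that publishes the event.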
In this example we will use Apache Kafka as the event streaming platform.
Prerequisites
Eclipse 2019-12, Apache Kafka 2.3.1 (the kafka_2.12-2.3.1 distribution, built for Scala 2.12), Gradle 6.1.1 or Maven 3.6.3, Java 8 or later, Spring Cloud Starter Stream Kafka 3.0.2, Spring Boot 2.2.4
Create Project
This example consists of two microservices, one to produce events and one to consume them.
The producer project name is spring-cloud-stream-kafka-ordersource and the consumer project name is spring-cloud-stream-kafka-ordercheck.
If you are creating a Gradle-based project in Eclipse, you can use the build.gradle script below:
buildscript {
ext {
springBootVersion = '2.2.4.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
plugins {
id 'java-library'
id 'org.springframework.boot' version '2.2.4.RELEASE'
}
sourceCompatibility = 12
targetCompatibility = 12
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter:${springBootVersion}")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.2.RELEASE")
}
If you are creating a Maven-based project, you can use the pom.xml file below:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.roytuts</groupId>
<!-- use spring-cloud-stream-kafka-ordercheck for the consumer project -->
<artifactId>spring-cloud-stream-kafka-ordersource</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>3.0.2.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<!-- Java 8; raise both values if you build with a newer JDK -->
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
The same build.gradle or pom.xml file can be used for both microservices; you just need to adjust the project name in your settings.gradle or pom.xml file accordingly.
Microservice – Producer
In our fictional scenario, the message producer creates a stream of orders, and our processor checks whether each order should be delivered or left undelivered.
Let’s start by producing some messages that will be sent to Kafka.
The code below goes into the OrderSourceApp.java file, which is what actually produces the messages.
Supplier<> is a Java functional interface. Because there is only one @Bean method that returns this type, Spring Cloud Stream picks this bean up automatically. By default, it triggers the function once every second and sends the result to the default MessageChannel named supply-out-0 (the method name followed by -out-0). In our case, however, we have annotated the method with @InboundChannelAdapter(channel = Source.OUTPUT), so the messages are sent to the output channel.
We have added @EnableBinding(Source.class) to get the output channel binding from the Source interface, which is already available in Spring Cloud Stream.
package com.roytuts.spring.cloud.stream.using.kafka;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import com.roytuts.spring.cloud.stream.using.kafka.model.Order;
@SpringBootApplication
@EnableBinding(Source.class)
public class OrderSourceApp {
private static final Logger LOG = LoggerFactory.getLogger(OrderSourceApp.class);
private List<String> names = Arrays.asList("Donald", "Theresa", "Vladimir", "Angela", "Emmanuel", "Shinzō",
"Jacinda", "Kim");
private List<String> items = Arrays.asList("Mobile", "Tab", "Desktop", "Laptop", "Head Phone", "Adapter", "Charger",
"USB Cable", "Watch", "Clock");
public static void main(String[] args) {
SpringApplication.run(OrderSourceApp.class, args/* "--spring.cloud.stream.function.definition=supply" */);
}
@Bean
@InboundChannelAdapter(channel = Source.OUTPUT)
public Supplier<Order> supply() {
return () -> {
String rName = names.get(new Random().nextInt(names.size()));
String rItem = items.get(new Random().nextInt(items.size()));
Order order = new Order(UUID.randomUUID().toString(), rName, rItem);
LOG.info("{} {} for {} for {}", order.getStatus(), order.getUuid(), order.getItem(), order.getName());
return order;
};
}
}
In the above class we pick hard-coded values at random as the source of data; ideally your data should come from another application, a database or some other persistent storage.
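The default binding name mentioned earlier, supply-out-0, follows a simple convention: the function name, then -out-, then an index. A small sketch of that convention is shown here. The BindingNames helper is hypothetical, and the matching -in- form for inputs is not covered in this tutorial; it is included as an assumption based on the Spring Cloud Stream documentation.

```java
// Hypothetical helper that mirrors the naming convention for functional bindings.
final class BindingNames {

    // Output binding: <functionName>-out-<index>
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Input binding: <functionName>-in-<index> (assumed from the framework documentation)
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    public static void main(String[] args) {
        System.out.println(output("supply", 0)); // supply-out-0
    }
}
```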
If we have multiple function beans, we can use the property spring.cloud.stream.function.definition to explicitly declare which function bean we want to bind to the destination.
To set the property in the application.properties file, use spring.cloud.stream.function.definition=supply, where supply is the method name. To bind several function beans, separate them with a semicolon (;), for example spring.cloud.stream.function.definition=supply;foo. The pipe (|) character is reserved for composing functions into a single one.
To set the property from the main method instead, use the following:
SpringApplication.run(OrderSourceApp.class, "--spring.cloud.stream.function.definition=supply");
For multiple function beans:
SpringApplication.run(OrderSourceApp.class, "--spring.cloud.stream.function.definition=supply;foo");
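In Spring Cloud Stream's functional model the pipe character composes functions into one, whereas a semicolon lists functions that are bound separately. Composition behaves much like Function.andThen in plain Java. The sketch below uses two hypothetical functions, TO_UPPER and WRAP_IN_QUOTES, that are not part of this tutorial's projects.

```java
import java.util.function.Function;

// Plain-Java illustration of function composition; the names are hypothetical.
final class CompositionSketch {

    static final Function<String, String> TO_UPPER = s -> s.toUpperCase();
    static final Function<String, String> WRAP_IN_QUOTES = s -> "\"" + s + "\"";

    // Roughly what a definition such as "toUpper|wrapInQuotes" expresses:
    // a single function whose steps are applied from left to right.
    static final Function<String, String> COMPOSED = TO_UPPER.andThen(WRAP_IN_QUOTES);

    public static void main(String[] args) {
        System.out.println(COMPOSED.apply("order")); // prints "ORDER" including the quotes
    }
}
```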
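By default Spring Cloud Stream produces a message every second. To use a different polling interval, set the poller's fixed delay in the application.properties file or pass it as an argument to the Spring runner main method. For the 3.0.x release used here the documented property is spring.cloud.stream.poller.fixed-delay (a value in milliseconds, 1000 by default); newer Spring Boot releases expose spring.integration.poller.fixed-delay instead.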
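Conceptually, the poller calls the Supplier repeatedly with a fixed delay between calls. The following plain-Java sketch mimics that behaviour with a ScheduledExecutorService. It is only an illustration under that assumption, not Spring's implementation, and the FixedDelayPoller class is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a poller that triggers a Supplier on a fixed delay.
final class FixedDelayPoller {

    // Calls the supplier 'times' times, waiting 'delayMillis' between calls,
    // and returns the collected results in order.
    static <T> List<T> poll(Supplier<T> supplier, int times, long delayMillis) {
        List<T> results = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(times);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                if (done.getCount() > 0) {
                    results.add(supplier.get());
                    done.countDown();
                }
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            done.await(); // block until the requested number of polls has happened
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(results);
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        // Poll three times, 100 ms apart.
        List<Integer> values = poll(counter::incrementAndGet, 3, 100);
        System.out.println(values); // [1, 2, 3]
    }
}
```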
The dependency spring-cloud-starter-stream-kafka tells Spring that we want to send messages to Kafka. Since our Kafka server is listening on localhost on the default port, we don’t need to provide any additional configuration in our application.properties file. If you need to customize the configuration, refer to the Spring Cloud Stream Kafka binder documentation.
Order is a POJO class and Status is an enum that holds the constant values for the order status.
package com.roytuts.spring.cloud.stream.using.kafka.model;
import com.roytuts.spring.cloud.stream.using.kafka.enums.Status;
public class Order {
private String uuid, name, item, status;
public Order() {
this.setStatus(Status.PENDING.name());
}
public Order(String uuid, String name, String item) {
this.uuid = uuid;
this.name = name;
this.item = item;
this.setStatus(Status.PENDING.name());
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getItem() {
return item;
}
public void setItem(String item) {
this.item = item;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
if (status.equals(Status.DELIVERED.name()) || status.equals(Status.UNDELIVERED.name())
|| status.equals(Status.PENDING.name()) || status.equals(Status.REJECTED.name())) {
this.status = status;
} else {
throw new IllegalArgumentException("Cannot set the Order's status to " + status);
}
}
@Override
public String toString() {
return "Order [uuid=" + uuid + ", name=" + name + ", item=" + item + ", status=" + status + "]";
}
}
The Status enum is:
package com.roytuts.spring.cloud.stream.using.kafka.enums;
public enum Status {
DELIVERED, UNDELIVERED, PENDING, REJECTED
}
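The string comparisons in setStatus() could also be expressed through the enum itself. The sketch below is one possible alternative, not the code used in this tutorial; the StatusValidation class is hypothetical and carries its own copy of the enum so that it stays self-contained.

```java
// Hypothetical alternative to comparing the status text against each constant by hand.
final class StatusValidation {

    // Local copy of the tutorial's enum so the sketch compiles on its own.
    enum Status { DELIVERED, UNDELIVERED, PENDING, REJECTED }

    // Returns true when the text matches one of the enum constants exactly.
    static boolean isValid(String status) {
        if (status == null) {
            return false;
        }
        try {
            Status.valueOf(status);
            return true;
        } catch (IllegalArgumentException e) {
            return false; // not a known constant
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("PENDING")); // true
        System.out.println(isValid("SHIPPED")); // false
    }
}
```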
Microservice – Consumer
We produced the events in the previous application; now we need something to consume and process them. Our order checker observes every order and marks it as delivered or undelivered. If the order is delivered, a message is sent to the delivered topic; otherwise, a message is sent to the undelivered topic. Other systems down the line could listen for these messages and pick them up for further processing.
The code here is slightly different, mainly because it points to different topics. In OrderCheckApp.java we have the @EnableBinding(OrderProcessor.class) annotation, meaning that all of our channel binding definitions are found in the OrderProcessor interface.
In our OrderProcessor.java file, the MessageChannel we listen on is named output, matching the topic our producer writes to. Additionally, we define two other MessageChannels that we write to, delivered and undelivered. Each channel is declared through a method annotated with @Input or @Output that returns the corresponding channel.
package com.roytuts.spring.cloud.stream.kafka.ordercheck.processor;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@Component
public interface OrderProcessor {
String ORDERS_IN = "output";
String DELIVERED_OUT = "delivered";
String UNDELIVERED_OUT = "undelivered";
@Input(ORDERS_IN)
SubscribableChannel sourceOfOrders();
@Output(DELIVERED_OUT)
MessageChannel delivered();
@Output(UNDELIVERED_OUT)
MessageChannel undelivered();
}
Finally, we can see which method is invoked for incoming messages by looking at the OrderChecker.java file. It has a method checkAndSortOrders() with a @StreamListener annotation that matches the input we defined previously:
package com.roytuts.spring.cloud.stream.kafka.ordercheck.checker;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.roytuts.spring.cloud.stream.kafka.ordercheck.enums.Status;
import com.roytuts.spring.cloud.stream.kafka.ordercheck.model.Order;
import com.roytuts.spring.cloud.stream.kafka.ordercheck.processor.OrderProcessor;
@Component
public class OrderChecker {
public static final Logger LOG = LoggerFactory.getLogger(OrderChecker.class);
private static final List<String> DISCARDED_ITEMS = Arrays.asList("Watch", "Clock");
@Autowired
private OrderProcessor processor;
@StreamListener(OrderProcessor.ORDERS_IN)
public void checkAndSortOrders(Order order) {
LOG.info("{} {} for {} for {}", order.getStatus(), order.getUuid(), order.getItem(), order.getName());
if (DISCARDED_ITEMS.contains(order.getItem())) {
order.setStatus(Status.UNDELIVERED.name());
processor.undelivered().send(message(order));
} else {
order.setStatus(Status.DELIVERED.name());
processor.delivered().send(message(order));
}
LOG.info("{} {} for {} for {}", order.getStatus(), order.getUuid(), order.getItem(), order.getName());
}
private static final <T> Message<T> message(T val) {
return MessageBuilder.withPayload(val).build();
}
}
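The routing rule inside checkAndSortOrders() can also be looked at in isolation, which makes it easy to test without Kafka. The sketch below extracts it into a pure function. The OrderRouting class and the destinationFor method are hypothetical; the item list and channel names are the ones used above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical extraction of the routing decision made in checkAndSortOrders().
final class OrderRouting {

    private static final List<String> DISCARDED_ITEMS = Arrays.asList("Watch", "Clock");

    // Returns the name of the output channel an order for this item would be sent to.
    static String destinationFor(String item) {
        return DISCARDED_ITEMS.contains(item) ? "undelivered" : "delivered";
    }

    public static void main(String[] args) {
        System.out.println(destinationFor("Laptop")); // delivered
        System.out.println(destinationFor("Watch"));  // undelivered
    }
}
```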
The main class to deploy our application:
package com.roytuts.spring.cloud.stream.kafka.ordercheck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.roytuts.spring.cloud.stream.kafka.ordercheck.processor.OrderProcessor;
@SpringBootApplication
@EnableBinding(OrderProcessor.class)
public class OrderCheckApp {
public static final Logger LOG = LoggerFactory.getLogger(OrderCheckApp.class);
public static void main(String[] args) {
SpringApplication.run(OrderCheckApp.class, args);
LOG.info("The Order Check Application has started...");
}
}
The POJO class Order and the enum Status are the same as in the earlier application, apart from their package names.
We cannot run both applications on the same port 8080, so we need to change the port for our order check application. Put the configuration below into the application.properties file on the classpath.
server.port=8081
Spring Cloud Stream provides an extremely powerful abstraction for potentially complicated messaging platforms, turning the act of producing messages into just a couple of lines of code.
Testing the Application
Make sure your Apache Kafka server is running.
Now run both microservices.
The order source application produces one message per second, logging output like the following:
18.106 INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING abd3c9c3-653e-4948-b477-42d8b3608281 for Mobile for Emmanuel
19.338 INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING b7e55aa9-6092-484c-8fd0-c805dc20072c for Head Phone for Kim
20.341 INFO 15768 --- [ask-scheduler-2] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING e272945a-c24f-43d6-85f8-268809e55c11 for Laptop for Angela
21.344 INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 20b86bad-908a-4ea6-a5fc-9d51b5dad3f6 for Tab for Angela
22.347 INFO 15768 --- [ask-scheduler-3] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 1db088dd-f175-4503-8cab-8bf5a2b54386 for USB Cable for Kim
23.349 INFO 15768 --- [ask-scheduler-2] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 701af728-630a-4208-9e8c-2274737b196f for Laptop for Emmanuel
24.351 INFO 15768 --- [ask-scheduler-4] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 3b4aeb05-a5ec-49bb-8429-9726d0a9bbb4 for Adapter for Kim
25.353 INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING ef15ca4a-9105-482a-9acf-c0ce9c5f5617 for Charger for Angela
26.355 INFO 15768 --- [ask-scheduler-5] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 5dff9d7f-536b-4215-bd08-4787dc4eab63 for Clock for Vladimir
27.357 INFO 15768 --- [ask-scheduler-3] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 797dea84-cbf9-4924-b35d-6584ffff805a for Laptop for Angela
28.360 INFO 15768 --- [ask-scheduler-6] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING ea2bc331-b390-46e9-a67a-43ebc4c41625 for USB Cable for Emmanuel
29.362 INFO 15768 --- [ask-scheduler-6] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 9fd5b7fc-69f9-4dd2-b670-4eacd70a3629 for Head Phone for Emmanuel
30.364 INFO 15768 --- [ask-scheduler-7] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 0484dd0e-f842-485d-bc6e-199de7ddef92 for Watch for Kim
31.366 INFO 15768 --- [ask-scheduler-4] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 9798edf4-da37-44e6-aca9-77c10f41939b for Clock for Angela
32.369 INFO 15768 --- [ask-scheduler-8] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING d47b878e-2c4c-4dea-91c4-3e7e98946c4b for Laptop for Kim
33.372 INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 09960737-86a5-4c63-b4ce-4b0bcf0a6c0f for Head Phone for Vladimir
34.375 INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp : PENDING 3cc5261b-50e3-447e-a528-526eb8608fcc for Charger for Emmanuel
The order check application consumes and processes these messages:
18.584 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING abd3c9c3-653e-4948-b477-42d8b3608281 for Mobile for Emmanuel
18.676 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED abd3c9c3-653e-4948-b477-42d8b3608281 for Mobile for Emmanuel
19.347 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING b7e55aa9-6092-484c-8fd0-c805dc20072c for Head Phone for Kim
19.348 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED b7e55aa9-6092-484c-8fd0-c805dc20072c for Head Phone for Kim
20.348 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING e272945a-c24f-43d6-85f8-268809e55c11 for Laptop for Angela
20.350 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED e272945a-c24f-43d6-85f8-268809e55c11 for Laptop for Angela
21.350 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING 20b86bad-908a-4ea6-a5fc-9d51b5dad3f6 for Tab for Angela
21.352 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED 20b86bad-908a-4ea6-a5fc-9d51b5dad3f6 for Tab for Angela
22.353 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING 1db088dd-f175-4503-8cab-8bf5a2b54386 for USB Cable for Kim
22.354 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED 1db088dd-f175-4503-8cab-8bf5a2b54386 for USB Cable for Kim
23.354 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING 701af728-630a-4208-9e8c-2274737b196f for Laptop for Emmanuel
23.355 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED 701af728-630a-4208-9e8c-2274737b196f for Laptop for Emmanuel
24.357 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING 3b4aeb05-a5ec-49bb-8429-9726d0a9bbb4 for Adapter for Kim
24.358 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED 3b4aeb05-a5ec-49bb-8429-9726d0a9bbb4 for Adapter for Kim
25.360 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING ef15ca4a-9105-482a-9acf-c0ce9c5f5617 for Charger for Angela
25.361 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED ef15ca4a-9105-482a-9acf-c0ce9c5f5617 for Charger for Angela
26.361 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING 5dff9d7f-536b-4215-bd08-4787dc4eab63 for Clock for Vladimir
26.377 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : UNDELIVERED 5dff9d7f-536b-4215-bd08-4787dc4eab63 for Clock for Vladimir
27.363 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING 797dea84-cbf9-4924-b35d-6584ffff805a for Laptop for Angela
27.363 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED 797dea84-cbf9-4924-b35d-6584ffff805a for Laptop for Angela
28.365 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING ea2bc331-b390-46e9-a67a-43ebc4c41625 for USB Cable for Emmanuel
28.366 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED ea2bc331-b390-46e9-a67a-43ebc4c41625 for USB Cable for Emmanuel
29.367 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING 9fd5b7fc-69f9-4dd2-b670-4eacd70a3629 for Head Phone for Emmanuel
29.368 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : DELIVERED 9fd5b7fc-69f9-4dd2-b670-4eacd70a3629 for Head Phone for Emmanuel
30.369 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : PENDING 0484dd0e-f842-485d-bc6e-199de7ddef92 for Watch for Kim
30.370 INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker : UNDELIVERED 0484dd0e-f842-485d-bc6e-199de7ddef92 for Watch for Kim
That’s all.
Thanks for reading.