Introduction

In this tutorial I will show you how to use a Python client to work with Apache Kafka, a distributed stream processing system. I will create Kafka producer and consumer examples in Python.

KafkaConsumer is a high-level message consumer that consumes records from a Kafka cluster. The consumer is not thread safe and should not be shared across threads.

KafkaProducer is a high-level, asynchronous message producer that publishes records to the Kafka cluster. The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
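As a rough illustration of sharing one producer across threads, the sketch below starts a few worker threads that all call send() on the same instance. The helper name send_from_threads, the number of workers, and the round-robin split are assumptions made for this example; the only producer methods it relies on are send() and flush().

```python
import threading


def send_from_threads(producer, topic, payloads, workers=3):
    """Send payloads from several threads that share ONE producer instance."""
    def worker(chunk):
        for payload in chunk:
            # send() is asynchronous and may be called from multiple threads
            producer.send(topic, payload)

    # Split the payloads round-robin between the worker threads
    chunks = [payloads[i::workers] for i in range(workers)]
    threads = [threading.Thread(target=worker, args=(chunk,)) for chunk in chunks]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Block until everything buffered by the workers has been sent
    producer.flush()
```

With a broker running, this could be called as send_from_threads(KafkaProducer(), 'foobar', [b'a', b'b', b'c']).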

Prerequisites

Python 3.8.3, Apache Kafka 2.5.0 (Scala 2.12 build), kafka-python (pip install kafka-python)

Install and configure Apache Kafka

Produce and Consume Messages

Now we will see how to produce and consume messages using a Kafka producer and consumer, respectively.

The following code snippet (consumer.py) shows how a consumer consumes messages from the topic foobar and prints them to the console:

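from kafka import KafkaConsumer

# Subscribe to the topic 'foobar' on the default broker (localhost:9092)
consumer = KafkaConsumer('foobar')

# Iterating blocks and yields each record as it arrives
for msg in consumer:
    print(msg)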
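Each message yielded by the consumer is a record object carrying, among other fields, the topic, partition, offset, and raw value. The sketch below prints a shorter summary instead of the whole record. The helper name describe_record is hypothetical, and it assumes the message values are UTF-8 encoded bytes.

```python
def describe_record(msg):
    """Return a one-line summary of a consumed record.

    Assumes msg has the topic, partition, offset and value attributes
    of kafka-python's ConsumerRecord, and that value is UTF-8 bytes.
    """
    text = msg.value.decode('utf-8')
    return f"{msg.topic}[{msg.partition}]@{msg.offset}: {text}"


def run_consumer():
    # Requires kafka-python and a broker on localhost:9092
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('foobar')
    for msg in consumer:
        print(describe_record(msg))
```

Calling run_consumer() behaves like consumer.py, but prints one compact line per message.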

The following code snippet (producer.py) shows how a producer sends messages to the topic foobar:

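from kafka import KafkaProducer

# Connect to the default broker (localhost:9092)
producer = KafkaProducer()

for i in range(5):
    # send() is asynchronous; the value must be bytes unless a serializer is configured
    producer.send('foobar', b'Hello World')
    # flush() blocks until the buffered messages have been sent
    producer.flush()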
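The producer above sends raw bytes. For structured data, KafkaProducer accepts a value_serializer callable that converts each value to bytes before it is sent. The following is a minimal sketch using JSON; the helper name to_json_bytes and the payload fields are assumptions for illustration.

```python
import json


def to_json_bytes(value):
    """Serialize a Python object to UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode('utf-8')


def run_json_producer():
    # Requires kafka-python and a broker on localhost:9092
    from kafka import KafkaProducer

    producer = KafkaProducer(value_serializer=to_json_bytes)
    for i in range(5):
        # The dict is passed through to_json_bytes before it is sent
        producer.send('foobar', {'id': i, 'text': 'Hello World'})
    # Flush once after the loop so all buffered messages are sent
    producer.flush()
```

A consumer reading these messages would need to decode the JSON again, for example with json.loads(msg.value).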

In the above code snippets we have not specified the Kafka server details because, by default, the client connects to the Kafka server's default host and port (localhost:9092).

Let's test our producer and consumer code snippets. First run consumer.py, then run producer.py.

Before running the scripts, make sure the ZooKeeper server and then the Kafka server are running, by executing the following commands, respectively:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties

You will see the following output in the console of consumer.py:

The cursor will keep blinking after all messages have been consumed, because the consumer continues to wait for new messages.
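If you would rather have the script exit once no more messages arrive, kafka-python's consumer_timeout_ms option makes the iteration stop after the given idle time. The sketch below is one way to use it; the helper name drain, the 5000 ms timeout, and the auto_offset_reset='earliest' setting are assumptions chosen for this example.

```python
def drain(consumer):
    """Collect message values until the consumer's iterator stops."""
    return [msg.value for msg in consumer]


def run_until_idle():
    # Requires kafka-python and a broker on localhost:9092
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'foobar',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',  # start from the oldest message when no offset is committed
        consumer_timeout_ms=5000,      # stop iterating after 5 s without a new message
    )
    for value in drain(consumer):
        print(value)
    consumer.close()
```

Without consumer_timeout_ms the for loop never ends on its own, which is why consumer.py keeps waiting.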

You can also specify the host and port of the Kafka server, as shown in the example below:

producer = KafkaProducer(bootstrap_servers='localhost:9092')

You can also wait for a send to complete, with a timeout in seconds, by calling get() on the future returned by send():

producer.send('foobar', b'Hello World').get(timeout=30)
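When get() succeeds it returns metadata about the stored record, and when the send fails or times out it raises an exception. The sketch below wraps both cases. The helper name send_and_confirm is hypothetical, and it catches the generic Exception only to stay free of extra imports; kafka-python reports these failures as subclasses of kafka.errors.KafkaError.

```python
def send_and_confirm(producer, topic, value, timeout=30):
    """Send one message and wait for the broker's acknowledgement.

    Returns (topic, partition, offset) on success, or None on failure.
    """
    future = producer.send(topic, value)
    try:
        # get() blocks until the send completes or the timeout (seconds) expires
        metadata = future.get(timeout=timeout)
    except Exception as exc:
        # kafka-python raises subclasses of kafka.errors.KafkaError here;
        # Exception is caught so this sketch does not depend on that import
        print(f"send failed: {exc}")
        return None
    return metadata.topic, metadata.partition, metadata.offset
```

With a broker running, send_and_confirm(KafkaProducer(), 'foobar', b'Hello World') would return the topic, partition, and offset assigned to the message.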

If you need more help, please read here.

Source Code

Download

Thanks for reading.
