Building Real-Time Data Streaming Applications with Kafka and kafka-python: A Quickstart Guide

Syed Muhammad Shayan
Jun 18, 2023


In this article, we’ll use kafka-python to interact with a Kafka server from Python, streaming data step by step.


Essential Prerequisites and Key Terminologies for Kafka Beginners

  • Event: In event streaming, an event is a piece of data that describes an entity’s observable state updates over time (for example, the GPS coordinates of a moving car; see the sketch just after this list).
  • Kafka cluster: A Kafka cluster is a group of Kafka brokers working together.
  • Kafka broker: You can think of a Kafka broker as a dedicated server that receives, stores, processes, and distributes events.
  • ZooKeeper: Brokers are synchronized and managed by another dedicated service called ZooKeeper.
  • Kafka topic: Each broker contains one or more topics. You can think of a topic as a database that stores specific types of events, such as logs, transactions, and metrics.
  • Kafka consumer: Consumers are client applications that subscribe to topics and read the stored events; event destinations then read those events from the consumers.
  • Kafka producer: Producers are client applications that publish events to topic partitions, where the events are stored in the order they are published.
  • Event source: the origin of the data that a producer publishes (for example, the Twitter API for tweets in JSON).
  • Event destination: where the data goes after it is consumed (for example, a database feeding further analytics and dashboards).
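
To make the first term concrete, here is a sketch of what a single event might look like before it is serialized and published; the field names are purely illustrative, not a Kafka requirement.

#A hypothetical GPS event for a moving car (field names are illustrative)
event = {
    "car_id": "car-42",                    #the entity being observed
    "lat": 24.8607,                        #observable state that
    "lon": 67.0011,                        #changes over time
    "timestamp": "2023-06-18T10:15:00Z",   #when the state was observed
}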

WORKFLOW

A diagram of the Kafka streaming workflow; the numbers indicate the order of the steps.

Let’s start with the steps to implement the workflow above.

STEP 1:

Download Kafka and extract it locally using the commands below. (Note: it’s recommended to rename the extracted directory to kafka and move it to C:/<here>.)

wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar -xzf kafka_2.12-2.8.0.tgz

STEP 2:

Set the ‘PATH’ environment variable to include the location of the Kafka scripts:

C:\kafka\bin\windows;C:\kafka\config  #in case of Windows
/path/to/kafka/bin:/path/to/kafka/config  #in case of Linux (colon-separated, not semicolons)
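
If you are working from a notebook, a minimal sketch for appending the scripts directory to PATH for the current Python session only (assuming the Windows layout from Step 1; adjust the path on Linux):

import os

#Append the Kafka scripts directory to PATH for this session only
kafka_bin = r"C:\kafka\bin\windows"   #assumed install location from Step 1
os.environ["PATH"] += os.pathsep + kafka_bin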

STEP 3:

Install kafka-python, the Python library we’ll use to create topics, producers, and consumers.

pip install kafka-python
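
To verify the installation, import the library and print its version:

#Quick sanity check that kafka-python is importable
import kafka
print(kafka.__version__)   #e.g. 2.0.2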

STEP 4:

Let’s start ZooKeeper and the Kafka server.

#importing required libraries
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
from kafka.admin import NewTopic, ConfigResource, ConfigResourceType
import json, os

#set the current working directory to the kafka folder
os.chdir('C:/kafka/')
os.getcwd()

#This starts ZooKeeper first, to manage the Kafka broker started just after

#if Windows
!start .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
!start .\bin\windows\kafka-server-start.bat .\config\server.properties

#if Linux (forward slashes; -daemon runs each server in the background)
!./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
!./bin/kafka-server-start.sh -daemon ./config/server.properties
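
Before moving on, it can help to confirm the broker is actually reachable. A minimal sketch using kafka-python’s own client, which raises NoBrokersAvailable when nothing is listening on localhost:9092:

#A quick readiness check for the broker we just started
from kafka import KafkaAdminClient
from kafka.errors import NoBrokersAvailable

try:
    check = KafkaAdminClient(bootstrap_servers="localhost:9092")
    print("Broker is up")
    check.close()
except NoBrokersAvailable:
    print("Broker not reachable yet - wait a few seconds and retry")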

STEP 5:

Let’s understand how a topic is created.

First, we’ll create a Python client that connects to the broker and lets us create topics. The main purpose of the KafkaAdminClient class is to enable fundamental administrative operations on the Kafka server, such as creating and deleting topics and retrieving and updating topic configurations.


#To use KafkaAdminClient, we first need to define and create a KafkaAdminClient object:

admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id='test')

#creating a topic named "example" with 2 partitions and a replication factor of 1
topic_list = []
new_topic = NewTopic(name="example", num_partitions=2, replication_factor=1)
topic_list.append(new_topic)
admin_client.create_topics(new_topics=topic_list)
Output >> CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic=’example’, error_code=0, error_message=None)])

The error_code of 0 in the output confirms that the topic “example” was created with no errors.
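
You can also confirm the topic exists by asking the broker for all topic names:

#List every topic on the broker; "example" should appear in the output
print(admin_client.list_topics())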

Let’s find the details of the topic we just created.

#This provides a description of the topic "example"
configs = admin_client.describe_configs(
    config_resources=[ConfigResource(ConfigResourceType.TOPIC, "example")])

configs  #run this to check the details
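
The response is a list of DescribeConfigsResponse objects, and each entry in its resources list is a tuple whose fourth element is the resource (topic) name. The exact layout can vary between kafka-python versions, so treat the indices below as an assumption:

#Each resource tuple looks like
#(error_code, error_message, resource_type, resource_name, config_entries)
resource = configs[0].resources[0]
print(resource[3])   #'example'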

STEP 6:

This shows how a Kafka producer publishes (writes) messages, or events, to the topic ‘example’, and how a Kafka consumer subscribes to (reads from) the topic ‘example’ to retrieve the stored events:

#create a producer and publish some messages to the topic "example"
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send("example", {'atmid':1, 'transid':100})
producer.send("example", {'atmid':2, 'transid':101})

producer.flush()   #block until all buffered messages reach the broker

producer.close()
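
producer.send() also returns a future, so if you want confirmation that a message reached the broker, you can block on it. A small variation on the producer above:

#.get() blocks until the broker acknowledges the write and
#returns the record's metadata
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
future = producer.send("example", {'atmid':3, 'transid':102})
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)
producer.close()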


#create a consumer to subscribe to the messages published above

consumer = KafkaConsumer('example',
                         group_id=None,
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')
print(consumer)

#This will show all the messages that were published by the producer above
#and then continue to listen forever, so you will have to interrupt execution to stop
for msg in consumer:
    print(msg.value.decode("utf-8"))
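
If you would rather get dictionaries back and have the loop stop on its own, a variation on the consumer above uses value_deserializer to parse each message and consumer_timeout_ms to exit after ten seconds of silence:

#Same consumer, but it deserializes JSON and stops after 10s without messages
consumer = KafkaConsumer('example',
                         group_id=None,
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         value_deserializer=lambda v: json.loads(v.decode('utf-8')),
                         consumer_timeout_ms=10000)
for msg in consumer:
    print(msg.value['atmid'], msg.value['transid'])
consumer.close()

consumer_timeout_ms is handy for demos and tests; a production consumer would normally keep listening.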
