Building Real-Time Data Streaming Applications with Kafka and kafka-python: A Quickstart Guide
In this article, we’ll use kafka-python to interact with a Kafka server from Python. We’ll stream data with Kafka step by step.
Essential Prerequisites and Key Terminologies for Kafka Beginners
- Event: In event streaming, an event is a record of something that happened, typically describing an entity’s observable state changes over time (example: the GPS coordinates of a moving car). A small sketch of what such an event might look like follows this list.
- Kafka cluster: A Kafka cluster is a group of one or more Kafka brokers working together.
- Kafka broker: You can think of a Kafka broker as a dedicated server that receives, stores, processes, and distributes events.
- ZooKeeper: Brokers are synchronized and managed by another dedicated service called ZooKeeper.
- Kafka topic: Each broker hosts one or more topics. You can think of a topic as a named store for a specific type of event, such as logs, transactions, or metrics.
- Kafka consumer: Consumers are client applications that subscribe to topics and read the stored events. Event destinations then read the events from the consumers.
- Kafka producer: Producers are client applications that publish (write) events to topic partitions; within a partition, events are stored in the order they are published.
- Event source: The original source of the data a producer publishes (example: the Twitter API returning tweets as JSON).
- Event destination: Where the data goes after it is consumed (example: a database that feeds further analytics and dashboards).
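To make the terminology concrete, here is a minimal sketch of what the "moving car" event above might look like before it is published. The field names (car_id, lat, lon, ts) are purely illustrative and not part of Kafka itself; in Kafka an event is ultimately just a key/value pair of bytes, and JSON is one common way to serialize the value.
import json
#a hypothetical GPS event for a moving car
event_key = "car-42"                                                      #key: which entity the event is about
event_value = {"car_id": 42, "lat": 48.8566, "lon": 2.3522, "ts": 1717000000}  #value: the observed state
#serialized form, roughly as it would travel to the broker
print(event_key, json.dumps(event_value))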
WORKFLOW
Let's go through the steps to implement the above workflow.
STEP 1:
Download Kafka and extract it locally using the commands below. (Note: it's recommended to rename the extracted directory to kafka and move it to C:\ so it ends up at C:\kafka.)
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar -xzf kafka_2.12-2.8.0.tgz
STEP 2:
Set the ‘PATH’ environment variable to include the location of the Kafka scripts and config files:
C:\kafka\bin\windows;C:\kafka\config #in case of Windows
~/kafka/bin:~/kafka/config #in case of Linux (use wherever you extracted Kafka; Linux separates PATH entries with ':')
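If you are working from a notebook, a minimal alternative is to extend PATH for the current Python process only. This is a sketch that assumes the C:\kafka layout used throughout this article; adjust the paths for Linux.
import os
#append the Kafka script directories to PATH for this Python session only
os.environ["PATH"] += os.pathsep + r"C:\kafka\bin\windows" + os.pathsep + r"C:\kafka\config"
print(os.environ["PATH"].split(os.pathsep)[-2:])   #confirm the new entries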
STEP 3:
Install kafka-python, the Python library we'll use to create topics, producers, and consumers:
pip install kafka-python
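To confirm the installation worked, you can import the package and print its version. This is just a quick sanity check and isn't required for the later steps.
import kafka
print(kafka.__version__)   #e.g. '2.0.2', depending on the version pip installed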
STEP 4:
Let's start ZooKeeper and the Kafka server.
#importing required libraries
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
from kafka.admin import NewTopic, ConfigResource, ConfigResourceType
import json,os
#set the current working directory to the kafka installation
os.chdir('C:/kafka/')
print(os.getcwd())
#Start ZooKeeper first; it manages the Kafka broker started just after it
#if Windows (each 'start' launches the server in its own window)
!start .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
!start .\bin\windows\kafka-server-start.bat .\config\server.properties
#if Linux (run each in its own terminal, or append & to run it in the background)
!./bin/zookeeper-server-start.sh ./config/zookeeper.properties
!./bin/kafka-server-start.sh ./config/server.properties
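Before moving on, it can help to verify that the broker is reachable on its default port (9092). One minimal check, assuming the default configuration above, is to ask the broker for its topic list:
from kafka import KafkaConsumer
#if this raises NoBrokersAvailable, the Kafka server is not up yet
check = KafkaConsumer(bootstrap_servers='localhost:9092')
print(check.topics())   #set of existing topic names (may be empty on a fresh install)
check.close()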
STEP 5:
Let's understand how a topic is created.
First, we create a Python client that connects to the broker and lets us create topics. The main purpose of the KafkaAdminClient class is to enable fundamental administrative operations on the Kafka server, such as creating and deleting topics and retrieving and updating topic configurations.
#To use KafkaAdminClient, we first need to define and create a KafkaAdminClient object:
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id='test')
#creating a topic named "example"
topic_list = []
new_topic = NewTopic(name="example", num_partitions=2, replication_factor=1)
topic_list.append(new_topic)
admin_client.create_topics(new_topics=topic_list)
Let's look at the details of the topic we just created.
#This retrieves the configuration of topic "example"
configs = admin_client.describe_configs(
    config_resources=[ConfigResource(ConfigResourceType.TOPIC, "example")])
configs #run this to check the details
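KafkaAdminClient also covers the other administrative operations mentioned above. For example, the admin_client created earlier can delete the topic like this (a sketch only; don't run it mid-tutorial, since the next step still uses the "example" topic):
#remove the topic once you are done experimenting with it
admin_client.delete_topics(topics=["example"])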
STEP 6:
This shows how a Kafka producer publishes (writes) messages or events to the topic ‘example’, and how a Kafka consumer subscribes to (reads from) the topic ‘example’ to retrieve the stored events:
#create a producer and publish some messages to topic "example"
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send("example", {'atmid':1, 'transid':100})
producer.send("example", {'atmid':2, 'transid':101})
producer.flush()
producer.close()
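Note that send() is asynchronous: it hands the record to a background thread and returns a future. If you want confirmation that a record actually reached the broker, you can block on that future. A minimal sketch, using the same topic and serializer as above with a fresh producer:
from kafka import KafkaProducer
import json
#a short-lived producer that waits for acknowledgement of a single record
producer2 = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
metadata = producer2.send("example", {'atmid': 3, 'transid': 102}).get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)   #where the record was written
producer2.close()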
#create a consumer subscribed to the topic the messages were published to
consumer = KafkaConsumer('example',
                         group_id=None,
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')
print(consumer)   #sanity check: shows the consumer object and its configuration
#This will print all the messages published by the producer above
#and then continue to listen forever, so you will have to interrupt the execution to stop
for msg in consumer:
    print(msg.value.decode("utf-8"))
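If you don't want the loop to block forever, kafka-python accepts a consumer_timeout_ms option so iteration stops once no new message arrives within that window; a value_deserializer also saves you the manual decode. A small variation on the consumer above:
from kafka import KafkaConsumer
import json
#a variant that deserializes JSON values and stops after 5 seconds of silence
consumer2 = KafkaConsumer('example',
                          bootstrap_servers=['localhost:9092'],
                          auto_offset_reset='earliest',
                          consumer_timeout_ms=5000,
                          value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for msg in consumer2:
    print(msg.value)   #already a Python dict thanks to the deserializer
consumer2.close()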
In case you need help, refer to the official kafka-python documentation.