Run streaming jobs with Kafka
In this article, we will discuss what Apache Kafka is and its use cases. We will also build a demo example of a Kafka Consumer using Python and Confluent Cloud.
Apache Kafka is an open source streaming platform. Although it is written in Java and Scala, client libraries let you use it from other languages such as Python. You can think of Kafka as a commit log that receives incoming events in real time and stores them in sequence. Consumers of these logs can then read and use them according to their own requirements.
In this article, we will discuss the following points:
- Definitions of basic Kafka terms
- Kafka use cases
- Example of Kafka Consumer in Python
Before working with Kafka, we need to define several key terms:
A topic is a named container for a particular stream of events. Your application can use many different topics. We can also think of a topic as a log of events that is durable, immutable, and append-only. We can also set a log retention period so that old events are deleted and storage does not grow indefinitely.
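To make the idea of an append-only log with a retention period concrete, here is a toy in-memory model in Python. It is only an illustration under simplifying assumptions: the class ToyTopicLog and its methods are hypothetical, and Kafka itself stores logs as segment files on disk and applies retention through topic settings such as retention.ms.

```python
import time


class ToyTopicLog:
    """Toy in-memory model of one topic: an append-only log with time-based retention.

    Hypothetical illustration only; this is not how Kafka stores data.
    """

    def __init__(self, retention_seconds):
        self.retention_seconds = retention_seconds
        self._records = []      # list of (offset, timestamp, event) tuples
        self._next_offset = 0   # offsets only ever grow, even after old records are dropped

    def append(self, event, timestamp=None):
        """Add an event at the end of the log and return its offset."""
        ts = time.time() if timestamp is None else timestamp
        offset = self._next_offset
        self._records.append((offset, ts, event))
        self._next_offset += 1
        return offset

    def enforce_retention(self, now=None):
        """Drop records older than the retention period; surviving records keep their offsets."""
        now = time.time() if now is None else now
        self._records = [r for r in self._records if now - r[1] <= self.retention_seconds]

    def read_from(self, offset):
        """Return retained events at or after `offset`; reading never removes anything."""
        return [event for off, _, event in self._records if off >= offset]


log = ToyTopicLog(retention_seconds=60)
for ts, event in [(0, "signup"), (50, "login"), (70, "purchase")]:
    log.append(event, timestamp=ts)
log.enforce_retention(now=100)  # "signup" is 100 s old, older than the 60 s retention
print(log.read_from(0))  # ['login', 'purchase']
```

Note that existing records are never edited: new events only go to the end, and retention removes the oldest ones as a whole.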
Messages (also called events or records) are the content of topics. Each message is a (key, value) pair. Keys do not have to be unique within a topic. They are often of primitive types and can identify some entity in your app (user, device, transaction, etc.), so that all messages about the same entity share a key. Values can be any object serialized to bytes, and they can repeat. The messages of a topic can be split into partitions spread across multiple brokers.
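Kafka itself stores keys and values as plain bytes, so a client has to serialize them before sending and deserialize them after reading. The sketch below assumes string keys encoded as UTF-8 and JSON values; the helper names are hypothetical, and real projects often use Avro, Protobuf, or JSON Schema serializers instead.

```python
import json


def serialize_message(key, value):
    """Turn a (key, value) pair into raw bytes (assumption: UTF-8 keys, JSON values)."""
    key_bytes = None if key is None else str(key).encode("utf-8")
    value_bytes = json.dumps(value, sort_keys=True).encode("utf-8")
    return key_bytes, value_bytes


def deserialize_message(key_bytes, value_bytes):
    """Inverse of serialize_message, as a consumer would apply it."""
    key = None if key_bytes is None else key_bytes.decode("utf-8")
    value = json.loads(value_bytes.decode("utf-8"))
    return key, value


# Two events about the same user carry the same key; identical values are allowed.
events = [
    ("user-42", {"action": "login"}),
    ("user-42", {"action": "view", "item": "book"}),
    ("user-7", {"action": "login"}),
]
encoded = [serialize_message(k, v) for k, v in events]
print(encoded[0])  # (b'user-42', b'{"action": "login"}')
```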
Partitioning splits the log of a single topic across multiple brokers of the Kafka cluster. The partition of a message is chosen based on its key. If the key is empty, messages are spread roughly evenly across the partitions. Otherwise, the key is run through a hash function, and the partition number is the hash value modulo the number of partitions. Messages with the same key therefore land in the same partition.
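The rule "hash the key, then take it modulo the number of partitions" can be sketched as follows. This is an illustration under assumptions: zlib.crc32 stands in for the client's hash function (the exact hash differs between clients; the Java client uses murmur2, for example), keyless messages are spread round-robin here, and the class ToyPartitioner is hypothetical. Real clients differ in how they spread keyless messages (round-robin, random, or "sticky" batching).

```python
import itertools
import zlib


class ToyPartitioner:
    """Hypothetical key-based partitioner: hash(key) % num_partitions."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        # Used only for messages without a key.
        self._round_robin = itertools.cycle(range(num_partitions))

    def partition_for(self, key):
        """Return the partition index for a key given as bytes (or None)."""
        if key is None or key == b"":
            return next(self._round_robin)
        # Same key -> same hash -> same partition.
        return zlib.crc32(key) % self.num_partitions


partitioner = ToyPartitioner(num_partitions=3)
print(partitioner.partition_for(b"user-42") == partitioner.partition_for(b"user-42"))  # True
print([partitioner.partition_for(None) for _ in range(4)])  # [0, 1, 2, 0]
```

Because the partition depends on the number of partitions, adding partitions to an existing topic changes where new messages with a given key go.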
Brokers are the servers (physical machines or instances) that run the Kafka process. Each broker hosts a set of partitions and handles requests to write events to and read events from them. Brokers also replicate partitions among themselves, so that a copy of the data survives if one broker fails.
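To show how replication spreads copies of each partition over several brokers, here is a simplified round-robin placement. It is an assumption-laden sketch, not Kafka's actual algorithm: the function assign_replicas is hypothetical, and Kafka's real assignment also randomizes the starting broker and can take racks into account. The first broker in each list plays the role of the preferred leader, and the others hold follower copies.

```python
def assign_replicas(num_partitions, broker_ids, replication_factor):
    """Toy placement: replica r of partition p goes to broker (p + r) % number_of_brokers."""
    if replication_factor > len(broker_ids):
        raise ValueError("replication factor cannot exceed the number of brokers")
    n = len(broker_ids)
    return {
        p: [broker_ids[(p + r) % n] for r in range(replication_factor)]
        for p in range(num_partitions)
    }


assignment = assign_replicas(num_partitions=4, broker_ids=[1, 2, 3], replication_factor=2)
# Every partition has copies on two different brokers, so if one broker fails,
# each partition still has a copy on another broker.
print(assignment)  # {0: [1, 2], 1: [2, 3], 2: [3, 1], 3: [1, 2]}
```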
Applications interacting with Kafka can operate in two modes: producer or consumer. Producers write messages to Kafka topics. To do so, a producer is configured with the brokers' addresses and the security settings, and then sends the content of each message.
Consumers are client applications that read messages from Kafka topics. Like producers, they manage connection pooling and the network protocol. In Kafka it is normal for many consumers to read from one topic, because a message is not deleted after it is read: it stays in the topic (until the retention period expires) and can be read by any other consumer interested in it.
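The fact that reading does not destroy a message can be shown with a toy model in which each consumer group only remembers its own position (offset) in the log. In Kafka, committed offsets are stored per consumer group and per partition; this sketch assumes a single partition, and the class ToyTopic and the group names are hypothetical.

```python
from collections import defaultdict


class ToyTopic:
    """Toy single-partition topic: reading moves a group's offset but never deletes messages."""

    def __init__(self):
        self.messages = []
        self.committed = defaultdict(int)  # consumer group id -> next offset to read

    def produce(self, message):
        self.messages.append(message)

    def poll(self, group_id, max_messages=10):
        start = self.committed[group_id]
        batch = self.messages[start:start + max_messages]
        self.committed[group_id] = start + len(batch)  # "commit" the new position
        return batch


topic = ToyTopic()
for text in ["account created", "email verified", "order placed"]:
    topic.produce(text)

analytics = topic.poll("analytics")   # reads all three messages
emails = topic.poll("email-service")  # independently reads the same three messages
```

Each group sees every message once, and a group that polls again only receives messages produced after its last read.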
Kafka use cases
Kafka can be applied in data science projects, especially when we need to process data streams and make predictions in real time.
Take, for example, real-time recommendations in e-commerce. We can train a model offline that predicts whether a user will like our recommendations. When a user loads a page, a Kafka producer can write that event into a topic of our system. A Kafka consumer can then read the message, extract and enrich the features, and make a prediction.
We can also use Kafka to pass messages between services. In a microservice architecture, some microservices can act as producers and others as consumers. For example, one service can publish newly created user accounts to a topic, while another consumes the account information and sends emails to the users.
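The consumer side of this recommendation flow could look roughly like the sketch below: decode the page-view message, enrich it with stored user data, and score it. Everything here is hypothetical and for illustration only: the event fields user_id and category, the profile store, and predict_like_score, which is a hand-written rule standing in for the offline-trained model.

```python
import json


def extract_features(event, user_profiles):
    """Enrich a raw page-view event with stored user data (hypothetical feature set)."""
    profile = user_profiles.get(event["user_id"], {})
    return {
        "category": event["category"],
        "favorite_category": profile.get("favorite_category"),
        "past_purchases": profile.get("past_purchases", 0),
    }


def predict_like_score(features):
    """Stand-in for the trained model: a simple hand-written scoring rule."""
    score = 0.2
    if features["category"] == features["favorite_category"]:
        score += 0.5
    score += min(features["past_purchases"], 3) * 0.1
    return round(score, 2)


def handle_page_view(raw_value, user_profiles):
    """Process one message value read from the (hypothetical) page-view topic."""
    event = json.loads(raw_value)
    features = extract_features(event, user_profiles)
    return event["user_id"], predict_like_score(features)


profiles = {"u1": {"favorite_category": "books", "past_purchases": 5}}
print(handle_page_view(b'{"user_id": "u1", "category": "books"}', profiles))  # ('u1', 1.0)
```

In a real pipeline, handle_page_view would be called for every message the consumer polls, and the prediction could itself be written to another topic.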
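In the account example, the email service only needs to turn each "account created" message into an email. A minimal sketch using the standard library, assuming a hypothetical JSON event with id, name, and email fields and a placeholder sender address; actually sending the email (for example with smtplib) is left out.

```python
import json
from email.message import EmailMessage


def build_welcome_email(raw_value, sender="noreply@example.com"):
    """Turn an 'account created' event (hypothetical schema) into an email message."""
    account = json.loads(raw_value)
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = account["email"]
    msg["Subject"] = f"Welcome, {account['name']}!"
    msg.set_content(f"Hi {account['name']}, your account {account['id']} is ready.")
    return msg


welcome = build_welcome_email(b'{"id": 17, "name": "Ada", "email": "ada@example.com"}')
print(welcome["Subject"])  # Welcome, Ada!
```

The account service does not need to know that this email service exists: it only writes to the topic, and any number of other services can consume the same events.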
Example of a Kafka consumer in Python
Here we will demonstrate a small example of how to produce and consume messages. We will set up a cluster in Confluent Cloud and create a Kafka topic. Then we will write a consumer in Python using the confluent_kafka library and run it to observe the output messages.
First, we need to create an account on Confluent Cloud. Then we set up the Kafka cluster: we choose the cloud provider it runs on (AWS, Google Cloud, or Azure), the region, and the availability zones. We name our cluster,
When the cluster is up and running, we can go to the Topics tab and create the Kafka topic. We will name it
Now we need to install the Confluent CLI and connect to our cluster by logging in to our Confluent Cloud account. Here are the commands:
brew install --cask confluent-cli
confluent login
After logging in, we can look up our environment ID and select that environment for use.
confluent environment list
confluent environment use <env-ID>
As we have already created the Kafka cluster, we should also be able to list it and select it for use.
confluent kafka cluster list
confluent kafka cluster use <cluster-ID>
To communicate with the cluster, we need to create an API key and secret and select the key for use.
confluent api-key create --resource <cluster-ID>
confluent api-key use <API-Key> --resource <cluster-ID>
Now that communication with the cluster has been set up, we should be able to see the topic we created:
confluent kafka topic list
Now we can define the consumer in Python. First, we create the config file that specifies the bootstrap server of the Kafka cluster, the security protocol, and the credentials.
To find the bootstrap server address, run the following command and copy the value of the Endpoint field (the SASL_SSL address):
confluent kafka cluster describe
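The config file itself is not shown here, so the following is a sketch of one possible layout, generated from Python so the values are easy to fill in. Assumptions: the INI format with [default] and [consumer] sections, the helper names, and the group id are hypothetical choices; the setting names (bootstrap.servers, security.protocol, sasl.mechanisms, sasl.username, sasl.password, group.id, auto.offset.reset) are standard librdkafka client settings. If the copied endpoint starts with SASL_SSL://, the helper strips that prefix, since clients expect plain host:port values.

```python
import io
from configparser import ConfigParser


def bootstrap_from_endpoint(endpoint):
    """Strip a leading 'SASL_SSL://' so only host:port remains."""
    prefix = "SASL_SSL://"
    return endpoint[len(prefix):] if endpoint.startswith(prefix) else endpoint


def render_client_config(endpoint, api_key, api_secret, group_id="python-consumer-demo"):
    """Return INI text for a hypothetical client config file."""
    parser = ConfigParser(interpolation=None)  # no interpolation: secrets may contain '%'
    parser["default"] = {
        "bootstrap.servers": bootstrap_from_endpoint(endpoint),
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,     # the API key created earlier
        "sasl.password": api_secret,  # the matching API secret
    }
    parser["consumer"] = {
        "group.id": group_id,
        "auto.offset.reset": "earliest",  # start from the oldest message for a new group
    }
    buffer = io.StringIO()
    parser.write(buffer)
    return buffer.getvalue()


print(render_client_config("SASL_SSL://<endpoint>:9092", "<API-Key>", "<API-Secret>"))
```

Writing the output to a file keeps the credentials out of the consumer code; that file should not be committed to version control.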
Then we define consumer.py, in which we parse the configuration file, create a Consumer instance, and then poll the Kafka topic for messages and print their content.
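The script itself is not reproduced here, so below is a minimal sketch of what consumer.py could look like with the confluent_kafka client. Assumptions: the config file uses INI [default] and [consumer] sections, the topic defaults to topic_0 (the topic used with the CLI producer in this article), and the helper names parse_config and format_message are hypothetical. Consumer, subscribe, poll, and close, as well as the message methods key, value, partition, offset, and error, are part of confluent_kafka.

```python
#!/usr/bin/env python3
"""Minimal consumer sketch. Usage: ./consumer.py <config-file> [topic]"""
import sys
from configparser import ConfigParser

DEFAULT_TOPIC = "topic_0"


def parse_config(ini_text):
    """Merge the [default] and [consumer] sections into one dict for the client."""
    parser = ConfigParser(interpolation=None)
    parser.read_string(ini_text)
    config = dict(parser["default"])
    config.update(parser["consumer"])
    return config


def format_message(msg_key, msg_value, partition, offset):
    """Render one record; keys and values arrive as bytes (either may be None)."""
    key = msg_key.decode("utf-8") if msg_key is not None else None
    value = msg_value.decode("utf-8") if msg_value is not None else None
    return f"partition={partition} offset={offset} key={key} value={value}"


def main(config_path, topic=DEFAULT_TOPIC):
    # Imported here so the helpers above can be used without the client installed.
    from confluent_kafka import Consumer

    with open(config_path) as f:
        consumer = Consumer(parse_config(f.read()))
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)  # wait up to 1 s for a message
            if msg is None:
                continue
            if msg.error():
                print(f"ERROR: {msg.error()}", file=sys.stderr)
                continue
            print(format_message(msg.key(), msg.value(), msg.partition(), msg.offset()))
    except KeyboardInterrupt:
        pass  # stop on Ctrl+C
    finally:
        # Leave the group cleanly (and commit offsets if auto-commit is enabled).
        consumer.close()


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"usage: {sys.argv[0]} <config-file> [topic]", file=sys.stderr)
    else:
        main(*sys.argv[1:3])
```

With this sketch, the executable script would be started with the path to the config file as its first argument, for example ./consumer.py <config-file>.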
We make the consumer script executable and run it:
chmod u+x consumer.py
After starting the consumer, we need to populate the topic with data. To do that, we run the producer in a second terminal and type the following messages. Here the number to the left of : is the key, and the string to the right of : is the value of the message.
confluent kafka topic produce topic_0 --parse-key
1:message 1
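As an alternative to typing messages into the CLI producer, the same key:value lines could be sent from Python. This is a sketch under assumptions: parse_key_value_line and produce_lines are hypothetical helpers, the line is split at the first ":" (the CLI's exact parsing rules may differ), and config is the same client dict used for the consumer. Producer, produce, and flush are part of confluent_kafka; a producer ignores consumer-only settings such as group.id, possibly with a warning.

```python
def parse_key_value_line(line, delimiter=":"):
    """Split 'key:value' at the first delimiter, e.g. '1:message 1' -> ('1', 'message 1')."""
    key, sep, value = line.partition(delimiter)
    if not sep:
        raise ValueError(f"expected 'key{delimiter}value', got {line!r}")
    return key, value


def produce_lines(config, topic, lines):
    """Send each 'key:value' line to the topic as a message with UTF-8 key and value."""
    from confluent_kafka import Producer  # imported lazily; only needed when sending

    producer = Producer(config)
    for line in lines:
        key, value = parse_key_value_line(line)
        producer.produce(topic, key=key.encode("utf-8"), value=value.encode("utf-8"))
    producer.flush()  # block until all queued messages have been delivered
```

For example, produce_lines(config, "topic_0", ["1:message 1"]) would send the same message as the CLI command.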
When we switch back to the first terminal, we should see that the messages are being consumed and printed.
The messages from the Kafka topic are consumed successfully. We can also inspect these messages in the Confluent Cloud UI. For example, message 7 is available at offset 0 in partition 0.
In this article, we saw the basic functions and terms of Kafka, set up a Confluent Cloud cluster, and built an example of a Kafka Consumer.
In the next article, we will integrate Kafka into a machine learning pipeline.
I appreciate your feedback in the comment section! Please stay tuned.