In this step-by-step guide, we will walk you through how to set up a three-node Kafka KRaft cluster for scalable data streaming. Building a robust and fault-tolerant Kafka cluster is crucial for reliable and scalable data streaming. KRaft is a consensus protocol introduced in Kafka v2.8.0 and production-ready for new clusters as of Apache Kafka 3.3. It removes the need to run Kafka with Zookeeper.
So, how can one set up a three-node Kafka KRaft cluster? Follow through!
High Level: What is KRaft and How does it work?
As already mentioned, Kafka Raft (KRaft) is a consensus protocol that Kafka has adopted in place of Zookeeper. It is a distributed algorithm that allows a group of Kafka controllers to agree on the state of the cluster's replicated metadata.
Read more on the KRaft overview page.
Prerequisites for Setting up Kafka Cluster
There are a few things that you need to consider while setting up a Kafka cluster;
- Ensure you assign each node enough disk space for the volume of data you expect to store, based on your data retention policies
- Ensure you have a reliable network connection between your cluster nodes
- Ensure that the CPU and RAM assigned to your cluster brokers can handle the expected data streaming load.
- Ensure you have an odd number of nodes in the cluster to avoid the split-brain scenario.
Install and Set Up Kafka with KRaft on Three Nodes
To install and set up Kafka, follow our guide below;
Easy Steps: Install Apache Kafka on Debian
You need to install and set up Kafka on all three nodes.
Setting up Three-Node Kafka KRaft Cluster
Assuming Kafka with KRaft is installed and running on all three nodes, proceed to configure the three-node Kafka KRaft cluster.
In our setup, we have three nodes;
| # | Node Hostname | Node IP address | Node Role |
| --- | --- | --- | --- |
| 1 | knode1.kifarunix-demo.com | 192.168.57.32 | controller/broker |
| 2 | knode2.kifarunix-demo.com | 192.168.57.33 | controller/broker |
| 3 | knode3.kifarunix-demo.com | 192.168.57.35 | controller/broker |
Define the role of Each Node in the cluster
In a Kafka KRaft cluster, a node can be a controller, a broker, or both.
- A controller node coordinates the Kafka cluster and manages tracking of the cluster metadata. It also handles monitoring the health and status of brokers, partitions, and replicas, leader election, partition reassignment, and broker failures.
- A broker node acts as the data plane. It hosts and manages Kafka topics and partitions, and is responsible for storing and serving the messages published to Kafka topics. Brokers handle the actual data replication, storage, and retrieval in a distributed manner. Each broker in the cluster may have multiple partitions of different topics assigned to it.
In our setup, we will configure our cluster nodes to function both as controller and broker. You may want to separate them!
Thus, open the KRaft server.properties configuration file on each node and navigate to the Server Basics section.
vim /opt/kafka/config/kraft/server.properties
By default, a node is assigned both roles;
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
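If you chose to separate the roles instead, a minimal sketch of the change, assuming dedicated controller and broker machines, would look like the lines below. Note that a split setup also requires matching listener changes, and only the controller nodes would then be listed in controller.quorum.voters.
# On a dedicated controller node (example only)
process.roles=controller
# On a dedicated broker node (example only)
process.roles=broker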
Set the Node ID for Each Node in the cluster
Each node in the cluster must have a unique ID so that the nodes can identify one another.
Node 1;
# The node id associated with this instance's roles
node.id=1
Node 2;
# The node id associated with this instance's roles
node.id=2
Node 3;
# The node id associated with this instance's roles
node.id=3
Specify a list of Controller Nodes in the Cluster
Next, you need to tell Kafka which nodes to use as controllers. This is done by updating the value of the controller.quorum.voters parameter.
Each controller is defined as ID@ADDRESS:PORT. If you have multiple controllers, define them as a comma-separated list. The address can be a resolvable hostname or an IP address.
By default, Kafka expects to run as a single-node cluster, hence the setting controller.quorum.voters=1@localhost:9093.
Update this setting with the list of your nodes (Do this on all the nodes);
[email protected]:9093,[email protected]:9093,[email protected]:9093
Ensure the port used is not already in use by another application/service.
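You can quickly check what is currently listening on these ports with ss, for example;
# check for listeners on the broker and controller ports
ss -altnp | grep -E ':(9092|9093)'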
Set the Name of the Brokers and Controllers Listener
Under the Socket Server Settings section, you need to define the names of the listeners used for communication between brokers and by the controllers. These are set to PLAINTEXT and CONTROLLER respectively by default;
inter.broker.listener.name=PLAINTEXT
...
controller.listener.names=CONTROLLER
We will leave the default names! If you want, you can update them. These names are referenced in other config settings.
Set the Socket Server Address
Next, you need to define the address the socket server listens on.
By default, it is set to listen on all interfaces on port 9092/tcp (broker listener) and port 9093/tcp (controller listener): listeners=PLAINTEXT://:9092,CONTROLLER://:9093
We will update this to bind to a specific interface;
Node 1;
#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=PLAINTEXT://192.168.57.32:9092,CONTROLLER://192.168.57.32:9093
Node 2;
#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=PLAINTEXT://192.168.57.33:9092,CONTROLLER://192.168.57.33:9093
Node 3;
#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=PLAINTEXT://192.168.57.35:9092,CONTROLLER://192.168.57.35:9093
Update the Broker Advertised Listener Address
If you didn't do this already, you need to update the broker listener address that is advertised to the clients.
By default, it is set to localhost.
Node 1
#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://192.168.57.32:9092
Node 2;
#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://192.168.57.33:9092
Node 3
#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://192.168.57.35:9092
Define the Number of Log Partitions per Topic
More partitions allow greater parallelism for consumption, but they also result in more files across the brokers.
The default is set to 1. Use a number that is divisible by the number of brokers in the cluster so that partitions spread evenly. Let's use 6 in our case.
#num.partitions=1
num.partitions=6
For a very basic setup, those are the only configuration changes we need to make. Save and exit the file.
Open Controller/Broker Ports on Firewall
Ensure that these ports, 9093/tcp (used between controller nodes) and 9092/tcp (used between brokers and clients), are opened on the firewall.
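For example, if your nodes use UFW or firewalld (adjust for whichever firewall you run), you could open the ports as follows;
# with UFW (run on each node)
ufw allow 9092/tcp
ufw allow 9093/tcp
# or with firewalld
firewall-cmd --permanent --add-port={9092,9093}/tcp
firewall-cmd --reload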
Are you Upgrading an Already Existing Kafka Cluster?
If you are upgrading an already running Kafka cluster, the current state of the controller quorum is stored in a file called quorum-state. The quorum is the group of nodes responsible for managing the metadata for a Kafka cluster. The quorum-state file is used to ensure that all of the nodes in the quorum have the same view of the metadata.
This file is usually located in the logs directory;
find /opt/kafka -iname "*quorum-state*"
Sample output;
/opt/kafka/logs/__cluster_metadata-0/quorum-state
Below are the sample contents of this file;
{"clusterId":"","leaderId":1,"leaderEpoch":12,"votedId":-1,"appliedOffset":0,"currentVoters":[{"voterId":1}],"data_version":0}
The fields in the quorum-state file are as follows:
- clusterId: The ID of the Kafka cluster.
- leaderId: The ID of the broker that is currently the leader of the cluster.
- leaderEpoch: The epoch of the leader. This is a number that is incremented whenever the leader changes.
- votedId: The ID of the candidate this node voted for in the most recent leader election (-1 if none).
- appliedOffset: The offset of the last message that has been applied to the metadata log.
- currentVoters: A list of the nodes that are voters in the current quorum.
- data_version: The version of the quorum-state file.
If you restart the Kafka service on a cluster that you just upgraded, you may get an error like;
ERROR [SharedServer id=1] Got exception while starting SharedServer (kafka.server.SharedServer)
kafka-server-start.sh[12628]: java.lang.IllegalStateException: Configured voter set: [1, 2, 3] is different from the voter set read from the state file: [1]. Check if the quorum configuration is up to date, or wipe out the local state file if necessary
So, there are two ways to fix this;
1. Manually update the current voters ID in the quorum-state file;
vim /opt/kafka/logs/__cluster_metadata-0/quorum-state
Change this (in my case);
{"clusterId":"","leaderId":1,"leaderEpoch":12,"votedId":-1,"appliedOffset":0,"currentVoters":[{"voterId":1}],"data_version":0}
to;
{"clusterId":"","leaderId":1,"leaderEpoch":12,"votedId":-1,"appliedOffset":0,"currentVoters":[{"voterId":1},{"voterId":2},{"voterId":3}],"data_version":0}
(If you cloned the other nodes to build the cluster, make the same update on them.)
2. Delete the Quorum-state file
You can remove the file entirely; it will be recreated when Kafka starts again;
rm -rf /opt/kafka/logs/__cluster_metadata-0/quorum-state
Update the Cluster ID
When you set up Kafka with KRaft, there is a step where you format the Kafka logs directory for KRaft. In the process, a random cluster ID (cluster.id) is generated. This information is stored in the meta.properties file in the logs directory. To avoid unexpected errors due to INCONSISTENT_CLUSTER_ID in VOTE responses, you need to edit the meta.properties file and make the ID the same across all nodes.
cat /opt/kafka/logs/meta.properties
#
#Sun Jul 16 15:19:36 EDT 2023
node.id=1
version=1
cluster.id=13OOWyQeQ_etLS811azqQQ
Thus, ensure this cluster.id value is the same across all the nodes.
You can always print the cluster ID using the command below;
/opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server BROKER-IP:9092
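Alternatively, on a brand new cluster with no data you care about, you could generate a single cluster ID and format the storage on every node with it instead of editing meta.properties by hand. This is a sketch only, and formatting wipes the log directories, so do not run it on a cluster that already holds data;
# generate one cluster ID (run once, on any node)
/opt/kafka/bin/kafka-storage.sh random-uuid
# format the log directories on every node using that same ID
/opt/kafka/bin/kafka-storage.sh format -t <CLUSTER_ID> -c /opt/kafka/config/kraft/server.properties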
Restart Kafka Service
Restart Kafka service to apply the changes.
systemctl restart kafka
Check the service;
systemctl status kafka
● kafka.service - Apache Kafka
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; preset: enabled)
Active: active (running) since Tue 2023-07-18 16:50:37 EDT; 50s ago
Main PID: 16581 (java)
Tasks: 90 (limit: 4642)
Memory: 763.7M
CPU: 11.661s
CGroup: /system.slice/kafka.service
└─16581 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:>
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,674] INFO Awaiting socket connections on 192.168.57.33:9092. (kafka.network.DataPlaneAcceptor)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Finished waiting for all of the authorizer futures to be completed (kafka.server.Broker>
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.Brok>
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Transition from STARTING to STARTED (kafka.server.BrokerServer)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO Kafka version: 3.5.0 (org.apache.kafka.common.utils.AppInfoParser)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO Kafka commitId: c97b88d5db4de28d (org.apache.kafka.common.utils.AppInfoParser)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO Kafka startTimeMs: 1689713442675 (org.apache.kafka.common.utils.AppInfoParser)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,676] INFO [KafkaRaftServer nodeId=2] Kafka Server started (kafka.server.KafkaRaftServer)
Check the Kafka KRaft cluster ports;
ss -altnp | grep :90
LISTEN 0 50 [::ffff:192.168.57.35]:9092 *:* users:(("java",pid=41651,fd=151))
LISTEN 0 50 [::ffff:192.168.57.35]:9093 *:* users:(("java",pid=41651,fd=132))
If there is any issue, check the logs for clues.
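You can also confirm that the controller quorum has formed and see the current leader using the kafka-metadata-quorum.sh script (covered later in this guide), for example;
# describe the metadata quorum status from any node
/opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.57.32:9092 describe --status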
Configure Producers to Ship Logs to Kafka Cluster
It is now time to test your Kafka cluster.
In our previous tutorial, we configured Filebeat to collect system logs and ship them to Kafka.
Complete Guide: Configuring Filebeat to Send Logs to Kafka
You might have a different type of producer set up, but it doesn't matter; your Kafka cluster should be able to receive the logs. We will use the guide above to confirm this.
So, as you can see in that guide, we have configured Filebeat to ship logs to the filebeat topic via node1 of the Kafka cluster.
Thus, log in to node1 and check the topics;
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.57.32:9092 --list
__consumer_offsets
filebeat
filebeat-ssl
kafka-topic-test
You can consume from this topic to confirm whether it is receiving logs;
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.32:9092 --topic filebeat
Run the same consumer command on the other nodes;
Node2;
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.33:9092 --topic filebeat
Node 3;
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.35:9092 --topic filebeat
You should see similar events on all the nodes.
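If you don't have a producer like Filebeat in place yet, you can also run a quick end-to-end test with the console producer and consumer scripts that ship with Kafka. The topic name kraft-cluster-test below is just an example;
# create a replicated test topic (3 partitions, replication factor 3)
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.57.32:9092 --create --topic kraft-cluster-test --partitions 3 --replication-factor 3
# publish a test message from one node
echo "hello from the kraft cluster" | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.57.32:9092 --topic kraft-cluster-test
# consume it from a different node to confirm it is served across the cluster
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.33:9092 --topic kraft-cluster-test --from-beginning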
Kafka Cluster Management Scripts
You can find a whole lot of Kafka management scripts under /opt/kafka/bin/ (as per our guide);
ls -1 /opt/kafka/bin/
connect-distributed.sh
connect-mirror-maker.sh
connect-standalone.sh
kafka-acls.sh
kafka-broker-api-versions.sh
kafka-cluster.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-consumer-perf-test.sh
kafka-delegation-tokens.sh
kafka-delete-records.sh
kafka-dump-log.sh
kafka-e2e-latency.sh
kafka-features.sh
kafka-get-offsets.sh
kafka-jmx.sh
kafka-leader-election.sh
kafka-log-dirs.sh
kafka-metadata-quorum.sh
kafka-metadata-shell.sh
kafka-mirror-maker.sh
kafka-producer-perf-test.sh
kafka-reassign-partitions.sh
kafka-replica-verification.sh
kafka-run-class.sh
kafka-server-start.sh
kafka-server-stop.sh
kafka-storage.sh
kafka-streams-application-reset.sh
kafka-topics.sh
kafka-transactions.sh
kafka-verifiable-consumer.sh
kafka-verifiable-producer.sh
trogdor.sh
windows
zookeeper-security-migration.sh
zookeeper-server-start.sh
zookeeper-server-stop.sh
zookeeper-shell.sh
To check how to use each script, consult the respective script help page.
For example;
/opt/kafka/bin/kafka-metadata-quorum.sh --help
usage: kafka-metadata-quorum [-h] --bootstrap-server BOOTSTRAP_SERVER [--command-config COMMAND_CONFIG] {describe} ...
This tool describes kraft metadata quorum status.
positional arguments:
{describe}
describe Describe the metadata quorum info
optional arguments:
-h, --help show this help message and exit
--bootstrap-server BOOTSTRAP_SERVER
A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.
--command-config COMMAND_CONFIG
Property file containing configs to be passed to Admin Client.
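For instance, to check the replication status of the metadata quorum, including each voter's log end offset and lag;
/opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.57.32:9092 describe --replication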
Manage Kafka Cluster from UI
You can manage your cluster using Kadeck;
Install Kadeck Apache Kafka UI Tool on Debian/Ubuntu
Setup Kafka Kraft Cluster with SSL/TLS
The guide below should be helpful in setting up a Kafka KRaft cluster with SSL/TLS;
Quick Guide: Configure Apache Kafka SSL/TLS Encryption for Enhanced Security