Setup a Three-Node Kafka KRaft Cluster for Scalable Data Streaming


In this step-by-step guide, we will walk you through the process of setting up a three-node Kafka KRaft cluster for scalable data streaming. Building a robust and fault-tolerant Kafka cluster is crucial for reliable and scalable data streaming. KRaft (Kafka Raft) is a consensus protocol introduced in Kafka v2.8.0 and production ready for new clusters as of Apache Kafka 3.3. It removes the need to run Kafka with ZooKeeper.

Setting up a Three-Node Kafka KRaft Cluster

So, how can one set up a three-node Kafka KRaft cluster? Follow along!

High Level: What is KRaft and How does it work?

As already mentioned, Kafka Raft (KRaft) is a consensus protocol that Kafka has adopted in place of ZooKeeper. It is a distributed algorithm that allows a group of Kafka controllers to agree on the state of the replicated cluster metadata.

Read more on the KRaft overview page.

Prerequisites for Setting up Kafka Cluster

There are a few things that you need to consider while setting up a Kafka cluster;

  • Ensure you assign each node enough disk space for the volume of data you expect to store, based on your data retention policies
  • Ensure you have a reliable network connection between your cluster nodes
  • Ensure that the CPU and RAM assigned to your cluster brokers can handle the expected data streaming load.
  • Ensure you have an odd number of nodes in the cluster to avoid the split-brain scenario.

Install and Setup Kafka with KRaft on Three Nodes

To install and set up Kafka, follow our guide below;

Easy Steps: Install Apache Kafka on Debian

You need to install and set up Kafka on all three nodes.

Setting up a Three-Node Kafka KRaft Cluster

Assuming you have installed Kafka and it is running with KRaft on all three nodes, proceed to configure the three-node Kafka KRaft cluster.

In our setup, we have three nodes;

#  Node Hostname              Node IP Address  Node Role
1  knode1.kifarunix-demo.com  192.168.57.32    controller/broker
2  knode2.kifarunix-demo.com  192.168.57.33    controller/broker
3  knode3.kifarunix-demo.com  192.168.57.35    controller/broker
Kafka KRaft Cluster Nodes

Define the Role of Each Node in the Cluster

In a Kafka KRaft cluster, a node can be a controller, a broker, or both.

  • A controller node coordinates the Kafka cluster and tracks the cluster metadata. It also monitors the health and status of brokers, partitions, and replicas, and handles leader election, partition reassignment, and broker failures.
  • A broker node acts as the data plane. It hosts and manages Kafka topics and partitions and is responsible for storing and serving the messages published to them. Brokers handle the actual data replication, storage, and retrieval in a distributed manner. Each broker in the cluster may be assigned multiple partitions of different topics.

In our setup, we will configure our cluster nodes to function as both controller and broker. In production, you may want to separate these roles; see the example after the default config below.

Thus, on each node, open the KRaft server.properties configuration file and navigate to the Server Basics section.

vim /opt/kafka/config/kraft/server.properties

By default, a node is assigned both roles;

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
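
For reference, if you later decide to run dedicated nodes, each node would carry only one of the two roles. Note that dedicated controllers and brokers also need matching listener and quorum settings, so treat this as a sketch;

# On a dedicated controller node;
process.roles=controller

# On a dedicated broker node;
process.roles=broker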

Set the Node ID for Each Node in the cluster

So that the nodes can uniquely identify each other, each node in the cluster must have a unique ID.

Node 1;

# The node id associated with this instance's roles
node.id=1

Node 2;

# The node id associated with this instance's roles
node.id=2

Node 3;

# The node id associated with this instance's roles
node.id=3

Specify a list of Controller Nodes in the Cluster

Next, you need to tell Kafka which nodes to use as controllers. This can be done by updating the value of the controller.quorum.voters parameter.

Each controller is defined as ID@ADDRESS:PORT. If you have multiple controllers, define them as a comma-separated list. The address can be a resolvable hostname or an IP address.

By default, Kafka expects to run as a single-node cluster, hence the setting controller.quorum.voters=1@localhost:9093.

Update this setting with the list of your nodes (Do this on all the nodes);

controller.quorum.voters=1@192.168.57.32:9093,2@192.168.57.33:9093,3@192.168.57.35:9093

Ensure the port used is not already in use by another application/service.
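
Since the address can also be a resolvable hostname, an equivalent setting using our node hostnames from the table above would be;

controller.quorum.voters=1@knode1.kifarunix-demo.com:9093,2@knode2.kifarunix-demo.com:9093,3@knode3.kifarunix-demo.com:9093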

Set the Names of the Broker and Controller Listeners

Under the Socket Server Settings section, you need to define the names of the listeners used for communication between brokers and used by the controllers. These are set to PLAINTEXT and CONTROLLER (respectively) by default;

inter.broker.listener.name=PLAINTEXT
...
controller.listener.names=CONTROLLER

We will keep the default names. If you want, you can update them; these names are used in other config settings.
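
Note that each listener name is mapped to a security protocol via the listener.security.protocol.map setting. In the stock KRaft server.properties it looks something like this, so if you rename a listener, update the map accordingly;

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL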

Set the Socket Server Address

Next, you need to define the address the socket server listens on.

By default, it is set to listen on all interfaces, on port 9092/tcp (broker listener) and port 9093/tcp (controller listener); listeners=PLAINTEXT://:9092,CONTROLLER://:9093

We will update this to bind to a specific interface;

Node 1;

#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=PLAINTEXT://192.168.57.32:9092,CONTROLLER://192.168.57.32:9093

Node 2;

#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=PLAINTEXT://192.168.57.33:9092,CONTROLLER://192.168.57.33:9093

Node 3;

#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=PLAINTEXT://192.168.57.35:9092,CONTROLLER://192.168.57.35:9093

Update the Broker Advertised Listener Address

If you haven't done so already, you need to update the broker listener address that is advertised to clients.

By default, it is set to localhost.

Node 1;

#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://192.168.57.32:9092

Node 2;

#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://192.168.57.33:9092

Node 3;

#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://192.168.57.35:9092

Define the Number of Log Partitions per Topic

More partitions allow greater parallelism for consumption, but they also result in more files across the brokers.

The default is 1. Use a number that is divisible by the number of nodes in the cluster so that partitions can be distributed evenly. Let's use 6 in our case.

#num.partitions=1
num.partitions=6

For a very basic setup, these are the only configuration changes required. Save and exit the file.
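
Keep in mind that num.partitions only applies to topics auto-created after the change. For an existing topic, you can increase (but not decrease) the partition count with kafka-topics.sh, along these lines;

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.57.32:9092 --alter --topic <TOPIC-NAME> --partitions 6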

Open Controller/Broker Ports on Firewall

Ensure that these ports are open on the firewall: 9093/tcp (communication between controller nodes) and 9092/tcp (communication between brokers and clients).
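
For example, if you are using UFW on Debian, you could open them as follows (adjust for your firewall of choice);

ufw allow 9092/tcp
ufw allow 9093/tcp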

Are you Upgrading an Already Existing Kafka Cluster?

If you are upgrading an already running Kafka cluster, the current state of the quorum is stored in a file called quorum-state. The quorum is the group of controller nodes responsible for managing the metadata of a Kafka cluster. The quorum-state file is used to ensure that all members of the quorum have the same view of the metadata.

This file is usually located in the logs directory;

find /opt/kafka -iname "*quorum-state*"

Sample output;

/opt/kafka/logs/__cluster_metadata-0/quorum-state

Below are the sample contents of this file;

{"clusterId":"","leaderId":1,"leaderEpoch":12,"votedId":-1,"appliedOffset":0,"currentVoters":[{"voterId":1}],"data_version":0}

The fields in the quorum-state file are as follows:

  • clusterId: The ID of the Kafka cluster.
  • leaderId: The ID of the broker that is currently the leader of the cluster.
  • leaderEpoch: The epoch of the leader. This is a number that is incremented whenever the leader changes.
  • votedId: The ID of the candidate this node voted for in the current epoch (-1 if it has not voted).
  • appliedOffset: The offset of the last message that has been applied to the metadata log.
  • currentVoters: A list of the brokers that have voted for the current leader.
  • data_version: The version of the quorum-state file.

If you restart the Kafka service on a cluster that you just upgraded, you may get an error like;

ERROR [SharedServer id=1] Got exception while starting SharedServer (kafka.server.SharedServer)
kafka-server-start.sh[12628]: java.lang.IllegalStateException: Configured voter set: [1, 2, 3] is different from the voter set read from the state file: [1]. Check if the quorum configuration is up to date, or wipe out the local state file if necessary

So, there are two ways to fix this;

1. Manually update the current voter IDs in the quorum-state file;

vim /opt/kafka/logs/__cluster_metadata-0/quorum-state

Change this (in my case);

{"clusterId":"","leaderId":1,"leaderEpoch":12,"votedId":-1,"appliedOffset":0,"currentVoters":[{"voterId":1}],"data_version":0}

to;

{"clusterId":"","leaderId":1,"leaderEpoch":12,"votedId":-1,"appliedOffset":0,"currentVoters":[{"voterId":1},{"voterId":2},{"voterId":3}],"data_version":0}

(If you cloned the other nodes to build the cluster, make the same update on them.)
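
If you prefer to script this edit and have jq installed, a rough equivalent would be;

cd /opt/kafka/logs/__cluster_metadata-0/
jq '.currentVoters = [{"voterId":1},{"voterId":2},{"voterId":3}]' quorum-state > quorum-state.new && mv quorum-state.new quorum-state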

2. Delete the quorum-state file

Alternatively, you can remove the file entirely; it will be regenerated when Kafka restarts;

rm -rf /opt/kafka/logs/__cluster_metadata-0/quorum-state

Update the Cluster ID

When you set up Kafka with KRaft, there was a step where you had to format the Kafka logs directory to the KRaft format. In the process, a random cluster ID (cluster.id) was generated. This information is stored in the meta.properties file in the logs directory. To avoid unexpected errors due to INCONSISTENT_CLUSTER_ID in VOTE responses, you need to edit the meta.properties file so that the ID is the same across all nodes.

cat /opt/kafka/logs/meta.properties

#
#Sun Jul 16 15:19:36 EDT 2023
node.id=1
version=1
cluster.id=13OOWyQeQ_etLS811azqQQ

Thus, ensure this cluster.id value is the same across all the nodes.

You can always print the cluster ID using the command below;

/opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server BROKER-IP:9092
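
Alternatively, rather than editing meta.properties by hand, you can generate a single cluster ID and re-format the logs directory on every node with it using kafka-storage.sh. Note that formatting wipes any existing data in the logs directory, so this sketch is only suitable for fresh setups;

# Generate a cluster ID (run once, on one node);
/opt/kafka/bin/kafka-storage.sh random-uuid

# Format the storage on each node with the same ID;
/opt/kafka/bin/kafka-storage.sh format -t <CLUSTER-ID> -c /opt/kafka/config/kraft/server.properties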

Restart Kafka Service

Restart the Kafka service on each node to apply the changes.

systemctl restart kafka

Check the service;

systemctl status kafka

● kafka.service - Apache Kafka
     Loaded: loaded (/etc/systemd/system/kafka.service; enabled; preset: enabled)
     Active: active (running) since Tue 2023-07-18 16:50:37 EDT; 50s ago
   Main PID: 16581 (java)
      Tasks: 90 (limit: 4642)
     Memory: 763.7M
        CPU: 11.661s
     CGroup: /system.slice/kafka.service
             └─16581 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:>

Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,674] INFO Awaiting socket connections on 192.168.57.33:9092. (kafka.network.DataPlaneAcceptor)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Finished waiting for all of the authorizer futures to be completed (kafka.server.Broker>
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.Brok>
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO [BrokerServer id=2] Transition from STARTING to STARTED (kafka.server.BrokerServer)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO Kafka version: 3.5.0 (org.apache.kafka.common.utils.AppInfoParser)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO Kafka commitId: c97b88d5db4de28d (org.apache.kafka.common.utils.AppInfoParser)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,675] INFO Kafka startTimeMs: 1689713442675 (org.apache.kafka.common.utils.AppInfoParser)
Jul 18 16:50:42 knode2.kifarunix-demo.com kafka-server-start.sh[16581]: [2023-07-18 16:50:42,676] INFO [KafkaRaftServer nodeId=2] Kafka Server started (kafka.server.KafkaRaftServer)

Check the Kafka KRaft cluster ports;

ss -altnp | grep :90

LISTEN 0      50     [::ffff:192.168.57.35]:9092             *:*    users:(("java",pid=41651,fd=151))                                                                                                                            
LISTEN 0      50     [::ffff:192.168.57.35]:9093             *:*    users:(("java",pid=41651,fd=132))

If you run into any issues, check the logs for hints.
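
You can also confirm that the KRaft quorum has formed and elected a leader;

/opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.57.32:9092 describe --status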

Configure Producers to Ship Logs to Kafka Cluster

It is now time to test your Kafka cluster.

In our previous tutorial, we configured Filebeat to collect system logs and ship them to Kafka.

Complete Guide: Configuring Filebeat to Send Logs to Kafka

You might have a different type of producer setup, but it doesn't matter; your Kafka cluster should be able to receive the events. We will use the guide above to confirm this.

So, as you can see in the guide above, we configured Filebeat to ship logs to the filebeat topic via node1 of the Kafka cluster.

Thus, log in to node1 and check the topics;

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.57.32:9092 --list
__consumer_offsets
filebeat
filebeat-ssl
kafka-topic-test

You can consume from this topic to confirm that it is receiving logs;

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.32:9092 --topic filebeat

Run the same consumer command on the other nodes;

Node 2;

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.33:9092 --topic filebeat

Node 3;

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.35:9092 --topic filebeat

You should receive similar events on each node.
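
If you want to exercise the cluster end-to-end without Filebeat, you can also publish a test message with the console producer on one node and consume it from another; for example, using the kafka-topic-test topic seen in the list above;

echo "hello from the cluster" | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.57.32:9092 --topic kafka-topic-test

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.33:9092 --topic kafka-topic-test --from-beginning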

Kafka Cluster Management Scripts

You can find a whole lot of Kafka scripts under /opt/kafka/bin/ (as per our guide).

ls -1 /opt/kafka/bin/

connect-distributed.sh
connect-mirror-maker.sh
connect-standalone.sh
kafka-acls.sh
kafka-broker-api-versions.sh
kafka-cluster.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-consumer-perf-test.sh
kafka-delegation-tokens.sh
kafka-delete-records.sh
kafka-dump-log.sh
kafka-e2e-latency.sh
kafka-features.sh
kafka-get-offsets.sh
kafka-jmx.sh
kafka-leader-election.sh
kafka-log-dirs.sh
kafka-metadata-quorum.sh
kafka-metadata-shell.sh
kafka-mirror-maker.sh
kafka-producer-perf-test.sh
kafka-reassign-partitions.sh
kafka-replica-verification.sh
kafka-run-class.sh
kafka-server-start.sh
kafka-server-stop.sh
kafka-storage.sh
kafka-streams-application-reset.sh
kafka-topics.sh
kafka-transactions.sh
kafka-verifiable-consumer.sh
kafka-verifiable-producer.sh
trogdor.sh
windows
zookeeper-security-migration.sh
zookeeper-server-start.sh
zookeeper-server-stop.sh
zookeeper-shell.sh

To check how to use each script, consult the respective script help page.

For example;

/opt/kafka/bin/kafka-metadata-quorum.sh --help

usage: kafka-metadata-quorum [-h] --bootstrap-server BOOTSTRAP_SERVER [--command-config COMMAND_CONFIG] {describe} ...

This tool describes kraft metadata quorum status.

positional arguments:
  {describe}
    describe             Describe the metadata quorum info

optional arguments:
  -h, --help             show this help message and exit
  --bootstrap-server BOOTSTRAP_SERVER
                         A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.
  --command-config COMMAND_CONFIG
                         Property file containing configs to be passed to Admin Client.
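
For example, to check the replication status of the metadata quorum (leader, followers, and their lag);

/opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.57.32:9092 describe --replication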

Manage Kafka Cluster from UI

You can manage your cluster using Kadeck;

Install Kadeck Apache Kafka UI Tool on Debian/Ubuntu

Setup Kafka KRaft Cluster with SSL/TLS

The guide below should be helpful in setting up a Kafka KRaft cluster with SSL/TLS;

Quick Guide: Configure Apache Kafka SSL/TLS Encryption for Enhanced Security
