How can I configure Filebeat to send logs to Kafka? This is a complete guide on configuring Filebeat to send logs to Kafka. Filebeat is one of the Elastic Stack Beats used to collect system log data and ship it either to Elasticsearch or Logstash, or to a distributed event streaming platform such as Kafka, which can store and handle large volumes of data streams.
Table of Contents
Configure Filebeat to Send Logs to Kafka
Install and Configure Kafka
You can check our guides on the links below to learn how to install and configure Kafka.
Easy Steps: Install Apache Kafka on Debian 12
Quick Guide: Configure Apache Kafka SSL/TLS Encryption for Enhanced Security
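With Kafka up and running, it can help to confirm that the broker port is reachable from the host that will ship the logs before touching any Filebeat configuration. A quick optional check (not part of the original setup; replace the address and port with your broker details, here using the example values from this guide):

# Check TCP reachability of the Kafka broker (replace with your broker IP/port)
nc -zv 192.168.57.32 9092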
Install Filebeat
You need Filebeat to be able to collect system logs from various Linux systems and push them to Kafka.
You can check our guides on how to install Filebeat;
How to install Filebeat on Linux
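The sample events shown later in this guide come from Filebeat's system module (auth and syslog logs). As one possible way to get such logs flowing, and assuming the default package layout under /etc/filebeat, you can enable the module after installing Filebeat:

# Enable the system module (on Debian/Ubuntu this covers /var/log/syslog and /var/log/auth.log)
filebeat modules enable system

# Review the module config and enable the syslog/auth filesets if they are disabled
vim /etc/filebeat/modules.d/system.yml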
Configure Filebeat to Send Logs to Kafka without SSL/TLS (PLAINTEXT)
Configure Filebeat to Connect to Plaintext Kafka
By default, Filebeat is set to send event data to Elasticsearch, as you can see in the output section of the filebeat.yml configuration file.
# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["localhost:9200"]

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"
In order to configure Filebeat to send logs to Kafka, edit the Filebeat configuration file and update the output section by configuring the Apache Kafka connection and other details.
vim /etc/filebeat/filebeat.yml
Start by commenting out the Elasticsearch output configs;
# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"
Next, add a section for the Apache Kafka output, for example just beneath the Elasticsearch output (the exact placement does not matter). If you are not sending any sensitive information to Kafka, or the connection is local to your network, the configuration below works for a plaintext Kafka connection where traffic is sent unencrypted.
# ------------------------------ Kafka Output -------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["192.168.57.32:9092"]

  # message topic selection + partitioning
  topic: "filebeat"
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
So, what do the options used here mean?
output.kafka: Specifies the output plugin to be used by Filebeat.
hosts: Defines the addresses and ports of the Kafka broker(s) to connect to for writing event data. In this case, Filebeat is configured to connect to a Kafka broker running on the IP address 192.168.57.32 and port 9092. If you have a Kafka cluster, you can list several brokers for an HA approach; hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic: Specifies the Kafka topic to which Filebeat will send the messages. In this case, the topic is set to filebeat. It can be anything, including dynamic names (see the sketch further below).
partition.round_robin: Determines how Filebeat selects the partition to write messages to. With reachable_only set to false, Filebeat distributes events across all configured partitions, even if some of them are not currently reachable.
required_acks: Indicates the number of acknowledgments required from Kafka before considering a message as successfully written. A value of 1 means that only the leader of the partition needs to acknowledge the message.
compression: Specifies the compression codec to use for compressing messages sent to Kafka. Here, gzip compression is enabled, which can reduce the message size and network bandwidth usage.
max_message_bytes: Sets the maximum size of a message that Filebeat can send to Kafka. In this case, the maximum message size is set to 1,000,000 bytes (1 megabyte).
Read more on Filebeat Kafka output configuration options.
Also check how to change the output codec; note that the output itself remains output.kafka.
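For illustration, here is a minimal sketch of dynamic topic selection. It assumes you set a custom field on each input (log_topic is an arbitrary name used only for this example) and reference it from the Kafka output; adapt the paths and field values to your environment:

filebeat.inputs:
  - type: log
    paths:
      - /var/log/auth.log
    fields:
      # Arbitrary custom field used below to pick the topic per input
      log_topic: "filebeat-auth"

output.kafka:
  hosts: ["192.168.57.32:9092"]
  # Topic name is resolved per event from the custom field set above
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

With this layout, events from each input land in their own topic while the rest of the Kafka output settings stay the same as in the plaintext example above.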
Save the changes made to the Filebeat configuration and exit.
Next, test the configuration for any syntax error;
filebeat test config
Test the connection to Kafka broker;
filebeat test output
If everything is okay, you should see an output similar to;
Kafka: 192.168.57.32:9092...
  parse host... OK
  dns lookup... OK
  addresses: 192.168.57.32
  dial up... OK
Start Filebeat.
You can begin by running it in the foreground;
filebeat -e
If everything is fine, you should see a message about the connection to Kafka;
Connection to kafka(192.168.57.32:9092) established
You can then run Filebeat as a service. Press ctrl+c to stop the foreground process, then enable and start the service;
systemctl enable --now filebeat
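To confirm the service came up cleanly, you can check its status and follow its logs (standard systemd commands, included here as a convenience):

systemctl status filebeat
# Follow the Filebeat service logs in realtime
journalctl -u filebeat -f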
Confirm Creation of Kafka Topics on Kafka Broker
On the Kafka broker/server terminal, execute this command to check if your topic has been created;
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
My sample output;
filebeat
kafka-topic-test
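If you want more detail than just the topic name, you can also describe the topic to see its partitions and replication settings (same kafka-topics.sh script as above):

/opt/kafka/bin/kafka-topics.sh --describe --topic filebeat --bootstrap-server localhost:9092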
Reading Logs from Kafka Topic
To read the logs on the topic, use the command;
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <broker_host>:<broker_port> --topic <topic_name> [--from-beginning]
For example;
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic filebeat
Sample output;
{"@timestamp":"2023-07-16T19:42:06.697Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"log":{"offset":2384006,"file":{"path":"/var/log/auth.log"}},"input":{"type":"log"},"fileset":{"name":"auth"},"service":{"type":"system"},"ecs":{"version":"8.0.0"},"message":"Jul 16 19:42:01 mx CRON[18368]: pam_unix(cron:session): session opened for user root by (uid=0)","host":{"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)"},"id":"e46f0b2ecf5841be8df054ae93c48214","name":"mx.kifarunix-demo.com","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"]},"agent":{"ephemeral_id":"27ebb032-f900-4d05-88d8-51de9b9c1c6d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2"},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"}}
{"@timestamp":"2023-07-16T19:42:06.697Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"input":{"type":"log"},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"},"fileset":{"name":"auth"},"ecs":{"version":"8.0.0"},"message":"Jul 16 19:42:01 mx CRON[18369]: pam_unix(cron:session): session opened for user sogo by (uid=0)","log":{"offset":2384102,"file":{"path":"/var/log/auth.log"}},"service":{"type":"system"},"host":{"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"name":"mx.kifarunix-demo.com","ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian"}},"agent":{"version":"8.8.2","ephemeral_id":"27ebb032-f900-4d05-88d8-51de9b9c1c6d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat"}}
{"@timestamp":"2023-07-16T19:42:06.697Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"message":"Jul 16 19:42:01 mx CRON[18368]: pam_unix(cron:session): session closed for user root","log":{"offset":2384198,"file":{"path":"/var/log/auth.log"}},"service":{"type":"system"},"ecs":{"version":"8.0.0"},"fileset":{"name":"auth"},"input":{"type":"log"},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"},"host":{"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"name":"mx.kifarunix-demo.com","hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false},"agent":{"version":"8.8.2","ephemeral_id":"27ebb032-f900-4d05-88d8-51de9b9c1c6d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat"}}
{"@timestamp":"2023-07-16T19:42:09.055Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-syslog-pipeline"},"log":{"offset":549530,"file":{"path":"/var/log/syslog"}},"message":"Jul 16 19:42:01 mx CRON[18370]: (sogo) CMD (/usr/sbin/sogo-tool expire-sessions 30 >/dev/null 2>&1; /usr/sbin/sogo-ealarms-notify >/dev/null 2>&1)","event":{"module":"system","timezone":"+00:00","dataset":"system.syslog"},"fileset":{"name":"syslog"},"agent":{"name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2","ephemeral_id":"27ebb032-f900-4d05-88d8-51de9b9c1c6d","id":"08b49b73-3585-41ba-9ef5-c0c731203047"},"input":{"type":"log"},"service":{"type":"system"},"ecs":{"version":"1.12.0"},"host":{"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"name":"mx.kifarunix-demo.com"}}
^CProcessed a total of 4 messages
This reads the logs in realtime only. To read the logs from the beginning and keep following new events in realtime, add the --from-beginning option;
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic filebeat --from-beginning
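Since the events are JSON, piping the consumer output through jq makes them much easier to read (this assumes jq is installed on the Kafka host):

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic filebeat --from-beginning | jq .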
Configure Filebeat to Send Logs to Kafka with SSL/TLS
Configure Filebeat-Kafka Output SSL/TLS Connection
What if your Kafka is configured with SSL/TLS encryption? How can you configure Filebeat to send logs to Kafka with SSL/TLS?
Simply update the SSL connection settings;
# ------------------------------ Kafka Output -------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka.kifarunix-demo.com:9092"]
  ssl:
    enabled: true
    certificate_authorities: ["/etc/filebeat/kafka.ca.crt"]
    certificate: "/etc/filebeat/kafka/server.crt"
    key: "/etc/filebeat/kafka/server.key"

  # message topic selection + partitioning
  topic: "filebeat-ssl"
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
Save and exit the file after making the changes.
You need to copy the CA certificate, as well as the certificate and key referenced above, from the Kafka server to the Filebeat host. Also ensure the Kafka broker hostname is resolvable from the Filebeat host.
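For example, assuming SSH access to the Kafka server and the paths used in the configuration above (the source path and the broker IP below are placeholders to adapt), you could add a local hosts entry and copy the certificates:

# If the broker hostname is not in DNS, map it locally first (replace the IP with your broker's address)
echo "192.168.57.32 kafka.kifarunix-demo.com" >> /etc/hosts

# Copy the CA certificate (and the certificate/key referenced in filebeat.yml) from the Kafka server;
# the source path here is a placeholder - adjust it to where your certificates actually live
scp root@kafka.kifarunix-demo.com:/opt/kafka/ssl/kafka.ca.crt /etc/filebeat/kafka.ca.crt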
Test Filebeat configuration;
filebeat test config
Unfortunately, the Kafka output in Filebeat does not support TLS connection testing (filebeat test output).
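As a workaround, you can at least verify the broker's TLS endpoint and the CA file manually with openssl (a generic check, not a Filebeat feature):

# Handshake against the SSL listener using the same CA file configured in filebeat.yml;
# look for "Verify return code: 0 (ok)" in the output
openssl s_client -connect kafka.kifarunix-demo.com:9092 -CAfile /etc/filebeat/kafka.ca.crt </dev/null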
Start Filebeat, in the foreground, just to verify the connection;
filebeat -e
If you see an output like the one below, then you are good.
"Connection to kafka(kafka.kifarunix-demo.com:9092) established"
You can start Filebeat as a service;
systemctl start filebeat
Confirm Creation of Kafka Topics on Kafka Broker
On the Kafka broker/server terminal, execute this command to check if your topic has been created;
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9094
(Note that we have configured port 9094 on the Kafka server for local Kafka connections without SSL)
My sample output;
__consumer_offsets
filebeat
filebeat-ssl
kafka-topic-test
Reading Logs from Kafka Topic
To read the logs on the topic, use the command;
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic filebeat-ssl
Sample events;
"@timestamp":"2023-07-17T04:51:02.761Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"service":{"type":"system"},"input":{"type":"log"},"ecs":{"version":"8.0.0"},"host":{"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com","name":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"]},"agent":{"ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2"},"log":{"offset":2602826,"file":{"path":"/var/log/auth.log"}},"message":"Jul 17 04:51:01 mx CRON[35424]: pam_unix(cron:session): session opened for user root by (uid=0)","fileset":{"name":"auth"},"event":{"dataset":"system.auth","timezone":"+00:00","module":"system"}}
{"@timestamp":"2023-07-17T04:51:02.761Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"fileset":{"name":"auth"},"input":{"type":"log"},"ecs":{"version":"8.0.0"},"service":{"type":"system"},"host":{"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux"},"name":"mx.kifarunix-demo.com","id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false},"agent":{"type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com"},"message":"Jul 17 04:51:01 mx CRON[35425]: pam_unix(cron:session): session opened for user sogo by (uid=0)","log":{"offset":2602922,"file":{"path":"/var/log/auth.log"}},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"}}
{"@timestamp":"2023-07-17T04:51:02.761Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"},"fileset":{"name":"auth"},"ecs":{"version":"8.0.0"},"message":"Jul 17 04:51:01 mx CRON[35424]: pam_unix(cron:session): session closed for user root","log":{"offset":2603018,"file":{"path":"/var/log/auth.log"}},"service":{"type":"system"},"input":{"type":"log"},"host":{"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu"},"name":"mx.kifarunix-demo.com"},"agent":{"name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047"}}
{"@timestamp":"2023-07-17T04:51:02.761Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"host":{"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"name":"mx.kifarunix-demo.com"},"message":"Jul 17 04:51:02 mx CRON[35425]: pam_unix(cron:session): session closed for user sogo","log":{"offset":2603103,"file":{"path":"/var/log/auth.log"}},"event":{"dataset":"system.auth","module":"system","timezone":"+00:00"},"fileset":{"name":"auth"},"ecs":{"version":"8.0.0"},"input":{"type":"log"},"service":{"type":"system"},"agent":{"type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com"}}
{"@timestamp":"2023-07-17T04:51:09.813Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-syslog-pipeline"},"fileset":{"name":"syslog"},"ecs":{"version":"1.12.0"},"host":{"architecture":"x86_64","os":{"kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"name":"mx.kifarunix-demo.com","ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com"},"log":{"file":{"path":"/var/log/syslog"},"offset":87749},"event":{"module":"system","dataset":"system.syslog","timezone":"+00:00"},"input":{"type":"log"},"agent":{"type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com"},"message":"Jul 17 04:51:01 mx CRON[35426]: (sogo) CMD (/usr/sbin/sogo-tool expire-sessions 30 >/dev/null 2>&1; /usr/sbin/sogo-ealarms-notify >/dev/null 2>&1)","service":{"type":"system"}}
{"@timestamp":"2023-07-17T04:51:09.813Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-syslog-pipeline"},"fileset":{"name":"syslog"},"ecs":{"version":"1.12.0"},"agent":{"id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d"},"host":{"name":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com"},"input":{"type":"log"},"event":{"module":"system","dataset":"system.syslog","timezone":"+00:00"},"service":{"type":"system"},"log":{"offset":87896,"file":{"path":"/var/log/syslog"}},"message":"Jul 17 04:51:01 mx CRON[35427]: (root) CMD (/bin/bash /usr/local/bin/fail2ban_banned_db unban_db)"}
^CProcessed a total of 6 messages
And that is it! You should now have an encrypted connection between Filebeat and your Kafka broker, for enhanced security.
Configure Consumers
The logs are now hitting your Kafka topics. Similarly, configure your consumers to use an SSL/TLS connection when reading log data from the Kafka topics.
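As a rough sketch of what that could look like for the console consumer, assuming a JKS truststore already exists on the consumer host (the paths and password below are examples only):

# Client properties pointing the consumer at the SSL listener
cat > /opt/kafka/config/client-ssl.properties <<'EOF'
security.protocol=SSL
ssl.truststore.location=/opt/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=changeit
EOF

# Consume over the SSL listener (port 9092 in this guide) instead of the local plaintext port
/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka.kifarunix-demo.com:9092 \
  --topic filebeat-ssl \
  --consumer.config /opt/kafka/config/client-ssl.properties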
That closes our guide on configuring Filebeat to send logs to Kafka via unencrypted and encrypted channels.
Other Tutorials
Check SSL Certificate Expiry Date from Certificate File
Monitor SSL/TLS Certificate Expiry with Prometheus and Grafana