Logstash: Write Specific Events to Specific Index


In this blog post, you will learn how to write specific events to specific indices using Logstash. Logstash is a free and open server-side data processing pipeline that ingests data from a multitude of sources, transforms it, and then sends it to your favorite “stash”, which in this setup is Elasticsearch.

Logstash can be configured to write specific events to specific indices based on conditionals. Follow through to learn how.


So, how can you configure Logstash to write specific events to a specific Elasticsearch index? If you are forwarding your events to Logstash, you most likely already have filters in place to parse the events or extract specific fields before they are forwarded to a stash such as Elasticsearch for indexing.

In our previous guides, we covered tutorials on processing various logs with Logstash;

How to parse SSH logs with Logstash

Process and Visualize ModSecurity Logs on ELK Stack

Visualize WordPress User Activity Logs on ELK Stack

For the purposes of demonstrating how to write specific events to specific indices using Logstash, we will build on our guide on how to parse SSH logs with Logstash.

Consider the Logstash configuration file below, which parses SSH authentication events such as;

  • Failed login by invalid users
May 20 20:18:59 elk sshd[1831]: Failed password for invalid user admin from 192.168.59.1 port 41150 ssh2
  • Accepted Password
May 20 20:20:32 elk sshd[1863]: Accepted password for root from 192.168.59.1 port 41174 ssh2
  • Failed Password for valid users
May 20 20:22:05 elk sshd[1967]: Failed password for kifarunix from 192.168.59.1 port 41190 ssh2
input {
  beats {
    port => 5044
  }
}

filter {
  # Parses successful login events
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}\s+%{IPORHOST:dst_host}\s+%{WORD:syslog_program}\[\d+\]:\s+(?<status>Accepted\s+password)\s+for\s+%{USER:auth_user}\s+from\s+%{SYSLOGHOST:src_host}.*" }
    add_field => { "activity" => "SSH Logins" }
    add_tag => [ "successful_login" ]
  }
  # Parses failed login events for valid users
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}\s+%{IPORHOST:dst_host}\s+%{WORD:syslog_program}\[\d+\]:\s+(?<status>Failed\s+password)\s+for\s+%{USER:auth_user}\s+from\s+%{SYSLOGHOST:src_host}.*" }
    add_field => { "activity" => "SSH Logins" }
    add_tag => [ "failed_login" ]
  }
  # Parses failed login events for invalid users
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}\s+%{IPORHOST:dst_host}\s+%{WORD:syslog_program}\[\d+\]:\s+(?<status>Failed\s+password)\s+for\s+invalid\s+user\s+%{USER:auth_user}\s+from\s+%{SYSLOGHOST:src_host}.*" }
    add_field => { "activity" => "SSH Logins" }
    add_tag => [ "invalid_users" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "ssh_auth-%{+YYYY.MM}"
  }
}

As per the Logstash output configuration section;

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "ssh_auth-%{+YYYY.MM}"
  }
}

All the events are sent to a single monthly index named ssh_auth-%{+YYYY.MM}; the %{+YYYY.MM} date pattern expands to the year and month of each event, so an event from May 2021, for example, lands in the index ssh_auth-2021.05.

Logstash: Write Specific Events to Specific Index

Now, we would like to write successful login events, failed login events for valid users, and failed login events for invalid users to individual indices.

Logstash supports the use of conditionals, which can help with this.

To easily demonstrate how to write specific events to a specific index with Logstash, we configured each grok filter above to add a tag to the event that separates the events from each other.

For example, a tag named successful_login is added to successful login events. The same applies to failed logins for valid users and failed logins for invalid users, which get the failed_login and invalid_users tags respectively.

So, we will modify our Logstash output with if/else if conditionals as shown below;

output {
  if "successful_login" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "ssh-success-logins-%{+YYYY.MM}"
    }
  }
  else if "failed_login" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "ssh-failed-logins-%{+YYYY.MM}"
    }
  }
  else if "invalid_users" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "ssh-invalid-users-%{+YYYY.MM}"
    }
  }
  else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "other-ssh-events-%{+YYYY.MM}"
    }
  }
}
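
While testing this routing, you can temporarily add a stdout output with the rubydebug codec alongside the elasticsearch outputs; it prints each event, including its tags field, to the Logstash log, so you can confirm which conditional branch an event will match. This is only a debugging aid and should be removed once the routing works as expected;

output {
  # Temporary debugging aid: prints every event, with its tags,
  # so you can see which conditional branch it will take.
  stdout {
    codec => rubydebug
  }
}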

This causes Logstash to write successful login events to an Elasticsearch index named ssh-success-logins-*, failed login events to ssh-failed-logins-*, invalid user login events to ssh-invalid-users-*, and any other events to other-ssh-events-*.

Check Logstash Configuration Syntax

Save the Logstash configuration file and run a syntax check using the command;

/usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
Configuration OK
[2021-05-20T21:50:10,111][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

If you get Configuration OK, then you are good to go; restart Logstash to apply the changes.

Logstash is now ready to receive events from Filebeat.
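
If Filebeat is not yet configured, a minimal Filebeat configuration that ships SSH authentication logs to the Logstash Beats input above could look like the snippet below. The log path assumes a Debian/Ubuntu system where SSH events are written to /var/log/auth.log, and 192.168.59.10 is a hypothetical Logstash host address; adjust both to match your environment;

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/auth.log

output.logstash:
  hosts: ["192.168.59.10:5044"]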

Assuming you have already set up Filebeat and generated some SSH authentication events, navigate to Kibana and verify that the Elasticsearch indices have been created.
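
You can also confirm the indices directly from the command line using the Elasticsearch _cat/indices API, assuming Elasticsearch is listening on localhost:9200;

curl -s "http://localhost:9200/_cat/indices/ssh-*?v"

The output lists each matching index along with its health status, document count and size on disk.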


You can now go ahead and create Kibana index patterns: Kibana > Index Patterns > Create index pattern.


From the Discover page, you can then explore the events in each index.


And there you go, you have written different events to different Elasticsearch indices.

That marks the end of the tutorial on Logstash: Write Specific Events to Specific Index.

Other Related Tutorials

Easy way to configure Filebeat-Logstash SSL/TLS Connection

How to Debug Logstash Grok Filters

Integrate Wazuh Manager with ELK Stack
