Improved OSSEC Log Parsing with Logstash

The ELK stack (Elasticsearch-Logstash-Kibana) provides a cost-effective alternative to commercial SIEMs for ingesting and managing OSSEC alert logs. Previously I wrote a blog post – OSSEC Log Management with Elasticsearch – that discusses the design of an ELK-based log system.

Since then, some readers have asked for and suggested ways to parse additional fields from the OSSEC alert log stream. For example, the IP addresses of systems that trigger certain security events are buried in the Details field. So I have created a Logstash configuration file that does just that.


Logstash Configuration for Alert Details

The log management system I described previously sends alerts from the OSSEC server via syslog (UDP) to Logstash which, in turn, parses the alerts and forwards them to an Elasticsearch instance for indexing. When OSSEC outputs alerts over syslog they are flattened into single lines, and certain field names are changed from their alert log counterparts.
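To exercise this pipeline without a live OSSEC server, you can hand-feed a flattened alert line to the Logstash UDP input. The sketch below is a hypothetical test harness, not part of the original setup: the port (9000) matches the configuration later in this article, and the host address and helper names are my own assumptions.

```python
import socket

# Assumed local setup: Logstash's udp input listening on 9000 (matches the
# configuration shown later in this article).
LOGSTASH_HOST = "127.0.0.1"
LOGSTASH_PORT = 9000

def build_alert_line(level, rule, description, location, details):
    """Flatten alert fields into the single-line form OSSEC emits over syslog.

    The syslog transport normally prepends a timestamp and hostname; this
    sketch builds only the message body.
    """
    return (f"ossec: Alert Level: {level}; Rule: {rule} - {description}; "
            f"Location: {location}; {details}")

def send_alert(line, host=LOGSTASH_HOST, port=LOGSTASH_PORT):
    """Send one alert line as a UDP datagram, the transport Logstash listens on."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("utf-8"), (host, port))

line = build_alert_line(5, 5710, "Attempt to login using a non-existent user",
                        "localhost->/var/log/secure", "srcip: 192.0.2.10;")
```

Running `send_alert(line)` against a Logstash instance with the configuration below should produce one indexed event per datagram.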

Here is an example of an alert log entry that is generated when an attempt to login to a system with SSH fails, followed by the corresponding syslog alert line.

** Alert 1408299218.5566: - syslog,sshd,invalid_login,authentication_failed,
2014 Aug 17 11:13:38 localhost->/var/log/secure
Rule: 5710 (level 5) -> 'Attempt to login using a non-existent user'
Src IP:
Aug 17 11:13:38 localhost sshd[3029]: Failed none for invalid user vic from port 8400 ssh2

Aug 17 11:13:38 ossec ossec: Alert Level: 5; Rule: 5710 - Attempt to login using a non-existent user; Location: localhost->/var/log/secure; srcip:; Aug 17 11:13:38 localhost sshd[3029]: Failed none for invalid user vic from port 8400 ssh2

Note that the Src IP field in the alert log is called srcip in the syslog output. There are several other fields like this that appear in other OSSEC alerts – another is Dst IP, which becomes dstip in the syslog output.
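The renaming can be captured in a small lookup table. This is a minimal sketch: only the srcip/dstip pairs are stated in this article, so the map falls back to the syslog name unchanged for anything else rather than guessing at further renames.

```python
# Mapping from OSSEC's syslog field names to their alert-log labels.
# Only srcip and dstip are confirmed by the article; other fields are
# passed through unchanged rather than assumed.
SYSLOG_TO_ALERT_LOG = {
    "srcip": "Src IP",
    "dstip": "Dst IP",
}

def alert_log_name(syslog_name: str) -> str:
    """Translate a syslog field name back to its alert-log label."""
    return SYSLOG_TO_ALERT_LOG.get(syslog_name, syslog_name)
```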

To parse syslog lines like this, I’ve taken the Logstash configuration file from my previous blog post and added field parsing directives to tease more fields out of the Details field.

input {
# stdin{}
  udp {
    port => 9000
    type => "syslog"
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}: Alert Level: %{NONNEGINT:Alert_Level}; Rule: %{NONNEGINT:Rule} - %{DATA:Description}; Location: %{DATA:Location}; (user: %{USER:User};%{SPACE})?(srcip: %{IP:Src_IP};%{SPACE})?(user: %{USER:User};%{SPACE})?(dstip: %{IP:Dst_IP};%{SPACE})?(src_port: %{NONNEGINT:Src_Port};%{SPACE})?(dst_port: %{NONNEGINT:Dst_Port};%{SPACE})?%{GREEDYDATA:Details}" }
      add_field => [ "ossec_server", "%{host}" ]
    }
    mutate {
      remove_field => [ "message", "syslog_timestamp", "syslog_program", "syslog_host", "syslog_message", "syslog_pid", "@version", "type", "host" ]
    }
  }
}

output {
#   stdout {
#     codec => rubydebug
#   }
  elasticsearch_http {
    host => ""
  }
}

Optional fields can be handled with Logstash by placing them in a ()? block, as I did with this part of the message grok:

(user: %{USER:User};%{SPACE})?(srcip: %{IP:Src_IP};%{SPACE})?(user: %{USER:User};%{SPACE})?(dstip: %{IP:Dst_IP};%{SPACE})?(src_port: %{NONNEGINT:Src_Port};%{SPACE})?(dst_port: %{NONNEGINT:Dst_Port};%{SPACE})?

With this line, if any of the fields user, srcip, dstip, src_port, or dst_port appear in the syslog output, each will be parsed and placed into a Logstash output field. Note that you should set the IP address in the host field to direct the Logstash output to your Elasticsearch cluster.
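The behavior of those ()? groups can be illustrated outside Logstash with an ordinary regular expression. The pattern below is a simplified Python analogue of the grok, not the full expression from the configuration above; the function and field names are mine.

```python
import re

# A rough analogue of the grok's optional ()? groups: each field is tried
# in order, and unmatched groups simply come back as None.
DETAILS_RE = re.compile(
    r"(?:user: (?P<User>\S+);\s*)?"
    r"(?:srcip: (?P<Src_IP>\d{1,3}(?:\.\d{1,3}){3});\s*)?"
    r"(?:dstip: (?P<Dst_IP>\d{1,3}(?:\.\d{1,3}){3});\s*)?"
    r"(?:src_port: (?P<Src_Port>\d+);\s*)?"
    r"(?:dst_port: (?P<Dst_Port>\d+);\s*)?"
    r"(?P<Details>.*)"
)

def parse_details(text):
    """Return only the optional fields that actually matched."""
    m = DETAILS_RE.match(text)
    return {k: v for k, v in m.groupdict().items() if v is not None}

fields = parse_details("srcip: 192.0.2.10; dst_port: 22; some trailing text")
```

Fields that are absent from the input never appear in the result, which mirrors how grok leaves unmatched optional captures out of the Logstash event.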


Article by Vic Hargrave

Software developer, blogger and family man enjoying life one cup of coffee at a time. I like programming and writing articles on tech topics. And yeah, I like coffee.


  1. How about OSSEC parsing the sudosh.log file… that has been a real challenge… it almost looks like binary data. Has anyone been successful in doing so?

  2. Thanks for the two blogposts, they really helped me a lot.

    However, I found it easier to configure OSSEC to send JSON formatted log lines to the syslog server (jsonout_output is not available in OSSEC 2.8.2) and process it with a few filter lines in logstash:

    grok {
      match => { "message" => "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP} %{SYSLOGHOST} %{SYSLOGPROG}: %{GREEDYDATA:json_message}" }
    }
    json {
      source => "json_message"
    }

    I had to prepend “%{SYSLOG5424PRI}” because there is no intermediate syslog server (OSSEC, logstash and elasticsearch are on the same host) interpreting and removing this tiny bit of information prior to sending it to logstash.

  3. Hi 😉
    I propose:
    Location: \(?%{DATA:Host}\)?(%{SPACE}any)?->%{DATA:Location}
    To filter by host on large infrastructure.
