#Extending #MSOMS Log Analytics through Linux – part 2 – custom ruby filter



Other articles in the series

Extending #MSOMS Log Analytics through Linux – part 1

The solution

The Kemp solution I’m trying to implement is based on two data sources:

  • syslog logs
  • SNMP GET/WALK information / counters

I’m not going to use at this stage:

  • SNMP traps: this can be done, but traps are less reliable than SNMP GET polling, where even if I miss one poll I can always read what I need at the next cycle
  • REST API: this is probably the way to go in the long term, even if at the moment it doesn’t add anything to the SNMP interface. Since I already know what to look for in SNMP, I will start with SNMP and move to REST in a v2

Right now my goal is to ingest as much as I can in a meaningful way, and only then try to find a way to represent the data with a custom solution. After all, this is an experiment and a journey; if I’m wrong I can always step back and try another way.
In this installment I will tackle the syslog stream. The syslog information from Kemp devices has two message categories (ident field in syslog message), let’s call them standard syslog messages and stats messages. Stats messages report statistics for the Kemp device such as the number of Virtual and Real servers. As a first step the solution must read the stream and ingest stats messages as performance data points, while ingesting other messages as a custom log named KempLog.

Getting the syslog data

Getting the syslog data is straightforward since we already have an input datasource for syslog defined for the OMS agent. While this is true, at the same time what I want to do is to have my own custom log data type for Kemp so that I can enrich the default syslog schema when and if it is needed.

There are two ways to achieve this result:

  1. you can filter and retag the standard syslog stream (oms.syslog) using a rewrite_tag_filter and filtering the incoming data based on the hostnames of the Kemp devices
  2. you can define a new syslog input on a different port and configure the syslog stack for the Kemp devices accordingly

I chose the latter approach. The resulting configuration is:

Kemp –> Syslog on Linux –> OMS agent listener –> custom fluentd filter

I could have connected the Kemp device syslog stream directly to the OMS agent listener, but for consistency’s sake (with the default syslog source) I went with this double-hop design.

Configuring the syslog stream

To be able to process the syslog stream from the Kemp devices, you must configure the rsyslog daemon (in my case; if you’re using syslogd the configuration is slightly different) to redirect all the messages from the Kemp devices to a specific port. The input plugin for the fluentd data flow must bind to this port; in our case let’s use port 25326/udp. The rsyslog configuration file must have a higher priority than the standard OMS one, so that the messages from the Kemp devices can be managed and filtered before the standard processing (i.e. I don’t want to ingest them twice). Higher priority means a lower rank when the files are sorted by name.

# /etc/rsyslog.conf.d/60-kemp-rsyslog.conf
# Custom Rules for Kemp Devices.
#
# For more information see rsyslog.conf(5) and /etc/rsyslog.conf

#
# First some standard log files.
# Uncomment for debug purposes and to get the log messages in kemp.log
#if $fromhost-ip startswith '172.20.2.71' then /var/log/kemp.log

# As an alternative we can send everything from these hosts to the OMS agent
if $fromhost-ip startswith '172.20.2.71' then @127.0.0.1:25326

# Stop any further processing for the messages matched above; remove if you want to ingest the data as std syslog too
& stop

Then you must add your own fluentd configuration to receive the messages and process them. As a first step, just receive the messages and write them to a log file without ingesting anything into OMS. First define a source of type syslog on port 25326 and specify that any data from this source must be tagged oms.qnd.Kemp, then use an output plugin of type file to write an output file. This is very useful to understand your message schema and to build your custom filter, but it’s just an intermediate step, so it is here only for future reference.

# /etc/opt/microsoft/omsagent/conf/omsagent.d/kemp.conf
<source>
  type syslog
  port 25326
  bind 127.0.0.1
  protocol_type udp
  tag oms.qnd.Kemp
  log_level debug
</source>

<match oms.qnd.Kemp.**>
  type file
  path /var/opt/microsoft/omsagent/log/kemp.log
</match>
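
To check this intermediate step without waiting for a real device, a test message can be pushed straight to the listener. The following is only a sketch: the host name, the message text and the helper names are made up for illustration, and the line is formatted in the classic BSD syslog layout that a fluentd syslog source normally expects.

```ruby
require 'socket'

# Facility and severity codes as defined by the syslog protocol.
FACILITY_LOCAL0 = 16
SEVERITY_INFO   = 6

# Build a BSD-style syslog line: <PRI>timestamp host ident: message
def build_syslog_line(host, ident, message, time = Time.now)
  pri = FACILITY_LOCAL0 * 8 + SEVERITY_INFO
  "<#{pri}>#{time.strftime('%b %e %H:%M:%S')} #{host} #{ident}: #{message}"
end

# Send one line over UDP to the port the fluentd syslog source listens on.
def send_test_message(line, address = '127.0.0.1', port = 25326)
  socket = UDPSocket.new
  socket.send(line, 0, address, port)
ensure
  socket.close if socket
end

# Hypothetical host name and message text, for illustration only.
line = build_syslog_line('kemp-test', 'stats', 'VS status: 4 total 3 up 1 down 0 disabled')
puts line
# send_test_message(line)  # uncomment when running on the agent machine
```

If the source and the file output are wired correctly, the message should show up in the kemp.log output file shortly after the call.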

Parsing the data

Now that the data flow is working let’s take one more step: get the messages and process them differently between stats and non stats. To do this we’re going to implement the following data flow:

syslog source oms.qnd.Kemp [1] –> custom fluentd filter [2] –> rewrite_tag_filter (oms.qnd.KempLog) [3] –> out_oms_api [4]
–> rewrite_tag_filter (oms.qnd.KempPerfStats) [3] –> out_oms [4]

Basically the logic is:

  1. get the data from syslog and tag it as oms.qnd.Kemp
  2. process the messages and create two different payloads, one for the stats data formatted as performance data and the other as a custom log
  3. based on different properties in the payload, use the rewrite tag filter to tag the performance data as oms.qnd.KempPerfStats and the custom log data as oms.qnd.KempLog
  4. use the output plugin out_oms_api for the oms.qnd.KempLog data, where the third part of the tag will be the log name in OMS; use out_oms for ingesting the performance data

This is the complete configuration of our fluentd data flow

# /etc/opt/microsoft/omsagent/conf/omsagent.d/kemp.conf
<source>
  type syslog
  port 25326
  bind 127.0.0.1
  protocol_type udp
  tag oms.qnd.Kemp
  log_level debug
</source>

<filter oms.qnd.Kemp.**>
  type filter_kemp
</filter>

<match oms.qnd.Kemp.**>
  type rewrite_tag_filter
  rewriterule1 DataType LINUX_PERF_BLOB oms.qnd.KempPerfStats
  rewriterule3 ident [^stats] oms.qnd.KempLog
</match>

<match oms.qnd.KempLog>
  type out_oms_api
  time_generated_field EventTime
  log_level debug
  num_threads 1
  buffer_chunk_limit 1m
  buffer_type file
  buffer_path /var/opt/microsoft/omsagent/state/out_oms_kemp*.buffer
  buffer_queue_limit 10
  buffer_queue_full_action drop_oldest_chunk
  flush_interval 10s
  retry_limit 10
  retry_wait 30s
  max_retry_wait 10m
</match>

As you can see, there’s no out_oms match in the configuration. Since the tag for the performance data starts with oms., it already matches the default OMS agent configuration for standard data types, so an explicit match is unnecessary.
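
The routing performed by the rewrite tag step can be emulated in plain Ruby to see which tag a given payload would receive. This is a rough sketch and not the plugin's code: it assumes that rules are tried in order, that a rule is skipped when its key is missing from the record, and that the first match wins. The route helper and the sample records are invented for the example.

```ruby
# Rules in the order they are evaluated: [record key, pattern, new tag].
REWRITE_RULES = [
  ['DataType', /LINUX_PERF_BLOB/, 'oms.qnd.KempPerfStats'],
  ['ident',    /[^stats]/,        'oms.qnd.KempLog']
].freeze

# Return the new tag for a record, or nil when no rule matches.
def route(record)
  REWRITE_RULES.each do |key, pattern, new_tag|
    value = record[key].to_s
    next if value.empty?              # key missing: try the next rule
    return new_tag if value =~ pattern
  end
  nil
end

# Hypothetical payloads shaped like the two outputs of the custom filter.
perf = { 'DataType' => 'LINUX_PERF_BLOB', 'IPName' => 'LogManagement', 'DataItems' => [] }
log  = { 'ident' => 'kernel', 'Message' => 'hypothetical log line' }

puts route(perf)
puts route(log)
```

Note that [^stats] is a character class, so the second rule matches any ident containing at least one character other than s, t or a; an ident of exactly "stats" does not match it.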

The custom fluentd filter

Lastly, let’s add the custom fluentd filter that creates the two different payloads, one per message category. The filter is based on the standard syslog one. The only notable part is where the message field of the record is parsed to see whether it is a stats message; two different payloads are created accordingly and assigned to a variable called “polyResult”.

regexp_stats = Regexp.new("(?<type>.*)status:\\D+(?<total>\\d+)\\D+(?<up>\\d+)\\D+(?<down>\\d+)\\D+(?<disabled>\\d+)")
result = regexp_stats.match(record["message"])
if !result.nil?
  # let's try to transform this in a perf data point
  # for sure there's a better and more elegant way to do this in ruby, too bad I don't know it
  data_items = []

  data_info = {}
  data_info["Timestamp"] = record["Timestamp"]
  data_info["Host"] = record["Host"]
  object_name = result["type"]
  data_info["ObjectName"] = "KempLM-#{object_name}"
  data_info["InstanceName"] = "_Total"
  counters = []
  counter_pair = {}
  counter_pair["CounterName"] = "Total"
  counter_pair["Value"] = Integer(result["total"]) rescue nil
  counters.push(counter_pair)
  counter_pair = {}
  counter_pair["CounterName"] = "Up"
  counter_pair["Value"] = Integer(result["up"]) rescue nil
  counters.push(counter_pair)
  counter_pair = {}
  counter_pair["CounterName"] = "Down"
  counter_pair["Value"] = Integer(result["down"]) rescue nil
  counters.push(counter_pair)
  counter_pair = {}
  counter_pair["CounterName"] = "Disabled"
  counter_pair["Value"] = Integer(result["disabled"]) rescue nil
  counters.push(counter_pair)

  data_info["Collections"] = counters
  data_items.push(data_info)

  wrapper = {
    "DataType" => "LINUX_PERF_BLOB",
    "IPName" => "LogManagement",
    "DataItems" => data_items
  }
  polyResult = wrapper
else
  # not a stats message: keep the record as a custom log entry
  record["Message"] = record["message"]
  record.delete "message"
  polyResult = record
end
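
The stats pattern can be tried in isolation before deploying the filter. The sketch below uses the named groups the filter reads via result[...]; the order of the four counters in the pattern is assumed from the order in which the filter uses them, the sample messages are hypothetical, and the parse_stats helper (including the strip on the type) is added only for this example.

```ruby
# Pattern with the named groups that the filter reads via result[...].
REGEXP_STATS = Regexp.new("(?<type>.*)status:\\D+(?<total>\\d+)\\D+(?<up>\\d+)\\D+(?<down>\\d+)\\D+(?<disabled>\\d+)")

# Return a hash of counters for a stats message, or nil for any other message.
def parse_stats(message)
  result = REGEXP_STATS.match(message)
  return nil if result.nil?

  {
    'type'     => result['type'].strip,   # strip is an addition of this sketch
    'total'    => Integer(result['total']),
    'up'       => Integer(result['up']),
    'down'     => Integer(result['down']),
    'disabled' => Integer(result['disabled'])
  }
end

# Hypothetical messages, shaped only to exercise both branches.
p parse_stats('VS status: 4 total 3 up 1 down 0 disabled')
p parse_stats('User bal logged in')
```

Any message that does not contain "status:" followed by four numbers falls through to the custom log branch, which is the behaviour the filter relies on.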

As you can see, the perf data type must follow a standard schema (this is a pseudo JSON schema obtained by reverse engineering, so take it with caution):

//pseudo schema for LINUX_PERF_BLOB DataItems
{
  "Timestamp": {"type": "string"},
  "Host": {"type": "string"},
  "ObjectName": {"type": "string"},
  "InstanceName": {"type": "string"},
  "Collections": {
    "type": "array",
    "items": {
      "type": "object",
      "properties": {
        "CounterName": {"type": "string"},
        "Value": {"type": "number"}
      }
    }
  }
}
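
A payload following this pseudo schema can also be assembled more compactly by mapping over a hash of counters. This is one possible sketch, not the code used in the filter: build_perf_blob is a hypothetical helper, and the timestamp and host values are placeholders.

```ruby
require 'json'

# Build a LINUX_PERF_BLOB wrapper from a hash of counter name => value.
def build_perf_blob(timestamp, host, object_name, counters)
  collections = counters.map do |name, value|
    { 'CounterName' => name, 'Value' => value }
  end

  data_item = {
    'Timestamp'    => timestamp,
    'Host'         => host,
    'ObjectName'   => "KempLM-#{object_name}",
    'InstanceName' => '_Total',
    'Collections'  => collections
  }

  { 'DataType' => 'LINUX_PERF_BLOB', 'IPName' => 'LogManagement', 'DataItems' => [data_item] }
end

# Placeholder values, for illustration only.
blob = build_perf_blob('2017-01-05T10:20:30.000Z', 'kemp-test', 'VS',
                       { 'Total' => 4, 'Up' => 3, 'Down' => 1, 'Disabled' => 0 })
puts JSON.pretty_generate(blob)
```

Printing the result as JSON makes it easy to compare the payload with the pseudo schema field by field.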

And here is the complete Kemp filter:

module Fluent
  class KempFilter < Filter
    Fluent::Plugin.register_filter('filter_kemp', self)

    def initialize
      super
      require 'socket'
      require_relative 'omslog'
      require_relative 'oms_common'
    end

    # Interval in seconds to refresh the cache
    config_param :ip_cache_refresh_interval, :integer, :default => 300

    def configure(conf)
      super
      @ip_cache = OMS::IPcache.new @ip_cache_refresh_interval
    end

    def start
      super
    end

    def shutdown
      super
    end

    def filter(tag, time, record)
      # Use Time.now, because it is the only way to get subsecond precision in version 0.12.
      # The time may be slightly in the future from the ingestion time.
      record["Timestamp"] = OMS::Common.format_time(Time.now.to_f)
      record["EventTime"] = OMS::Common.format_time(time)
      hostname = record["host"]
      record["Host"] = hostname
      record.delete "host"
      record["HostIP"] = "Unknown IP"

      host_ip = @ip_cache.get_ip(hostname)
      if host_ip.nil?
        OMS::Log.warn_once("Failed to get the IP for #{hostname}.")
      else
        record["HostIP"] = host_ip
      end

      if record.has_key?("pid")
        record["ProcessId"] = record["pid"]
        record.delete "pid"
      end

      # The tag should look like this: oms.qnd.Kemp.authpriv.notice
      tags = tag.split('.')
      if tags.size == 5
        record["Facility"] = tags[3]
        record["Severity"] = tags[4]
      else
        $log.error "The syslog tag does not have 5 parts #{tag}"
      end

      regexp_stats = Regexp.new("(?<type>.*)status:\\D+(?<total>\\d+)\\D+(?<up>\\d+)\\D+(?<down>\\d+)\\D+(?<disabled>\\d+)")
      result = regexp_stats.match(record["message"])
      if !result.nil?
        # let's try to transform this in a perf data point
        # for sure there's a better and more elegant way to do this in ruby, too bad I don't know it
        data_items = []

        data_info = {}
        data_info["Timestamp"] = record["Timestamp"]
        data_info["Host"] = record["Host"]
        object_name = result["type"]
        data_info["ObjectName"] = "KempLM-#{object_name}"
        data_info["InstanceName"] = "_Total"
        counters = []
        counter_pair = {}
        counter_pair["CounterName"] = "Total"
        counter_pair["Value"] = Integer(result["total"]) rescue nil
        counters.push(counter_pair)
        counter_pair = {}
        counter_pair["CounterName"] = "Up"
        counter_pair["Value"] = Integer(result["up"]) rescue nil
        counters.push(counter_pair)
        counter_pair = {}
        counter_pair["CounterName"] = "Down"
        counter_pair["Value"] = Integer(result["down"]) rescue nil
        counters.push(counter_pair)
        counter_pair = {}
        counter_pair["CounterName"] = "Disabled"
        counter_pair["Value"] = Integer(result["disabled"]) rescue nil
        counters.push(counter_pair)

        data_info["Collections"] = counters
        data_items.push(data_info)

        wrapper = {
          "DataType" => "LINUX_PERF_BLOB",
          "IPName" => "LogManagement",
          "DataItems" => data_items
        }
        polyResult = wrapper
      else
        # not a stats message: keep the record as a custom log entry
        record["Message"] = record["message"]
        record.delete "message"
        polyResult = record
      end

      polyResult
    end
  end
end
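
The tag handling in the filter can be checked on its own as well. The sketch below isolates that step under the same assumption the filter makes, namely that the syslog source appends facility and severity to the configured tag; facility_and_severity is a hypothetical helper written for this example.

```ruby
# Extract facility and severity from a tag such as oms.qnd.Kemp.authpriv.notice.
# Returns nil when the tag does not have the expected five parts.
def facility_and_severity(tag)
  parts = tag.split('.')
  return nil unless parts.size == 5

  { 'Facility' => parts[3], 'Severity' => parts[4] }
end

p facility_and_severity('oms.qnd.Kemp.authpriv.notice')
p facility_and_severity('oms.qnd.Kemp')
```

A nil result corresponds to the branch where the filter logs an error and leaves Facility and Severity unset.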

What you get

Once everything is configured, rsyslog and omsagent must be restarted before data starts to be ingested, since configuration changes are not picked up dynamically.

[Screenshots: kemp1, kemp2]

-Daniele
This posting is provided “AS IS” with no warranties, and confers no rights
