# Extending #MSOMS Log Analytics through Linux – part 2 – custom ruby filter
Other articles in the series
Extending #MSOMS Log Analytics through Linux – part 1
The solution
The Kemp solution I’m trying to implement is based on two data sources:
- syslog logs
- SNMP GET/WALK information / counters
I’m not going to use at this stage:
- SNMP traps. This can be done, but traps are less reliable than SNMP GET polling, where even if I miss one GET I can always read what I need at the next poll cycle
- REST API. This API is probably the way to go in the long term, even if at the moment it doesn’t add anything to the SNMP interface. Since I already know what to look for in SNMP, I will start with SNMP and move to REST in v2
Right now my goal is to ingest as much as I can in a meaningful way, and only then try to find a way to represent the data with a custom solution. After all, this is an experiment and a journey; if I’m wrong I can always step back and try another way.
In this installment I will tackle the syslog stream. The syslog information from Kemp devices has two message categories (the ident field in the syslog message); let’s call them standard syslog messages and stats messages. Stats messages report statistics for the Kemp device, such as the number of Virtual and Real servers. As a first step, the solution must read the stream and ingest stats messages as performance data points, while ingesting the other messages as a custom log named KempLog.
Getting the syslog data
Getting the syslog data is straightforward, since we already have an input data source for syslog defined for the OMS agent. That said, I want my own custom log data type for Kemp, so that I can enrich the default syslog schema when and if it is needed.
There are two ways to achieve this result:
- you can filter and retag the standard syslog stream (oms.syslog) using a rewrite_tag_filter and filtering the incoming data based on the hostnames of the Kemp devices
- you can define a new syslog input on a different port and configure the syslog stack on the Kemp devices accordingly
I chose the latter approach. The resulting configuration is:
Kemp –> Syslog on Linux –> OMS agent listener –> custom fluentd filter
I could have connected the Kemp device syslog stream directly to the OMS agent listener, but for consistency’s sake (with the default syslog source) I went with this double-hop design.
Configuring the syslog stream
To be able to process the syslog stream from the Kemp devices, you must configure the rsyslog daemon (in my case; if you’re using syslogd the configuration is slightly different) to redirect all the messages from the Kemp devices to a specific port. The input plugin for the fluentd data flow must obviously bind to this port; in our case let’s use port 25326/udp. The rsyslog configuration file must be at a higher priority than the standard OMS one, so that the messages from the Kemp devices can be managed and filtered before the standard processing (i.e. I don’t want to ingest them twice). Higher means with a lower rank when sorted by name.
```
# /etc/rsyslog.d/60-kemp-rsyslog.conf
# Custom rules for Kemp devices.
#
# For more information see rsyslog.conf(5) and /etc/rsyslog.conf
#
# Uncomment for debug purposes and to get the log messages in kemp.log
#if $fromhost-ip startswith '172.20.2.71' then /var/log/kemp.log

# As an alternative we can send everything from these hosts to the OMS agent
if $fromhost-ip startswith '172.20.2.71' then @127.0.0.1:25326

# Stop any further processing; remove if you want to ingest the data as std syslog too
stop
```
Then you must create your own fluentd configuration to get the messages and process them. As a first step, just get the messages and write them into a log file, without ingesting anything into OMS. First define a `<source>` of type syslog on port 25326 and specify that any data from this source must be tagged oms.qnd.Kemp, then use an output plugin of type file to write the records to a file. This is super useful to better understand your message schema and to build your custom filter, but it’s just an intermediate step, so it is here just for future reference.
```
# /etc/opt/microsoft/omsagent/conf/omsagent.d/kemp.conf
<source>
  type syslog
  port 25326
  bind 127.0.0.1
  protocol_type udp
  tag oms.qnd.Kemp
  log_level debug
</source>

<match oms.qnd.Kemp.**>
  type file
  path /var/opt/microsoft/omsagent/log/kemp.log
</match>
```
Parsing the data
Now that the data flow is working, let’s take one more step: get the messages and process stats and non-stats messages differently. To do this we’re going to implement the following data flow:
syslog source (oms.qnd.Kemp) [1] –> custom fluentd filter [2] –> rewrite_tag_filter (oms.qnd.KempLog) [3] –> out_oms_api [4]
–> rewrite_tag_filter (oms.qnd.KempPerfStats) [3] –> out_oms [4]
Basically the logic is:
- get the data from the syslog source and tag it as oms.qnd.Kemp
- process the messages and create two different payloads: one for the stats data, formatted as performance data, and the other as a custom log
- based on different properties in the payload use the rewrite tag filter to tag the performance data as oms.qnd.KempPerfStats and the custom log data as oms.qnd.KempLog
- use the output plugin out_oms_api for the oms.qnd.KempLog data, where the third part of the tag will be the log name in OMS; use out_oms for ingesting the performance data
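The retagging decision in step 3 can be sketched in plain Ruby. This is only a minimal illustration of the rule set, not the actual rewrite_tag_filter plugin; the function name `new_tag` is mine:

```ruby
# Records the custom filter turned into a perf wrapper carry a DataType key;
# everything else is routed to the custom log. This mirrors the two rewrite
# rules in the fluentd configuration below.
def new_tag(payload)
  if payload["DataType"] == "LINUX_PERF_BLOB"
    "oms.qnd.KempPerfStats"
  else
    "oms.qnd.KempLog"
  end
end

perf = { "DataType" => "LINUX_PERF_BLOB", "IPName" => "LogManagement" }
log  = { "Message" => "kernel: link up" }

puts new_tag(perf)  # => oms.qnd.KempPerfStats
puts new_tag(log)   # => oms.qnd.KempLog
```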
This is the complete configuration of our fluentd data flow:
```
# /etc/opt/microsoft/omsagent/conf/omsagent.d/kemp.conf
<source>
  type syslog
  port 25326
  bind 127.0.0.1
  protocol_type udp
  tag oms.qnd.Kemp
  log_level debug
</source>

<filter oms.qnd.Kemp.**>
  type filter_kemp
</filter>

<match oms.qnd.Kemp.**>
  type rewrite_tag_filter
  rewriterule1 DataType LINUX_PERF_BLOB oms.qnd.KempPerfStats
  rewriterule2 ident [^stats] oms.qnd.KempLog
</match>

<match oms.qnd.KempLog>
  type out_oms_api
  time_generated_field EventTime
  log_level debug
  num_threads 1
  buffer_chunk_limit 1m
  buffer_type file
  buffer_path /var/opt/microsoft/omsagent/state/out_oms_kemp*.buffer
  buffer_queue_limit 10
  buffer_queue_full_action drop_oldest_chunk
  flush_interval 10s
  retry_limit 10
  retry_wait 30s
  max_retry_wait 10m
</match>
```
As you can see, there’s no out_oms match in the configuration: since the tag for the performance data starts with oms., it matches the default OMS agent configuration for standard data types, so an explicit match is unnecessary.
The custom fluentd filter
Lastly, let’s add the custom fluentd filter that creates the two different data payloads based on the ident field. The filter is based on the standard syslog one; the only notable part is where the record is parsed for stats (actually, the message part is parsed to see whether it is a stats message) and one of two different payloads is created accordingly and assigned to a variable called “polyResult”.
```ruby
regexp_stats = Regexp.new("(?<type>.*)status:\\D+(?<total>\\d+)\\D+(?<up>\\d+)\\D+(?<down>\\d+)\\D+(?<disabled>\\d+)")
result = regexp_stats.match(record["message"])
if !result.nil?
  # let's try to transform this in a perf data point
  # for sure there's a better and more elegant way to do this in ruby, too bad I don't know it
  data_items = []
  data_info = {}
  data_info["Timestamp"] = record["Timestamp"]
  data_info["Host"] = record["Host"]
  object_name = result["type"]
  data_info["ObjectName"] = "KempLM-#{object_name}"
  data_info["InstanceName"] = "_Total"
  counters = []
  counter_pair = {}
  counter_pair["CounterName"] = "Total"
  counter_pair["Value"] = Integer(result["total"]) rescue nil
  counters.push(counter_pair)
  counter_pair = {}
  counter_pair["CounterName"] = "Up"
  counter_pair["Value"] = Integer(result["up"]) rescue nil
  counters.push(counter_pair)
  counter_pair = {}
  counter_pair["CounterName"] = "Down"
  counter_pair["Value"] = Integer(result["down"]) rescue nil
  counters.push(counter_pair)
  counter_pair = {}
  counter_pair["CounterName"] = "Disabled"
  counter_pair["Value"] = Integer(result["disabled"]) rescue nil
  counters.push(counter_pair)
  data_info["Collections"] = counters
  data_items.push(data_info)
  wrapper = {
    "DataType" => "LINUX_PERF_BLOB",
    "IPName" => "LogManagement",
    "DataItems" => data_items
  }
  polyResult = wrapper
else
  record["Message"] = record["message"]
  record.delete "message"
  polyResult = record
end
```
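Here is a quick standalone check of what the stats regex extracts. The exact wording of a real Kemp stats line is my assumption; only its shape matters (a label, then “status:”, then four numbers):

```ruby
# The same regex used in the filter, with its named capture groups.
regexp_stats = Regexp.new("(?<type>.*)status:\\D+(?<total>\\d+)\\D+(?<up>\\d+)\\D+(?<down>\\d+)\\D+(?<disabled>\\d+)")

# A made-up sample line in the expected shape.
msg = "Real server status: total 10, up 8, down 1, disabled 1"
result = regexp_stats.match(msg)

puts result["total"]     # => 10
puts result["up"]        # => 8
puts result["down"]      # => 1
puts result["disabled"]  # => 1
```

Non-matching messages yield `nil` from `match`, which is exactly how the filter decides to treat a record as a plain log instead of a perf data point.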
As you can see, the perf data type must have a standard schema (a pseudo JSON schema from reverse engineering, so take it with caution):
```
// pseudo schema for LINUX_PERF_BLOB DataItems
{
  "Timestamp":    {"type": "string"},
  "Host":         {"type": "string"},
  "ObjectName":   {"type": "string"},
  "InstanceName": {"type": "string"},
  "Collections": {
    "type": "array",
    "items": {
      "type": "object",
      "properties": {
        "CounterName": {"type": "string"},
        "Value":       {"type": "number"}
      }
    }
  }
}
```
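For reference, this is what a single DataItems entry looks like when shaped according to that pseudo schema (the values are hypothetical, only for illustration):

```ruby
# One concrete DataItems entry, matching the pseudo schema:
# four string properties plus an array of CounterName/Value pairs.
data_item = {
  "Timestamp"    => "2017-01-01T10:00:00.000Z",
  "Host"         => "kemp01",
  "ObjectName"   => "KempLM-Real server",
  "InstanceName" => "_Total",
  "Collections"  => [
    { "CounterName" => "Total", "Value" => 10 },
    { "CounterName" => "Up",    "Value" => 8  },
    { "CounterName" => "Down",  "Value" => 1  },
    { "CounterName" => "Disabled", "Value" => 1 }
  ]
}
```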
And here is the complete Kemp filter:
```ruby
module Fluent
  class KempFilter < Filter
    Fluent::Plugin.register_filter('filter_kemp', self)

    def initialize
      super
      require 'socket'
      require_relative 'omslog'
      require_relative 'oms_common'
    end

    # Interval in seconds to refresh the cache
    config_param :ip_cache_refresh_interval, :integer, :default => 300

    def configure(conf)
      super
      @ip_cache = OMS::IPcache.new @ip_cache_refresh_interval
    end

    def start
      super
    end

    def shutdown
      super
    end

    def filter(tag, time, record)
      # Use Time.now, because it is the only way to get subsecond precision in version 0.12.
      # The time may be slightly in the future from the ingestion time.
      record["Timestamp"] = OMS::Common.format_time(Time.now.to_f)
      record["EventTime"] = OMS::Common.format_time(time)

      hostname = record["host"]
      record["Host"] = hostname
      record.delete "host"

      record["HostIP"] = "Unknown IP"
      host_ip = @ip_cache.get_ip(hostname)
      if host_ip.nil?
        OMS::Log.warn_once("Failed to get the IP for #{hostname}.")
      else
        record["HostIP"] = host_ip
      end

      if record.has_key?("pid")
        record["ProcessId"] = record["pid"]
        record.delete "pid"
      end

      # The tag should look like this: oms.qnd.Kemp.authpriv.notice
      tags = tag.split('.')
      if tags.size == 5
        record["Facility"] = tags[3]
        record["Severity"] = tags[4]
      else
        $log.error "The syslog tag does not have 5 parts #{tag}"
      end

      regexp_stats = Regexp.new("(?<type>.*)status:\\D+(?<total>\\d+)\\D+(?<up>\\d+)\\D+(?<down>\\d+)\\D+(?<disabled>\\d+)")
      result = regexp_stats.match(record["message"])
      if !result.nil?
        # let's try to transform this in a perf data point
        # for sure there's a better and more elegant way to do this in ruby, too bad I don't know it
        data_items = []
        data_info = {}
        data_info["Timestamp"] = record["Timestamp"]
        data_info["Host"] = record["Host"]
        object_name = result["type"]
        data_info["ObjectName"] = "KempLM-#{object_name}"
        data_info["InstanceName"] = "_Total"
        counters = []
        counter_pair = {}
        counter_pair["CounterName"] = "Total"
        counter_pair["Value"] = Integer(result["total"]) rescue nil
        counters.push(counter_pair)
        counter_pair = {}
        counter_pair["CounterName"] = "Up"
        counter_pair["Value"] = Integer(result["up"]) rescue nil
        counters.push(counter_pair)
        counter_pair = {}
        counter_pair["CounterName"] = "Down"
        counter_pair["Value"] = Integer(result["down"]) rescue nil
        counters.push(counter_pair)
        counter_pair = {}
        counter_pair["CounterName"] = "Disabled"
        counter_pair["Value"] = Integer(result["disabled"]) rescue nil
        counters.push(counter_pair)
        data_info["Collections"] = counters
        data_items.push(data_info)
        wrapper = {
          "DataType" => "LINUX_PERF_BLOB",
          "IPName" => "LogManagement",
          "DataItems" => data_items
        }
        polyResult = wrapper
      else
        record["Message"] = record["message"]
        record.delete "message"
        polyResult = record
      end
      polyResult
    end
  end
end
```
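As the inline comment admits, the counter-building part is repetitive. Here is a hypothetical, more compact variant of the same logic (same output; the helper name `build_counters` is mine):

```ruby
# Map each OMS counter name to the regex capture group that feeds it.
COUNTER_GROUPS = {
  "Total" => "total", "Up" => "up", "Down" => "down", "Disabled" => "disabled"
}.freeze

# Build the Collections array from anything that answers [] with the
# captured strings (a MatchData from the stats regex, or a plain hash).
# Integer() still falls back to nil on non-numeric input, as in the original.
def build_counters(result)
  COUNTER_GROUPS.map do |counter_name, group|
    value = Integer(result[group]) rescue nil
    { "CounterName" => counter_name, "Value" => value }
  end
end

counters = build_counters({ "total" => "10", "up" => "8", "down" => "1", "disabled" => "1" })
puts counters.length  # => 4
```

In the filter, `data_info["Collections"] = build_counters(result)` would replace the four hand-rolled blocks.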
What you get
Once configured, rsyslog and omsagent must be restarted for the data to start being ingested; configuration changes are not dynamic.
-Daniele
This posting is provided “AS IS” with no warranties, and confers no rights