How to create filters in data parsing?

Data parsing is the process of converting a data string from one format to another. A data parsing program converts unstructured data into JSON, CSV, and other file formats, adding structure to that information. Grok is a great way to parse unstructured log data into something structured and queryable. This tool is perfect for Syslog logs, Apache and other web server logs, MySQL logs, and in general any log format written for humans rather than for computer consumption.

UTMStack allows you to customize filters in data parsing:

  1. Click on the hamburger menu positioned in the upper right part of the dashboard. Select the option Data Parsing.
  2. In this panel, you will see a list of the filters implemented by UTMStack according to your user; each can be modified or deleted using the options at the right. Optionally, you can search for a filter by name. Click on the button Add filter to create your own.
UTMStack uses filter plugins from Logstash to process a log line, map specific parts of the line into dedicated fields, and perform actions based on this mapping. Basically, the syntax of a Grok pattern is %{SYNTAX:SEMANTIC}.
The SYNTAX represents the pattern that matches the text in each log. The SEMANTIC is the identifier that you give to the piece of text in your parsed logs. Put simply, a Grok pattern takes the form %{PATTERN:FieldName}.
For example, a string like 10.0.0.1 will match the Grok IP pattern. All filter operations go inside a filter block:
filter {
#Some filter operations like grok, geoip, json, kv, etc.
}
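For instance, a minimal sketch of a grok filter that could parse a hypothetical line such as 10.0.0.1 GET /index.html might look like this (the field names client_ip, method, and request are illustrative, not UTMStack conventions):
filter {
  grok {
    #%{IP:client_ip} stores the matched IP address in a field named client_ip
    match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATHPARAM:request}" }
  }
}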
(Screenshot: the Add filter editor in the Data Parsing panel)
As you can see in the picture above, you must provide a filter name; then, your code goes into the filter definition area. At the bottom of the editor, you can see the options to cancel your changes or save them. After applying a filter, you will see it in the Data Parsing panel.

UTMStack requires the filters to include the following output fields so that the correlation engine can analyze the data.

  • @timestamp
The "@timestamp" is a JSON representation of the date and time where the log was entered to UTMStack. Its structure follows the pattern YYYY-MM-DDTHH:MM:SS.MsMsMsZ, which means "2017-01-18T11:41:28.753Z". You need to use Grok to extract that date string into a new field called timestamp.
  • dataType
This field indicates what kind of log you are processing: AWS log, macOS log, o365 log, Windows Event log, IIS log, NIDS, VMware ESXi, Filebeat module log, etc. For naming conventions, the value of this field must be lowercase and can only contain letters, numbers, hyphens, and underscores, because UTMStack uses this value to create the Elasticsearch index for each filter created.
  • dataSource
This field represents the source where the log was generated. Usually, the value of this field is an IP address, hostname, or user-defined string.
  • [logx][utm][action]
This field only has to be present in the output when the log indicates that a connection was established between a source and a destination; in that case, its value must be "Success".
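As a rough sketch of how these required fields could be produced (the index name example-datasource, the field syslog_timestamp, and the use of %{host} are assumptions for illustration), you can combine the standard Logstash date and mutate filters:
filter {
  #Parse the ISO8601 date string extracted by grok into @timestamp
  date {
    match => ["syslog_timestamp", "ISO8601"]
    target => "@timestamp"
  }
  #dataType must be lowercase letters, numbers, hyphens, or underscores;
  #dataSource is usually an IP address or hostname
  mutate {
    add_field => {
      "dataType" => "example-datasource"
      "dataSource" => "%{host}"
    }
  }
}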

Let's assume that we need to process arbitrary log lines such as the following:

<166>2022-06-07T23:50:09.785Z 10.26.2.1 fields[port="80" sendpkg="1224" rcvpkg="312" con_status="Success"] msg=Connection success on port 80
<14>2022-06-07T23:50:00.573Z 10.26.2.8 fields[port="8000" sendpkg="2236" rcvpkg="4589" con_status="Deny"] msg=Authentication failed
<14>2022-06-07T23:50:00.573Z 10.26.2.12 fields[sendpkg="3523" rcvpkg="2153" con_status="Success"] msg=Authentication phase success
<166>2022-06-07T23:53:20.185Z mymachine fields[port="3000" sendpkg="1342" rcvpkg="0" con_status="Fail"] msg=Connection with the server failed, the server is unavailable
<14>2022-06-07T23:50:00.439Z mymachine fields[port="8080" sendpkg="0" con_status="Fail"] msg=Internal error
As you can see, some of the log lines have values missing inside the fields[...] part, so we have to process the fields[...] content in a dynamic way; to do that, we can use the kv filter. To detect established connections, we will use the con_status="Success" condition. Let's put it all together in a filter; note that we create the dataSource, dataType, and action fields as explained before:
filter {
  #......................................................................#
  #Using grok to parse the input message (log line)
  grok {
    match => {
      "message" => [
        "<%{NUMBER:priority}>%{TIMESTAMP_ISO8601:datetime} %{IPORHOST:log_origin} fields\[%{GREEDYDATA:all_fields}\] msg=%{GREEDYDATA:msg}"
      ]
    }
  }
  #......................................................................#
  #Using the kv filter with default config, useful in key-value logs
  if [all_fields] {
    kv {
      source => "all_fields"
      allow_duplicate_values => false
      target => "kv_field"
    }
  }
  #......................................................................#
  #Generating the dataSource field required by the CorrelationRulesEngine
  mutate {
    add_field => { "dataSource" => "%{log_origin}" }
  }
  #......................................................................#
  #Generating the dataType field required by the CorrelationRulesEngine
  mutate {
    add_field => {
      "dataType" => "example-filter"
    }
  }
  #......................................................................#
  #Rename fields to set the final structure used by UTMStack
  mutate {
    rename => { "priority" => "[logx][example][priority]" }
    rename => { "[kv_field][port]" => "[logx][example][port]" }
    rename => { "[kv_field][sendpkg]" => "[logx][example][sendpkg]" }
    rename => { "[kv_field][rcvpkg]" => "[logx][example][rcvpkg]" }
    rename => { "[kv_field][con_status]" => "[logx][example][con_status]" }
    rename => { "msg" => "[logx][example][msg]" }
    rename => { "datetime" => "[logx][example][datetime]" }
  }
  #......................................................................#
  #Detecting established connections
  if [logx][example][con_status] and [logx][example][con_status] == "Success" {
    mutate {
      add_field => { "[logx][utm][action]" => "Success" }
    }
  }
  #......................................................................#
  #Finally, remove unnecessary fields
  mutate {
    remove_field => ["@version","path","all_fields","tags","kv_field","log_origin","message"]
  }
}
Download this example config: Example.conf
After applying the filter, these are the important output fields; a few more fields may appear, added during the process:
"host": "HOST-PC"
"sendpkg": "2236"
"priority": "14"
"rcvpkg": "4589"
"datetime": "2022-06-07T23:50:00.573Z"
"port": "8000"
"con_status": "Deny"
"msg": "Authentication failed\r"
"dataSource": "10.26.2.8"
"@timestamp": "2023-05-03T21:46:14.552Z"
"dataType": "example-filter"
See the output in JSON format: ExampleOutput.json