Is it possible to add Geo IP information automatically to my events even if it’s not present in the original log? How can I automatically decode a URL to dissect all its components? How can I convert a human readable byte value (e.g., 1KB) to its value in bytes (e.g., 1024) so I can use it on a Kibana dashboard?
One answer that everyone knows is: Logstash Filters. But there’s another very smart option available from NetEye 4.9 that’s not used very often: the Ingest Pipeline. This feature offers the possibility to enrich or modify events automatically before they are indexed in Elasticsearch.
{
"description" : "...",
"processors" : [ ... ]
}
The description is a special field to store a helpful message about what the pipeline does.
The processors parameter defines a list of processors to be executed in order.
An ingest pipeline changes documents before they are actually indexed. You can think of an ingest pipeline as an assembly line made up of a series of workers, called processors. Each processor makes specific changes, such as lowercasing field values, to incoming documents before moving on to the next one. When all the processors in a pipeline are done, the completed document is added to the target index.
Let’s go ahead and make our pipelines:
The geoip processor adds information about the geographical location of IP addresses based on data from the Maxmind GeoLite2 City Database. Because the processor uses a geoIP database that’s installed on Elasticsearch, you don’t need to install your own geoIP database on machines running Logstash.
PUT _ingest/pipeline/geoip-info
{
"description": "Add geoip info",
"processors": [
{
"geoip": {
"field": "client.ip",
"target_field": "client.geo",
"ignore_missing": true
}
},
{
"geoip": {
"field": "source.ip",
"target_field": "source.geo",
"ignore_missing": true
}
},
{
"geoip": {
"field": "destination.ip",
"target_field": "destination.geo",
"ignore_missing": true
}
},
{
"geoip": {
"field": "server.ip",
"target_field": "server.geo",
"ignore_missing": true
}
},
{
"geoip": {
"field": "host.ip",
"target_field": "host.geo",
"ignore_missing": true
}
}
]
}
...
elasticsearch {
hosts => ["elasticsearch.neteyelocal:9200"]
index => "my-index"
pipeline => "geoip-info"
codec => "plain"
...
Note: The Elastic GeoLite DB used automatically by the processor must be set up to have the geo point up to date, even if the Geo IP information is still the same. See the Max Mind Site.
The user_agent
processor extracts details from the user agent string that a browser sends with its web requests. This processor adds that information by default under the user_agent
field.
Sample : 10.10.3.110 - - [01/Oct/2020:15:27:51 +0200] "GET /neteye/layout/menu?url=%2Fneteye%2Fdoc%2Fmodule HTTP/1.1" 200 11665 "https://neteye4.wp.lan/neteye/doc/module/update/chapter/update-general-update" "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:81.0) Gecko/20100101 Firefox/81.0"
PUT _ingest/pipeline/user-agent-apache2
{
"description": "Extract details from the User Agent on Apache2 Logs",
"processors": [
{
"user_agent": {
"field": "user_agent.original",
"target_field": "user_agent",
"ignore_missing": true
}
}
]
}
Result on Elastic Index:
...
"user_agent": {
"name": "Firefox",
"original": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:81.0) Gecko/20100101 Firefox/81.0",
"version": "81.0",
"os": {
"name": "Windows NT",
"version": "10.0",
"full": "Windows NT 10.0"
},
"device" : {
"name" : "Other"
},
...
The bytes processor converts a human-readable byte value (e.g., 1KB) to its value in bytes (e.g., 1024). If the field is an array of strings, all members of the array will be converted.
PUT _ingest/pipeline/bytes-convert
{
"description": "Converts Bytes filed from human to its value in bytes.",
"processors": [
{
"bytes": {
"field": "file.size",
"target_field": "file.size",
"ignore_missing": true
}
}
]
}
The solution: use a main pipeline, with various nested pipelines to enrich all logs based on conditionals
PUT _ingest/pipeline/main-pipeline
{
"description": "Exec processor in sequence.",
"processors": [
{
"pipeline": {
"name": "geoip-info"
}
},
{
"pipeline": {
"if": "ctx.event?.module == 'apache'",
"name": "user-agent-apache2"
}
},
{
"pipeline": {
"name": "bytes-convert"
}
}
3. Add the main pipeline to Logstash
...
elasticsearch {
hosts => ["elasticsearch.neteyelocal:9200"]
index => "my-index"
pipeline => "main-pipeline"
codec => "plain"
...
The pipeline and processors are very powerful and versatile instruments that can help enrich logs ingested on NetEye SIEM more easily. And if you don’t find what you need in the out-of-the-box processors made by Elastic, you can even directly write a Painless script! (see script processor)
Useful links:
Elastic Ref: Ingest Processors
Other pipelines use: Firewall Log Collection