Implementing Filters
The Parsing Lifecycle
Ingestion: A log enters the system (via an Input Plugin) with basic metadata: dataSource, dataType, and tenantId.
Draft Creation: The engine creates a Draft object. The original log is stored in the raw field.
Pipeline Matching: The engine iterates through the pipeline configuration.
  Stages are evaluated in order.
  A stage executes if the log's dataType is included in the stage's dataTypes array.
  Multiple Matches: A log can match and run through multiple stages if they all contain its dataType.
Step Execution: Within a stage, steps run sequentially. Each step modifies the Draft's internal JSON string.
Finalization: Once all matching stages finish, the final Draft is converted into a structured Event and sent to the Analysis stage.
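To illustrate the matching rule, the sketch below defines two stages that both list the same dataType. It reuses only step syntax that appears elsewhere on this page. The dataType firewall-fortigate-traffic comes from the full example, while firewall-generic is a hypothetical name chosen for illustration.

```yaml
pipeline:
  # Stage 1: runs only for firewall-fortigate-traffic logs
  - dataTypes: [firewall-fortigate-traffic]
    steps:
      - kv:
          source: raw
          fieldSplit: " "
          valueSplit: "="
  # Stage 2: also lists firewall-fortigate-traffic, so the same log
  # runs through this stage after stage 1 (stages are evaluated in order).
  # firewall-generic is a hypothetical dataType used for illustration.
  - dataTypes: [firewall-fortigate-traffic, firewall-generic]
    steps:
      - rename:
          from: [log.src]
          to: origin.ip
```

Under these assumptions, a firewall-fortigate-traffic log passes through both stages in order, so the rename in the second stage should operate on the fields the first stage extracted. A firewall-generic log would skip the first stage and run only the second.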
Pipeline Architecture
A well-designed pipeline follows these four phases:
Phase 1: Extraction
Use json, csv, kv, or grok to pull data out of the raw string.

    - json:
        source: raw

Phase 2: Normalization
Map extracted fields to the Standard Event Schema.

    - rename:
        from: [log.source_ip, log.src]
        to: origin.ip

Phase 3: Enrichment & Logic
Add context or transform data using dynamic plugins or reformat.

    - dynamic:
        plugin: com.utmstack.geolocation
        params: { source: origin.ip, destination: origin.geolocation }
    - cast:
        fields: [origin.port]
        to: int

Phase 4: Cleanup
Remove temporary fields to optimize storage and indexing.

    - delete:
        fields: [log.temp_id, log.unnecessary_meta]

The raw field is protected for auditing purposes and cannot be deleted.
Technical Details
Conditional Execution (where)
Every step can be made conditional using CEL expressions. If the expression returns false, the step is skipped. For a full list of available comparison functions, see the CEL Overloads Guide.
Example: Only run a grok if a certain field exists.

    - grok:
        source: log.message
        patterns: [...]
        where: 'exists("log.message")'

Stopping Processing (drop)
If you encounter "noise" (logs that shouldn't be stored or analyzed), use the drop step. It immediately stops the pipeline and discards the log.

    - drop:
        where: 'contains(raw, "HealthCheck")'

Annotated Full Example
This example processes a firewall log that arrives as a key-value string.

    pipeline:
      - dataTypes: [firewall-fortigate-traffic]
        steps:
          # 1. Extract KV pairs (automatically goes into log.*)
          - kv:
              source: raw
              fieldSplit: " "
              valueSplit: "="
          # 2. Normalize to standard schema
          - rename:
              from: [log.src, log.s_ip]
              to: origin.ip
          - rename:
              from: [log.dst, log.d_ip]
              to: target.ip
          # 3. Handle data types for better indexing
          - cast:
              fields: [origin.port, target.port]
              to: int
          # 4. Logical enrichment: Determine action
          - add:
              function: string
              params: { key: action, value: denied }
              where: 'equals("log.policy", "block")'
          # 5. Cleanup temporary metadata
          - delete:
              fields: [log.policy]

For a full list of step parameters, see the Filter Steps Reference.
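Because steps run sequentially and drop stops the pipeline immediately, one possible arrangement is to place drop ahead of the extraction steps so that noise is discarded before any parsing happens. The sketch below combines the drop and where snippets shown earlier on this page. The ordering is an inference from the lifecycle description rather than a documented recommendation, and nesting where inside the step is an assumption about the step layout.

```yaml
pipeline:
  - dataTypes: [firewall-fortigate-traffic]
    steps:
      # Discard noise first, before any parsing work is done
      - drop:
          where: 'contains(raw, "HealthCheck")'
      # Only logs that were not dropped reach extraction
      - kv:
          source: raw
          fieldSplit: " "
          valueSplit: "="
      # Conditional step: skipped when log.src was not extracted
      - rename:
          from: [log.src]
          to: origin.ip
          where: 'exists("log.src")'
```

With this layout, a log whose raw text contains "HealthCheck" would never reach the kv or rename steps, and any other log would be renamed only if the kv step produced log.src.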