EventProcessor Plugin Architecture
The EventProcessor is designed for high extensibility through a micro-plugin architecture. By implementing custom plugins, you can extend the system's capabilities in log ingestion, parsing, analysis, and notifications.
Architecture Highlights
Communication: gRPC over Unix Domain Sockets.
Location: Sockets are automatically managed in
{{WorkDir}}/sockets/.Language: While gRPC supports many languages, we highly recommend Go and our official go-sdk.
Plugin Types
| Type | SDK Helper | Interaction Pattern |
|---|---|---|
| Parsing | InitParsingPlugin | Transforms logs within a pipeline (unary RPC). |
| Analysis | InitAnalysisPlugin | Evaluates events to emit alerts (server streaming). |
| Input | SendLogsFromChannel | Ingests data from external sources and pushes to the engine. |
| Correlation | InitCorrelationPlugin | Processes alerts to find high-level patterns. |
| Notification | InitNotificationPlugin | Delivers messages to external endpoints (Email, Slack, etc). |
🛠️ Quick Start (Go)
1. Initialize your project
mkdir utmstack-plugin-custom && cd utmstack-plugin-custom
go mod init github.com/user/utmstack-plugin-custom
go get github.com/threatwinds/go-sdk2. Implement a Parsing Plugin
Parsing plugins are used for specialized logic that cannot be achieved with standard steps (Grok, JSON, etc.).
package main
import (
"context"
"github.com/threatwinds/go-sdk/plugins"
"github.com/tidwall/sjson"
)
func main() {
// "my-enricher" results in socket: {{WorkDir}}/sockets/my-enricher_parsing.sock
plugins.InitParsingPlugin("my-enricher", parseLog)
}
func parseLog(ctx context.Context, tr *plugins.Transform) (*plugins.Draft, error) {
// Add custom field to the log
newLog, _ := sjson.Set(tr.Draft.Log, "log.custom_status", "processed_by_plugin")
tr.Draft.Log = newLog
return tr.Draft, nil
}3. Implement an Input Plugin
Input plugins act as producers. They don't wait for the engine to call them; they push data to the engine.
package main
import (
"github.com/google/uuid"
"github.com/threatwinds/go-sdk/plugins"
"time"
)
func main() {
pluginName := "my-data-source"
// Start the background sender
go plugins.SendLogsFromChannel(pluginName)
for {
// Simulate fetching data
plugins.EnqueueLog(&plugins.Log{
Id: uuid.NewString(),
DataType: "custom_data",
DataSource: "external_api",
Timestamp: time.Now().Format(time.RFC3339),
Raw: `{"message": "hello world"}`,
}, pluginName)
time.Sleep(10 * time.Second)
}
}🚀 Deployment & Configuration
Build:
go build -o plugin-binaryNaming: The plugin name passed to
InitParsingPlugin("name")must match the configuration in your YAML.YAML Integration:
pipeline: - dataTypes: [custom_data] steps: - dynamic: plugin: my-enricher params: { key: value }
Best Practices
Use the Catcher: Always use
github.com/threatwinds/go-sdk/catcherfor error reporting to ensure logs are standardized.Reserved Fields: Ensure your parsing logic maps data to the common schema (
origin.*,target.*,action). Avoid creating top-level fields outside oflog.*unless they are part of the core schema.Graceful Shutdown: The SDK's
Init...functions handle SIGINT and SIGTERM automatically.Config Management: Use
plugins.PluginCfg("plugin_name", true)to fetch configuration variables from the UTMStack environment.