EventProcessor Plugin Architecture

The EventProcessor is extensible through a micro-plugin architecture. By implementing custom plugins, you can extend the system's log ingestion, parsing, analysis, correlation, and notification capabilities.

Architecture Highlights

  • Communication: gRPC over Unix Domain Sockets.

  • Location: Sockets are automatically managed in {{WorkDir}}/sockets/.

  • Language: While gRPC supports many languages, we highly recommend Go and our official go-sdk.
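As a minimal sketch of the socket layout described above, the helper below builds the path a plugin's socket would live at. It assumes the "<name>_<type>.sock" naming seen in the parsing example in this guide also applies to other plugin types, which this document does not confirm. The socketPath function and the /var/lib/app work directory are hypothetical and not part of the go-sdk.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// socketPath builds the Unix socket path for a plugin.
// ASSUMPTION: sockets follow the "<name>_<type>.sock" convention under
// <workDir>/sockets, as in the parsing example in this guide.
func socketPath(workDir, name, pluginType string) string {
	return filepath.Join(workDir, "sockets", name+"_"+pluginType+".sock")
}

func main() {
	// "/var/lib/app" is a placeholder for the real {{WorkDir}} value.
	fmt.Println(socketPath("/var/lib/app", "my-enricher", "parsing"))
}
```

A helper like this can be handy when debugging, for example to check whether the expected socket file exists after the plugin starts.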

Plugin Types

Type          SDK Helper              Interaction Pattern
Parsing       InitParsingPlugin       Transforms logs within a pipeline (unary RPC).
Analysis      InitAnalysisPlugin      Evaluates events to emit alerts (server streaming).
Input         SendLogsFromChannel     Ingests data from external sources and pushes to the engine.
Correlation   InitCorrelationPlugin   Processes alerts to find high-level patterns.
Notification  InitNotificationPlugin  Delivers messages to external endpoints (Email, Slack, etc).

🛠️ Quick Start (Go)

1. Initialize your project

mkdir utmstack-plugin-custom && cd utmstack-plugin-custom
go mod init github.com/user/utmstack-plugin-custom
go get github.com/threatwinds/go-sdk

2. Implement a Parsing Plugin

Parsing plugins are used for specialized logic that cannot be achieved with standard steps (Grok, JSON, etc.).

package main

import (
	"context"
	"github.com/threatwinds/go-sdk/plugins"
	"github.com/tidwall/sjson"
)

func main() {
	// "my-enricher" results in socket: {{WorkDir}}/sockets/my-enricher_parsing.sock
	plugins.InitParsingPlugin("my-enricher", parseLog)
}

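func parseLog(ctx context.Context, tr *plugins.Transform) (*plugins.Draft, error) {
	// Add a custom field to the log. Return the error instead of discarding it,
	// so a malformed log is not silently replaced with an empty string.
	newLog, err := sjson.Set(tr.Draft.Log, "log.custom_status", "processed_by_plugin")
	if err != nil {
		return nil, err
	}
	tr.Draft.Log = newLog

	return tr.Draft, nil
}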
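
To make the effect of the transformation concrete, here is a standalone sketch that applies the same change (setting log.custom_status) to a JSON log string. It deliberately uses the standard library's encoding/json instead of sjson so it runs without extra dependencies. The setCustomStatus function is hypothetical and is not how the SDK or sjson works internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// setCustomStatus decodes a JSON log, sets log.custom_status, and re-encodes it.
// If "log" is missing or is not a JSON object, it is replaced with a new object.
func setCustomStatus(rawLog, status string) (string, error) {
	var doc map[string]any
	if err := json.Unmarshal([]byte(rawLog), &doc); err != nil {
		return "", fmt.Errorf("decode log: %w", err)
	}
	if doc == nil { // the input was the JSON literal null
		doc = map[string]any{}
	}
	logObj, ok := doc["log"].(map[string]any)
	if !ok {
		logObj = map[string]any{}
		doc["log"] = logObj
	}
	logObj["custom_status"] = status

	out, err := json.Marshal(doc)
	if err != nil {
		return "", fmt.Errorf("encode log: %w", err)
	}
	return string(out), nil
}

func main() {
	out, err := setCustomStatus(`{"message":"hi"}`, "processed_by_plugin")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

Unlike sjson, this approach re-encodes the whole document, so key order and formatting of the original log are not preserved.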

3. Implement an Input Plugin

Input plugins act as producers. They don't wait for the engine to call them; they push data to the engine.

package main

import (
	"time"

	"github.com/google/uuid"
	"github.com/threatwinds/go-sdk/plugins"
)

func main() {
	pluginName := "my-data-source"
	
	// Start the background sender
	go plugins.SendLogsFromChannel(pluginName)

	for {
		// Simulate fetching data
		plugins.EnqueueLog(&plugins.Log{
			Id:         uuid.NewString(),
			DataType:   "custom_data",
			DataSource: "external_api",
			Timestamp:  time.Now().Format(time.RFC3339),
			Raw:        `{"message": "hello world"}`,
		}, pluginName)
		
		time.Sleep(10 * time.Second)
	}
}
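
The loop above runs until the process is killed. As a hedged sketch of one way to structure the producer side so it can stop cleanly, the example below replaces time.Sleep with a ticker and exits when a done channel is closed. The produce function and its fetch and enqueue callbacks are hypothetical stand-ins. In a real plugin, enqueue would wrap plugins.EnqueueLog, and whether SendLogsFromChannel needs additional shutdown handling is not covered here.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

// produce calls fetch on every tick and passes the result to enqueue
// until done is closed. It returns the number of items handed to enqueue.
func produce(done <-chan struct{}, interval time.Duration, fetch func() string, enqueue func(raw string)) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	sent := 0
	for {
		select {
		case <-done:
			return sent
		case <-ticker.C:
			enqueue(fetch())
			sent++
		}
	}
}

func main() {
	// Cancel the context on SIGINT or SIGTERM so the loop can exit.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	fetch := func() string { return `{"message": "hello world"}` }
	// Stand-in for a call to plugins.EnqueueLog in a real plugin.
	enqueue := func(raw string) { fmt.Println("enqueue:", raw) }

	sent := produce(ctx.Done(), 10*time.Second, fetch, enqueue)
	fmt.Println("stopped after sending", sent, "logs")
}
```

Passing the fetch and enqueue steps as functions keeps the loop testable without a running engine.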

🚀 Deployment & Configuration

  1. Build: go build -o plugin-binary

  2. Naming: The name passed to InitParsingPlugin (for example, "my-enricher") must match the plugin value of the dynamic step in your YAML.

  3. YAML Integration:

    pipeline:
      - dataTypes: [custom_data]
        steps:
          - dynamic:
              plugin: my-enricher
              params: { key: value }

Best Practices

  • Use the Catcher: Always use github.com/threatwinds/go-sdk/catcher for error reporting to ensure logs are standardized.

  • Reserved Fields: Ensure your parsing logic maps data to the common schema (origin.*, target.*, action). Avoid creating top-level fields outside of log.* unless they are part of the core schema.

  • Graceful Shutdown: The SDK's Init... functions handle SIGINT and SIGTERM automatically.

  • Config Management: Use plugins.PluginCfg("plugin_name", true) to fetch configuration variables from the UTMStack environment.
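
The Reserved Fields guideline can be checked mechanically. The sketch below lists top-level keys of a parsed log that fall outside an allow-list. The allow-list contains only the names mentioned in this guide (origin, target, action, log) and is an assumption, since the full core schema is not listed here. The unexpectedTopLevel function is hypothetical and not part of the go-sdk.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// allowedTopLevel is an ASSUMED, incomplete allow-list built from the field
// names mentioned in this guide. Extend it to match the real core schema.
var allowedTopLevel = map[string]bool{
	"origin": true,
	"target": true,
	"action": true,
	"log":    true,
}

// unexpectedTopLevel returns the sorted top-level keys of rawLog that are
// not in allowedTopLevel.
func unexpectedTopLevel(rawLog string) ([]string, error) {
	var doc map[string]json.RawMessage
	if err := json.Unmarshal([]byte(rawLog), &doc); err != nil {
		return nil, fmt.Errorf("decode log: %w", err)
	}
	var extra []string
	for key := range doc {
		if !allowedTopLevel[key] {
			extra = append(extra, key)
		}
	}
	sort.Strings(extra)
	return extra, nil
}

func main() {
	extra, err := unexpectedTopLevel(`{"origin":{},"log":{},"custom":true}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("unexpected top-level fields:", extra)
}
```

A check like this fits naturally in a unit test for a parsing plugin, where it can flag fields that should have been placed under log.* instead.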