
Implementing Filters

Osmany Montero edited this page Jan 19, 2026 · 5 revisions

The Parsing Lifecycle

  1. Ingestion: A log enters the system (via Input Plugin) with basic metadata: dataSource, dataType, and tenantId.
  2. Draft Creation: The engine creates a Draft object. The original log is stored in the raw field.
  3. Pipeline Matching: The engine iterates through the pipeline configuration.
    • Stages are evaluated in order.
    • A stage executes if the log's dataType is included in the stage's dataTypes array.
    • Multiple Matches: A log can match and run through multiple stages if they all contain its dataType.
  4. Step Execution: Within a stage, steps run sequentially. Each step modifies the Draft's internal JSON string.
  5. Finalization: Once all matching stages finish, the final Draft is converted into a structured Event and sent to the Analysis stage.
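The matching behavior described in steps 3 and 4 can be sketched with a minimal, hypothetical pipeline configuration (the dataType names here are illustrative, not part of any shipped ruleset):

```yaml
pipeline:
  # Stage 1 lists two dataTypes, so it runs for logs of either type.
  - dataTypes: [firewall-fortigate-traffic, firewall-sophos-xg]
    steps:
      - json:
          source: raw
  # Stage 2 also lists firewall-fortigate-traffic. A log with that
  # dataType matches both stages: Stage 1 runs first, then Stage 2,
  # each modifying the same Draft in order.
  - dataTypes: [firewall-fortigate-traffic]
    steps:
      - rename:
          from: [log.src]
          to: origin.ip
```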

Pipeline Architecture

A well-designed pipeline follows these four phases:

Phase 1: Extraction

Use json, csv, kv, or grok to pull data out of the raw string.

- json:
    source: raw
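As a rough illustration of what extraction produces (assuming parsed fields land under the `log.*` namespace, as the kv step's comment in the full example on this page suggests):

```yaml
# Hypothetical input stored in raw:
#   {"user": "admin", "src_ip": "10.0.0.5"}
- json:
    source: raw
# After this step, the Draft would contain fields such as
# log.user and log.src_ip, ready for the normalization phase.
```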

Phase 2: Normalization

Map extracted fields to the Standard Event Schema.

- rename:
    from: [log.source_ip, log.src]
    to: origin.ip

Phase 3: Enrichment & Logic

Add context with dynamic plugins, or reshape values with transformation steps such as cast.

- dynamic:
    plugin: com.utmstack.geolocation
    params: { source: origin.ip, destination: origin.geolocation }
- cast:
    fields: [origin.port]
    to: int

Phase 4: Cleanup

Remove temporary fields and the raw log to save storage.

- delete:
    fields: [raw, log.temp_id]

Technical Details

Conditional Execution (where)

Every step can be made conditional using CEL (Common Expression Language) expressions via the where parameter. If the expression evaluates to false, the step is skipped.

  • Example: Only run a grok step if a certain field exists.
    - grok:
        source: log.message
        patterns: [...]
        where: 'exists("log.message")'

Stopping Processing (drop)

If you encounter noise (logs that should not be stored or analyzed), use the drop step. It immediately stops the pipeline and discards the log.

- drop:
    where: 'contains(raw, "HealthCheck")'

Annotated Full Example

This example processes a firewall log that arrives as a Key-Value string.

pipeline:
  - dataTypes: [firewall-fortigate-traffic]
    steps:
      # 1. Extract KV pairs (automatically goes into log.*)
      - kv:
          source: raw
          fieldSplit: " "
          valueSplit: "="

      # 2. Normalize to standard schema
      - rename:
          from: [log.src, log.s_ip]
          to: origin.ip
      - rename:
          from: [log.dst, log.d_ip]
          to: target.ip

      # 3. Handle data types for better indexing
      - cast:
          fields: [origin.port, target.port]
          to: int

      # 4. Logical enrichment: Determine action
      - add:
          function: string
          params: { key: action, value: denied }
          where: 'equals("log.policy", "block")'

      # 5. Cleanup
      - delete:
          fields: [raw]

For a full list of step parameters, see the Filter Steps Reference.
