Zeek + Wazuh: Wie man JSON-Felder ohne Konflikte ingestiert – inklusive sauberer Full-Parse-Lösung über den Indexer-Pipeline

Zeek erzeugt strukturiertes JSON – und das ist großartig.
Aber wenn man diese Logs direkt über den Wazuh Agent ins System sendet, stößt man sehr schnell auf ein Problem:

Die Felder unter id.* kollidieren mit OpenSearch-Mappings

Beispiel-Zeek-Log:

{
  "ts": 1763876253.747463,
  "uid": "CFmVts3qVlvAlQL7Kf",
  "id.orig_h": "fe80::22ab:48ff:fe33:ec5b",
  "id.orig_p": 135,
  "id.resp_h": "ff02::1:ff7f:7ea1",
  "id.resp_p": 136,
  "proto": "icmp",
  "conn_state": "OTH",
  ...
}

OpenSearch sagt dann:

object mapping for [id] tried to parse field [orig_h] as type keyword, but found conflicting type …

Der Grund:

  • OpenSearch sieht id.orig_h
  • und interpretiert id als Objekt
  • aber Zeek liefert oft unterschiedliche Datentypen für id.*
  • Indexing-Kollision, Dokumente werden abgewiesen

Die erste Vermutung:
„Ich schreibe einen Custom Decoder und ersetze die Felder.“
Das funktioniert – aber nur teilweise:

die 4 Felder werden extrahiert
aber alle anderen JSON-Felder gehen verloren
es ist nicht skalierbar (jede Zeek-Typ-Logdatei hat andere Felder)

Der Wunsch ist klar:

„Ich will ALLE Felder ingestieren – wie bei Logstash – aber ohne Konflikte.“

Die Lösung ist deutlich eleganter als der Versuch, jedes Feld einzeln zu decodieren.

1. Warum Custom Decoders hier die falsche Lösung sind

Der Agent arbeitet so:

  • Sobald ein Decoder greift, werden nur die Felder behalten, die der Decoder extrahiert
  • Alle anderen JSON-Felder werden verworfen
  • Ein einzelner Regex für vollständige Zeek-Logs wird monströs und unwartbar
  • Zeek hat zig Logtypen (conn, http, dns, ssl, weird, intel…) – alle unterschiedlich

Kurz gesagt:

Decoder = keine saubere Lösung für vollständige JSON-Ingestion

2. Die richtige Lösung: Pipeline Processor im Wazuh Indexer

Und genau diese Lösung kam von Kevin Branch („BlueWolfNinja“):

„Add an ingest pipeline processor on the indexer.
That processor will rename the data.* namespace to zeek.*,
so no field conflict is possible.“

Wichtig zu verstehen:

  • Wazuh Agent sendet JSON grundsätzlich unter dem Objekt data
  • Das heißt: Zeek-Logs landen unter data.id.orig_h, data.proto, etc.
  • OpenSearch interpretiert data.id dann als Objekt → Mapping-Konflikt
  • Wenn wir data → zeek verschieben, ist alles sauber getrennt

Das Geniale daran:

KEIN Decoder nötig

komplette JSON-Struktur bleibt erhalten

ALLE Zeek-Logs funktionieren (conn.log, http.log, dns.log …)

KEIN Konflikt mehr wegen id.*

Die Lösung betrifft nur Zeek-Logs, dank „if“-Bedinung

3. Der entscheidende Pipeline-Processor

Der entscheidende Teil des Ingest-Pipeline-Snippets lautet:

{
  "script": {
    "description": "move data to zeek.",
    "lang": "painless",
    "source": "ctx.zeek = ctx.remove('data');",
    "if": "ctx.location != null && ctx.location.contains('/opt/zeek/logs/')"
  }
}

Was passiert?

  • Prüft: kommt das Log aus /opt/zeek/logs/?
  • Wenn ja → data wird in zeek verschoben
  • Das JSON bleibt 100 % vollständig
  • Die Zeek-Felder landen ab sofort alle unter zeek.*
  • Keine Kollision mit anderen Datenquellen

Beispiel:

Vorher:

data.id.orig_h
data.proto
data.community_id

Nachher:

zeek.id.orig_h
zeek.proto
zeek.community_id

4. Wo kommt der Processor hin?

Hasitha zeigt den vollständigen Weg:

Schritt 1 – Backup erstellen

cp /usr/share/filebeat/module/wazuh/alerts/ingest/pipeline.json \
   /usr/share/filebeat/module/wazuh/alerts/ingest/pipeline.json.bkp

Schritt 2 – Pipeline bearbeiten

Datei öffnen:

nano /usr/share/filebeat/module/wazuh/alerts/ingest/pipeline.json

Nach diesem Block einfügen:

{
  "date_index_name": {
    "field": "timestamp",
    "date_rounding": "d",
    "index_name_prefix": "{{fields.index_prefix}}",
    "index_name_format": "yyyy.MM.dd"
  }
},
{
  "script": {
    "description": "move data to zeek.",
    "lang": "painless",
    "source": "ctx.zeek = ctx.remove('data');",
    "if": "ctx.location != null && ctx.location.contains('/opt/zeek/logs/')"
  }
},

Schritt 3 – Pipeline deployen & Filebeat neu starten

filebeat setup --pipelines
systemctl restart filebeat

Bei Docker:

docker restart <container>

5. Warum diese Lösung besser ist als jede Decoder-Lösung

ZielDecoderPipeline
komplette JSON-Struktur bewahren❌ schwierig/unmöglich✔ perfekt
Namespacetrennung❌ nein✔ vollständige Trennung (zeek.*)
Multi-Type Zeek-Logs❌ jede Datei braucht eigenen Decoder✔ alle Zeek-Typen funktionieren sofort
Mapping-Konflikte verhindern❌ schwer✔ 100 % gelöst
Skalierbarkeit❌ niedrig✔ sehr hoch

Diese Lösung imitiert das Verhalten von:

  • Logstash
  • FluentBit
  • syslog-ng JSON pipelines

Aber komplett innerhalb des Wazuh-Ökosystems.

6. Fazit

Der Thread zeigt eindrucksvoll:

Wazuh-Agent + Zeek-Logs ist problemlos möglich

komplette JSON-Felder können ingestiert werden

ohne jeden Mapping-Konflikt

ohne Custom Decoder

ohne Logstash, ohne externe Tools

Der Schlüssel ist die:

Trennung des JSON-Namespace auf Indexerebene (data → zeek)

Durch den Painless-Script Processor erhalten alle Zeek-Logs ihren eigenen Feldraum – und nichts kollidiert mehr.

Das ist die sauberste, wartungsärmste und professionellste Architektur für Zeek+Wazuh.

https://wazuh.slack.com/archives/C0A933R8E/p1763879508504799

Mehr zu Wazuh …

Mehr zum Wazuh Ambassador Program …