Zeek erzeugt strukturiertes JSON – und das ist großartig.
Aber wenn man diese Logs direkt über den Wazuh Agent ins System sendet, stößt man sehr schnell auf ein Problem:
Die Felder unter id.* kollidieren mit OpenSearch-Mappings
Beispiel-Zeek-Log:
{
"ts": 1763876253.747463,
"uid": "CFmVts3qVlvAlQL7Kf",
"id.orig_h": "fe80::22ab:48ff:fe33:ec5b",
"id.orig_p": 135,
"id.resp_h": "ff02::1:ff7f:7ea1",
"id.resp_p": 136,
"proto": "icmp",
"conn_state": "OTH",
...
}
OpenSearch sagt dann:
object mapping for [id] tried to parse field [orig_h] as type keyword, but found conflicting type …
Der Grund:
- OpenSearch sieht
id.orig_h - und interpretiert
idals Objekt - aber Zeek liefert oft unterschiedliche Datentypen für
id.* - → Indexing-Kollision, Dokumente werden abgewiesen
Die erste Vermutung:
„Ich schreibe einen Custom Decoder und ersetze die Felder.“
Das funktioniert – aber nur teilweise:
die 4 Felder werden extrahiert
aber alle anderen JSON-Felder gehen verloren
es ist nicht skalierbar (jede Zeek-Typ-Logdatei hat andere Felder)
Der Wunsch ist klar:
„Ich will ALLE Felder ingestieren – wie bei Logstash – aber ohne Konflikte.“
Die Lösung ist deutlich eleganter als der Versuch, jedes Feld einzeln zu decodieren.
1. Warum Custom Decoders hier die falsche Lösung sind
Der Agent arbeitet so:
- Sobald ein Decoder greift, werden nur die Felder behalten, die der Decoder extrahiert
- Alle anderen JSON-Felder werden verworfen
- Ein einzelner Regex für vollständige Zeek-Logs wird monströs und unwartbar
- Zeek hat zig Logtypen (conn, http, dns, ssl, weird, intel…) – alle unterschiedlich
Kurz gesagt:
Decoder = keine saubere Lösung für vollständige JSON-Ingestion
2. Die richtige Lösung: Pipeline Processor im Wazuh Indexer
Und genau diese Lösung kam von Kevin Branch („BlueWolfNinja“):
„Add an ingest pipeline processor on the indexer.
That processor will rename thedata.*namespace tozeek.*,
so no field conflict is possible.“
Wichtig zu verstehen:
- Wazuh Agent sendet JSON grundsätzlich unter dem Objekt
data - Das heißt: Zeek-Logs landen unter
data.id.orig_h,data.proto, etc. - OpenSearch interpretiert
data.iddann als Objekt → Mapping-Konflikt - Wenn wir
data → zeekverschieben, ist alles sauber getrennt
Das Geniale daran:
KEIN Decoder nötig
komplette JSON-Struktur bleibt erhalten
ALLE Zeek-Logs funktionieren (conn.log, http.log, dns.log …)
KEIN Konflikt mehr wegen id.*
Die Lösung betrifft nur Zeek-Logs, dank „if“-Bedinung
3. Der entscheidende Pipeline-Processor
Der entscheidende Teil des Ingest-Pipeline-Snippets lautet:
{
"script": {
"description": "move data to zeek.",
"lang": "painless",
"source": "ctx.zeek = ctx.remove('data');",
"if": "ctx.location != null && ctx.location.contains('/opt/zeek/logs/')"
}
}
Was passiert?
- Prüft: kommt das Log aus
/opt/zeek/logs/? - Wenn ja →
datawird inzeekverschoben - Das JSON bleibt 100 % vollständig
- Die Zeek-Felder landen ab sofort alle unter
zeek.* - Keine Kollision mit anderen Datenquellen
Beispiel:
Vorher:
data.id.orig_h
data.proto
data.community_id
Nachher:
zeek.id.orig_h
zeek.proto
zeek.community_id
4. Wo kommt der Processor hin?
Hasitha zeigt den vollständigen Weg:
Schritt 1 – Backup erstellen
cp /usr/share/filebeat/module/wazuh/alerts/ingest/pipeline.json \
/usr/share/filebeat/module/wazuh/alerts/ingest/pipeline.json.bkp
Schritt 2 – Pipeline bearbeiten
Datei öffnen:
nano /usr/share/filebeat/module/wazuh/alerts/ingest/pipeline.json
Nach diesem Block einfügen:
{
"date_index_name": {
"field": "timestamp",
"date_rounding": "d",
"index_name_prefix": "{{fields.index_prefix}}",
"index_name_format": "yyyy.MM.dd"
}
},
{
"script": {
"description": "move data to zeek.",
"lang": "painless",
"source": "ctx.zeek = ctx.remove('data');",
"if": "ctx.location != null && ctx.location.contains('/opt/zeek/logs/')"
}
},
Schritt 3 – Pipeline deployen & Filebeat neu starten
filebeat setup --pipelines
systemctl restart filebeat
Bei Docker:
docker restart <container>
5. Warum diese Lösung besser ist als jede Decoder-Lösung
| Ziel | Decoder | Pipeline |
|---|---|---|
| komplette JSON-Struktur bewahren | ❌ schwierig/unmöglich | ✔ perfekt |
| Namespacetrennung | ❌ nein | ✔ vollständige Trennung (zeek.*) |
| Multi-Type Zeek-Logs | ❌ jede Datei braucht eigenen Decoder | ✔ alle Zeek-Typen funktionieren sofort |
| Mapping-Konflikte verhindern | ❌ schwer | ✔ 100 % gelöst |
| Skalierbarkeit | ❌ niedrig | ✔ sehr hoch |
Diese Lösung imitiert das Verhalten von:
- Logstash
- FluentBit
- syslog-ng JSON pipelines
Aber komplett innerhalb des Wazuh-Ökosystems.
6. Fazit
Der Thread zeigt eindrucksvoll:
Wazuh-Agent + Zeek-Logs ist problemlos möglich
komplette JSON-Felder können ingestiert werden
ohne jeden Mapping-Konflikt
ohne Custom Decoder
ohne Logstash, ohne externe Tools
Der Schlüssel ist die:
Trennung des JSON-Namespace auf Indexerebene (data → zeek)
Durch den Painless-Script Processor erhalten alle Zeek-Logs ihren eigenen Feldraum – und nichts kollidiert mehr.
Das ist die sauberste, wartungsärmste und professionellste Architektur für Zeek+Wazuh.
https://wazuh.slack.com/archives/C0A933R8E/p1763879508504799