Problem
In deinem Event wird transaction.messages in Phase 2 zwar als Feld erkannt, aber der Inhalt wird nicht weiter „aufgefächert“ (also keine transaction.messages[0].details.ruleId usw.).
Der Grund ist simpel: Für Wazuh ist transaction.messages ein einzelner Feldwert (String) – selbst wenn er „wie JSON“ aussieht. In deinem Decode-Output sieht man das auch:
transaction.messages: '[{...},{...}]'
Das ist kein JSON-Array-Typ, sondern ein String, der zufällig JSON enthält. Der Wazuh-JSON-Decoder dekodiert nur die oberste Ebene und führt keine rekursive JSON-Parsing-Logik auf String-Feldern aus.
Warum ein Custom-Decoder hier nicht sinnvoll ist
Wazuh-Decoders/Rules sind nicht dafür gedacht, verschachtelte JSON-Strings beliebiger Tiefe in einzelne Felder zu zerlegen (vor allem wenn das Array mehrere Einträge enthält und riesig wird). Das wird schnell unwartbar und teuer (Performance).
Best Practice: Logs vor dem Einlesen normalisieren
Wenn du die ModSecurity-Logs per localfile einliest, ist der sauberste Weg:
- Vorverarbeitung: JSON aus dem Log holen
transaction.messages(String) → echtes Array/Objekt konvertieren- Optional: flatten in
messages_flat.*Felder - Ergebnis als NDJSON (eine JSON pro Zeile) in neue Datei schreiben
- Wazuh-Agent überwacht die neue Datei als
log_format=json
Genau das hat der Community-Reply vorgeschlagen.
Beispiel-Lösung (NDJSON Normalizer)
Der folgende Ansatz macht aus:
transaction.messagesString →transaction.messages_flatObjekt, inkl.count+msg_0,msg_1…
Python-Script (Beispiel): (wie im Thread beschrieben – Tail/Follower + Flattening)
Schreibt normalisierte Events nach: /var/log/test1.json
Danach Agent-Konfig:
<localfile>
<log_format>json</log_format>
<location>/var/log/test1.json</location>
</localfile>
Und neu starten:
systemctl restart wazuh-agent
Alternative (wenn du NICHT flatten willst)
Wenn du lieber die Struktur behältst, kannst du statt messages_flat einfach:
transaction.messagesals echtes Arraytransaction.messagessetzen (nicht entfernen)- dann kannst du in OpenSearch/Wazuh-Indexer besser darauf filtern/visualisieren
Wichtig bleibt: String → echtes JSON Array vor dem Einlesen.
Hinweis zu Größen & Performance
transaction.messages kann sehr groß werden (CRS liefert extrem viel). Flattening kann Index-Mappings aufblasen.
Praktische Empfehlungen:
- Nur wichtige Keys extrahieren (z. B.
ruleId,severity,message,tags) - Rest optional als Raw-Blob lassen
- Output-Log rotieren (logrotate), sonst wächst
/var/log/test1.jsonendlos
Kurzfazit
Du kannst transaction.messages nicht „weiter dekodieren“, solange es als String ankommt.
Die robuste Lösung ist Vor-Normalisierung (Script/Logstash/Pipeline), damit Wazuh wieder „echtes JSON“ bekommt und die Felder sauber indexiert werden.
#!/usr/bin/env python3
import os
import re
import json
import time
from typing import Optional, Dict, Any
IN_LOG = "/var/log/apache2/error.log"
OUT_LOG = "/var/log/test1.json"
# Find a JSON object inside text: from the first '{' to the last '}'.
# Works for "apache prefix ... {JSON} ...".
JSON_CANDIDATE_RE = re.compile(r"(\{.*\})", re.DOTALL)
def safe_json_loads(s: str) -> Optional[Dict[str, Any]]:
try:
obj = json.loads(s)
return obj if isinstance(obj, dict) else None
except Exception:
return None
def flatten_transaction_messages(obj: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert:
transaction.messages = [ {...}, {...} ]
Into:
transaction.messages_flat = { count: N, msg_0: {...}, msg_1: {...} }
And remove transaction.messages.
"""
tx = obj.get("transaction")
if not isinstance(tx, dict):
return obj
msgs = tx.get("messages")
# Sometimes messages is a JSON string: "[{...},{...}]"
if isinstance(msgs, str):
parsed = safe_json_loads(msgs)
if parsed is None:
# If it's a string but not JSON, keep it as-is
return obj
msgs = parsed
if not isinstance(msgs, list):
return obj
flat = {"count": len(msgs)}
for i, m in enumerate(msgs):
if isinstance(m, dict):
flat[f"msg_{i}"] = m
else:
flat[f"msg_{i}"] = {"value": m}
tx["messages_flat"] = flat
tx.pop("messages", None) # remove the array completely
obj["transaction"] = tx
return obj
def write_ndjson(obj: Dict[str, Any]) -> None:
line = json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
with open(OUT_LOG, "a", encoding="utf-8") as f:
f.write(line + "\n")
def follow_file(path: str, sleep_s: float = 0.2) -> None:
"""
Tail -F behavior:
- reads only new lines appended
- detects log rotation via inode changes
- buffers lines to support multi-line JSON
"""
fp = None
inode = None
buf = ""
while True:
try:
st = os.stat(path)
except FileNotFoundError:
time.sleep(sleep_s)
continue
# (Re)open on first run or rotation
if fp is None or inode != st.st_ino:
if fp:
try:
fp.close()
except Exception:
pass
fp = open(path, "r", encoding="utf-8", errors="replace")
inode = st.st_ino
# Start at end (don't re-process old content)
fp.seek(0, os.SEEK_END)
line = fp.readline()
if not line:
time.sleep(sleep_s)
continue
# Buffer in case JSON spans multiple lines
buf += line
m = JSON_CANDIDATE_RE.search(buf)
if not m:
# Prevent buffer from growing forever on non-JSON lines
if len(buf) > 50000:
buf = ""
continue
candidate = m.group(1)
obj = safe_json_loads(candidate)
if obj is None:
# Probably incomplete JSON; keep buffering a bit
if len(buf) > 500000:
buf = ""
continue
obj = flatten_transaction_messages(obj)
write_ndjson(obj)
# Reset buffer after successful parse
buf = ""
def main() -> None:
os.makedirs(os.path.dirname(OUT_LOG), exist_ok=True)
open(OUT_LOG, "a", encoding="utf-8").close()
follow_file(IN_LOG)
if __name__ == "__main__":
main()
https://wazuh.slack.com/archives/C0A933R8E/p1765735456676599