Wazuh/ModSecurity JSON: Warum transaction.messages nicht weiter dekodiert wird – und wie du es sauber löst

Problem

In deinem Event wird transaction.messages in Phase 2 zwar als Feld erkannt, aber der Inhalt wird nicht weiter „aufgefächert“ (also keine transaction.messages[0].details.ruleId usw.).

Der Grund ist simpel: Für Wazuh ist transaction.messages ein einzelner Feldwert (String) – selbst wenn er „wie JSON“ aussieht. In deinem Decode-Output sieht man das auch:

  • transaction.messages: '[{...},{...}]'

Das ist kein JSON-Array-Typ, sondern ein String, der zufällig JSON enthält. Der Wazuh-JSON-Decoder dekodiert nur die oberste Ebene und führt keine rekursive JSON-Parsing-Logik auf String-Feldern aus.


Warum ein Custom-Decoder hier nicht sinnvoll ist

Wazuh-Decoders/Rules sind nicht dafür gedacht, verschachtelte JSON-Strings beliebiger Tiefe in einzelne Felder zu zerlegen (vor allem wenn das Array mehrere Einträge enthält und riesig wird). Das wird schnell unwartbar und teuer (Performance).


Best Practice: Logs vor dem Einlesen normalisieren

Wenn du die ModSecurity-Logs per localfile einliest, ist der sauberste Weg:

  1. Vorverarbeitung: JSON aus dem Log holen
  2. transaction.messages (String) → echtes Array/Objekt konvertieren
  3. Optional: flatten in messages_flat.* Felder
  4. Ergebnis als NDJSON (eine JSON pro Zeile) in neue Datei schreiben
  5. Wazuh-Agent überwacht die neue Datei als log_format=json

Genau das hat der Community-Reply vorgeschlagen.


Beispiel-Lösung (NDJSON Normalizer)

Der folgende Ansatz macht aus:

  • transaction.messages Stringtransaction.messages_flat Objekt, inkl. count + msg_0, msg_1

Python-Script (Beispiel): (wie im Thread beschrieben – Tail/Follower + Flattening)
Schreibt normalisierte Events nach: /var/log/test1.json

Danach Agent-Konfig:

<localfile>
  <log_format>json</log_format>
  <location>/var/log/test1.json</location>
</localfile>

Und neu starten:

systemctl restart wazuh-agent

Alternative (wenn du NICHT flatten willst)

Wenn du lieber die Struktur behältst, kannst du statt messages_flat einfach:

  • transaction.messages als echtes Array transaction.messages setzen (nicht entfernen)
  • dann kannst du in OpenSearch/Wazuh-Indexer besser darauf filtern/visualisieren

Wichtig bleibt: String → echtes JSON Array vor dem Einlesen.


Hinweis zu Größen & Performance

transaction.messages kann sehr groß werden (CRS liefert extrem viel). Flattening kann Index-Mappings aufblasen.

Praktische Empfehlungen:

  • Nur wichtige Keys extrahieren (z. B. ruleId, severity, message, tags)
  • Rest optional als Raw-Blob lassen
  • Output-Log rotieren (logrotate), sonst wächst /var/log/test1.json endlos

Kurzfazit

Du kannst transaction.messages nicht „weiter dekodieren“, solange es als String ankommt.
Die robuste Lösung ist Vor-Normalisierung (Script/Logstash/Pipeline), damit Wazuh wieder „echtes JSON“ bekommt und die Felder sauber indexiert werden.

#!/usr/bin/env python3
import os
import re
import json
import time
from typing import Optional, Dict, Any

IN_LOG = "/var/log/apache2/error.log"
OUT_LOG = "/var/log/test1.json"

# Find a JSON object inside text: from the first '{' to the last '}'.
# Works for "apache prefix ... {JSON} ...".
JSON_CANDIDATE_RE = re.compile(r"(\{.*\})", re.DOTALL)

def safe_json_loads(s: str) -> Optional[Dict[str, Any]]:
    try:
        obj = json.loads(s)
        return obj if isinstance(obj, dict) else None
    except Exception:
        return None

def flatten_transaction_messages(obj: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convert:
      transaction.messages = [ {...}, {...} ]
    Into:
      transaction.messages_flat = { count: N, msg_0: {...}, msg_1: {...} }
    And remove transaction.messages.
    """
    tx = obj.get("transaction")
    if not isinstance(tx, dict):
        return obj

    msgs = tx.get("messages")

    # Sometimes messages is a JSON string: "[{...},{...}]"
    if isinstance(msgs, str):
        parsed = safe_json_loads(msgs)
        if parsed is None:
            # If it's a string but not JSON, keep it as-is
            return obj
        msgs = parsed

    if not isinstance(msgs, list):
        return obj

    flat = {"count": len(msgs)}
    for i, m in enumerate(msgs):
        if isinstance(m, dict):
            flat[f"msg_{i}"] = m
        else:
            flat[f"msg_{i}"] = {"value": m}

    tx["messages_flat"] = flat
    tx.pop("messages", None)  # remove the array completely
    obj["transaction"] = tx
    return obj

def write_ndjson(obj: Dict[str, Any]) -> None:
    line = json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
    with open(OUT_LOG, "a", encoding="utf-8") as f:
        f.write(line + "\n")

def follow_file(path: str, sleep_s: float = 0.2) -> None:
    """
    Tail -F behavior:
    - reads only new lines appended
    - detects log rotation via inode changes
    - buffers lines to support multi-line JSON
    """
    fp = None
    inode = None
    buf = ""

    while True:
        try:
            st = os.stat(path)
        except FileNotFoundError:
            time.sleep(sleep_s)
            continue

        # (Re)open on first run or rotation
        if fp is None or inode != st.st_ino:
            if fp:
                try:
                    fp.close()
                except Exception:
                    pass
            fp = open(path, "r", encoding="utf-8", errors="replace")
            inode = st.st_ino
            # Start at end (don't re-process old content)
            fp.seek(0, os.SEEK_END)

        line = fp.readline()
        if not line:
            time.sleep(sleep_s)
            continue

        # Buffer in case JSON spans multiple lines
        buf += line

        m = JSON_CANDIDATE_RE.search(buf)
        if not m:
            # Prevent buffer from growing forever on non-JSON lines
            if len(buf) > 50000:
                buf = ""
            continue

        candidate = m.group(1)

        obj = safe_json_loads(candidate)
        if obj is None:
            # Probably incomplete JSON; keep buffering a bit
            if len(buf) > 500000:
                buf = ""
            continue

        obj = flatten_transaction_messages(obj)
        write_ndjson(obj)

        # Reset buffer after successful parse
        buf = ""

def main() -> None:
    os.makedirs(os.path.dirname(OUT_LOG), exist_ok=True)
    open(OUT_LOG, "a", encoding="utf-8").close()
    follow_file(IN_LOG)

if __name__ == "__main__":
    main()

https://wazuh.slack.com/archives/C0A933R8E/p1765735456676599

Mehr zu Wazuh …

Mehr zum Wazuh Ambassador Program …