Title: NGFW Daemon — Suricata-Driven Reactive Firewall (IPv4, iptables)

Quick take: a Python daemon I built that listens to Suricata’s fast.log, enriches alerts with threat intel (AbuseIPDB + FireHOL), and automatically blocks attacker IPs via iptables. It writes human-readable logs and JSONL for SIEMs. This is a home-lab / learning project—intentionally simple, IPv4-only, iptables-based.

Stack: Python 3 · Suricata · iptables · AbuseIPDB · FireHOL · systemd · logrotate

Repo: https://github.com/yairemartinez/ngfw-daemon

Below is a example of my daemon in action, the blocks from the 10.0.0.0/24 network come from inside my network for breaking the icmp custom rules. The other blocks from other addresses come from inserting the false alarms into fast.log, in the logs you can view both human readable and json formatted logs for the blocks that occurred and for every time a alarm was set off. You can then see the ipv4 addresses blocked in iptables.


Table of Contents


What I Built

The NGFW Daemon is a Python-based security tool I created to work alongside Suricata IDS. Its job is to:

  • Monitor Suricata’s fast.log in real time
  • Parse alerts (IP, SID, rule message)
  • Enrich with threat intelligence (AbuseIPDB + FireHOL)
  • Block attacker IPs automatically via iptables (IPv4)
  • Log everything in both analyst-friendly text and machine-readable JSONL

This makes it a reactive firewall: Suricata detects → the daemon decides → iptables enforces.

Below are the three design features I’m most proud of, and how the main loop ties them together.


Why it’s good engineering

  • Keeps a durable byte offset so each cycle only reads new lines
  • Detects log rotation / truncation and resets safely
  • Robust parsing via compiled regex (full Suricata format + IPv4 flow fallback)
  • Deduplicates per IP so a single decision is made per IP per cycle
View code (ngfw_daemon.py)

# ---- Regex patterns for parsing Suricata fast.log ----
IPV4_FLOW = re.compile(r"(\d{1,3}(?:\.\d{1,3}){3})(?::\d+)?\s+->\s+(\d{1,3}(?:\.\d{1,3}){3})(?::\d+)?")
FASTLOG_RE = re.compile(
    r"""\[\*\*\]\s+\[(?P<gid>\d+):(?P<sid>\d+):(?P<rev>\d+)\]\s+
        (?P<msg>.+?)\s+\[\*\*\].*?\{(?P<proto>[A-Z]+)\}\s+
        (?P<src_ip>\d{1,3}(?:\.\d{1,3}){3})(?::\d+)?\s+->\s+
        (?P<dst_ip>\d{1,3}(?:\.\d{1,3}){3})(?::\d+)?""",
    re.VERBOSE,
)

# ---- fast.log incremental reader with SID/MSG ----
def _read_offset():
    try:
        if os.path.exists(OFFSET_FILE):
            with open(OFFSET_FILE, "r") as f:
                return int((f.read() or "0").strip())
    except ValueError:
        logger.debug(f"Offset file {OFFSET_FILE} contains invalid integer; resetting to 0")
    except PermissionError:
        logger.critical(f"Permission denied reading {OFFSET_FILE}")
    except Exception as e:
        logger.debug(f"Unexpected error reading offset: {e}")
    return 0

def _write_offset(offset):
    try:
        with open(OFFSET_FILE, "w") as f:
            f.write(str(offset))
    except PermissionError:
        logger.critical(f"Permission denied writing {OFFSET_FILE}")
    except OSError as e:
        logger.error(f"OS error writing {OFFSET_FILE}: {e}")
    except Exception as e:
        logger.exception(f"Failed to write offset file: {e}")

def extract_events_from_fastlog(include_private: bool = False):
    """
    Read new fast.log lines since saved offset and return deduped event list:
    [{'ip': '1.2.3.4', 'sid': 1000001 | None, 'msg': '...'}, ...]
    """
    events = []
    if not os.path.exists(FAST_LOG):
        logger.debug(f"FAST_LOG {FAST_LOG} not present; skipping.")
        return events

    offset = _read_offset()
    try:
        with open(FAST_LOG, "r", errors="ignore") as f:
            file_size = os.path.getsize(FAST_LOG)
            if offset > file_size:
                logger.info("fast.log rotated/truncated; resetting offset to 0")
                offset = 0
            f.seek(offset)
            lines = f.readlines()
            _write_offset(f.tell())
    except FileNotFoundError:
        logger.warning(f"{FAST_LOG} missing during read")
        return events
    except PermissionError:
        logger.critical(f"Permission denied reading {FAST_LOG}")
        return events
    except OSError as e:
        logger.error(f"OS error reading fast.log: {e}")
        return events
    except Exception as e:
        logger.exception(f"Unexpected error reading fast.log: {e}")
        return events

    # parse lines -> create raw events
    raw = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        sid = None
        msg = None
        src_ip = None

        try:
            m = FASTLOG_RE.search(line)
            if m:
                try:
                    sid = int(m.group("sid"))
                except (TypeError, ValueError):
                    sid = None
                msg = m.group("msg").strip() if m.group("msg") else None
                src_ip = m.group("src_ip")
            else:
                m2 = IPV4_FLOW.search(line)
                if m2:
                    src_ip = m2.group(1)
        except re.error as e:
            logger.debug(f"Regex parse error on fast.log line: {e}")
            continue

        if not src_ip:
            continue

        try:
            ip_obj = ipaddress.IPv4Address(src_ip)
        except ValueError:
            logger.debug(f"Skipping malformed IP in fast.log: {src_ip}")
            continue
        except Exception as e:
            logger.debug(f"Unexpected IP parse error for '{src_ip}': {e}")
            continue

        if not include_private and (ip_obj.is_private or ip_obj.is_loopback):
            continue

        raw.append({"ip": str(ip_obj), "sid": sid, "msg": msg})

    # dedupe/aggregate per-ip so a single decision is made per IP per cycle
    events_by_ip = {}
    for r in raw:
        ip = r["ip"]
        if ip not in events_by_ip:
            events_by_ip[ip] = {"ip": ip, "sids": [], "msgs": []}
        if r.get("sid") is not None:
            events_by_ip[ip]["sids"].append(r["sid"])
        if r.get("msg"):
            events_by_ip[ip]["msgs"].append(r["msg"])

    for ip, v in events_by_ip.items():
        sid_val = v["sids"][0] if v["sids"] else None
        msg_val = "; ".join(v["msgs"]) if v["msgs"] else None
        events.append({"ip": ip, "sid": sid_val, "msg": msg_val})

    return events
  

Why it’s good engineering

  • AbuseIPDB with TTL cache + per-cycle rate limiting
  • FireHOL CIDR matching with periodic reload
  • Fail policy: choose fail-closed or fail-open when intel is “unknown”
  • Returns explainable reason strings (great for audits & debugging)
View code (ngfw_daemon.py)

# ---- AbuseIPDB + FireHOL ----
_abuse_cache = {}
ABUSE_TTL = int(os.getenv("ABUSEIPDB_CACHE_TTL_S", "1800"))

def check_abuseipdb(ip: str, allow_query: bool = True):
    """
    Returns (verdict, score, source_tag)
    source_tag = 'abuseipdb(cache)', 'abuseipdb' (live), 'abuseipdb(nolookup)' when not allowed to query,
    or ('unknown', None, 'abuseipdb') if error/no-key.
    """
    now = datetime.now(timezone.utc)
    if ip in _abuse_cache:
        verdict, score, ts = _abuse_cache[ip]
        if (now - ts).total_seconds() < ABUSE_TTL:
            return verdict, score, "abuseipdb(cache)"

    if not ABUSEIPDB_API_KEY or not allow_query:
        return "unknown", None, "abuseipdb(nolookup)"

    try:
        headers = {"Key": ABUSEIPDB_API_KEY, "Accept": "application/json"}
        url = f"https://api.abuseipdb.com/api/v2/check?ipAddress={ip}"
        r = requests.get(url, headers=headers, timeout=ABUSEIPDB_TIMEOUT_S)
        r.raise_for_status()
        data = r.json().get("data", {})
        score = int(data.get("abuseConfidenceScore", 0))
        verdict = "block" if score >= ABUSEIPDB_THRESHOLD else "clean"
    except requests.Timeout:
        logger.warning(f"AbuseIPDB timeout for {ip}")
        verdict, score = "unknown", None
    except requests.ConnectionError:
        logger.error(f"AbuseIPDB connection error for {ip}")
        verdict, score = "unknown", None
    except requests.HTTPError as e:
        status = e.response.status_code if e.response else "unknown"
        logger.error(f"AbuseIPDB HTTP {status} for {ip}")
        verdict, score = "unknown", None
    except ValueError:
        logger.error(f"Invalid JSON from AbuseIPDB for {ip}")
        verdict, score = "unknown", None
    except Exception as e:
        logger.exception(f"Unexpected AbuseIPDB error for {ip}: {e}")
        verdict, score = "unknown", None

    _abuse_cache[ip] = (verdict, score, now)
    return verdict, score, "abuseipdb"

_firehol = set()
_firehol_loaded_at = None

def _load_firehol():
    global _firehol, _firehol_loaded_at
    if not FIREHOL_ENABLED:
        return
    try:
        with open(FIREHOL_FILE, "r") as f:
            entries = [ln.strip() for ln in f if ln.strip() and not ln.startswith("#")]
        _firehol = set(entries)
        _firehol_loaded_at = datetime.now(timezone.utc)
        logger.info(f"FireHOL loaded: {len(_firehol)} entries")
    except FileNotFoundError:
        logger.error(f"FireHOL file not found: {FIREHOL_FILE}")
    except PermissionError:
        logger.critical(f"Permission denied reading FireHOL file: {FIREHOL_FILE}")
    except OSError as e:
        logger.error(f"OS error loading FireHOL: {e}")
    except Exception as e:
        logger.exception(f"Could not load FireHOL list: {e}")

def _need_reload_firehol():
    if not FIREHOL_ENABLED:
        return False
    if _firehol_loaded_at is None:
        return True
    try:
        return (datetime.now(timezone.utc) - _firehol_loaded_at).total_seconds() > FIREHOL_RELOAD_S
    except Exception:
        return True

def check_firehol(ip: str):
    if not FIREHOL_ENABLED:
        return "unknown", None, "firehol"
    if _need_reload_firehol():
        _load_firehol()
    try:
        ip_obj = ipaddress.IPv4Address(ip)
        for item in _firehol:
            try:
                if "/" in item:
                    if ip_obj in ipaddress.IPv4Network(item, strict=False):
                        return "block", None, "firehol"
                else:
                    if ip == item:
                        return "block", None, "firehol"
            except ValueError:
                continue
            except Exception:
                continue
        return "clean", None, "firehol"
    except ValueError:
        return "unknown", None, "firehol"
    except Exception as e:
        logger.debug(f"Unexpected FireHOL check error for {ip}: {e}")
        return "unknown", None, "firehol"

# ---- Consolidated reputation evaluation helper ----
def evaluate_reputation_and_decide(ip: str, sid=None, rule_msg=None, allow_abuse_query=True):
    sources = {}
    a_verdict, a_score, a_tag = check_abuseipdb(ip, allow_query=allow_abuse_query)
    sources["abuseipdb"] = (a_verdict, a_score, a_tag)

    f_verdict, f_score, f_tag = check_firehol(ip)
    sources["firehol"] = (f_verdict, f_score, f_tag)

    score = a_score if a_score is not None else None

    if f_verdict == "block":
        reason = f"REPUTATION BLOCK (source=firehol)"
        return "block", score, reason, sources

    if a_score is not None and a_score >= RISK_THRESHOLD:
        reason = f"REPUTATION BLOCK (source=abuseipdb,score={a_score})"
        return "block", score, reason, sources

    providers_total = 1 + (1 if FIREHOL_ENABLED else 0)
    unknown_count = 0
    provider_positive = False
    for p, (v, s, t) in sources.items():
        if v == "unknown":
            unknown_count += 1
        elif v == "clean":
            provider_positive = True
        elif v == "block":
            provider_positive = True
        if s is not None and s < RISK_THRESHOLD:
            provider_positive = True

    if unknown_count == providers_total:
        if FAIL_POLICY == "closed":
            reason = "REPUTATION UNKNOWN (fail-closed: all TI providers failed)"
            return "block", score, reason, sources
        else:
            reason = "REPUTATION UNKNOWN (fail-open: all TI providers failed)"
            return "allow", score, reason, sources

    reason_parts = []
    if a_score is not None:
        reason_parts.append(f"abuseipdb(score={a_score})")
    else:
        reason_parts.append(f"abuseipdb({sources['abuseipdb'][0]})")
    if FIREHOL_ENABLED:
        reason_parts.append(f"firehol({sources['firehol'][0]})")
    reason = "REPUTATION CLEAN (" + ",".join(reason_parts) + ")"
    return "allow", score, reason, sources
  

Why it’s good engineering

  • Won’t duplicate rules; always checks state first
  • Inserts at the top for deterministic behavior & easy testing
  • Persists block metadata; auto-unblocks after BLOCK_EXPIRE_S
  • Clean signal handling (SIGHUP/SIGTERM); works nicely with logrotate
View code (ngfw_daemon.py)

# ---- file helpers ----
def ensure_file(path):
    try:
        if not os.path.exists(path):
            with open(path, "w") as f:
                f.write(f"=== {os.path.basename(path)} started ===\n")
            os.chmod(path, 0o600)
    except PermissionError:
        logger.critical(f"Permission denied preparing {path}")
    except FileNotFoundError:
        logger.error(f"Path not found preparing {path}")
    except OSError as e:
        logger.error(f"OS error preparing {path}: {e}")
    except Exception as e:
        logger.exception(f"Could not prepare {path}: {e}")

ensure_file(BLOCKS_LOG)
ensure_file(ALERT_LOG)

def log_block(ip: str, reason: str, score=None, sid=None, rule_msg=None):
    ts = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S")
    score_txt = f" (REPUTATION SCORE: {score})" if score is not None else ""
    enrich = _fmt_enrichment(sid, rule_msg)
    line = f"{ts} BLOCKED {ip}{score_txt} [{reason}]{enrich}"
    try:
        with open(BLOCKS_LOG, "a") as f:
            f.write(line + "\n")
    except PermissionError:
        logger.critical(f"Permission denied writing {BLOCKS_LOG}")
    except FileNotFoundError:
        logger.error(f"{BLOCKS_LOG} not found when writing block")
    except OSError as e:
        logger.error(f"OS error writing {BLOCKS_LOG}: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error writing blocks.log: {e}")
    logger.warning(line)

# ---- iptables helpers ----
def is_blocked(ip: str, chain: str = "INPUT") -> bool:
    try:
        return subprocess.run(
            [IPTABLES, "-C", chain, "-s", ip, "-j", "DROP"],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        ).returncode == 0
    except FileNotFoundError:
        logger.critical("iptables binary not found while checking rule")
        return False
    except Exception as e:
        logger.error(f"Unexpected error checking iptables for {ip}: {e}")
        return False

def add_iptables_drop(ip: str, chain: str = "INPUT") -> bool:
    if is_blocked(ip, chain):
        return False
    try:
        subprocess.run([IPTABLES, "-I", chain, "1", "-s", ip, "-j", "DROP"], check=True)
        return True
    except FileNotFoundError:
        logger.critical("iptables binary not found — cannot enforce blocks!")
        return False
    except subprocess.CalledProcessError as e:
        logger.error(f"iptables failed inserting rule for {ip}: {e}")
        return False
    except PermissionError:
        logger.critical("Permission denied executing iptables. Missing CAP_NET_ADMIN?")
        return False
    except Exception as e:
        logger.exception(f"Unexpected error adding iptables rule for {ip}: {e}")
        return False

def remove_iptables_drop(ip: str, chain: str = "INPUT") -> bool:
    try:
        while subprocess.run(
            [IPTABLES, "-C", chain, "-s", ip, "-j", "DROP"],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        ).returncode == 0:
            subprocess.run([IPTABLES, "-D", chain, "-s", ip, "-j", "DROP"], check=True)
        return True
    except FileNotFoundError:
        logger.critical("iptables binary not found — cannot remove rules!")
        return False
    except subprocess.CalledProcessError as e:
        logger.warning(f"Failed to remove iptables rule for {ip}: {e}")
        return False
    except PermissionError:
        logger.critical("Permission denied executing iptables delete. Missing CAP_NET_ADMIN?")
        return False
    except Exception as e:
        logger.exception(f"Unexpected error removing iptables rule for {ip}: {e}")
        return False

# ---- persistent block store ----
_blocks = {}  # ip -> {"ts": iso, "reason": str, "score": int|None, "sid": int|None, "msg": str|None}

def load_blocks():
    global _blocks
    try:
        if os.path.exists(BLOCKS_DB) and os.path.getsize(BLOCKS_DB) > 0:
            with open(BLOCKS_DB, "r") as f:
                _blocks = json.load(f)
        else:
            _blocks = {}
    except json.JSONDecodeError as e:
        logger.error(f"Corrupted JSON in {BLOCKS_DB}: {e}")
        _blocks = {}
    except PermissionError:
        logger.critical(f"Permission denied reading {BLOCKS_DB}")
        _blocks = {}
    except FileNotFoundError:
        logger.warning(f"{BLOCKS_DB} not found; starting with empty block DB.")
        _blocks = {}
    except OSError as e:
        logger.error(f"OS error reading {BLOCKS_DB}: {e}")
        _blocks = {}
    except Exception as e:
        logger.exception(f"Unexpected error loading {BLOCKS_DB}: {e}")
        _blocks = {}

def save_blocks():
    try:
        with open(BLOCKS_DB, "w") as f:
            json.dump(_blocks, f)
    except PermissionError:
        logger.critical(f"Permission denied writing {BLOCKS_DB}")
    except FileNotFoundError:
        logger.error(f"{BLOCKS_DB} path not found when saving blocks DB")
    except (TypeError, ValueError) as e:
        logger.error(f"JSON serialization error saving blocks DB: {e}")
    except OSError as e:
        logger.error(f"OS error writing {BLOCKS_DB}: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error persisting blocks DB: {e}")

def record_block(ip, reason, score=None, sid=None, rule_msg=None):
    ts = datetime.now(timezone.utc).isoformat()
    _blocks[ip] = {"ts": ts, "reason": reason, "score": score, "sid": sid, "msg": rule_msg}
    save_blocks()
    log_block(ip, reason, score, sid, rule_msg)
    log_alert(ip, reason, score, sid, rule_msg)

def prune_expired_blocks():
    now = datetime.now(timezone.utc)
    removed = []
    for ip, meta in list(_blocks.items()):
        try:
            ts = datetime.fromisoformat(meta["ts"])
        except (KeyError, ValueError) as e:
            logger.debug(f"Malformed entry in blocks DB for {ip}: {e}")
            _blocks.pop(ip, None)
            continue
        except Exception as e:
            logger.debug(f"Unexpected parse error for block entry {ip}: {e}")
            continue

        try:
            if (now - ts).total_seconds() > BLOCK_EXPIRE_S:
                if remove_iptables_drop(ip):
                    removed.append(ip)
                _blocks.pop(ip, None)
        except Exception as e:
            logger.debug(f"Error during expiry check/removal for {ip}: {e}")
            continue
    if removed:
        save_blocks()
        logger.info(f"Unblocked expired IPs: {', '.join(removed)}")
  

Why it’s good engineering

  • Adaptive polling: Normal vs High-Alert interval based on volume
  • For each event: decide → enforce/log with explainable reasons
  • Safe by default: skips internal/private IPs (toggleable for labs)
View code (ngfw_daemon.py)

# ---- Main loop ----
def main():
    ensure_file(BLOCKS_LOG)
    ensure_file(ALERT_LOG)
    load_blocks()

    logger.info("NGFW daemon starting up...")
    logger.info(f"Poll interval={POLL_INTERVAL}s, High alert threshold={HIGH_ALERT_THRESHOLD}, block expiry={BLOCK_EXPIRE_S}s")
    logger.info(f"Risk threshold={RISK_THRESHOLD}, AbuseIPDB threshold={ABUSEIPDB_THRESHOLD}, Fail policy={FAIL_POLICY}")

    # Quick sanity: iptables availability (non-fatal)
    try:
        subprocess.run([IPTABLES, "-L"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
    except FileNotFoundError:
        logger.critical("iptables not found. Blocking will not work.")
    except Exception as e:
        logger.debug(f"iptables preflight error (non-fatal): {e}")

    high_alert = False

    while True:
        try:
            prune_expired_blocks()
            logger.debug("Reading fast.log incrementally...")
            events = extract_events_from_fastlog(include_private=True)

            if not events:
                logger.debug("No new events found.")
            else:
                abuse_queries = 0
                blocked_this_cycle = 0

                for ev in events:
                    try:
                        ip = ev["ip"]
                        sid = ev.get("sid")
                        rule_msg = ev.get("msg")

                        if ip in LOCAL_IPS or is_private_or_reserved(ip):
                            logger.debug(f"Skipping internal/reserved IP {ip}")
                            log_alert(ip, reason="INTERNAL/PRIVATE (skipped)", sid=sid, rule_msg=rule_msg)
                            continue

                        allow_query = abuse_queries < ABUSEIPDB_MAX_PER_CYCLE
                        decision, score, reason, sources = evaluate_reputation_and_decide(
                            ip, sid=sid, rule_msg=rule_msg, allow_abuse_query=allow_query
                        )

                        a_tag = sources.get("abuseipdb", (None, None, None))[2]
                        if a_tag == "abuseipdb":
                            abuse_queries += 1

                        if decision == "block":
                            if add_iptables_drop(ip):
                                record_block(ip, reason=reason, score=score, sid=sid, rule_msg=rule_msg)
                                blocked_this_cycle += 1
                            else:
                                log_alert(ip, reason=f"{reason} (already-blocked?)", score=score, sid=sid, rule_msg=rule_msg)
                        else:
                            log_alert(ip, reason=reason, score=score, sid=sid, rule_msg=rule_msg)
                    except KeyError as e:
                        logger.debug(f"Event missing expected key {e}; skipping event: {ev}")
                        continue
                    except Exception as e:
                        logger.exception(f"Unexpected error processing event {ev}: {e}")
                        continue

                if blocked_this_cycle >= HIGH_ALERT_THRESHOLD:
                    if not high_alert:
                        logger.warning(f"High-Alert Mode activated! Interval={HIGH_ALERT_INTERVAL}s")
                    high_alert = True
                elif high_alert and blocked_this_cycle == 0:
                    logger.info(f"Returning to normal polling ({POLL_INTERVAL}s)")
                    high_alert = False

            time.sleep(HIGH_ALERT_INTERVAL if high_alert else POLL_INTERVAL)

        except KeyboardInterrupt:
            logger.info("Shutdown requested (KeyboardInterrupt).")
            break
        except Exception as e:
            logger.exception(f"Main loop unexpected error: {e}")
            time.sleep(POLL_INTERVAL)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logger.info("Shutdown (KeyboardInterrupt).")
    except Exception as e:
        logger.exception(f"Fatal error: {e}")
        sys.exit(1)
  

  • Text logs (alert.log, blocks.log, daemon.log) for quick human scanning
  • JSONL (alerts.json) for SIEM pipelines (tail-friendly, line-oriented)
  • Enriched fields: timestamp, offender IP, reputation score, TI source, SID, rule message, action

Example

2025-09-02 11:24:45 BLOCKED 10.0.0.1 (REPUTATION SCORE: 0) [REPUTATION BLOCK (source=firehol)] SID=2000004 MSG="TEST ICMP Ping Detected"

  • Parsing edge cases: Suricata fast.log variants, partial lines, regex mismatches
  • Offset correctness: no duplicate reads post-rotation; safe recovery on truncation
  • Intel timing & quotas: caching, timeouts, and per-cycle caps to keep the loop responsive
  • Operator ergonomics: SIGHUP-driven log reopen for logrotate; clear reason strings; top-insertion iptables rules to simplify testing/removal
  • Lab realism: toggleable behavior (skip/allow private IPs) to safely simulate internal threats

  • Useful: Converts IDS alerts into actionable blocks immediately, with transparent reasoning and solid audit logs
  • Efficient: Incremental reads, cache-first TI, rate-limited lookups, deduped events, and adaptive polling under load
  • Impressive: A cohesive pipeline—ingestion → enrichment → decision → enforcement → expiry—plus production-aware details (signals, rotation, state)

  1. Suricata provides raw detection via fast.log
  2. ngfw_daemon.py consumes → enriches → enforces
  3. Logs and state live in logs_and_utilities/ for visibility & SIEM
  4. post_server.py helps simulate malicious POSTs to test blocking
    (Setup and path specifics are documented in the repo README.)

  • IPv4-only (no IPv6 yet)
  • iptables backend (not nftables/ufw)
  • Single-host focus (not distributed)
  • Best for home labs / learning, not production-critical use

Shortcomings observed in testing (and how I’d fix them)

What I saw: During testing, a few ICMP pings (about 6–7) got through before the IP was blocked. Similarly, a suspicious HTTP POST could complete and then the source was blocked shortly after.

Why this happens (expected trade-off): The pipeline is reactive by design: Suricata detects → writes a log line → my daemon reads on its next cycle → optional threat-intel lookup → iptables rule insertion. Latency can come from:

  • Detection timing: Some Suricata signatures (especially HTTP/app-layer) alert after enough of the request is parsed, so the first request may complete before an alert exists.
  • Log buffering: fast.log lines aren’t always immediately readable; flush/buffering adds delay.
  • Daemon polling: The loop sleeps between checks, so alerts found just after a read wait until the next cycle.
  • Threat-intel latency: First-time AbuseIPDB lookups add network time; timeouts/rate limits can push evaluation to a later cycle.
  • Rule insertion race: There’s a brief gap between deciding to block and the new DROP rule taking effect.

How I’d reduce the delay next:

  • Go event-driven on Suricata output (watch the file and react on each new line) or shrink the poll interval substantially to tighten the loop.
  • Provisional block first, enrich after: Immediately add a short-TTL DROP on first alert, then confirm with threat intel; remove the block quickly if intel looks clean.
  • Read eve.json instead of fast.log: Stream line-oriented JSON for lower latency and richer fields.
  • Trim TI overhead: aggressive caching, sub-second timeouts, and skipping TI for clearly malicious signatures.
  • Verify chain precedence: keep inserting at INPUT position 1 (or the correct chain for your traffic path) so the DROP wins deterministically.
  • If zero-tolerance is required: consider Suricata in inline IPS mode (NFQUEUE) to drop the very first offending packet, instead of IDS->iptables reaction.

  • How Suricata logs (like fast.log) work under the hood
  • Parsing + enriching logs for humans and SIEMs
  • Balancing fail-open vs fail-closed when TI providers are unavailable
  • The importance of log rotation, structured state, and safe test traffic
  • Trial-and-error to reach meaningful enriched logs (SIDs, reasons, timestamps)

I used AI (ChatGPT) as a coding and design assistant to help debug, refine, and extend my original concept. The first version worked, but it lacked the more complex behaviors I wanted. With AI as a sounding board, I iterated toward a more robust, explainable daemon. All integration, thresholds, and testing are mine, I reviewed and verified every change.

Where AI helped

• Fast, resilient parsing: shaping the Suricata fast.log regex (FASTLOG_RE) plus a fallback IPv4 flow regex (IPV4_FLOW), with defensive handling for regex errors and malformed IPs.
• Incremental reading with rotation safety: the offset file pattern (_read_offset/_write_offset), reset-on-truncation, and broad exception handling so the daemon keeps running.
• Threat intel workflow: cache-first AbuseIPDB with a TTL, strict timeouts, explicit HTTP/JSON error paths, and FireHOL CIDR matching with periodic reload.
• Policy logic & explainability: a consolidated decision helper (evaluate_reputation_and_decide) that returns clear reason strings (“REPUTATION BLOCK…”, “REPUTATION CLEAN…”) and supports fail-open/fail-closed.
• Idempotent firewall control: iptables helpers that check for existing rules (-C) before inserting at the top (-I INPUT 1), plus safe looped removal and robust error handling.
• State & expiry: a small JSON store for block metadata (ISO timestamps) with prune_expired_blocks that gracefully handles malformed entries.
• Logging for humans & SIEM: dual outputs (text + JSONL) enriched with SID, rule message, TI source, reputation score; consistent, grep-friendly formatting.
• Operational polish: high-alert vs normal polling, per-cycle rate limiting for AbuseIPDB, and notes for clean log rotation/signal handling.

Attribution & responsibility
  • I used AI to brainstorm designs, draft snippets, and review edge cases; it did not run in my environment. I executed, validated, and tuned everything.
  • Security choices, thresholds, and trade-offs are my own; I treat AI output as suggestions that require audit and testing.

Final Thoughts

This isn’t a perfect NGFW replacement. It’s my hands-on exploration of IDS + firewall integration:

  • Show how detections can trigger automated responses
  • Build a working security daemon with operational polish
  • Grow skills in Python, Linux security, and network defense

👉 Full code & setup: https://github.com/yairemartinez/ngfw-daemon