{"id":1075,"date":"2025-09-03T19:32:47","date_gmt":"2025-09-03T19:32:47","guid":{"rendered":"https:\/\/yairmartinezcybersecurityportfolio.com\/?p=1075"},"modified":"2025-12-26T19:18:06","modified_gmt":"2025-12-26T19:18:06","slug":"ngfw-daemon-my-ids-firewall-learning-project","status":"publish","type":"post","link":"https:\/\/yairmartinezcybersecurityportfolio.com\/?p=1075","title":{"rendered":"NGFW Daemon \u2014 Suricata-Driven Reactive Firewall (IPv4, iptables)"},"content":{"rendered":"\n<p><strong>Title:<\/strong> NGFW Daemon \u2014 Suricata-Driven Reactive Firewall (IPv4, iptables)<\/p>\n\n\n\n<p><strong>Quick take:<\/strong> a Python daemon I built that listens to Suricata\u2019s <code>fast.log<\/code>, enriches alerts with threat intel (AbuseIPDB + FireHOL), and automatically blocks attacker IPs via iptables. It writes human-readable logs and JSONL for SIEMs. This is a home-lab \/ learning project\u2014intentionally simple, IPv4-only, iptables-based.<\/p>\n\n\n\n<p><strong>Stack:<\/strong> Python 3 \u00b7 Suricata \u00b7 iptables \u00b7 AbuseIPDB \u00b7 FireHOL \u00b7 systemd \u00b7 logrotate<\/p>\n\n\n\n<p><strong>Repo:<\/strong> <a href=\"https:\/\/github.com\/yairemartinez\/ngfw-daemon?utm_source=chatgpt.com\">https:\/\/github.com\/yairemartinez\/ngfw-daemon<\/a><\/p>\n\n\n\n<p>Below is an example of the daemon in action. The blocks from the 10.0.0.0\/24 network come from hosts inside my network that tripped my custom ICMP rules. The blocks for other addresses come from test alerts I inserted into <code>fast.log<\/code> by hand. The logs show both human-readable and JSON-formatted entries for each block and for every alert that fired, and the blocked IPv4 addresses then appear in iptables. 
<\/p>\n\n\n\n<div class=\"wp-block-stackable-image-box stk-block-image-box stk-hover-parent stk-block stk-db0cd7e is-style-default\" data-block-id=\"db0cd7e\"><style>.stk-db0cd7e {margin-bottom:25px !important;}<\/style><div class=\"stk-block-content stk-inner-blocks has-text-align-center stk-row stk-block-image-box__content\">\n<div class=\"wp-block-stackable-column stk-block-column stk-column stk-block stk-2643aab\" data-v=\"4\" data-block-id=\"2643aab\"><style>.stk-2643aab {align-items:center !important;display:flex !important;}<\/style><div class=\"stk-column-wrapper stk-block-column__content stk-container stk-2643aab-container stk--no-background stk--no-padding\"><div class=\"stk-block-content stk-inner-blocks stk-2643aab-inner-blocks\">\n<div class=\"wp-block-stackable-image-box stk-block-image-box stk-hover-parent stk-block stk-4a15f21 is-style-default\" data-block-id=\"4a15f21\"><div class=\"stk-block-content stk-inner-blocks has-text-align-center stk-row stk-block-image-box__content\">\n<div class=\"wp-block-stackable-image stk-block-image stk-block stk-0c3b0b2\" data-block-id=\"0c3b0b2\"><style>.stk-0c3b0b2 .stk-img-wrapper{width:null% !important;height:nullpx !important;}:where(.stk-hover-parent:hover,  .stk-hover-parent.stk--is-hovered) .stk-0c3b0b2 .stk-img-wrapper::after{background-color:#000000B3 !important;}<\/style><figure><span class=\"stk-img-wrapper stk-image--shape-stretch\"><img loading=\"lazy\" decoding=\"async\" class=\"stk-img wp-image-1296\" src=\"https:\/\/yairmartinezcybersecurityportfolio.com\/wp-content\/uploads\/2025\/09\/ngfwdaemon.gif\" width=\"1304\" height=\"687\"\/><\/span><\/figure><\/div>\n<\/div><\/div>\n<\/div><\/div><\/div>\n<\/div><\/div>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">Table of Contents<\/h2>\n\n\n\n<ul class=\"wp-block-list\">\n<li><a href=\"#what-i-built\">What I Built<\/a><\/li>\n\n\n\n<li><a href=\"#feature-1\">1) Incremental, resilient log 
ingestion<\/a><\/li>\n\n\n\n<li><a href=\"#feature-2\">2) Threat-intel decision engine (cache, policy, fallbacks)<\/a><\/li>\n\n\n\n<li><a href=\"#feature-3\">3) Idempotent firewall enforcement + time-based unblocking<\/a><\/li>\n\n\n\n<li><a href=\"#main-loop\">The main loop: calm when quiet, reactive when noisy<\/a><\/li>\n\n\n\n<li><a href=\"#logging\">Logging built for analysts and machines<\/a><\/li>\n\n\n\n<li><a href=\"#testing\">Testing, trial &amp; error<\/a><\/li>\n\n\n\n<li><a href=\"#why-useful\">Why this daemon is useful &amp; efficient<\/a><\/li>\n\n\n\n<li><a href=\"#lab-setup\">How It Works in a Lab Setup<\/a><\/li>\n\n\n\n<li><a href=\"#limitations\">Limitations (by design)<\/a><\/li>\n\n\n\n<li><a href=\"#shortcomings\">Shortcomings observed in testing<\/a><\/li>\n\n\n\n<li><a href=\"#ai-disclosure\">How I used AI on this project<\/a><\/li>\n\n\n\n<li><a href=\"#learned\">What I Learned<\/a><\/li>\n\n\n\n<li><a href=\"#final-thoughts\">Final Thoughts<\/a><\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<div class=\"wp-block-stackable-heading stk-block-heading stk-block-heading--v2 stk-block stk-948e1e6\" id=\"what-i-built\" data-block-id=\"948e1e6\"><h2 class=\"stk-block-heading__text\">What I Built<\/h2><\/div>\n\n\n\n<p>The NGFW Daemon is a Python-based security tool I created to work alongside Suricata IDS. 
Its job is to:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Monitor Suricata\u2019s <code>fast.log<\/code> in real time<\/li>\n\n\n\n<li>Parse alerts (IP, SID, rule message)<\/li>\n\n\n\n<li>Enrich with threat intelligence (AbuseIPDB + FireHOL)<\/li>\n\n\n\n<li>Block attacker IPs automatically via iptables (IPv4)<\/li>\n\n\n\n<li>Log everything in both analyst-friendly text and machine-readable JSONL<\/li>\n<\/ul>\n\n\n\n<p>This makes it a reactive firewall: Suricata detects \u2192 the daemon decides \u2192 iptables enforces.<\/p>\n\n\n\n<p>Below are the three design features I\u2019m most proud of, and how the main loop ties them together.<\/p>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<h3 class=\"wp-block-heading\">Why it\u2019s good engineering<\/h3>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Keeps a durable byte offset so each cycle only reads new lines<\/li>\n\n\n\n<li>Detects log rotation \/ truncation and resets safely<\/li>\n\n\n\n<li>Robust parsing via compiled regex (full Suricata format + IPv4 flow fallback)<\/li>\n\n\n\n<li>Deduplicates per IP so a single decision is made per IP per cycle<\/li>\n<\/ul>\n\n\n\n<details>\n  <summary><strong>View code (ngfw_daemon.py)<\/strong><\/summary>\n  <pre><code class=\"language-python\">\n# ---- Regex patterns for parsing Suricata fast.log ----\nIPV4_FLOW = re.compile(r\"(\\d{1,3}(?:\\.\\d{1,3}){3})(?::\\d+)?\\s+->\\s+(\\d{1,3}(?:\\.\\d{1,3}){3})(?::\\d+)?\")\nFASTLOG_RE = re.compile(\n    r\"\"\"\\[\\*\\*\\]\\s+\\[(?P&lt;gid&gt;\\d+):(?P&lt;sid&gt;\\d+):(?P&lt;rev&gt;\\d+)\\]\\s+\n        (?P&lt;msg&gt;.+?)\\s+\\[\\*\\*\\].*?\\{(?P&lt;proto&gt;[A-Z]+)\\}\\s+\n        (?P&lt;src_ip&gt;\\d{1,3}(?:\\.\\d{1,3}){3})(?::\\d+)?\\s+->\\s+\n        (?P&lt;dst_ip&gt;\\d{1,3}(?:\\.\\d{1,3}){3})(?::\\d+)?\"\"\",\n    re.VERBOSE,\n)\n\n# ---- fast.log incremental reader with SID\/MSG ----\ndef _read_offset():\n    try:\n        if os.path.exists(OFFSET_FILE):\n            with 
open(OFFSET_FILE, \"r\") as f:\n                return int((f.read() or \"0\").strip())\n    except ValueError:\n        logger.debug(f\"Offset file {OFFSET_FILE} contains invalid integer; resetting to 0\")\n    except PermissionError:\n        logger.critical(f\"Permission denied reading {OFFSET_FILE}\")\n    except Exception as e:\n        logger.debug(f\"Unexpected error reading offset: {e}\")\n    return 0\n\ndef _write_offset(offset):\n    try:\n        with open(OFFSET_FILE, \"w\") as f:\n            f.write(str(offset))\n    except PermissionError:\n        logger.critical(f\"Permission denied writing {OFFSET_FILE}\")\n    except OSError as e:\n        logger.error(f\"OS error writing {OFFSET_FILE}: {e}\")\n    except Exception as e:\n        logger.exception(f\"Failed to write offset file: {e}\")\n\ndef extract_events_from_fastlog(include_private: bool = False):\n    \"\"\"\n    Read new fast.log lines since saved offset and return deduped event list:\n    [{'ip': '1.2.3.4', 'sid': 1000001 | None, 'msg': '...'}, ...]\n    \"\"\"\n    events = []\n    if not os.path.exists(FAST_LOG):\n        logger.debug(f\"FAST_LOG {FAST_LOG} not present; skipping.\")\n        return events\n\n    offset = _read_offset()\n    try:\n        with open(FAST_LOG, \"r\", errors=\"ignore\") as f:\n            file_size = os.path.getsize(FAST_LOG)\n            if offset &gt; file_size:\n                logger.info(\"fast.log rotated\/truncated; resetting offset to 0\")\n                offset = 0\n            f.seek(offset)\n            lines = f.readlines()\n            _write_offset(f.tell())\n    except FileNotFoundError:\n        logger.warning(f\"{FAST_LOG} missing during read\")\n        return events\n    except PermissionError:\n        logger.critical(f\"Permission denied reading {FAST_LOG}\")\n        return events\n    except OSError as e:\n        logger.error(f\"OS error reading fast.log: {e}\")\n        return events\n    except Exception as e:\n        
logger.exception(f\"Unexpected error reading fast.log: {e}\")\n        return events\n\n    # parse lines -> create raw events\n    raw = []\n    for line in lines:\n        line = line.strip()\n        if not line:\n            continue\n        sid = None\n        msg = None\n        src_ip = None\n\n        try:\n            m = FASTLOG_RE.search(line)\n            if m:\n                try:\n                    sid = int(m.group(\"sid\"))\n                except (TypeError, ValueError):\n                    sid = None\n                msg = m.group(\"msg\").strip() if m.group(\"msg\") else None\n                src_ip = m.group(\"src_ip\")\n            else:\n                m2 = IPV4_FLOW.search(line)\n                if m2:\n                    src_ip = m2.group(1)\n        except re.error as e:\n            logger.debug(f\"Regex parse error on fast.log line: {e}\")\n            continue\n\n        if not src_ip:\n            continue\n\n        try:\n            ip_obj = ipaddress.IPv4Address(src_ip)\n        except ValueError:\n            logger.debug(f\"Skipping malformed IP in fast.log: {src_ip}\")\n            continue\n        except Exception as e:\n            logger.debug(f\"Unexpected IP parse error for '{src_ip}': {e}\")\n            continue\n\n        if not include_private and (ip_obj.is_private or ip_obj.is_loopback):\n            continue\n\n        raw.append({\"ip\": str(ip_obj), \"sid\": sid, \"msg\": msg})\n\n    # dedupe\/aggregate per-ip so a single decision is made per IP per cycle\n    events_by_ip = {}\n    for r in raw:\n        ip = r[\"ip\"]\n        if ip not in events_by_ip:\n            events_by_ip[ip] = {\"ip\": ip, \"sids\": [], \"msgs\": []}\n        if r.get(\"sid\") is not None:\n            events_by_ip[ip][\"sids\"].append(r[\"sid\"])\n        if r.get(\"msg\"):\n            events_by_ip[ip][\"msgs\"].append(r[\"msg\"])\n\n    for ip, v in events_by_ip.items():\n        sid_val = v[\"sids\"][0] if v[\"sids\"] else 
None\n        msg_val = \"; \".join(v[\"msgs\"]) if v[\"msgs\"] else None\n        events.append({\"ip\": ip, \"sid\": sid_val, \"msg\": msg_val})\n\n    return events\n  <\/code><\/pre>\n<\/details>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<h3 class=\"wp-block-heading\">Why it\u2019s good engineering<\/h3>\n\n\n\n<ul class=\"wp-block-list\">\n<li>AbuseIPDB with TTL cache + per-cycle rate limiting<\/li>\n\n\n\n<li>FireHOL CIDR matching with periodic reload<\/li>\n\n\n\n<li>Fail policy: choose fail-closed or fail-open when intel is \u201cunknown\u201d<\/li>\n\n\n\n<li>Returns explainable reason strings (great for audits &amp; debugging)<\/li>\n<\/ul>\n\n\n\n<details>\n  <summary><strong>View code (ngfw_daemon.py)<\/strong><\/summary>\n  <pre><code class=\"language-python\">\n# ---- AbuseIPDB + FireHOL ----\n_abuse_cache = {}\nABUSE_TTL = int(os.getenv(\"ABUSEIPDB_CACHE_TTL_S\", \"1800\"))\n\ndef check_abuseipdb(ip: str, allow_query: bool = True):\n    \"\"\"\n    Returns (verdict, score, source_tag)\n    source_tag = 'abuseipdb(cache)', 'abuseipdb' (live), 'abuseipdb(nolookup)' when not allowed to query,\n    or ('unknown', None, 'abuseipdb') if error\/no-key.\n    \"\"\"\n    now = datetime.now(timezone.utc)\n    if ip in _abuse_cache:\n        verdict, score, ts = _abuse_cache[ip]\n        if (now - ts).total_seconds() &lt; ABUSE_TTL:\n            return verdict, score, \"abuseipdb(cache)\"\n\n    if not ABUSEIPDB_API_KEY or not allow_query:\n        return \"unknown\", None, \"abuseipdb(nolookup)\"\n\n    try:\n        headers = {\"Key\": ABUSEIPDB_API_KEY, \"Accept\": \"application\/json\"}\n        url = f\"https:\/\/api.abuseipdb.com\/api\/v2\/check?ipAddress={ip}\"\n        r = requests.get(url, headers=headers, timeout=ABUSEIPDB_TIMEOUT_S)\n        r.raise_for_status()\n        data = r.json().get(\"data\", {})\n        score = int(data.get(\"abuseConfidenceScore\", 0))\n        verdict = \"block\" if score &gt;= 
ABUSEIPDB_THRESHOLD else \"clean\"\n    except requests.Timeout:\n        logger.warning(f\"AbuseIPDB timeout for {ip}\")\n        verdict, score = \"unknown\", None\n    except requests.ConnectionError:\n        logger.error(f\"AbuseIPDB connection error for {ip}\")\n        verdict, score = \"unknown\", None\n    except requests.HTTPError as e:\n        # Response.__bool__ is False for 4xx and 5xx, so compare against None explicitly\n        status = e.response.status_code if e.response is not None else \"unknown\"\n        logger.error(f\"AbuseIPDB HTTP {status} for {ip}\")\n        verdict, score = \"unknown\", None\n    except ValueError:\n        logger.error(f\"Invalid JSON from AbuseIPDB for {ip}\")\n        verdict, score = \"unknown\", None\n    except Exception as e:\n        logger.exception(f\"Unexpected AbuseIPDB error for {ip}: {e}\")\n        verdict, score = \"unknown\", None\n\n    _abuse_cache[ip] = (verdict, score, now)\n    return verdict, score, \"abuseipdb\"\n\n_firehol = set()\n_firehol_loaded_at = None\n\ndef _load_firehol():\n    global _firehol, _firehol_loaded_at\n    if not FIREHOL_ENABLED:\n        return\n    try:\n        with open(FIREHOL_FILE, \"r\") as f:\n            entries = [ln.strip() for ln in f if ln.strip() and not ln.startswith(\"#\")]\n        _firehol = set(entries)\n        _firehol_loaded_at = datetime.now(timezone.utc)\n        logger.info(f\"FireHOL loaded: {len(_firehol)} entries\")\n    except FileNotFoundError:\n        logger.error(f\"FireHOL file not found: {FIREHOL_FILE}\")\n    except PermissionError:\n        logger.critical(f\"Permission denied reading FireHOL file: {FIREHOL_FILE}\")\n    except OSError as e:\n        logger.error(f\"OS error loading FireHOL: {e}\")\n    except Exception as e:\n        logger.exception(f\"Could not load FireHOL list: {e}\")\n\ndef _need_reload_firehol():\n    if not FIREHOL_ENABLED:\n        return False\n    if _firehol_loaded_at is None:\n        return True\n    try:\n        return (datetime.now(timezone.utc) - _firehol_loaded_at).total_seconds() &gt; FIREHOL_RELOAD_S\n 
   except Exception:\n        return True\n\ndef check_firehol(ip: str):\n    if not FIREHOL_ENABLED:\n        return \"unknown\", None, \"firehol\"\n    if _need_reload_firehol():\n        _load_firehol()\n    try:\n        ip_obj = ipaddress.IPv4Address(ip)\n        for item in _firehol:\n            try:\n                if \"\/\" in item:\n                    if ip_obj in ipaddress.IPv4Network(item, strict=False):\n                        return \"block\", None, \"firehol\"\n                else:\n                    if ip == item:\n                        return \"block\", None, \"firehol\"\n            except ValueError:\n                continue\n            except Exception:\n                continue\n        return \"clean\", None, \"firehol\"\n    except ValueError:\n        return \"unknown\", None, \"firehol\"\n    except Exception as e:\n        logger.debug(f\"Unexpected FireHOL check error for {ip}: {e}\")\n        return \"unknown\", None, \"firehol\"\n\n# ---- Consolidated reputation evaluation helper ----\ndef evaluate_reputation_and_decide(ip: str, sid=None, rule_msg=None, allow_abuse_query=True):\n    sources = {}\n    a_verdict, a_score, a_tag = check_abuseipdb(ip, allow_query=allow_abuse_query)\n    sources[\"abuseipdb\"] = (a_verdict, a_score, a_tag)\n\n    f_verdict, f_score, f_tag = check_firehol(ip)\n    sources[\"firehol\"] = (f_verdict, f_score, f_tag)\n\n    score = a_score if a_score is not None else None\n\n    if f_verdict == \"block\":\n        reason = f\"REPUTATION BLOCK (source=firehol)\"\n        return \"block\", score, reason, sources\n\n    if a_score is not None and a_score &gt;= RISK_THRESHOLD:\n        reason = f\"REPUTATION BLOCK (source=abuseipdb,score={a_score})\"\n        return \"block\", score, reason, sources\n\n    providers_total = 1 + (1 if FIREHOL_ENABLED else 0)\n    unknown_count = 0\n    provider_positive = False\n    for p, (v, s, t) in sources.items():\n        if v == \"unknown\":\n            
unknown_count += 1\n        elif v == \"clean\":\n            provider_positive = True\n        elif v == \"block\":\n            provider_positive = True\n        if s is not None and s &lt; RISK_THRESHOLD:\n            provider_positive = True\n\n    if unknown_count == providers_total:\n        if FAIL_POLICY == \"closed\":\n            reason = \"REPUTATION UNKNOWN (fail-closed: all TI providers failed)\"\n            return \"block\", score, reason, sources\n        else:\n            reason = \"REPUTATION UNKNOWN (fail-open: all TI providers failed)\"\n            return \"allow\", score, reason, sources\n\n    reason_parts = []\n    if a_score is not None:\n        reason_parts.append(f\"abuseipdb(score={a_score})\")\n    else:\n        reason_parts.append(f\"abuseipdb({sources['abuseipdb'][0]})\")\n    if FIREHOL_ENABLED:\n        reason_parts.append(f\"firehol({sources['firehol'][0]})\")\n    reason = \"REPUTATION CLEAN (\" + \",\".join(reason_parts) + \")\"\n    return \"allow\", score, reason, sources\n  <\/code><\/pre>\n<\/details>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<h3 class=\"wp-block-heading\">Why it\u2019s good engineering<\/h3>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Won\u2019t duplicate rules; always checks state first<\/li>\n\n\n\n<li>Inserts at the top for deterministic behavior &amp; easy testing<\/li>\n\n\n\n<li>Persists block metadata; auto-unblocks after <code>BLOCK_EXPIRE_S<\/code><\/li>\n\n\n\n<li>Clean signal handling (SIGHUP\/SIGTERM); works nicely with logrotate<\/li>\n<\/ul>\n\n\n\n<details>\n  <summary><strong>View code (ngfw_daemon.py)<\/strong><\/summary>\n  <pre><code class=\"language-python\">\n# ---- file helpers ----\ndef ensure_file(path):\n    try:\n        if not os.path.exists(path):\n            with open(path, \"w\") as f:\n                f.write(f\"=== {os.path.basename(path)} started ===\\n\")\n            os.chmod(path, 0o600)\n    except PermissionError:\n        
logger.critical(f\"Permission denied preparing {path}\")\n    except FileNotFoundError:\n        logger.error(f\"Path not found preparing {path}\")\n    except OSError as e:\n        logger.error(f\"OS error preparing {path}: {e}\")\n    except Exception as e:\n        logger.exception(f\"Could not prepare {path}: {e}\")\n\nensure_file(BLOCKS_LOG)\nensure_file(ALERT_LOG)\n\ndef log_block(ip: str, reason: str, score=None, sid=None, rule_msg=None):\n    ts = datetime.now().astimezone().strftime(\"%Y-%m-%d %H:%M:%S\")\n    score_txt = f\" (REPUTATION SCORE: {score})\" if score is not None else \"\"\n    enrich = _fmt_enrichment(sid, rule_msg)\n    line = f\"{ts} BLOCKED {ip}{score_txt} [{reason}]{enrich}\"\n    try:\n        with open(BLOCKS_LOG, \"a\") as f:\n            f.write(line + \"\\n\")\n    except PermissionError:\n        logger.critical(f\"Permission denied writing {BLOCKS_LOG}\")\n    except FileNotFoundError:\n        logger.error(f\"{BLOCKS_LOG} not found when writing block\")\n    except OSError as e:\n        logger.error(f\"OS error writing {BLOCKS_LOG}: {e}\")\n    except Exception as e:\n        logger.exception(f\"Unexpected error writing blocks.log: {e}\")\n    logger.warning(line)\n\n# ---- iptables helpers ----\ndef is_blocked(ip: str, chain: str = \"INPUT\") -> bool:\n    try:\n        return subprocess.run(\n            [IPTABLES, \"-C\", chain, \"-s\", ip, \"-j\", \"DROP\"],\n            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL\n        ).returncode == 0\n    except FileNotFoundError:\n        logger.critical(\"iptables binary not found while checking rule\")\n        return False\n    except Exception as e:\n        logger.error(f\"Unexpected error checking iptables for {ip}: {e}\")\n        return False\n\ndef add_iptables_drop(ip: str, chain: str = \"INPUT\") -> bool:\n    if is_blocked(ip, chain):\n        return False\n    try:\n        subprocess.run([IPTABLES, \"-I\", chain, \"1\", \"-s\", ip, \"-j\", \"DROP\"], 
check=True)\n        return True\n    except FileNotFoundError:\n        logger.critical(\"iptables binary not found \u2014 cannot enforce blocks!\")\n        return False\n    except subprocess.CalledProcessError as e:\n        logger.error(f\"iptables failed inserting rule for {ip}: {e}\")\n        return False\n    except PermissionError:\n        logger.critical(\"Permission denied executing iptables. Missing CAP_NET_ADMIN?\")\n        return False\n    except Exception as e:\n        logger.exception(f\"Unexpected error adding iptables rule for {ip}: {e}\")\n        return False\n\ndef remove_iptables_drop(ip: str, chain: str = \"INPUT\") -> bool:\n    try:\n        while subprocess.run(\n            [IPTABLES, \"-C\", chain, \"-s\", ip, \"-j\", \"DROP\"],\n            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL\n        ).returncode == 0:\n            subprocess.run([IPTABLES, \"-D\", chain, \"-s\", ip, \"-j\", \"DROP\"], check=True)\n        return True\n    except FileNotFoundError:\n        logger.critical(\"iptables binary not found \u2014 cannot remove rules!\")\n        return False\n    except subprocess.CalledProcessError as e:\n        logger.warning(f\"Failed to remove iptables rule for {ip}: {e}\")\n        return False\n    except PermissionError:\n        logger.critical(\"Permission denied executing iptables delete. 
Missing CAP_NET_ADMIN?\")\n        return False\n    except Exception as e:\n        logger.exception(f\"Unexpected error removing iptables rule for {ip}: {e}\")\n        return False\n\n# ---- persistent block store ----\n_blocks = {}  # ip -> {\"ts\": iso, \"reason\": str, \"score\": int|None, \"sid\": int|None, \"msg\": str|None}\n\ndef load_blocks():\n    global _blocks\n    try:\n        if os.path.exists(BLOCKS_DB) and os.path.getsize(BLOCKS_DB) &gt; 0:\n            with open(BLOCKS_DB, \"r\") as f:\n                _blocks = json.load(f)\n        else:\n            _blocks = {}\n    except json.JSONDecodeError as e:\n        logger.error(f\"Corrupted JSON in {BLOCKS_DB}: {e}\")\n        _blocks = {}\n    except PermissionError:\n        logger.critical(f\"Permission denied reading {BLOCKS_DB}\")\n        _blocks = {}\n    except FileNotFoundError:\n        logger.warning(f\"{BLOCKS_DB} not found; starting with empty block DB.\")\n        _blocks = {}\n    except OSError as e:\n        logger.error(f\"OS error reading {BLOCKS_DB}: {e}\")\n        _blocks = {}\n    except Exception as e:\n        logger.exception(f\"Unexpected error loading {BLOCKS_DB}: {e}\")\n        _blocks = {}\n\ndef save_blocks():\n    try:\n        with open(BLOCKS_DB, \"w\") as f:\n            json.dump(_blocks, f)\n    except PermissionError:\n        logger.critical(f\"Permission denied writing {BLOCKS_DB}\")\n    except FileNotFoundError:\n        logger.error(f\"{BLOCKS_DB} path not found when saving blocks DB\")\n    except (TypeError, ValueError) as e:\n        logger.error(f\"JSON serialization error saving blocks DB: {e}\")\n    except OSError as e:\n        logger.error(f\"OS error writing {BLOCKS_DB}: {e}\")\n    except Exception as e:\n        logger.exception(f\"Unexpected error persisting blocks DB: {e}\")\n\ndef record_block(ip, reason, score=None, sid=None, rule_msg=None):\n    ts = datetime.now(timezone.utc).isoformat()\n    _blocks[ip] = {\"ts\": ts, \"reason\": 
reason, \"score\": score, \"sid\": sid, \"msg\": rule_msg}\n    save_blocks()\n    log_block(ip, reason, score, sid, rule_msg)\n    log_alert(ip, reason, score, sid, rule_msg)\n\ndef prune_expired_blocks():\n    now = datetime.now(timezone.utc)\n    removed = []\n    changed = False  # set whenever _blocks is mutated so the DB gets persisted\n    for ip, meta in list(_blocks.items()):\n        try:\n            ts = datetime.fromisoformat(meta[\"ts\"])\n        except (KeyError, ValueError) as e:\n            logger.debug(f\"Malformed entry in blocks DB for {ip}: {e}\")\n            _blocks.pop(ip, None)\n            changed = True\n            continue\n        except Exception as e:\n            logger.debug(f\"Unexpected parse error for block entry {ip}: {e}\")\n            continue\n\n        try:\n            if (now - ts).total_seconds() &gt; BLOCK_EXPIRE_S:\n                if remove_iptables_drop(ip):\n                    removed.append(ip)\n                _blocks.pop(ip, None)\n                changed = True\n        except Exception as e:\n            logger.debug(f\"Error during expiry check\/removal for {ip}: {e}\")\n            continue\n    if changed:\n        save_blocks()\n    if removed:\n        logger.info(f\"Unblocked expired IPs: {', '.join(removed)}\")\n  <\/code><\/pre>\n<\/details>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<h3 class=\"wp-block-heading\">Why it\u2019s good engineering<\/h3>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Adaptive polling: Normal vs High-Alert interval based on volume<\/li>\n\n\n\n<li>For each event: decide \u2192 enforce\/log with explainable reasons<\/li>\n\n\n\n<li>Safe by default: skips internal\/private IPs (toggleable for labs)<\/li>\n<\/ul>\n\n\n\n<details>\n  <summary><strong>View code (ngfw_daemon.py)<\/strong><\/summary>\n  <pre><code class=\"language-python\">\n# ---- Main loop ----\ndef main():\n    ensure_file(BLOCKS_LOG)\n    ensure_file(ALERT_LOG)\n    load_blocks()\n\n    logger.info(\"NGFW daemon starting up...\")\n    logger.info(f\"Poll interval={POLL_INTERVAL}s, High alert threshold={HIGH_ALERT_THRESHOLD}, block 
expiry={BLOCK_EXPIRE_S}s\")\n    logger.info(f\"Risk threshold={RISK_THRESHOLD}, AbuseIPDB threshold={ABUSEIPDB_THRESHOLD}, Fail policy={FAIL_POLICY}\")\n\n    # Quick sanity: iptables availability (non-fatal)\n    try:\n        subprocess.run([IPTABLES, \"-L\"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)\n    except FileNotFoundError:\n        logger.critical(\"iptables not found. Blocking will not work.\")\n    except Exception as e:\n        logger.debug(f\"iptables preflight error (non-fatal): {e}\")\n\n    high_alert = False\n\n    while True:\n        try:\n            prune_expired_blocks()\n            logger.debug(\"Reading fast.log incrementally...\")\n            events = extract_events_from_fastlog(include_private=True)\n\n            if not events:\n                logger.debug(\"No new events found.\")\n            else:\n                abuse_queries = 0\n                blocked_this_cycle = 0\n\n                for ev in events:\n                    try:\n                        ip = ev[\"ip\"]\n                        sid = ev.get(\"sid\")\n                        rule_msg = ev.get(\"msg\")\n\n                        if ip in LOCAL_IPS or is_private_or_reserved(ip):\n                            logger.debug(f\"Skipping internal\/reserved IP {ip}\")\n                            log_alert(ip, reason=\"INTERNAL\/PRIVATE (skipped)\", sid=sid, rule_msg=rule_msg)\n                            continue\n\n                        allow_query = abuse_queries &lt; ABUSEIPDB_MAX_PER_CYCLE\n                        decision, score, reason, sources = evaluate_reputation_and_decide(\n                            ip, sid=sid, rule_msg=rule_msg, allow_abuse_query=allow_query\n                        )\n\n                        a_tag = sources.get(\"abuseipdb\", (None, None, None))[2]\n                        if a_tag == \"abuseipdb\":\n                            abuse_queries += 1\n\n                        if decision == \"block\":\n        
                    if add_iptables_drop(ip):\n                                record_block(ip, reason=reason, score=score, sid=sid, rule_msg=rule_msg)\n                                blocked_this_cycle += 1\n                            else:\n                                log_alert(ip, reason=f\"{reason} (already-blocked?)\", score=score, sid=sid, rule_msg=rule_msg)\n                        else:\n                            log_alert(ip, reason=reason, score=score, sid=sid, rule_msg=rule_msg)\n                    except KeyError as e:\n                        logger.debug(f\"Event missing expected key {e}; skipping event: {ev}\")\n                        continue\n                    except Exception as e:\n                        logger.exception(f\"Unexpected error processing event {ev}: {e}\")\n                        continue\n\n                if blocked_this_cycle &gt;= HIGH_ALERT_THRESHOLD:\n                    if not high_alert:\n                        logger.warning(f\"High-Alert Mode activated! 
Interval={HIGH_ALERT_INTERVAL}s\")\n                    high_alert = True\n                elif high_alert and blocked_this_cycle == 0:\n                    logger.info(f\"Returning to normal polling ({POLL_INTERVAL}s)\")\n                    high_alert = False\n\n            time.sleep(HIGH_ALERT_INTERVAL if high_alert else POLL_INTERVAL)\n\n        except KeyboardInterrupt:\n            logger.info(\"Shutdown requested (KeyboardInterrupt).\")\n            break\n        except Exception as e:\n            logger.exception(f\"Main loop unexpected error: {e}\")\n            time.sleep(POLL_INTERVAL)\n\nif __name__ == \"__main__\":\n    try:\n        main()\n    except KeyboardInterrupt:\n        logger.info(\"Shutdown (KeyboardInterrupt).\")\n    except Exception as e:\n        logger.exception(f\"Fatal error: {e}\")\n        sys.exit(1)\n  <\/code><\/pre>\n<\/details>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<ul class=\"wp-block-list\">\n<li>Text logs (<code>alert.log<\/code>, <code>blocks.log<\/code>, <code>daemon.log<\/code>) for quick human scanning<\/li>\n\n\n\n<li>JSONL (<code>alerts.json<\/code>) for SIEM pipelines (tail-friendly, line-oriented)<\/li>\n\n\n\n<li>Enriched fields: timestamp, offender IP, reputation score, TI source, SID, rule message, action<\/li>\n<\/ul>\n\n\n\n<p><strong>Example<\/strong><\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>2025-09-02 11:24:45 BLOCKED 10.0.0.1 (REPUTATION SCORE: 0) &#91;REPUTATION BLOCK (source=firehol)] SID=2000004 MSG=\"TEST ICMP Ping Detected\"\n<\/code><\/pre>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<ul class=\"wp-block-list\">\n<li>Parsing edge cases: Suricata <code>fast.log<\/code> variants, partial lines, regex mismatches<\/li>\n\n\n\n<li>Offset correctness: no duplicate reads post-rotation; safe recovery on truncation<\/li>\n\n\n\n<li>Intel timing &amp; quotas: caching, timeouts, and per-cycle caps to keep the loop 
responsive<\/li>\n\n\n\n<li>Operator ergonomics: SIGHUP-driven log reopen for logrotate; clear reason strings; top-insertion iptables rules to simplify testing\/removal<\/li>\n\n\n\n<li>Lab realism: toggleable behavior (skip\/allow private IPs) to safely simulate internal threats<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Useful:<\/strong> Converts IDS alerts into actionable blocks immediately, with transparent reasoning and solid audit logs<\/li>\n\n\n\n<li><strong>Efficient:<\/strong> Incremental reads, cache-first TI, rate-limited lookups, deduped events, and adaptive polling under load<\/li>\n\n\n\n<li><strong>Impressive:<\/strong> A cohesive pipeline\u2014ingestion \u2192 enrichment \u2192 decision \u2192 enforcement \u2192 expiry\u2014plus production-aware details (signals, rotation, state)<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<ol class=\"wp-block-list\">\n<li>Suricata provides raw detection via <code>fast.log<\/code><\/li>\n\n\n\n<li><code>ngfw_daemon.py<\/code> consumes \u2192 enriches \u2192 enforces<\/li>\n\n\n\n<li>Logs and state live in <code>logs_and_utilities\/<\/code> for visibility &amp; SIEM<\/li>\n\n\n\n<li><code>post_server.py<\/code> helps simulate malicious POSTs to test blocking<br><em>(Setup and path specifics are documented in the repo README.)<\/em><\/li>\n<\/ol>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<ul class=\"wp-block-list\">\n<li>IPv4-only (no IPv6 yet)<\/li>\n\n\n\n<li><code>iptables<\/code> backend (not <code>nftables<\/code>\/<code>ufw<\/code>)<\/li>\n\n\n\n<li>Single-host focus (not distributed)<\/li>\n\n\n\n<li>Best for home labs \/ learning, not production-critical use<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"shortcomings\">Shortcomings observed in testing 
(and how I\u2019d fix them)<\/h2>\n\n\n\n<p><strong>What I saw:<\/strong> During testing, a few ICMP pings (about 6\u20137) got through before the IP was blocked. Similarly, a suspicious HTTP POST could complete and then the source was blocked shortly after.<\/p>\n\n\n\n<p><strong>Why this happens (expected trade-off):<\/strong> The pipeline is reactive by design: Suricata detects \u2192 writes a log line \u2192 my daemon reads on its next cycle \u2192 optional threat-intel lookup \u2192 iptables rule insertion. Latency can come from:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Detection timing:<\/strong> Some Suricata signatures (especially HTTP\/app-layer) alert <em>after<\/em> enough of the request is parsed, so the first request may complete before an alert exists.<\/li>\n\n\n\n<li><strong>Log buffering:<\/strong> <code>fast.log<\/code> lines aren\u2019t always immediately readable; flush\/buffering adds delay.<\/li>\n\n\n\n<li><strong>Daemon polling:<\/strong> The loop sleeps between checks, so alerts found just after a read wait until the next cycle.<\/li>\n\n\n\n<li><strong>Threat-intel latency:<\/strong> First-time AbuseIPDB lookups add network time; timeouts\/rate limits can push evaluation to a later cycle.<\/li>\n\n\n\n<li><strong>Rule insertion race:<\/strong> There\u2019s a brief gap between deciding to block and the new DROP rule taking effect.<\/li>\n<\/ul>\n\n\n\n<p><strong>How I\u2019d reduce the delay next:<\/strong><\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Go event-driven<\/strong> on Suricata output (watch the file and react on each new line) or <strong>shrink the poll interval<\/strong> substantially to tighten the loop.<\/li>\n\n\n\n<li><strong>Provisional block first, enrich after:<\/strong> Immediately add a short-TTL DROP on first alert, then confirm with threat intel; remove the block quickly if intel looks clean.<\/li>\n\n\n\n<li><strong>Read <code>eve.json<\/code> instead of <code>fast.log<\/code>:<\/strong> Stream 
line-oriented JSON for lower latency and richer fields.<\/li>\n\n\n\n<li><strong>Trim TI overhead:<\/strong> aggressive caching, sub-second timeouts, and skipping TI for clearly malicious signatures.<\/li>\n\n\n\n<li><strong>Verify chain precedence:<\/strong> keep inserting at <code>INPUT<\/code> position 1 (or the correct chain for your traffic path) so the DROP wins deterministically.<\/li>\n\n\n\n<li><strong>If zero-tolerance is required:<\/strong> consider Suricata in <em>inline IPS<\/em> mode (NFQUEUE) to drop the very first offending packet, instead of IDS-&gt;iptables reaction.<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<ul class=\"wp-block-list\">\n<li>How Suricata logs (like <code>fast.log<\/code>) work under the hood<\/li>\n\n\n\n<li>Parsing + enriching logs for humans and SIEMs<\/li>\n\n\n\n<li>Balancing fail-open vs fail-closed when TI providers are unavailable<\/li>\n\n\n\n<li>The importance of log rotation, structured state, and safe test traffic<\/li>\n\n\n\n<li>Trial-and-error to reach meaningful enriched logs (SIDs, reasons, timestamps)<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n\n\n<p>I used AI (ChatGPT) as a coding and design assistant to help debug, refine, and extend my original concept. The first version worked, but it lacked the more complex behaviors I wanted. With AI as a sounding board, I iterated toward a more robust, explainable daemon. 
All integration, thresholds, and testing are mine; I reviewed and verified every change.<\/p>\n\n\n\n<h5 class=\"wp-block-heading\">Where AI helped<\/h5>\n\n\n\n<p>\u2022 Fast, resilient parsing: shaping the Suricata fast.log regex (FASTLOG_RE) plus a fallback IPv4 flow regex (IPV4_FLOW), with defensive handling for regex errors and malformed IPs.<br>\u2022 Incremental reading with rotation safety: the offset file pattern (_read_offset\/_write_offset), reset-on-truncation, and broad exception handling so the daemon keeps running.<br>\u2022 Threat intel workflow: cache-first AbuseIPDB with a TTL, strict timeouts, explicit HTTP\/JSON error paths, and FireHOL CIDR matching with periodic reload.<br>\u2022 Policy logic &amp; explainability: a consolidated decision helper (evaluate_reputation_and_decide) that returns clear reason strings (\u201cREPUTATION BLOCK\u2026\u201d, \u201cREPUTATION CLEAN\u2026\u201d) and supports fail-open\/fail-closed.<br>\u2022 Idempotent firewall control: iptables helpers that check for existing rules (-C) before inserting at the top (-I INPUT 1), plus safe looped removal and robust error handling.<br>\u2022 State &amp; expiry: a small JSON store for block metadata (ISO timestamps) with prune_expired_blocks that gracefully handles malformed entries.<br>\u2022 Logging for humans &amp; SIEM: dual outputs (text + JSONL) enriched with SID, rule message, TI source, reputation score; consistent, grep-friendly formatting.<br>\u2022 Operational polish: high-alert vs normal polling, per-cycle rate limiting for AbuseIPDB, and notes for clean log rotation\/signal handling.<\/p>\n\n\n\n<h5 class=\"wp-block-heading\">Attribution &amp; responsibility<\/h5>\n\n\n\n<ul class=\"wp-block-list\">\n<li>I used AI to brainstorm designs, draft snippets, and review edge cases; it did not run in my environment. 
I executed, validated, and tuned everything.<\/li>\n\n\n\n<li>Security choices, thresholds, and trade-offs are my own; I treat AI output as suggestions that require audit and testing.<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<div class=\"wp-block-stackable-heading stk-block-heading stk-block-heading--v2 stk-block stk-d9a625e\" id=\"final-thoughts\" data-block-id=\"d9a625e\"><h2 class=\"stk-block-heading__text\">Final Thoughts<\/h2><\/div>\n\n\n\n<p>This isn\u2019t a perfect NGFW replacement. It\u2019s my hands-on exploration of IDS + firewall integration:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Show how detections can trigger automated responses<\/li>\n\n\n\n<li>Build a working security daemon with operational polish<\/li>\n\n\n\n<li>Grow skills in Python, Linux security, and network defense<\/li>\n<\/ul>\n\n\n\n<p>\ud83d\udc49 <strong>Full code &amp; setup:<\/strong> <a href=\"https:\/\/github.com\/yairemartinez\/ngfw-daemon?utm_source=chatgpt.com\">https:\/\/github.com\/yairemartinez\/ngfw-daemon<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Title: NGFW Daemon \u2014 Suricata-Driven Reactive Firewall (IPv4, iptables) Quick take: a Python daemon I built that listens to Suricata\u2019s fast.log, enriches alerts with threat intel (AbuseIPDB + FireHOL), and automatically blocks attacker IPs via iptables. It writes human-readable logs and JSONL for SIEMs. This is a home-lab \/ learning project\u2014intentionally simple, IPv4-only, iptables-based. 
Stack: [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":1296,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[40,1,37],"tags":[56,41,49,48,58,52,57,51,50,44,47],"class_list":["post-1075","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-blue-team","category-projects","category-python","tag-api","tag-blue-team","tag-daemon","tag-ids","tag-ips","tag-json","tag-log-enrichment","tag-logging","tag-parsing","tag-python","tag-suricata"],"_links":{"self":[{"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=\/wp\/v2\/posts\/1075","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1075"}],"version-history":[{"count":50,"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=\/wp\/v2\/posts\/1075\/revisions"}],"predecessor-version":[{"id":1303,"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=\/wp\/v2\/posts\/1075\/revisions\/1303"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=\/wp\/v2\/media\/1296"}],"wp:attachment":[{"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1075"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1075"},{"taxonomy":"post_ta
g","embeddable":true,"href":"https:\/\/yairmartinezcybersecurityportfolio.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1075"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}