threatwire

Real-time network packet inspection and threat signature matching for Python-based IDS/IPS pipelines.

Python 3.9+ | License: MIT | MITRE ATT&CK


Building a network-level threat detector in Python means stitching together scapy for capture, custom parsers for each protocol, and a bespoke signature engine. No single library goes from raw packets to threat alerts in one pipeline, so teams rebuild the same infrastructure for every project.

threatwire provides a streaming packet analysis pipeline with protocol decoders (DNS, HTTP, TLS, SMB), a pluggable signature engine with a built-in IOC ruleset, and an event bus for routing alerts to SIEM, Slack, or any custom handler.


Installation

# Core library (no capture dependencies)
pip install threatwire

# With live capture support
pip install threatwire[capture]

# With fast Aho-Corasick pattern matching
pip install threatwire[fast]

# Everything
pip install threatwire[all]

Quick Start

Live capture pipeline

from threatwire import ThreatPipeline
from threatwire.core.models import AlertSeverity

pipeline = ThreatPipeline(
    interface="eth0",
    bpf_filter="tcp or udp",
    enable_builtin_rules=True,
)

@pipeline.on_alert(severity="high")
def handle_threat(alert):
    print(f"[{alert.severity.value.upper()}] {alert.rule_name}")
    print(f"  {alert.src_ip} → {alert.dst_ip}")
    print(f"  Technique: {alert.technique_id}")
    print(f"  Confidence: {alert.confidence:.0%}")

pipeline.run()  # blocks; Ctrl-C to stop

PCAP analysis

from threatwire import PacketStreamer, SignatureEngine

engine = SignatureEngine(enable_builtin=True)
streamer = PacketStreamer(pcap_file="/captures/suspicious.pcap")

for packet in streamer.stream():
    alert = engine.match(packet)
    if alert:
        print(alert.to_ecs())   # Elastic Common Schema output

Individual modules

from threatwire import PacketStreamer, SignatureEngine, ThreatEventBus

# 1. Capture
streamer = PacketStreamer(interface="eth0", bpf_filter="tcp port 80")

# 2. Detection
engine = SignatureEngine(enable_builtin=True)

# 3. Routing
bus = ThreatEventBus(dedup_window=30)

# `siem` and `slack` below are placeholders for your own client objects.
@bus.subscribe(severity="critical")
async def on_critical(alert):
    await siem.ingest(alert.to_ecs())

@bus.subscribe(severity="medium", tactic_ids=["TA0011"])
def on_c2(alert):
    slack.notify(f"C2 beacon detected: {alert.src_ip}")

bus.start()

for packet in streamer.stream():
    alerts = engine.match_all([packet])
    bus.publish_many(alerts)

Modules

PacketStreamer

Live and PCAP-file packet ingestion with BPF filter support. Decodes Ethernet → IP → TCP/UDP/ICMP into structured Python objects, normalizing fragmented streams and out-of-order segments via StreamReassembler before analysis.

Attack scenario: Slow SYN scan (1 pkt/sec) to evade threshold detectors, combined with DNS C2 tunneling.

How it detects it: Reconstructs the full TCP state machine and flags SYN-without-ACK patterns across a low-rate stream, which naive per-packet detectors miss entirely.

streamer = PacketStreamer(
    interface="eth0",
    bpf_filter="tcp or udp",
    reconstruct_streams=True,   # enables slow-scan detection
    flow_timeout=120.0,
)

for packet in streamer.stream():
    if streamer.is_slow_syn_scan(packet.src_ip):
        print(f"Slow SYN scan from {packet.src_ip}")
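
The slow-scan idea can be illustrated without threatwire. The sketch below is not the library's implementation: the class name `SlowSynTracker`, its parameters, and the threshold values are hypothetical. It remembers, per source IP, which destinations received a SYN that the source never followed with an ACK, and flags the source once enough of those accumulate inside a long window, however slowly they arrive.

```python
from collections import defaultdict


class SlowSynTracker:
    """Hypothetical sketch: flag sources with many unanswered SYNs in a long window."""

    def __init__(self, window: float = 120.0, min_targets: int = 5) -> None:
        self.window = window            # seconds a pending SYN is remembered
        self.min_targets = min_targets  # distinct (dst_ip, dst_port) pairs needed to flag
        # src_ip -> {(dst_ip, dst_port): timestamp of the SYN}
        self._pending = defaultdict(dict)

    def observe(self, ts: float, src_ip: str, dst_ip: str, dst_port: int, flags: str) -> bool:
        """Record one client-side segment; return True if src_ip looks like a scanner."""
        pending = self._pending[src_ip]
        key = (dst_ip, dst_port)
        if flags == "S":
            pending[key] = ts           # handshake opened
        elif flags == "A":
            pending.pop(key, None)      # handshake completed, so not a scan probe
        # Forget SYNs older than the window.
        cutoff = ts - self.window
        for stale in [k for k, seen in pending.items() if seen < cutoff]:
            del pending[stale]
        return len(pending) >= self.min_targets


# One SYN per second to five different ports, none of them completed.
tracker = SlowSynTracker(window=120.0, min_targets=5)
flagged = False
for second, port in enumerate([21, 22, 23, 80, 443]):
    flagged = tracker.observe(float(second), "10.0.0.9", "10.0.0.1", port, "S")

# A normal client completes its handshake and is not flagged.
normal = SlowSynTracker(window=120.0, min_targets=1)
normal.observe(0.0, "10.0.0.5", "10.0.0.1", 443, "S")
normal_flagged = normal.observe(0.1, "10.0.0.5", "10.0.0.1", 443, "A")
```

Because state is keyed by source and expires by age rather than by packet count, a rate of one packet per second is treated the same as a burst, which is the property a per-packet threshold lacks.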

SignatureEngine

Matches packet payloads and flow metadata against 1,200+ built-in rules using Aho-Corasick multi-pattern matching. Supports Suricata rule syntax for external rule imports.

Attack scenario: Emotet beacons via HTTP POST with randomised User-Agent strings but a predictable 300-second interval and a fixed URI structure.

How it detects it: Matches the URI structure and the beaconing interval together. A detector that looks only at payloads, or only at request frequency, would miss the combination.
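
To make the combined check concrete, here is a minimal sketch under stated assumptions. The function names, the jitter threshold, and the URI regular expression are all hypothetical (the regex is an invented shape, not a real Emotet signature), and this is not how threatwire implements its rules. It keeps only POST requests whose URI matches a fixed shape, then asks whether the gaps between them are nearly constant.

```python
import re
from statistics import mean, pstdev


def is_periodic(timestamps, max_jitter_ratio=0.1, min_events=4):
    """True when the gaps between sorted timestamps are nearly constant."""
    if len(timestamps) < min_events:
        return False
    gaps = [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]
    average = mean(gaps)
    # Coefficient of variation: spread of the gaps relative to their mean.
    return average > 0 and pstdev(gaps) / average <= max_jitter_ratio


def looks_like_beacon(requests, uri_regex):
    """requests: iterable of (timestamp, method, uri) tuples for one host pair."""
    pattern = re.compile(uri_regex)
    hits = sorted(
        ts for ts, method, uri in requests
        if method == "POST" and pattern.fullmatch(uri)
    )
    return is_periodic(hits)


URI_SHAPE = r"/[a-z0-9]{4}/[a-z0-9]{4}/"  # invented example shape

beacon_traffic = [
    (0, "POST", "/a1b2/c3d4/"), (300, "POST", "/zz91/q0q0/"),
    (601, "POST", "/m3m3/x7x7/"), (899, "POST", "/k2k2/p9p9/"),
    (1200, "POST", "/abcd/1234/"),
]
bursty_traffic = [
    (0, "POST", "/a1b2/c3d4/"), (5, "POST", "/zz91/q0q0/"),
    (905, "POST", "/m3m3/x7x7/"), (945, "POST", "/k2k2/p9p9/"),
]

beacon_result = looks_like_beacon(beacon_traffic, URI_SHAPE)
bursty_result = looks_like_beacon(bursty_traffic, URI_SHAPE)
```

The User-Agent is deliberately ignored, since the scenario says it is randomised and carries no signal.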

Built-in rules cover:

  • DNS C2 tunneling (dnscat2, iodine, dns2tcp)
  • HTTP C2 beaconing (Emotet, Cobalt Strike, Meterpreter)
  • SMB exploits (EternalBlue, brute force)
  • Credential theft (DCSync, Kerberoasting)
  • Ransomware IOCs
  • Exploit kit patterns
  • TLS anomalies

from threatwire import SignatureEngine
from threatwire.core.models import AlertSeverity

engine = SignatureEngine(
    enable_builtin=True,
    rule_path="/etc/threatwire/rules",        # custom JSON rules
    suricata_rules="/etc/suricata/emerging.rules",
    min_severity=AlertSeverity.MEDIUM,
)

alert = engine.match(packet)
if alert:
    print(alert.severity, alert.technique_id, alert.confidence)
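
For readers unfamiliar with Aho-Corasick, the following pure-Python sketch shows why it suits a large ruleset: every pattern is searched in a single pass over the payload. It is an illustration only, not the code threatwire ships or what the `fast` extra installs, and the class name `AhoCorasick` is hypothetical.

```python
from collections import deque


class AhoCorasick:
    """Multi-pattern byte matcher: one pass over the data finds every pattern."""

    def __init__(self, patterns):
        self.goto = [{}]   # per-node transitions: byte value -> node index
        self.fail = [0]    # failure link: longest proper suffix that is also a trie path
        self.out = [[]]    # patterns that end at this node (including via suffixes)
        for pattern in patterns:
            self._insert(pattern)
        self._build_failure_links()

    def _insert(self, pattern):
        node = 0
        for byte in pattern:
            nxt = self.goto[node].get(byte)
            if nxt is None:
                nxt = len(self.goto)
                self.goto.append({})
                self.fail.append(0)
                self.out.append([])
                self.goto[node][byte] = nxt
            node = nxt
        self.out[node].append(pattern)

    def _build_failure_links(self):
        # Breadth-first, so shallower nodes are finished before deeper ones use them.
        queue = deque(self.goto[0].values())
        while queue:
            node = queue.popleft()
            for byte, child in self.goto[node].items():
                queue.append(child)
                f = self.fail[node]
                while f and byte not in self.goto[f]:
                    f = self.fail[f]
                self.fail[child] = self.goto[f].get(byte, 0)
                self.out[child] = self.out[child] + self.out[self.fail[child]]

    def search(self, data):
        """Return (start_offset, pattern) for every occurrence in data."""
        node, hits = 0, []
        for index, byte in enumerate(data):
            while node and byte not in self.goto[node]:
                node = self.fail[node]
            node = self.goto[node].get(byte, 0)
            for pattern in self.out[node]:
                hits.append((index - len(pattern) + 1, pattern))
        return hits


matcher = AhoCorasick([b"password=", b"passwd=", b"pass"])
hits = matcher.search(b"user=a&password=hunter2")
```

Scan time grows with the payload length plus the number of matches, not with the number of patterns, which is what makes a ruleset of more than a thousand signatures practical per packet.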

ThreatEventBus

Pub/sub event router with alert deduplication, severity-based routing, async handler support, and Elastic Common Schema output.

Attack scenario: DDoS amplification generates 50,000 UDP alerts/sec, flooding handlers.

How it handles it: Collapses the amplification alerts into rolling summary events, so critical lateral-movement alerts route immediately instead of being buried in volumetric noise.

bus = ThreatEventBus(
    dedup_window=30.0,
    volume_threshold=100,         # collapses to summary after 100 identical alerts
    max_queue_size=10_000,
)

# `db`, `pagerduty`, and `isolate_host` below are placeholders for your own code.

# Sync handler
@bus.subscribe(severity="high")
def on_high(alert):
    db.insert(alert.to_dict())

# Async handler
@bus.subscribe(severity="critical")
async def on_critical(alert):
    await pagerduty.trigger(alert.rule_name, alert.src_ip)

# Rule-specific handler
@bus.subscribe(severity="medium", rule_ids=["TW-SMB-001"])
def on_eternalblue(alert):
    isolate_host(alert.src_ip)
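
How `dedup_window` and `volume_threshold` might interact can be sketched as follows. This is an assumption about the semantics, not threatwire's actual logic: the class `AlertDeduplicator`, the three return values, and the choice of deduplication key are all hypothetical. The first alert for a key is forwarded, repeats inside the window are suppressed, and a single summary is emitted when the repeat count reaches the threshold.

```python
class AlertDeduplicator:
    """Hypothetical sketch of windowed deduplication with a volume summary."""

    def __init__(self, dedup_window: float = 30.0, volume_threshold: int = 100) -> None:
        self.dedup_window = dedup_window
        self.volume_threshold = volume_threshold
        # key -> [window_start_time, alerts_seen_in_window]
        self._windows = {}

    def classify(self, key, now: float) -> str:
        """Return 'forward', 'suppress', or 'summary' for one incoming alert."""
        state = self._windows.get(key)
        if state is None or now - state[0] >= self.dedup_window:
            self._windows[key] = [now, 1]   # new window: deliver the alert itself
            return "forward"
        state[1] += 1
        if state[1] == self.volume_threshold:
            return "summary"                # emit one roll-up event for the flood
        return "suppress"


dedup = AlertDeduplicator(dedup_window=30.0, volume_threshold=3)
key = ("TW-EXAMPLE", "203.0.113.7")  # e.g. (rule id, source IP); both values invented
decisions = [dedup.classify(key, t) for t in (0.0, 1.0, 2.0, 3.0, 31.0)]

# A different key is unaffected by the flood above.
other = dedup.classify(("TW-SMB-001", "10.0.0.4"), 2.5)
```

Keying on rule and source means a flood of identical UDP alerts consumes one slot, while an unrelated alert with a different key is still forwarded on first sight.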

Custom Rules

Rules are plain Python dataclasses or JSON files:

from threatwire.core.signature_engine import Rule
from threatwire.core.models import AlertSeverity

rule = Rule(
    rule_id="ORG-001",
    name="Plaintext password POST",
    severity=AlertSeverity.CRITICAL,
    description="Detects plaintext password fields in HTTP POST bodies.",
    technique_id="T1552",
    tactic_id="TA0006",
    tactic_name="Credential Access",
    payload_patterns=[b"password=", b"passwd="],
    regex_patterns=[r"password=[^&\s]{6,}"],
    protocols=["tcp", "http"],
    dst_ports=[80, 8080],
    base_confidence=0.9,
)
engine.add_rule(rule)

JSON format (for rule files in rule_path/):

[
  {
    "rule_id": "ORG-001",
    "name": "Plaintext password POST",
    "severity": "critical",
    "technique_id": "T1552",
    "tactic_id": "TA0006",
    "payload_patterns": ["password=", "passwd="],
    "protocols": ["tcp", "http"],
    "dst_ports": [80, 8080],
    "base_confidence": 0.9
  }
]
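
JSON has no bytes type, so the string patterns in a rule file have to be encoded before they can be matched against payloads. The sketch below shows one way a loader could do that. It is illustrative only: `RuleSketch` is a stand-in for the real `Rule` class, and the choice of required fields, the default confidence, and the UTF-8 encoding are assumptions.

```python
import json
from dataclasses import dataclass, field


@dataclass
class RuleSketch:
    """Stand-in for threatwire's Rule; only the fields from the JSON example."""
    rule_id: str
    name: str
    severity: str
    technique_id: str = ""
    tactic_id: str = ""
    payload_patterns: list = field(default_factory=list)  # list of bytes
    protocols: list = field(default_factory=list)
    dst_ports: list = field(default_factory=list)
    base_confidence: float = 0.5  # assumed default


def parse_rules(text: str) -> list:
    """Parse a JSON array of rule objects into RuleSketch instances."""
    rules = []
    for entry in json.loads(text):
        missing = {"rule_id", "name", "severity"} - entry.keys()
        if missing:
            raise ValueError(f"rule is missing fields: {sorted(missing)}")
        confidence = float(entry.get("base_confidence", 0.5))
        if not 0.0 <= confidence <= 1.0:
            raise ValueError(f"{entry['rule_id']}: base_confidence must be in [0, 1]")
        rules.append(RuleSketch(
            rule_id=entry["rule_id"],
            name=entry["name"],
            severity=entry["severity"],
            technique_id=entry.get("technique_id", ""),
            tactic_id=entry.get("tactic_id", ""),
            # Encode text patterns so they can be compared with raw payload bytes.
            payload_patterns=[p.encode("utf-8") for p in entry.get("payload_patterns", [])],
            protocols=list(entry.get("protocols", [])),
            dst_ports=[int(port) for port in entry.get("dst_ports", [])],
            base_confidence=confidence,
        ))
    return rules


RULE_FILE = """
[
  {
    "rule_id": "ORG-001",
    "name": "Plaintext password POST",
    "severity": "critical",
    "technique_id": "T1552",
    "tactic_id": "TA0006",
    "payload_patterns": ["password=", "passwd="],
    "protocols": ["tcp", "http"],
    "dst_ports": [80, 8080],
    "base_confidence": 0.9
  }
]
"""
loaded = parse_rules(RULE_FILE)
```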

Alert Output

Every ThreatAlert can be serialized to dict or Elastic Common Schema:

alert.to_dict()   # plain Python dict
alert.to_ecs()    # Elastic Common Schema (ECS 8.x compatible)

ECS output example:

{
  "@timestamp": 1714000000.0,
  "event": { "kind": "alert", "severity": 5, "risk_score": 95 },
  "rule": { "id": "TW-DNS-002", "name": "DNS tunneling — dnscat2 signature" },
  "threat": {
    "technique": { "id": "T1071.004" },
    "tactic": { "id": "TA0011", "name": "Command and Control" },
    "framework": "MITRE ATT&CK"
  },
  "source": { "ip": "192.168.1.100", "port": 54321 },
  "destination": { "ip": "185.10.10.1", "port": 53 }
}
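
The example shows `@timestamp` as epoch seconds. If the target index uses Elasticsearch's default date format (ISO 8601 strings or epoch milliseconds), a value in seconds may be misread, so converting before ingestion can be worthwhile. The helper below is a hypothetical post-processing step, not part of threatwire, and it assumes the numeric value is epoch seconds in UTC.

```python
from datetime import datetime, timezone


def with_iso_timestamp(ecs_doc: dict) -> dict:
    """Return a shallow copy whose numeric @timestamp is an ISO 8601 UTC string."""
    converted = dict(ecs_doc)
    stamp = converted.get("@timestamp")
    if isinstance(stamp, (int, float)):
        # Assumes epoch seconds, as in the example output above.
        converted["@timestamp"] = datetime.fromtimestamp(stamp, tz=timezone.utc).isoformat()
    return converted


doc = {"@timestamp": 1714000000.0, "event": {"kind": "alert"}}
iso_doc = with_iso_timestamp(doc)
print(iso_doc["@timestamp"])  # 2024-04-24T23:06:40+00:00
```

The original dict is left unchanged, so the same alert can still be passed to handlers that expect the numeric form.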

Handlers

Ready-made handlers for common destinations:

from threatwire.handlers import FileHandler, SlackHandler, SIEMHandler, LoggingHandler

# JSONL file (with rotation)
bus.add_handler(FileHandler("/var/log/threatwire/alerts.jsonl").handle)

# Slack (high+ only)
bus.add_handler(SlackHandler(webhook_url="https://hooks.slack.com/...").handle, severity="high")

# Elasticsearch
bus.add_handler(SIEMHandler(endpoint="https://es:9200/alerts", api_key="...").handle)

# Python logging
bus.add_handler(LoggingHandler().handle)
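
Since `add_handler` is given a bound method in each example above, a custom handler can presumably be any callable that accepts one alert. The sketch below relies on that assumption and on `alert.to_dict()`; the class `BatchingHandler` and the stand-in `DemoAlert` are hypothetical. It buffers alerts and hands them to a sink in batches, which suits destinations with a bulk-ingest API.

```python
import threading


class BatchingHandler:
    """Hypothetical handler: collect alerts and pass them to `sink` in batches."""

    def __init__(self, sink, batch_size: int = 50) -> None:
        self.sink = sink                # callable that receives a list of dicts
        self.batch_size = batch_size
        self._buffer = []
        self._lock = threading.Lock()   # handlers may be called from several threads

    def handle(self, alert) -> None:
        batch = None
        with self._lock:
            self._buffer.append(alert.to_dict())
            if len(self._buffer) >= self.batch_size:
                batch, self._buffer = self._buffer, []
        if batch:
            self.sink(batch)            # call the sink outside the lock

    def flush(self) -> None:
        """Send whatever is buffered, e.g. at shutdown."""
        with self._lock:
            batch, self._buffer = self._buffer, []
        if batch:
            self.sink(batch)


class DemoAlert:
    """Stand-in for a ThreatAlert, used only to exercise the handler."""

    def __init__(self, rule_name: str) -> None:
        self.rule_name = rule_name

    def to_dict(self) -> dict:
        return {"rule_name": self.rule_name}


batches = []
handler = BatchingHandler(sink=batches.append, batch_size=2)
for name in ("a", "b", "c"):
    handler.handle(DemoAlert(name))
handler.flush()
```

With a real bus it would be registered the same way as the built-in handlers, for example `bus.add_handler(handler.handle)`.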

MITRE ATT&CK Coverage

Tactic             ID      Techniques Covered
Reconnaissance     TA0043  T1046 (Network Scan)
Initial Access     TA0001  T1189 (Exploit Kit)
Execution          TA0002  T1059 (Scripting)
Credential Access  TA0006  T1003.006 (DCSync), T1110 (Brute Force), T1558.003 (Kerberoasting)
Lateral Movement   TA0008  T1210 (EternalBlue)
Command & Control  TA0011  T1071.001 (HTTP), T1071.004 (DNS), T1090.003 (Tor)
Impact             TA0040  T1486 (Ransomware)
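
The coverage table can also be kept as data, for instance to choose the `tactic_ids` for a subscription from a technique ID. The mapping below simply restates the table above; the names `COVERAGE` and `tactic_for` are hypothetical and not part of threatwire.

```python
# Tactic ID -> (tactic name, technique IDs), copied from the coverage table.
COVERAGE = {
    "TA0043": ("Reconnaissance", ["T1046"]),
    "TA0001": ("Initial Access", ["T1189"]),
    "TA0002": ("Execution", ["T1059"]),
    "TA0006": ("Credential Access", ["T1003.006", "T1110", "T1558.003"]),
    "TA0008": ("Lateral Movement", ["T1210"]),
    "TA0011": ("Command & Control", ["T1071.001", "T1071.004", "T1090.003"]),
    "TA0040": ("Impact", ["T1486"]),
}


def tactic_for(technique_id: str):
    """Return (tactic_id, tactic_name) for a covered technique, or None."""
    for tactic_id, (name, techniques) in COVERAGE.items():
        if technique_id in techniques:
            return tactic_id, name
    return None


dns_tactic = tactic_for("T1071.004")
unknown = tactic_for("T9999")
```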

Development

git clone https://github.com/ontedduabhishakereddy/threatwire
cd threatwire
pip install -e ".[dev]"

# Run tests
pytest

# Run with coverage
pytest --cov=threatwire --cov-report=html

# Lint
ruff check threatwire/

License

MIT — see LICENSE.
