threatwire
Real-time network packet inspection and threat signature matching for Python-based IDS/IPS pipelines.
Building a network-level threat detector in Python usually means stitching together scapy for capture, custom parsers for each protocol, and a bespoke signature engine. No single library goes from raw packets to threat alerts in one pipeline, so teams rebuild the same infrastructure for every project.
threatwire provides a streaming packet analysis pipeline with protocol decoders (DNS, HTTP, TLS, SMB), a pluggable signature engine with a built-in IOC ruleset, and an event bus for routing alerts to SIEM, Slack, or any custom handler.
Installation
# Core library (no capture dependencies)
pip install threatwire
# With live capture support (quotes keep shells such as zsh from globbing the brackets)
pip install "threatwire[capture]"
# With fast Aho-Corasick pattern matching
pip install "threatwire[fast]"
# Everything
pip install "threatwire[all]"
Quick Start
Live capture pipeline
from threatwire import ThreatPipeline
from threatwire.core.models import AlertSeverity
pipeline = ThreatPipeline(
    interface="eth0",
    bpf_filter="tcp or udp",
    enable_builtin_rules=True,
)
@pipeline.on_alert(severity="high")
def handle_threat(alert):
    print(f"[{alert.severity.value.upper()}] {alert.rule_name}")
    print(f"  {alert.src_ip} → {alert.dst_ip}")
    print(f"  Technique: {alert.technique_id}")
    print(f"  Confidence: {alert.confidence:.0%}")
pipeline.run()  # blocks; Ctrl-C to stop
PCAP analysis
from threatwire import PacketStreamer, SignatureEngine
engine = SignatureEngine(enable_builtin=True)
streamer = PacketStreamer(pcap_file="/captures/suspicious.pcap")
for packet in streamer.stream():
    alert = engine.match(packet)
    if alert:
        print(alert.to_ecs())  # Elastic Common Schema output
Individual modules
from threatwire import PacketStreamer, SignatureEngine, ThreatEventBus
# 1. Capture
streamer = PacketStreamer(interface="eth0", bpf_filter="tcp port 80")
# 2. Detection
engine = SignatureEngine(enable_builtin=True)
# 3. Routing
bus = ThreatEventBus(dedup_window=30)
# `siem` and `slack` below are placeholders for your own client objects.
@bus.subscribe(severity="critical")
async def on_critical(alert):
    await siem.ingest(alert.to_ecs())
@bus.subscribe(severity="medium", tactic_ids=["TA0011"])
def on_c2(alert):
    slack.notify(f"C2 beacon detected: {alert.src_ip}")
bus.start()
for packet in streamer.stream():
    alerts = engine.match_all([packet])
    bus.publish_many(alerts)
Modules
PacketStreamer
Live and PCAP-file packet ingestion with BPF filter support. Decodes Ethernet → IP → TCP/UDP/ICMP into structured Python objects, normalizing fragmented streams and out-of-order segments via StreamReassembler before analysis.
Attack scenario: a slow SYN scan (1 pkt/sec) designed to evade threshold detectors, combined with DNS C2 tunneling.
How it detects it: reconstructs the full TCP state machine and flags SYN-without-ACK patterns across a low-rate stream, which naive per-packet detectors miss entirely.
from threatwire import PacketStreamer
streamer = PacketStreamer(
    interface="eth0",
    bpf_filter="tcp or udp",
    reconstruct_streams=True,  # enables slow-scan detection
    flow_timeout=120.0,
)
for packet in streamer.stream():
    if streamer.is_slow_syn_scan(packet.src_ip):
        print(f"Slow SYN scan from {packet.src_ip}")
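How `is_slow_syn_scan` works internally is not described here. As a rough illustration of the idea only, the sketch below counts distinct destination ports that received a SYN with no completed handshake inside a long sliding window. `SlowScanTracker`, its method names, and its thresholds are hypothetical and not part of threatwire.

```python
from collections import defaultdict, deque


class SlowScanTracker:
    """Hypothetical helper: flags sources that leave half-open SYNs on many
    distinct ports inside a long sliding window."""

    def __init__(self, window=120.0, min_ports=20):
        self.window = window        # seconds of history kept per source
        self.min_ports = min_ports  # distinct half-open ports that trigger a flag
        self._pending = defaultdict(deque)  # src_ip -> deque of (timestamp, dst_port)

    def on_syn(self, src_ip, dst_port, ts):
        """Record a SYN that has not (yet) been followed by a completed handshake."""
        self._pending[src_ip].append((ts, dst_port))

    def on_handshake_complete(self, src_ip, dst_port):
        """Forget SYNs that turned into real connections."""
        kept = ((t, p) for t, p in self._pending[src_ip] if p != dst_port)
        self._pending[src_ip] = deque(kept)

    def is_slow_scan(self, src_ip, now):
        """True when enough distinct ports were probed within the window."""
        queue = self._pending[src_ip]
        while queue and now - queue[0][0] > self.window:
            queue.popleft()  # expire entries older than the window
        return len({port for _, port in queue}) >= self.min_ports


# One SYN per second to 25 different ports: slow, but visible over 120 seconds.
tracker = SlowScanTracker(window=120.0, min_ports=20)
for i in range(25):
    tracker.on_syn("10.0.0.5", 1000 + i, ts=float(i))
print(tracker.is_slow_scan("10.0.0.5", now=25.0))
```

The point of the long window is that a per-second rate threshold never sees more than one packet, while the accumulated set of half-open ports still grows.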
SignatureEngine
Matches packet payloads and flow metadata against 1,200+ built-in rules using Aho-Corasick multi-pattern matching. Supports Suricata rule syntax for external rule imports.
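For readers unfamiliar with Aho-Corasick, here is a minimal pure-Python sketch of the technique: a trie of byte patterns with failure links, scanned once over the payload. It is not threatwire's implementation (the backend behind the `fast` extra is not stated here), and `build_automaton` and `find_all` are hypothetical names.

```python
from collections import deque


def build_automaton(patterns):
    """Build goto/fail/output tables for a list of byte patterns."""
    goto = [{}]    # goto[state][byte] -> next state
    fail = [0]     # fail[state] -> fallback state on mismatch
    out = [set()]  # out[state] -> indexes of patterns that end at this state
    for idx, pattern in enumerate(patterns):
        state = 0
        for byte in pattern:
            if byte not in goto[state]:
                goto.append({})
                fail.append(0)
                out.append(set())
                goto[state][byte] = len(goto) - 1
            state = goto[state][byte]
        out[state].add(idx)
    # Breadth-first pass: depth-1 states fail to the root, deeper states
    # follow their parent's failure chain.
    queue = deque(goto[0].values())
    while queue:
        state = queue.popleft()
        for byte, nxt in goto[state].items():
            queue.append(nxt)
            f = fail[state]
            while f and byte not in goto[f]:
                f = fail[f]
            fail[nxt] = goto[f].get(byte, 0)
            out[nxt] |= out[fail[nxt]]  # inherit matches that end at the suffix
    return goto, fail, out


def find_all(payload, automaton, patterns):
    """Return (start_offset, pattern) for every occurrence in one pass."""
    goto, fail, out = automaton
    state = 0
    hits = []
    for pos, byte in enumerate(payload):
        while state and byte not in goto[state]:
            state = fail[state]
        state = goto[state].get(byte, 0)
        for idx in out[state]:
            hits.append((pos - len(patterns[idx]) + 1, patterns[idx]))
    return hits


patterns = [b"passwd=", b"password=", b"wd="]
automaton = build_automaton(patterns)
print(sorted(find_all(b"user=a&passwd=x", automaton, patterns)))
```

The payload is scanned once regardless of how many patterns are loaded, which is why the approach suits rulesets with over a thousand signatures.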
Attack scenario: Emotet beacons via HTTP POST with randomised User-Agent strings but a predictable 300-second interval and a fixed URI structure.
How it detects it: matches the URI structure pattern and the beaconing interval together, a combination that a pure payload detector or a pure frequency detector would each miss on its own.
Built-in rules cover:
- DNS C2 tunneling (dnscat2, iodine, dns2tcp)
- HTTP C2 beaconing (Emotet, Cobalt Strike, Meterpreter)
- SMB exploits (EternalBlue, brute force)
- Credential theft (DCSync, Kerberoasting)
- Ransomware IOCs
- Exploit kit patterns
- TLS anomalies
from threatwire import SignatureEngine
from threatwire.core.models import AlertSeverity
engine = SignatureEngine(
    enable_builtin=True,
    rule_path="/etc/threatwire/rules",  # custom JSON rules
    suricata_rules="/etc/suricata/emerging.rules",
    min_severity=AlertSeverity.MEDIUM,
)
alert = engine.match(packet)  # packet comes from PacketStreamer.stream()
if alert:
    print(alert.severity, alert.technique_id, alert.confidence)
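The combined URI-plus-interval check from the Emotet scenario can be sketched as follows. This is an illustration under stated assumptions, not the engine's actual logic: `looks_like_beacon`, the tolerance values, and the URI regex are all hypothetical.

```python
import re
from statistics import mean, pstdev


def looks_like_beacon(requests, uri_pattern, expected_interval=300.0,
                      tolerance=0.1, min_requests=4):
    """requests: iterable of (timestamp, uri) pairs for one src/dst pair.

    True only when enough requests match the URI pattern AND they arrive
    at a steady cadence close to expected_interval.
    """
    times = sorted(ts for ts, uri in requests if uri_pattern.fullmatch(uri))
    if len(times) < min_requests:
        return False
    gaps = [b - a for a, b in zip(times, times[1:])]
    avg = mean(gaps)
    near_expected = abs(avg - expected_interval) <= tolerance * expected_interval
    low_jitter = pstdev(gaps) <= tolerance * avg
    return near_expected and low_jitter


# Hypothetical URI shape; a real rule would carry its own pattern.
pattern = re.compile(r"/[a-z]{4,8}/[0-9]{3,6}/")
# Five POSTs roughly 300 seconds apart, each with a few seconds of jitter.
steady = [(i * 300.0 + jitter, f"/abcd/{1000 + i}/")
          for i, jitter in enumerate([0, 2, -1, 3, 1])]
print(looks_like_beacon(steady, pattern))
```

Requiring both conditions keeps ordinary traffic that happens to hit a similar URI, or unrelated periodic polling, from raising an alert on its own.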
ThreatEventBus
Pub/sub event router with alert deduplication, severity-based routing, async handler support, and Elastic Common Schema output.
Attack scenario: DDoS amplification generates 50,000 UDP alerts/sec, flooding handlers.
How it handles it: collapses the amplification alerts into rolling summary events, so critical lateral-movement alerts still route immediately instead of being buried in volumetric noise.
from threatwire import ThreatEventBus
bus = ThreatEventBus(
    dedup_window=30.0,
    volume_threshold=100,  # collapses to summary after 100 identical alerts
    max_queue_size=10_000,
)
# `db`, `pagerduty`, and `isolate_host` are placeholders for your own integrations.
# Sync handler
@bus.subscribe(severity="high")
def on_high(alert):
    db.insert(alert.to_dict())
# Async handler
@bus.subscribe(severity="critical")
async def on_critical(alert):
    await pagerduty.trigger(alert.rule_name, alert.src_ip)
# Rule-specific handler
@bus.subscribe(severity="medium", rule_ids=["TW-SMB-001"])
def on_eternalblue(alert):
    isolate_host(alert.src_ip)
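To make the interaction between `dedup_window` and `volume_threshold` concrete, here is a small sketch of one way such a stage could behave. It is an assumption about the semantics, not the bus's real code: `RollingDeduper`, the `offer` method, and the idea of keying alerts by a tuple such as `(rule_id, src_ip)` are hypothetical.

```python
import time


class RollingDeduper:
    """Hypothetical stand-in for a dedup stage with volume collapsing."""

    def __init__(self, dedup_window=30.0, volume_threshold=100,
                 clock=time.monotonic):
        self.dedup_window = dedup_window
        self.volume_threshold = volume_threshold
        self._clock = clock
        self._windows = {}  # key -> [window_start, count]

    def offer(self, key):
        """Return 'forward', 'summary', or 'drop' for an alert with this key."""
        now = self._clock()
        window = self._windows.get(key)
        if window is None or now - window[0] > self.dedup_window:
            self._windows[key] = [now, 1]
            return "forward"  # first alert in a fresh window goes straight out
        window[1] += 1
        if window[1] == self.volume_threshold:
            return "summary"  # one summary event stands in for the flood
        return "drop"         # duplicate inside the window


deduper = RollingDeduper(dedup_window=30.0, volume_threshold=100)
flood = [deduper.offer(("TW-UDP-AMP", "203.0.113.7")) for _ in range(150)]
print(flood.count("forward"), flood.count("summary"), flood.count("drop"))
print(deduper.offer(("TW-SMB-001", "10.0.0.8")))  # a different key is unaffected
```

Because state is kept per key, a flood under one key never delays the first alert under another, which is the property the scenario above relies on.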
Custom Rules
Rules are plain Python dataclasses or JSON files:
from threatwire.core.signature_engine import Rule
from threatwire.core.models import AlertSeverity
rule = Rule(
    rule_id="ORG-001",
    name="Plaintext password POST",
    severity=AlertSeverity.CRITICAL,
    description="Detects plaintext password fields in HTTP POST bodies.",
    technique_id="T1552",
    tactic_id="TA0006",
    tactic_name="Credential Access",
    payload_patterns=[b"password=", b"passwd="],
    regex_patterns=[r"password=[^&\s]{6,}"],
    protocols=["tcp", "http"],
    dst_ports=[80, 8080],
    base_confidence=0.9,
)
engine.add_rule(rule)  # engine is an existing SignatureEngine instance
JSON format (for rule files in rule_path/):
[
  {
    "rule_id": "ORG-001",
    "name": "Plaintext password POST",
    "severity": "critical",
    "technique_id": "T1552",
    "tactic_id": "TA0006",
    "payload_patterns": ["password=", "passwd="],
    "protocols": ["tcp", "http"],
    "dst_ports": [80, 8080],
    "base_confidence": 0.9
  }
]
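Note that the JSON form carries `payload_patterns` as strings, while the Python `Rule` above uses bytes. A loader therefore has to encode them somewhere. The sketch below shows one plausible way to do that; `SketchRule` is a trimmed-down, hypothetical stand-in for the real `Rule` class, and `load_rules` is not a threatwire function.

```python
import json
from dataclasses import dataclass, field


@dataclass
class SketchRule:
    """Hypothetical, trimmed-down stand-in for threatwire's Rule."""
    rule_id: str
    name: str
    severity: str
    technique_id: str = ""
    tactic_id: str = ""
    payload_patterns: list = field(default_factory=list)
    protocols: list = field(default_factory=list)
    dst_ports: list = field(default_factory=list)
    base_confidence: float = 0.5


def load_rules(text):
    """Parse a JSON array of rule objects into SketchRule instances."""
    rules = []
    for raw in json.loads(text):
        raw = dict(raw)
        confidence = raw.get("base_confidence", 0.5)
        if not 0.0 <= confidence <= 1.0:
            raise ValueError(f"{raw.get('rule_id')}: base_confidence must be in [0, 1]")
        # JSON has no bytes type, so string patterns are encoded for payload matching.
        raw["payload_patterns"] = [
            p.encode("utf-8") for p in raw.get("payload_patterns", [])
        ]
        rules.append(SketchRule(**raw))  # unknown keys raise TypeError
    return rules


sample = """[{"rule_id": "ORG-001", "name": "Plaintext password POST",
  "severity": "critical", "payload_patterns": ["password=", "passwd="],
  "dst_ports": [80, 8080], "base_confidence": 0.9}]"""
for rule in load_rules(sample):
    print(rule.rule_id, rule.payload_patterns)
```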
Alert Output
Every ThreatAlert can be serialized to dict or Elastic Common Schema:
alert.to_dict() # plain Python dict
alert.to_ecs() # Elastic Common Schema (ECS 8.x compatible)
ECS output example:
{
  "@timestamp": 1714000000.0,
  "event": { "kind": "alert", "severity": 5, "risk_score": 95 },
  "rule": { "id": "TW-DNS-002", "name": "DNS tunneling — dnscat2 signature" },
  "threat": {
    "technique": { "id": "T1071.004" },
    "tactic": { "id": "TA0011", "name": "Command and Control" },
    "framework": "MITRE ATT&CK"
  },
  "source": { "ip": "192.168.1.100", "port": 54321 },
  "destination": { "ip": "185.10.10.1", "port": 53 }
}
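The example shows `@timestamp` as epoch seconds. Some ingest paths expect an ISO 8601 string for that field instead; whether yours does depends on your index mapping, so treat the following as an optional post-processing sketch. `with_iso_timestamp` is a hypothetical helper, not part of threatwire.

```python
from datetime import datetime, timezone


def with_iso_timestamp(ecs_doc):
    """Return a shallow copy whose @timestamp is an ISO 8601 UTC string."""
    doc = dict(ecs_doc)
    ts = doc.get("@timestamp")
    if isinstance(ts, (int, float)):
        doc["@timestamp"] = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
    return doc


converted = with_iso_timestamp({"@timestamp": 1714000000.0})
print(converted["@timestamp"])  # 2024-04-24T23:06:40+00:00
```

The input dict is left untouched, so the same alert can still be passed to handlers that want the numeric form.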
Handlers
Ready-made handlers for common destinations:
from threatwire.handlers import FileHandler, SlackHandler, SIEMHandler, LoggingHandler
# JSONL file (with rotation)
bus.add_handler(FileHandler("/var/log/threatwire/alerts.jsonl").handle)
# Slack (high+ only)
bus.add_handler(SlackHandler(webhook_url="https://hooks.slack.com/...").handle, severity="high")
# Elasticsearch
bus.add_handler(SIEMHandler(endpoint="https://es:9200/alerts", api_key="...").handle)
# Python logging
bus.add_handler(LoggingHandler().handle)
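Since `add_handler` is shown taking a bound `handle` method, a custom destination can presumably be any object that exposes a callable accepting an alert. A minimal sketch, assuming only that the alert provides `to_dict()` as described above; `JsonLinesHandler` is a hypothetical name:

```python
import json
import sys


class JsonLinesHandler:
    """Hypothetical custom handler: writes one JSON object per line."""

    def __init__(self, stream=sys.stdout):
        self._stream = stream

    def handle(self, alert):
        # Assumes the alert exposes to_dict(), as ThreatAlert does.
        line = json.dumps(alert.to_dict(), sort_keys=True)
        self._stream.write(line + "\n")
        self._stream.flush()
```

It would then be registered the same way as the built-in handlers, for example `bus.add_handler(JsonLinesHandler().handle, severity="high")`.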
MITRE ATT&CK Coverage
| Tactic | ID | Techniques Covered |
|---|---|---|
| Discovery | TA0007 | T1046 (Network Service Discovery) |
| Initial Access | TA0001 | T1189 (Exploit Kit) |
| Execution | TA0002 | T1059 (Scripting) |
| Credential Access | TA0006 | T1003.006 (DCSync), T1110 (Brute Force), T1558.003 (Kerberoasting) |
| Lateral Movement | TA0008 | T1210 (EternalBlue) |
| Command & Control | TA0011 | T1071.001 (HTTP), T1071.004 (DNS), T1090.003 (Tor) |
| Impact | TA0040 | T1486 (Ransomware) |
Development
git clone https://github.com/ontedduabhishakereddy/threatwire
cd threatwire
pip install -e ".[dev]"
# Run tests
pytest
# Run with coverage
pytest --cov=threatwire --cov-report=html
# Lint
ruff check threatwire/
License
MIT — see LICENSE.
File details
Details for the file threatwire-1.0.0.tar.gz.
File metadata
- Download URL: threatwire-1.0.0.tar.gz
- Upload date:
- Size: 35.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e7bb7dfc2e783131afb5b83ae380e396d3df68b4c0448df3581ad8300e9c38ee |
| MD5 | da7f8d000180814be4cf83b6b6010750 |
| BLAKE2b-256 | eb91290bf60d184b8e661761490ab83d20dc446474890a507cdd7e8992833883 |
File details
Details for the file threatwire-1.0.0-py3-none-any.whl.
File metadata
- Download URL: threatwire-1.0.0-py3-none-any.whl
- Upload date:
- Size: 38.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 27d6c6e30b4d4126693484664f0f764c99cdfe38100a5b7fa02988d8aff88e64 |
| MD5 | efa2d8740136385e46240b26eb10eae2 |
| BLAKE2b-256 | c07596f4280900aee9b10e4bbb0c4af948cc05b1c94e54b2de6a07ef589e87c6 |