High-performance network packet manipulation with Rust and Python

Stackforge

Stackforge is a high-performance networking stack written in Rust with Python bindings. It provides Scapy-like packet manipulation with native Rust performance — build, parse, and inspect network packets using a familiar / stacking syntax.

Features

  • Scapy-style API — Stack layers with Ether() / IP() / TCP(), set fields with keyword arguments
  • High Performance — Core logic in Rust, zero-copy parsing, copy-on-write mutation
  • Broad Protocol Support — Ethernet, ARP, IPv4/IPv6, TCP, UDP, ICMP/ICMPv6 (with echo correlation), DNS, HTTP/1.x, HTTP/2, QUIC, L2TP, 802.11 (Wi-Fi), 802.15.4 (Zigbee), and custom protocols
  • Stateful Flow Extraction — Extract bidirectional conversations from PCAP files with TCP state tracking, stream reassembly, UDP timeout handling, and optional max packet/flow length tracking
  • PCAP I/O — Read and write pcap files with rdpcap() / wrpcap()
  • Python Bindings — Seamless integration via PyO3/maturin
  • Custom Protocols — Define runtime protocols with CustomLayer and typed fields
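The / stacking syntax relies on Python operator overloading. The following is a minimal, stand-alone sketch of that idea and not Stackforge's actual implementation; the Layer and Stack classes and their attributes are hypothetical names used only for illustration.

```python
class Layer:
    """Hypothetical stand-in for a protocol layer such as Ether or IP."""

    def __init__(self, name, **fields):
        self.name = name
        self.fields = fields

    def __truediv__(self, other):
        # `a / b` starts a stack with `a` below `b`.
        return Stack([self]) / other


class Stack:
    """Ordered list of layers, outermost first."""

    def __init__(self, layers):
        self.layers = list(layers)

    def __truediv__(self, other):
        # Accept either a single layer or another stack on the right-hand side.
        extra = other.layers if isinstance(other, Stack) else [other]
        return Stack(self.layers + extra)

    def summary(self):
        return " / ".join(layer.name for layer in self.layers)


stack = Layer("Ethernet") / Layer("IPv4", dst="10.0.0.1") / Layer("TCP", dport=80)
print(stack.summary())  # Ethernet / IPv4 / TCP
```

Because / is left-associative, each step only has to append one layer to the stack built so far.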

Installation

pip install stackforge

Or with uv:

uv add stackforge

Quick Start

Build packets

from stackforge import Ether, IP, TCP, UDP, ICMP, Raw

# TCP SYN packet
pkt = Ether(dst="ff:ff:ff:ff:ff:ff") / IP(dst="192.168.1.1") / TCP(dport=80, flags="S")
print(pkt.show())

# UDP datagram addressed to a DNS server (port 53); no DNS layer is attached here
pkt = Ether() / IP(dst="8.8.8.8") / UDP(dport=53)

# ICMP echo request
pkt = Ether() / IP(dst="10.0.0.1") / ICMP.echo_request(id=0x1234, seq=1)

# Packet with raw payload
pkt = Ether() / IP(dst="10.0.0.1") / TCP(dport=80) / Raw(load=b"GET / HTTP/1.1\r\n")

Build to bytes

stack = Ether() / IP(dst="10.0.0.1") / TCP(dport=443, flags="S")

# Build into a Packet object
pkt = stack.build()

# Or get raw bytes directly
raw = stack.bytes()
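To give a feel for what building a layer into bytes involves, here is a standard-library sketch that packs a 20-byte IPv4 header and fills in its checksum (RFC 1071). It is independent of Stackforge; the function names and default field values are assumptions chosen for the example.

```python
import socket
import struct


def internet_checksum(data: bytes) -> int:
    """RFC 1071 one's-complement sum over 16-bit big-endian words."""
    if len(data) % 2:
        data += b"\x00"
    total = sum(struct.unpack(f"!{len(data) // 2}H", data))
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF


def build_ipv4_header(src: str, dst: str, payload_len: int,
                      proto: int = 6, ttl: int = 64) -> bytes:
    """Pack a 20-byte IPv4 header (no options) with a valid checksum."""
    def pack(checksum: int) -> bytes:
        return struct.pack(
            "!BBHHHBBH4s4s",
            (4 << 4) | 5,          # version 4, header length 5 words
            0,                     # DSCP/ECN
            20 + payload_len,      # total length
            0,                     # identification
            0,                     # flags and fragment offset
            ttl,
            proto,
            checksum,
            socket.inet_aton(src),
            socket.inet_aton(dst),
        )

    # Checksum is computed over the header with the checksum field zeroed.
    return pack(internet_checksum(pack(0)))


header = build_ipv4_header("10.0.0.2", "10.0.0.1", payload_len=20)
```

Running internet_checksum over the finished header returns 0, which is the usual way a receiver validates it.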

Parse packets from bytes

from stackforge import Packet, LayerKind

raw_bytes = b"\xff\xff..."  # raw packet bytes
pkt = Packet(raw_bytes)
pkt.parse()

print(pkt.layer_count)                  # 3
print(pkt.has_layer(LayerKind.Tcp))     # True
print(pkt.summary())                    # "Ethernet / IPv4 / TCP"
print(pkt.show())                       # detailed layer view
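The layer walk behind a summary such as "Ethernet / IPv4 / TCP" can be sketched with the standard library alone. This is an illustration of the dispatch idea (EtherType, then IPv4 protocol number), not Stackforge's parser; the summarize function is a hypothetical helper, and memoryview is used so that slicing does not copy the buffer.

```python
import struct

ETHERTYPE_IPV4 = 0x0800
IPPROTO_TCP = 6


def summarize(frame: bytes) -> str:
    """Walk Ethernet -> IPv4 -> TCP headers and return the layer names found."""
    view = memoryview(frame)           # slices of a memoryview do not copy
    layers = []

    if len(view) < 14:
        return "Raw"
    layers.append("Ethernet")
    (ethertype,) = struct.unpack_from("!H", view, 12)
    if ethertype != ETHERTYPE_IPV4 or len(view) < 34:
        return " / ".join(layers)

    ip = view[14:]
    ihl = (ip[0] & 0x0F) * 4           # IPv4 header length in bytes
    layers.append("IPv4")
    if ip[9] == IPPROTO_TCP and len(ip) >= ihl + 20:
        layers.append("TCP")
    return " / ".join(layers)


# Hand-built 54-byte frame: zeroed headers with only the dispatch fields set.
frame = bytearray(54)
frame[12:14] = b"\x08\x00"             # EtherType = IPv4
frame[14] = 0x45                       # version 4, IHL 5
frame[23] = IPPROTO_TCP                # IPv4 protocol field (offset 14 + 9)
print(summarize(bytes(frame)))         # Ethernet / IPv4 / TCP
```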

Read and write PCAP files

from stackforge import rdpcap, wrpcap, PcapReader, Ether, IP, TCP

# Write packets to a pcap file
packets = [
    Ether() / IP(dst="192.168.1.1") / TCP(dport=80, flags="S"),
    Ether() / IP(dst="10.0.0.1") / TCP(dport=443, flags="SA"),
]
wrpcap("capture.pcap", packets)

# Read all packets at once
packets = rdpcap("capture.pcap")
for pkt in packets:
    print(pkt.summary())

# Stream large pcap files
for pkt in PcapReader("large_capture.pcap"):
    print(pkt.summary())
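For readers curious about what sits behind wrpcap() and a streaming reader, the classic pcap layout is a 24-byte global header followed by 16-byte record headers. The sketch below handles only the little-endian, microsecond-resolution variant and uses hypothetical helper names (write_pcap, iter_pcap); it does not describe how Stackforge implements its own I/O.

```python
import io
import struct

PCAP_MAGIC = 0xA1B2C3D4        # microsecond-resolution classic pcap
LINKTYPE_ETHERNET = 1


def write_pcap(stream, frames, snaplen=65535):
    """Write a global header followed by one record per frame."""
    stream.write(struct.pack("<IHHiIII", PCAP_MAGIC, 2, 4, 0, 0,
                             snaplen, LINKTYPE_ETHERNET))
    for ts_sec, ts_usec, data in frames:
        stream.write(struct.pack("<IIII", ts_sec, ts_usec, len(data), len(data)))
        stream.write(data)


def iter_pcap(stream):
    """Yield (ts_sec, ts_usec, data) one record at a time."""
    header = stream.read(24)
    if len(header) < 24 or struct.unpack_from("<I", header)[0] != PCAP_MAGIC:
        raise ValueError("not a little-endian microsecond pcap file")
    while True:
        record = stream.read(16)
        if len(record) < 16:
            return
        ts_sec, ts_usec, incl_len, _orig_len = struct.unpack("<IIII", record)
        yield ts_sec, ts_usec, stream.read(incl_len)


buffer = io.BytesIO()
write_pcap(buffer, [(1, 0, b"\x01\x02"), (2, 500, b"\x03")])
buffer.seek(0)
records = list(iter_pcap(buffer))
```

The generator reads one record per iteration, which is why a streaming reader can process captures larger than memory.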

Protocol Reference

Layer Builders

from stackforge import Ether, IP, IPv6, TCP, UDP, ARP, ICMP, ICMPv6, DNS, Raw

# Ethernet
Ether(dst="aa:bb:cc:dd:ee:ff", src="11:22:33:44:55:66")

# IPv4
IP(src="10.0.0.1", dst="192.168.1.100", ttl=128)

# IPv6
IPv6(src="::1", dst="2001:db8::1", hlim=64)

# TCP
TCP(sport=12345, dport=443, flags="SA", seq=1000, ack=2000)

# UDP
UDP(sport=5000, dport=53)

# ARP
ARP(op="who-has", pdst="192.168.1.100")
ARP(op="is-at", pdst="192.168.1.100")

# ICMP
ICMP(type=8, code=0)
ICMP.echo_request(id=0x1234, seq=1)
ICMP.echo_reply(id=0xABCD, seq=42)
ICMP.dest_unreach(code=3)
ICMP.redirect(code=1, gateway="10.0.0.1")
ICMP.time_exceeded(code=0)

# ICMPv6
ICMPv6(type=128, code=0)           # echo request

# DNS
DNS(id=0x1234, qr=0, rd=1)         # query

# Raw payload
Raw(load=b"Hello")
Raw.from_hex("deadbeef")
Raw.zeros(10)
Raw.repeat(0x41, 5)                 # b"AAAAA"
Raw.pattern(b"AB", 7)              # b"ABABABA"
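Two of the conveniences above, flag strings such as "SA" and cyclic payload patterns, are easy to model in plain Python. The sketch below assumes the Scapy letter convention for TCP flags and mirrors the Raw.pattern result shown in the comment above; tcp_flags_to_int and pattern are illustrative helpers, not part of the Stackforge API.

```python
from itertools import cycle, islice

# TCP flag letters in the Scapy convention, mapped to header bit values.
TCP_FLAG_BITS = {"F": 0x01, "S": 0x02, "R": 0x04, "P": 0x08,
                 "A": 0x10, "U": 0x20, "E": 0x40, "C": 0x80}


def tcp_flags_to_int(flags: str) -> int:
    """Combine flag letters into the TCP header flag byte."""
    value = 0
    for letter in flags:
        value |= TCP_FLAG_BITS[letter]
    return value


def pattern(chunk: bytes, length: int) -> bytes:
    """Repeat `chunk` cyclically and cut at exactly `length` bytes."""
    return bytes(islice(cycle(chunk), length))


print(hex(tcp_flags_to_int("SA")))   # 0x12
print(pattern(b"AB", 7))             # b'ABABABA'
```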

Field Access

from stackforge import Packet, LayerKind

pkt = Packet(raw_bytes)
pkt.parse()

# Generic field access (searches all layers)
print(pkt.src)
print(pkt.dport)

# Layer-specific field access (use when field name exists in multiple layers)
dns_id = pkt.getfieldval(LayerKind.Dns, "id")
ip_id  = pkt.getfieldval(LayerKind.Ipv4, "id")

# Introspect available fields
print(pkt.fields)                          # list of all field names

# Layer presence and bytes
print(pkt.has_layer(LayerKind.Http))
print(pkt.get_layer_bytes(LayerKind.Http))

Custom Protocols

from stackforge import Ether, IP, UDP
from stackforge.custom import CustomLayer, ByteField, ShortField, IntField, StrLenField

class MyHeader(CustomLayer):
    name = "MyHeader"
    fields_desc = [
        ByteField("version", default=1),
        ShortField("length", default=0),
        IntField("magic", default=0xDEADBEEF),
        StrLenField("payload", default=b"", length_from=lambda pkt: pkt.length),
    ]

pkt = Ether() / IP() / UDP(dport=9999) / MyHeader(version=2, magic=0xCAFEBABE)
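As a rough picture of what a field list like this describes on the wire, the sketch below packs and parses the same four fields with the struct module. It assumes big-endian (network) byte order and 1-, 2-, and 4-byte widths for the byte, short, and int fields, as in Scapy; whether Stackforge encodes them identically is not stated here, and the helper names are hypothetical.

```python
import struct

# version (1 byte), length (2 bytes), magic (4 bytes), assumed big-endian
HEADER = struct.Struct("!BHI")


def build_my_header(version: int, magic: int, payload: bytes) -> bytes:
    # The length field is filled in from the payload so the two stay in sync.
    return HEADER.pack(version, len(payload), magic) + payload


def parse_my_header(data: bytes) -> dict:
    version, length, magic = HEADER.unpack_from(data)
    start = HEADER.size
    # Plays the role of length_from=lambda pkt: pkt.length
    payload = data[start:start + length]
    return {"version": version, "length": length, "magic": magic, "payload": payload}


wire = build_my_header(version=2, magic=0xCAFEBABE, payload=b"hello")
fields = parse_my_header(wire + b"trailing")   # extra bytes are not consumed
```

The length-driven slice is what lets a variable-length field be followed by further data without ambiguity.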

HTTP/1.x

from stackforge import Packet, LayerKind

# HTTP is auto-detected on TCP ports 80, 8080, 8000, 8008, 8888
pkt = Packet(raw_bytes)
pkt.parse()

if pkt.has_layer(LayerKind.Http):
    print(pkt.get_layer_bytes(LayerKind.Http))

HTTP/2

# HTTP/2 is auto-detected via the client preface magic bytes on TCP
from stackforge import Packet, LayerKind

pkt = Packet(raw_bytes)
pkt.parse()

if pkt.has_layer(LayerKind.Http2):
    print(pkt.summary())   # "Ethernet / IPv4 / TCP / HTTP2"
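The client preface is a fixed 24-byte string defined by the HTTP/2 specification (RFC 9113), so a detection check can be as small as the sketch below. The looks_like_http2 helper is hypothetical and only recognises the first client segment of a connection; how Stackforge handles later segments is not covered here.

```python
HTTP2_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"   # 24 bytes


def looks_like_http2(tcp_payload: bytes) -> bool:
    """True when the TCP payload starts with the HTTP/2 client preface."""
    return tcp_payload.startswith(HTTP2_PREFACE)


print(looks_like_http2(HTTP2_PREFACE + b"\x00\x00\x00\x04\x00"))  # True
print(looks_like_http2(b"GET / HTTP/1.1\r\n"))                    # False
```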

QUIC

# QUIC is auto-detected on UDP ports 443 / 4433 via the Fixed Bit
from stackforge import Packet, LayerKind

pkt = Packet(raw_bytes)
pkt.parse()

if pkt.has_layer(LayerKind.Quic):
    print(pkt.getfieldval(LayerKind.Quic, "dst_conn_id"))
    print(pkt.getfieldval(LayerKind.Quic, "packet_number"))
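In QUIC version 1 (RFC 9000) the Fixed Bit is 0x40 in the first payload byte, and the Header Form bit 0x80 distinguishes long from short headers. The following sketch combines that with the port check described above; matching on either the source or the destination port is an assumption, and both helper functions are illustrative rather than Stackforge's detection code.

```python
QUIC_PORTS = {443, 4433}


def looks_like_quic(sport: int, dport: int, udp_payload: bytes) -> bool:
    """Port check plus Fixed Bit test on the first payload byte."""
    if not udp_payload or not ({sport, dport} & QUIC_PORTS):
        return False
    return bool(udp_payload[0] & 0x40)      # Fixed Bit


def is_long_header(udp_payload: bytes) -> bool:
    """Long headers (Initial, Handshake, ...) set the Header Form bit."""
    return bool(udp_payload[0] & 0x80)


print(looks_like_quic(50000, 443, b"\xc0\x00\x00\x00\x01"))  # True
print(looks_like_quic(50000, 53, b"\xc0"))                   # False
```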

802.11 (Wi-Fi)

# Dot11 frames are parsed directly (not over Ethernet)
from stackforge import Packet, LayerKind

pkt = Packet(raw_bytes)
pkt.parse()   # expects radiotap + Dot11 frame

print(pkt.has_layer(LayerKind.Dot11))

802.15.4 (Zigbee)

# Dot15d4 frames include optional CRC-16 (CCITT Kermit)
from stackforge import Packet, LayerKind

pkt = Packet(raw_bytes)
pkt.parse()

print(pkt.has_layer(LayerKind.Dot15d4))
print(pkt.has_layer(LayerKind.Dot15d4Fcs))
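The CRC-16 variant named above (Kermit: reflected polynomial 0x8408, initial value 0) can be computed bit by bit as in this sketch. IEEE 802.15.4 transmits the frame check sequence least significant byte first, which the two helper functions assume; their names are hypothetical and not part of Stackforge.

```python
def crc16_kermit(data: bytes) -> int:
    """CRC-16/KERMIT: poly 0x1021 reflected (0x8408), init 0, no final XOR."""
    crc = 0x0000
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x8408 if crc & 1 else crc >> 1
    return crc


def append_fcs(frame: bytes) -> bytes:
    """Append the 2-byte FCS in little-endian order."""
    return frame + crc16_kermit(frame).to_bytes(2, "little")


def fcs_ok(frame_with_fcs: bytes) -> bool:
    """Recompute the CRC over the body and compare with the trailing FCS."""
    body, fcs = frame_with_fcs[:-2], frame_with_fcs[-2:]
    return int.from_bytes(fcs, "little") == crc16_kermit(body)


print(hex(crc16_kermit(b"123456789")))   # 0x2189
```

0x2189 is the published check value for this CRC variant, which makes it a convenient self-test.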

L2TP

# L2TP v2 is auto-detected on UDP port 1701
from stackforge import Packet, LayerKind

pkt = Packet(raw_bytes)
pkt.parse()

print(pkt.has_layer(LayerKind.L2tp))

Stateful Flow Extraction

Extract bidirectional conversations from PCAP captures with full TCP state machine tracking, stream reassembly, and UDP timeout-based flow grouping.

from stackforge import extract_flows, extract_flows_from_packets, FlowConfig, rdpcap

# Extract conversations from a PCAP file
conversations = extract_flows("capture.pcap")

for conv in conversations:
    print(f"{conv.src_addr}:{conv.src_port} <-> {conv.dst_addr}:{conv.dst_port}")
    print(f"  Protocol: {conv.protocol}, Status: {conv.status}")
    print(f"  Packets: {conv.total_packets}, Bytes: {conv.total_bytes}")
    print(f"  Duration: {conv.duration:.3f}s")

    # TCP-specific state and reassembled stream data
    if conv.tcp_state:
        print(f"  TCP State: {conv.tcp_state}")
    if conv.reassembled_forward:
        print(f"  Forward stream: {len(conv.reassembled_forward)} bytes")
    if conv.reassembled_reverse:
        print(f"  Reverse stream: {len(conv.reassembled_reverse)} bytes")

    # Indices into the original packet list
    print(f"  Packet indices: {conv.packet_indices}")
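Stream reassembly, which produces data like reassembled_forward above, comes down to ordering segments by sequence number and discarding bytes that were already delivered. The sketch below is a simplified illustration under stated assumptions: it ignores sequence-number wraparound, stops at the first gap, and uses a hypothetical reassemble function rather than anything from Stackforge.

```python
def reassemble(segments, initial_seq):
    """Join (seq, payload) segments into one contiguous byte stream.

    Stops at the first gap; retransmitted or overlapping bytes are dropped.
    Sequence-number wraparound is ignored for brevity.
    """
    stream = bytearray()
    next_seq = initial_seq
    for seq, payload in sorted(segments, key=lambda item: item[0]):
        if seq > next_seq:
            break                       # hole in the stream: stop here
        skip = next_seq - seq           # bytes already delivered
        if skip < len(payload):
            stream += payload[skip:]
            next_seq = seq + len(payload)
    return bytes(stream)


# Out-of-order arrival, one retransmission, and one overlapping segment.
segments = [(1005, b" world"), (1000, b"hello"), (1000, b"hello"), (1003, b"lo w")]
print(reassemble(segments, initial_seq=1000))   # b'hello world'
```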

Use extract_flows_from_packets to extract flows from already-loaded packets:

packets = rdpcap("capture.pcap")
conversations = extract_flows_from_packets(packets)
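Grouping packets into bidirectional conversations requires a key that is the same for both directions. One common approach, sketched here with plain tuples, is to order the two endpoints before building the key. The flow_key helper and the tuple layout are assumptions for illustration and do not describe Stackforge's internal flow table.

```python
from collections import defaultdict


def flow_key(proto, src_addr, src_port, dst_addr, dst_port):
    """Return the same key for A->B and B->A by ordering the two endpoints."""
    a, b = (src_addr, src_port), (dst_addr, dst_port)
    return (proto, a, b) if a <= b else (proto, b, a)


packets = [
    ("TCP", "10.0.0.2", 50000, "10.0.0.1", 80),
    ("TCP", "10.0.0.1", 80, "10.0.0.2", 50000),   # reply direction
    ("UDP", "10.0.0.2", 5000, "8.8.8.8", 53),
]

flows = defaultdict(list)
for index, pkt in enumerate(packets):
    flows[flow_key(*pkt)].append(index)            # analogous to packet_indices

print(len(flows))   # 2
```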

Customize timeouts and buffer limits with FlowConfig:

config = FlowConfig(
    tcp_established_timeout=3600.0,  # 1 hour (default: 86400s)
    udp_timeout=60.0,                # 1 minute (default: 120s)
    max_reassembly_buffer=1048576,   # 1 MB per flow (default: 16 MB)
)
conversations = extract_flows("capture.pcap", config=config)
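Since UDP has no connection teardown, a timeout is what decides where one flow ends and the next begins. The sketch below shows the idea for a single 5-tuple: packets separated by an idle gap longer than the timeout start a new flow. The split_by_timeout function is hypothetical, and treating a gap exactly equal to the timeout as still inside the flow is an assumption.

```python
def split_by_timeout(timestamps, timeout):
    """Group sorted packet timestamps into flows separated by idle gaps > timeout."""
    flows = []
    for ts in timestamps:
        if flows and ts - flows[-1][-1] <= timeout:
            flows[-1].append(ts)        # still within the idle window
        else:
            flows.append([ts])          # idle too long: start a new flow
    return flows


print(split_by_timeout([0.0, 10.0, 65.0, 200.0], timeout=60.0))
# [[0.0, 10.0, 65.0], [200.0]]
```

Note that the gap is measured from the previous packet and not from the start of the flow, so a steady trickle of traffic keeps a flow alive.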

Optional: Track maximum packet sizes during flow extraction:

config = FlowConfig(
    track_max_packet_len=True,   # Track max per-direction (forward_max_packet_len, reverse_max_packet_len)
    track_max_flow_len=True,     # Track overall max (max_flow_len)
)
conversations = extract_flows("capture.pcap", config=config)

for conv in conversations:
    print(f"Max fwd packet: {conv.forward_max_packet_len} bytes")
    print(f"Max rev packet: {conv.reverse_max_packet_len} bytes")
    print(f"Max overall: {conv.max_flow_len} bytes")

Both tracking options are disabled by default, so they add no overhead unless you enable them. Turn them on only when the flow analysis needs these values.

ICMP and ICMPv6 Flow Tracking

Automatically correlate ICMP echo request/reply pairs and track other ICMP message types:

from stackforge import extract_flows

conversations = extract_flows("capture.pcap")

for conv in conversations:
    if conv.protocol in ("ICMP", "ICMPv6"):
        print(f"ICMP flow: {conv.src_addr} <-> {conv.dst_addr}")
        print(f"  Type: {conv.icmp_type}, Code: {conv.icmp_code}")
        print(f"  Identifier: {conv.icmp_identifier}")
        print(f"  Requests: {conv.icmp_request_count}, Replies: {conv.icmp_reply_count}")
        print(f"  Last seq: {conv.icmp_last_seq}")

Features:

  • Echo request/reply pairs are correlated by identifier, which is used symmetrically in place of the source and destination ports
  • Non-echo message types are tracked by substituting (type, code) into the flow key
  • Properties: icmp_type, icmp_code, icmp_identifier, icmp_request_count, icmp_reply_count, icmp_last_seq
  • These properties return None for non-ICMP flows
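Echo correlation by identifier can be pictured with the following standard-library sketch. It handles ICMPv4 only (type 8 for requests, type 0 for replies), orders the two addresses so that both directions share a key, and uses a hypothetical correlate_echo function with a simple tuple layout; none of this is taken from Stackforge's implementation.

```python
from collections import defaultdict

ECHO_REQUEST, ECHO_REPLY = 8, 0     # ICMPv4 type numbers


def correlate_echo(packets):
    """Group (src, dst, icmp_type, identifier, seq) tuples into echo flows."""
    flows = defaultdict(lambda: {"requests": 0, "replies": 0, "last_seq": None})
    for src, dst, icmp_type, identifier, seq in packets:
        if icmp_type not in (ECHO_REQUEST, ECHO_REPLY):
            continue
        # Order the addresses so request and reply land in the same flow.
        key = (min(src, dst), max(src, dst), identifier)
        flow = flows[key]
        flow["requests" if icmp_type == ECHO_REQUEST else "replies"] += 1
        flow["last_seq"] = seq
    return dict(flows)


packets = [
    ("10.0.0.2", "10.0.0.1", 8, 0x1234, 1),
    ("10.0.0.1", "10.0.0.2", 0, 0x1234, 1),
    ("10.0.0.2", "10.0.0.1", 8, 0x1234, 2),
]
flows = correlate_echo(packets)
```

Here the three packets collapse into a single flow with two requests, one reply, and a last sequence number of 2.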

Rust Crate

The core library is available as a standalone Rust crate:

[dependencies]
stackforge-core = "0.3"

Development

# Set up environment
uv sync

# Build Rust extension (required after Rust changes)
uv run maturin develop

# Run tests
cargo test               # Rust tests
uv run pytest tests/python

# Lint and format
cargo fmt
cargo clippy
uv run ruff check .

Citing Stackforge

If you use Stackforge in academic research or published work, please cite it:

@software{stackforge,
  title = {Stackforge: High-Performance Packet Manipulation in Rust with Python Bindings},
  url = {https://github.com/LaBackDoor/stackforge},
  license = {GPL-3.0}
}

Or in plain text:

Stackforge: High-Performance Packet Manipulation in Rust with Python Bindings. https://github.com/LaBackDoor/stackforge

License

This project is licensed under the GNU General Public License v3.0 — see the LICENSE file for details.
