Skip to main content

SW4RM Agentic Protocol — Reference Python SDK (experimental)

Project description

SW4RM Agentic Protocol

Python CI Rust CI JS CI Examples: ACK Demo

SW4RM is an open agentic protocol for building message-driven agents with guaranteed delivery, persistent state, and rich observability. This repository provides three SDKs that implement the protocol — Python, Rust, and JavaScript — including clients, lightweight runtimes, and helpers for ACK lifecycle, worktree/state handling, and more.

SDKs

  • Python: sdks/py_sdk — see sdks/py_sdk/README.md
  • Rust: sdks/rust_sdk — see sdks/rust_sdk/README.md
  • JavaScript: sdks/js_sdk — see sdks/js_sdk/README.md

CI Workflows

  • Python CI: Python 3.12, installs .[dev], runs scripts/smoke_protos.py, then pytest -q sdks/py_sdk/tests.
  • Rust CI: Installs protoc, runs cargo test --all --locked in sdks/rust_sdk.
  • JS CI: Node 20, runs npm ci && npm run build && npm test in sdks/js_sdk.
  • Examples ACK Demo: Runs examples/sdk-usage/run_all.sh ack-demo which auto-starts local JS reference services, launches an ACK agent, and exercises router send/receive with ACKs.

Reproduce locally

  • Python: python -m pip install -e ".[dev]" && pytest -q sdks/py_sdk/tests
  • Rust: cd sdks/rust_sdk && cargo test --all --locked
  • JS: cd sdks/js_sdk && npm ci && npm run build && npm test
  • ACK demo: bash examples/sdk-usage/run_all.sh ack-demo (uses defaults localhost:{50051,50052,50053} via SW4RM_* envs)

Python SDK Installation

  • Prerequisites:
    • Python >= 3.9
    • Optional: create and activate a virtual environment
      • python3 -m venv venv && source venv/bin/activate
  • Runtime install (local):
    • python -m pip install .
  • Dev install (with codegen):
    • python -m pip install -e ".[dev]"
    • Generate stubs: make protos (requires grpcio-tools)
      • Stubs are generated under sdks/py_sdk/sw4rm/protos

Core Features

  • Persistent Activity Buffer: Track messages across restarts with reconciliation
  • Worktree Management: Policy-driven binding with persistent state
  • ACK Lifecycle: Automatic acknowledgment handling with router integration
  • Message Processing: Handler-based routing with built-in error handling
  • Multiple Persistence: JSON file and pluggable storage backends
  • Production Ready: Comprehensive error handling, logging, and state management

Quick Start

Looking for a local all-in-one stack? See the DevCore Quickstart to run in-repo Registry, Router, Scheduler, and Negotiation services:

  • DevCore Quickstart: QUICKSTART.md (section "DevCore (Rust) Quickstart")

Basic Agent

import grpc
from sw4rm.clients.registry import RegistryClient
from sw4rm.clients.router import RouterClient
from sw4rm.protos import common_pb2 as common

# Connect to services
router_ch = grpc.insecure_channel("localhost:50051")
registry_ch = grpc.insecure_channel("localhost:50052")
registry = RegistryClient(registry_ch)
router = RouterClient(router_ch)

# Register agent
response = registry.register({
    "agent_id": "my-agent",
    "name": "MyAgent",
    "description": "Example agent",
    "capabilities": ["processing"],
    "communication_class": common.CommunicationClass.STANDARD,
})

Advanced Agent with Persistence

from sw4rm import constants as C
from sw4rm.activity_buffer import PersistentActivityBuffer
from sw4rm.worktree_state import PersistentWorktreeState
from sw4rm.ack_integration import ACKLifecycleManager, MessageProcessor

# Initialize persistent components
buffer = PersistentActivityBuffer(max_items=1000)
worktree = PersistentWorktreeState()
ack_manager = ACKLifecycleManager(router, buffer, "my-agent")
processor = MessageProcessor(ack_manager)

# Register message handlers
def handle_data(envelope):
    print(f"Processing: {envelope['message_id']}")
    return "processed"

processor.register_handler(C.DATA, handle_data)

# Process incoming messages with automatic ACKs
for item in router.stream_incoming("my-agent"):
    # Extract envelope from stream item (protobuf → dict)
    envelope_msg = getattr(item, "msg", item)
    envelope = {
        "message_id": getattr(envelope_msg, "message_id", ""),
        "message_type": getattr(envelope_msg, "message_type", 0),
        "content_type": getattr(envelope_msg, "content_type", ""),
        "payload": getattr(envelope_msg, "payload", b""),
        "producer_id": getattr(envelope_msg, "producer_id", ""),
        "correlation_id": getattr(envelope_msg, "correlation_id", ""),
        "sequence_number": getattr(envelope_msg, "sequence_number", 0),
    }
    result = processor.process_message(envelope)

API Reference

Core Components

PersistentActivityBuffer

Tracks messages with persistent storage across restarts.

from sw4rm.activity_buffer import PersistentActivityBuffer
from sw4rm.persistence import JSONFilePersistence

# Initialize with custom persistence
buffer = PersistentActivityBuffer(
    max_items=1000,
    persistence=JSONFilePersistence("my_activity.json")
)

# Track messages
record = buffer.record_outgoing(envelope)
buffer.ack(ack_message)

# Query state
unacked = buffer.unacked()
recent = buffer.recent(50)
needs_retry = buffer.reconcile()

PersistentWorktreeState

Manages worktree bindings with policy validation.

from sw4rm.worktree_state import PersistentWorktreeState

# Minimal custom policy implementing the expected hooks
class MyPolicy:
    def __init__(self, allowed_repos=None):
        self.allowed_repos = set(allowed_repos or [])

    def before_bind(self, repo_id, worktree_id, current):
        # Allow only specific repos
        return (not self.allowed_repos) or (repo_id in self.allowed_repos)

    def after_bind(self, binding):
        print(f"Bound to {binding.repo_id}/{binding.worktree_id}")

# Initialize with policy
worktree = PersistentWorktreeState(
    policy=MyPolicy(allowed_repos=["main-repo", "test-repo"])
)

# Manage bindings
success = worktree.bind("main-repo", "feature-branch", {"version": "1.2.3"})
current = worktree.current()
status = worktree.status()

ACKLifecycleManager

Automatic acknowledgment handling with router integration.

from sw4rm.ack_integration import ACKLifecycleManager
from sw4rm import constants as C

manager = ACKLifecycleManager(
    router_client=router,
    activity_buffer=buffer,
    agent_id="my-agent",
    auto_ack=True
)

# Send with automatic ACK tracking
result = manager.send_message_with_ack(envelope)

# Manual ACK sending
manager.send_ack(message_id, stage=C.FULFILLED, note="Processed successfully")

# Reconciliation
stale_messages = manager.reconcile_acks()

MessageProcessor

Handler-based message processing with automatic ACKs.

from sw4rm.ack_integration import MessageProcessor
from sw4rm import constants as C

processor = MessageProcessor(ack_manager)

# Register handlers
def handle_data(envelope):
    # Process DATA messages
    return "success"

def handle_control(envelope):
    # Process CONTROL messages  
    command = json.loads(envelope['payload'])
    return f"executed_{command['action']}"

processor.register_handler(C.DATA, handle_data)
processor.register_handler(C.CONTROL, handle_control)
processor.set_default_handler(lambda env: "unknown_message")

# Process with automatic ACK lifecycle
result = processor.process_message(envelope)

Client APIs

RegistryClient

from sw4rm.clients.registry import RegistryClient
from sw4rm.protos import common_pb2 as common

registry = RegistryClient(grpc_channel)

# Register agent
response = registry.register({
    "agent_id": "my-agent",
    "name": "My Agent",
    "capabilities": ["processing", "analysis"],
    "communication_class": common.CommunicationClass.STANDARD
})

# Send heartbeat
registry.heartbeat("my-agent", state=common.AgentState.RUNNING)

# Deregister
registry.deregister("my-agent", reason="shutdown")

RouterClient

from sw4rm.clients.router import RouterClient

router = RouterClient(grpc_channel)

# Send message
response = router.send_message(envelope_dict)

# Stream incoming messages
for item in router.stream_incoming("my-agent"):
    envelope = item.msg
    # Process envelope...

Utility Functions

Envelope Building

from sw4rm.envelope import build_envelope

envelope = build_envelope(
    producer_id="my-agent",
    message_type=C.DATA,
    content_type="application/json",
    payload=json.dumps(data).encode(),
    correlation_id="optional-correlation-id"
)

ACK Building

from sw4rm.acks import build_ack_envelope

ack = build_ack_envelope(
    producer_id="my-agent",
    ack_for_message_id="original-msg-id",
    ack_stage=C.FULFILLED,
    note="Processing completed"
)

Constants

from sw4rm import constants as C

# Message types
C.DATA                    # Data message
C.CONTROL                 # Control message
C.ACKNOWLEDGEMENT        # ACK message
C.WORKTREE_CONTROL       # Worktree operation
C.HEARTBEAT              # Heartbeat
C.NOTIFICATION           # Notification
C.HITL_INVOCATION        # HITL invocation
C.NEGOTIATION            # Negotiation
C.TOOL_CALL              # Tool call
C.TOOL_RESULT            # Tool result
C.TOOL_ERROR             # Tool error

# ACK stages
C.RECEIVED               # Message received
C.READ                   # Message read/parsed
C.FULFILLED              # Processing completed
C.REJECTED               # Processing rejected
C.FAILED                 # Processing failed
C.TIMED_OUT              # Processing timed out

# Error codes
C.VALIDATION_ERROR       # Invalid message format
C.PERMISSION_DENIED      # Unauthorized operation
C.INTERNAL_ERROR         # Internal processing error
C.ACK_TIMEOUT            # ACK not received in time
C.AGENT_UNAVAILABLE      # Agent not reachable
C.AGENT_SHUTDOWN         # Agent shutting down
C.NO_ROUTE               # No route to target
C.OVERSIZE_PAYLOAD       # Payload too large
C.TOOL_TIMEOUT           # Tool call timed out
C.FORCED_PREEMPTION      # Scheduler forced preemption
C.TTL_EXPIRED            # Message TTL expired

Message Semantics

  • Required fields: message_id, producer_id, correlation_id, sequence_number, message_type, content_type, payload.
  • Correlation: For negotiation flows, correlation_id equals the negotiation ID (per protocol spec).
  • Optional fields: idempotency_token, repo_id, worktree_id, ttl_ms, content_length, hlc_timestamp.
  • Envelope builder returns a dict matching proto fields; adapt to protobuf classes if stubs are present.

Examples

Complete Examples

  • Basic echo agent: examples/echo_agent.py - Simple registration and message echoing
  • Advanced agent: examples/advanced_agent.py - Full SDK feature demonstration
  • Test client: examples/test_client.py - Client for testing agent functionality

Running Examples

# Start advanced agent
python examples/advanced_agent.py --router localhost:50051 --registry localhost:50052 --data-dir ./my_agent_data

# Test the agent (in another terminal)
python examples/test_client.py --router localhost:50051 --registry localhost:50052 --target-agent advanced-1

# Run specific test
python examples/test_client.py --router localhost:50051 --registry localhost:50052 --test data --target-agent advanced-1

See examples/README.md for detailed example documentation.

For TypeScript/JS usage examples and an ACK flow demo, see examples/sdk-usage/README.md.

Development

Generate Protocol Buffers

python -m pip install -e ".[dev]"
make protos

Build Package

python -m pip install build twine
python -m build
python -m twine upload dist/*

Release

Use the provided Makefile targets for a reproducible release process.

  • Prerequisites

    • Install dev tooling: python -m pip install -e ".[dev]"
    • Generate protobuf stubs: make protos
  • Build artifacts

    • make release — generates stubs, verifies presence, builds wheel/sdist
  • Verify artifacts

    • make release-verify — validates wheel/sdist metadata and runs twine check
    • make smoke-wheel — reinstalls latest wheel into the repo venv and runs sw4rm-doctor
  • Optional: TestPyPI / PyPI

    • make publish-test — upload to TestPyPI (requires credentials)
    • make publish — upload to PyPI (requires credentials)
  • Tagging

    • make tag && make tag-push — create and push an annotated git tag from pyproject.toml version

Notes

  • Twine ≥ 5.x and pkginfo ≥ 1.10 are recommended to support modern Metadata-Version (e.g., 2.4).
  • See docs/PROGRESS_REPORT.md for a detailed Release Checklist.

Testing

  • Unified: make test (runs Python, Rust, JS tests + JS ACK demo)
  • Python only: make test-python
  • Rust only: make test-rust (requires protoc)
  • JS only: make test-js (Node >= 20)
  • Examples demo: make demo-examples (runs examples/sdk-usage/run_all.sh ack-demo)
# Run all tests and the ACK demo
make test

# Run examples against local services
# See QUICKSTART.md for how to start the in-repo services
python examples/advanced_agent.py --router localhost:50051 --registry localhost:50052
python examples/test_client.py --router localhost:50051 --registry localhost:50052

Architecture

The Python SDK is organized into layers:

  1. Protocol Layer: Generated protobuf stubs (sw4rm.protos)
  2. Client Layer: Service clients (sw4rm.clients)
  3. Runtime Layer: Core functionality (sw4rm.activity_buffer, sw4rm.worktree_state)
  4. Integration Layer: High-level APIs (sw4rm.ack_integration)
  5. Utility Layer: Helpers (sw4rm.envelope, sw4rm.acks)

Protocol highlights

  • Cooperative preemption and urgent lane semantics defined by Scheduler and CommunicationClass (see spec).
  • HITL escalation reasons and Reasoning Engine participation are supported via dedicated services.
  • Activity buffer persists advisory task/message context and supports reconciliation.

Production Considerations

State Management

  • Activity buffer automatically prunes old records (configurable limit)
  • Worktree state persists binding information across restarts
  • All persistence uses atomic file writes for consistency

Error Handling

  • Network failures trigger automatic retries where appropriate
  • Invalid messages are rejected with proper ACKs
  • Persistence failures fall back to in-memory operation

Performance

  • Activity buffer uses efficient in-memory indexing
  • Persistence operations are batched and asynchronous
  • Message processing uses handler-based dispatch

Monitoring

  • Built-in logging for all major operations
  • Activity buffer provides reconciliation API
  • Worktree policies support custom validation hooks

Git Commit Hooks

To enforce consistent commit messages across the repo:

  • Recommended setup:
    • Set versioned hooks path once per clone: git config core.hooksPath scripts/git-hooks

Contributing

  • Versioning: Keep all SDKs in lockstep with the protocol spec. The single source of truth is documentation/protocol/spec.md line Version: X.Y.Z (...). Python (pyproject.toml), JS (sdks/js_sdk/package.json), and Rust (sdks/rust_sdk/Cargo.toml) must equal the spec version.
  • Pre-commit hook: Local guard that blocks commits if versions aren’t SemVer or out of sync; also requires a bump when protocol/protos or an SDK changes.
    • Enable once per clone: git config core.hooksPath .githooks && chmod +x .githooks/pre-commit
  • Bump script: Updates spec + all SDKs together.
    • python scripts/bump_version.py X.Y.Z [--stage]
  • PR checks: CI enforces the same rules and does preflight builds.
    • Workflow: .github/workflows/version-guard.yml
  • Release tags: Publishing is tag-driven per language and runs in GitHub Actions.
    • PyPI: git tag py-vX.Y.Z && git push origin py-vX.Y.Z
    • npm: git tag npm-vX.Y.Z && git push origin npm-vX.Y.Z
    • crates.io: git tag rs-vX.Y.Z && git push origin rs-vX.Y.Z
  • Release scripts: Create tags locally (publishing happens in Actions).
    • One SDK: python scripts/release.py [py|npm|rs] X.Y.Z --push
    • All SDKs: python scripts/release_all.py X.Y.Z --push
  • Secrets storage: Use a GitHub Actions Environment named production for publish tokens.
    • Add environment secrets: PYPI_API_TOKEN, NPM_TOKEN, CRATES_IO_TOKEN under Settings → Environments → production.
    • Release workflows target this environment: .github/workflows/release-*.yml.
  • Tag prefixes and SemVer:
    • Tags must use py-v, npm-v, rs-v followed by X.Y.Z that matches all manifests and the spec.

    • SemVer only (no suffixes). CI and hooks will fail on mismatch.

    • Optionally install template and hooks via script: ./scripts/install_git_hooks.sh

  • Hooks enforce:
    • Subject: non-empty, ≤50 chars, imperative, no trailing period
    • Blank line after subject
    • Body lines wrapped at 72 characters (links exempt)
    • Pre-commit will block if core.hooksPath is misconfigured; bypass once with ALLOW_HOOKS_PATH_MISMATCH=1 git commit or git commit --no-verify.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sw4rm_sdk-0.4.0.tar.gz (118.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sw4rm_sdk-0.4.0-py3-none-any.whl (144.3 kB view details)

Uploaded Python 3

File details

Details for the file sw4rm_sdk-0.4.0.tar.gz.

File metadata

  • Download URL: sw4rm_sdk-0.4.0.tar.gz
  • Upload date:
  • Size: 118.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for sw4rm_sdk-0.4.0.tar.gz
Algorithm Hash digest
SHA256 fb2411c423f2daa9870ced774f61ac03325360156f19f587d948e33bed49117b
MD5 376c36a2183c474a011736b1e2b72c15
BLAKE2b-256 8c43127b54ef1d166d15cf9084593a948b1762d32a562fa66cead2357a65a2ab

See more details on using hashes here.

File details

Details for the file sw4rm_sdk-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: sw4rm_sdk-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 144.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for sw4rm_sdk-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 886d3c6c627815d60d52e6139ac78f80c3c6a24731a91073eaa4d6ab23c3bba7
MD5 40ef62307fc1787c55b015a9a877f566
BLAKE2b-256 53ae52e5542005a5dde471ee0b023c9d649b2e26dbebfdcae959c29b4f45a20d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page