SW4RM Agentic Protocol — Reference Python SDK (experimental)
Project description
SW4RM Agentic Protocol
SW4RM is an open agentic protocol for building message-driven agents with guaranteed delivery, persistent state, and rich observability. This repository provides three SDKs that implement the protocol — Python, Rust, and JavaScript — including clients, lightweight runtimes, and helpers for ACK lifecycle, worktree/state handling, and more.
SDKs
- Python:
sdks/py_sdk— seesdks/py_sdk/README.md - Rust:
sdks/rust_sdk— seesdks/rust_sdk/README.md - JavaScript:
sdks/js_sdk— seesdks/js_sdk/README.md
CI Workflows
- Python CI: Python 3.12, installs
.[dev], runsscripts/smoke_protos.py, thenpytest -q sdks/py_sdk/tests. - Rust CI: Installs
protoc, runscargo test --all --lockedinsdks/rust_sdk. - JS CI: Node 20, runs
npm ci && npm run build && npm testinsdks/js_sdk. - Examples ACK Demo: Runs
examples/sdk-usage/run_all.sh ack-demowhich auto-starts local JS reference services, launches an ACK agent, and exercises router send/receive with ACKs.
Reproduce locally
- Python:
python -m pip install -e ".[dev]" && pytest -q sdks/py_sdk/tests - Rust:
cd sdks/rust_sdk && cargo test --all --locked - JS:
cd sdks/js_sdk && npm ci && npm run build && npm test - ACK demo:
bash examples/sdk-usage/run_all.sh ack-demo(uses defaultslocalhost:{50051,50052,50053}viaSW4RM_*envs)
Python SDK Installation
- Prerequisites:
- Python >= 3.9
- Optional: create and activate a virtual environment
python3 -m venv venv && source venv/bin/activate
- Runtime install (local):
python -m pip install .
- Dev install (with codegen):
python -m pip install -e ".[dev]"- Generate stubs:
make protos(requiresgrpcio-tools)- Stubs are generated under
sdks/py_sdk/sw4rm/protos
- Stubs are generated under
Core Features
- Persistent Activity Buffer: Track messages across restarts with reconciliation
- Worktree Management: Policy-driven binding with persistent state
- ACK Lifecycle: Automatic acknowledgment handling with router integration
- Message Processing: Handler-based routing with built-in error handling
- Multiple Persistence: JSON file and pluggable storage backends
- Production Ready: Comprehensive error handling, logging, and state management
Quick Start
Looking for a local all-in-one stack? See the DevCore Quickstart to run in-repo Registry, Router, Scheduler, and Negotiation services:
- DevCore Quickstart:
QUICKSTART.md(section "DevCore (Rust) Quickstart")
Basic Agent
import grpc
from sw4rm.clients.registry import RegistryClient
from sw4rm.clients.router import RouterClient
from sw4rm.protos import common_pb2 as common
# Connect to services
router_ch = grpc.insecure_channel("localhost:50051")
registry_ch = grpc.insecure_channel("localhost:50052")
registry = RegistryClient(registry_ch)
router = RouterClient(router_ch)
# Register agent
response = registry.register({
"agent_id": "my-agent",
"name": "MyAgent",
"description": "Example agent",
"capabilities": ["processing"],
"communication_class": common.CommunicationClass.STANDARD,
})
Advanced Agent with Persistence
from sw4rm import constants as C
from sw4rm.activity_buffer import PersistentActivityBuffer
from sw4rm.worktree_state import PersistentWorktreeState
from sw4rm.ack_integration import ACKLifecycleManager, MessageProcessor
# Initialize persistent components
buffer = PersistentActivityBuffer(max_items=1000)
worktree = PersistentWorktreeState()
ack_manager = ACKLifecycleManager(router, buffer, "my-agent")
processor = MessageProcessor(ack_manager)
# Register message handlers
def handle_data(envelope):
print(f"Processing: {envelope['message_id']}")
return "processed"
processor.register_handler(C.DATA, handle_data)
# Process incoming messages with automatic ACKs
for item in router.stream_incoming("my-agent"):
# Extract envelope from stream item (protobuf → dict)
envelope_msg = getattr(item, "msg", item)
envelope = {
"message_id": getattr(envelope_msg, "message_id", ""),
"message_type": getattr(envelope_msg, "message_type", 0),
"content_type": getattr(envelope_msg, "content_type", ""),
"payload": getattr(envelope_msg, "payload", b""),
"producer_id": getattr(envelope_msg, "producer_id", ""),
"correlation_id": getattr(envelope_msg, "correlation_id", ""),
"sequence_number": getattr(envelope_msg, "sequence_number", 0),
}
result = processor.process_message(envelope)
API Reference
Core Components
PersistentActivityBuffer
Tracks messages with persistent storage across restarts.
from sw4rm.activity_buffer import PersistentActivityBuffer
from sw4rm.persistence import JSONFilePersistence
# Initialize with custom persistence
buffer = PersistentActivityBuffer(
max_items=1000,
persistence=JSONFilePersistence("my_activity.json")
)
# Track messages
record = buffer.record_outgoing(envelope)
buffer.ack(ack_message)
# Query state
unacked = buffer.unacked()
recent = buffer.recent(50)
needs_retry = buffer.reconcile()
PersistentWorktreeState
Manages worktree bindings with policy validation.
from sw4rm.worktree_state import PersistentWorktreeState
# Minimal custom policy implementing the expected hooks
class MyPolicy:
def __init__(self, allowed_repos=None):
self.allowed_repos = set(allowed_repos or [])
def before_bind(self, repo_id, worktree_id, current):
# Allow only specific repos
return (not self.allowed_repos) or (repo_id in self.allowed_repos)
def after_bind(self, binding):
print(f"Bound to {binding.repo_id}/{binding.worktree_id}")
# Initialize with policy
worktree = PersistentWorktreeState(
policy=MyPolicy(allowed_repos=["main-repo", "test-repo"])
)
# Manage bindings
success = worktree.bind("main-repo", "feature-branch", {"version": "1.2.3"})
current = worktree.current()
status = worktree.status()
ACKLifecycleManager
Automatic acknowledgment handling with router integration.
from sw4rm.ack_integration import ACKLifecycleManager
from sw4rm import constants as C
manager = ACKLifecycleManager(
router_client=router,
activity_buffer=buffer,
agent_id="my-agent",
auto_ack=True
)
# Send with automatic ACK tracking
result = manager.send_message_with_ack(envelope)
# Manual ACK sending
manager.send_ack(message_id, stage=C.FULFILLED, note="Processed successfully")
# Reconciliation
stale_messages = manager.reconcile_acks()
MessageProcessor
Handler-based message processing with automatic ACKs.
from sw4rm.ack_integration import MessageProcessor
from sw4rm import constants as C
processor = MessageProcessor(ack_manager)
# Register handlers
def handle_data(envelope):
# Process DATA messages
return "success"
def handle_control(envelope):
# Process CONTROL messages
command = json.loads(envelope['payload'])
return f"executed_{command['action']}"
processor.register_handler(C.DATA, handle_data)
processor.register_handler(C.CONTROL, handle_control)
processor.set_default_handler(lambda env: "unknown_message")
# Process with automatic ACK lifecycle
result = processor.process_message(envelope)
Client APIs
RegistryClient
from sw4rm.clients.registry import RegistryClient
from sw4rm.protos import common_pb2 as common
registry = RegistryClient(grpc_channel)
# Register agent
response = registry.register({
"agent_id": "my-agent",
"name": "My Agent",
"capabilities": ["processing", "analysis"],
"communication_class": common.CommunicationClass.STANDARD
})
# Send heartbeat
registry.heartbeat("my-agent", state=common.AgentState.RUNNING)
# Deregister
registry.deregister("my-agent", reason="shutdown")
RouterClient
from sw4rm.clients.router import RouterClient
router = RouterClient(grpc_channel)
# Send message
response = router.send_message(envelope_dict)
# Stream incoming messages
for item in router.stream_incoming("my-agent"):
envelope = item.msg
# Process envelope...
Utility Functions
Envelope Building
from sw4rm.envelope import build_envelope
envelope = build_envelope(
producer_id="my-agent",
message_type=C.DATA,
content_type="application/json",
payload=json.dumps(data).encode(),
correlation_id="optional-correlation-id"
)
ACK Building
from sw4rm.acks import build_ack_envelope
ack = build_ack_envelope(
producer_id="my-agent",
ack_for_message_id="original-msg-id",
ack_stage=C.FULFILLED,
note="Processing completed"
)
Constants
from sw4rm import constants as C
# Message types
C.DATA # Data message
C.CONTROL # Control message
C.ACKNOWLEDGEMENT # ACK message
C.WORKTREE_CONTROL # Worktree operation
C.HEARTBEAT # Heartbeat
C.NOTIFICATION # Notification
C.HITL_INVOCATION # HITL invocation
C.NEGOTIATION # Negotiation
C.TOOL_CALL # Tool call
C.TOOL_RESULT # Tool result
C.TOOL_ERROR # Tool error
# ACK stages
C.RECEIVED # Message received
C.READ # Message read/parsed
C.FULFILLED # Processing completed
C.REJECTED # Processing rejected
C.FAILED # Processing failed
C.TIMED_OUT # Processing timed out
# Error codes
C.VALIDATION_ERROR # Invalid message format
C.PERMISSION_DENIED # Unauthorized operation
C.INTERNAL_ERROR # Internal processing error
C.ACK_TIMEOUT # ACK not received in time
C.AGENT_UNAVAILABLE # Agent not reachable
C.AGENT_SHUTDOWN # Agent shutting down
C.NO_ROUTE # No route to target
C.OVERSIZE_PAYLOAD # Payload too large
C.TOOL_TIMEOUT # Tool call timed out
C.FORCED_PREEMPTION # Scheduler forced preemption
C.TTL_EXPIRED # Message TTL expired
Message Semantics
- Required fields:
message_id,producer_id,correlation_id,sequence_number,message_type,content_type,payload. - Correlation: For negotiation flows,
correlation_idequals the negotiation ID (per protocol spec). - Optional fields:
idempotency_token,repo_id,worktree_id,ttl_ms,content_length,hlc_timestamp. - Envelope builder returns a dict matching proto fields; adapt to protobuf classes if stubs are present.
Examples
Complete Examples
- Basic echo agent:
examples/echo_agent.py- Simple registration and message echoing - Advanced agent:
examples/advanced_agent.py- Full SDK feature demonstration - Test client:
examples/test_client.py- Client for testing agent functionality
Running Examples
# Start advanced agent
python examples/advanced_agent.py --router localhost:50051 --registry localhost:50052 --data-dir ./my_agent_data
# Test the agent (in another terminal)
python examples/test_client.py --router localhost:50051 --registry localhost:50052 --target-agent advanced-1
# Run specific test
python examples/test_client.py --router localhost:50051 --registry localhost:50052 --test data --target-agent advanced-1
See examples/README.md for detailed example documentation.
For TypeScript/JS usage examples and an ACK flow demo, see examples/sdk-usage/README.md.
Development
Generate Protocol Buffers
python -m pip install -e ".[dev]"
make protos
Build Package
python -m pip install build twine
python -m build
python -m twine upload dist/*
Release
Use the provided Makefile targets for a reproducible release process.
-
Prerequisites
- Install dev tooling:
python -m pip install -e ".[dev]" - Generate protobuf stubs:
make protos
- Install dev tooling:
-
Build artifacts
make release— generates stubs, verifies presence, builds wheel/sdist
-
Verify artifacts
make release-verify— validates wheel/sdist metadata and runstwine checkmake smoke-wheel— reinstalls latest wheel into the repo venv and runssw4rm-doctor
-
Optional: TestPyPI / PyPI
make publish-test— upload to TestPyPI (requires credentials)make publish— upload to PyPI (requires credentials)
-
Tagging
make tag && make tag-push— create and push an annotated git tag frompyproject.tomlversion
Notes
- Twine ≥ 5.x and pkginfo ≥ 1.10 are recommended to support modern
Metadata-Version(e.g., 2.4). - See
docs/PROGRESS_REPORT.mdfor a detailed Release Checklist.
Testing
- Unified:
make test(runs Python, Rust, JS tests + JS ACK demo) - Python only:
make test-python - Rust only:
make test-rust(requiresprotoc) - JS only:
make test-js(Node >= 20) - Examples demo:
make demo-examples(runsexamples/sdk-usage/run_all.sh ack-demo)
# Run all tests and the ACK demo
make test
# Run examples against local services
# See QUICKSTART.md for how to start the in-repo services
python examples/advanced_agent.py --router localhost:50051 --registry localhost:50052
python examples/test_client.py --router localhost:50051 --registry localhost:50052
Architecture
The Python SDK is organized into layers:
- Protocol Layer: Generated protobuf stubs (
sw4rm.protos) - Client Layer: Service clients (
sw4rm.clients) - Runtime Layer: Core functionality (
sw4rm.activity_buffer,sw4rm.worktree_state) - Integration Layer: High-level APIs (
sw4rm.ack_integration) - Utility Layer: Helpers (
sw4rm.envelope,sw4rm.acks)
Protocol highlights
- Cooperative preemption and urgent lane semantics defined by Scheduler and CommunicationClass (see spec).
- HITL escalation reasons and Reasoning Engine participation are supported via dedicated services.
- Activity buffer persists advisory task/message context and supports reconciliation.
Production Considerations
State Management
- Activity buffer automatically prunes old records (configurable limit)
- Worktree state persists binding information across restarts
- All persistence uses atomic file writes for consistency
Error Handling
- Network failures trigger automatic retries where appropriate
- Invalid messages are rejected with proper ACKs
- Persistence failures fall back to in-memory operation
Performance
- Activity buffer uses efficient in-memory indexing
- Persistence operations are batched and asynchronous
- Message processing uses handler-based dispatch
Monitoring
- Built-in logging for all major operations
- Activity buffer provides reconciliation API
- Worktree policies support custom validation hooks
Git Commit Hooks
To enforce consistent commit messages across the repo:
- Recommended setup:
- Set versioned hooks path once per clone:
git config core.hooksPath scripts/git-hooks
- Set versioned hooks path once per clone:
Contributing
- Versioning: Keep all SDKs in lockstep with the protocol spec. The single source of truth is
documentation/protocol/spec.mdlineVersion: X.Y.Z (...). Python (pyproject.toml), JS (sdks/js_sdk/package.json), and Rust (sdks/rust_sdk/Cargo.toml) must equal the spec version. - Pre-commit hook: Local guard that blocks commits if versions aren’t SemVer or out of sync; also requires a bump when protocol/protos or an SDK changes.
- Enable once per clone:
git config core.hooksPath .githooks && chmod +x .githooks/pre-commit
- Enable once per clone:
- Bump script: Updates spec + all SDKs together.
python scripts/bump_version.py X.Y.Z [--stage]
- PR checks: CI enforces the same rules and does preflight builds.
- Workflow:
.github/workflows/version-guard.yml
- Workflow:
- Release tags: Publishing is tag-driven per language and runs in GitHub Actions.
- PyPI:
git tag py-vX.Y.Z && git push origin py-vX.Y.Z - npm:
git tag npm-vX.Y.Z && git push origin npm-vX.Y.Z - crates.io:
git tag rs-vX.Y.Z && git push origin rs-vX.Y.Z
- PyPI:
- Release scripts: Create tags locally (publishing happens in Actions).
- One SDK:
python scripts/release.py [py|npm|rs] X.Y.Z --push - All SDKs:
python scripts/release_all.py X.Y.Z --push
- One SDK:
- Secrets storage: Use a GitHub Actions Environment named
productionfor publish tokens.- Add environment secrets:
PYPI_API_TOKEN,NPM_TOKEN,CRATES_IO_TOKENunder Settings → Environments → production. - Release workflows target this environment:
.github/workflows/release-*.yml.
- Add environment secrets:
- Tag prefixes and SemVer:
-
Tags must use
py-v,npm-v,rs-vfollowed byX.Y.Zthat matches all manifests and the spec. -
SemVer only (no suffixes). CI and hooks will fail on mismatch.
-
Optionally install template and hooks via script:
./scripts/install_git_hooks.sh
-
- Hooks enforce:
- Subject: non-empty, ≤50 chars, imperative, no trailing period
- Blank line after subject
- Body lines wrapped at 72 characters (links exempt)
- Pre-commit will block if
core.hooksPathis misconfigured; bypass once withALLOW_HOOKS_PATH_MISMATCH=1 git commitorgit commit --no-verify.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file sw4rm_sdk-0.3.0.tar.gz.
File metadata
- Download URL: sw4rm_sdk-0.3.0.tar.gz
- Upload date:
- Size: 43.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dc6ca4731ee0cc8e51c65313f9b2ae9c757bc1254790be36dcf16c22152b33af
|
|
| MD5 |
ab16cd759c0e4b938b1702cd132faa72
|
|
| BLAKE2b-256 |
847d87dcc74a5bfe70eabc4d6b16062dd64bcf27fe886f41e80943f558b9e39d
|
File details
Details for the file sw4rm_sdk-0.3.0-py3-none-any.whl.
File metadata
- Download URL: sw4rm_sdk-0.3.0-py3-none-any.whl
- Upload date:
- Size: 50.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
08d4414cca81ab6fd5df0ab84079edac8c8215dab70c1a77c7cb741eaa6b0a96
|
|
| MD5 |
feb116263900e6b788cc4b041074d6ac
|
|
| BLAKE2b-256 |
af55cddfbdedeb702f2300a5c44c0d07da9183598fc0ec79434449ae2e69df8b
|