Skip to main content

A lightweight, Pydantic-powered, distributed event-driven state machine and typed node graph runtime.

Project description

🕸️ CommandNet

CommandNet is a lightweight, distributed, event-driven state machine and typed node graph runtime for Python 3.11+.

It enables you to build durable, asynchronous workflows using strictly typed Python classes and Pydantic models. Unlike heavy orchestrators, CommandNet provides a minimal core that executes graph-based logic across distributed workers using your choice of database and message broker.


🚀 Installation

Install CommandNet via pip:

pip install commandnet

✨ Key Features

  • Type-Safe Transitions: The execution graph is inferred directly from Python type hints (-> Union[Type[NodeA], Type[NodeB]]). No external JSON/YAML definitions.
  • Pydantic State Management: Context is automatically serialized and rehydrated into Pydantic models with full validation.
  • Distributed by Design: Built-in row-level locking and idempotency support for safe execution across horizontally scaled workers.
  • Fan-out / Fan-in (Parallel): Native support for triggering multiple concurrent sub-tasks and merging results back into the parent state.
  • Native Scheduling: Schedule nodes to run after a specific delay with built-in idempotency keys to prevent duplicate execution.
  • Static Validation: Validate your entire workflow graph (types and connectivity) before a single event is processed.

🛠️ Quick Start

1. Define Your Context

The "Context" is the persistent state of your agent, defined using Pydantic.

from pydantic import BaseModel

class WorkflowCtx(BaseModel):
    user_id: str
    status: str = "pending"
    attempts: int = 0

2. Define Your Nodes

Nodes are the building blocks of your graph. The return type hint of the run method defines the edges of your DAG.

from typing import Union, Type, Optional
from commandnet import Node

class ProcessPayment(Node[WorkflowCtx, None]):
    async def run(self, ctx: WorkflowCtx, payload: None) -> None:
        print(f"Processing for {ctx.user_id}...")
        ctx.status = "complete"
        return None # Terminal state

class CheckRisk(Node[WorkflowCtx, None]):
    # The return type explicitly defines the possible next nodes
    async def run(self, ctx: WorkflowCtx, payload: None) -> Union[Type[ProcessPayment], None]:
        ctx.attempts += 1
        if ctx.attempts > 3:
            return None # Failure/Stop
        return ProcessPayment

3. Advanced Routing (Parallel & Scheduled)

CommandNet supports complex workflow patterns beyond simple linear transitions.

Parallel Fan-out

from commandnet import Parallel, ParallelTask

class StartAnalysis(Node[WorkflowCtx, None]):
    async def run(self, ctx: WorkflowCtx, payload: None) -> Parallel:
        return Parallel(
            branches=[
                ParallelTask(node_cls=SubTaskNode, sub_context_path="sub_data_1"),
                ParallelTask(node_cls=SubTaskNode, sub_context_path="sub_data_2")
            ],
            join_node=FinalMergeNode
        )

Delayed Scheduling

from commandnet import Schedule

class RetryNode(Node[WorkflowCtx, None]):
    async def run(self, ctx: WorkflowCtx, payload: None) -> Schedule:
        return Schedule(
            node_cls=CheckRisk,
            delay_seconds=300,
            idempotency_key=f"retry-{ctx.attempts}"
        )

🏗️ Infrastructure Integration

CommandNet is unopinionated about your stack. You simply implement two abstract interfaces:

  1. Persistence: Handles locking state in your DB (Postgres, Redis, DynamoDB).
  2. EventBus: Handles moving events between workers (RabbitMQ, NATS, SQS).
from commandnet import Engine

# Implement these interfaces for your specific stack
db = MyPostgresAdapter()
bus = MyRabbitMQAdapter()

engine = Engine(persistence=db, event_bus=bus)

# Start the worker loop
await engine.start_worker()

# Trigger an execution
await engine.trigger_agent("agent-123", CheckRisk, WorkflowCtx(user_id="user_abc"))

🔍 Static Analysis & Safety

Prevent runtime failures by validating your graph during CI/CD or at startup. The GraphAnalyzer checks for disconnected nodes and ensures that if NodeA transitions to NodeB, they share compatible Context types.

from commandnet import GraphAnalyzer

# This will raise a TypeError if types don't match or a ValueError if edges are broken
GraphAnalyzer.validate(CheckRisk)

# Generate a dictionary representation of your DAG
dag = GraphAnalyzer.build_graph(CheckRisk)
print(dag) # {'CheckRisk': ['ProcessPayment'], 'ProcessPayment': []}

⚖️ Design Philosophy

  1. Code as Truth: If your IDE can navigate it, CommandNet can run it. No "magic strings."
  2. Stateless Execution: Workers don't keep local state. Every node execution starts with a fresh database fetch and lock.
  3. Zero Magic: No hidden background threads or global singletons. You control the Engine lifecycle.
  4. Ownership: CommandNet provides the orchestration logic; you provide the infrastructure.

📄 License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

commandnet-0.3.0.tar.gz (8.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

commandnet-0.3.0-py3-none-any.whl (10.5 kB view details)

Uploaded Python 3

File details

Details for the file commandnet-0.3.0.tar.gz.

File metadata

  • Download URL: commandnet-0.3.0.tar.gz
  • Upload date:
  • Size: 8.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for commandnet-0.3.0.tar.gz
Algorithm Hash digest
SHA256 c4fe95b2bc3d02091a03cc53b8a107e68f0f2c4f31fa1aece14d38b99093ffa5
MD5 0e6b7d69fd48eb1fa86c7cde186a7f2b
BLAKE2b-256 4d000c79b580476b49735d622e6fd25b1549aa43601fd9462b53747f6cb7ad0b

See more details on using hashes here.

File details

Details for the file commandnet-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: commandnet-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 10.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for commandnet-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0bdc4a3eef6f7a62bb2808066ce15ba964c3dde163551f308a853e0c3e938e0d
MD5 1e69ddb0a8f1686a49eb8d0c69233f5e
BLAKE2b-256 d309963799317483620c4f2ef77e48ae09221a78e329f5346afad8674d19e675

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page