Skip to main content

A lightweight, Pydantic-powered, distributed event-driven state machine and typed node graph runtime.

Project description

🕸️ CommandNet

CommandNet is a lightweight, distributed, event-driven state machine and typed node graph runtime for Python 3.11+.

It enables you to build durable, asynchronous workflows using strictly typed Python classes and Pydantic models. Unlike heavy orchestrators, CommandNet provides a minimal core that executes graph-based logic across distributed workers using your choice of database and message broker.


🚀 Installation

Install CommandNet via pip:

pip install commandnet

✨ Key Features

  • Type-Safe Transitions: The execution graph is inferred directly from Python type hints (-> Union[Type[NodeA], Type[NodeB]]). No external JSON/YAML definitions.
  • Pydantic State Management: Context is automatically serialized and rehydrated into Pydantic models with full validation.
  • Distributed by Design: Built-in row-level locking and idempotency support for safe execution across horizontally scaled workers.
  • Fan-out / Fan-in (Parallel): Native support for triggering multiple concurrent sub-tasks and merging results back into the parent state.
  • Native Scheduling: Schedule nodes to run after a specific delay with built-in idempotency keys to prevent duplicate execution.
  • Static Validation: Validate your entire workflow graph (types and connectivity) before a single event is processed.

🛠️ Quick Start

1. Define Your Context

The "Context" is the persistent state of your subject, defined using Pydantic.

from pydantic import BaseModel

class WorkflowCtx(BaseModel):
    user_id: str
    status: str = "pending"
    attempts: int = 0

2. Define Your Nodes

Nodes are the building blocks of your graph. The return type hint of the run method defines the edges of your DAG.

from typing import Union, Type, Optional
from commandnet import Node

class ProcessPayment(Node[WorkflowCtx, None]):
    async def run(self, ctx: WorkflowCtx, payload: None) -> None:
        print(f"Processing for {ctx.user_id}...")
        ctx.status = "complete"
        return None # Terminal state

class CheckRisk(Node[WorkflowCtx, None]):
    # The return type explicitly defines the possible next nodes
    async def run(self, ctx: WorkflowCtx, payload: None) -> Union[Type[ProcessPayment], None]:
        ctx.attempts += 1
        if ctx.attempts > 3:
            return None # Failure/Stop
        return ProcessPayment

3. Advanced Routing (Parallel & Scheduled)

CommandNet supports complex workflow patterns beyond simple linear transitions.

Parallel Fan-out

from commandnet import Parallel, ParallelTask

class StartAnalysis(Node[WorkflowCtx, None]):
    async def run(self, ctx: WorkflowCtx, payload: None) -> Parallel:
        return Parallel(
            branches=[
                ParallelTask(node_cls=SubTaskNode, sub_context_path="sub_data_1"),
                ParallelTask(node_cls=SubTaskNode, sub_context_path="sub_data_2")
            ],
            join_node=FinalMergeNode
        )

Delayed Scheduling

from commandnet import Schedule

class RetryNode(Node[WorkflowCtx, None]):
    async def run(self, ctx: WorkflowCtx, payload: None) -> Schedule:
        return Schedule(
            node_cls=CheckRisk,
            delay_seconds=300,
            idempotency_key=f"retry-{ctx.attempts}"
        )

🏗️ Infrastructure Integration

CommandNet is unopinionated about your stack. You simply implement two abstract interfaces:

  1. Persistence: Handles locking state in your DB (Postgres, Redis, DynamoDB).
  2. EventBus: Handles moving events between workers (RabbitMQ, NATS, SQS).
from commandnet import Engine

# Implement these interfaces for your specific stack
db = MyPostgresAdapter()
bus = MyRabbitMQAdapter()

engine = Engine(persistence=db, event_bus=bus)

# Start the worker loop
await engine.start_worker()

# Trigger an execution
await engine.trigger_subject("subject-123", CheckRisk, WorkflowCtx(user_id="user_abc"))

🔍 Static Analysis & Safety

Prevent runtime failures by validating your graph during CI/CD or at startup. The GraphAnalyzer checks for disconnected nodes and ensures that if NodeA transitions to NodeB, they share compatible Context types.

from commandnet import GraphAnalyzer

# This will raise a TypeError if types don't match or a ValueError if edges are broken
GraphAnalyzer.validate(CheckRisk)

# Generate a dictionary representation of your DAG
dag = GraphAnalyzer.build_graph(CheckRisk)
print(dag) # {'CheckRisk': ['ProcessPayment'], 'ProcessPayment': []}

⚖️ Design Philosophy

  1. Code as Truth: If your IDE can navigate it, CommandNet can run it. No "magic strings."
  2. Stateless Execution: Workers don't keep local state. Every node execution starts with a fresh database fetch and lock.
  3. Zero Magic: No hidden background threads or global singletons. You control the Engine lifecycle.
  4. Ownership: CommandNet provides the orchestration logic; you provide the infrastructure.

📄 License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

commandnet-0.4.0.tar.gz (9.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

commandnet-0.4.0-py3-none-any.whl (11.8 kB view details)

Uploaded Python 3

File details

Details for the file commandnet-0.4.0.tar.gz.

File metadata

  • Download URL: commandnet-0.4.0.tar.gz
  • Upload date:
  • Size: 9.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for commandnet-0.4.0.tar.gz
Algorithm Hash digest
SHA256 0bd09eb54a9416905bb829296284c7cf9f44f701e9aa36a90caa38831f137c0c
MD5 0a9a6844f00a5081a8c6f186d7ad623b
BLAKE2b-256 378bc0dfdaff91cb42d1220b8968441ca4aca586bc35ebc4e27003f055e0630b

See more details on using hashes here.

File details

Details for the file commandnet-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: commandnet-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 11.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for commandnet-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d96d16e1f13134f3369a43c21cb225bac23dd88131e21d15f754a1e146104f7c
MD5 6841b547d788c83bf9b2cc487e6c634c
BLAKE2b-256 8e74d9c742031948091b538b69d73f6b394ad541e99b647bc8c9f4eb611b74b1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page