Skip to main content

A lightweight, Pydantic-powered, distributed event-driven state machine and typed node graph runtime.

Project description

🕸️ CommandNet

CommandNet is a lightweight, distributed, event-driven state machine and typed node graph runtime for Python 3.11+.

It enables you to build durable, asynchronous workflows using strictly typed Python classes and Pydantic models. Unlike heavy orchestrators, CommandNet provides a minimal core that executes graph-based logic across distributed workers using your choice of database and message broker.


🚀 Installation

Install CommandNet via pip:

pip install commandnet

✨ Key Features

  • Type-Safe Transitions: The execution graph is inferred directly from Python type hints (-> Union[Type[NodeA], Type[NodeB]]). No external JSON/YAML definitions.
  • Pydantic State Management: Context is automatically serialized and rehydrated into Pydantic models with full validation.
  • Distributed by Design: Built-in row-level locking and idempotency support for safe execution across horizontally scaled workers.
  • Fan-out / Fan-in (Parallel): Native support for triggering multiple concurrent sub-tasks and merging results back into the parent state.
  • Native Scheduling: Schedule nodes to run after a specific delay with built-in idempotency keys to prevent duplicate execution.
  • Static Validation: Validate your entire workflow graph (types and connectivity) before a single event is processed.

🛠️ Quick Start

1. Define Your Context

The "Context" is the persistent state of your subject, defined using Pydantic.

from pydantic import BaseModel

class WorkflowCtx(BaseModel):
    user_id: str
    status: str = "pending"
    attempts: int = 0

2. Define Your Nodes

Nodes are the building blocks of your graph. The return type hint of the run method defines the edges of your DAG.

from typing import Union, Type, Optional
from commandnet import Node

class ProcessPayment(Node[WorkflowCtx, None]):
    async def run(self, ctx: WorkflowCtx, payload: None) -> None:
        print(f"Processing for {ctx.user_id}...")
        ctx.status = "complete"
        return None # Terminal state

class CheckRisk(Node[WorkflowCtx, None]):
    # The return type explicitly defines the possible next nodes
    async def run(self, ctx: WorkflowCtx, payload: None) -> Union[Type[ProcessPayment], None]:
        ctx.attempts += 1
        if ctx.attempts > 3:
            return None # Failure/Stop
        return ProcessPayment

3. Advanced Routing (Parallel & Scheduled)

CommandNet supports complex workflow patterns beyond simple linear transitions.

Parallel Fan-out

from commandnet import Parallel, ParallelTask

class StartAnalysis(Node[WorkflowCtx, None]):
    async def run(self, ctx: WorkflowCtx, payload: None) -> Parallel:
        return Parallel(
            branches=[
                ParallelTask(node_cls=SubTaskNode, sub_context_path="sub_data_1"),
                ParallelTask(node_cls=SubTaskNode, sub_context_path="sub_data_2")
            ],
            join_node=FinalMergeNode
        )

Delayed Scheduling

from commandnet import Schedule

class RetryNode(Node[WorkflowCtx, None]):
    async def run(self, ctx: WorkflowCtx, payload: None) -> Schedule:
        return Schedule(
            node_cls=CheckRisk,
            delay_seconds=300,
            idempotency_key=f"retry-{ctx.attempts}"
        )

🏗️ Infrastructure Integration

CommandNet is unopinionated about your stack. You simply implement two abstract interfaces:

  1. Persistence: Handles locking state in your DB (Postgres, Redis, DynamoDB).
  2. EventBus: Handles moving events between workers (RabbitMQ, NATS, SQS).
from commandnet import Engine

# Implement these interfaces for your specific stack
db = MyPostgresAdapter()
bus = MyRabbitMQAdapter()

engine = Engine(persistence=db, event_bus=bus)

# Start the worker loop
await engine.start_worker()

# Trigger an execution
await engine.trigger_subject("subject-123", CheckRisk, WorkflowCtx(user_id="user_abc"))

🔍 Static Analysis & Safety

Prevent runtime failures by validating your graph during CI/CD or at startup. The GraphAnalyzer checks for disconnected nodes and ensures that if NodeA transitions to NodeB, they share compatible Context types.

from commandnet import GraphAnalyzer

# This will raise a TypeError if types don't match or a ValueError if edges are broken
GraphAnalyzer.validate(CheckRisk)

# Generate a dictionary representation of your DAG
dag = GraphAnalyzer.build_graph(CheckRisk)
print(dag) # {'CheckRisk': ['ProcessPayment'], 'ProcessPayment': []}

⚖️ Design Philosophy

  1. Code as Truth: If your IDE can navigate it, CommandNet can run it. No "magic strings."
  2. Stateless Execution: Workers don't keep local state. Every node execution starts with a fresh database fetch and lock.
  3. Zero Magic: No hidden background threads or global singletons. You control the Engine lifecycle.
  4. Ownership: CommandNet provides the orchestration logic; you provide the infrastructure.

📄 License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

commandnet-0.6.2.tar.gz (9.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

commandnet-0.6.2-py3-none-any.whl (12.1 kB view details)

Uploaded Python 3

File details

Details for the file commandnet-0.6.2.tar.gz.

File metadata

  • Download URL: commandnet-0.6.2.tar.gz
  • Upload date:
  • Size: 9.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for commandnet-0.6.2.tar.gz
Algorithm Hash digest
SHA256 e76b40c0c155327476e3ce89ebf6c9e7513151cdb5bdcc80ffe0d16fcab26dea
MD5 e6cf788d7e4e1b007b8c9460e47ece19
BLAKE2b-256 091ee9b693b6e9a529b36ad1762d55f3fcb9583796cb76509bac62831d33e9d9

See more details on using hashes here.

File details

Details for the file commandnet-0.6.2-py3-none-any.whl.

File metadata

  • Download URL: commandnet-0.6.2-py3-none-any.whl
  • Upload date:
  • Size: 12.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for commandnet-0.6.2-py3-none-any.whl
Algorithm Hash digest
SHA256 199d28232186367afa5d91ae6098cfd8af64e29efeb62b5a334722cb85c9ce36
MD5 16aa27228343331050b49e1b04829eb7
BLAKE2b-256 a186836f3115b5b0fdd5491b4865c514138af2dbd8b98be5ce14046f6fc64129

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page