Pynenc

A task management system for complex distributed orchestration


Documentation: https://docs.pynenc.org

Source Code: https://github.com/pynenc/pynenc


Pynenc is a Python task orchestration framework for distributed workers. It gives each task invocation a tracked lifecycle, lets the orchestrator enforce concurrency and retry rules, and keeps enough state to inspect what happened after the fact.

🆕 What's New in v0.3.0

  • Explicit workflow tasks: @app.workflow now marks the task that defines a workflow or sub-workflow root
  • Root workflow operations: deterministic orchestration lives under wf.root.uuid(), wf.root.random(), wf.root.utc_now(), and wf.root.execute_task(...)
  • Ordinary task behavior: top-level @app.task calls are standalone task invocations unless they are called from inside a workflow
  • Workflow scope errors: invalid root-only workflow calls raise DeterministicOperationScopeError with invocation and workflow context
  • Pynmon workflow markers: timeline views outline workflow-defining invocations so workflow roots and sub-workflow roots are easier to spot

See the Changelog for the complete list of changes.

Key Features

  • Modular Plugin Architecture: Pynenc is built with modularity at its core, supporting various backend implementations through a plugin system:

    • Memory Backend: Built-in development/testing mode for local execution (single-host only)
    • SQLite Backend: Built-in backend for testing on a single host (compatible with any runner sharing the same database file)
    • Redis Plugin (pynenc-redis): Production-ready distributed task management
    • MongoDB Plugin (pynenc-mongodb): Document-based storage with full feature support
    • RabbitMQ Plugin (pynenc-rabbitmq): Message queue-based broker for distributed task orchestration

    The plugin system allows easy extension with additional databases, message queues, and services, enabling customization for different operational needs and environments.

  • Intuitive Orchestration: Simplifies the setup and management of tasks in distributed systems, focusing on usability and practicality.

  • Flexible Configuration with PynencBuilder: A fluent builder interface allows users to configure apps programmatically with method chaining. Backend plugins automatically extend the builder with their own methods:

    from pynenc.builder import PynencBuilder
    
    # Production setup with Redis (requires pynenc-redis plugin)
    app = (
        PynencBuilder()
        .app_id("my_app")
        .redis(url="redis://localhost:6379")  # Plugin-provided method
        .multi_thread_runner(min_threads=2, max_threads=8)
        .logging_level("info")
        .build()
    )
    
    # Development/testing setup (no plugins required)
    app = PynencBuilder().app_id("my_app").memory().dev_mode().build()
    
  • Invocation Status State Machine: Type-safe, declarative status management with:

    • Ownership tracking for invocations across distributed runners
    • Automatic recovery of stuck PENDING and RUNNING invocations
    • Runner heartbeat monitoring to detect inactive runners
    • Comprehensive state transitions with validation

    [Figure: Pynenc invocation status state machine]
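
As a rough illustration of what validated state transitions mean, here is a minimal sketch in plain Python. It does not use Pynenc's classes: the `Status` enum, the `ALLOWED` table, and `transition` are hypothetical names, and the set of transitions (including the `FAILED` status) is an assumption chosen for the example.

```python
from enum import Enum, auto


class Status(Enum):
    # Hypothetical statuses for illustration; not Pynenc's InvocationStatus.
    PENDING = auto()
    RUNNING = auto()
    SUCCESS = auto()
    FAILED = auto()


# Assumed transition table: each status maps to the statuses it may move to.
ALLOWED: dict[Status, frozenset[Status]] = {
    Status.PENDING: frozenset({Status.RUNNING}),
    # RUNNING -> PENDING models a stuck invocation being re-queued.
    Status.RUNNING: frozenset({Status.SUCCESS, Status.FAILED, Status.PENDING}),
    Status.SUCCESS: frozenset(),
    Status.FAILED: frozenset(),
}


def transition(current: Status, new: Status) -> Status:
    """Return the new status, or raise if the move is not declared."""
    if new not in ALLOWED[current]:
        raise ValueError(f"invalid transition: {current.name} -> {new.name}")
    return new
```

Declaring the table once keeps every status change in one place, so an undeclared move such as `SUCCESS` to `RUNNING` fails loudly instead of silently corrupting state.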

  • Configurable Concurrency Management: Pynenc supports concurrency control at several levels:

    • Task-Level Concurrency: Ensures only one instance of a specific task is in a running state at any given time.
    • Argument-Level Concurrency: Limits concurrent execution based on the arguments of the task, allowing only one invocation per distinct set of arguments to be running or pending.
    • Key Argument-Level Concurrency: Pick a subset of arguments (e.g. key_arguments=("account_id",)) and serialize only invocations that share that key, which suits per-tenant external API calls. Different keys still run fully in parallel. See the concurrency_demo sample for a runnable FastAPI-based walkthrough.

    This gives the orchestrator enough information to avoid duplicate work, protect shared resources, and keep unrelated work running in parallel.
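
To make the key-argument idea concrete, the sketch below derives a concurrency key from a chosen subset of a call's arguments. It is plain Python and independent of Pynenc: `concurrency_key` and `call_api` are hypothetical names, and how Pynenc computes its keys internally is not shown here.

```python
import inspect
from collections.abc import Callable
from typing import Any


def concurrency_key(
    func: Callable[..., Any],
    key_arguments: tuple[str, ...],
    *args: Any,
    **kwargs: Any,
) -> tuple[Any, ...]:
    """Build a key from the selected arguments of one call."""
    # Binding normalizes positional and keyword arguments to parameter names.
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return tuple(bound.arguments[name] for name in key_arguments)


def call_api(account_id: str, endpoint: str) -> None:
    """Stand-in for a per-tenant external API call."""


a = concurrency_key(call_api, ("account_id",), "acct-1", "/users")
b = concurrency_key(call_api, ("account_id",), "acct-1", endpoint="/orders")
c = concurrency_key(call_api, ("account_id",), "acct-2", "/users")

print(a == b)  # True: same account, so these two calls would be serialized
print(a == c)  # False: different accounts, free to run in parallel
```

Two calls with equal keys compete for the same slot, while calls with different keys never block each other.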

  • Real-Time Monitoring with Pynmon: Built-in web-based monitoring interface featuring:

    • SVG-based timeline visualization of invocations and state transitions
    • Runner health monitoring with heartbeat tracking
    • Workflow visualization with parent-child relationships
    • Task details with execution history and context
    • Trigger event browser with event-to-invocation linking and timeline overlays
    • HTMX-powered real-time updates
  • Trigger System: Enables declarative task scheduling and event-driven workflows:

    • Trigger Conditions: Schedule tasks using cron expressions, react to events, task status changes, results, or exceptions.
    • Definition-Time Reactions: Declare the reaction on the task that should run next instead of wiring callbacks every time the upstream task is called.
    • Flexible Argument Handling:
      • ArgumentProvider: Dynamically generate arguments for triggered tasks from the context of the condition (static values or using custom functions).
      • ArgumentFilter: Filter task execution based on original task arguments (exact match dictionary or custom validation function).
      • ResultFilter: Conditionally trigger tasks based on specific result values of the preceding task.
      • Event Payload Filtering: Selectively process events based on payload content.
    • Composable Conditions: Combine multiple conditions with AND/OR logic for complex triggering rules.
    • Runnable Example: See the trigger_demo sample for cron, events, status chains, exception compensation, and composite status/result conditions.
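
The AND/OR composition of conditions can be pictured with ordinary predicate functions. The sketch below is a generic illustration, not Pynenc's trigger API: `all_of`, `any_of`, the dictionary-shaped context, and the three example predicates are all hypothetical.

```python
from collections.abc import Callable
from typing import Any

Context = dict[str, Any]
Condition = Callable[[Context], bool]


def all_of(*conditions: Condition) -> Condition:
    """AND: every condition must hold."""
    def check(ctx: Context) -> bool:
        return all(cond(ctx) for cond in conditions)
    return check


def any_of(*conditions: Condition) -> Condition:
    """OR: at least one condition must hold."""
    def check(ctx: Context) -> bool:
        return any(cond(ctx) for cond in conditions)
    return check


def is_success(ctx: Context) -> bool:
    return ctx.get("status") == "SUCCESS"


def is_breaking_news(ctx: Context) -> bool:
    return ctx.get("kind") == "breaking_news"


def is_urgent(ctx: Context) -> bool:
    return ctx.get("priority", 0) >= 9


# Fire only on success, and only for breaking news or urgent items.
should_notify = all_of(is_success, any_of(is_breaking_news, is_urgent))

print(should_notify({"status": "SUCCESS", "kind": "breaking_news"}))  # True
print(should_notify({"status": "SUCCESS", "kind": "sports"}))         # False
print(should_notify({"status": "FAILED", "priority": 10}))            # False
```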
  • Workflow System: Durable orchestration for multi-step task processes:

    • Explicit Workflow Tasks: Use @app.workflow for orchestration and @app.task for ordinary activity work.
    • Deterministic Root Operations: wf.root.uuid(), wf.root.random(), and wf.root.utc_now() replay values when a workflow invocation is retried.
    • Workflow Identity: Workflow roots and sub-workflow roots have durable ids and parent workflow links.
    • Workflow Data: wf.set_data() and wf.get_data() store workflow-scoped milestones and decisions.
    • Child Invocation Replay: wf.root.execute_task(...) records child calls so retries can reuse completed work when arguments are stable.
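
Replaying values on retry can be understood as a record-then-reuse log. The following sketch shows that idea in plain Python under simplifying assumptions: `ReplayLog` and `new_id` are hypothetical, values are matched purely by call order, and nothing is persisted. It is not how Pynenc stores workflow state.

```python
import uuid
from collections.abc import Callable
from typing import Any


class ReplayLog:
    """Record values on the first run and return them again on a retry."""

    def __init__(self) -> None:
        self._values: list[Any] = []
        self._cursor = 0

    def start_attempt(self) -> None:
        """Rewind the cursor; recorded values are kept across attempts."""
        self._cursor = 0

    def value(self, produce: Callable[[], Any]) -> Any:
        """Return the recorded value at this position, or record a new one."""
        if self._cursor < len(self._values):
            result = self._values[self._cursor]
        else:
            result = produce()
            self._values.append(result)
        self._cursor += 1
        return result


def new_id() -> str:
    return str(uuid.uuid4())


log = ReplayLog()

log.start_attempt()
first_order_id = log.value(new_id)

log.start_attempt()  # simulate a retry of the same workflow invocation
retried_order_id = log.value(new_id)

print(first_order_id == retried_order_id)  # True
```

Because the retry sees the same identifier, any child call that takes it as an argument receives stable arguments, which is the precondition the last bullet names for reusing completed work.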
  • Core Services & Automatic Recovery: Built-in recovery tasks automatically detect and re-queue stuck invocations:

    • Pending Recovery: Invocations stuck in PENDING beyond a configurable timeout are re-routed through the broker.
    • Running Recovery: Invocations owned by runners that stopped sending heartbeats are detected and re-queued.
    • Atomic Service Scheduling: A time-slot distribution algorithm ensures only one runner executes global services (trigger evaluation, recovery) per cycle, preventing race conditions in multi-runner deployments.
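
The general idea of giving each runner its own time slot can be sketched as follows. This is an assumption-laden illustration rather than Pynenc's algorithm, whose details are not described here: `owns_current_slot`, the sorted-id ordering, and the equal slot widths are all hypothetical choices.

```python
def owns_current_slot(
    runner_id: str,
    active_runner_ids: list[str],
    now: float,
    cycle_seconds: float,
) -> bool:
    """Return True if this runner's slot covers the current moment."""
    # Every runner sorts the same ids, so all agree on the slot order.
    runners = sorted(active_runner_ids)
    slot_seconds = cycle_seconds / len(runners)
    position = now % cycle_seconds
    # min() guards against a float rounding step past the last slot.
    slot_index = min(int(position // slot_seconds), len(runners) - 1)
    return runners[slot_index] == runner_id


runners = ["runner-b", "runner-a", "runner-c"]

# With a 60-second cycle and three runners, each slot lasts 20 seconds.
print(owns_current_slot("runner-a", runners, now=5.0, cycle_seconds=60.0))   # True
print(owns_current_slot("runner-b", runners, now=5.0, cycle_seconds=60.0))   # False
print(owns_current_slot("runner-b", runners, now=25.0, cycle_seconds=60.0))  # True
```

A real system also has to agree on the list of active runners and tolerate clock drift, which this sketch leaves out.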
  • Automatic Task Prioritization: The broker prioritizes tasks by counting how many other tasks depend on them. The task blocking the most others is selected first.

  • Automatic Task Pausing: Tasks waiting for dependencies are paused, freeing their runner slots. Higher-priority tasks (those with more dependents waiting) run instead, preventing thread-pool exhaustion and deadlocks.
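
Prioritizing by number of dependents can be shown in a few lines. The sketch below is a simplified model, not the broker's implementation: `pick_next` and the `waiting_on` mapping are hypothetical, and ties are broken by queue order as an arbitrary choice.

```python
from collections import Counter


def pick_next(ready: list[str], waiting_on: dict[str, set[str]]) -> str:
    """Pick the ready invocation that the most other invocations wait on."""
    dependents = Counter(
        dependency
        for dependencies in waiting_on.values()
        for dependency in dependencies
    )
    # max() keeps the first candidate on ties, so queue order breaks ties.
    return max(ready, key=lambda invocation: dependents[invocation])


# Each key is a paused invocation; the value is what it is waiting for.
waiting_on = {
    "report": {"load_users", "load_orders"},
    "invoice": {"load_orders"},
    "email": {"load_orders"},
}
ready = ["load_users", "load_orders", "cleanup"]

print(pick_next(ready, waiting_on))  # load_orders
```

Running `load_orders` first unblocks three paused invocations at once, whereas `cleanup` unblocks none.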

  • Incremental Migration with @app.direct_task: For codebases adopting pynenc gradually, the @app.direct_task decorator preserves the calling contract of a regular Python function — the caller waits, gets back the value, exception handling is unchanged. Combined with PYNENC__DEV_MODE_FORCE_SYNC_TASKS=True, decorated functions run inline during development; remove the variable in production to distribute to workers. No call site has to be rewritten.

    @app.direct_task
    def analyze(data: str) -> dict:
        return expensive_computation(data)
    
    result = analyze(my_data)  # returns the value directly — no Invocation
    

    See the direct_task_demo sample and the usage guide for the migration pattern in detail.

Installation

The core package provides the framework along with the built-in memory and SQLite backends. Backend plugins for Redis, MongoDB, and RabbitMQ are installed separately:

Core (supports Python 3.11+)

pip install pynenc

Backend Plugins

Choose the backend that fits your needs:

Redis Backend (recommended for production):

pip install pynenc-redis

MongoDB Backend:

pip install pynenc-mongodb

RabbitMQ Backend:

pip install pynenc-rabbitmq

Optional Features

Include the monitoring web app:

pip install "pynenc[monitor]"

Complete Installation Examples

For a Redis-based setup with monitoring:

pip install "pynenc[monitor]" pynenc-redis

For a MongoDB-based setup:

pip install pynenc pynenc-mongodb

For a RabbitMQ-based setup:

pip install pynenc pynenc-rabbitmq

For development/testing (memory or SQLite backend only):

pip install pynenc

This modular approach allows you to install only the components you need, keeping your dependencies minimal and focused.

For more detailed instructions and advanced installation options, please refer to the Pynenc Documentation.

Quick Start Example

The following example defines a distributed task that adds two numbers, starts a runner, and executes the task.

  1. Define a Task: Create a file named tasks.py and define a simple addition task:

    from pynenc import Pynenc
    
    app = Pynenc()
    
    @app.task
    def add(x: int, y: int) -> int:
        add.logger.info(f"{add.task_id=} Adding {x} + {y}")
        return x + y
    
    @app.direct_task
    def direct_add(x: int, y: int) -> int:
        return x + y
    
  2. Start Your Runner or Run Synchronously:

    Before executing the task, decide whether to run it asynchronously with a runner or synchronously for testing and development.

    • Asynchronously: Start a runner in a separate terminal or script:

      pynenc runner start
      

    The CLI auto-discovers the app when the current directory contains exactly one importable file with a Pynenc() instance. If your project has multiple apps, select one explicitly:

    pynenc --app tasks.app runner start
    

    See the basic_redis_example sample for a complete setup (requires the pynenc-redis plugin).

    • Synchronously: For tests or local demonstrations, set the environment variable PYNENC__DEV_MODE_FORCE_SYNC_TASKS=True to force tasks to run in the same thread.
  3. Execute the Task:

    from tasks import add, direct_add

    # Standard task: the call returns an invocation, and .result gives the value
    result = add(1, 2).result  # 3

    # Direct task: the call returns the value directly
    direct_result = direct_add(1, 2)  # 3
    

[Figure: Running a local task in memory with ThreadRunner]
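
The synchronous mode from step 2 can also be switched on from a script instead of the shell. This is a minimal sketch: setting the variable before the app module is imported is a conservative assumption, since this document does not say when Pynenc reads it. The commented lines refer to the tasks.py file from step 1.

```python
import os

# Set the variable before importing the module that creates the Pynenc app,
# so it is already in the environment when configuration is read.
os.environ["PYNENC__DEV_MODE_FORCE_SYNC_TASKS"] = "True"

# from tasks import add        # the tasks.py module from step 1
# value = add(1, 2).result     # expected to run inline, without a runner
```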

Using the Trigger System

Triggers are declared on the task that should react. The caller of the upstream task does not need to remember callback wiring or chain construction.

from typing import Any

from pynenc import Pynenc
from pynenc.invocation.status import InvocationStatus
from pynenc.trigger.conditions.event import EventContext
from pynenc.trigger.trigger_builder import TriggerBuilder

app = Pynenc()  # configure trigger_cls and trigger_task_modules for runners


def args_from_feed_event(ctx: EventContext) -> dict[str, Any]:
    return {
        "source": ctx.payload.get("source", "default"),
        "count": ctx.payload.get("count", 3),
    }


@app.task(
    triggers=[
        TriggerBuilder()
        .on_cron("*/15 * * * *")
        .with_args_static({"source": "scheduled", "count": 3}),
        TriggerBuilder()
        .on_event("feed_updated")
        .with_args_from_event(args_from_feed_event),
    ]
)
def ingest_feed(source: str, count: int) -> dict[str, Any]:
    return {"source": source, "count": count}


@app.task
def enrich_article(article_id: str, kind: str) -> dict[str, str]:
    return {"article_id": article_id, "kind": kind, "status": "enriched"}


def args_from_enrich_status(ctx: Any) -> dict[str, Any]:
    return {"article_id": ctx.arguments.kwargs["article_id"]}


@app.task(
    triggers=TriggerBuilder()
    .on_status(
        enrich_article,
        statuses=[InvocationStatus.SUCCESS],
        call_arguments={"kind": "breaking_news"},
    )
    .with_args_from_status(args_from_enrich_status)
)
def notify_subscribers(article_id: str) -> str:
    return f"notified:{article_id}"

Argument providers and custom filters must be module-level named functions so trigger backends can serialize their reference. Avoid lambdas in trigger definitions.
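
The reason behind this rule can be seen with plain Python. A module-level function has a stable importable name, while a lambda does not. The sketch below is illustrative only: `reference_of` and `resolve` are hypothetical helpers, and the `module:qualname` format is an assumption, not the format Pynenc's trigger backends use.

```python
import importlib
import json
from collections.abc import Callable
from typing import Any


def reference_of(func: Callable[..., Any]) -> str:
    """Return a 'module:qualname' string, rejecting unreachable callables."""
    qualname = func.__qualname__
    if "<lambda>" in qualname or "<locals>" in qualname:
        raise ValueError(f"{qualname!r} cannot be looked up by name")
    return f"{func.__module__}:{qualname}"


def resolve(reference: str) -> Callable[..., Any]:
    """Import the module and walk the qualified name back to the callable."""
    module_name, qualname = reference.split(":")
    target: Any = importlib.import_module(module_name)
    for part in qualname.split("."):
        target = getattr(target, part)
    return target


def static_args() -> dict[str, Any]:
    """A module-level argument provider with a stable name."""
    return {"source": "scheduled", "count": 3}


print(reference_of(static_args).split(":")[1])  # static_args
print(reference_of(json.dumps))                 # json:dumps
print(resolve("json:dumps") is json.dumps)      # True

try:
    reference_of(lambda: {"source": "scheduled"})
except ValueError as error:
    print(f"rejected: {error}")
```

Every lambda reports the same `<lambda>` name, so a stored reference could not tell two of them apart or find either one again after a restart.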

For a complete guide, see the Trigger System documentation and the trigger_demo sample.

Monitoring with Pynmon

Pynenc includes Pynmon, a built-in web-based monitoring interface that provides real-time visibility into your distributed task execution — no external tooling required.

[Figure: Pynmon dashboard showing application overview, invocation status, component architecture, and configuration]

Execution Timeline

See exactly what ran across every runner and worker, at every moment. Status transitions are color-coded with connections between parent and child invocations. Click any invocation to inspect its full status history and the runner context that executed it.

[Figure: Pynmon timeline comparing ThreadRunner, ProcessRunner, PersistentProcessRunner, and MultiThreadRunner side by side]

Family Tree & Invocation Details

Navigate the full hierarchy of task calls as an interactive graph. Selecting a node cross-highlights it on the timeline, and vice versa — making it trivial to understand both the logical structure and the physical execution of complex workflows.

[Figure: Pynmon family tree overlaid on timeline with cross-highlighting between graph and execution view]

Log Explorer

Paste your Pynenc log lines and the Log Explorer augments them with full context — parsing runner contexts, invocation IDs, and task references, resolving each to its detail page. It generates a mini-timeline of all invocations mentioned in the logs and highlights runners and workers with direct links.

[Figure: Pynmon Log Explorer parsing log lines with augmented context, mini-timeline, and links to invocation and runner details]

Trigger Events

Every event published with app.trigger.emit_event(...) is stored alongside the invocations it produced. The /events view in Pynmon lists emitted events, lets you filter by event code, time range, or whether they matched a registered condition, and links each event to the trigger runs and downstream invocations it created. Open an invocation that was launched by a trigger to see the originating event in its detail page, and watch the same events appear as markers on the timeline. Retention can be tuned through event_retention_days, event_max_records, trigger_run_max_records, and event_auto_purge_enabled config fields.

Starting the Monitor

The monitor requires a Pynenc app defined in your codebase:

pynenc monitor --host 127.0.0.1 --port 8000

If more than one app is available, specify the one to inspect:

pynenc --app your_app_module monitor --host 127.0.0.1 --port 8000

Then open http://127.0.0.1:8000 in your browser to access the dashboard.

Installing the Monitor

The monitoring web app is an optional feature:

pip install "pynenc[monitor]"

Requirements

  • Python 3.11+
  • Core package: No external infrastructure needed — includes memory and SQLite backends for development and testing
  • Production: Install a backend plugin (pynenc-redis, pynenc-mongodb, or pynenc-rabbitmq) and ensure the corresponding service is running

The plugin architecture lets you swap backends without changing application code.

Contributing

Contributions are welcome! See CONTRIBUTING.md for development setup, testing instructions, and pull request guidelines.

License

Pynenc is released under the MIT License.
