A task management system for complex distributed orchestration

Pynenc


Documentation: https://docs.pynenc.org

Source Code: https://github.com/pynenc/pynenc


Pynenc addresses the complex challenges of task management in distributed environments, offering a robust solution for developers looking to efficiently orchestrate asynchronous tasks across multiple systems. By combining intuitive configuration with advanced features like automatic task prioritization and cycle detection, Pynenc empowers developers to build scalable and reliable distributed applications with ease.

Key Features

  • Intuitive Orchestration: Simplifies the setup and management of tasks in distributed systems, focusing on usability and practicality.

  • Configurable Concurrency Management: Pynenc offers versatile concurrency control mechanisms at various levels. It includes:

    • Task-Level Concurrency: Ensures only one instance of a specific task is in a running state at any given time.
    • Argument-Level Concurrency: Limits concurrent execution based on the task's arguments, allowing only one invocation per unique set of arguments to be running or pending.
    • Key Argument-Level Concurrency: Further refines control by focusing on key arguments, ensuring uniqueness in task execution based on specified key arguments.

    This structured approach to concurrency management in Pynenc allows for precise control over task execution, ensuring efficient handling of tasks without overloading the system and adhering to specified constraints.

  • Comprehensive Trigger System: Enables declarative task scheduling and event-driven workflows:

    • Diverse Trigger Conditions: Schedule tasks using cron expressions, react to events, task status changes, results, or exceptions.
    • Flexible Argument Handling:
      • ArgumentProvider: Dynamically generate arguments for triggered tasks from the context of the condition (static values or using custom functions).
      • ArgumentFilter: Filter task execution based on original task arguments (exact match dictionary or custom validation function).
      • ResultFilter: Conditionally trigger tasks based on specific result values of the preceding task.
      • Event Payload Filtering: Selectively process events based on payload content.
    • Composable Conditions: Combine multiple conditions with AND/OR logic for complex triggering rules.
  • Advanced Workflow System: Sophisticated task orchestration with deterministic execution and state management:

    • Deterministic Execution: All non-deterministic operations (random numbers, UUIDs, timestamps) are made deterministic for perfect replay.
    • Workflow Identity: Unique workflow contexts with parent-child relationships and inheritance.
    • State Persistence: Automatic key-value storage for workflow data with failure recovery capabilities.
    • Task Integration: Integrates with existing Pynenc tasks through the force_new_workflow decorator option.
    • Failure Recovery: Workflows can resume from exact points of failure with identical replay behavior.
  • Flexible Configuration with PynencBuilder: A builder interface allows users to configure apps programmatically, including Redis vs memory mode, runners, logging levels, concurrency control, and argument formatting.

  • Automatic Task Prioritization: Pynenc prioritizes tasks by counting how many other tasks depend on each one. The task with the most dependents waiting on it is selected first.

  • Automatic Task Pausing: Pynenc pauses tasks that are waiting for other tasks to complete, so that higher-priority tasks (those with more dependent tasks waiting on them) can run instead. This keeps a waiting task from blocking a runner and prevents deadlocks.

  • Cycle Detection: Automatically detects cyclical dependencies among tasks and raises exceptions to prevent endless loops in task execution.

  • Modularity and Extensibility: Pynenc is built with modularity at its core, supporting various components such as orchestrators, brokers, state backends, runners, and serializers. Currently compatible with Redis and a development/test mode using an in-memory synchronous version, Pynenc is designed to be extensible. Future plans include support for additional databases, queues, and services, enabling easy customization and adaptation to different operational needs and environments.

  • Direct Task Execution: Use @app.direct_task for tasks that return results directly instead of invocations.

  • Monitoring: Includes a built-in monitoring web application for real-time task execution insights.
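The prioritization, pausing, and cycle-detection features above all rest on knowing which task waits for which. The following is a minimal sketch of that idea in plain Python, assuming a toy in-memory graph. `DependencyGraph`, `CycleError`, and the task names are hypothetical; this is not Pynenc's API or its orchestrator implementation.

```python
from collections import defaultdict


class CycleError(Exception):
    """Raised when a new dependency would close a cycle (hypothetical name)."""


class DependencyGraph:
    """Toy in-memory dependency graph; an illustration only, not Pynenc code."""

    def __init__(self) -> None:
        # waits_on[a] is the set of tasks that task `a` is waiting for.
        self.waits_on = defaultdict(set)

    def _reaches(self, start: str, target: str) -> bool:
        """Iterative depth-first search along waits_on edges."""
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == target:
                return True
            if node not in seen:
                seen.add(node)
                stack.extend(self.waits_on.get(node, ()))
        return False

    def add_dependency(self, waiter: str, dependency: str) -> None:
        """Record that `waiter` waits for `dependency`, rejecting cycles."""
        # If `dependency` already (transitively) waits for `waiter`,
        # adding this edge would create a loop that can never finish.
        if self._reaches(dependency, waiter):
            raise CycleError(f"{waiter} -> {dependency} would close a cycle")
        self.waits_on[waiter].add(dependency)

    def dependents_count(self, task: str) -> int:
        """Number of tasks that directly wait for `task`."""
        return sum(task in deps for deps in self.waits_on.values())

    def next_to_run(self, candidates: list) -> str:
        """Pick the candidate with the most tasks waiting on it."""
        return max(candidates, key=self.dependents_count)


graph = DependencyGraph()
graph.add_dependency("report", "load")   # report waits for load
graph.add_dependency("chart", "load")    # chart waits for load
graph.add_dependency("load", "fetch")    # load waits for fetch
```

In this sketch, `graph.next_to_run(["fetch", "load"])` selects `"load"` because two tasks wait on it, and `graph.add_dependency("fetch", "report")` raises `CycleError` because `report` already waits, through `load`, for `fetch`.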

Installation

Installing Pynenc is a simple process that can be done using pip. Just run the following command in your terminal:

pip install pynenc

To include the optional monitoring web app:

pip install "pynenc[monitor]"

Either command downloads and installs Pynenc along with its dependencies. Once the installation is complete, you can start using Pynenc in your Python projects.

For more detailed instructions and advanced installation options, please refer to the Pynenc Documentation.
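After installing, one way to confirm that the package is visible to the current interpreter is to ask the standard library for its version. This is a small sketch using `importlib.metadata`; the helper name `installed_version` is made up for illustration.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str = "pynenc") -> Optional[str]:
    """Return the installed version of `package`, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print(installed_version() or "pynenc is not installed in this environment")
```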

Quick Start Example

To get started with Pynenc, here's a simple example that demonstrates the creation of a distributed task for adding two numbers. Follow these steps to quickly set up a basic task and execute it.

  1. Define a Task: Create a file named tasks.py and define a simple addition task:

    from pynenc import Pynenc
    
    app = Pynenc()
    
    @app.task
    def add(x: int, y: int) -> int:
        add.logger.info(f"{add.task_id=} Adding {x} + {y}")
        return x + y
    
    @app.direct_task
    def direct_add(x: int, y: int) -> int:
        return x + y
    
  2. Start Your Runner or Run Synchronously:

    Before executing the task, decide if you want to run it asynchronously with a runner or synchronously for testing or development purposes.

    • Asynchronously: Start a runner in a separate terminal or script:

      pynenc --app=tasks.app runner start
      

      See the basic_redis_example for a complete example.

    • Synchronously: For tests or local demonstrations, set the environment variable PYNENC__DEV_MODE_FORCE_SYNC_TASKS=True to force tasks to run in the same thread.

  3. Execute the Task:

     from tasks import add, direct_add

     # Standard task (returns an invocation; read .result for the value)
     result = add(1, 2).result  # 3
    
     # Direct task (returns the result directly)
     direct_result = direct_add(1, 2)  # 3
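For the synchronous option in step 2, the environment variable can also be set from Python instead of the shell. The sketch below uses the variable name given above. It assumes that the variable must be in place before the Pynenc app reads its configuration, so it is set ahead of any task import. `sync_mode_enabled` is a hypothetical helper, not part of Pynenc.

```python
import os

# Variable name taken from the Quick Start text. Assumption: it has to be set
# before the Pynenc app loads its configuration, hence before importing tasks.
SYNC_FLAG = "PYNENC__DEV_MODE_FORCE_SYNC_TASKS"
os.environ[SYNC_FLAG] = "True"


def sync_mode_enabled() -> bool:
    """Hypothetical helper: report whether the flag is set for this process."""
    return os.environ.get(SYNC_FLAG, "").lower() == "true"


# With the flag in place, the tasks from step 1 could then be imported and
# called in this same process, without starting a runner:
#     from tasks import add
#     print(add(1, 2).result)
```

Setting the flag in code keeps a test module self-contained, at the cost of hiding configuration that would otherwise be visible in the shell or CI environment.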
    

Using the Trigger System

Here's an example of creating and using triggers:

from pynenc import Pynenc
from datetime import datetime

app = Pynenc()

@app.task
def process_data(data: dict) -> dict:
    return {"processed": data, "timestamp": datetime.now().isoformat()}

@app.task
def notify_admin(result: dict, urgency: str = "normal") -> None:
    print(f"Admin notification ({urgency}): {result}")

# Create a trigger that runs when process_data completes successfully
trigger = app.trigger.on_success(process_data).run(notify_admin)

# Create a trigger with argument filtering - only trigger when the data's priority is 'urgent'
trigger_urgent = (app.trigger
    .on_success(process_data)
    .with_argument_filter(lambda args: args.get('data', {}).get('priority') == 'urgent')
    .run(notify_admin, argument_provider=lambda ctx: [ctx.result, "high"])
)

# Create a cron-based scheduled task
scheduled_task = (app.trigger
    .on_cron("*/30 * * * *")  # Every 30 minutes
    .run(process_data, argument_provider={"data": {"source": "scheduled"}})
)
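The feature list describes an argument filter as either an exact-match dictionary or a custom validation function. The sketch below shows how such a filter could be evaluated in plain Python. It is not Pynenc's implementation: `matches` and `ArgFilter` are hypothetical names, and it assumes that a dictionary filter matches when every key it lists is present in the arguments with an equal value.

```python
from typing import Any, Callable, Mapping, Union

# A filter is either a dictionary of expected values or a predicate function.
ArgFilter = Union[Mapping[str, Any], Callable[[Mapping[str, Any]], bool]]


def matches(arg_filter: ArgFilter, arguments: Mapping[str, Any]) -> bool:
    """Decide whether a task's arguments pass the filter (illustrative only)."""
    if callable(arg_filter):
        # Custom validation function: its return value decides.
        return bool(arg_filter(arguments))
    # Dictionary filter (assumption): each listed key must match exactly.
    return all(
        key in arguments and arguments[key] == expected
        for key, expected in arg_filter.items()
    )


# Same predicate as the trigger example: fire only for urgent data.
is_urgent = lambda args: args.get("data", {}).get("priority") == "urgent"

urgent_call = {"data": {"priority": "urgent", "id": 7}}
routine_call = {"data": {"priority": "low", "id": 8}}
```

Here `matches(is_urgent, urgent_call)` is `True` and `matches(is_urgent, routine_call)` is `False`; a dictionary filter such as `{"urgency": "high"}` passes only when the call was made with `urgency="high"`.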

For a complete guide on how to set up and run Pynenc, visit our samples library.

Running the Monitor

The monitor requires a Pynenc app defined in your codebase:

pynenc --app your_app_module monitor --host 127.0.0.1 --port 8000

Requirements

To use Pynenc in a distributed system, the current primary requirement is:

  • Redis: As of now, Pynenc requires a Redis server to handle distributed task management. Ensure that you have Redis installed and running in your environment.
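Before starting a runner, it can help to check that a Redis server is actually reachable. The sketch below does this with only the standard library by sending a Redis inline `PING`. The function name `redis_responds` is hypothetical, and the host and port defaults are assumptions (6379 is Redis's default port); adjust them to your environment.

```python
import socket


def redis_responds(host: str = "127.0.0.1", port: int = 6379,
                   timeout: float = 1.0) -> bool:
    """Return True if something at host:port answers a Redis PING."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.settimeout(timeout)
            conn.sendall(b"PING\r\n")  # Redis accepts inline commands
            reply = conn.recv(64)
    except OSError:
        # Covers refused connections, unreachable hosts, and timeouts.
        return False
    # "+PONG" comes from an open server; "-NOAUTH" means Redis is running
    # but expects authentication before it will answer commands.
    return reply.startswith((b"+PONG", b"-NOAUTH"))


if __name__ == "__main__":
    print("Redis reachable:", redis_responds())
```

This only checks reachability. It does not verify that Pynenc is configured to use the same host and port.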

Future Updates:

  • Pynenc is being developed to support additional databases and message queues. This will expand its compatibility and usability in various distributed systems.

Contact or Support

If you need help with Pynenc or want to discuss any aspects of its usage, feel free to reach out through the following channels:

  • GitHub Issues: For bug reports, feature requests, or other technical queries, please use our GitHub Issues page. You can create a new issue or contribute to existing discussions.

  • GitHub Discussions: For more general questions, ideas exchange, or discussions about Pynenc, consider using GitHub Discussions on our repository. It's a great place to connect with other users and the development team.

Remember, your feedback and contributions are essential in helping Pynenc grow and improve!

License

Pynenc is released under the MIT License.
