Pynenc
A task management system for complex distributed orchestration
Documentation: https://docs.pynenc.org
Source Code: https://github.com/pynenc/pynenc
Pynenc addresses the complex challenges of task management in distributed environments, offering a robust solution for developers looking to efficiently orchestrate asynchronous tasks across multiple systems. By combining intuitive configuration with advanced features like automatic task prioritization and cycle detection, Pynenc empowers developers to build scalable and reliable distributed applications with ease.
Key Features
- Intuitive Orchestration: Simplifies the setup and management of tasks in distributed systems, focusing on usability and practicality.
- Configurable Concurrency Management: Pynenc offers concurrency control at several levels:
  - Task-Level Concurrency: Ensures only one instance of a specific task is in a running state at any given time.
  - Argument-Level Concurrency: Limits concurrent execution based on the task's arguments, allowing only one invocation with a given set of arguments to be running or pending.
  - Key Argument-Level Concurrency: Refines this further by enforcing uniqueness only on specified key arguments.

  Together, these levels give precise control over task execution, keeping tasks within the specified constraints without overloading the system.
- Comprehensive Trigger System: Enables declarative task scheduling and event-driven workflows:
  - Diverse Trigger Conditions: Schedule tasks using cron expressions, or react to events, task status changes, results, or exceptions.
  - Flexible Argument Handling:
    - ArgumentProvider: Dynamically generate arguments for triggered tasks from the context of the condition (static values or custom functions).
    - ArgumentFilter: Filter task execution based on the original task arguments (exact-match dictionary or custom validation function).
    - ResultFilter: Conditionally trigger tasks based on specific result values of the preceding task.
  - Event Payload Filtering: Selectively process events based on payload content.
  - Composable Conditions: Combine multiple conditions with AND/OR logic for complex triggering rules.
- Advanced Workflow System: Sophisticated task orchestration with deterministic execution and state management:
  - Deterministic Execution: All non-deterministic operations (random numbers, UUIDs, timestamps) are made deterministic, enabling exact replay.
  - Workflow Identity: Unique workflow contexts with parent-child relationships and inheritance.
  - State Persistence: Automatic key-value storage for workflow data with failure recovery capabilities.
  - Task Integration: Integrates with existing Pynenc tasks through the force_new_workflow decorator option.
  - Failure Recovery: Workflows can resume from the exact point of failure with identical replay behavior.
- Flexible Configuration with PynencBuilder: A builder interface lets users configure apps programmatically, including Redis vs. memory mode, runners, logging levels, concurrency control, and argument formatting.
- Automatic Task Prioritization: Pynenc prioritizes tasks by counting how many other tasks depend on each one; the task with the most dependents is selected first.
- Automatic Task Pausing: Pynenc pauses tasks that are waiting for other tasks to complete, so that higher-priority tasks (those with more dependent tasks waiting on them) can run instead. This frees the runner rather than blocking it and helps prevent deadlocks.
- Cycle Detection: Automatically detects cyclical dependencies among tasks and raises exceptions to prevent endless loops in task execution.
- Modularity and Extensibility: Pynenc is built with modularity at its core, supporting interchangeable components such as orchestrators, brokers, state backends, runners, and serializers. It currently supports Redis and an in-memory synchronous mode for development and testing. Future plans include support for additional databases, queues, and services, enabling easy customization and adaptation to different operational needs and environments.
- Direct Task Execution: Use @app.direct_task for tasks that return results directly instead of invocations.
- Monitoring: Includes a built-in monitoring web application for real-time task execution insights.
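The ideas behind the prioritization and cycle detection features listed above can be illustrated in plain Python. The sketch below is not Pynenc's implementation and does not use its API; `DependencyGraph`, `CycleError`, `add_wait`, and `next_to_run` are hypothetical names chosen for illustration. It assumes that priority means "number of tasks currently waiting on this one" and that a cycle is rejected at the moment a new wait relationship would close a loop.

```python
from collections import defaultdict


class CycleError(Exception):
    """Raised when a new wait relationship would create a cycle."""


class DependencyGraph:
    """Toy model of 'who waits on whom' between tasks (illustrative only)."""

    def __init__(self) -> None:
        # waits_on[a] holds the tasks that a is waiting for.
        self.waits_on: dict[str, set[str]] = defaultdict(set)
        # waited_by[b] holds the tasks that are blocked until b finishes.
        self.waited_by: dict[str, set[str]] = defaultdict(set)

    def _reaches(self, start: str, target: str) -> bool:
        """Depth-first search along waits_on edges from start to target."""
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == target:
                return True
            if node in seen:
                continue
            seen.add(node)
            stack.extend(self.waits_on.get(node, ()))
        return False

    def add_wait(self, waiter: str, dependency: str) -> None:
        """Record that waiter is blocked until dependency completes."""
        # If dependency already waits (directly or transitively) on waiter,
        # the new edge would close a loop.
        if self._reaches(dependency, waiter):
            raise CycleError(f"{waiter} -> {dependency} would create a cycle")
        self.waits_on[waiter].add(dependency)
        self.waited_by[dependency].add(waiter)

    def next_to_run(self, candidates: list[str]) -> str:
        """Pick the candidate with the most tasks waiting on it."""
        return max(candidates, key=lambda t: len(self.waited_by.get(t, ())))


graph = DependencyGraph()
graph.add_wait("report", "load")
graph.add_wait("summary", "load")
graph.add_wait("report", "clean")
print(graph.next_to_run(["load", "clean"]))  # load (two waiters vs. one)
```

In this model, calling `graph.add_wait("load", "report")` afterwards raises `CycleError`, because `report` already waits on `load`.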
Installation
Pynenc can be installed with pip. Run the following command in your terminal:

    pip install pynenc

To include the optional monitoring web app:

    pip install pynenc[monitor]

Either command downloads and installs Pynenc along with its dependencies. Once the installation is complete, you can start using Pynenc in your Python projects.
For more detailed instructions and advanced installation options, please refer to the Pynenc Documentation.
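One way to confirm that the installation succeeded is to ask Python's standard library for the installed distribution version. This is a minimal sketch using `importlib.metadata`; the helper name `installed_version` is hypothetical and not part of Pynenc.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("pynenc")
    print(f"pynenc {found}" if found else "pynenc is not installed")
```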
Quick Start Example
To get started with Pynenc, here's a simple example that demonstrates the creation of a distributed task for adding two numbers. Follow these steps to quickly set up a basic task and execute it.
- Define a Task: Create a file named tasks.py and define a simple addition task:

      from pynenc import Pynenc

      app = Pynenc()


      @app.task
      def add(x: int, y: int) -> int:
          add.logger.info(f"{add.task_id=} Adding {x} + {y}")
          return x + y


      @app.direct_task
      def direct_add(x: int, y: int) -> int:
          return x + y
- Start Your Runner or Run Synchronously: Before executing the task, decide whether to run it asynchronously with a runner or synchronously for testing or development purposes.
  - Asynchronously: Start a runner in a separate terminal or script:

        pynenc --app=tasks.app runner start

    See the basic_redis_example sample.
  - Synchronously: For testing or local demonstration, set the environment variable PYNENC__DEV_MODE_FORCE_SYNC_TASKS=True to force tasks to run in the same thread.
- Execute the Task:

      from tasks import add, direct_add

      # Standard task (returns an invocation)
      result = add(1, 2).result  # 3

      # Direct task (returns the result directly)
      direct_result = direct_add(1, 2)  # 3
Using the Trigger System
Here's an example of creating and using triggers:
    from datetime import datetime

    from pynenc import Pynenc

    app = Pynenc()


    @app.task
    def process_data(data: dict) -> dict:
        return {"processed": data, "timestamp": datetime.now().isoformat()}


    @app.task
    def notify_admin(result: dict, urgency: str = "normal") -> None:
        print(f"Admin notification ({urgency}): {result}")


    # Create a trigger that runs when process_data completes successfully
    trigger = app.trigger.on_success(process_data).run(notify_admin)

    # Create a trigger with argument filtering: only fire when data["priority"] is "urgent"
    trigger_urgent = (
        app.trigger
        .on_success(process_data)
        .with_argument_filter(lambda args: args.get("data", {}).get("priority") == "urgent")
        .run(notify_admin, argument_provider=lambda ctx: [ctx.result, "high"])
    )

    # Create a cron-based scheduled task
    scheduled_task = (
        app.trigger
        .on_cron("*/30 * * * *")  # Every 30 minutes
        .run(process_data, argument_provider={"data": {"source": "scheduled"}})
    )
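To see which invocations the argument filter in the example above would let through, the predicate can be exercised on its own, without Pynenc. The sketch below assumes the filter receives the original call's arguments as a dictionary keyed by parameter name, as the `args.get("data", ...)` call suggests; the function name `is_urgent` and the sample dictionaries are hypothetical.

```python
def is_urgent(args: dict) -> bool:
    """Same predicate as the lambda passed to with_argument_filter."""
    return args.get("data", {}).get("priority") == "urgent"


# Hypothetical argument dictionaries for four process_data calls.
samples = [
    {"data": {"priority": "urgent", "id": 1}},  # priority matches
    {"data": {"priority": "low", "id": 2}},     # priority differs
    {"data": {"id": 3}},                        # no priority key
    {},                                         # no data argument at all
]

matches = [is_urgent(args) for args in samples]
print(matches)  # [True, False, False, False]
```

Note that the predicate raises `AttributeError` if `data` is explicitly `None`, because the `{}` default only applies when the key is missing. Writing `(args.get("data") or {})` is a more defensive variant.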
For a complete guide on how to set up and run pynenc, visit our samples library.
Running the Monitor
The monitor requires a Pynenc app defined in your codebase:

    pynenc --app your_app_module monitor --host 127.0.0.1 --port 8000
Requirements
To use Pynenc in a distributed system, the current primary requirement is:
- Redis: As of now, Pynenc requires a Redis server to handle distributed task management. Ensure that you have Redis installed and running in your environment.
Future Updates:
- Pynenc is being developed to support additional databases and message queues. This will expand its compatibility and usability in various distributed systems.
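Before starting a runner, it can help to confirm that the Redis server mentioned above is actually reachable. The following is a minimal sketch that sends a Redis `PING` over a raw socket using only the standard library; it does not use Pynenc or a Redis client library. The helper name `redis_ping` is hypothetical, and the defaults assume a local server on Redis's standard port 6379 with no authentication.

```python
import socket


def redis_ping(host: str = "127.0.0.1", port: int = 6379, timeout: float = 1.0) -> bool:
    """Return True if the server at host:port answers PING with +PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            # RESP encoding of the command PING: an array of one bulk string.
            conn.sendall(b"*1\r\n$4\r\nPING\r\n")
            reply = conn.recv(64)
    except OSError:
        # Covers refused connections, unreachable hosts, and timeouts.
        return False
    return reply.startswith(b"+PONG")


if __name__ == "__main__":
    print("Redis reachable" if redis_ping() else "Redis not reachable")
```

A server that requires authentication replies with an error instead of `+PONG`, so this check reports `False` in that case even though the server is running.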
Contact or Support
If you need help with Pynenc or want to discuss any aspects of its usage, feel free to reach out through the following channels:
- GitHub Issues: For bug reports, feature requests, or other technical queries, please use our GitHub Issues page. You can create a new issue or contribute to existing discussions.
- GitHub Discussions: For more general questions, ideas exchange, or discussions about Pynenc, consider using GitHub Discussions on our repository. It's a great place to connect with other users and the development team.
Remember, your feedback and contributions are essential in helping Pynenc grow and improve!
License
Pynenc is released under the MIT License.
File details
Details for the file pynenc-0.0.24.tar.gz.
File metadata
- Download URL: pynenc-0.0.24.tar.gz
- Upload date:
- Size: 194.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 408dbf74096aa04e5b0b094f98d24170b4e05d0320e4ab3e6b64add10cbd3334 |
| MD5 | ec6713d15f901fff5007e6217aed1518 |
| BLAKE2b-256 | e62a9aa556dcc39efe9b7f60e86581ee37054620ba478311c8d684d58d3e53a7 |
File details
Details for the file pynenc-0.0.24-py3-none-any.whl.
File metadata
- Download URL: pynenc-0.0.24-py3-none-any.whl
- Upload date:
- Size: 264.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | c9c5f77bd742723c95c5ec67b881939c4ea537d99b279502862fd7dbce01b1c7 |
| MD5 | 475cc8323ea8b1d61971adf79bd747cd |
| BLAKE2b-256 | df3a304facd8734f742b05851670b7125f5f1b291d4564e8a837c087fdf1e14c |