Process Orchestrator
- Summary
- Pipeline and Steps
- Orchestrator: Managing and Executing Pipelines
- Configuring and Using the Orchestrator
Summary
This project aims to develop a process orchestrator. The goal is to break down a large process into smaller, manageable subprocesses, where each one focuses on a specific task and is carried out through a series of steps within a "pipeline".
The core of the proposal is to let developers define their own "pipelines", including the corresponding steps, directly in code in a simple and efficient way.
In short, this project streamlines the management of large processes by dividing them into smaller, well-defined subprocesses, making them easier to implement and monitor.
Pipeline and Steps
To illustrate the application of this orchestrator, consider a backup automation process as an example. This process can be divided into several subprocesses, each of which handles a specific task:
- Backup generation.
- Transfer of the backup file to secure storage.
- Notification of process completion.
Step 1: Define the Pipeline
Start by defining your pipeline. This involves subclassing Pipeline and specifying the sequence of steps it should execute. Use the @register_pipeline decorator to register the pipeline class.
from prorch.pipeline.pipeline import Pipeline
from prorch.decorators.decorators import register_pipeline

@register_pipeline
class BackupAutomationPipeline(Pipeline):
    name = "BackupAutomationPipeline"
    steps = [BackupGeneration, BackupTransfer, ProcessCompletionNotification]
Step 2: Implement the Steps
Each step in the backup process is implemented as a subclass of Step. Inside each step, you define its behavior through the on_start and on_continue methods, utilizing OnStartMixin and OnContinueMixin for additional functionality.
from prorch.step.step import Step
from prorch.mixins.on_start import OnStartMixin
from prorch.mixins.on_continue import OnContinueMixin
from prorch.decorators.decorators import register_step

@register_step
class BackupGeneration(OnStartMixin, OnContinueMixin, Step):
    name = "BackupGeneration"

    def on_start(self):
        # Logic for initiating the backup generation
        print("Starting the backup generation process.")

    def on_continue(self):
        # Logic to proceed with backup generation
        print("Backup generation completed.")
        self.finish()  # marks the step as finished (not needed on the first execution)
Continue implementing the remaining steps (BackupTransfer and ProcessCompletionNotification) in the same manner, defining the specific logic for each step's on_start and on_continue methods.
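For instance, a minimal sketch of BackupTransfer might follow the same pattern; the print calls here are placeholders for your actual transfer logic:

from prorch.step.step import Step
from prorch.mixins.on_start import OnStartMixin
from prorch.mixins.on_continue import OnContinueMixin
from prorch.decorators.decorators import register_step

@register_step
class BackupTransfer(OnStartMixin, OnContinueMixin, Step):
    name = "BackupTransfer"

    def on_start(self):
        # Placeholder: kick off the upload of the backup file to secure storage
        print("Starting the backup transfer.")

    def on_continue(self):
        # Placeholder: verify the upload, then mark the step as finished
        print("Backup transfer completed.")
        self.finish()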
Step 3: Execute the Pipeline
With the steps defined, you can now execute the pipeline. Execution can be triggered through your application logic, a scheduling mechanism, or an event-driven trigger, depending on your project's requirements.
Instantiate the BackupAutomationPipeline and call its execution method according to the framework's execution model:
# Example of pipeline execution
# TinyDBRepository is an IRepository implementation; see the next section
pipeline = BackupAutomationPipeline(repository_class=TinyDBRepository)
pipeline.start()
Customize each step according to your specifications and operational environment.
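If you drive execution from a scheduler, a minimal sketch using the third-party schedule library might look like the following; the library choice and the daily 02:00 trigger are assumptions for illustration, and any scheduler or cron job works equally well:

import time

import schedule  # third-party scheduler, used here purely for illustration

def run_backup_pipeline():
    # TinyDBRepository is the IRepository implementation shown in the next section
    pipeline = BackupAutomationPipeline(repository_class=TinyDBRepository)
    pipeline.start()

# Run the backup pipeline every day at 02:00 (the time is arbitrary)
schedule.every().day.at("02:00").do(run_backup_pipeline)

while True:
    schedule.run_pending()
    time.sleep(60)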
Orchestrator: Managing and Executing Pipelines
The Orchestrator class is designed to manage and execute pipelines that are marked as pending. It works by interfacing with a repository that stores the pipeline data, making use of the IRepository interface to ensure a standard structure for repository implementations.
Implementing the IRepository Interface
To utilize the Orchestrator, an implementation of the IRepository interface is required. This interface is crucial for storing and managing pipeline information in a structured way. It mandates the implementation of several methods to support basic CRUD (Create, Read, Update, Delete) operations on the pipeline data. The methods that any implementing class must override are:
- get(uuid: str) -> Dict: Retrieves an item by its unique identifier (UUID).
- save(data: Dict) -> Dict: Saves an item to the repository and returns information about the saved item.
- update(uuid: str, data: Dict) -> None: Updates an existing item identified by its UUID.
- search(filter: List[Union[str, List]]) -> List[Dict]: Searches for items matching the specified criteria.
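To get a feel for the contract before the TinyDB example below, here is a minimal in-memory sketch suitable for tests; it assumes every saved item carries a "uuid" field, as the get and update signatures suggest:

from typing import Dict, List, Union

from prorch.interfaces.repository import IRepository

class InMemoryRepository(IRepository):
    def __init__(self, model: str):
        super().__init__(model)
        self._items: Dict[str, Dict] = {}  # uuid -> item

    def get(self, uuid: str) -> Dict:
        return self._items.get(uuid)

    def save(self, data: Dict) -> Dict:
        # Assumes the caller provides a "uuid" key in the data
        self._items[data["uuid"]] = data
        return data

    def update(self, uuid: str, data: Dict) -> None:
        self._items[uuid].update(data)

    def search(self, filter: List[Union[str, List]]) -> List[Dict]:
        field, value = filter
        return [item for item in self._items.values() if item.get(field) == value]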
Following the interface's requirements, below is an example of how to implement IRepository using TinyDB, a lightweight document-oriented database.
from tinydb import TinyDB, where
from typing import Dict, List, Union

from prorch.dataclasses.orchestrator import SavedItem
from prorch.interfaces.repository import IRepository
from prorch.utils.constants import Metadata

class TinyDBRepository(IRepository):
    def __init__(self, model: str):
        super().__init__(model)
        self._db = TinyDB("database.json")
        # Each model gets its own TinyDB table, resolved via Metadata.TABLE_NAMES
        self._table = self._db.table(Metadata.TABLE_NAMES[model])

    def get(self, uuid: str) -> Dict:
        return self._table.get(where("uuid") == uuid)

    def save(self, data: Dict) -> SavedItem:
        doc_id = self._table.insert(data)  # avoids shadowing the built-in id()
        return SavedItem(identifier=doc_id)

    def update(self, uuid: str, data: Dict) -> None:
        self._table.update(data, where("uuid") == uuid)

    def search(self, filter: List[Union[str, List]]) -> List[Dict]:
        field, value = filter
        return self._table.search(where(field) == value)
This implementation outlines how to adhere to the IRepository interface using TinyDB, showcasing the necessary methods and their purpose within the context of managing pipeline data.
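As a quick sanity check, you can exercise the repository directly. The "pipeline" model key below is a hypothetical value; use whichever keys your Metadata.TABLE_NAMES defines:

# "pipeline" is a hypothetical model key; adjust to your Metadata.TABLE_NAMES
repo = TinyDBRepository(model="pipeline")

saved = repo.save({"uuid": "1234", "status": "pending"})
print(saved.identifier)  # TinyDB document id of the inserted record

repo.update("1234", {"status": "finished"})
print(repo.get("1234"))                     # {'uuid': '1234', 'status': 'finished'}
print(repo.search(["status", "finished"]))  # [{'uuid': '1234', 'status': 'finished'}]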
Configuring and Using the Orchestrator
With a repository implemented, you can configure and instantiate the Orchestrator to manage and execute pipelines. Here's a simple example using TinyDBRepository as the repository class:
from prorch.dataclasses.orchestrator import OrchestratorConfig
from your_repository_implementation import TinyDBRepository  # replace with the module that contains your repository class
from prorch.orchestrator import Orchestrator
# Configuration for the orchestrator
orchestrator_config = OrchestratorConfig(repository_class=TinyDBRepository)
# Instantiate the orchestrator
orchestrator = Orchestrator(config=orchestrator_config)
# Execute all pending pipelines
orchestrator.execute_pipelines()
This setup demonstrates how to prepare the Orchestrator for running pipelines that are marked as pending within your repository. The Orchestrator fetches all pending pipelines and executes them, facilitating automated process management in your application.
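In a long-running service you might call execute_pipelines() periodically; here is a minimal polling loop (the one-minute interval is an arbitrary choice, not something the framework prescribes):

import time

# Re-check the repository for pending pipelines once a minute
while True:
    orchestrator.execute_pipelines()
    time.sleep(60)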