Skip to main content

No project description provided

Project description

Process Orchestrator

Summary

This project aims to develop a process orchestrator. The goal is to break down a large process into smaller, manageable subprocesses, where each one focuses on a specific task and is carried out through a series of steps within a "pipeline".

The core of the proposal is to allow developers to create their own "pipelines", including the corresponding steps, in the code in a simple and efficient way.

In summary, this project seeks to facilitate and optimize the management of large processes by dividing them into smaller, specific subprocesses, thus facilitating their implementation and monitoring.

Pipeline and Steps

To illustrate the application of this orchestrator, consider the example of the backup automation process. This process can be divided into several subprocesses, each of which encompasses a specific task:

  1. Backup generation.
  2. Transfer of the file to secure storage.
  3. Notification of the process completion.

Step 1: Define the Pipeline

Start by defining your pipeline. This involves subclassing Pipeline and specifying the sequence of steps it should execute. Use the @register_pipeline decorator to register the pipeline class.

from prorch.pipeline.pipeline import Pipeline
from prorch.decorators.decorators import register_pipeline

@register_pipeline
class  BackupAutomationPipeline(Pipeline):
	name = "BackupAutomationPipeline"
	steps = [BackupGeneration, BackupTransfer, ProcessCompletionNotification]

Step 2: Implement the Steps

Each step in the backup process is implemented as a subclass of Step. Inside each step, you define its behavior through the on_start and on_continue methods, utilizing the OnStartMixin and OnContinueMixin for additional functionalities.

from prorch.step.step import Step
from prorch.mixins.on_start import OnStartMixin
from prorch.mixins.on_continue import OnContinueMixin
from prorch.decorators.decorators import register_step


@register_step
class  BackupGeneration(OnStartMixin,  OnContinueMixin,  Step):
name = "BackupGeneration"

def  on_start(self):
	# Logic for initiating the backup generation
	print("Starting the backup generation process.")

def  on_continue(self):
	# Logic to proceed with backup generation
	print("Backup generation completed.")
	self.finish() # not necessary in first execution

Continue implementing the remaining steps (BackupTransfer, and ProcessCompletionNotification) in a similar manner, making sure to define the specific logic for each step's on_start and on_continue methods.

Step 3: Execute the Pipeline

With the steps defined, you can now execute the pipeline. The pipeline execution can be triggered through your application logic, scheduling mechanisms, or an event-driven trigger, depending on your project's requirements.

Ensure you have instantiated the BackupAutomationPipeline and call its execution method according to the framework's execution model.

# Example of pipeline execution
pipeline = BackupAutomationPipeline(repository_class=TinyDBRepository)
pipeline.start()

Customize each step according to your specifications and operational environment.

Orchestrator: Managing and Executing Pipelines

The Orchestrator class is designed to manage and execute pipelines that are marked as pending. It works by interfacing with a repository that stores the pipeline data, making use of the IRepository interface to ensure a standard structure for repository implementations.

Implementing the IRepository Interface

To utilize the Orchestrator, an implementation of the IRepository interface is required. This interface is crucial for storing and managing pipeline information in a structured way. The IRepository interface mandates the implementation of several methods to support basic CRUD (Create, Read, Update, Delete) operations on the pipeline data. The methods required to be overridden in any implementing class are:

  • get(uuid: str) -> Dict: Retrieves an item by its unique identifier (UUID).

  • save(data: Dict) -> Dict: Saves an item to the repository and returns information about the saved item.

  • update(uuid: str, data: Dict) -> None: Updates an existing item identified by its UUID.

  • search(filter: List[Union[str, List]]) -> List[Dict]: Searches for items matching specified criteria.

Following the interface's requirements, below is an example of how to implement the IRepository using TinyDB, a lightweight document-oriented database.

from tinydb import TinyDB, where
from typing import Dict, List, Union

from prorch.dataclasses.orchestrator import SavedItem
from prorch.interfaces.repository import IRepository
from prorch.utils.constants import Metadata

class TinyDBRepository(IRepository):
    def __init__(self, model: str):
        super().__init__(model)
        self._db = TinyDB("database.json")
        self._table = self._db.table(Metadata.TABLE_NAMES[model])

    def get(self, uuid: str) -> Dict:
        return self._table.get(where("uuid") == uuid)

    def save(self, data: Dict) -> SavedItem:
        id = self._table.insert(data)

        return SavedItem(identifier=id)

    def update(self, uuid: str, data: Dict) -> None:
        self._table.update(data, where("uuid") == uuid)

    def search(self, filter: List[Union[str, List]]) -> List[Dict]:
        field, value = filter

        return self._table.search(where(field) == value)

This implementation outlines how to adhere to the IRepository interface using TinyDB, showcasing the necessary methods and their purpose within the context of managing pipeline data.

Configuring and Using the Orchestrator

With a repository implemented, you can configure and instantiate the Orchestrator to manage and execute pipelines. Here's a simple example using TinyDBRepository as the repository class:

from prorch.dataclasses.orchestrator import OrchestratorConfig
from your_repository_implementation import TinyDBRepository # Ensure to replace this with your actual repository class

from prorch.orchestrator import Orchestrator

# Configuration for the orchestrator
orchestrator_config = OrchestratorConfig(repository_class=TinyDBRepository)

# Instantiate the orchestrator
orchestrator = Orchestrator(config=orchestrator_config)

# Execute all pending pipelines
orchestrator.execute_pipelines()

This setup demonstrates how to prepare the Orchestrator for running pipelines that are marked as pending within your repository. The Orchestrator fetches all active pipelines and executes them, facilitating automated process management in your application.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

prorch-1.0.1.tar.gz (10.3 kB view details)

Uploaded Source

Built Distribution

prorch-1.0.1-py3-none-any.whl (12.8 kB view details)

Uploaded Python 3

File details

Details for the file prorch-1.0.1.tar.gz.

File metadata

  • Download URL: prorch-1.0.1.tar.gz
  • Upload date:
  • Size: 10.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.9.19

File hashes

Hashes for prorch-1.0.1.tar.gz
Algorithm Hash digest
SHA256 4bedc6a2a685ebfffb425e2330574b46ec8d43b76c9186102e7cbaba78d84ff5
MD5 4edecaea45766f64c5faaef4aa887e52
BLAKE2b-256 bee9ed51dea5464e5384f678996c4534392e4a383209ddd5c6d25f53e3021281

See more details on using hashes here.

File details

Details for the file prorch-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: prorch-1.0.1-py3-none-any.whl
  • Upload date:
  • Size: 12.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.9.19

File hashes

Hashes for prorch-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 330750c36b267f1fc9885cb5741a6fad3bc7700daebd7656248c4a6eacca621f
MD5 9029dd50cfd1c840b43ff5e95d051a51
BLAKE2b-256 9da916085bd6f17d40b69afd115941e61020d7dbd585aff1b73f9265f47965ea

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page