OmniTask Documentation

Overview

OmniTask is a Python-based workflow automation tool for building and executing dynamic task chains. It provides a framework for composing workflows out of individual tasks, with support for task dependencies, output chaining, and dynamic task loading.

Project Structure

OmniTask/
├── src/
│   ├── core/
│   │   ├── task.py         # Base Task class and core functionality
│   │   ├── workflow.py     # Workflow management and execution
│   │   └── registry.py     # Task registration and discovery
│   ├── models/
│   │   └── task_result.py  # Task result data structure
│   └── utils/
│       └── path_parser.py  # Path parsing utilities
├── tasks/
│   ├── read_file.py       # File reading task
│   ├── http_task.py       # HTTP request task
│   └── print_task.py      # Output printing task
└── main.py                # Main application entry point

Core Components

Task System

The task system is built around the Task base class, which provides the foundation for all custom tasks.

Key Features:

  • Task Definition: Each task must define a unique task_name
  • Library Dependencies: Tasks can specify required Python packages
  • Configuration: Tasks accept configuration parameters
  • Output Handling: Tasks can access outputs from previous tasks
  • Relative Paths: Support for accessing previous task outputs using prev, prev2, etc.

Example Task Implementation:

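from src.core.task import Task
from src.models.task_result import TaskResult
# Do not import anything else at module level; import task-specific
# packages inside execute() instead.

class CustomTask(Task):
    task_name = "custom_task"                    # unique name identifying this task
    library_dependencies = {"required_package"}  # placeholder package name

    async def execute(self) -> TaskResult:
        # Task-specific packages are imported here, not at module level.
        import required_package

        # Output of the task that ran immediately before this one.
        prev_data = self.get_output("prev")

        result = {
            # process_data is a placeholder for your own processing logic.
            "processed_data": process_data(prev_data)
        }

        return TaskResult(success=True, output=result)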
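The relative names prev, prev2, etc. can be pictured as counting backwards through the outputs produced so far. The following is a minimal standalone sketch of that idea, not OmniTask's own path parser: resolve_relative and history are hypothetical names, and it assumes that prev means the most recent output and prevN the N-th most recent.

```python
import re


def resolve_relative(path: str, outputs: list) -> object:
    """Resolve 'prev', 'prev2', ... against outputs ordered oldest to newest.

    Assumption: 'prev' is the most recent output and 'prevN' is the
    N-th most recent one. OmniTask's actual rules may differ.
    """
    match = re.fullmatch(r"prev(\d*)", path)
    if match is None:
        raise ValueError(f"not a relative path: {path!r}")
    steps_back = int(match.group(1) or 1)
    if steps_back < 1 or steps_back > len(outputs):
        raise LookupError(f"no output {steps_back} step(s) back")
    return outputs[-steps_back]


# Hypothetical output history, oldest first.
history = [{"text": "raw"}, {"text": "cleaned"}]
latest = resolve_relative("prev", history)     # most recent output
earlier = resolve_relative("prev2", history)   # the one before it
```

Raising a distinct error when the requested step does not exist is one way to give the clear failure messages that output access calls for.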

Task Registry

The TaskRegistry class handles task discovery and management.

Features:

  • Dynamic Task Loading: Loads tasks from local directories, files, or remote URLs
  • Dependency Management: Installs required Python packages for tasks
  • Task Creation: Creates task instances with proper configuration
  • Remote Task Loading: Supports loading tasks from HTTP/HTTPS sources

Example Registry Usage:

from src.core.registry import TaskRegistry

registry = TaskRegistry()
registry.load_tasks_from_source("tasks")  # Load from directory
registry.load_tasks_from_source("https://example.com/tasks/remote_task.py")  # Load from URL
registry.load_tasks_from_source("/path/to/local/task.py")  # Load from file
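To give a feel for what loading tasks from a file can involve, here is a rough sketch built only on the standard library's importlib. It is not the TaskRegistry implementation: load_task_classes is a hypothetical helper, and it assumes a task is any class defined in the file that carries a string task_name attribute.

```python
import importlib.util
import inspect
from pathlib import Path


def load_task_classes(path: str) -> dict:
    """Import a Python file and return {task_name: class} for its tasks.

    Assumption: a "task" is any class defined in the file that has a
    string `task_name` attribute. The real registry may use other rules.
    """
    file_path = Path(path)
    spec = importlib.util.spec_from_file_location(file_path.stem, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code

    found = {}
    for _, cls in inspect.getmembers(module, inspect.isclass):
        # Skip classes that were only imported into the module.
        if cls.__module__ != module.__name__:
            continue
        name = getattr(cls, "task_name", None)
        if isinstance(name, str):
            found[name] = cls
    return found
```

Executing a file this way runs arbitrary code, so the same approach applied to a remote URL should only be used with sources you trust.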

Workflow Management

The Workflow class manages task execution and dependency resolution.

Features:

  • Registry Integration: Built-in TaskRegistry for task management
  • Task Creation: Direct task creation through workflow
  • Dependency Management: Define task dependencies
  • Execution Order: Automatic determination of task execution order
  • Output Chaining: Automatic passing of task outputs to dependent tasks
  • Error Handling: Graceful handling of task failures

Example Workflow Creation:

import asyncio
from src.core.workflow import Workflow
from src.core.registry import TaskRegistry

async def main():
    # Create registry and load tasks
    registry = TaskRegistry()
    registry.load_tasks_from_source("tasks")
    
    # Create workflow with registry
    workflow = Workflow("my_workflow", registry)
    
    # Create tasks directly through workflow
    task1 = workflow.create_task("task1", "instance1", {"config": "value"})
    task2 = workflow.create_task("task2", "instance2")
    
    # Set dependencies: instance2 runs only after instance1
    task2.add_dependency("instance1")
    
    # Run workflow
    result = await workflow.run()
    return result

if __name__ == "__main__":
    asyncio.run(main())
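Determining an execution order from declared dependencies is commonly done with a topological sort. The sketch below uses the standard library's graphlib (Python 3.9+) to illustrate the idea; whether OmniTask orders tasks this way internally is an assumption, and execution_order is a hypothetical helper rather than part of its API.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict) -> list:
    """Return task instance names so that dependencies come first.

    `dependencies` maps each task instance name to the set of names
    it depends on, e.g. {"instance2": {"instance1"}}.
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # args[1] holds the list of nodes that form the cycle.
        raise ValueError(f"circular dependency: {exc.args[1]}") from exc


# Mirrors the example above: instance2 depends on instance1.
order = execution_order({"instance1": set(), "instance2": {"instance1"}})
```

Rejecting cycles up front means a workflow with contradictory dependencies fails before any task has run.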

Best Practices

  1. Task Design

    • Keep tasks focused and single-purpose
    • Import dependencies inside the execute() method
    • Handle errors gracefully
    • Document task inputs and outputs
    • Use type hints for better code clarity
  2. Workflow Design

    • Use a single TaskRegistry instance
    • Create the workflow with the registry passed in
    • Use the workflow's create_task method
    • Plan task dependencies carefully
    • Use meaningful task names
    • Consider error handling and recovery
    • Monitor execution times
  3. Configuration

    • Use configuration for flexible task behavior
    • Document configuration options
    • Provide sensible defaults
  4. Output Handling

    • Use consistent output formats
    • Include timestamps in outputs
    • Handle missing or invalid outputs gracefully
    • Use relative paths (prev, prev2) for task output access
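The configuration and output-handling points above can be sketched with a few small helpers. Everything here is illustrative: DEFAULT_CONFIG, its option names, and the three functions are hypothetical and not part of OmniTask.

```python
from datetime import datetime, timezone

# Hypothetical configuration options with sensible defaults.
DEFAULT_CONFIG = {"timeout": 30, "encoding": "utf-8"}


def resolve_config(config=None) -> dict:
    """Overlay user-supplied options on top of the defaults."""
    return {**DEFAULT_CONFIG, **(config or {})}


def make_output(data) -> dict:
    """Wrap task data in a consistent shape with a UTC timestamp."""
    return {"data": data, "timestamp": datetime.now(timezone.utc).isoformat()}


def read_output(output, key, default=None):
    """Return output["data"][key], or `default` if anything is missing."""
    if not isinstance(output, dict):
        return default
    data = output.get("data")
    return data.get(key, default) if isinstance(data, dict) else default
```

With a shared output shape, a downstream task can call read_output(prev, "rows", []) and keep working even when the previous task produced nothing.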

Error Handling

Errors are handled at three levels:

  1. Task Level

    • Tasks should catch and handle their own errors
    • Return appropriate TaskResult with error information
    • Import dependencies safely inside execute()
  2. Workflow Level

    • Stops execution on task failure
    • Provides error information in results
    • Maintains execution order integrity
  3. Output Access

    • Validates task outputs before access
    • Provides clear error messages for missing outputs
    • Handles relative path errors gracefully
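The first two levels can be illustrated with a standalone sketch. Result is a stand-in for TaskResult (its error field is an assumption, since only success and output appear in the examples above), and run_in_order is a hypothetical stand-in for the workflow runner.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Result:
    """Stand-in for TaskResult; the `error` field is an assumption."""
    success: bool
    output: Any = None
    error: Optional[str] = None


async def safe_divide(a: float, b: float) -> Result:
    """Task level: catch the error and report it in the result."""
    try:
        return Result(success=True, output=a / b)
    except ZeroDivisionError as exc:
        return Result(success=False, error=str(exc))


async def run_in_order(steps) -> dict:
    """Workflow level: run steps in order and stop at the first failure."""
    results = {}
    for name, step in steps:
        results[name] = await step()
        if not results[name].success:
            break  # later steps are never started
    return results


steps = [
    ("ok", lambda: safe_divide(6, 3)),
    ("bad", lambda: safe_divide(1, 0)),
    ("skipped", lambda: safe_divide(4, 2)),
]
results = asyncio.run(run_in_order(steps))
```

Here results contains entries for "ok" and "bad" only; the third step never runs because the second one reported a failure.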

Future Enhancements

  1. Planned Features

    • Parallel task execution
    • Task retry mechanisms
    • Workflow persistence
    • Enhanced monitoring and logging
    • Task timeout handling
  2. Potential Improvements

    • Web interface for workflow management
    • Task scheduling capabilities
    • Enhanced error recovery
    • Workflow templates
    • Plugin system for custom tasks

Contributing

  1. Development Setup

    • Clone the repository
    • Install dependencies
    • Follow coding standards
    • Write tests for new features
  2. Code Style

    • Follow PEP 8 guidelines
    • Use type hints
    • Document all public interfaces
    • Write clear commit messages

Installation

You can install OmniTask using pip:

pip install omniTask

Quick Start

import asyncio
from omniTask import Workflow, TaskRegistry

async def main():
    registry = TaskRegistry()
    workflow = Workflow("my_workflow", registry)
    
    # Register and use functions
    # (my_function is a callable you define yourself before this point)
    workflow.register_function(my_function)
    task = workflow.create_function_task("my_function", "task1")
    
    result = await workflow.run()
    return result

if __name__ == "__main__":
    asyncio.run(main())
