Skip to main content

A powerful Python-based workflow automation tool

Project description

OmniTask Documentation

Overview

OmniTask is a powerful Python-based workflow automation tool that enables the creation and execution of dynamic task chains. It provides a flexible framework for building complex workflows with features like task dependencies, output chaining, dynamic task loading, and dynamic task groups.

Workflow Templates

OmniTask supports defining workflows using YAML or JSON templates, making it easier to create and maintain workflows without writing Python code.

Template Format

name: my_workflow

tasks:
  task1:
    type: task_type
    config:
      param1: value1
      param2: value2

  task2:
    type: another_task_type
    for_each: task1.output.items
    config_template:
      param3: $.value
    max_concurrent: 5

dependencies:
  task2:
    - task1

Using Templates

from omniTask.core.template import WorkflowTemplate
from omniTask.core.registry import TaskRegistry

registry = TaskRegistry()
registry.load_tasks_from_directory("tasks")

template = WorkflowTemplate("workflow.yaml")
workflow = template.create_workflow(registry)
result = await workflow.run()

Examples

Bug Bounty Workflow

A comprehensive example demonstrating dynamic task groups and parallel execution:

  • Subdomain discovery
  • URL status checking with parallel execution
  • Result analysis and aggregation

View Bug Bounty Example

Text Processing Workflow

A simple example demonstrating text file processing with multiple tasks:

  • File operations (read/write)
  • Text statistics counting
  • Text case conversion

View Text Processing Example

Core Components

Task System

The task system is built around the Task base class, which provides the foundation for all custom tasks.

Key Features:

  • Task Definition: Each task must define a unique task_name
  • Library Dependencies: Tasks can specify required Python packages
  • Configuration: Tasks accept configuration parameters
  • Output Handling: Tasks can access outputs from previous tasks
  • Relative Paths: Support for accessing previous task outputs using prev, prev2, etc.

Example Task Implementation:

from omniTask.core.task import Task
from omniTask.models.task_result import TaskResult

class CustomTask(Task):
    task_name = "custom_task"
    library_dependencies = {"required_package"}

    async def execute(self) -> TaskResult:
        import required_package
        
        prev_data = self.get_output("prev")
        
        result = {
            "processed_data": process_data(prev_data)
        }
        
        return TaskResult(success=True, output=result)

Dynamic Task Groups

OmniTask supports dynamic task groups that can create and execute multiple tasks based on input data.

Features:

  • For-Each Execution: Create tasks for each item in a list
  • Parallel Execution: Control concurrent task execution
  • Template Configuration: Use templates for task configuration
  • Output Aggregation: Combine results from multiple tasks

Example Task Group Configuration:

task_group:
  type: url_checker
  for_each: subdomain_scanner.subdomains
  config_template:
    url: $.url
    timeout: 5
  max_concurrent: 5

Task Registry

The TaskRegistry class handles task discovery and management.

Features:

  • Dynamic Task Loading: Loads tasks from local directories, files, or remote URLs
  • Dependency Management: Installs required Python packages for tasks
  • Task Creation: Creates task instances with proper configuration
  • Remote Task Loading: Supports loading tasks from HTTP/HTTPS sources

Example Registry Usage:

registry = TaskRegistry()
registry.load_tasks_from_source("tasks")  # Load from directory
registry.load_tasks_from_source("https://example.com/tasks/remote_task.py")  # Load from URL
registry.load_tasks_from_source("/path/to/local/task.py")  # Load from file

Workflow Management

The Workflow class manages task execution and dependency resolution.

Features:

  • Registry Integration: Built-in TaskRegistry for task management
  • Task Creation: Direct task creation through workflow
  • Dependency Management: Define task dependencies
  • Execution Order: Automatic determination of task execution order
  • Output Chaining: Automatic passing of task outputs to dependent tasks
  • Error Handling: Graceful handling of task failures
  • Dynamic Task Groups: Support for creating and managing dynamic task groups

Example Workflow Creation:

import asyncio
from omniTask.core.workflow import Workflow
from omniTask.core.registry import TaskRegistry

async def main():
    registry = TaskRegistry()
    registry.load_tasks_from_source("tasks")
    
    workflow = Workflow("my_workflow", registry)
    
    task1 = workflow.create_task("task1", "instance1", {"config": "value"})
    task2 = workflow.create_task("task2", "instance2")
    
    task2.add_dependency("instance1")
    
    result = await workflow.run()

Best Practices

  1. Task Design

    • Keep tasks focused and single-purpose
    • Import dependencies inside execute() method
    • Handle errors gracefully
    • Document task inputs and outputs
    • Use type hints for better code clarity
  2. Workflow Design

    • Use a single TaskRegistry instance
    • Create workflow with registry injection
    • Use workflow's create_task method
    • Plan task dependencies carefully
    • Use meaningful task names
    • Consider error handling and recovery
    • Monitor execution times
    • Use templates for complex workflows
    • Leverage dynamic task groups for parallel processing
  3. Configuration

    • Use configuration for flexible task behavior
    • Document configuration options
    • Provide sensible defaults
    • Use templates for consistent configuration
    • Configure max_concurrent for task groups appropriately
  4. Output Handling

    • Use consistent output formats
    • Include timestamps in outputs
    • Handle missing or invalid outputs gracefully
    • Use relative paths (prev, prev2) for task output access
    • Aggregate results from dynamic task groups effectively

Error Handling

The system provides several levels of error handling:

  1. Task Level

    • Tasks should catch and handle their own errors
    • Return appropriate TaskResult with error information
    • Import dependencies safely inside execute()
  2. Workflow Level

    • Stops execution on task failure
    • Provides error information in results
    • Maintains execution order integrity
    • Handles dynamic task group failures gracefully
  3. Output Access

    • Validates task outputs before access
    • Provides clear error messages for missing outputs
    • Handles relative path errors gracefully
    • Manages aggregated outputs from task groups
  4. Template Level

    • Validates template structure
    • Provides clear error messages for invalid templates
    • Handles missing or invalid task configurations
    • Validates dynamic task group configurations

Future Enhancements

  1. Planned Features

    • Parallel task execution
    • Dynamic Task Handling
    • Task retry mechanisms
    • Workflow persistence
    • Enhanced monitoring and logging
    • Task timeout handling
  2. Potential Improvements

    • Web interface for workflow management
    • Task scheduling capabilities
    • Enhanced error recovery
    • Workflow templates
    • Plugin system for custom tasks

Contributing

  1. Development Setup

    • Clone the repository
    • Install dependencies
    • Follow coding standards
    • Write tests for new features
  2. Code Style

    • Follow PEP 8 guidelines
    • Use type hints
    • Document all public interfaces
    • Write clear commit messages

Installation

You can install OmniTask using pip:

pip install omniTask

Quick Start

import asyncio
from omniTask import Workflow, TaskRegistry, WorkflowTemplate

async def main():
    registry = TaskRegistry()
    workflow = Workflow("my_workflow", registry)
    
    workflow.register_function(my_function)
    task = workflow.create_function_task("my_function", "task1")
    
    result = await workflow.run()

    template = WorkflowTemplate("workflow.yaml")
    workflow = template.create_workflow(registry)
    result = await workflow.run()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

omnitask-0.4.0.tar.gz (16.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

omnitask-0.4.0-py3-none-any.whl (16.9 kB view details)

Uploaded Python 3

File details

Details for the file omnitask-0.4.0.tar.gz.

File metadata

  • Download URL: omnitask-0.4.0.tar.gz
  • Upload date:
  • Size: 16.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for omnitask-0.4.0.tar.gz
Algorithm Hash digest
SHA256 4838f41390784fec882bae6b39283b30ca160cb3949b90553b3d859263c40478
MD5 82c6b76ea9aa96fdd3de624b1c86e372
BLAKE2b-256 f3dc8e5c0cd666a7e5841171a5d31079294a7f8e9fa24c85ad3c25be09beef62

See more details on using hashes here.

Provenance

The following attestation bundles were made for omnitask-0.4.0.tar.gz:

Publisher: publish.yml on sarperavci/omniTask

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file omnitask-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: omnitask-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 16.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for omnitask-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6ea347166512f265618da972caa83d75802257dbae89eff14124331650416f13
MD5 6edb34b180658dd22c3afd0e8a46c084
BLAKE2b-256 576c79c6908df8811b98e78a153fff227693c5e3a2059e7d5dea60f37767d494

See more details on using hashes here.

Provenance

The following attestation bundles were made for omnitask-0.4.0-py3-none-any.whl:

Publisher: publish.yml on sarperavci/omniTask

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page