Hyrex is the open-source task orchestration framework built on Postgres.

Project description

Hyrex Python SDK

Hyrex is a modern, open-source task orchestration framework built on PostgreSQL. It supports distributed task processing, workflow management, and asynchronous job execution.

Features

  • Task Orchestration: Define and execute distributed tasks with automatic retries, timeouts, and error handling
  • Workflow Support: Build complex DAG-based workflows with dependencies
  • Queue Management: Route tasks to different queues with separate worker pools
  • Key-Value Store: Built-in distributed key-value storage for task coordination
  • Task Context: Rich execution context with task metadata and hierarchy tracking
  • Cron Scheduling: Schedule recurring tasks with cron expressions
  • Hyrex Studio: Web-based UI for monitoring and debugging (available at https://local.hyrex.studio)
  • Type Safety: Full type hints and Pydantic model validation

Installation

pip install hyrex

Quick Start

1. Initialize a New Project

Use the interactive hyrex init command to set up a new project:

hyrex init

This will guide you through:

  • Choosing a project name
  • Choosing between PostgreSQL (self-hosted) and Hyrex Cloud
  • Creating project files (.env, hyrex_app.py, tasks.py, requirements.txt, Dockerfile)

For manual database initialization (if needed):

export HYREX_DATABASE_URL="postgresql://user:password@localhost/dbname"
hyrex init-db

2. Project Structure

After running hyrex init, you'll have:

your-project/
├── .env               # Environment configuration
├── hyrex_app.py       # Hyrex app configuration
├── tasks.py           # Task definitions
├── requirements.txt   # Python dependencies
└── Dockerfile         # Container configuration

Example tasks.py:

from hyrex import HyrexRegistry
from pydantic import BaseModel

hy = HyrexRegistry()

class EmailContext(BaseModel):
    to: str
    subject: str
    body: str

@hy.task
def send_email(context: EmailContext):
    print(f"Sending email to {context.to}")
    # Your email logic here
    return {"sent": True}

3. Send Tasks

Queue tasks for execution:

# Send a task to the default queue
send_email.send(EmailContext(
    to="user@example.com",
    subject="Welcome!",
    body="Thanks for signing up"
))

# Send with custom configuration
send_email.with_config(
    queue="high-priority",
    max_retries=3,
    timeout_seconds=30
).send(EmailContext(...))

4. Run Workers

Start workers to process tasks:

hyrex run-worker hyrex_app:app
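To make the split between sending and running concrete, here is a toy, in-memory model of the pattern: a decorator registers a function, `.send()` records a call on a queue instead of running it, and a worker loop later drains the queue. `ToyRegistry`, `ToyTask`, and `run_worker_once` are hypothetical, stdlib-only stand-ins; real Hyrex persists tasks in PostgreSQL and runs workers as separate processes.

```python
from collections import deque
from typing import Any, Callable


class ToyTask:
    """Wraps a function; send() enqueues a call instead of running it."""

    def __init__(self, fn: Callable[..., Any], queue: deque):
        self.fn = fn
        self.name = fn.__name__
        self._queue = queue

    def send(self, *args: Any, **kwargs: Any) -> None:
        # Record the call; nothing runs until a worker picks it up.
        self._queue.append((self.name, args, kwargs))

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.fn(*args, **kwargs)


class ToyRegistry:
    """Hypothetical registry whose tasks share one in-memory queue."""

    def __init__(self) -> None:
        self.tasks: dict[str, ToyTask] = {}
        self.queue: deque = deque()

    def task(self, fn: Callable[..., Any]) -> ToyTask:
        wrapped = ToyTask(fn, self.queue)
        self.tasks[wrapped.name] = wrapped
        return wrapped


def run_worker_once(registry: ToyRegistry) -> list[Any]:
    """Drain the queue, executing each pending call by task name."""
    results = []
    while registry.queue:
        name, args, kwargs = registry.queue.popleft()
        results.append(registry.tasks[name](*args, **kwargs))
    return results


toy = ToyRegistry()


@toy.task
def send_email(to: str) -> dict:
    return {"sent": True, "to": to}


send_email.send("user@example.com")  # enqueued, not executed yet
pending = len(toy.queue)
results = run_worker_once(toy)
```

The point of the model is that the producer and the worker only share the queue, which is why Hyrex can run workers as a separate `hyrex run-worker` process.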

Core Features

Task Decorator

The @hy.task decorator turns a function into a distributed task:

from pydantic import BaseModel

class ProcessContext(BaseModel):
    dataset_id: str  # hypothetical field for illustration

def error_handler(error: Exception):
    # Called when process_data fails
    print(f"process_data failed: {error}")

@hy.task(
    queue="processing",              # Target queue (str or HyrexQueue object)
    max_retries=3,                   # Maximum retry attempts (default: 0)
    timeout_seconds=300,             # Task timeout in seconds
    priority=5,                      # Priority 1-10 (higher = more important)
    on_error=error_handler,          # Error callback function
    retry_backoff=lambda n: n * 10,  # Backoff strategy function
)
def process_data(context: ProcessContext):
    # Task implementation
    pass
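The `priority` option states that higher values are more important, so a worker honoring it would take the highest-priority pending task first. The sketch below models that ordering with `heapq`. Breaking ties between equal priorities in first-in, first-out order is an assumption, and `PriorityQueue` is a hypothetical class, not how Hyrex actually selects tasks from PostgreSQL.

```python
import heapq
import itertools


class PriorityQueue:
    """Pops the highest priority (1-10) first; equal priorities pop FIFO."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._counter = itertools.count()  # insertion order, used as a tie-breaker

    def push(self, task_name: str, priority: int) -> None:
        if not 1 <= priority <= 10:
            raise ValueError("priority must be between 1 and 10")
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), task_name))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]


q = PriorityQueue()
q.push("cleanup", priority=1)
q.push("process_data", priority=5)
q.push("alert", priority=10)
q.push("report", priority=5)
order = [q.pop() for _ in range(4)]
```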

Task Context

Access rich execution context within tasks:

from hyrex import get_hyrex_context

@hy.task
def contextual_task():
    context = get_hyrex_context()

    print(f"Task ID: {context.task_id}")
    print(f"Task Name: {context.task_name}")
    print(f"Attempt: {context.attempt_number}/{context.max_retries}")
    print(f"Queue: {context.queue}")
    print(f"Parent Task: {context.parent_id}")
    print(f"Root Task: {context.root_id}")

Key-Value Store

Use HyrexKV for distributed state management:

from hyrex import HyrexKV

@hy.task
def process_with_state(user_id: str):
    # Store state
    HyrexKV.set(f"user:{user_id}:status", "processing")

    # Retrieve state
    status = HyrexKV.get(f"user:{user_id}:status")

    # Delete state
    HyrexKV.delete(f"user:{user_id}:status")

Note: HyrexKV stores string values up to 1MB in size.

Workflows

Build complex DAG-based workflows:

@hy.task
def extract_data():
    return {"data": "extracted"}

@hy.task
def transform_data():
    return {"data": "transformed"}

@hy.task
def load_data():
    return {"data": "loaded"}

@hy.task
def validate_data():
    return {"data": "validated"}

class ETLWorkflowArgs(BaseModel):
    source: str
    destination: str

@hy.workflow(
    queue="etl",
    timeout_seconds=3600,
    workflow_arg_schema=ETLWorkflowArgs
)
def etl_workflow():
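    # Sequential chain: extract, then transform, then load
    extract_data >> transform_data >> load_data

    # Fan-out/fan-in: transform_data and validate_data both run after
    # extract_data, and load_data waits for both
    extract_data >> [transform_data, validate_data] >> load_data

    # Per-step override: run transform_data on a different queue
    extract_data >> transform_data.with_config(queue="cpu-intensive") >> load_data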
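The `>>` chains read as "run the left side, then the right side", with a list fanning out to steps that can run in parallel. The toy model below shows one way such operator chaining can record a dependency graph and then derive a valid execution order with the standard library's `graphlib`. `Node` is hypothetical; this is an illustration of the technique, not Hyrex's workflow implementation.

```python
from graphlib import TopologicalSorter


class Node:
    """Toy DAG node: `a >> b` records that b depends on a."""

    def __init__(self, name: str, graph: dict[str, set[str]]) -> None:
        self.name = name
        self.graph = graph  # maps node name -> names of its predecessors
        graph.setdefault(name, set())

    def __rshift__(self, other):
        # Handles node >> node and node >> [node, ...] (fan-out).
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.graph[target.name].add(self.name)
        return other  # returning the right side lets the chain continue

    def __rrshift__(self, other):
        # Handles [node, ...] >> node (fan-in): lists have no __rshift__,
        # so Python falls back to this reflected method on the right operand.
        sources = other if isinstance(other, list) else [other]
        for source in sources:
            self.graph[self.name].add(source.name)
        return self


graph: dict[str, set[str]] = {}
extract, transform, validate, load = (
    Node(name, graph)
    for name in ("extract_data", "transform_data", "validate_data", "load_data")
)

extract >> [transform, validate] >> load

# Any order produced here respects every edge; transform/validate may swap.
order = list(TopologicalSorter(graph).static_order())
```

Storing predecessors (rather than successors) matches what `TopologicalSorter` expects, so the graph can be handed to it directly.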

Send workflows:

etl_workflow.send(ETLWorkflowArgs(
    source="s3://input",
    destination="s3://output"
))

Access workflow context:

from hyrex import get_hyrex_workflow_context

@hy.task
def workflow_task():
    wf_context = get_hyrex_workflow_context()

    # Access workflow arguments
    args = wf_context.workflow_args

    # Access other task results
    extract_result = wf_context.durable_runs.get("extract_data")
    if extract_result:
        extract_result.refresh()  # Get latest status

Dynamic Task Configuration

Use with_config() to override a task's configured defaults when sending it:

# Define base task
@hy.task(queue="default", max_retries=1)
def flexible_task(data: str):
    return f"processed:{data}"  # placeholder for your own processing logic

# Override configuration when sending
flexible_task.with_config(
    queue="high-priority",
    max_retries=5,
    timeout_seconds=60,
    priority=10
).send("important-data")

Cron Scheduling

Schedule recurring tasks:

@hy.task(cron="0 2 * * *")  # Daily at 2 AM
def daily_cleanup():
    # Cleanup logic
    pass

# Tasks with default arguments can also be scheduled
@hy.task(cron="0 0 * * 0")  # Weekly on Sunday
def weekly_backup(retention_days: int = 30):
    # Backup logic with configurable retention
    pass

@hy.workflow(cron="0 0 * * 0")  # Weekly on Sunday
def weekly_report():
    # generate_report and send_report are tasks defined elsewhere
    generate_report >> send_report

Note: Cron-scheduled tasks must either take no arguments or give every argument a default value.
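The rule in the note can be checked mechanically with `inspect.signature`: a function qualifies if it can be called with no arguments. `is_cron_schedulable` is a hypothetical helper, not part of Hyrex; treating `*args` and `**kwargs` as acceptable (since they may be empty) is an assumption of this sketch.

```python
import inspect
from typing import Callable


def is_cron_schedulable(fn: Callable) -> bool:
    """True if fn can be called with no arguments (every parameter has a default)."""
    for param in inspect.signature(fn).parameters.values():
        # *args and **kwargs can be empty, so they never block a no-arg call.
        if param.kind in (inspect.Parameter.VAR_POSITIONAL,
                          inspect.Parameter.VAR_KEYWORD):
            continue
        if param.default is inspect.Parameter.empty:
            return False
    return True


def daily_cleanup():
    pass


def weekly_backup(retention_days: int = 30):
    pass


def needs_user(user_id: str):  # hypothetical: has a required argument
    pass
```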

Error Handling

Implement custom error handlers:

def handle_task_error(error: Exception):
    # Log error, send alerts, etc.
    print(f"Task failed: {error}")

@hy.task(
    on_error=handle_task_error,
    max_retries=3,
    retry_backoff=lambda attempt: 2 ** attempt  # Exponential backoff
)
def risky_task():
    # Task that might fail
    pass
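One way to read `max_retries` and `retry_backoff` together: after the first failure, up to `max_retries` further attempts follow, each delayed by `retry_backoff(attempt)` seconds. The local sketch below runs that loop in-process. The following are assumptions, not documented Hyrex behavior: retry numbering starts at 1, delays are in seconds, and `on_error` fires on every failure rather than only the last. `run_with_retries` is hypothetical.

```python
import time
from typing import Any, Callable


def run_with_retries(
    fn: Callable[[], Any],
    max_retries: int,
    retry_backoff: Callable[[int], float],
    on_error: Callable[[Exception], None],
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call fn; on failure, report it, wait, and retry up to max_retries times."""
    retries = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            on_error(exc)
            if retries >= max_retries:
                raise  # retries exhausted: surface the last error
            retries += 1
            sleep(retry_backoff(retries))


# Usage: a function that fails twice, then succeeds.
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError(f"failure {calls['n']}")
    return "ok"


errors: list[str] = []
delays: list[float] = []
result = run_with_retries(
    flaky,
    max_retries=3,
    retry_backoff=lambda attempt: 2 ** attempt,  # exponential backoff
    on_error=lambda e: errors.append(str(e)),
    sleep=delays.append,  # record delays instead of actually sleeping
)
```

Injecting `sleep` keeps the loop testable: the recorded delays show the exponential schedule without waiting for it.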

CLI Commands

  • hyrex init - Interactive project initialization wizard
  • hyrex init-db - Initialize the database schema
  • hyrex run-worker <module:instance> - Start a worker process
  • hyrex studio - Start Hyrex Studio server

Monitoring with Hyrex Studio

Hyrex Studio provides a web interface for monitoring your tasks and workflows:

  1. Start the studio server:

    hyrex studio
    
  2. Open https://local.hyrex.studio in your browser

  3. Monitor task execution, view logs, and inspect your data

Configuration

Hyrex uses environment variables for configuration:

  • HYREX_DATABASE_URL - PostgreSQL connection string (required)
  • STUDIO_PORT - Port for Hyrex Studio (default: 1337)
  • STUDIO_VERBOSE - Enable verbose logging for Studio (default: false)
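A minimal sketch of reading these variables with their documented defaults. `Settings` and `load_settings` are hypothetical, and which strings count as "true" for `STUDIO_VERBOSE` is an assumption; Hyrex may parse it differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    database_url: str
    studio_port: int
    studio_verbose: bool


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    url = env.get("HYREX_DATABASE_URL")
    if not url:
        raise RuntimeError("HYREX_DATABASE_URL is required")
    verbose_raw = env.get("STUDIO_VERBOSE", "false")
    return Settings(
        database_url=url,
        studio_port=int(env.get("STUDIO_PORT", "1337")),
        # Assumption: "1", "true", or "yes" (any case) enables verbose logging.
        studio_verbose=verbose_raw.strip().lower() in {"1", "true", "yes"},
    )


settings = load_settings(
    {"HYREX_DATABASE_URL": "postgresql://user:password@localhost/dbname"}
)
```

Accepting a mapping instead of reading `os.environ` directly makes the defaults easy to check without touching the real environment.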

Advanced Usage

Registry Inheritance

Share task definitions across modules:

# common_tasks.py
from hyrex import HyrexRegistry

common_registry = HyrexRegistry()

@common_registry.task
def shared_task():
    pass

# main.py
from hyrex import HyrexRegistry
from common_tasks import common_registry

hy = HyrexRegistry()
hy.add_registry(common_registry)  # Include all tasks from common_registry
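Conceptually, `add_registry` makes one registry's tasks available through another. The toy below models a registry as a name-to-function map and merges it. `NameRegistry` is hypothetical, and raising on a conflicting task name is an assumption made for the sketch; the source does not say how Hyrex handles name collisions.

```python
from typing import Any, Callable


class NameRegistry:
    """Hypothetical registry: maps task names to functions."""

    def __init__(self) -> None:
        self.tasks: dict[str, Callable[..., Any]] = {}

    def task(self, fn: Callable[..., Any]) -> Callable[..., Any]:
        self.tasks[fn.__name__] = fn
        return fn

    def add_registry(self, other: "NameRegistry") -> None:
        # Copy every task from `other`, refusing to silently shadow a name.
        for name, fn in other.tasks.items():
            if name in self.tasks and self.tasks[name] is not fn:
                raise ValueError(f"duplicate task name: {name}")
            self.tasks[name] = fn


common_registry = NameRegistry()


@common_registry.task
def shared_task():
    return "shared"


main_registry = NameRegistry()


@main_registry.task
def local_task():
    return "local"


main_registry.add_registry(common_registry)
```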

Task Composition

Build complex task hierarchies:

@hy.task
def child_task(index: int):
    print(f"Processing item {index}")

@hy.task
def parent_task(count: int):
    # Spawn child tasks
    for i in range(count):
        child_task.send(index=i)

    # Tasks maintain parent-child relationships
    # visible in context.parent_id and context.root_id

Idempotency

Use an idempotency key to ensure a task runs only once:

@hy.task
def idempotent_task(user_id: str):
    # Process user
    pass

# Using an idempotency key
user_id = "123"
idempotent_task.with_config(
    idempotency_key=f"process-user-{user_id}"
).send(user_id=user_id)

Requirements

  • Python 3.11+
  • PostgreSQL 12+
  • Required Python packages are automatically installed with pip

License

Apache License 2.0


Download files

Download the file for your platform.

Source Distribution

hyrex-0.10.21.tar.gz (119.3 kB)

Uploaded Source

Built Distribution

hyrex-0.10.21-py3-none-any.whl (195.6 kB)

Uploaded Python 3

File details

Details for the file hyrex-0.10.21.tar.gz.

File metadata

  • Download URL: hyrex-0.10.21.tar.gz
  • Upload date:
  • Size: 119.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.13.3

File hashes

Hashes for hyrex-0.10.21.tar.gz
Algorithm Hash digest
SHA256 83267fb5941e45e3044a19e78aa32084a6ee83b07d975426e68efaa59839e44c
MD5 628a87035fe0c9bbc268c5d682fe7552
BLAKE2b-256 f83fa181efd9eb97df858e0d0a9289fb4a8b50b6a4b802b7604214ac4cfb6a52

File details

Details for the file hyrex-0.10.21-py3-none-any.whl.

File metadata

  • Download URL: hyrex-0.10.21-py3-none-any.whl
  • Upload date:
  • Size: 195.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.13.3

File hashes

Hashes for hyrex-0.10.21-py3-none-any.whl
Algorithm Hash digest
SHA256 545f2517c92e61f257bec1a069ca400ec68273049b516e475d1e1519ad79a759
MD5 21bada0f84927ecf17dff18d8d140b62
BLAKE2b-256 74f1119598c76be7d461acfb0891d2416739c67ac8262b5d7c5a7de2bd3ba050
