Hyrex is the open-source COLD task orchestration framework built on Postgres.
Hyrex Python SDK
Hyrex is a modern, open-source task orchestration framework built on PostgreSQL. It provides powerful features for distributed task processing, workflow management, and asynchronous job execution.
Features
- Task Orchestration: Define and execute distributed tasks with automatic retries, timeouts, and error handling
- Workflow Support: Build complex DAG-based workflows with dependencies
- Queue Management: Route tasks to different queues with separate worker pools
- Key-Value Store: Built-in distributed key-value storage for task coordination
- Task Context: Rich execution context with task metadata and hierarchy tracking
- Cron Scheduling: Schedule recurring tasks with cron expressions
- Hyrex Studio: Web-based UI for monitoring and debugging (available at https://local.hyrex.studio)
- Type Safety: Full type hints and Pydantic model validation
Installation
pip install hyrex
Quick Start
1. Initialize a New Project
Use the interactive hyrex init command to set up a new project:
hyrex init
This will guide you through:
- Choosing a project name
- Selecting between PostgreSQL (self-hosted) or Hyrex Cloud
- Creating project files (.env, hyrex_app.py, tasks.py, requirements.txt, Dockerfile)
For manual database initialization (if needed):
export HYREX_DATABASE_URL="postgresql://user:password@localhost/dbname"
hyrex init-db
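A connection string like the one above breaks if the password contains characters such as `@` or `/`. The following is a minimal sketch of assembling the URL with percent-encoding, using only the Python standard library. `build_database_url` is a hypothetical helper and not part of Hyrex, and the default port 5432 is an assumption based on the usual PostgreSQL default.

```python
from urllib.parse import quote, urlsplit


def build_database_url(user: str, password: str, host: str, dbname: str, port: int = 5432) -> str:
    """Assemble a PostgreSQL URL, percent-encoding the user name and password."""
    # safe="" makes quote() escape every reserved character, including "/" and "@"
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"postgresql://{credentials}@{host}:{port}/{dbname}"


url = build_database_url("user", "p@ss/word", "localhost", "dbname")
print(url)  # postgresql://user:p%40ss%2Fword@localhost:5432/dbname

# Parsing the result back confirms the host and port are still unambiguous
parts = urlsplit(url)
print(parts.hostname, parts.port)  # localhost 5432
```

The resulting string can then be exported as HYREX_DATABASE_URL before running hyrex init-db.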
2. Project Structure
After running hyrex init, you'll have:
your-project/
├── .env # Environment configuration
├── hyrex_app.py # Hyrex app configuration
├── tasks.py # Task definitions
├── requirements.txt # Python dependencies
└── Dockerfile # Container configuration
Example tasks.py:
from hyrex import HyrexRegistry
from pydantic import BaseModel

hy = HyrexRegistry()

class EmailContext(BaseModel):
    to: str
    subject: str
    body: str

@hy.task
def send_email(context: EmailContext):
    print(f"Sending email to {context.to}")
    # Your email logic here
    return {"sent": True}
3. Send Tasks
Queue tasks for execution:
# Send a task to the default queue
send_email.send(EmailContext(
    to="user@example.com",
    subject="Welcome!",
    body="Thanks for signing up"
))

# Send with custom configuration
send_email.with_config(
    queue="high-priority",
    max_retries=3,
    timeout_seconds=30
).send(EmailContext(...))
4. Run Workers
Start workers to process tasks:
hyrex run-worker hyrex_app:app
Core Features
Task Decorator
The @hy.task decorator transforms functions into distributed tasks:
@hy.task(
    queue="processing",            # Target queue (str or HyrexQueue object)
    max_retries=3,                 # Maximum retry attempts (default: 0)
    timeout_seconds=300,           # Task timeout in seconds
    priority=5,                    # Priority 1-10 (higher = more important)
    on_error=error_handler,        # Error callback function
    retry_backoff=lambda n: n*10   # Backoff strategy function
)
def process_data(context: ProcessContext):
    # Task implementation
    pass
Task Context
Access rich execution context within tasks:
from hyrex import get_hyrex_context

@hy.task
def contextual_task():
    context = get_hyrex_context()
    print(f"Task ID: {context.task_id}")
    print(f"Task Name: {context.task_name}")
    print(f"Attempt: {context.attempt_number}/{context.max_retries}")
    print(f"Queue: {context.queue}")
    print(f"Parent Task: {context.parent_id}")
    print(f"Root Task: {context.root_id}")
Key-Value Store
Use HyrexKV for distributed state management:
from hyrex import HyrexKV

@hy.task
def process_with_state(user_id: str):
    # Store state
    HyrexKV.set(f"user:{user_id}:status", "processing")
    # Retrieve state
    status = HyrexKV.get(f"user:{user_id}:status")
    # Delete state
    HyrexKV.delete(f"user:{user_id}:status")
Note: HyrexKV stores string values up to 1MB in size.
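Because only strings under a size limit can be stored, structured values need to be serialized and checked before they are written. The sketch below shows one way to do that with JSON. `encode_for_kv`, `decode_from_kv`, and `MAX_KV_BYTES` are hypothetical names and not part of Hyrex, and reading "1MB" as 1,048,576 bytes of UTF-8 text is an assumption, since the exact limit is not specified here.

```python
import json

# Assumption: "1MB" is read as 1,048,576 bytes of UTF-8 encoded text.
MAX_KV_BYTES = 1024 * 1024


def encode_for_kv(value) -> str:
    """Serialize a JSON-compatible value to a string and enforce the size limit."""
    text = json.dumps(value, separators=(",", ":"))
    size = len(text.encode("utf-8"))
    if size > MAX_KV_BYTES:
        raise ValueError(f"value is {size} bytes, limit is {MAX_KV_BYTES}")
    return text


def decode_from_kv(text: str):
    """Inverse of encode_for_kv: parse the stored string back into a value."""
    return json.loads(text)
```

Inside a task, the encoded string could then be passed as the value argument of HyrexKV.set, and the result of HyrexKV.get passed to decode_from_kv.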
Workflows
Build complex DAG-based workflows:
@hy.task
def extract_data():
    return {"data": "extracted"}

@hy.task
def transform_data():
    return {"data": "transformed"}

# Referenced by the parallel example below
@hy.task
def validate_data():
    return {"data": "validated"}

@hy.task
def load_data():
    return {"data": "loaded"}

class ETLWorkflowArgs(BaseModel):
    source: str
    destination: str

@hy.workflow(
    queue="etl",
    timeout_seconds=3600,
    workflow_arg_schema=ETLWorkflowArgs
)
def etl_workflow():
    # Define workflow DAG
    extract_data >> transform_data >> load_data
    # Parallel execution
    extract_data >> [transform_data, validate_data] >> load_data
    # With custom config
    extract_data >> transform_data.with_config(queue="cpu-intensive") >> load_data
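For readers unfamiliar with the `>>` chaining syntax used in the workflow body, the toy model below shows how operator overloading can turn such expressions into dependency edges, including the list form used for parallel steps. It is an illustration in plain Python only. `Node` and `edges` are hypothetical names, and nothing here describes how Hyrex implements workflows internally.

```python
class Node:
    """A named step that records which steps run after it."""

    def __init__(self, name: str):
        self.name = name
        self.downstream: list["Node"] = []

    def __rshift__(self, other):
        # Handles `node >> node` and `node >> [node, node]`
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.append(target)
        return other  # return the right-hand side so the chain can continue

    def __rrshift__(self, other):
        # Handles `[node, node] >> node`: list has no __rshift__,
        # so Python falls back to the right operand's __rrshift__
        for source in other:
            source.downstream.append(self)
        return self


def edges(root: Node) -> list[tuple[str, str]]:
    """Walk the graph from root and return sorted (upstream, downstream) pairs."""
    seen, found, stack = set(), [], [root]
    while stack:
        node = stack.pop()
        if node.name in seen:
            continue
        seen.add(node.name)
        for child in node.downstream:
            found.append((node.name, child.name))
            stack.append(child)
    return sorted(found)


extract, transform = Node("extract"), Node("transform")
validate, load = Node("validate"), Node("load")

# Fan out to two parallel steps, then fan back in
extract >> [transform, validate] >> load
print(edges(extract))
# [('extract', 'transform'), ('extract', 'validate'), ('transform', 'load'), ('validate', 'load')]
```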
Send workflows:
etl_workflow.send(ETLWorkflowArgs(
    source="s3://input",
    destination="s3://output"
))
Access workflow context:
from hyrex import get_hyrex_workflow_context

@hy.task
def workflow_task():
    wf_context = get_hyrex_workflow_context()
    # Access workflow arguments
    args = wf_context.workflow_args
    # Access other task results
    extract_result = wf_context.durable_runs.get("extract_data")
    if extract_result:
        extract_result.refresh()  # Get latest status
Dynamic Task Configuration
Use with_config() to modify task behavior at runtime:
# Define base task
@hy.task(queue="default", max_retries=1)
def flexible_task(data: str):
    return process(data)

# Override configuration when sending
flexible_task.with_config(
    queue="high-priority",
    max_retries=5,
    timeout_seconds=60,
    priority=10
).send("important-data")
Cron Scheduling
Schedule recurring tasks:
@hy.task(cron="0 2 * * *")  # Daily at 2 AM
def daily_cleanup():
    # Cleanup logic
    pass

# Tasks with default arguments can also be scheduled
@hy.task(cron="0 0 * * 0")  # Weekly on Sunday
def weekly_backup(retention_days: int = 30):
    # Backup logic with configurable retention
    pass

@hy.workflow(cron="0 0 * * 0")  # Weekly on Sunday
def weekly_report():
    generate_report >> send_report
Note: Cron-scheduled tasks must either take no arguments or provide default values for all of them.
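The rule above can be checked ahead of time on a plain function with the standard `inspect` module. This is a minimal sketch: `callable_without_arguments` is a hypothetical helper and not part of Hyrex, and `needs_user` is an invented counterexample. Whether Hyrex runs a comparable check when a cron task is registered is not stated here.

```python
import inspect


def callable_without_arguments(func) -> bool:
    """Return True if func() can be called with no arguments at all."""
    for param in inspect.signature(func).parameters.values():
        # *args and **kwargs never require a value
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue
        if param.default is inspect.Parameter.empty:
            return False
    return True


def daily_cleanup():
    pass


def weekly_backup(retention_days: int = 30):
    pass


def needs_user(user_id: str):  # hypothetical task that could not be cron-scheduled
    pass


print(callable_without_arguments(daily_cleanup))  # True
print(callable_without_arguments(weekly_backup))  # True
print(callable_without_arguments(needs_user))     # False
```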
Error Handling
Implement custom error handlers:
def handle_task_error(error: Exception):
    # Log error, send alerts, etc.
    print(f"Task failed: {error}")

@hy.task(
    on_error=handle_task_error,
    max_retries=3,
    retry_backoff=lambda attempt: 2 ** attempt  # Exponential backoff
)
def risky_task():
    # Task that might fail
    pass
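To compare backoff strategies before deploying them, it can help to print the delays a given function would produce. The sketch below evaluates the two lambdas used in this document (`n * 10` and `2 ** attempt`) over a retry budget. `backoff_schedule` is a hypothetical helper, and two details are assumptions because they are not stated here: that attempts are numbered from 1, and that the returned value is a delay in seconds.

```python
def backoff_schedule(backoff, max_retries: int, first_attempt: int = 1) -> list:
    """Evaluate a backoff function once per retry attempt."""
    # Assumption: attempt numbers start at first_attempt (1 by default)
    return [backoff(n) for n in range(first_attempt, first_attempt + max_retries)]


def linear(n):
    return n * 10  # same rule as retry_backoff=lambda n: n*10


def exponential(attempt):
    return 2 ** attempt  # same rule as retry_backoff=lambda attempt: 2 ** attempt


print(backoff_schedule(linear, 3))       # [10, 20, 30]
print(backoff_schedule(exponential, 3))  # [2, 4, 8]
```

If attempts turn out to be numbered from 0, passing first_attempt=0 shifts the exponential schedule to [1, 2, 4].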
CLI Commands
- hyrex init - Interactive project initialization wizard
- hyrex init-db - Initialize the database schema
- hyrex run-worker <module:instance> - Start a worker process
- hyrex studio - Start Hyrex Studio server
Monitoring with Hyrex Studio
Hyrex Studio provides a web interface for monitoring your tasks and workflows:
1. Start the studio server:
   hyrex studio
2. Open https://local.hyrex.studio in your browser
3. Monitor task execution, view logs, and inspect your data
Configuration
Hyrex uses environment variables for configuration:
- HYREX_DATABASE_URL - PostgreSQL connection string (required)
- STUDIO_PORT - Port for Hyrex Studio (default: 1337)
- STUDIO_VERBOSE - Enable verbose logging for Studio (default: false)
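Application code that wants to fail fast on a missing variable can read the same settings itself. The following is a minimal sketch using only the standard library. `HyrexSettings` and `load_settings` are hypothetical names and not part of Hyrex, and the set of strings treated as true for STUDIO_VERBOSE is an assumption.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class HyrexSettings:
    database_url: str
    studio_port: int
    studio_verbose: bool


def load_settings(env=None) -> HyrexSettings:
    """Read the documented variables, applying the documented defaults."""
    env = os.environ if env is None else env
    url = env.get("HYREX_DATABASE_URL")
    if not url:
        raise RuntimeError("HYREX_DATABASE_URL is required")
    # Assumption: "1", "true", and "yes" (any case) enable verbose logging
    verbose = env.get("STUDIO_VERBOSE", "false").strip().lower() in ("1", "true", "yes")
    return HyrexSettings(
        database_url=url,
        studio_port=int(env.get("STUDIO_PORT", "1337")),
        studio_verbose=verbose,
    )


settings = load_settings({"HYREX_DATABASE_URL": "postgresql://user:password@localhost/dbname"})
print(settings.studio_port, settings.studio_verbose)  # 1337 False
```

Passing a plain dictionary instead of relying on os.environ keeps the function easy to test.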
Advanced Usage
Registry Inheritance
Share task definitions across modules:
# common_tasks.py
from hyrex import HyrexRegistry

common_registry = HyrexRegistry()

@common_registry.task
def shared_task():
    pass

# main.py
from hyrex import HyrexRegistry
from common_tasks import common_registry

hy = HyrexRegistry()
hy.add_registry(common_registry)  # Include all tasks from common_registry
Task Composition
Build complex task hierarchies:
@hy.task
def parent_task(count: int):
    # Spawn child tasks
    for i in range(count):
        child_task.send(index=i)
    # Tasks maintain parent-child relationships
    # visible in context.parent_id and context.root_id
Idempotency
Ensure tasks run only once:
@hy.task
def idempotent_task(user_id: str):
    # Process user
    pass

# Using idempotency key
user_id = "123"
idempotent_task.with_config(
    idempotency_key=f"process-user-{user_id}"
).send(user_id=user_id)
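When a task takes several arguments, building the key by hand gets error-prone. One option is to derive it deterministically from the task name and arguments. The sketch below is an illustration only: `make_idempotency_key` is a hypothetical helper and not part of Hyrex, and truncating the SHA-256 digest to 16 hex characters is an arbitrary choice made for readability.

```python
import hashlib
import json


def make_idempotency_key(task_name: str, **kwargs) -> str:
    """Build a deterministic key from a task name and its keyword arguments."""
    # sort_keys makes the key independent of the order the arguments were given in
    payload = json.dumps(kwargs, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return f"{task_name}-{digest}"


key_a = make_idempotency_key("process-user", user_id="123", region="eu")
key_b = make_idempotency_key("process-user", region="eu", user_id="123")
print(key_a == key_b)  # True
```

The resulting string could then be supplied as the idempotency_key value in with_config.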
Requirements
- Python 3.11+
- PostgreSQL 12+
- Required Python packages are installed automatically by pip
License
Apache License 2.0