Workflow scheduling with PostgreSQL
Project description
Fairchild
A PostgreSQL-backed job queue and simple workflow engine. Inspired by Oban and Faktory, among others.
Installation
pip install fairchild
Requires PostgreSQL 12+.
Quick Start
- Define a task:
# tasks.py
from fairchild import task, Record
@task(queue="default")
def send_email(to: str, subject: str, body: str):
# Your email sending logic here
print(f"Sending email to {to}: {subject}")
return Record({"sent": True})
- Set up the database and enqueue a job:
import asyncio
from fairchild import Fairchild
import tasks # Import to register tasks
async def main():
fairchild = Fairchild("postgresql://localhost/myapp")
await fairchild.connect()
# Create the jobs table
await fairchild.install()
# Enqueue a job
tasks.send_email.enqueue(
to="user@example.com",
subject="Hello",
body="Welcome to Fairchild!"
)
asyncio.run(main())
- Run a worker:
export FAIRCHILD_DATABASE_URL="postgresql://localhost/myapp"
fairchild worker --import tasks
Defining Tasks
Use the @task decorator to define a task:
from fairchild import task
@task(
queue="default", # Queue name (default: "default")
max_attempts=3, # Retry attempts on failure (default: 3)
priority=5, # 0-9, lower = higher priority (default: 5)
tags=["email"], # Tags for filtering/categorization
)
def my_task(arg1: str, arg2: int):
# Task logic here
pass
Returning Results
Use Record() to persist a task's result for use by downstream workflow jobs:
from fairchild import task, Record
@task()
def fetch_data(url: str):
data = requests.get(url).json()
return Record(data) # Stored in the database
Enqueuing Jobs
Basic Enqueue
my_task.enqueue(arg1="hello", arg2=42)
Schedule for Later
# Run in 30 minutes
my_task.enqueue_in(minutes=30, arg1="hello", arg2=42)
# Run at a specific time
from datetime import datetime, timezone
my_task.enqueue_at(
datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc),
arg1="hello",
arg2=42
)
With Options
my_task.enqueue(
arg1="hello",
arg2=42,
_priority=1, # Override default priority
_queue="high", # Override default queue
)
Dynamic Workflows
Fairchild uses a futures-based approach to workflows. Instead of explicitly declaring a DAG, you write natural Python code—call tasks, get futures, pass them around. Dependencies are inferred automatically.
How It Works
When a task calls another task from inside a worker:
- A child job is spawned (not executed immediately)
- A
Futureis returned representing the pending result - If you pass that
Futureto another task, a dependency is created - The downstream task won't run until the upstream job completes
Basic Example
from fairchild import task, Record
@task()
def fetch_data(url: str):
data = requests.get(url).json()
return Record(data)
@task()
def process(data: dict):
# Process the data
return Record({"processed": True})
@task()
def orchestrator():
# Calling a task returns a Future
data = fetch_data(url="https://api.example.com/data")
# Passing the Future creates a dependency
# process() won't run until fetch_data() completes
result = process(data=data)
return Record({"started": True})
Fan-Out / Fan-In (Map-Reduce)
The futures model makes parallel processing with aggregation natural:
@task()
def multiply(x: int, y: int):
return Record(x * y)
@task()
def sum_results(values: list):
return Record(sum(values))
@task()
def orchestrator(items: list[int]):
# Fan-out: spawn parallel tasks, collect futures
futures = []
for item in items:
future = multiply(x=item, y=2)
futures.append(future)
# Fan-in: pass all futures to aggregator
# sum_results won't run until ALL multiply jobs complete
total = sum_results(values=futures)
return Record({"spawned": len(items) + 1})
When orchestrator([1, 2, 3]) runs, it creates this DAG:
orchestrator
├── multiply(1, 2) ──┐
├── multiply(2, 2) ──┼── sum_results([...])
└── multiply(3, 2) ──┘
Nested Workflows
Since it's just function calls, workflows can be arbitrarily nested:
@task()
def process_batch(batch_id: int):
# This task can spawn its own sub-workflow
futures = [process_item(item_id=i) for i in get_items(batch_id)]
return aggregate(results=futures)
@task()
def run_all_batches():
# Top-level orchestrator spawns batch processors
futures = [process_batch(batch_id=i) for i in range(10)]
return final_summary(batch_results=futures)
Accessing Results
When a Future is passed to a downstream task, Fairchild automatically resolves it to the actual value before the task runs:
@task()
def fetch_price(symbol: str):
price = get_stock_price(symbol)
return Record({"symbol": symbol, "price": price})
@task()
def calculate_total(prices: list):
# By the time this runs, prices is a list of actual values,
# not Futures - Fairchild resolves them automatically
total = sum(p["price"] for p in prices)
return Record({"total": total})
@task()
def portfolio_value(symbols: list[str]):
futures = [fetch_price(symbol=s) for s in symbols]
return calculate_total(prices=futures)
Workers
CLI
# Basic worker
fairchild worker --import myapp.tasks
# Multiple queues with concurrency
fairchild worker --import myapp.tasks --queues default,high,low --concurrency 10
# Specific queues only
fairchild worker --import myapp.tasks --queues critical
Programmatic
from fairchild import Fairchild
from fairchild.worker import WorkerPool
async def main():
fairchild = Fairchild("postgresql://localhost/myapp")
await fairchild.connect()
pool = WorkerPool(
fairchild,
queues=["default", "high"],
concurrency=5,
)
await pool.start()
asyncio.run(main())
Web UI
Fairchild includes a web dashboard for monitoring jobs and workflows.
fairchild ui --import myapp.tasks --port 8080
Then open http://localhost:8080
The UI provides:
- Dashboard: Job stats, queues, recent jobs, jobs-per-minute chart
- Workflow view: DAG visualization, job states, timing
- Job details: Arguments, results, errors, timeline
Theming
The UI supports light and dark modes. It respects your system preference and includes a manual toggle.
HTTP API
The web UI also exposes a JSON API.
Enqueue a Job
POST /api/jobs
Content-Type: application/json
{
"task": "myapp.tasks.send_email",
"args": {
"to": "user@example.com",
"subject": "Hello"
},
"priority": 1,
"scheduled_at": "2024-01-15T10:00:00Z"
}
Response:
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"task": "myapp.tasks.send_email",
"queue": "default",
"state": "available",
"scheduled_at": "2024-01-15T10:00:00+00:00"
}
Other Endpoints
GET /api/stats- Job counts by stateGET /api/jobs- List jobs (supports?state=and?queue=filters)GET /api/jobs/{id}- Job detailsGET /api/queues- Queue statisticsGET /api/workflows- List workflowsGET /api/workflows/{id}- Workflow details with all jobs
CLI Reference
fairchild install
Create the fairchild_jobs table:
fairchild install
fairchild migrate
Run pending migrations:
fairchild migrate
fairchild worker
Start a worker process:
fairchild worker [OPTIONS]
Options:
--import TEXT Python module(s) to import (registers tasks)
--queues TEXT Comma-separated queue names (default: all)
--concurrency INT Number of concurrent jobs (default: 10)
fairchild ui
Start the web UI:
fairchild ui [OPTIONS]
Options:
--import TEXT Python module(s) to import (registers tasks)
--host TEXT Host to bind (default: localhost)
--port INT Port to bind (default: 8080)
fairchild enqueue
Enqueue a job from the command line:
fairchild enqueue myapp.tasks.my_task --args '{"key": "value"}'
fairchild run
Run a task locally for testing (does not enqueue):
fairchild run [OPTIONS] TASK_NAME
Options:
-i, --import TEXT Python module(s) to import (registers tasks)
-a, --arg TEXT Task argument as key=value
Examples:
# Simple invocation
fairchild run -i myapp.tasks myapp.tasks.hello -a name=World
# Multiple arguments
fairchild run -i myapp.tasks myapp.tasks.add -a a=2 -a b=3
This runs the task function directly in the current process without involving the database or workers. Useful for testing and debugging—full tracebacks are printed on errors.
Testing
Running Tests Locally
- Create a test database:
createdb fairchild_test
- Run the tests with your development database URL - the tests will automatically use
_testinstead of_development:
FAIRCHILD_DATABASE_URL=postgres://postgres@localhost/fairchild_development uv run pytest
Or run specific test files:
# Unit tests only (no database required)
uv run pytest tests/test_task.py tests/test_job.py tests/test_record.py
# Integration tests (requires database)
FAIRCHILD_DATABASE_URL=postgres://postgres@localhost/fairchild_development uv run pytest tests/test_integration.py
# Web UI tests (requires database)
FAIRCHILD_DATABASE_URL=postgres://postgres@localhost/fairchild_development uv run pytest tests/test_web_ui.py
Note: Integration tests automatically convert _development to _test in the database URL to protect your development data.
Configuration
Environment Variables
| Variable | Description | Default |
|---|---|---|
FAIRCHILD_DATABASE_URL |
PostgreSQL connection URL | (required) |
Database URL Format
postgresql://user:password@host:port/database
Examples:
# Local development
export FAIRCHILD_DATABASE_URL="postgresql://localhost/myapp_development"
# With credentials
export FAIRCHILD_DATABASE_URL="postgresql://myuser:mypass@localhost/myapp"
# Remote server
export FAIRCHILD_DATABASE_URL="postgresql://user:pass@db.example.com:5432/myapp"
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fairchild-0.0.7.tar.gz.
File metadata
- Download URL: fairchild-0.0.7.tar.gz
- Upload date:
- Size: 51.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.30 {"installer":{"name":"uv","version":"0.9.30","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9a4cfc7e580256f0afe9ca8e2128de9f2deac61d86035ff38880266eab1f007b
|
|
| MD5 |
cb27ec5100f120662cb4dc636b518e43
|
|
| BLAKE2b-256 |
59b45338886978c6be4da4e52e64ccfa611e0d39cbe72bb79f9834efecbff792
|
File details
Details for the file fairchild-0.0.7-py3-none-any.whl.
File metadata
- Download URL: fairchild-0.0.7-py3-none-any.whl
- Upload date:
- Size: 49.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.30 {"installer":{"name":"uv","version":"0.9.30","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0b30170ce20f1a87c675f0a18148bd722b70d1f74176748b6d2d9dd8ce95fff1
|
|
| MD5 |
8e4e0759b1541351fcee6741f873b691
|
|
| BLAKE2b-256 |
bf5b22eb098f2e52ae970323f8c1b4d0afbd7643210521db6be8ec61d691ccc9
|