Skip to main content

Workflow scheduling with PostgreSQL

Project description

Fairchild

A PostgreSQL-backed job queue and simple workflow engine. Inspired by Oban and Faktory, among others.

Installation

pip install fairchild

Requires PostgreSQL 12+.

Quick Start

  1. Define a task:
# tasks.py
from fairchild import task, Record

@task(queue="default")
def send_email(to: str, subject: str, body: str):
    # Your email sending logic here
    print(f"Sending email to {to}: {subject}")
    return Record({"sent": True})
  1. Set up the database and enqueue a job:
import asyncio
from fairchild import Fairchild
import tasks  # Import to register tasks

async def main():
    fairchild = Fairchild("postgresql://localhost/myapp")
    await fairchild.connect()
    
    # Create the jobs table
    await fairchild.install()
    
    # Enqueue a job
    tasks.send_email.enqueue(
        to="user@example.com",
        subject="Hello",
        body="Welcome to Fairchild!"
    )

asyncio.run(main())
  1. Run a worker:
export FAIRCHILD_DATABASE_URL="postgresql://localhost/myapp"
fairchild worker --import tasks

Defining Tasks

Use the @task decorator to define a task:

from fairchild import task

@task(
    queue="default",      # Queue name (default: "default")
    max_attempts=3,       # Retry attempts on failure (default: 3)
    priority=5,           # 0-9, lower = higher priority (default: 5)
    tags=["email"],       # Tags for filtering/categorization
)
def my_task(arg1: str, arg2: int):
    # Task logic here
    pass

Returning Results

Use Record() to persist a task's result for use by downstream workflow jobs:

from fairchild import task, Record

@task()
def fetch_data(url: str):
    data = requests.get(url).json()
    return Record(data)  # Stored in the database

Enqueuing Jobs

Basic Enqueue

my_task.enqueue(arg1="hello", arg2=42)

Schedule for Later

# Run in 30 minutes
my_task.enqueue_in(minutes=30, arg1="hello", arg2=42)

# Run at a specific time
from datetime import datetime, timezone
my_task.enqueue_at(
    datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc),
    arg1="hello",
    arg2=42
)

With Options

my_task.enqueue(
    arg1="hello",
    arg2=42,
    _priority=1,          # Override default priority
    _queue="high",        # Override default queue
)

Dynamic Workflows

Fairchild uses a futures-based approach to workflows. Instead of explicitly declaring a DAG, you write natural Python code—call tasks, get futures, pass them around. Dependencies are inferred automatically.

How It Works

When a task calls another task from inside a worker:

  1. A child job is spawned (not executed immediately)
  2. A Future is returned representing the pending result
  3. If you pass that Future to another task, a dependency is created
  4. The downstream task won't run until the upstream job completes

Basic Example

from fairchild import task, Record

@task()
def fetch_data(url: str):
    data = requests.get(url).json()
    return Record(data)

@task()
def process(data: dict):
    # Process the data
    return Record({"processed": True})

@task()
def orchestrator():
    # Calling a task returns a Future
    data = fetch_data(url="https://api.example.com/data")
    
    # Passing the Future creates a dependency
    # process() won't run until fetch_data() completes
    result = process(data=data)
    
    return Record({"started": True})

Fan-Out / Fan-In (Map-Reduce)

The futures model makes parallel processing with aggregation natural:

@task()
def multiply(x: int, y: int):
    return Record(x * y)

@task()
def sum_results(values: list):
    return Record(sum(values))

@task()
def orchestrator(items: list[int]):
    # Fan-out: spawn parallel tasks, collect futures
    futures = []
    for item in items:
        future = multiply(x=item, y=2)
        futures.append(future)
    
    # Fan-in: pass all futures to aggregator
    # sum_results won't run until ALL multiply jobs complete
    total = sum_results(values=futures)
    
    return Record({"spawned": len(items) + 1})

When orchestrator([1, 2, 3]) runs, it creates this DAG:

orchestrator
    ├── multiply(1, 2) ──┐
    ├── multiply(2, 2) ──┼── sum_results([...])
    └── multiply(3, 2) ──┘

Nested Workflows

Since it's just function calls, workflows can be arbitrarily nested:

@task()
def process_batch(batch_id: int):
    # This task can spawn its own sub-workflow
    futures = [process_item(item_id=i) for i in get_items(batch_id)]
    return aggregate(results=futures)

@task()
def run_all_batches():
    # Top-level orchestrator spawns batch processors
    futures = [process_batch(batch_id=i) for i in range(10)]
    return final_summary(batch_results=futures)

Accessing Results

When a Future is passed to a downstream task, Fairchild automatically resolves it to the actual value before the task runs:

@task()
def fetch_price(symbol: str):
    price = get_stock_price(symbol)
    return Record({"symbol": symbol, "price": price})

@task()
def calculate_total(prices: list):
    # By the time this runs, prices is a list of actual values,
    # not Futures - Fairchild resolves them automatically
    total = sum(p["price"] for p in prices)
    return Record({"total": total})

@task()
def portfolio_value(symbols: list[str]):
    futures = [fetch_price(symbol=s) for s in symbols]
    return calculate_total(prices=futures)

Workers

CLI

# Basic worker
fairchild worker --import myapp.tasks

# Multiple queues with concurrency
fairchild worker --import myapp.tasks --queues default,high,low --concurrency 10

# Specific queues only
fairchild worker --import myapp.tasks --queues critical

Programmatic

from fairchild import Fairchild
from fairchild.worker import WorkerPool

async def main():
    fairchild = Fairchild("postgresql://localhost/myapp")
    await fairchild.connect()
    
    pool = WorkerPool(
        fairchild,
        queues=["default", "high"],
        concurrency=5,
    )
    
    await pool.start()

asyncio.run(main())

Web UI

Fairchild includes a web dashboard for monitoring jobs and workflows.

fairchild ui --import myapp.tasks --port 8080

Then open http://localhost:8080

The UI provides:

  • Dashboard: Job stats, queues, recent jobs, jobs-per-minute chart
  • Workflow view: DAG visualization, job states, timing
  • Job details: Arguments, results, errors, timeline

Theming

The UI supports light and dark modes. It respects your system preference and includes a manual toggle.

HTTP API

The web UI also exposes a JSON API.

Enqueue a Job

POST /api/jobs
Content-Type: application/json

{
    "task": "myapp.tasks.send_email",
    "args": {
        "to": "user@example.com",
        "subject": "Hello"
    },
    "priority": 1,
    "scheduled_at": "2024-01-15T10:00:00Z"
}

Response:

{
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "task": "myapp.tasks.send_email",
    "queue": "default",
    "state": "available",
    "scheduled_at": "2024-01-15T10:00:00+00:00"
}

Other Endpoints

  • GET /api/stats - Job counts by state
  • GET /api/jobs - List jobs (supports ?state= and ?queue= filters)
  • GET /api/jobs/{id} - Job details
  • GET /api/queues - Queue statistics
  • GET /api/workflows - List workflows
  • GET /api/workflows/{id} - Workflow details with all jobs

CLI Reference

fairchild install

Create the fairchild_jobs table:

fairchild install

fairchild migrate

Run pending migrations:

fairchild migrate

fairchild worker

Start a worker process:

fairchild worker [OPTIONS]

Options:
  --import TEXT        Python module(s) to import (registers tasks)
  --queues TEXT        Comma-separated queue names (default: all)
  --concurrency INT    Number of concurrent jobs (default: 10)

fairchild ui

Start the web UI:

fairchild ui [OPTIONS]

Options:
  --import TEXT   Python module(s) to import (registers tasks)
  --host TEXT     Host to bind (default: localhost)
  --port INT      Port to bind (default: 8080)

fairchild enqueue

Enqueue a job from the command line:

fairchild enqueue myapp.tasks.my_task --args '{"key": "value"}'

fairchild run

Run a task locally for testing (does not enqueue):

fairchild run [OPTIONS] TASK_NAME

Options:
  -i, --import TEXT   Python module(s) to import (registers tasks)
  -a, --arg TEXT      Task argument as key=value

Examples:

# Simple invocation
fairchild run -i myapp.tasks myapp.tasks.hello -a name=World

# Multiple arguments
fairchild run -i myapp.tasks myapp.tasks.add -a a=2 -a b=3

This runs the task function directly in the current process without involving the database or workers. Useful for testing and debugging—full tracebacks are printed on errors.

Testing

Running Tests Locally

  1. Create a test database:
createdb fairchild_test
  1. Run the tests with your development database URL - the tests will automatically use _test instead of _development:
FAIRCHILD_DATABASE_URL=postgres://postgres@localhost/fairchild_development uv run pytest

Or run specific test files:

# Unit tests only (no database required)
uv run pytest tests/test_task.py tests/test_job.py tests/test_record.py

# Integration tests (requires database)
FAIRCHILD_DATABASE_URL=postgres://postgres@localhost/fairchild_development uv run pytest tests/test_integration.py

# Web UI tests (requires database)
FAIRCHILD_DATABASE_URL=postgres://postgres@localhost/fairchild_development uv run pytest tests/test_web_ui.py

Note: Integration tests automatically convert _development to _test in the database URL to protect your development data.

Configuration

Environment Variables

Variable Description Default
FAIRCHILD_DATABASE_URL PostgreSQL connection URL (required)

Database URL Format

postgresql://user:password@host:port/database

Examples:

# Local development
export FAIRCHILD_DATABASE_URL="postgresql://localhost/myapp_development"

# With credentials
export FAIRCHILD_DATABASE_URL="postgresql://myuser:mypass@localhost/myapp"

# Remote server
export FAIRCHILD_DATABASE_URL="postgresql://user:pass@db.example.com:5432/myapp"

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fairchild-0.0.3.tar.gz (48.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fairchild-0.0.3-py3-none-any.whl (45.5 kB view details)

Uploaded Python 3

File details

Details for the file fairchild-0.0.3.tar.gz.

File metadata

  • Download URL: fairchild-0.0.3.tar.gz
  • Upload date:
  • Size: 48.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.28 {"installer":{"name":"uv","version":"0.9.28","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for fairchild-0.0.3.tar.gz
Algorithm Hash digest
SHA256 8cd22a43e9a36c0989df5ef5489547b7abf4057b56b9c3b3b9e37bf5e3e33807
MD5 d46f5b2df4e55120300fbfcd74e21cdc
BLAKE2b-256 4156adc634f492627f8a6a22768051c85dbd03604c7e8cda3c6f7382ffb4bfba

See more details on using hashes here.

File details

Details for the file fairchild-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: fairchild-0.0.3-py3-none-any.whl
  • Upload date:
  • Size: 45.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.28 {"installer":{"name":"uv","version":"0.9.28","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for fairchild-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 5463cca66be483e2457674d28c0f923c90d4746759bcaca2172b451a60a743e8
MD5 21daf86ddcc0a1c9b6e58ab37b125de0
BLAKE2b-256 4cefcda4f038a5857d3efe387e41e61c90b74c69091c82e356ba28ee0559427d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page