Workflow scheduling with PostgreSQL

Fairchild

A PostgreSQL-backed job queue and simple workflow engine. Inspired by Oban and Faktory, among others.

Installation

pip install fairchild

Requires PostgreSQL 12+.

Quick Start

  1. Define a task:
# tasks.py
from fairchild import task, Record

@task(queue="default")
def send_email(to: str, subject: str, body: str):
    # Your email sending logic here
    print(f"Sending email to {to}: {subject}")
    return Record({"sent": True})
  2. Set up the database and enqueue a job:
import asyncio
from fairchild import Fairchild
import tasks  # Import to register tasks

async def main():
    fairchild = Fairchild("postgresql://localhost/myapp")
    await fairchild.connect()
    
    # Create the jobs table
    await fairchild.install()
    
    # Enqueue a job
    tasks.send_email.enqueue(
        to="user@example.com",
        subject="Hello",
        body="Welcome to Fairchild!"
    )

asyncio.run(main())
  3. Run a worker:
export FAIRCHILD_DATABASE_URL="postgresql://localhost/myapp"
fairchild worker --import tasks

Defining Tasks

Use the @task decorator to define a task:

from fairchild import task

@task(
    queue="default",      # Queue name (default: "default")
    max_attempts=3,       # Retry attempts on failure (default: 3)
    priority=5,           # 0-9, lower = higher priority (default: 5)
    tags=["email"],       # Tags for filtering/categorization
)
def my_task(arg1: str, arg2: int):
    # Task logic here
    pass
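
To make the priority rule concrete, here is a minimal sketch of how a queue that treats lower numbers as more urgent would order its jobs. It does not use Fairchild at all: `PendingJob` and `drain` are hypothetical names, and breaking ties by insertion order is an assumption, not documented Fairchild behavior.

```python
import heapq
from dataclasses import dataclass, field
from itertools import count

_sequence = count()  # monotonically increasing tie-breaker (assumed FIFO within a priority)


@dataclass(order=True)
class PendingJob:
    priority: int  # 0-9, lower runs first
    seq: int = field(default_factory=lambda: next(_sequence))
    name: str = field(default="", compare=False)


def drain(jobs):
    """Return job names in the order a lowest-number-first queue would pick them."""
    heap = list(jobs)
    heapq.heapify(heap)
    order = []
    while heap:
        order.append(heapq.heappop(heap).name)
    return order


jobs = [
    PendingJob(priority=5, name="newsletter"),
    PendingJob(priority=1, name="password_reset"),
    PendingJob(priority=5, name="digest"),
]
print(drain(jobs))  # ['password_reset', 'newsletter', 'digest']
```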

Returning Results

Use Record() to persist a task's result for use by downstream workflow jobs:

import requests

from fairchild import task, Record

@task()
def fetch_data(url: str):
    data = requests.get(url).json()
    return Record(data)  # Stored in the database

Enqueuing Jobs

Basic Enqueue

my_task.enqueue(arg1="hello", arg2=42)

Schedule for Later

# Run in 30 minutes
my_task.enqueue_in(minutes=30, arg1="hello", arg2=42)

# Run at a specific time
from datetime import datetime, timezone
my_task.enqueue_at(
    datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc),
    arg1="hello",
    arg2=42
)
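
The two scheduling calls are presumably related: a relative delay can always be turned into an absolute time. The sketch below shows that arithmetic with the standard library only. `run_at_from_delay` is a hypothetical helper, and the assumption that `enqueue_in` behaves like `enqueue_at` with "now plus delay" is not confirmed by this README.

```python
from datetime import datetime, timedelta, timezone


def run_at_from_delay(now: datetime, **delay) -> datetime:
    """Translate a relative delay (e.g. minutes=30) into an absolute UTC run time."""
    if now.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid ambiguity")
    return now.astimezone(timezone.utc) + timedelta(**delay)


now = datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc)
print(run_at_from_delay(now, minutes=30).isoformat())  # 2024-01-15T10:00:00+00:00
```

Passing timezone-aware datetimes, as the `enqueue_at` example above does, avoids ambiguity about which zone a scheduled time refers to.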

With Options

my_task.enqueue(
    arg1="hello",
    arg2=42,
    _priority=1,          # Override default priority
    _queue="high",        # Override default queue
)

Dynamic Workflows

Fairchild uses a futures-based approach to workflows. Instead of explicitly declaring a DAG, you write ordinary Python code: call tasks, get futures, and pass them around. Dependencies are inferred automatically from the futures each task receives.

How It Works

When a task calls another task from inside a worker:

  1. A child job is spawned (not executed immediately)
  2. A Future is returned representing the pending result
  3. If you pass that Future to another task, a dependency is created
  4. The downstream task won't run until the upstream job completes
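
The four steps above can be sketched as a toy model in plain Python. This is not Fairchild's implementation: `ToyFuture`, `ToyJob`, `spawn`, and the in-memory `JOBS` dict are hypothetical stand-ins that only illustrate how dependencies can be inferred from the arguments a call receives.

```python
from dataclasses import dataclass, field
from itertools import count

_ids = count(1)


@dataclass
class ToyFuture:
    job_id: int  # placeholder for the result of a job that has not run yet


@dataclass
class ToyJob:
    job_id: int
    name: str
    kwargs: dict
    depends_on: list = field(default_factory=list)


JOBS: dict[int, ToyJob] = {}


def spawn(name: str, **kwargs) -> ToyFuture:
    """Record a child job instead of running it, inferring dependencies from its arguments."""
    deps = [v.job_id for v in kwargs.values() if isinstance(v, ToyFuture)]
    job = ToyJob(job_id=next(_ids), name=name, kwargs=kwargs, depends_on=deps)
    JOBS[job.job_id] = job
    return ToyFuture(job.job_id)


data = spawn("fetch_data", url="https://api.example.com/data")
result = spawn("process", data=data)  # receives a future, so it depends on fetch_data

for job in JOBS.values():
    print(job.name, "depends on", job.depends_on)
```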

Basic Example

import requests

from fairchild import task, Record

@task()
def fetch_data(url: str):
    data = requests.get(url).json()
    return Record(data)

@task()
def process(data: dict):
    # Process the data
    return Record({"processed": True})

@task()
def orchestrator():
    # Calling a task returns a Future
    data = fetch_data(url="https://api.example.com/data")
    
    # Passing the Future creates a dependency
    # process() won't run until fetch_data() completes
    result = process(data=data)
    
    return Record({"started": True})

Fan-Out / Fan-In (Map-Reduce)

The futures model makes parallel processing with aggregation natural:

@task()
def multiply(x: int, y: int):
    return Record(x * y)

@task()
def sum_results(values: list):
    return Record(sum(values))

@task()
def orchestrator(items: list[int]):
    # Fan-out: spawn parallel tasks, collect futures
    futures = []
    for item in items:
        future = multiply(x=item, y=2)
        futures.append(future)
    
    # Fan-in: pass all futures to aggregator
    # sum_results won't run until ALL multiply jobs complete
    total = sum_results(values=futures)
    
    return Record({"spawned": len(items) + 1})

When orchestrator([1, 2, 3]) runs, it creates this DAG:

orchestrator
    ├── multiply(1, 2) ──┐
    ├── multiply(2, 2) ──┼── sum_results([...])
    └── multiply(3, 2) ──┘
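
The execution order implied by this DAG can be reproduced with Python's standard `graphlib` module. The sketch below only illustrates the dependency rule (a job becomes ready once everything it waits for is done); it says nothing about how Fairchild's scheduler is actually written, and it leaves out the orchestrator itself, which spawns the jobs rather than depending on them.

```python
from graphlib import TopologicalSorter

# Map each job to the set of jobs it must wait for.
dag = {
    "multiply(1, 2)": set(),
    "multiply(2, 2)": set(),
    "multiply(3, 2)": set(),
    "sum_results": {"multiply(1, 2)", "multiply(2, 2)", "multiply(3, 2)"},
}

sorter = TopologicalSorter(dag)
sorter.prepare()

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # jobs whose dependencies are all done
    waves.append(ready)
    sorter.done(*ready)  # mark the whole wave as complete

print(waves)
```

The three `multiply` jobs land in the first wave and can run in parallel; `sum_results` only appears in the second wave.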

Nested Workflows

Since it's just function calls, workflows can be arbitrarily nested:

@task()
def process_batch(batch_id: int):
    # This task can spawn its own sub-workflow
    futures = [process_item(item_id=i) for i in get_items(batch_id)]
    return aggregate(results=futures)

@task()
def run_all_batches():
    # Top-level orchestrator spawns batch processors
    futures = [process_batch(batch_id=i) for i in range(10)]
    return final_summary(batch_results=futures)

Accessing Results

When a Future is passed to a downstream task, Fairchild automatically resolves it to the actual value before the task runs:

@task()
def fetch_price(symbol: str):
    price = get_stock_price(symbol)
    return Record({"symbol": symbol, "price": price})

@task()
def calculate_total(prices: list):
    # By the time this runs, prices is a list of actual values,
    # not Futures - Fairchild resolves them automatically
    total = sum(p["price"] for p in prices)
    return Record({"total": total})

@task()
def portfolio_value(symbols: list[str]):
    futures = [fetch_price(symbol=s) for s in symbols]
    return calculate_total(prices=futures)
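
As a rough mental model of that resolution step, the sketch below walks a job's arguments and swaps each placeholder for a recorded result. `ToyFuture`, `resolve`, and the `recorded` dict are hypothetical; the README does not describe how Fairchild stores or looks up results internally, and the symbols and prices are made-up sample data.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyFuture:
    job_id: int


def resolve(value, results: dict):
    """Recursively replace ToyFuture placeholders with recorded results."""
    if isinstance(value, ToyFuture):
        return results[value.job_id]
    if isinstance(value, list):
        return [resolve(v, results) for v in value]
    if isinstance(value, dict):
        return {k: resolve(v, results) for k, v in value.items()}
    return value


# Results that upstream jobs are assumed to have recorded.
recorded = {
    1: {"symbol": "AAA", "price": 10.0},
    2: {"symbol": "BBB", "price": 32.5},
}
kwargs = {"prices": [ToyFuture(1), ToyFuture(2)]}

resolved = resolve(kwargs, recorded)
total = sum(p["price"] for p in resolved["prices"])
print(total)  # 42.5
```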

Workers

CLI

# Basic worker
fairchild worker --import myapp.tasks

# Multiple queues with concurrency
fairchild worker --import myapp.tasks --queues default,high,low --concurrency 10

# Specific queues only
fairchild worker --import myapp.tasks --queues critical

Programmatic

import asyncio

from fairchild import Fairchild
from fairchild.worker import WorkerPool

async def main():
    fairchild = Fairchild("postgresql://localhost/myapp")
    await fairchild.connect()
    
    pool = WorkerPool(
        fairchild,
        queues=["default", "high"],
        concurrency=5,
    )
    
    await pool.start()

asyncio.run(main())

Web UI

Fairchild includes a web dashboard for monitoring jobs and workflows.

fairchild ui --import myapp.tasks --port 8080

Then open http://localhost:8080

The UI provides:

  • Dashboard: Job stats, queues, recent jobs, jobs-per-minute chart
  • Workflow view: DAG visualization, job states, timing
  • Job details: Arguments, results, errors, timeline

Theming

The UI supports light and dark modes. It respects your system preference and includes a manual toggle.

HTTP API

The web UI also exposes a JSON API.

Enqueue a Job

POST /api/jobs
Content-Type: application/json

{
    "task": "myapp.tasks.send_email",
    "args": {
        "to": "user@example.com",
        "subject": "Hello"
    },
    "priority": 1,
    "scheduled_at": "2024-01-15T10:00:00Z"
}

Response:

{
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "task": "myapp.tasks.send_email",
    "queue": "default",
    "state": "available",
    "scheduled_at": "2024-01-15T10:00:00+00:00"
}
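
From Python, the same request can be assembled with the standard library. This is a sketch under assumptions: the UI is taken to be listening on http://localhost:8080 (the default shown for `fairchild ui`), and `build_enqueue_request` is a hypothetical helper. It builds the request without sending it, so it runs without a server.

```python
import json
import urllib.request


def build_enqueue_request(base_url: str, task: str, args: dict, **options) -> urllib.request.Request:
    """Build (but do not send) a POST /api/jobs request."""
    payload = {"task": task, "args": args, **options}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/jobs",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_enqueue_request(
    "http://localhost:8080",  # assumed address of a running `fairchild ui`
    task="myapp.tasks.send_email",
    args={"to": "user@example.com", "subject": "Hello"},
    priority=1,
    scheduled_at="2024-01-15T10:00:00Z",
)
print(req.get_method(), req.full_url)

# To actually send it against a running UI:
# with urllib.request.urlopen(req) as resp:
#     job = json.load(resp)
```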

Other Endpoints

  • GET /api/stats - Job counts by state
  • GET /api/jobs - List jobs (supports ?state= and ?queue= filters)
  • GET /api/jobs/{id} - Job details
  • GET /api/queues - Queue statistics
  • GET /api/workflows - List workflows
  • GET /api/workflows/{id} - Workflow details with all jobs
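
A small helper can build the filtered job-list URL. This is a sketch: `jobs_url` is a hypothetical name, the base address is assumed, and whether the two filters can be combined in a single request is an assumption the list above does not confirm.

```python
from urllib.parse import urlencode


def jobs_url(base_url: str, state=None, queue=None) -> str:
    """Build a GET /api/jobs URL, adding only the filters that were supplied."""
    filters = {k: v for k, v in (("state", state), ("queue", queue)) if v is not None}
    url = f"{base_url.rstrip('/')}/api/jobs"
    return f"{url}?{urlencode(filters)}" if filters else url


print(jobs_url("http://localhost:8080", state="available", queue="default"))
```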

CLI Reference

fairchild install

Create the fairchild_jobs table:

fairchild install

fairchild migrate

Run pending migrations:

fairchild migrate

fairchild worker

Start a worker process:

fairchild worker [OPTIONS]

Options:
  --import TEXT        Python module(s) to import (registers tasks)
  --queues TEXT        Comma-separated queue names (default: all)
  --concurrency INT    Number of concurrent jobs (default: 10)

fairchild ui

Start the web UI:

fairchild ui [OPTIONS]

Options:
  --import TEXT   Python module(s) to import (registers tasks)
  --host TEXT     Host to bind (default: localhost)
  --port INT      Port to bind (default: 8080)

fairchild enqueue

Enqueue a job from the command line:

fairchild enqueue myapp.tasks.my_task --args '{"key": "value"}'

fairchild run

Run a task locally for testing (does not enqueue):

fairchild run [OPTIONS] TASK_NAME

Options:
  -i, --import TEXT   Python module(s) to import (registers tasks)
  -a, --arg TEXT      Task argument as key=value

Examples:

# Simple invocation
fairchild run -i myapp.tasks myapp.tasks.hello -a name=World

# Multiple arguments
fairchild run -i myapp.tasks myapp.tasks.add -a a=2 -a b=3

This runs the task function directly in the current process without involving the database or workers. It is useful for testing and debugging because full tracebacks are printed on errors.

Testing

Running Tests Locally

  1. Create a test database:
createdb fairchild_test
  2. Run the tests with your development database URL. The tests automatically use the _test database instead of _development:
FAIRCHILD_DATABASE_URL=postgres://postgres@localhost/fairchild_development uv run pytest

Or run specific test files:

# Unit tests only (no database required)
uv run pytest tests/test_task.py tests/test_job.py tests/test_record.py

# Integration tests (requires database)
FAIRCHILD_DATABASE_URL=postgres://postgres@localhost/fairchild_development uv run pytest tests/test_integration.py

# Web UI tests (requires database)
FAIRCHILD_DATABASE_URL=postgres://postgres@localhost/fairchild_development uv run pytest tests/test_web_ui.py

Note: Integration tests automatically convert _development to _test in the database URL to protect your development data.
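
The conversion described in the note amounts to a suffix swap on the database name. The sketch below shows one way to do it with the standard library; `to_test_database_url` is a hypothetical function, and the test suite's real implementation may differ.

```python
from urllib.parse import urlsplit, urlunsplit


def to_test_database_url(url: str) -> str:
    """Swap a trailing `_development` in the database name for `_test`."""
    parts = urlsplit(url)
    path = parts.path
    suffix = "_development"
    if path.endswith(suffix):
        path = path[: -len(suffix)] + "_test"
    return urlunsplit(parts._replace(path=path))


print(to_test_database_url("postgres://postgres@localhost/fairchild_development"))
# postgres://postgres@localhost/fairchild_test
```

In this sketch, URLs whose database name does not end in `_development` are returned unchanged.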

Configuration

Environment Variables

Variable                 Description                 Default
FAIRCHILD_DATABASE_URL   PostgreSQL connection URL   (required)

Database URL Format

postgresql://user:password@host:port/database

Examples:

# Local development
export FAIRCHILD_DATABASE_URL="postgresql://localhost/myapp_development"

# With credentials
export FAIRCHILD_DATABASE_URL="postgresql://myuser:mypass@localhost/myapp"

# Remote server
export FAIRCHILD_DATABASE_URL="postgresql://user:pass@db.example.com:5432/myapp"
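
To check that a URL has the shape described above before handing it to a worker, it can be split into its parts with the standard library. `describe_database_url` is a hypothetical helper for illustration; Fairchild's own parsing is not shown in this README.

```python
from urllib.parse import urlsplit


def describe_database_url(url: str) -> dict:
    """Split a PostgreSQL URL into the pieces named in the format above."""
    parts = urlsplit(url)
    if parts.scheme not in ("postgresql", "postgres"):
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")
    return {
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,  # None when the URL omits the port
        "database": parts.path.lstrip("/"),
    }


print(describe_database_url("postgresql://user:pass@db.example.com:5432/myapp"))
```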
