Scheduled tasks and background jobs for Datasette

datasette-cron

Database-backed scheduled tasks for Datasette.

Plugins can register handler functions, then create tasks that run on a schedule. Tasks persist across restarts, support cron expressions and intervals, and record execution history.

Installation

pip install datasette-cron

Quick Start

A plugin registers a handler function and creates a task that runs on a schedule:

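from datasette import hookimpl

@hookimpl
def cron_register_handlers(datasette):
    # Handlers receive the Datasette instance and the task's config dict.
    async def my_handler(datasette, config):
        db = datasette.get_database(config["database"])
        await db.execute_write("INSERT INTO log (message) VALUES ('tick')")

    # Exposed as "myplugin:my-handler" when the plugin module is datasette_myplugin.
    return {"my-handler": my_handler}

@hookimpl
def startup(datasette):
    # Returning an async function lets Datasette await it during startup.
    async def inner():
        scheduler = datasette._cron_scheduler
        await scheduler.add_task(
            name="log-every-minute",
            handler="myplugin:my-handler",
            schedule={"interval": 60},      # seconds
            config={"database": "mydb"},    # passed to the handler as config
        )
    return inner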

How It Works

  1. Startup: datasette-cron creates a Scheduler at datasette._cron_scheduler and collects handlers from all plugins via the cron_register_handlers hook
  2. First request: The scheduler loop starts (via asgi_wrapper), ticking every ~1 second
  3. Each tick: Queries datasette_cron_tasks for tasks where next_run_at <= now and enabled = 1
  4. Execution: Looks up the handler function, calls it with (datasette, config), records the result in datasette_cron_runs
  5. Next run: Advances next_run_at based on the schedule
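The steps above can be sketched as a small loop body. This is a minimal illustration, not the plugin's actual implementation: the `tasks` and `runs` tables are a hypothetical, simplified stand-in for datasette_cron_tasks and datasette_cron_runs, only interval schedules are handled, and `tick` is an invented name.

```python
import asyncio
import sqlite3
from datetime import datetime, timedelta, timezone

# Hypothetical, simplified schema; the real tables have more columns.
SCHEMA = """
CREATE TABLE tasks (name TEXT PRIMARY KEY, handler TEXT, interval_s INTEGER,
                    enabled INTEGER, next_run_at TEXT);
CREATE TABLE runs (task_name TEXT, started_at TEXT, status TEXT, error_message TEXT);
"""

async def tick(conn, handlers, now):
    """Run every enabled task that is due, record the result, reschedule it."""
    due = conn.execute(
        "SELECT name, handler, interval_s FROM tasks "
        "WHERE enabled = 1 AND next_run_at <= ?",
        (now.isoformat(),),
    ).fetchall()
    for name, handler_name, interval_s in due:
        status, error = "success", None
        try:
            # The real scheduler passes (datasette, config); placeholders here.
            await handlers[handler_name](None, {})
        except Exception as exc:  # record the failure, keep the loop alive
            status, error = "error", str(exc)
        conn.execute("INSERT INTO runs VALUES (?, ?, ?, ?)",
                     (name, now.isoformat(), status, error))
        next_run = now + timedelta(seconds=interval_s)
        conn.execute("UPDATE tasks SET next_run_at = ? WHERE name = ?",
                     (next_run.isoformat(), name))
    return [row[0] for row in due]

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
conn.execute(
    "INSERT INTO tasks VALUES ('log-every-minute', 'myplugin:my-handler', 60, 1, ?)",
    (start.isoformat(),),
)

calls = []

async def my_handler(datasette, config):
    calls.append("tick")

handlers = {"myplugin:my-handler": my_handler}
ran_first = asyncio.run(tick(conn, handlers, start))
# One second later the task is no longer due: next_run_at moved forward 60s.
ran_second = asyncio.run(tick(conn, handlers, start + timedelta(seconds=1)))
```

Comparing ISO timestamps as strings works here because every value is written in the same format and timezone.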

Plugin Hook

cron_register_handlers(datasette)

Return a dict mapping handler names to callable functions:

@hookimpl
def cron_register_handlers(datasette):
    return {
        "check-feeds": check_feeds_handler,
        "cleanup": cleanup_handler,
    }

Handlers are registered with a plugin-derived prefix. If your plugin module is datasette_myplugin, handlers are accessible as myplugin:check-feeds and myplugin:cleanup.
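To make the naming rule concrete, here is a small sketch of how a prefixed handler name could be derived. It assumes the prefix is simply the module name with a leading `datasette_` removed, which matches the example above; `handler_key` and `qualify` are hypothetical helpers, and the plugin's actual derivation may differ in edge cases.

```python
def handler_key(module_name: str, handler_name: str) -> str:
    # Assumption: the prefix is the module name without a leading "datasette_".
    prefix = module_name.removeprefix("datasette_")  # Python 3.9+
    return f"{prefix}:{handler_name}"

def qualify(module_name: str, handlers: dict) -> dict:
    """Return a copy of a plugin's handler dict keyed by prefixed names."""
    return {handler_key(module_name, name): fn for name, fn in handlers.items()}

registered = qualify(
    "datasette_myplugin",
    {"check-feeds": lambda datasette, config: None,
     "cleanup": lambda datasette, config: None},
)
names = sorted(registered)
```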

Handler Signature

async def my_handler(datasette, config):
    """
    datasette: the Datasette instance
    config: dict from the task's config field
    """
    pass

Handlers can be sync or async.
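One common way to support both kinds of handler is to call the function and await the result only if it is awaitable. The sketch below shows that pattern; `call_handler` is a hypothetical name, and this is not necessarily how datasette-cron dispatches handlers internally.

```python
import asyncio
import inspect

async def call_handler(handler, datasette, config):
    """Call a sync or async handler and return its result."""
    result = handler(datasette, config)
    if inspect.isawaitable(result):
        # Async handlers return a coroutine that still has to be awaited.
        result = await result
    return result

def sync_handler(datasette, config):
    return f"sync:{config['key']}"

async def async_handler(datasette, config):
    return f"async:{config['key']}"

results = [
    asyncio.run(call_handler(sync_handler, None, {"key": "value"})),
    asyncio.run(call_handler(async_handler, None, {"key": "value"})),
]
```

With this pattern a slow sync handler would block the event loop while it runs, so long-running work is usually better written as an async handler.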

Scheduler API

Access the scheduler via datasette._cron_scheduler after startup.

add_task()

Create or update a task (idempotent upsert). If the task already exists, next_run_at is preserved.

await scheduler.add_task(
    name="my-task",
    handler="myplugin:my-handler",
    schedule={"interval": 300},          # every 5 minutes
    config={"key": "value"},             # passed to handler
    timezone="America/New_York",         # optional
    overlap="skip",                      # "skip" prevents overlapping runs
    retry={"max_retries": 3, "backoff": "exponential"},
)
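The upsert behaviour described above (update the definition, keep next_run_at) maps naturally onto SQLite's `ON CONFLICT ... DO UPDATE` clause. The following is a sketch under assumptions: the `tasks` table is a cut-down, hypothetical version of datasette_cron_tasks, this standalone `add_task` function is not the scheduler method, and the plugin may implement the upsert differently.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (name TEXT PRIMARY KEY, handler TEXT, "
    "schedule_config TEXT, next_run_at TEXT)"
)

def add_task(conn, name, handler, schedule, first_run_at):
    """Insert a task, or update its definition while keeping next_run_at."""
    conn.execute(
        """
        INSERT INTO tasks (name, handler, schedule_config, next_run_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(name) DO UPDATE SET
            handler = excluded.handler,
            schedule_config = excluded.schedule_config
        """,  # next_run_at is deliberately absent from the UPDATE list
        (name, handler, json.dumps(schedule), first_run_at),
    )

add_task(conn, "my-task", "myplugin:my-handler", {"interval": 300}, "2024-01-01T00:05:00")
# Second call changes the schedule; the stored next_run_at is left alone.
add_task(conn, "my-task", "myplugin:my-handler", {"interval": 60}, "2030-01-01T00:00:00")
row = conn.execute("SELECT schedule_config, next_run_at FROM tasks").fetchone()
```

Because repeated calls leave the stored next run time alone, calling add_task on every startup does not reset a task's schedule.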

Schedule Types

Interval (seconds):

schedule={"interval": 60}        # every 60 seconds
schedule={"interval": 1}         # every second

Cron expression:

schedule="0 8 * * *"             # daily at 8am
schedule="*/5 * * * *"           # every 5 minutes

RFC 5545 RRULE:

schedule={"rrule": "FREQ=WEEKLY;BYDAY=MO"}   # weekly, on Mondays
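The three forms can be told apart by shape alone: a string is a cron expression, and a dict carries either an `interval` or an `rrule` key. The sketch below maps a schedule value to the schedule_type names used in the CronTask model. `classify_schedule` is a hypothetical helper, and the plugin's own validation rules are not documented here.

```python
def classify_schedule(schedule):
    """Return "interval", "cron", or "rrule" for a schedule value."""
    if isinstance(schedule, str):
        return "cron"                      # e.g. "*/5 * * * *"
    if isinstance(schedule, dict):
        if "interval" in schedule:
            return "interval"              # seconds between runs
        if "rrule" in schedule:
            return "rrule"                 # RFC 5545 recurrence rule
    raise ValueError(f"Unrecognised schedule: {schedule!r}")

kinds = [
    classify_schedule({"interval": 60}),
    classify_schedule("0 8 * * *"),
    classify_schedule({"rrule": "FREQ=WEEKLY;BYDAY=MO"}),
]
```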

Other Methods

await scheduler.remove_task("my-task")
await scheduler.trigger_task("my-task")       # run immediately
await scheduler.enable_task("my-task")
await scheduler.disable_task("my-task")
await scheduler.update_task("my-task", schedule={"interval": 10})

Data Models

Queries against InternalDB (available as scheduler.internal_db) return typed dataclasses:

from datasette_cron.models import CronTask, CronRun

task: CronTask = await scheduler.internal_db.get_task("my-task")
print(task.name, task.handler, task.next_run_at, task.last_status)

runs: list[CronRun] = await scheduler.internal_db.get_runs("my-task")
for run in runs:
    print(run.started_at, run.status, run.duration_ms)

CronTask

Field            Type          Description
---------------  ------------  ------------------------------------------------
name             str           Unique task identifier
handler          str           Handler reference (e.g., "myplugin:my-handler")
config           dict          JSON config passed to handler
schedule_type    str           "interval", "cron", or "rrule"
schedule_config  str           JSON schedule parameters
timezone         str | None    IANA timezone
overlap_policy   str           "skip" or "allow"
retry_max        int           Max retry attempts
retry_backoff    str           "exponential" or "linear"
enabled          bool          Whether task is active
next_run_at      str | None    ISO timestamp of next scheduled run
last_run_at      str | None    ISO timestamp of last run
last_status      str | None    "success" or "error"
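The retry_backoff values name two common delay strategies. The README does not state the delay formula or base delay the plugin uses, so the sketch below only illustrates the usual meaning of the two terms; `retry_delay` and `base_seconds` are hypothetical.

```python
def retry_delay(attempt: int, backoff: str, base_seconds: float = 1.0) -> float:
    """Delay before retry number `attempt` (1-based), under an assumed base delay."""
    if backoff == "exponential":
        return base_seconds * 2 ** (attempt - 1)   # doubles on each retry
    if backoff == "linear":
        return base_seconds * attempt              # grows by a fixed step
    raise ValueError(f"Unknown backoff: {backoff!r}")

# Delays for a task with retry_max = 3 under each strategy.
exponential = [retry_delay(n, "exponential") for n in range(1, 4)]
linear = [retry_delay(n, "linear") for n in range(1, 4)]
```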

CronRun

Field          Type          Description
-------------  ------------  ------------------------------
id             int           Auto-increment ID
task_name      str           Which task this run belongs to
started_at     str           ISO timestamp
finished_at    str | None    ISO timestamp
status         str           "running", "success", or "error"
error_message  str | None    Error details on failure
attempt        int           Retry attempt number
duration_ms    int | None    Execution time in milliseconds
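As an illustration of how these fields relate, the sketch below derives a millisecond duration from the two ISO timestamps. `RunRecord` is a hypothetical stand-in for CronRun so that the example runs without the plugin installed, and whether the plugin computes duration_ms this way is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class RunRecord:  # hypothetical stand-in for datasette_cron.models.CronRun
    task_name: str
    started_at: str
    finished_at: Optional[str]
    status: str

def duration_ms(run: RunRecord) -> Optional[int]:
    """Elapsed milliseconds, or None while the run is still in progress."""
    if run.finished_at is None:
        return None
    delta = datetime.fromisoformat(run.finished_at) - datetime.fromisoformat(run.started_at)
    return round(delta.total_seconds() * 1000)

finished = RunRecord("my-task", "2024-01-01T08:00:00", "2024-01-01T08:00:01.250000", "success")
running = RunRecord("my-task", "2024-01-01T08:05:00", None, "running")
durations = [duration_ms(finished), duration_ms(running)]
```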

REST API

Method  Endpoint                           Description
------  ---------------------------------  ---------------------
GET     /-/api/cron/tasks                  List all tasks
GET     /-/api/cron/tasks/{name}           Task detail
GET     /-/api/cron/tasks/{name}/runs      Run history
POST    /-/api/cron/tasks/{name}/trigger   Trigger immediate run
POST    /-/api/cron/tasks/{name}/enable    Enable/disable task

All endpoints require the datasette-cron-access permission.
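A client can build these requests with the standard library alone. The sketch below only constructs the request objects and does not send them. The `cron_request` helper, the base URL, and the use of a bearer token for authentication are assumptions for illustration; request and response bodies are not documented here, so none are shown.

```python
from urllib.parse import quote
from urllib.request import Request

POST_ACTIONS = {"trigger", "enable"}  # POST endpoints from the table above

def cron_request(base_url, name=None, action=None, token=None):
    """Build (but do not send) a request for one of the cron API endpoints."""
    if action is not None and name is None:
        raise ValueError("an action requires a task name")
    path = "/-/api/cron/tasks"
    if name is not None:
        path += "/" + quote(name, safe="")   # escape unusual task names
    if action is not None:
        path += "/" + action                 # "runs", "trigger", or "enable"
    method = "POST" if action in POST_ACTIONS else "GET"
    # Assumption: the caller authenticates with a Datasette API token.
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    return Request(base_url.rstrip("/") + path, method=method, headers=headers)

list_req = cron_request("http://localhost:8001")
trigger_req = cron_request("http://localhost:8001", "my-task", "trigger")
```

Passing a built request to `urllib.request.urlopen` would send it to a running Datasette instance.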

Database Tables

Both tables are stored in Datasette's internal database:

datasette_cron_tasks — task definitions and scheduling state

datasette_cron_runs — execution history with timing, status, and errors

Development

just dev           # start dev server
just test          # run tests
just format        # format code (backend + frontend)
just check         # lint + type check (backend + frontend)
