Postgresql job scheduling

Jobs

A PL/pgSQL-based work queue (publisher/consumer) with a Python asyncio/asyncpg API.

alpha software

Features

  • Implements a two-layer API:

    A PostgreSQL layer: tasks can be published from PL/pgSQL functions or procedures, and the layer can be extended using triggers.

    A Python layer (or any client with a PostgreSQL driver). The default implementation is based on Python asyncio, using the awesome asyncpg driver.

  • It's compatible with postgrest. All procedures and tables are scoped to a dedicated PostgreSQL schema and can be exposed through postgrest.

  • Retry logic, scheduled_at, and timeout are implemented in the publish method. A task can be published with a max_retries param or a specific timeout.

  • Internally it uses two tables: jobs.job_queue, where pending and running tasks are scheduled, and jobs.job, where ended tasks (successes or failures) are moved.

  • By default, tasks are retried three times, with backoff.

  • Jobs that exceed their timeout are expired; by default, tasks have a 60s timeout.

  • Tasks can be scheduled in the future: just provide a scheduled_at param.

  • There are views to monitor queue stats: jobs.all (all tasks), jobs.expired and jobs.running

  • Tasks can also be prioritized: provide a priority number, and tasks with a greater priority take precedence over other tasks.

  • consume_topic allows consuming tasks by topic, using a wildcard pattern (topic.element.%).

  • Rudimentary benchmarks on my laptop showed that it can handle 1000 tasks/second, but this depends on your Postgres instance.

  • Instead of a worker daemon, tasks can also be consumed from a cron job, a regular Python script, or a Kubernetes job. (It could be used to parallelize k8s jobs.)

Tradeoffs

  • All jobs have to be acknowledged, positively or negatively (ack/nack).

Use from postgresql

SELECT job_id FROM
    jobs.publish(
        i_task,                           -- method or function to be executed
        i_body: jsonb = null,             -- arguments passed to it (in Python: {args: [], kwargs: {}})
        i_scheduled_at: timestamp = null, -- when the task should run
        i_timeout: numeric(7,2),          -- timeout in seconds for the job
        i_priority: integer = null        -- greater number means higher priority
    )
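From a client, the same call can be issued with bound parameters. The following is a minimal sketch, assuming an asyncpg-style connection (any object with an awaitable `fetchval(query, *args)`) and assuming that `jobs.publish` takes the task name and the JSON body as its first two positional arguments, as the signature above suggests. `build_body` and `publish_task` are hypothetical helper names, not part of the package.

```python
import json


def build_body(*args, **kwargs):
    """Serialize call arguments in the {"args": [...], "kwargs": {...}} shape."""
    return json.dumps({"args": list(args), "kwargs": kwargs})


async def publish_task(conn, task, *args, **kwargs):
    """Publish one task and return the job_id reported by jobs.publish.

    Assumption: `conn` behaves like an asyncpg connection, and the first two
    positional parameters of jobs.publish are the task name and the body.
    """
    body = build_body(*args, **kwargs)
    # The body is sent as JSON text and cast to jsonb on the server side.
    return await conn.fetchval(
        "SELECT job_id FROM jobs.publish($1, $2::jsonb)", task, body
    )
```

With a real connection this would be called as `await publish_task(db, "package.file.sum", 1, 2)`. Scheduling, timeout, and priority are left at their server-side defaults here.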

On the worker side:

SELECT * from jobs.consume(
    num: integer -- number of desired jobs
);

This returns a list of jobs to be processed.

Or selectively consume a topic:

SELECT * from jobs.consume_topic('topic.xxx.%', 10)

Jobs are marked as processing, and should be acknowledged with:

SELECT FROM jobs.ack(job_id);

or, to return a failed job:

SELECT FROM jobs.nack(job_id, traceback, i_schedule_at)
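The consume/ack/nack cycle above can be driven from any client. Below is a minimal sketch of one pass over the queue, assuming an asyncpg-style connection, assuming each consumed row exposes a `job_id` field, and assuming that passing NULL as `i_schedule_at` is accepted by `jobs.nack`. `drain_once` and `handler` are hypothetical names used only for illustration.

```python
import traceback


async def drain_once(conn, handler, batch_size=10):
    """Consume one batch of jobs and ack or nack each of them.

    Assumptions: `conn` behaves like an asyncpg connection, every consumed
    row has a "job_id" field, and a NULL i_schedule_at is accepted by
    jobs.nack. `handler` is a caller-supplied coroutine that receives the row.
    """
    rows = await conn.fetch("SELECT * FROM jobs.consume($1)", batch_size)
    for row in rows:
        job_id = row["job_id"]
        try:
            await handler(row)
        except Exception:
            # Negative acknowledgement: hand the traceback back to the queue.
            await conn.execute(
                "SELECT FROM jobs.nack($1, $2, $3)",
                job_id, traceback.format_exc(), None,
            )
        else:
            # Positive acknowledgement: the job finished without raising.
            await conn.execute("SELECT FROM jobs.ack($1)", job_id)
    return len(rows)
```

Because the function returns the number of consumed rows, a cron-style runner could call it repeatedly until it returns 0 and then exit.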

You can also enqueue multiple jobs in a single request, using

SELECT * FROM jobs.publish_bulk(jobs.bulk_job[]);

where jobs.bulk_job is

create type jobs.bulk_job as (
    task varchar,
    body jsonb,
    scheduled_at timestamp,
    timeout integer,
    priority integer,
    max_retries integer
);
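From Python, a batch could be assembled as tuples that follow the field order of `jobs.bulk_job`. This is only a sketch: it assumes an asyncpg-style connection that encodes each tuple as one composite value, and it assumes that NULL fields fall back to the queue defaults, which the text above does not confirm. `BulkJob`, `make_bulk_job`, and `publish_bulk` are hypothetical names.

```python
import json
from datetime import datetime
from typing import NamedTuple, Optional


class BulkJob(NamedTuple):
    """Mirrors the field order of the jobs.bulk_job composite type."""
    task: str
    body: Optional[str] = None  # JSON text for the jsonb field
    scheduled_at: Optional[datetime] = None
    timeout: Optional[int] = None
    priority: Optional[int] = None
    max_retries: Optional[int] = None


def make_bulk_job(task, args=(), kwargs=None, **options):
    """Build one BulkJob; `options` may set any of the remaining fields."""
    body = json.dumps({"args": list(args), "kwargs": kwargs or {}})
    return BulkJob(task=task, body=body, **options)


async def publish_bulk(conn, bulk_jobs):
    # Assumption: the driver maps each plain tuple to one jobs.bulk_job value.
    return await conn.fetch(
        "SELECT * FROM jobs.publish_bulk($1::jobs.bulk_job[])",
        [tuple(job) for job in bulk_jobs],
    )
```

A call might look like `await publish_bulk(db, [make_bulk_job("package.file.sum", args=[1, 2], priority=5)])`.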

Use from python

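On this side, a worker implementation should look something like this:

    import asyncio
    import asyncpg

    # `jobs` refers to this package's Python module
    async def worker(dsn):
        db = await asyncpg.connect(dsn)
        while True:
            # use a distinct name so the `jobs` module is not shadowed
            pending = await jobs.consume(db, 1)
            for job in pending:
                try:
                    await jobs.run(db, job["job_id"])
                    await jobs.ack(job["job_id"])
                except Exception as e:
                    # report the failure so the job can be retried
                    await jobs.nack(job["job_id"], str(e))
            # wait a second before polling the queue again
            await asyncio.sleep(1)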

On the publisher side, jobs can be enqueued from within a PostgreSQL transaction:

db = await asyncpg.connect(dsn)
async with db.transaction():
    # do whatever is needed,
    # queue a task
    await jobs.publish("package.file.sum", args=[1,2])

Installing the package

pip install pgjobs
jobs-migrate postgresql://user:password@localhost:5432/db

This creates the tables and functions in the `jobs` PostgreSQL schema.

To run the worker,

jobs-worker postgresql://dsn

At the moment there is not much implemented there: just a single-threaded worker that needs a bit more love :) If your application resides in a Python package, tasks like yourpackage.file.method will be runnable as is.

Observability and monitoring

The monitoring views can be queried with psql, or exposed through postgresql_exporter.
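The views can also be polled from Python. Below is a minimal sketch that counts the rows in each of the three views listed under Features, assuming an asyncpg-style connection; `queue_stats` is a hypothetical helper, not part of the package.

```python
MONITORING_VIEWS = ("all", "expired", "running")


async def queue_stats(conn):
    """Return a {view_name: row_count} mapping for the monitoring views.

    Assumption: `conn` behaves like an asyncpg connection.
    """
    stats = {}
    for view in MONITORING_VIEWS:
        # View names come from the fixed tuple above, never from user input;
        # quoting keeps the reserved word "all" valid as an identifier.
        stats[view] = await conn.fetchval(f'SELECT count(*) FROM jobs."{view}"')
    return stats
```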

TODO

  • [ ] Connect notifications, using pg_notify, when tasks are queued, picked up, or completed. With this in place, it's easy enough to write a WebSocket (WS) service to send notifications to connected customers.

  • [ ] Improve the worker to run every job in its own asyncio task.

  • [ ] Handle exceptions better on the Python side.

  • [x] Fix requirements file.

  • [ ] Add GitHub Actions to run CI.

  • [ ] Write better docs and some examples.


Files for pgjobs, version 0.2.1

  • pgjobs-0.2.1-py3-none-any.whl (13.5 kB), wheel, Python version py3

  • pgjobs-0.2.1.tar.gz (12.1 kB), source
