Distributed Python job queue with asyncio and redis

SAQ

SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis or postgres. It can be used for processing background jobs with workers. For example, you could use SAQ to schedule emails, execute long queries, or do expensive data analysis.

Documentation

It uses redis-py >= 4.2.

It is similar to RQ and heavily inspired by ARQ. Unlike RQ, it is async and thus significantly faster if your jobs are async. Even if they are not, SAQ is still considerably faster due to lower overhead.
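The speed-up for async jobs comes from overlapping their waiting time. The following is a minimal sketch of that general asyncio principle using only the standard library; it does not use SAQ, and `job`, `run_sequential`, `run_concurrent` and `timed` are hypothetical names chosen for illustration.

```python
import asyncio
import time


async def job(i):
    # Stands in for an I/O-bound task such as a query or an HTTP call.
    await asyncio.sleep(0.1)
    return i


async def run_sequential(n):
    # Each job waits for the previous one to finish.
    return [await job(i) for i in range(n)]


async def run_concurrent(n):
    # All jobs wait at the same time on a single event loop.
    return await asyncio.gather(*(job(i) for i in range(n)))


def timed(coro_fn, n):
    # Run the coroutine function and return (results, elapsed seconds).
    start = time.perf_counter()
    results = asyncio.run(coro_fn(n))
    return results, time.perf_counter() - start
```

Calling `timed(run_sequential, 5)` and `timed(run_concurrent, 5)` should return the same results, with the concurrent run taking roughly the time of one job rather than five.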

SAQ optionally comes with a simple UI for monitoring workers and jobs.

SAQ Web UI

Install

# minimal install for redis
pip install saq[redis]

# minimal install for postgres
pip install saq[postgres]

# web + hiredis
pip install saq[web,hiredis]

Usage

usage: saq [-h] [--workers WORKERS] [--verbose] [--web]
           [--extra-web-settings EXTRA_WEB_SETTINGS]
           [--port PORT] [--check]
           settings

Start Simple Async Queue Worker

positional arguments:
  settings              Namespaced variable containing
                        worker settings, e.g.
                        module_a.settings

options:
  -h, --help            show this help message and exit
  --workers WORKERS     Number of worker processes
  --verbose, -v         Logging level: 0: ERROR, 1: INFO,
                        2: DEBUG
  --web                 Start web app. By default, this
                        only monitors the current
                        worker's queue. To monitor
                        multiple queues, see '--extra-
                        web-settings'
  --extra-web-settings EXTRA_WEB_SETTINGS, -e EXTRA_WEB_SETTINGS
                        Additional worker settings to
                        monitor in the web app
  --port PORT           Web app port, defaults to 8080
  --check               Perform a health check

environment variables:
  AUTH_USER     basic auth user, defaults to admin
  AUTH_PASSWORD basic auth password, if not specified, no auth will be used

Example

import asyncio

from saq import CronJob, Queue


class DBHelper:
    """Helper class for demo purposes"""

    async def disconnect(self):
        print("Disconnecting from the database")

    async def connect(self):
        print("Connecting...")

    def __str__(self):
        return "Your DBHelper at work"

# all functions take in context dict and kwargs
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=,load=)
    return {"x": a}

async def cron(ctx):
    print("i am a cron job")

async def startup(ctx):
    helper = DBHelper()
    await helper.connect()
    ctx["db"] = helper

async def shutdown(ctx):
    await ctx["db"].disconnect()

async def before_process(ctx):
    print(ctx["job"], ctx["db"])

async def after_process(ctx):
    pass

queue = Queue.from_url("redis://localhost")

settings = {
    "queue": queue,
    "functions": [test],
    "concurrency": 10,
    "cron_jobs": [CronJob(cron, cron="* * * * * */5")], # run every 5 seconds
    "startup": startup,
    "shutdown": shutdown,
    "before_process": before_process,
    "after_process": after_process,
}
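The comment in the example mentions custom serializers via `Queue(dump=, load=)`. As a rough sketch of what such a pair could look like, the functions below extend JSON so that `datetime` values survive a round trip. The `__datetime__` marker key is a hypothetical convention invented for this example, and passing the functions through `Queue.from_url(..., dump=dump, load=load)` is an assumption based on that comment.

```python
import json
from datetime import datetime


def dump(obj):
    """Serialize a dict to a JSON string, tagging datetime values."""

    def encode(value):
        # Only called by json for values it cannot serialize itself.
        if isinstance(value, datetime):
            return {"__datetime__": value.isoformat()}
        raise TypeError(f"Cannot serialize {type(value).__name__}")

    return json.dumps(obj, default=encode)


def load(data):
    """Deserialize a JSON string or bytes, restoring tagged datetimes."""

    def decode(d):
        # Called for every decoded JSON object; untagged dicts pass through.
        if set(d) == {"__datetime__"}:
            return datetime.fromisoformat(d["__datetime__"])
        return d

    return json.loads(data, object_hook=decode)


# Assumed usage, following the comment in the example above:
# queue = Queue.from_url("redis://localhost", dump=dump, load=load)
```

With a pair like this, a job function could return a `datetime` inside its result dict instead of converting it to a string by hand.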

To start the worker, assuming the settings above are importable from the Python path (here as module.file.settings):

saq module.file.settings

Note: module.file.settings can also be a callable returning the settings dictionary.

To enqueue jobs

# these snippets run inside a coroutine and assume `import time`

# schedule a job normally
job = await queue.enqueue("test", a=1)

# wait 1 second for the job to complete
await job.refresh(1)
print(job.result)

# run a job and return the result
print(await queue.apply("test", a=2))

# run a job with custom polling interval to check status more frequently
print(await queue.apply("test", a=2, poll_interval=0.1))

# Run multiple jobs concurrently and collect the results into a list
print(await queue.map("test", [{"a": 3}, {"a": 4}]))

# schedule a job in 10 seconds
await queue.enqueue("test", a=1, scheduled=time.time() + 10)
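The snippets above use `await` at the top level, so they need to run inside a coroutine. A minimal sketch of a complete script follows; `enqueue_examples` and `main` are hypothetical names, and running it requires a reachable Redis server plus a worker started with the settings from the example.

```python
import asyncio
import time


async def enqueue_examples(queue):
    # Fire-and-forget: returns a job handle immediately.
    job = await queue.enqueue("test", a=1)

    # Run a job and wait for its result.
    result = await queue.apply("test", a=2)

    # Run several jobs concurrently and collect the results in a list.
    results = await queue.map("test", [{"a": 3}, {"a": 4}])

    # Schedule a job to start in 10 seconds.
    delayed = await queue.enqueue("test", a=1, scheduled=time.time() + 10)

    return job, result, results, delayed


async def main():
    from saq import Queue  # imported here so the helper above stays testable

    queue = Queue.from_url("redis://localhost")
    print(await enqueue_examples(queue))


# To run against a live queue and worker:
# asyncio.run(main())
```

Keeping the queue as a parameter of `enqueue_examples` makes it possible to pass in a stub object with the same async methods when testing without Redis.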

Demo

Start the worker

python -m saq examples.simple.settings --web

Navigate to the web UI (served on port 8080 by default)

Enqueue jobs

python examples/simple.py

Comparison to ARQ

SAQ is heavily inspired by ARQ but has several enhancements.

  1. Avoids polling by leveraging BLMOVE or RPOPLPUSH and NOTIFY
    1. SAQ has much lower latency than ARQ, with delays of < 5ms. ARQ's default polling frequency is 0.5 seconds
    2. SAQ is up to 8x faster than ARQ
  2. Web interface for monitoring queues and workers
  3. Heartbeat monitor for abandoned jobs
  4. More robust failure handling
    1. Storage of stack traces
    2. Sweeping stuck jobs
    3. Handling of cancelled jobs differently from failed jobs (machine redeployments)
  5. Before and after job hooks
  6. Easily run multiple workers to leverage more cores

Development

python -m venv env
source env/bin/activate
pip install -e ".[dev,web]"
docker run -d -p 6379:6379 redis
docker run -d -p 5432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust postgres
make style test

Download files

Source Distribution

saq-0.26.4.tar.gz (75.4 kB)

Built Distribution

saq-0.26.4-py3-none-any.whl (63.2 kB, Python 3)
