Distributed Python job queue with asyncio and redis

SAQ

SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis. It can be used for processing background jobs with workers. For example, you could use SAQ to schedule emails, execute long queries, or do expensive data analysis.

It uses aioredis >= 2.0.

It is similar to RQ and heavily inspired by ARQ. Unlike RQ, it is async and thus significantly faster if your jobs are async. Even if they are not, SAQ is still considerably faster due to lower overhead.
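The speed difference for async jobs comes from overlapping the time jobs spend waiting on I/O. The following is a minimal sketch using only asyncio, not SAQ itself; `fake_job` is a hypothetical stand-in for an I/O-bound job, and the sleep duration is arbitrary.

```python
import asyncio
import time


async def fake_job(n: int) -> int:
    # Hypothetical I/O-bound job: the sleep stands in for a network or database call.
    await asyncio.sleep(0.1)
    return n * 2


async def run_sequentially(count: int) -> list:
    # One job at a time, the way a blocking worker would process them.
    return [await fake_job(n) for n in range(count)]


async def run_concurrently(count: int) -> list:
    # All jobs share one event loop, so their waiting time overlaps.
    return list(await asyncio.gather(*(fake_job(n) for n in range(count))))


def timed(coro_fn, count: int):
    # Run the coroutine function to completion and report elapsed wall time.
    start = time.perf_counter()
    results = asyncio.run(coro_fn(count))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    for fn in (run_sequentially, run_concurrently):
        results, elapsed = timed(fn, 5)
        print(f"{fn.__name__}: {elapsed:.2f}s")
```

Both variants return the same results; only the elapsed time differs, because the concurrent version waits for all five sleeps at once.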

SAQ optionally comes with a simple UI for monitoring workers and jobs.

Install

# minimal install
pip install saq

# web + hiredis
pip install saq[web,hiredis]

Usage

usage: saq [-h] [--workers WORKERS] [--verbose] [--web] settings

Start Simple Async Queue Worker

positional arguments:
  settings           Namespaced variable containing worker settings eg: eg module_a.settings

options:
  -h, --help         show this help message and exit
  --workers WORKERS  Number of worker processes
  --verbose, -v      Logging level: 0: ERROR, 1: INFO, 2: DEBUG
  --web              Start web app
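The `settings` argument is a dotted path such as `module_a.settings`: a module followed by a variable name. As a rough illustration of how such a path can be resolved, here is a sketch using `importlib`. This is an assumption about the general technique, not SAQ's actual loader, and `resolve_settings` is a hypothetical name.

```python
import importlib
from typing import Any


def resolve_settings(path: str) -> Any:
    """Resolve a 'package.module.variable' path to the object it names."""
    # Everything before the last dot is the module; the rest is the attribute.
    module_path, _, attribute = path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.variable', got {path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, attribute)


if __name__ == "__main__":
    # Any importable dotted path works; a stdlib name is used here for the demo.
    print(resolve_settings("importlib.import_module"))
```

The module has to be importable from the directory where the command runs, which is why the Python path matters when starting a worker.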

Example

import asyncio

from saq import Queue

# all functions take in context dict and kwargs
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=,load=)
    return {"x": a}

async def startup(ctx):
    # create_db is a placeholder for your own async connection factory
    ctx["db"] = await create_db()

async def shutdown(ctx):
    await ctx["db"].disconnect()

async def before_process(ctx):
    print(ctx["job"], ctx["db"])

async def after_process(ctx):
    pass

queue = Queue.from_url("redis://localhost")

settings = {
    "queue": queue,
    "functions": [test],
    "concurrency": 10,
    "startup": startup,
    "shutdown": shutdown,
    "before_process": before_process,
    "after_process": after_process,
}
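The comment in the example notes that custom serializers can be supplied through `dump=` and `load=`. Below is a sketch of such a pair that extends JSON with `datetime` support. It assumes `dump` takes a Python object and returns a string and `load` does the inverse, in the style of `json.dumps` and `json.loads`; the `__datetime__` marker key is a hypothetical convention chosen for this example.

```python
import json
from datetime import datetime, timezone
from typing import Any

_TAG = "__datetime__"  # hypothetical marker key, not part of SAQ


def _encode(value: Any) -> Any:
    # Called by json.dumps only for objects it cannot serialize itself.
    if isinstance(value, datetime):
        return {_TAG: value.isoformat()}
    raise TypeError(f"{type(value).__name__} is not JSON serializable")


def _decode(obj: dict) -> Any:
    # Called by json.loads for every decoded JSON object.
    if set(obj) == {_TAG}:
        return datetime.fromisoformat(obj[_TAG])
    return obj


def dump(value: Any) -> str:
    return json.dumps(value, default=_encode)


def load(data) -> Any:
    return json.loads(data, object_hook=_decode)


if __name__ == "__main__":
    payload = {"x": 1, "when": datetime(2022, 1, 1, tzinfo=timezone.utc)}
    print(load(dump(payload)) == payload)
```

A pair like this would be handed to the queue through the `dump=` and `load=` arguments, so that job results containing datetimes survive the round trip.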

To start the worker, assuming the module above is importable from the Python path:

saq module.file.settings
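To make the role of the hooks in the settings dict concrete, here is a toy in-memory loop that calls them in the order their names suggest: `startup` once, `before_process` and `after_process` around each job, and `shutdown` at the end. The ordering is an assumption inferred from the hook names, and `run_jobs` is a hypothetical helper; it uses no Redis, no concurrency, and no retries, so it is not SAQ's worker.

```python
import asyncio


async def run_jobs(settings: dict, jobs: list) -> list:
    """Run (function_name, kwargs) pairs through the hooks, one at a time."""
    ctx = {}
    functions = {fn.__name__: fn for fn in settings["functions"]}
    results = []
    await settings["startup"](ctx)
    try:
        for name, kwargs in jobs:
            ctx["job"] = name
            await settings["before_process"](ctx)
            try:
                results.append(await functions[name](ctx, **kwargs))
            finally:
                # Run the after hook even if the job function raised.
                await settings["after_process"](ctx)
    finally:
        await settings["shutdown"](ctx)
    return results


async def demo():
    events = []

    async def startup(ctx):
        events.append("startup")

    async def shutdown(ctx):
        events.append("shutdown")

    async def before_process(ctx):
        events.append("before_process")

    async def after_process(ctx):
        events.append("after_process")

    async def echo(ctx, *, a):
        events.append("echo")
        return {"x": a}

    settings = {
        "functions": [echo],
        "startup": startup,
        "shutdown": shutdown,
        "before_process": before_process,
        "after_process": after_process,
    }
    results = await run_jobs(settings, [("echo", {"a": 1}), ("echo", {"a": 2})])
    return results, events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The shared `ctx` dict is what lets `startup` create a resource such as a database handle once and have every job and hook reuse it.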

To enqueue jobs

import time

# schedule a job normally
job = await queue.enqueue("test", a=1)

# wait 1 second for the job to complete
await job.refresh(1)
print(job.result)

# schedule a job in 10 seconds
await queue.enqueue("test", a=1, scheduled=time.time() + 10)
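In the example, `scheduled` is given as `time.time() + 10`, that is, a Unix timestamp in seconds. The helpers below are a small sketch for producing such timestamps from a relative delay or from a timezone-aware `datetime`. Both `epoch_in` and `epoch_at` are hypothetical names for illustration, not part of SAQ.

```python
import time
from datetime import datetime, timezone


def epoch_in(seconds: float) -> float:
    """Unix timestamp `seconds` from now."""
    return time.time() + seconds


def epoch_at(when: datetime) -> float:
    """Unix timestamp for a timezone-aware datetime."""
    if when.tzinfo is None:
        # Naive datetimes are interpreted as local time, which is easy to get wrong.
        raise ValueError("use a timezone-aware datetime")
    return when.timestamp()


if __name__ == "__main__":
    print(epoch_in(10))
    print(epoch_at(datetime(2030, 1, 1, tzinfo=timezone.utc)))
    # Usage with the queue from the example (requires a running worker and Redis):
    # await queue.enqueue("test", a=1, scheduled=epoch_in(10))
```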

Demo

Start the worker

saq examples.simple.settings --web

Navigate to the web UI

Enqueue jobs

python examples/simple.py

Comparison to ARQ

SAQ is heavily inspired by ARQ but has several enhancements.

  1. Avoids polling by leveraging BLMOVE or RPOPLPUSH and NOTIFY
    1. SAQ has much lower latency than ARQ, with delays of < 5ms. ARQ's default polling frequency is 0.5 seconds
    2. SAQ is up to 8x faster than ARQ
  2. Web interface for monitoring queues and workers
  3. Heartbeat monitor for abandoned jobs
  4. More robust failure handling
    1. Storage of stack traces
    2. Sweeping stuck jobs
    3. Handling cancelled jobs differently from failed jobs (e.g., during machine redeployments)
  5. Before and after job hooks
  6. Easily run multiple workers to leverage more cores
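The latency point in the list above can be made concrete with a little arithmetic. If a queue is only checked every 0.5 seconds, a job waits until the next check after it arrives. The sketch below models just that wait, under the simplifying assumptions that arrivals are spread evenly over the interval and that network and processing time are zero; the function names are hypothetical.

```python
import math


def polling_delay(arrival: float, interval: float) -> float:
    """Seconds a job waits if the queue is checked only at multiples of `interval`."""
    next_poll = math.ceil(arrival / interval) * interval
    return next_poll - arrival


def average_polling_delay(interval: float, samples: int = 1000) -> float:
    """Mean wait for arrival times spread evenly across one polling interval."""
    arrivals = [(i + 0.5) * interval / samples for i in range(samples)]
    return sum(polling_delay(t, interval) for t in arrivals) / samples


if __name__ == "__main__":
    print(average_polling_delay(0.5))
```

Under this model the average wait is half the polling interval, so about 0.25 seconds at a 0.5 second interval. A blocking pop avoids the wait because the worker is woken as soon as a job is pushed.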

Development

python -m venv env
source env/bin/activate
pip install -e .[dev,web]
docker run -p 6379:6379 redis
./run_checks.sh


