Skip to main content

A simple Celery-like database-backed task queue for Django

Project description

A short, simple, boring and reliable Celery replacement for my Django projects.

  • Database is the only backend. The task is a simple Django model, it uses the same transaction and connection as other models. No more transaction.on_commit hooks to schedule tasks.

  • ETA (estimated time of arrival) is a first-class citizen. It does not depend on whether backends support the feature or not.

  • Django admin for monitoring. You can view pending tasks, future, failed, and “dirty” (crashed in the middle of work).

  • Dead letter queue built in. You get access to failed tasks and can retry them from Django admin.

  • Tasks do not produce any results, there is no get or join and there is no “result backend”. This is not a distrubuted await. If you need to store results, pass a unique key into the task and store the result in some DIY model.

  • No workflows and chaining of jobs.

  • The worker is a single-threaded process, you start several of those to scale. I have seen too many issues with autoscaling workers, worker processes killed by OS, workers stuck: simple is better.

  • No prefetching or visibility timeouts. The worker picks the first available task and processes it.

  • Easy to get the metrics from Django shell and export to your favorite monitoring tool

  • Task records are removed after successful execution. Unlike Celery SQLAlchemy’s backend, records are removed so you don’t have to care about archiving. It also keeps the table small, properly indexed and efficient.

  • Failed tasks are kept in the database. A human decision is required to remove them. If not, wrap your task in “catch all exceptions” block.

Celery API

The main API is the Celery API (shared_task) with delay, apply_async and s. Just to make switching between implementations easier.

from django_taskq.celery import shared_task

@shared_task(autoretry_for(MyException,), retry_kwargs={"max_retries": 10, "countdown": 5})
def my_task(foo, bar=None):
    ...
my_task.delay(1,bar=2)
my_task.appy_async((1,2))
my_task.s(1,2).apply_async()

NOT Celery API

This task queue is unintrusive, all you get is the execution of a function. How you organize the code after that is up to you. There are no Celery bound tasks and task inheritance, naming, task requests, special logging, etc. You get the idea.

Some smaller decisions:

  • Retry can’t change the args/kwargs. That is not a retry but a new task.

Admin page

Django admin page shows tasks in following groups:

  • Failed tasks – Tasks that failed after retries and countdowns. You should inspect them and remove by hand or with a script. You can execute them again as well.

  • Dirty tasks – Tasks that got started but failed without reaching a final state due to killed prcesses or crashing machines. Review then and either delete or execute again.

  • Active tasks – Tasks being executed right now. You might catch some longer-running tasks here

  • Pending tasks – Tasks that should be executed now but are not due to lack of available workers. You might start some extra ones to catch up.

  • Future tasks – Tasks scheduled to be executed in the future.

Internals

Adding a new task to the queue is just creating a new instance of the Task model.

Executing a task is a bit more expensive:

  1. A task is picked up from a queue and the state is updated to “started” within a single transaction.

  2. Python code is executed, a background thread updates “alive at” field every second (“a liveness probe”).

  3. Successful tasks are deleted from the table. Failed tasks are marked as such and retried (based on configuration).

This is a bit more expensive than necessary but:

  • we can recognize running tasks - the task is “started” and the record is updated in the last couple seconds.

  • we can recognize “dirty” tasks that got killed or lost database connection in the middle - the task is “started” and the record has not been updated for a while.

In an ideal world tasks should be idempotent but things happen and I prefer to know which tasks crashed and double-check if some cleanup is necessary.

Performance

A single process can execute around 150 dummy tasks per second which is more than enough. After years of struggling with Celery, correctness and observability are more important. On the other hand, to handle more “tasks” you probably want to store many events not tasks and have a single task that processes them in batches.

Recipes

Exactly once, at most once, at least once, idempotency:

Implementing these semantics presents too many design questions to answer on the task level. Instead, treat the tasks as function calls that are decoupled in time. We do not enforce these sematics on functions, we write code inside functions to perform the necessary checks.

Within the task do this:

  1. Lock the application model

  2. Check that all conditions still apply

  3. Perform the action

Storing results:

Instead of the task storing it’s results and returning that to the caller or trigerring another task to process it either:

  • Store the result directly in the target application model

  • Call a function or another task to process the result explicitly

Scheduling tasks:

Call a Python script from the Unix crontab. Use Kubernetes CronJobs.

Do that every minute and check conditions in the code: maybe instead of UTC clock you have to follow the business day calendar or multiple time zones.

Scaling workers:

Start multiple Docker containers, start multiple Kubernetes pods/scale deployment. Or use something like supervisord to start multiple processes.

Boosting performance:

Instead of executing thousands of tasks (function calls with specific arguments) consider recording thousands of events (domain-specific model) and executing a task once in a while that processes all available events in bulk.

Or do not record any events, just schedule a task that queries models matching certain criteria and doing processing for all of them.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django_taskq-1.0.2.6.tar.gz (14.7 kB view details)

Uploaded Source

File details

Details for the file django_taskq-1.0.2.6.tar.gz.

File metadata

  • Download URL: django_taskq-1.0.2.6.tar.gz
  • Upload date:
  • Size: 14.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for django_taskq-1.0.2.6.tar.gz
Algorithm Hash digest
SHA256 af57328e22a6987595f1219dccb8133b0f791e2a50131f6f13ac128e091507a6
MD5 723bf61585a7d550ff88e4684cb4dd8e
BLAKE2b-256 4a224230703e06964ece7d4675af5fa3cb3ff1b9947ef2d2d55ea4fa679a7fff

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page