
Schedules celery tasks to run in the potentially far future

Project Description

Schedules celery tasks to run in the potentially far future, using a separate storage backend (currently only redis is supported) in combination with a cronjob.


  • Configure the storage by adding a setting like longterm_scheduler_backend = 'redis://localhost:6739/1' to your celery configuration. (The storage also respects the built-in celery configuration settings redis_socket_timeout, redis_socket_connect_timeout and redis_max_connections.)
  • Configure your celery app to use a customized task class: MYCELERY = celery.Celery(task_cls=celery_longterm_scheduler.Task)
  • Set up a cronjob to run celery longterm_scheduler (e.g. every 5 minutes)
  • Now you can schedule your tasks by calling mytask.apply_async(args, kwargs, eta=datetime) as normal. This returns a normal AsyncResult object, but only reading the .id is supported; any other methods or properties may fail explicitly or implicitly.
  • You can completely delete a scheduled job by calling celery_longterm_scheduler.get_scheduler(MYCELERY).revoke('mytaskid') (we cannot hook into the celery built-in AsyncResult.revoke(), unfortunately). revoke() returns True on success and False if the given task cannot be found in the storage backend (e.g. because it has already come due and been executed).

Instead of sending a normal job to the celery broker (with added timing information), this creates a job entry in the scheduler storage backend. The cronjob then periodically checks the storage for any jobs that are due, and only then sends a normal celery job to the broker.
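The flow described above can be modelled in a few lines of plain Python. This is only a minimal sketch under stated assumptions, not the library's code: InMemoryScheduler, run_due_jobs and the send callback are hypothetical names, an in-memory dict and list stand in for the storage backend, and send stands in for handing the job to the broker.

```python
import bisect
import json


class InMemoryScheduler:
    """Toy stand-in for the scheduler storage (hypothetical, illustration only)."""

    def __init__(self):
        self.jobs = {}     # job id -> JSON-serialized job configuration
        self.by_time = []  # sorted list of (due unix timestamp, job id)

    def schedule(self, job_id, config, due):
        # Store the job itself and remember when it is due.
        self.jobs[job_id] = json.dumps(config)
        bisect.insort(self.by_time, (due, job_id))

    def revoke(self, job_id):
        # True if the job was still stored, False if it is unknown
        # (for example because it already came due and was sent).
        if job_id not in self.jobs:
            return False
        del self.jobs[job_id]
        self.by_time = [(t, j) for t, j in self.by_time if j != job_id]
        return True

    def pop_due(self, now):
        # Remove and return every job whose due time is at or before `now`.
        due = []
        while self.by_time and self.by_time[0][0] <= now:
            _, job_id = self.by_time.pop(0)
            due.append((job_id, json.loads(self.jobs.pop(job_id))))
        return due


def run_due_jobs(scheduler, send, now):
    """What one cronjob run does in this model: forward only the due jobs."""
    for job_id, config in scheduler.pop_due(now):
        send(job_id, config)
```

In this model a job that has been revoked before it comes due is never passed to send, and nothing about it remains in storage afterwards.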


Why not use the celery built-in apply_async(eta=)? Because you cannot ever really delete a pending job. AsyncResult('mytaskid').revoke() can only add the task ID to the statedb, where it has to stay _forever_ so the job is recognized as revoked. For jobs that are scheduled to run in 6 months' time or later, this would create an unmanageable, ever-growing statedb.

Why not use celerybeat? Because it is built for periodic jobs, and we need single-shot jobs. And then there’s not much to gain from the celerybeat implementation, especially since we want to use redis as storage (since we’re already using that as broker and result backend).


Redis schema

celery_longterm_scheduler assumes that it talks to a dedicated redis database. It creates an entry per scheduled job using SET jobid job-configuration (job-configuration is serialized with JSON) and uses a single sorted set named scheduled_task_id_by_time that contains the jobids scored by the unix timestamp (UTC) when they are due.
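To make the schema concrete, the sketch below builds the Redis commands that such a layout implies for one job. It is an illustration under assumptions, not the package's implementation: the helper names commands_for and due_query are hypothetical, the job-configuration fields are made up, and the use of ZADD and ZRANGEBYSCORE for filling and querying the sorted set is assumed (only SET and the sorted set name come from the description above).

```python
import json
from datetime import datetime, timezone

SORTED_SET = "scheduled_task_id_by_time"  # name taken from the schema description


def unix_timestamp(moment):
    """Seconds since the epoch; requires a timezone-aware datetime."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return moment.timestamp()


def commands_for(job_id, config, eta):
    """Redis commands (as tuples) that would store one scheduled job."""
    return [
        # One plain key per job, holding the JSON-serialized configuration.
        ("SET", job_id, json.dumps(config)),
        # The job id goes into the sorted set, scored by its due time.
        ("ZADD", SORTED_SET, unix_timestamp(eta), job_id),
    ]


def due_query(now):
    """Assumed query for all job ids that are due at or before `now`."""
    return ("ZRANGEBYSCORE", SORTED_SET, "-inf", unix_timestamp(now))


if __name__ == "__main__":
    eta = datetime(2018, 1, 17, 12, 0, tzinfo=timezone.utc)
    for command in commands_for("mytaskid", {"task": "mytask"}, eta):
        print(command)
    print(due_query(eta))
```

Because the score is a plain UTC unix timestamp, a single range query over the sorted set is enough to find everything that is due, regardless of how far in the future the remaining jobs are.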

Run tests

The tests use tox and py.test. Install tox if you do not have it yet (e.g. via pip install tox), then simply run tox.

For the integration tests you need to have the redis binary installed (tests start their own server).

celery_longterm_scheduler changes

1.0.1 (2018-01-17)

  • Don’t try to schedule on apply_async(eta=None) calls

1.0.0 (2017-09-29)

  • Initial release
