Schedule tasks in an SQL Database and define Async Celery Tasks.

celery-sqlalchemy-kit

About

This kit lets you store periodic celery tasks in an SQLAlchemy-compatible database. Schedules can be defined as crontabs or time intervals. Scheduled tasks in the database can be set active or inactive to control whether they run.
This kit also allows your celery workers to run asynchronous tasks.

This package is under active development and used in production at cap-on.

NOTE: This package was developed and tested with a PostgreSQL backend. In theory, any other SQLAlchemy-compatible database that supports the JSON type can be used, but this has not been tested yet.

Getting Started

Requirements

  • python >= 3.10
  • celery >= 5.2.7
  • sqlalchemy >= 1.4.46 (2.0 should work as well)
  • psycopg2 >= 2.9.3 / mysql-connector / other connector depending on database
  • redis >= 4.5.1 / other broker/backend for celery

Earlier versions should work as well, but they have not been tested yet.

Installation

You can install this package from PyPI:

pip install celery-sqlalchemy-kit

SQL Table 'routines'

Using this package will create a table that contains your scheduled celery tasks. The structure of table 'routines' is as follows:

Column           Type                         Nullable
id               uuid                         not null
name             character varying(50)        not null
task             character varying(50)        not null
schedule         json                         not null
last_run_at      timestamp without time zone
total_run_count  integer
active           boolean                      not null
kwargs           json
options          json

Usage & Configuration

The 'example' directory contains examples that demonstrate the features of this package. They are explained below.

1. Configuration

First, instantiate celery as you normally would. Have a look at the celery documentation.
If you define your tasks in another file, list that file's module path in include.

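from celery import Celery

# your_result_backend and your_broker_url are placeholders for your own settings.
celery = Celery(
    "celery",
    include=["celery-sqlalchemy-kit.example.custom_tasks"],
    backend=your_result_backend,
    broker=your_broker_url,
)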

To use this package you have to configure some variables of your celery instance like so:

celery.conf.update(
    {
        "scheduler_db_uri": scheduler_db_uri,
        "scheduler_max_interval": scheduler_max_interval,
        "scheduler_sync_every": scheduler_sync_every,
        "celery_max_retry": celery_max_retry,
        "celery_retry_delay": celery_retry_delay,
        "create_table": True,
    },
)

Variable                Default                    Explanation
scheduler_db_uri        "psycopg2:///schedule.db"  db uri used by scheduler (must be synchronous)
scheduler_max_interval  300 (seconds)              Maximum time to sleep between re-checking the schedule
scheduler_sync_every    3 * 60 (seconds)           How often to sync the schedule
celery_max_retry        3                          How often to retry a task when it fails
celery_retry_delay      300 (seconds)              How long to wait before the next retry of a failed task is started
create_table            True                       If set to True, table 'routines' for scheduled tasks is created automatically with sqlalchemy. If you wish to use alembic, set to False

Make sure scheduler_db_uri points to your project's database, so that the RoutineScheduler can create a table named routines and save your scheduled tasks in it. All of these variables except create_table can also be set as environment variables, using the same names in upper case.
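
As an illustration of the naming convention, here is a minimal sketch that assembles the settings dictionary by hand from upper-case environment variables, falling back to the defaults listed in the table. The helper name load_scheduler_settings is hypothetical and not part of the package; how the package resolves these variables internally is not shown here.

```python
import os

# Defaults copied from the table above.
DEFAULTS = {
    "scheduler_db_uri": "psycopg2:///schedule.db",
    "scheduler_max_interval": 300,
    "scheduler_sync_every": 3 * 60,
    "celery_max_retry": 3,
    "celery_retry_delay": 300,
}


def load_scheduler_settings(environ=os.environ):
    """Hypothetical helper: prefer upper-case environment variables over defaults."""
    settings = {}
    for key, default in DEFAULTS.items():
        raw = environ.get(key.upper())
        if raw is None:
            settings[key] = default
        elif isinstance(default, int):
            # Environment values are strings, so numeric settings are converted.
            settings[key] = int(raw)
        else:
            settings[key] = raw
    # create_table has no environment variable, so it is set explicitly.
    settings["create_table"] = True
    return settings
```

The result could then be passed to celery.conf.update(load_scheduler_settings()).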

2.1. Create scheduled tasks

To create tasks that run on your desired schedule, inherit from the class SyncTask like so:

# SyncTask is provided by this package.
class CeleryTestTask(SyncTask):
    name = "celery test"
    schedule = 15  # run every 15 seconds

    def run(self, *args, **kwargs):
        ...  # do stuff

The logic that you want executed has to be defined in the run method of your task class. Make sure to define a name for your task and to set the variable schedule. For the schedule you have two options:

Schedule type  Explanation                                     Example                                                  Syntax in db
time interval  run your task every ... seconds                 schedule = 15                                            {"timedelta": 15}
crontab        define schedule as crontab, by creating a dict  schedule = {"minute": 0, "hour": 9, "day_of_month": 15}  {"minute": 0, "hour": 9, "day_of_month": 15}

Now, when you start your program, the scheduled tasks are added to the routines table in your database and executed by a celery worker on the schedule you defined.

NOTE: In your database the schedules of tasks are saved as type JSON. Make sure to keep the correct syntax.
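
To make the two JSON shapes concrete, the following sketch converts a schedule value into the text shown in the "Syntax in db" column. The function schedule_to_json is a hypothetical helper, not part of the package. The crontab check assumes the field names that celery's crontab accepts.

```python
import json

# Field names accepted by celery's crontab schedule.
CRONTAB_FIELDS = {"minute", "hour", "day_of_week", "day_of_month", "month_of_year"}


def schedule_to_json(schedule):
    """Hypothetical helper: return the JSON text for a schedule value."""
    if isinstance(schedule, bool):
        # bool is a subclass of int, so reject it before the numeric check.
        raise TypeError("schedule must be a number of seconds or a crontab dict")
    if isinstance(schedule, (int, float)):
        # Time interval: stored under the "timedelta" key.
        return json.dumps({"timedelta": schedule})
    if isinstance(schedule, dict):
        unknown = set(schedule) - CRONTAB_FIELDS
        if unknown:
            raise ValueError(f"unknown crontab fields: {sorted(unknown)}")
        # Crontab: the dict is stored unchanged.
        return json.dumps(schedule)
    raise TypeError("schedule must be a number of seconds or a crontab dict")
```

A check like this can be useful before editing the schedule column by hand, since a misspelled key such as "minutes" is easy to miss.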

2.2. Create asynchronous scheduled tasks

This works the same way as with synchronous tasks, except that your custom task has to inherit from the class AsyncTask and the task logic has to be implemented in the async method execute:

# AsyncTask is provided by this package.
class CeleryTestTask(AsyncTask):
    name = "celery test"
    schedule = 15  # run every 15 seconds

    async def execute(self, *args, **kwargs):
        ...  # do stuff
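
For readers wondering how an async execute method can run inside a worker that calls tasks synchronously, one common approach is to drive the coroutine with asyncio.run from a synchronous run method. The sketch below illustrates that idea only. AsyncTaskSketch and AddLater are hypothetical classes, and this is not necessarily how the package's AsyncTask is implemented.

```python
import asyncio


class AsyncTaskSketch:
    """Hypothetical stand-in that mimics the run/execute split described above."""

    def run(self, *args, **kwargs):
        # Start an event loop, run the coroutine to completion, return its result.
        return asyncio.run(self.execute(*args, **kwargs))

    async def execute(self, *args, **kwargs):
        raise NotImplementedError


class AddLater(AsyncTaskSketch):
    async def execute(self, a, b):
        await asyncio.sleep(0)  # placeholder for real asynchronous I/O
        return a + b


result = AddLater().run(2, 3)
```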

3. Change schedule / (in-) activate tasks

If you wish to change the schedule of a task, just update the corresponding db entry. The next time the RoutineScheduler synchronizes, it will pick up the new schedule. The same applies to activating or deactivating tasks. To activate a task, set the column active in your db to t (True). To deactivate a task, set the column active in your db to f (False).
An inactive task will not be executed until you set it to active again.
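
The updates described above can be sketched as follows. An in-memory SQLite database stands in for your real database, only a subset of the routines columns is modelled, and the helper functions and the value in the task column are hypothetical placeholders.

```python
import json
import sqlite3

# In-memory SQLite stands in for the real database; JSON is stored as text here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE routines ("
    "id TEXT PRIMARY KEY, name TEXT NOT NULL, task TEXT NOT NULL, "
    "schedule TEXT NOT NULL, active INTEGER NOT NULL)"
)
conn.execute(
    "INSERT INTO routines VALUES (?, ?, ?, ?, ?)",
    ("1", "celery test", "placeholder.task.path", json.dumps({"timedelta": 15}), 1),
)


def set_schedule(conn, name, schedule):
    """Replace the stored schedule of the task with the given name."""
    conn.execute(
        "UPDATE routines SET schedule = ? WHERE name = ?",
        (json.dumps(schedule), name),
    )


def set_active(conn, name, active):
    """Activate (True) or deactivate (False) the task with the given name."""
    conn.execute(
        "UPDATE routines SET active = ? WHERE name = ?", (int(active), name)
    )


# Switch from a 15-second interval to a daily 09:00 crontab, then deactivate.
set_schedule(conn, "celery test", {"minute": 0, "hour": 9})
set_active(conn, "celery test", False)
row = conn.execute(
    "SELECT schedule, active FROM routines WHERE name = ?", ("celery test",)
).fetchone()
```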

4. Source of truth for schedule

  • New scheduled task in code, that is not in db: new db entry is created automatically
  • Scheduled task in code as well as db: schedule in db is used to run task
  • Tasks that are deleted in code will be removed from db after redeploy
  • Inactive tasks in db will not be executed
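
The rules above can be sketched as a small reconciliation function over plain dictionaries. This is an illustration of the stated rules, not the package's code. The names reconcile and runnable are hypothetical, and the sketch assumes that newly created entries start out active, which the rules do not state.

```python
def reconcile(code_tasks, db_rows):
    """Apply the source-of-truth rules.

    code_tasks maps task name to the schedule defined in code.
    db_rows maps task name to {"schedule": ..., "active": ...}.
    """
    result = {}
    for name, schedule in code_tasks.items():
        if name in db_rows:
            # Task exists in code and db: the db entry wins.
            result[name] = dict(db_rows[name])
        else:
            # Task only in code: create a new entry (assumed active).
            result[name] = {"schedule": schedule, "active": True}
    # Tasks only in db are not copied, so they are removed.
    return result


def runnable(rows):
    """Return the names of tasks that would be executed."""
    return sorted(name for name, row in rows.items() if row["active"])


code = {"report": 15, "cleanup": {"minute": 0, "hour": 9}}
db = {
    "report": {"schedule": {"timedelta": 60}, "active": False},
    "old_task": {"schedule": {"timedelta": 5}, "active": True},
}
rows = reconcile(code, db)
```

Here "report" keeps its db schedule and stays inactive, "cleanup" is created from code, and "old_task" is dropped.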

5. Run celery worker and beat

To start celery worker:

$ celery -A celery_instance worker --loglevel=INFO

To start celery beat:

$ celery -A celery_instance beat --loglevel=INFO --scheduler=scheduler.RoutineScheduler

celery_instance is the file containing your celery instance. Use the correct path of your project.
