Schedule tasks in an SQL Database and define Async Celery Tasks.
Project description
celery-sqlalchemy-kit
About
This kit enables you to store periodic celery tasks in an SQLAlchemy compatible database.
The schedules can be set as crontabs or time-intervals.
Scheduled tasks in the database can be set active or inactive to control whether they should run.
This kit also allows your celery workers to run asynchronous tasks.
This package is under active development and used in production at cap-on.
NOTE: This package was developed and tested with a PostgreSQL backend. Theoretically, any other SQLAlchemy compatible database, that supports JSON type, can be used. But this has not been tested yet.
Getting Started
Requirements
- python >= 3.10
- celery >= 5.2.7
- sqlalchemy >= 1.4.46 - 2.0 should work as well
- psycopg2 >= 2.9.3 / mysql-connector / other connector depending on database
- redis >= 4.5.1 / other broker/backend for celery
Earlier versions should work as well, they just have not been tested yet.
Installation
You can install this package from PyPi:
pip install celery-sqlalchemy-kit
SQL Table 'routines'
Using this package will create a table that contains your scheduled celery tasks. The structure of table 'routines' is as follows:
Column | Type | Nullable |
---|---|---|
id | uuid | not null |
name | character varying(50) | not null |
task | character varying(50) | not null |
schedule | json | not null |
last_run_at | timestamp without time zone | |
total_run_count | integer | |
active | boolean | not null |
kwargs | json | |
options | json |
Usage & Configuration
To demonstrate how to use the features of this package, there are examples in the 'example'-directory. These are being explained in the following.
1. Configuration
First you need to instantiate celery, as you would normally do.
Have a look at the celery documentation.
If you define your tasks in another file, include the path to their file via include
.
celery = Celery(
"celery",
include=["celery-sqlalchemy-kit.example.custom_tasks"],
backend=your_result_backend,
broker=your_broker_url
)
To use this package you have to configure some variables of your celery instance like so:
celery.conf.update(
{
"scheduler_db_uri": scheduler_db_uri,
"scheduler_max_interval": scheduler_max_interval,
"scheduler_sync_every": scheduler_sync_every,
"celery_max_retry": celery_max_retry,
"celery_retry_delay": celery_retry_delay,
"create_table": True,
},
)
variable | explanation | default |
---|---|---|
scheduler_db_uri |
db uri used by scheduler (must be synchronous) | "psycopg2:///schedule.db" |
scheduler_max_interval |
maximum time to sleep between re-checking the schedule | 300 (seconds) |
scheduler_sync_every |
How often to sync the schedule | 3 * 60 (seconds) |
celery_max_retry |
How often to retry a task when it fails | 3 |
celery_retry_delay |
How long to wait before next retry of failed task is started | 300 (seconds) |
create_table |
If set True , table 'routines' for scheduled tasks is created automatically with sqlalchemy. If you wish to use alembic, set to False |
True |
Make sure to use the correct scheduler_db_uri
of your project allowing the RoutineScheduler
to create a table named routines
and save your scheduled tasks in it.
These variables are also available as environment variables in upper case (except for create_table
).
2.1. Create scheduled tasks
To create tasks that run after your desired schedule, you have to inherit from class SyncTask
like so:
class CeleryTestTask(SyncTask):
name = "celery test"
schedule = 15
def run(self, *args, **kwargs):
# do stuff
The task that you want to be executed, has to be defined as the run
method from your task class. Make sure to define a name for your task and set the variable schedule
. For the schedule you have to options:
schedule type | explanation | example | syntax in db |
---|---|---|---|
time interval | run your task every ... seconds | schedule = 15 | {"timedelta": 15} |
crontab | define schedule as crontab, by creating a dict | schedule = {"minute": 0, "hour": 9, "day_of_month": 15} | {"minute": 0, "hour": 9, "day_of_month": 15} |
Now when you start your program, the scheduled tasks are added to your database in routines
table and executed by a celery worker within the schedule you defined.
NOTE: In your database the schedules of tasks are saved as type JSON. Make sure to keep the correct syntax.
2.2. Create asynchronous scheduled tasks
This works the same way as with synchronous tasks, except that your custom tasks has to inherit from class AsnycTask
and the task logic has to be implemented within the async method execute
:
class CeleryTestTask(AsyncTask):
name = "celery test"
schedule = 15
async def execute(self, *args, **kwargs):
# do stuff
3. Change schedule / (in-) activate tasks
If you wish to change the schedule of a task, just update the corresponding db entry.
The next time the RoutineScheduler
synchronizes, it will acknowledge the new schedule.
Same thing with activating or inactivating tasks.
To activate a task, set column active
in your db to t
(True).
To inactivate a task, set column active
in your db to f
(False).
A task that is inactive will not be executed as long as you change it to active again.
4. Source of truth for schedule
- New scheduled task in code, that is not in db: new db entry is created automatically
- Scheduled task in code as well as db: schedule in db is used to run task
- Tasks that are deleted in code will be removed from db after redeploy
- Inactive tasks in db will not be executed
5. Run celery worker and beat
To start celery worker:
$ celery -A celery_instance worker --loglevel=INFO
To start celery beat:
$ celery -A celery_instance beat --loglevel=INFO --scheduler=scheduler.RoutineScheduler
celery_instance
is the file containing your celery instance.
Use the correct path of your project.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file celery_sqlalchemy_kit-0.1.0.tar.gz
.
File metadata
- Download URL: celery_sqlalchemy_kit-0.1.0.tar.gz
- Upload date:
- Size: 13.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6b962d524ec1fc969d3e4ab8402d878520bf2f89c4b8d6f3bc3f6b11090b2545 |
|
MD5 | ddd8d6aa40e0fd1229be97aee4bf322a |
|
BLAKE2b-256 | adff14b608e31304f7a77fd2810ae36d3235e88753fcd49a7a3431071867e646 |
File details
Details for the file celery_sqlalchemy_kit-0.1.0-py3-none-any.whl
.
File metadata
- Download URL: celery_sqlalchemy_kit-0.1.0-py3-none-any.whl
- Upload date:
- Size: 13.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0fdad8621f64c68d785255951d3a95e5e840191be4523d705b88b3f1921d155e |
|
MD5 | a53e7466225a967dc49490a66ad78522 |
|
BLAKE2b-256 | df1c8263fc2961d3e9af62951acd712abe9fd35853cf3c4e77a86beb81061116 |