Create and manage workflows using Celery tasks
Rhythm
Rhythm lets you design and control workflows made of Celery tasks. A workflow is a sequence of steps that run one after another. Rhythm makes it easier to run workflows of long-running tasks reliably.
The following are the features of Rhythm workflows:
- If a workflow consisting of three steps (S1, S2, and S3) fails while executing S2 (even after retries by Celery), the workflow can be resumed later. Resuming the workflow restarts S2 with its previous arguments; after S2 completes, S3 is run.
- A workflow can be paused and resumed later.
- You can keep track of which step is currently running, as well as its progress.
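The resume behaviour described above can be modelled in a few lines of plain Python. This is only an in-memory sketch, not Rhythm's implementation: `run_from`, `resume`, and the `records` dictionary are hypothetical names, and the real library keeps this state in the Celery result backend.

```python
def run_from(steps, records, start_index, arg):
    """Run steps[start_index:], recording each step's argument and status."""
    for i in range(start_index, len(steps)):
        records[i] = {'arg': arg, 'status': 'STARTED'}
        try:
            result = steps[i](arg)
        except Exception:
            records[i]['status'] = 'FAILURE'
            return records
        records[i]['status'] = 'SUCCESS'
        arg = result[0]  # first element feeds the next step
    return records


def resume(steps, records):
    """Restart the first step that did not succeed, with its stored argument."""
    for i in sorted(records):
        if records[i]['status'] != 'SUCCESS':
            return run_from(steps, records, i, records[i]['arg'])
    return records


attempts = {'s2': 0}


def s1(x):
    return (x + '-s1',)


def s2(x):
    # Fails on the first attempt only, to simulate a transient error.
    attempts['s2'] += 1
    if attempts['s2'] == 1:
        raise RuntimeError('transient failure')
    return (x + '-s2',)


def s3(x):
    return (x + '-s3',)


steps = [s1, s2, s3]
records = run_from(steps, {}, 0, 'batch')
status_after_failure = records[1]['status']  # S2 failed, S3 never ran
records = resume(steps, records)             # S2 reruns with 'batch-s1'
```

After `resume`, S1 has run once, S2 twice with the same argument, and S3 once.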
Installation
pip install sca-rhythm
See the package on PyPI.
Prerequisites
The Celery app must be configured with a MongoDB result backend.
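As a rough illustration, a Celery configuration module with a MongoDB result backend might look like the sketch below. The setting names are standard Celery settings; the module name, broker URL, host, database, and collection values are assumptions to be replaced with your own, and Rhythm may expect specific values that this README does not state.

```python
# celeryconfig.py (hypothetical module name)

# Assumption: a local RabbitMQ broker; any broker supported by Celery works.
broker_url = 'amqp://guest:guest@localhost:5672//'

# Store task results and state in MongoDB.
result_backend = 'mongodb://localhost:27017/'
mongodb_backend_settings = {
    'database': 'celery',                      # assumed database name
    'taskmeta_collection': 'celery_taskmeta',  # assumed collection name
}

# Report the STARTED state when a worker picks up a task.
task_track_started = True
```

The app can load such a module with `app.config_from_object('celeryconfig')`.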
Create Tasks with WorkflowTask class
import os
import time

from celery import Celery
from sca_rhythm import WorkflowTask

app = Celery("tasks")


@app.task(base=WorkflowTask, bind=True)
def task1(self, batch_id, **kwargs):
    print(f'task - {os.getpid()} 1 starts with {batch_id}')
    # do work
    time.sleep(1)
    # update progress to result backend
    # sets the task's state as "PROGRESS"
    self.update_progress({
        'done': 2873,
        'total': 100000
    })
    # do some more work
    return batch_id, {'return_obj': 'foo'}
:warning: Task Constraints :warning:
- The task signature must contain **kwargs for the workflow orchestration to function.
- The return type must be a list or tuple, and the first element of the return value is sent to the next task as its argument.
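The two constraints can be illustrated without Celery. The sketch below is a plain-Python model of the hand-off between steps, not Rhythm's code: `run_chain` and the `workflow_id` keyword are hypothetical, and the actual keyword arguments that Rhythm injects are not documented here.

```python
def inspect(batch_id, **kwargs):
    # Returns a tuple; only the first element is forwarded.
    return batch_id, {'files': 3}


def archive(batch_id, **kwargs):
    # A one-element tuple also satisfies the constraint.
    return (f'{batch_id}-archived',)


def run_chain(tasks, first_arg, **kwargs):
    """Call each task in order, passing result[0] on to the next task."""
    arg = first_arg
    for task in tasks:
        result = task(arg, **kwargs)
        if not isinstance(result, (list, tuple)):
            raise TypeError(f'{task.__name__} must return a list or tuple')
        arg = result[0]
    return arg


final = run_chain([inspect, archive], 'batch-1', workflow_id='wf-demo')
```

Because every task accepts `**kwargs`, the orchestrator can pass extra keyword arguments without breaking task signatures.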
Create Workflows with Workflow class
from celery import Celery
from sca_rhythm import Workflow

# the Celery app, configured with a MongoDB result backend (see Prerequisites)
app = Celery("tasks")

steps = [
    {
        'name': 'inspect',
        'task': 'tasks.inspect',
        'queue': 'q1',
    },
    {
        'name': 'archive',
        'task': 'tasks.archive',
        'queue': 'q1',
    },
    {
        'name': 'stage',
        'task': 'tasks.stage',
        'queue': 'q2',
        'priority': 5
    }
]

wf = Workflow(app, steps=steps, name='archive_batch', app_id='app')
wf.start('batch-id-test')
The provided code defines a workflow consisting of multiple steps, each representing a task to be executed in a specific order. The workflow is initiated with a unique identifier, and its steps are configured with task names, associated queues, and optional priorities.
Each step is represented as a dictionary with the following properties:
- name: A descriptive name for the step.
- task: The task to be executed, specified as a string containing the task's import path.
- queue: The Celery queue to which the task should be sent.
- priority (optional): An integer (between 0 and 9) indicating the priority of the task in the queue. If not provided, the priority is set to the step's position in the workflow. If there are more than 9 tasks, tasks in positions 10 and above all receive priority 9.
Priority Scheme: The priority scheme is designed to optimize the execution of tasks within the same workflow. Tasks with higher priorities are executed before those with lower priorities. If no priority is specified, the default priority is set to the step's position in the workflow. This scheme ensures that tasks within a workflow are executed sequentially with increasing priority, minimizing the likelihood of interweaving tasks from different workflows.
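The default priority rule can be sketched as follows. This is an illustration under stated assumptions, not Rhythm's code: `default_priorities` is a hypothetical helper, and positions are assumed to be counted from 1, which the description above does not state explicitly.

```python
def default_priorities(steps, max_priority=9):
    """Resolve a priority for each step.

    An explicit 'priority' wins; otherwise the step's position
    (assumed 1-based) is used, capped at max_priority.
    """
    resolved = []
    for position, step in enumerate(steps, start=1):
        resolved.append(step.get('priority', min(position, max_priority)))
    return resolved


example_steps = [
    {'name': 'inspect'},
    {'name': 'archive'},
    {'name': 'stage', 'priority': 5},
]
example_priorities = default_priorities(example_steps)

# A long workflow: steps at positions 10 and above are capped at 9.
long_priorities = default_priorities([{'name': f's{i}'} for i in range(12)])
```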
Pause and Resume Workflows
Pausing a workflow stops the currently running task. Resuming a workflow restarts the stopped task with the same arguments.
wf = Workflow(app, workflow_id='2f87decb-a431-472b-b26e-32c894993881')
wf.pause()
wf.resume()
Build & Publish
poetry install
poetry publish --build
Task Status
- PENDING: Task state is unknown (assumed pending since you know the id).
- STARTED: Task was started by a worker (task_track_started = True)
- SUCCESS: Task succeeded
- FAILURE: Task failed
- REVOKED: Task was revoked
- RETRY: Task is waiting for retry.
Workflow Status
The workflow status is a summary status determined by the status of the first step that is not marked as "SUCCESS", which is referred to as the "pending step".
- PENDING - the pending step is the first step in the workflow and its status is pending.
- STARTED - the status of the pending step is one of STARTED, RETRY, PENDING.
- REVOKED - the pending step was revoked; the workflow can be resumed.
- FAILURE - the pending step failed; the workflow can be resumed.
- SUCCESS - all steps have succeeded.
Status Groups:
- DONE: { SUCCESS, FAILURE, REVOKED }
- ACTIVE: !DONE
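The rules above can be expressed as a small function. This is a sketch of the rules as listed, not Rhythm's implementation: `workflow_status` and `is_active` are hypothetical names, and task states that the rules do not mention (such as the custom "PROGRESS" state) are rejected rather than guessed at.

```python
DONE = {'SUCCESS', 'FAILURE', 'REVOKED'}


def workflow_status(step_statuses):
    """Derive the workflow status from an ordered list of step statuses."""
    # The pending step is the first step that is not SUCCESS.
    for index, status in enumerate(step_statuses):
        if status != 'SUCCESS':
            break
    else:
        return 'SUCCESS'  # every step succeeded

    if index == 0 and status == 'PENDING':
        return 'PENDING'
    if status in {'STARTED', 'RETRY', 'PENDING'}:
        return 'STARTED'
    if status in {'REVOKED', 'FAILURE'}:
        return status
    raise ValueError(f'status not covered by the listed rules: {status}')


def is_active(status):
    """ACTIVE is the complement of DONE."""
    return status not in DONE
```

For example, `workflow_status(['SUCCESS', 'FAILURE', 'PENDING'])` evaluates to `'FAILURE'`, which is in the DONE group even though the workflow can still be resumed.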
Download files
File details
Details for the file sca_rhythm-0.6.14.tar.gz.
File metadata
- Download URL: sca_rhythm-0.6.14.tar.gz
- Upload date:
- Size: 11.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.10.10 Darwin/23.1.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a6622d6e6e3f2305db1f8c523f8da14e9f06fbd17a3241de7ff405564c3ce193 |
| MD5 | acf5bf92467abc6c0aa1211c1f00725c |
| BLAKE2b-256 | 9ad4ce7a182b240351cd53b58119e735eca4fd3e9b6f7f7d12539a15f5153fd5 |
File details
Details for the file sca_rhythm-0.6.14-py3-none-any.whl.
File metadata
- Download URL: sca_rhythm-0.6.14-py3-none-any.whl
- Upload date:
- Size: 10.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.10.10 Darwin/23.1.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e212c1e1c0044b76b2fad25ce49e19941d45046a46e31e012e1205686ca66f28 |
| MD5 | 9ec39b8bedadb357cf6ea0da5d33cf8e |
| BLAKE2b-256 | 59c4f00a112ba11e47db0465940b2acb4090898e681670f644a46d2b9ea9c523 |