Create and manage workflows using Celery tasks
Project description
Rhythm
Create and manage workflows using Celery tasks.
Prerequisites
Celery app should be configured with a mongo database backend.
Create Tasks with WorkflowTask class
import os
import time
from celery import Celery
from sca_rhythm import WorkflowTask
app = Celery("tasks")
@app.task(base=WorkflowTask, bind=True)
def task1(self, batch_id, **kwargs):
print(f'task - {os.getpid()} 1 starts with {batch_id}')
# do work
time.sleep(1)
# update progress to result backend
# sets the task's state as "PROGRESS"
self.update_progress({
done: 2873,
total: 100000
})
# do some more work
return batch_id, {'return_obj': 'foo'}
:warning: Task Constraints :warning:
- The task signature must contain
**kwargsfor the workflow orchestration to function. - The return type must be of list / tuple type and the first element of the return value is sent to the next task as its argument.
Create Workflows with Workflow class
from celery import Celery
from sca_rhythm import Workflow
steps = [
{
'name': 'inspect',
'task': 'tasks.inspect'
},
{
'name': 'archive',
'task': 'tasks.archive'
},
{
'name': 'stage',
'task': 'tasks.stage'
}
]
wf = Workflow(app, steps=steps, name='archive_batch')
wf.start('batch-id-test')
Pause / Resume Workflows
wf = Workflow(app, workflow_id='2f87decb-a431-472b-b26e-32c894993881')
wf.pause()
wf.resume()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
sca_rhythm-0.1.2.tar.gz
(5.6 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file sca_rhythm-0.1.2.tar.gz.
File metadata
- Download URL: sca_rhythm-0.1.2.tar.gz
- Upload date:
- Size: 5.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.4.1 CPython/3.11.0 Darwin/21.6.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4d16020674503df0517c5991f2c27a09eee2dfa20f4f33922ae4e47d99be9bc2
|
|
| MD5 |
fdb15a481e9d68ece8fd5a713e96914d
|
|
| BLAKE2b-256 |
bc785f90f47f90cb45013f8e53cb6e25747b22c8aca6510f4509accc01e81693
|
File details
Details for the file sca_rhythm-0.1.2-py3-none-any.whl.
File metadata
- Download URL: sca_rhythm-0.1.2-py3-none-any.whl
- Upload date:
- Size: 6.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.4.1 CPython/3.11.0 Darwin/21.6.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bd83aba5176f55b4f3582dbdd7d3510dfb44ac102169b3eb108960277912259d
|
|
| MD5 |
730722b9d6762d95c3c52c3ff83b8c0a
|
|
| BLAKE2b-256 |
7406b0e080b511b9382ac06c05e125bc2cc7d474ad11e13fff43630e62f81675
|