Rhythm

Create and manage workflows using Celery tasks.

Prerequisites

The Celery app must be configured with a MongoDB result backend.
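
A minimal sketch of a Celery settings module that selects MongoDB as the result backend. The file name `celeryconfig.py`, the broker URL, the host, and the database and collection names are all assumptions for illustration; substitute your own. Such a module can be loaded with `app.config_from_object("celeryconfig")`.

```python
# celeryconfig.py (hypothetical file name)
# Lowercase names are standard Celery settings; the values are placeholders.

# Message broker used to deliver tasks to workers (assumed: local RabbitMQ).
broker_url = "amqp://guest:guest@localhost:5672//"

# Store task state and results in MongoDB (assumed: local instance).
result_backend = "mongodb://localhost:27017/"

# Optional MongoDB-specific settings: database and collection for task results.
mongodb_backend_settings = {
    "database": "celery",                   # assumed database name
    "taskmeta_collection": "celery_taskmeta",  # assumed collection name
}
```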

Create Tasks with WorkflowTask class

import os
import time

from celery import Celery

from sca_rhythm import WorkflowTask

app = Celery("tasks")

@app.task(base=WorkflowTask, bind=True)
def task1(self, batch_id, **kwargs):
    print(f'task - {os.getpid()} 1 starts with {batch_id}')
    # do work
    time.sleep(1)

    # update progress to result backend
    # sets the task's state as "PROGRESS"
    self.update_progress({
        'done': 2873,
        'total': 100000
    })

    # do some more work
    return batch_id, {'return_obj': 'foo'}

Warning: Task Constraints

  1. The task signature must include **kwargs; the workflow orchestration does not function without it.
  2. The task must return a list or tuple. The first element of the return value is passed to the next task as its argument.
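
The two constraints can be illustrated with plain Python functions, without Celery. This is only a sketch of the hand-off rule described above, not how sca_rhythm implements it; `run_chain`, the three step functions, and the `workflow_id` keyword are hypothetical names.

```python
def inspect_batch(batch_id, **kwargs):
    # Returns a tuple; only the first element is forwarded.
    return batch_id, {"files": 3}

def archive_batch(batch_id, **kwargs):
    # A one-element tuple is still a valid return value.
    return (f"{batch_id}-archived",)

def stage_batch(batch_id, **kwargs):
    # A list is accepted as well as a tuple.
    return [f"{batch_id}-staged", {"ok": True}]

def run_chain(tasks, first_arg, **kwargs):
    """Call each task in order, passing result[0] of one task to the next."""
    arg = first_arg
    for task in tasks:
        result = task(arg, **kwargs)
        if not isinstance(result, (list, tuple)) or len(result) == 0:
            raise TypeError(f"{task.__name__} must return a non-empty list or tuple")
        arg = result[0]
    return arg

# Extra keyword arguments reach every task through **kwargs.
final = run_chain(
    [inspect_batch, archive_batch, stage_batch],
    "batch-1",
    workflow_id="wf-demo",  # hypothetical keyword, for illustration only
)
```

A task that returned a bare string here would be rejected by the type check, which mirrors why the return value has to be a list or tuple.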

Create Workflows with Workflow class

from celery import Celery

from sca_rhythm import Workflow

steps = [
    {
        'name': 'inspect',
        'task': 'tasks.inspect'
    },
    {
        'name': 'archive',
        'task': 'tasks.archive'
    },
    {
        'name': 'stage',
        'task': 'tasks.stage'
    }
]

# `app` is the Celery app instance that the tasks are registered with
wf = Workflow(app, steps=steps, name='archive_batch')
wf.start('batch-id-test')
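
When every step name matches its task name, as in the list above, the `steps` list can be generated rather than written by hand. A small sketch; `build_steps` is a hypothetical helper and not part of sca_rhythm, and the `tasks` module prefix is taken from the example above.

```python
def build_steps(task_names, module="tasks"):
    """Return one {'name', 'task'} dict per task name, in order."""
    return [{"name": name, "task": f"{module}.{name}"} for name in task_names]

steps = build_steps(["inspect", "archive", "stage"])
```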

Pause / Resume Workflows

wf = Workflow(app, workflow_id='2f87decb-a431-472b-b26e-32c894993881')

wf.pause()

wf.resume()
