Skip to main content

Create and manage workflows using Celery tasks

Project description

Rhythm

Rhythm allows you to design and control workflows made of Celery tasks. A workflow is a sequence of steps to run one after the other. Rhythm simplifies the process of executing workflows consisting of long-running tasks with reliability.

The following are the features of Rhythm workflows:

  • If a workflow consisting of three steps (S1, S2, and S3) encounters a failure while executing S2 (even after retries by Celery), it is possible to resume the workflow later. Resuming the workflow with restart S2 with previous arguments and after its completion S3 will be run.
  • A workflow can be paused and resumed later.
  • You can keep track of which step is currently running, as well as its progress.

Installation

pip install sca-rhythm

see on pypi

Prerequisites

Celery app should be configured with a mongo database backend.

Create Tasks with WorkflowTask class

import os
import time

from celery import Celery

from sca_rhythm import WorkflowTask

app = Celery("tasks")


@app.task(base=WorkflowTask, bind=True)
def task1(self, batch_id, **kwargs):
    print(f'task - {os.getpid()} 1 starts with {batch_id}')
    # do work
    time.sleep(1)

    # update progress to result backend
    # sets the task's state as "PROGRESS"
    self.update_progress({
        'done': 2873,
        'total': 100000
    })

    # do some more work
    return batch_id, {'return_obj': 'foo'}

:warning: Task Constraints :warning:

  1. The task signature must contain **kwargs for the workflow orchestration to function.
  2. The return type must be of list / tuple type and the first element of the return value is sent to the next task as its argument.

Create Workflows with Workflow class

from celery import Celery

from sca_rhythm import Workflow

steps = [
    {
        'name': 'inspect',
        'task': 'tasks.inspect'
    },
    {
        'name': 'archive',
        'task': 'tasks.archive'
    },
    {
        'name': 'stage',
        'task': 'tasks.stage'
    }
]

wf = Workflow(app, steps=steps, name='archive_batch')
wf.start('batch-id-test')

Pause and Resume Workflows

Pausing a workflow stop the current running task and resuming a workflow will restart the stopped task with the same arguments.

wf = Workflow(app, workflow_id='2f87decb-a431-472b-b26e-32c894993881')

wf.pause()

wf.resume()

Build & Publish

poetry install
poetry publish --build

Task Status

  • PENDING: Task state is unknown (assumed pending since you know the id).
  • STARTED: Task was started by a worker (task_track_started = True)
  • SUCCESS: Task succeeded
  • FAILURE: Task failed
  • REVOKED: Task was revoked
  • RETRY: Task is waiting for retry.

Workflow Status

The workflow status is a summative status that is determined by the status of the initial step that is not marked as " SUCCESS," which is referred to as a "pending step".

  • PENDING - the pending step is the first step in the workflow and its status is pending.
  • STARTED - the status of the pending step is one of STARTED, RETRY, PENDING.
  • REVOKED - the pending step was revoked, the workflow can be resumed.
  • FAILURE - the pending step was failed, the workflow can be resumed.
  • SUCCESS - all steps have succeeded.

Status Groups:

  • DONE: { SUCCESS, FAILURE, REVOKED }
  • ACTIVE: !DONE

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sca_rhythm-0.6.13.tar.gz (10.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sca_rhythm-0.6.13-py3-none-any.whl (10.0 kB view details)

Uploaded Python 3

File details

Details for the file sca_rhythm-0.6.13.tar.gz.

File metadata

  • Download URL: sca_rhythm-0.6.13.tar.gz
  • Upload date:
  • Size: 10.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.10 Darwin/23.1.0

File hashes

Hashes for sca_rhythm-0.6.13.tar.gz
Algorithm Hash digest
SHA256 d0a907f1f9efb1001df86416b1e294cff98894cde4ea98714d10e564948dd29a
MD5 4c13697b8db0cac0d3cfa20ee08f3622
BLAKE2b-256 bfded1208d027c425e14443452807415ef5ebb6ff2e91c260eadb228b6f1a8b8

See more details on using hashes here.

File details

Details for the file sca_rhythm-0.6.13-py3-none-any.whl.

File metadata

  • Download URL: sca_rhythm-0.6.13-py3-none-any.whl
  • Upload date:
  • Size: 10.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.10 Darwin/23.1.0

File hashes

Hashes for sca_rhythm-0.6.13-py3-none-any.whl
Algorithm Hash digest
SHA256 6fbd83ea137d4e0989c47b1f7661a3934da625c788ebec401ed1aafcf448c957
MD5 e123f4e2d6bc0cc19a7fbf54ded3c1f7
BLAKE2b-256 89e5e6373052c2a5fc232460962b6d76b64db1ff2ef6eba7a6748e81ae0cb8eb

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page