Skip to main content

Orchestra is a job scheduler on top of Celery

Project description


Orchestra is:

  • an asyncio based job scheduler
  • using Celery (which is a distributed task queue) under the hood for running tasks/jobs
  • using scheduler under the hood which is a simple in-process python scheduler library with asyncio, threading and timezone support.



  • python >= 3.11.7
  • celery > 5.3.6
  • sqlalchemy > 2.0.25
  • psycopg2-binary
  • scheduler > 0.8.5
  • pytz > 2023.3.post1


Install from pypi:

pip install pyorchestra

Getting started

Schedule definition

- name: sample-tasks # optional name for this block, this has no bearing on how orchestra works whatsoever, you may name your blocks in any way
  module: tasks # this is the python module where celery will look for the task
  schedules: # a list of schedules
    - name: "short_task_every_1_second" # name of the task that shows up in the logs, *has to be unique*
      task: short_task # the function in the module decorated by `orchestra.task`
      enabled: false # if it's not enabled it will be ignored
        timing: "every 00:00:01" # see the examples for a list of understood expressions
        timezone: Europe/Paris # name of a time zone from the tz database
      additional_options: # everything in this optional section will be passed to the celery task when invoked
        queue: background-jobs
      tags: # tags are list of string, they may be used to group together tasks
        - cpu
        - fast

Timing expressions

A list of examples of understood expressions:

  • every 00:00:01
  • every 48:00:00
  • every 31 minutes and 12 seconds
  • every Monday at 03:15:00
  • next Tuesday at 13:11:00
  • once, Wednesday at 01:15:00
  • once on Thursday, 00:31:41
  • every day at 09:15:14
  • every minute at 15 seconds
  • every hour at 05:00
  • every day on 9:03
  • every day at 9:15
  • every day at 13 hours
  • once in 13:11:52
  • 11:23:00
  • 9:13
  • 2023-04-01 09:13
  • 2023-04-01 09:13:11
  • next 23:00:00
  • once in 5 hours 2 minutes and 15 seconds
  • once in 1 hour and 10 minutes
  • once in 2 minutes and 10 seconds
  • once in 17 seconds

By default timing expression with only a time or timestamp (without date) like 11:23:00 will be interpreted as a time instant and will trigger the next time the wall clock shows the specified time.

Creating an instance

import asyncio
from orchestra import Orchestra

orchestra = Orchestra(
    broker="sqla+sqlite:///log.db",  # any Celery supported broker connection string will work here
    backend_conn_str="sqlite:///log.db",  # any SQLAlchemy 2 supported backend connection string will work here
    broker_connection_retry_on_startup=True,  # any Celery supported keyword argument can be passed through to Celery

def simple_task() -> str:
    return "hello world"

async def main():
    schedule_definitions = [
        {"module": "main",  # if you're using the same Python module to declare your Celery tasks and running the scheduler, then this has to be the module's name
         "schedules": [
                 "name": "Simple Task every 48 hours",
                 "task": "simple_task",
                 "enabled": True,
                 "schedule": {
                     "timing": "every 48:00:00",

    await orchestra.create_schedule(schedule_definitions)  # you have to create a schedule first, optionally preload a schedule definition file
    await  # run Orchestra

if __name__ == "__main__":  # Orchestra is based on the scheduler package, which uses asyncio under the hood

The output would look something like the following:

[08:38:50] INFO     Job Simple Task every 48 hours was scheduled to
                    run 2 days, 0:00:00                                         
           INFO     Orchestra starting                      
│State       Shedule     Job name                            Module and task        Due at                     Timezone     Due in     Attempts    Tags │
│Running     CYCLIC      Simple Task every 48 hours          main.simple_task       2024-03-05 07:38:50        UTC          1 day      0/inf            │
│ Orchestrating jobs ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │

You may also use a standalone python module for holding your Celery tasks, but you have to declare Orchestra there as well or import it from the main module:

from time import sleep
from orchestra import Orchestra

orchestra = Orchestra(
    broker="sqla+sqlite:///log.db",  # any Celery supported broker connection string will work here
    backend_conn_str="sqlite:///log.db",  # any SQLAlchemy 2 supported backend connection string will work here
    broker_connection_retry_on_startup=True,  # any Celery supported keyword argument can be passed through to Celery

def fibo(n: int):
    if n <= 1:
        return n
        return fibo(n - 1) + fibo(n - 2)

def short_task() -> int:
    return fibo(10)

def long_task() -> int:
    return fibo(33)

import asyncio
import yaml
from orchestra import Orchestra

orchestra = Orchestra(

async def main():
    schedule_definitions = yaml.safe_load(open("schedule.yaml", "rt"))  # see the Schedule definition section for an example
    await orchestra.create_schedule(schedule_definitions)  # you have to create a schedule first, optionally preload a schedule definition file
    await  # run Orchestra

if __name__ == "__main__":  # Orchestra is based on the scheduler package, which uses asyncio under the hood

Adding jobs programmatically

import asyncio
import datetime

import pytz
from scheduler import trigger

from orchestra import Orchestra

orchestra = Orchestra(

def simple_task() -> str:
    return "Ran Simple Task"

async def hello():

async def main():
    await orchestra.create_schedule()

    # schedule a regular python function, this won't be tracked in the database and Orchestra does not support resuming such jobs in case of a shutdown
    orchestra.scheduler.cyclic(timing=datetime.timedelta(seconds=10), alias="Hello World every 10s", handle=hello)

    # schedule a Celery task function, Orchestra fully supports scheduling Celery tasks programmatically as well as through a schedule definition
                                   job_name="Simple task every 1s")

                                   timing=trigger.Monday(datetime.time(hour=16, minute=30, tzinfo=pytz.timezone("Europe/Berlin"))),
                                   job_name="Simple task every Monday at 16:30 UTC")


if __name__ == "__main__":

The output would look something like the following:

[08:36:47] INFO     Job task_every_48_hours was scheduled to run 2
                    days, 0:00:00                                               
           INFO     Job task_every_31_minutes_12_seconds was
                    scheduled to run 0:31:12                                    
           INFO     Job task_every_Monday_at_03_15_00 was scheduled
                    to run Monday(time=datetime.time(3, 15,                     
                    tzinfo=<DstTzInfo 'Europe/Berlin' LMT+1:16:00             
           INFO     Orchestra starting                      
│State    Shedule   Job name                                    Module and task       Due at               Timezone         Due in    Attempts  Tags    │
│Running  ONCE      task_at_9_13                                tasks.long_task       2024-03-03 09:13:00  Europe/Berlin  0:36:09   0/1       gpu     │
│Running  HOURLY    task_every_hour_at_05_00                    tasks.long_task       2024-03-03 09:05:00  Europe/Berlin  0:28:09   0/inf     gpu     │
│Running  CYCLIC    exception_task_every_10_second              tasks.exception_task  2024-03-03 07:36:57  UTC              0:00:06   0/inf     cpu,fast│
│Running  DAILY     task_every_day_at_9_15                      tasks.long_task       2024-03-03 09:15:00  Europe/Berlin  0:38:09   0/inf     gpu     │
│Running  ONCE      task_next_23_00_00                          tasks.long_task       2024-03-03 23:00:00  Europe/Berlin  14:23:09  0/1       gpu     │
│Running  ONCE      task_once_in_13_11_52                       tasks.long_task       2024-03-03 20:48:39  UTC              13:11:49  0/1       gpu     │
│Running  CYCLIC    task_every_31_minutes_12_seconds            tasks.long_task       2024-03-03 08:07:59  UTC              0:31:09   0/inf     gpu     │
│Running  CYCLIC    short_task_every_1_second                   tasks.short_task      2024-03-03 07:36:51  UTC              0:00:00   3/inf     cpu,fast│
│Running  ONCE      task_at_11_23_00                            tasks.long_task       2024-03-03 11:23:00  Europe/Berlin  2:46:09   0/1       gpu     │
│Running  ONCE      task_once_in_1_hour_and_10_minutes          tasks.long_task       2024-03-03 08:46:47  UTC              1:09:57   0/1       gpu     │
│Running  WEEKLY    task_every_Monday_at_03_15_00               tasks.long_task       2024-03-04 03:15:00  Europe/Berlin  18:38:09  0/inf     gpu     │
│Running  DAILY     task_every_day_at_13_hours                  tasks.long_task       2024-03-03 13:00:00  Europe/Berlin  4:23:09   0/inf     gpu     │
│Running  ONCE      task_next_Tuesday_at_13_11_00               tasks.long_task       2024-03-05 13:11:00  Europe/Berlin  2 days    0/1       gpu     │
│Running  DAILY     task_every_day_on_9_03                      tasks.long_task       2024-03-03 09:03:00  Europe/Berlin  0:26:09   0/inf     gpu     │
│Running  MINUTELY  task_every_minute_at_15_seconds             tasks.long_task       2024-03-03 08:37:15  Europe/Berlin  0:00:24   0/inf     gpu     │
│Running  ONCE      task_2024_04_01_09_13                       tasks.long_task       2024-04-01 09:13:00  Europe/Berlin  28 days   0/1       gpu     │
│Running  ONCE      task_once_Wednesday_at_01_15_00             tasks.long_task       2024-03-06 01:15:00  Europe/Berlin  2 days    0/1       gpu     │
│Running  ONCE      task_once_on_Thursday_00_31_41              tasks.long_task       2024-03-07 00:31:41  Europe/Berlin  3 days    0/1       gpu     │
│Running  ONCE      task_once_in_5_hours_2_minutes_and_15_sec…  tasks.long_task       2024-03-03 12:39:02  UTC              5:02:12   0/1       gpu     │
│Running  DAILY     task_every_day_at_09_15_14                  tasks.long_task       2024-03-03 09:15:14  Europe/Berlin  0:38:23   0/inf     gpu     │
│Running  CYCLIC    task_every_48_hours                         tasks.long_task       2024-03-05 07:36:47  UTC              1 day     0/inf     gpu     │
│Running  WEEKLY    task_every_Monday_at_03_15_00_US            tasks.long_task       2024-03-04 03:15:00  America/Boise    1 day     0/inf     gpu     │
│ Orchestrating jobs ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │

Pausing and resuming jobs

import asyncio
import datetime

import pytz
from scheduler import trigger

from orchestra import Orchestra

orchestra = Orchestra(

def simple_task() -> str:
    return "Ran Simple Task"

async def hello():

async def main():
    await orchestra.create_schedule()

    # schedule a regular python function, this won't be tracked in the database and Orchestra does not support resuming such jobs in case of a shutdown
    orchestra.scheduler.cyclic(timing=datetime.timedelta(seconds=10), alias="Hello World every 10s", handle=hello)

    # schedule a Celery task function, Orchestra fully supports scheduling Celery tasks programmatically as well as through a schedule definition
                                   job_name="Simple task every 1s",
                                   tags={"latency", "gpu"})

                                   timing=trigger.Monday(datetime.time(hour=16, minute=30, tzinfo=pytz.timezone("Europe/Berlin"))),
                                   job_name="Simple task every Monday at 16:30 UTC",

    # you may pause a task by name or by matching tags
    orchestra.pause_job("Simple task every Monday at 16:30 UTC")

    orchestra.pause_jobs_with_tags({"latency"}, any_tag=True)

    # resume job by name or by matching tags
    orchestra.resume_jobs_with_tags({"latency"}, any_tag=True)


if __name__ == "__main__":

The output would look something like the following:

[08:39:41] INFO     Job Simple task every 1s was scheduled to run
           INFO     Job Simple task every Monday at 16:30 UTC was
                    scheduled to run Monday(time=datetime.time(16,              
                    30, tzinfo=<DstTzInfo 'Europe/Berlin'                       
                    LMT+0:53:00 STD>))                                          
           INFO     Paused job Simple task every Monday at 16:30 UTC
           INFO     Paused job Simple task every 1s        
           INFO     Resumed job Simple task every 1s       
           INFO     Orchestra starting                      
│State     Shedule   Job name                                 Module and task        Due at                Timezone       Due in   Attempts  Tags       │
│Running   CYCLIC    Hello World every 10s                    hello                  2024-03-03 07:39:51   UTC            0:00:04  0/inf                │
│Running   CYCLIC    Simple task every 1s                     __main__.simple_task   2024-03-03 07:39:47   UTC            0:00:00  5/inf     gpu,latency│
│Paused    WEEKLY    Simple task every Monday at 16:30 UTC    __main__.simple_task   2024-03-04 16:30:00   Europe/Berlin  1 day    0/inf     cpu        │
│ Orchestrating jobs ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │

Using the API

Orchestra comes with a built-in API accessible by setting api_server=True on the Orchestra instance. You may optionally provide an api_address parameter to control which host:port gets bound by uvicorn.

import asyncio

import yaml

from orchestra import Orchestra

orchestra = Orchestra(

async def main():
    schedule_definitions = yaml.safe_load(open("schedule.yaml", "rt"))
    await orchestra.create_schedule(schedule_definitions)

if __name__ == "__main__":

You can check the Swagger Docs created by FastAPI at the specified api_address, which is http://localhost:5000 in the above example and by default.

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyorchestra-0.4.1.tar.gz (22.2 kB view hashes)

Uploaded Source

Built Distribution

pyorchestra-0.4.1-py3-none-any.whl (23.1 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page