Project description

Queue Pipelines

Declarative orchestration of asynchronous queue-based tasks

Usage

1. State-machine specification

from q.pipelines import Tasks, Task

# Input types for tasks
class Image:
  img: str

class Annotated(Image):
  annotation: str

# State machine declaration
# Task(INPUT_TYPES, *NEXT_TASK_IDs)
# Tasks({ taskID -> Task })
# 'output' is a special task ID
TASKS = Tasks(
  input_task='classify', Output=Annotated,
  tasks=dict(
    classify=Task(Image, 'annotate_digit', 'annotate_word'),
    annotate_digit=Task(Image, 'output'),
    annotate_word=Task(Image, 'output'),
  )
)
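
The declaration reads as a small task graph: classify fans out to either annotation task, and both annotation tasks feed the special 'output' state. For intuition only, here is the same graph as a plain transition table (an illustrative dict, not the library's internal representation):

# Illustrative only: task ID -> (input type, IDs of possible next tasks)
GRAPH = {
  'classify':       (Image, ('annotate_digit', 'annotate_word')),
  'annotate_digit': (Image, ('output',)),
  'annotate_word':  (Image, ('output',)),
}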

2. Generate code

def codegen():
  TASKS.codegen(__file__, 'TASKS')
  # generated/
  #   types.py
  #   local.py
  #   __init__.py
  TASKS.codegen_pipelines(__file__)
  # pipelines/
  #   _classify.py
  #   _annotate_digit.py
  #   _annotate_word.py
  #   __init__.py
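
Codegen is typically wired up as a script so the generated packages can be refreshed whenever the spec changes. A minimal sketch, assuming TASKS and codegen live in a module such as mypkg/spec.py (the module path is hypothetical):

# mypkg/spec.py (hypothetical location of the TASKS declaration above)
if __name__ == '__main__':
  codegen()  # regenerates generated/ and pipelines/ next to this file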

3. Pipeline implementation

Generated code:

# pipelines/_classify.py
from ..generated import Classify

def classify(Qin: Classify.QueueIn, Qout: Classify.QueueOut):
  ...

Example implementation:

# pipelines/_classify.py
from typing import Literal
from fastapi import FastAPI
import uvicorn
from ..generated import Classify

def classify(Qin: Classify.QueueIn, Qout: Classify.QueueOut):
  app = FastAPI()

  @app.get('/tasks')
  async def tasks():
    # list pending classification tasks
    return await Qin.items()

  @app.post('/annotate')
  async def annotate(annotation: Literal['digit', 'word']):
    id, task = await Qin.read()
    if annotation == 'digit':
      next_task = Classify.next('digit', task)
    else:
      next_task = Classify.next('word', task)
    # forward to the chosen annotation pipeline, then remove from the input queue
    await Qout.push(id, next_task)
    await Qin.pop(id)

  uvicorn.run(app)
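
A pipeline need not serve an API; a fully automatic step can simply loop over its input queue. A rough sketch for the digit annotator, assuming the generated AnnotateDigit module mirrors the Classify module above, and with run_digit_model as a hypothetical stand-in for your own inference code:

# pipelines/_annotate_digit.py
import asyncio
from ..generated import AnnotateDigit

def annotate_digit(Qin: AnnotateDigit.QueueIn, Qout: AnnotateDigit.QueueOut):
  async def loop():
    while True:
      # read the next image, annotate it, forward it, then pop it
      id, task = await Qin.read()
      task.annotation = run_digit_model(task.img)  # hypothetical model call
      await Qout.push(id, AnnotateDigit.next('output', task))
      await Qin.pop(id)
  asyncio.run(loop())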
    

4. Run!

from mypkg.pipelines import PIPELINES
from mypkg.generated import run, input_queue, output_queue, queues

def main(input_path: str, output_path: str, queues_path: str):
  Qin = input_queue(input_path)
  Qout = output_queue(output_path)
  Qs = queues(queues_path)
  run(Qin, Qout, Qs, **PIPELINES)

  • Each pipeline runs in a separate process.
  • An extra "connect" process polls all queues and dispatches each item to the appropriate next queue or to the final output (see the sketch below).
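
The connect process is conceptually simple: take each item a pipeline emits, look at which task it was routed to, and push it onto that task's input queue (or the final output). A rough sketch of that loop; the queue objects, try_read, and the (next_id, payload) item shape are assumptions, not queue_pipelines' actual API:

import asyncio

# Illustrative dispatch loop, not the library's actual connect process.
async def connect(out_queues: dict, in_queues: dict, final_output):
  while True:
    for q in out_queues.values():
      item = await q.try_read()  # try_read is a hypothetical non-blocking read
      if item is None:
        continue
      id, (next_id, payload) = item
      # 'output' is the terminal state; anything else is another task's input
      dest = final_output if next_id == 'output' else in_queues[next_id]
      await dest.push(id, payload)
      await q.pop(id)
    await asyncio.sleep(0.1)  # avoid busy-polling when all queues are empty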

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queue_pipelines-0.1.4.tar.gz (8.2 kB)

Uploaded Source

Built Distribution

queue_pipelines-0.1.4-py3-none-any.whl (11.2 kB)

Uploaded Python 3

File details

Details for the file queue_pipelines-0.1.4.tar.gz.

File metadata

  • Download URL: queue_pipelines-0.1.4.tar.gz
  • Size: 8.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.11.6

File hashes

Hashes for queue_pipelines-0.1.4.tar.gz

  • SHA256: 346b1ac356763950ffb7168e18e3ca8d34527cc10972df74999bd83989125e77
  • MD5: 0eb3865a6f0f3c9e65ffda0b6e49afec
  • BLAKE2b-256: 7a26dac65dd7cb20710d52ae2734a29c4d96a306fb934300cf1f6c90642949c3
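
To check a download against these digests, e.g. with Python's standard hashlib:

import hashlib

# Compare the sdist's SHA256 against the digest published above
with open('queue_pipelines-0.1.4.tar.gz', 'rb') as f:
  digest = hashlib.sha256(f.read()).hexdigest()
assert digest == '346b1ac356763950ffb7168e18e3ca8d34527cc10972df74999bd83989125e77'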


File details

Details for the file queue_pipelines-0.1.4-py3-none-any.whl.

File hashes

Hashes for queue_pipelines-0.1.4-py3-none-any.whl

  • SHA256: 1732f5275d1c98c869c58bf26ac4f16f4eef85d92361cfad33ba46e9fdc8c9fe
  • MD5: 1438cf45f6a6541bec250e7e3c83863a
  • BLAKE2b-256: e15f4a58622ffd5c14f992362c79fa67d726c2ff73b043c4b4bbef2bfe807e9c

