Declarative orchestration of asynchronous queue-based tasks
Project description
Queue Pipelines
Declarative orchestration of asynchronous queue-based tasks
Usage
1. State-machine specification
from q.pipelines import Tasks, Task
# Input types for tasks
class Image:
img: str
class Annotated(Image):
annotation: str
# State machine declaration
# Task(INPUT_TYPES, *NEXT_TASK_IDs)
# Tasks({ taskID -> Task })
# 'output' is a special task ID
TASKS = Tasks(
input_task='classify', Output=Result,
tasks=dict(
classify=Task(Image, 'annotate_digit', 'annotate_word'),
annotate_digit=Task(Image, 'output'),
annotate_word=Task(Image, 'output'),
)
)
2. Generate code
def codegen():
TASKS.codegen(__file__, 'TASKS')
# generated/
# types.py
# local.py
# __init__.py
TASKS.codegen_pipelines(__file__)
# pipelines/
# _classify.py
# _annotate_digit.py
# _annotate_word.py
# __init__.py
3. Pipelines implementation
Generated code:
# pipelines/_classify.py
from ..generated import Classify
def classify(Qin: Classify.QueueIn, Qout: Classify.QueueOut):
...
E.g. implementation:
# pipelines/_classify.py
from typing import Literal
from fastapi import FastAPI
import uvicorn
from ..generated import Classify
def classify(Qin: Classify.QueueIn, Qout: Classify.QueueOut):
app = FastAPI()
@app.get('/tasks')
async def tasks():
return await Qin.items()
@app.post('/annotate')
async def annotate(annotation: Literal['digit', 'word']):
id, task = await Qin.read()
if annotation == 'digit':
next = Classify.next('digit', task)
else:
next = Classify.next('word', task)
await Qout.push(id, next)
await Qin.pop(id)
uvicorn.run(app)
4. Run!
from mypkg.pipelines import PIPELINES
from mypkg.generated import run, input_queue, output_queue, queues
def main(input_path: str, output_path: str, queues_path: str):
Qin = input_queue(queues_path)
Qout = output_queue(queues_path)
Qs = queues(queues_path)
run(Qin, Qout, Qs, **PIPELINES)
- Each of the pipelines runs in a separate process
- An extra "connect" process polls all queues and dispatches items to the appropriate next queues or to the output
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
queue_pipelines-0.1.4.tar.gz
(8.2 kB
view details)
Built Distribution
File details
Details for the file queue_pipelines-0.1.4.tar.gz.
File metadata
- Download URL: queue_pipelines-0.1.4.tar.gz
- Upload date:
- Size: 8.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.11.6
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 346b1ac356763950ffb7168e18e3ca8d34527cc10972df74999bd83989125e77 |
MD5 | 0eb3865a6f0f3c9e65ffda0b6e49afec |
BLAKE2b-256 | 7a26dac65dd7cb20710d52ae2734a29c4d96a306fb934300cf1f6c90642949c3 |
File details
Details for the file queue_pipelines-0.1.4-py3-none-any.whl.
File metadata
- Download URL: queue_pipelines-0.1.4-py3-none-any.whl
- Upload date:
- Size: 11.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.11.6
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 1732f5275d1c98c869c58bf26ac4f16f4eef85d92361cfad33ba46e9fdc8c9fe |
MD5 | 1438cf45f6a6541bec250e7e3c83863a |
BLAKE2b-256 | e15f4a58622ffd5c14f992362c79fa67d726c2ff73b043c4b4bbef2bfe807e9c |