Simple, explicit durable execution framework on top of ZeroMQ and SQLModel.
Pipeteer
Welcome to Pipeteer
Pipeteer simplifies durable execution without hiding the underlying persistence layer.
Why Pipeteer?
Use pipeteer if you need:
- Persistence: your app can stop or crash and resume at any time without losing progress
- Observability: you can see the state of your app at any time, and modify it programmatically at runtime
- Exactly-once semantics: your app can be stopped and resumed without dropping or duplicating work
- Fault tolerance: if a task fails, the worker keeps processing other tasks and retries the failed one later
- Explicitness: pipeteer's high-level API is a very thin abstraction over SQLModel (for persistence) and ZeroMQ (for inter-process communication)
Proof of Concept
Definition. Defining a durable workflow is this easy:
from pipeteer import activity, workflow, WorkflowContext

@activity()
async def double(x: int) -> int:
    return 2*x

@workflow()
async def quad(x: int, ctx: WorkflowContext) -> int:
    x2 = await ctx.call(double, x)
    x4 = await ctx.call(double, x2)
    return x4
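One common way for a durable execution framework to make a workflow like `quad` resumable is to record the result of each completed step and replay those results when the workflow restarts. The sketch below illustrates that general technique only; it is not taken from pipeteer's source. `ReplayContext`, its `history` list, and its `executed` counter are hypothetical names, and the in-memory list stands in for what would be a database table in a real system.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional

class ReplayContext:
    """Hypothetical context that replays recorded step results before running new steps."""

    def __init__(self, history: Optional[List[Any]] = None) -> None:
        self.history: List[Any] = history if history is not None else []
        self.step = 0      # index of the next step in this run
        self.executed = 0  # how many steps actually ran (were not replayed)

    async def call(self, fn: Callable[[Any], Awaitable[Any]], arg: Any) -> Any:
        if self.step < len(self.history):
            # Step finished in an earlier run: reuse its recorded result.
            result = self.history[self.step]
        else:
            # First time this step is reached: run it and record the result.
            result = await fn(arg)
            self.history.append(result)
            self.executed += 1
        self.step += 1
        return result

async def double(x: int) -> int:
    return 2 * x

async def quad(x: int, ctx: ReplayContext) -> int:
    x2 = await ctx.call(double, x)
    x4 = await ctx.call(double, x2)
    return x4
```

Running `quad(1, ReplayContext(history=[2]))` simulates a restart after the first step had completed: the first `call` is answered from the history and only the second `double` executes.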
Worker. And here's how to run it:
import asyncio
from pipeteer import DB, Context

# `double` and `quad` are the activity and workflow defined above
db = DB.at('pipeline.db')
ctx = Context.of(db)

async def main():
    await asyncio.gather(
        double.run(ctx),
        quad.run(ctx),
        ctx.zmq.proxy(),
    )

asyncio.run(main())
Input. Here's how to give it tasks:
from pipeteer import DB, Context

db = DB.at('pipeline.db')
ctx = Context.of(db)
Input = quad.input(ctx)

with db.session as s:
    s.add(Input(key='task', value=1, output='my-output'))
    s.commit()

# `await` must run inside an async function (or an asyncio REPL)
await quad.notify(ctx)
Output. Here's how to get the results:
from sqlmodel import select
from pipeteer import DB, Context

db = DB.at('pipeline.db')
ctx = Context.of(db)
Output = quad.output(ctx, 'my-output')

# `await` must run inside an async function (or an asyncio REPL)
while True:
    with db.session as s:
        for entry in s.exec(select(Output)):
            print(f'Output: {entry.key} -> {entry.value}')
            s.delete(entry)
        s.commit()

    await ctx.wait('my-output')
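The output loop above follows a drain-then-wait pattern: read and delete every available row, commit, then block until new output is announced. The following is a self-contained sketch of that pattern using only the standard library, under stated assumptions: an `asyncio.Event` stands in for the role `ctx.wait` appears to play, `sqlite3` replaces SQLModel, and the `outputs` table, the `consume`, `produce` and `demo` functions, and the `expected` parameter (added so the demo terminates) are all hypothetical.

```python
import asyncio
import sqlite3
from typing import List, Tuple

async def consume(conn: sqlite3.Connection, notified: asyncio.Event,
                  expected: int) -> List[Tuple[str, int]]:
    seen: List[Tuple[str, int]] = []
    while len(seen) < expected:
        with conn:  # read and delete in one transaction
            rows = conn.execute('SELECT key, value FROM outputs ORDER BY key').fetchall()
            for key, value in rows:
                seen.append((key, value))
                conn.execute('DELETE FROM outputs WHERE key = ?', (key,))
        if len(seen) < expected:
            await notified.wait()  # sleep until a producer announces new rows
            notified.clear()
    return seen

async def produce(conn: sqlite3.Connection, notified: asyncio.Event,
                  items: List[Tuple[str, int]]) -> None:
    for key, value in items:
        with conn:
            conn.execute('INSERT INTO outputs (key, value) VALUES (?, ?)', (key, value))
        notified.set()          # wake the consumer
        await asyncio.sleep(0)  # yield so the consumer gets a chance to run

async def demo() -> List[Tuple[str, int]]:
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE outputs (key TEXT PRIMARY KEY, value INTEGER)')
    conn.commit()
    notified = asyncio.Event()
    seen, _ = await asyncio.gather(
        consume(conn, notified, expected=2),
        produce(conn, notified, [('a', 4), ('b', 8)]),
    )
    return seen
```

The consumer re-reads the table after every wake-up rather than trusting the notification's payload, so a notification that arrives while it is busy cannot cause a row to be skipped.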