Simple, explicit durable execution framework on top of ZeroMQ and SQLModel.

Pipeteer

Simple, explicit durable execution framework.

Welcome to Pipeteer

Pipeteer makes durable execution simple without hiding the underlying persistence layer.

Why Pipeteer?

Use Pipeteer if you need:

  • Persistence: your app can stop or crash and resume at any time without losing progress
  • Observability: you can see the state of your app at any time, and modify it programmatically at runtime
  • Exactly-once semantics: your app can be stopped and resumed without dropping or duplicating work
  • Fault tolerance: if a task fails, it'll keep working on other tasks and retry it later
  • Explicitness: Pipeteer's high-level API is a very thin abstraction over SQLModel (for persistence) and ZeroMQ (for inter-process communication)
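
To make the persistence and resumability points above concrete, here is a minimal sketch of the general idea behind durable execution: each step's result is stored under a key, so a restarted run reuses finished steps instead of redoing them. This is an illustration using only the standard library's sqlite3 module, not Pipeteer's implementation; `durable_step`, the `steps` table, and the key scheme are hypothetical names. Note that this simplified version can repeat a step if the process dies after the work finishes but before the result is committed.

```python
import sqlite3

def durable_step(conn, key, fn, *args):
    """Run fn(*args) unless a result for `key` is already stored."""
    row = conn.execute("SELECT value FROM steps WHERE key = ?", (key,)).fetchone()
    if row is not None:
        return row[0]  # step finished in an earlier run: reuse its result
    value = fn(*args)
    with conn:  # commit the stored result atomically
        conn.execute("INSERT INTO steps (key, value) VALUES (?, ?)", (key, value))
    return value

calls = []  # records every time the real work runs

def double(x):
    calls.append(x)
    return 2 * x

def quad(conn, task, x):
    x2 = durable_step(conn, f"{task}/step1", double, x)
    x4 = durable_step(conn, f"{task}/step2", double, x2)
    return x4

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE steps (key TEXT PRIMARY KEY, value INTEGER)")

first = quad(conn, "task", 1)   # runs double twice
second = quad(conn, "task", 1)  # simulated restart: both steps come from the table
print(first, second, len(calls))
```

The second call returns the same result without invoking `double` again, which is the property that lets a crashed process resume where it left off.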

Proof of Concept

Definition. You can define a durable workflow this easily:

from pipeteer import activity, workflow, WorkflowContext

@activity()
async def double(x: int) -> int:
  return 2*x

@workflow()
async def quad(x: int, ctx: WorkflowContext) -> int:
  x2 = await ctx.call(double, x)
  x4 = await ctx.call(double, x2)
  return x4
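
To check the logic of a workflow body in isolation, one option is to run the same steps against a stand-in context that calls each activity immediately. The sketch below assumes nothing about Pipeteer itself: `InlineContext` is a hypothetical helper, and `double` and `quad` are undecorated copies of the functions above, so none of the durability guarantees apply here.

```python
import asyncio

class InlineContext:
    """Hypothetical stand-in: awaits each call directly, with no persistence."""
    async def call(self, fn, *args):
        return await fn(*args)

async def double(x: int) -> int:
    return 2 * x

async def quad(x: int, ctx: InlineContext) -> int:
    x2 = await ctx.call(double, x)   # first step: 2 * x
    x4 = await ctx.call(double, x2)  # second step: 2 * (2 * x)
    return x4

result = asyncio.run(quad(3, InlineContext()))
print(result)
```

Because every step goes through `ctx.call`, the workflow body stays the same whether the context persists results or simply forwards the call.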

Worker. And here's how to run it:

import asyncio
from pipeteer import DB, Context

db = DB.at('pipeline.db')
ctx = Context.of(db)

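async def main():
  # Run both workers and the ZeroMQ proxy concurrently
  await asyncio.gather(
    double.run(ctx),
    quad.run(ctx),
    ctx.zmq.proxy(),
  )

if __name__ == '__main__':
  asyncio.run(main())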

Input. Here's how to give it tasks:

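import asyncio
from pipeteer import DB, Context

db = DB.at('pipeline.db')
ctx = Context.of(db)

async def main():
  # `quad` is the workflow defined above
  Input = quad.input(ctx)
  with db.session as s:
    s.add(Input(key='task', value=1, output='my-output'))
    s.commit()

  # `await` needs an async context, hence the wrapper function
  await quad.notify(ctx)

asyncio.run(main())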

Output. Here's how to get the results:

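import asyncio
from sqlmodel import select
from pipeteer import DB, Context

db = DB.at('pipeline.db')
ctx = Context.of(db)

async def main():
  # `quad` is the workflow defined above
  Output = quad.output(ctx, 'my-output')
  while True:
    # Print and delete every pending result, then commit once
    with db.session as s:
      for entry in s.exec(select(Output)):
        print(f'Output: {entry.key} -> {entry.value}')
        s.delete(entry)
      s.commit()
    # Wait on 'my-output' before reading again
    await ctx.wait('my-output')

asyncio.run(main())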
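
The read-then-delete loop above commits once per batch, so results are removed only if handling them succeeded. Here is a minimal standard-library sketch of that pattern, assuming a plain sqlite3 table instead of Pipeteer's SQLModel tables; `drain`, `failing`, and the `output` table are hypothetical names used only for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE output (key TEXT PRIMARY KEY, value INTEGER)")
with conn:
    conn.executemany("INSERT INTO output VALUES (?, ?)", [("a", 4), ("b", 8)])

def drain(conn, handle):
    """Handle and delete all rows in one transaction; return how many were handled."""
    with conn:  # commits on success, rolls back if handle() raises
        rows = conn.execute("SELECT key, value FROM output ORDER BY key").fetchall()
        for key, value in rows:
            handle(key, value)
            conn.execute("DELETE FROM output WHERE key = ?", (key,))
    return len(rows)

def failing(key, value):
    # Simulates a consumer that crashes partway through a batch
    if key == "b":
        raise RuntimeError("consumer crashed")

try:
    drain(conn, failing)
except RuntimeError:
    pass
remaining_after_crash = conn.execute("SELECT COUNT(*) FROM output").fetchone()[0]

seen = []
handled = drain(conn, lambda k, v: seen.append((k, v)))
remaining_after_success = conn.execute("SELECT COUNT(*) FROM output").fetchone()[0]
print(remaining_after_crash, handled, seen, remaining_after_success)
```

After the simulated crash both rows are still in the table, so no result is lost; the consumer may see row `a` again on retry, which is why handlers in this style should tolerate repeats.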

Download files

Source Distribution

pipeteer-1.0.5.tar.gz (11.7 kB)

Uploaded Source

Built Distribution

pipeteer-1.0.5-py3-none-any.whl (15.7 kB)

Uploaded Python 3

File details

Details for the file pipeteer-1.0.5.tar.gz.

File metadata

  • Download URL: pipeteer-1.0.5.tar.gz
  • Size: 11.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.10

File hashes

Hashes for pipeteer-1.0.5.tar.gz
Algorithm    Hash digest
SHA256       cfb8b9ca5c38f2cb5fe7d1b610818be3c2549a8f912b27f0746ae85df9da8107
MD5          62f5efbb9c5d17118b10884f396fab9d
BLAKE2b-256  3b9a88385e822226769bb3aa4fd3b5339b77b83b26e30bf0ece9a568979a56d3

File details

Details for the file pipeteer-1.0.5-py3-none-any.whl.

File metadata

  • Download URL: pipeteer-1.0.5-py3-none-any.whl
  • Size: 15.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.10

File hashes

Hashes for pipeteer-1.0.5-py3-none-any.whl
Algorithm    Hash digest
SHA256       43a0a0621d006bc33c99f59a3e9f21c6d5b8ed434a162b1cae0bc7d841da924d
MD5          277a179ac2ac6b7cb2e9b17deb9602cf
BLAKE2b-256  c3d95ab052b3bf7f62ebff0b970669cabc1e84cb5b254484431f701b20cb2e5a
