Crash-proof Python workflows with zero infrastructure. Temporal power as a pip install.
Gravtory
Crash-proof Python workflows with zero infrastructure.
Durable execution, distributed workers, sagas, scheduling, and observability, backed by the database you already run.
Quick Start · Features · Patterns · Installation · Examples
Why Gravtory?
Every production application eventually needs workflows that cannot fail halfway. A payment that must complete. An order that must ship. A data pipeline that must finish. When a process crashes between steps, you need guarantees.
The current options each carry significant trade-offs:
- Celery accepts that tasks can be lost on crash.
- Temporal requires a multi-service deployment and a steep learning curve.
- Prefect Cloud and Airflow demand hosted infrastructure or managed clusters.
- Hand-rolled retry loops provide no real durability.
Gravtory offers an alternative: Temporal-grade reliability delivered as a Python library, using the database you already operate.
pip install gravtory[postgres]
That is the entire infrastructure requirement.
Quick Start
1. Define a workflow
from gravtory import Gravtory, workflow, step

grav = Gravtory("postgresql://localhost/mydb")

@grav.workflow(id="order-{order_id}")
class OrderWorkflow:
    @step(1)
    async def charge_card(self, order_id: str) -> dict:
        return await stripe.charge(order_id)

    @step(2, depends_on=1)
    async def reserve_inventory(self, order_id: str) -> dict:
        return await inventory.reserve(order_id)

    @step(3, depends_on=2)
    async def send_notification(self, order_id: str) -> None:
        await email.send(order_id)
2. Run it
await grav.start()
result = await grav.run(OrderWorkflow, order_id="ord_abc123")
3. It survives anything
Normal run: step 1 [OK] -> step 2 [OK] -> step 3 [OK] completes
Crash: step 1 [OK] -> step 2 [CRASH] process dies
Auto-resume: step 1 [SKIP] -> step 2 [OK] -> step 3 [OK] resumes exactly
Step 1 (charge card) is never re-executed. Its output was atomically checkpointed to the database. On restart, Gravtory loads that checkpoint and continues from the precise point of failure.
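The checkpoint-and-skip behavior described above can be illustrated with a minimal sketch. It is not Gravtory's implementation: the `checkpoints` table, `run_with_checkpoints`, and the two demo steps are hypothetical names, and an in-memory SQLite database stands in for the real backend.

```python
import json
import sqlite3

def run_with_checkpoints(conn, run_id, steps, arg):
    """Run steps in order, skipping any whose output is already stored.

    `steps` is a list of (name, function) pairs. All names here are
    hypothetical and are not part of Gravtory's API.
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "run_id TEXT, step TEXT, output TEXT, PRIMARY KEY (run_id, step))"
    )
    executed = []
    for name, func in steps:
        row = conn.execute(
            "SELECT output FROM checkpoints WHERE run_id = ? AND step = ?",
            (run_id, name),
        ).fetchone()
        if row is not None:
            continue  # already checkpointed: do not re-execute
        output = func(arg)
        # The connection context manager commits the insert as one transaction.
        with conn:
            conn.execute(
                "INSERT INTO checkpoints VALUES (?, ?, ?)",
                (run_id, name, json.dumps(output)),
            )
        executed.append(name)
    return executed

calls = []

def charge(order_id):
    calls.append("charge")
    return {"charge_id": "ch_" + order_id}

def reserve(order_id):
    calls.append("reserve")
    if calls.count("reserve") == 1:
        raise RuntimeError("simulated crash")  # first attempt dies
    return {"reserved": True}

conn = sqlite3.connect(":memory:")
steps = [("charge", charge), ("reserve", reserve)]
try:
    run_with_checkpoints(conn, "order-1", steps, "1")
except RuntimeError:
    pass  # the "process" crashed during step 2
resumed = run_with_checkpoints(conn, "order-1", steps, "1")
```

On the second call only `reserve` runs; `charge` is found in the table and skipped, so the card is charged once.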
Features
- Core
- Patterns
- Distribution
- Operations
- Developer Experience
- AI/ML Native
- Enterprise
- Security
Comparison
| | Celery | Temporal | Prefect | DBOS | Gravtory |
|---|---|---|---|---|---|
| Infrastructure | Redis / RabbitMQ | Server + DB + Workers | Server | None | None |
| Setup time | ~30 min | ~2 days | ~2 hours | ~10 min | ~3 min |
| Library vs Service | Library + Broker | Service | Service | Library | Library |
| Crash-safe | No | Yes | Partial | Yes | Yes |
| Distributed workers | Yes | Yes | Yes | No | Yes |
| Saga compensation | No | Yes | No | No | Yes |
| Signals | No | Yes | No | No | Yes |
| Scheduling | Celery Beat | Built-in | Yes | Yes | Yes |
| Dashboard | Flower | Yes | Yes | No | Yes |
| Type-safe | No | No | No | No | Yes |
| Testing framework | No | Yes | No | No | Yes |
| AI/LLM native | No | No | No | No | Yes |
| Backends | Redis / RabbitMQ | PG / Cassandra | PG | PG only | 5 databases |
| License | BSD | MIT | Apache | MIT | AGPL |
Patterns
Saga with Automatic Compensation
from decimal import Decimal

@grav.workflow(id="transfer-{id}")
@saga
class TransferWorkflow:
    @step(1, compensate="refund")
    async def debit(self, amount: Decimal) -> dict:
        return await bank.debit(self.source, amount)

    @step(2, depends_on=1, compensate="reverse")
    async def credit(self, amount: Decimal) -> dict:
        return await bank.credit(self.dest, amount)

    # Compensation for step 1: return the debited amount to the source.
    async def refund(self, output: dict):
        await bank.credit(self.source, output["amount"])

    # Compensation for step 2: reverse the credit transaction.
    async def reverse(self, output: dict):
        await bank.reverse(output["transaction_id"])

# If credit fails, refund runs automatically -- crash-safe.
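To make the compensation order concrete, here is a minimal in-memory sketch of the saga pattern. It assumes nothing about Gravtory's internals: `run_saga` and the toy bank functions are hypothetical, and the sketch omits the persistence that makes the real thing crash-safe.

```python
def run_saga(steps):
    """Run (action, compensation) pairs; undo completed steps on failure.

    Hypothetical helper for illustration, not Gravtory's API.
    """
    completed = []  # (compensation, output) for each finished step
    for action, compensation in steps:
        try:
            output = action()
        except Exception:
            # Undo in reverse order, newest completed step first.
            for comp, out in reversed(completed):
                comp(out)
            return "compensated"
        completed.append((compensation, output))
    return "completed"

balance = {"source": 100, "dest": 0}

def debit():
    balance["source"] -= 30
    return {"amount": 30}

def refund(output):
    balance["source"] += output["amount"]

def credit():
    raise ConnectionError("destination bank unavailable")

def reverse(output):
    balance["dest"] -= output["amount"]

status = run_saga([(debit, refund), (credit, reverse)])
```

Because `credit` never completed, only `refund` runs, and the source balance returns to its starting value.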
Parallel Processing
@grav.workflow(id="batch-{id}")
class BatchWorkflow:
    @step(1)
    async def get_items(self, id: str) -> list[str]:
        return await db.get_item_ids(id)

    @step(2, depends_on=1)
    @parallel(max_concurrency=20)
    async def process(self, item_id: str) -> dict:
        return await compute(item_id)
    # Each item is individually checkpointed.
    # On crash: only unfinished items re-execute.

    @step(3, depends_on=2)
    async def summarize(self, results: list[dict]) -> dict:
        return {"processed": len(results)}
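The idea of bounded fan-out with per-item checkpoints can be sketched with plain `asyncio`. This is an assumption-laden illustration, not how `@parallel` is implemented: `process_all` is a hypothetical helper and the `done` dict stands in for durable storage.

```python
import asyncio

async def process_all(item_ids, worker, done, max_concurrency=20):
    """Process items concurrently, skipping ids already present in `done`."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item_id):
        if item_id in done:
            return done[item_id]  # finished before the crash
        async with semaphore:  # at most max_concurrency workers at once
            result = await worker(item_id)
        done[item_id] = result  # per-item checkpoint
        return result

    return await asyncio.gather(*(run_one(i) for i in item_ids))

processed = []

async def compute(item_id):
    processed.append(item_id)
    await asyncio.sleep(0)
    return {"id": item_id}

done = {"a": {"id": "a"}}  # pretend "a" completed before a crash
results = asyncio.run(
    process_all(["a", "b", "c"], compute, done, max_concurrency=2)
)
```

Only `b` and `c` reach `compute`; the result for `a` is read back from the checkpoint store, and `gather` keeps the results in input order.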
Human-in-the-Loop
from datetime import timedelta

@grav.workflow(id="expense-{id}")
class ExpenseWorkflow:
    @step(1)
    async def submit(self, id: str, amount: float) -> dict:
        await slack.send(f"Approve expense #{id} (${amount})?")
        return {"id": id, "amount": amount}

    @step(2, depends_on=1)
    @wait_for_signal("approval", timeout=timedelta(days=7))
    async def await_approval(self, signal: dict) -> bool:
        return signal["approved"]

    # Runs only if step 2 returned True.
    @step(3, depends_on=2, condition=lambda ctx: ctx.output(2))
    async def reimburse(self, id: str) -> None:
        await accounting.pay(id)

# From your API or Slack bot:
await grav.signal("expense-42", "approval", {"approved": True})
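Waiting for a signal with a timeout can be modeled in memory as a future that another coroutine resolves. The sketch below is only a model of that idea: `SignalBox` is a hypothetical class, and a durable implementation would persist the signal rather than hold it in a Python object.

```python
import asyncio

class SignalBox:
    """Holds one named signal per workflow run (hypothetical helper)."""

    def __init__(self):
        self._futures = {}

    def _future(self, run_id, name):
        key = (run_id, name)
        if key not in self._futures:
            self._futures[key] = asyncio.get_running_loop().create_future()
        return self._futures[key]

    async def wait(self, run_id, name, timeout):
        # Raises asyncio.TimeoutError if no signal arrives in time.
        return await asyncio.wait_for(self._future(run_id, name), timeout)

    def send(self, run_id, name, payload):
        fut = self._future(run_id, name)
        if not fut.done():
            fut.set_result(payload)

async def demo():
    box = SignalBox()
    waiter = asyncio.create_task(
        box.wait("expense-42", "approval", timeout=1.0)
    )
    await asyncio.sleep(0)  # let the waiter start
    box.send("expense-42", "approval", {"approved": True})
    approved = await waiter
    try:
        await box.wait("expense-43", "approval", timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return approved, timed_out

approved, timed_out = asyncio.run(demo())
```

The first wait returns the payload as soon as it is sent; the second, for a run that never receives a signal, times out.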
Scheduled Workflows
@grav.workflow(id="daily-report")
@grav.schedule(cron="0 9 * * *", tz="US/Eastern")
class DailyReport:
    @step(1)
    async def generate(self) -> dict:
        return await analytics.report()

    @step(2, depends_on=1)
    async def send(self, report: dict) -> None:
        await email.send_report(report)
Retry with Backoff
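@step(1, retries=5, backoff="exponential", backoff_base=2.0, retry_on=[httpx.TimeoutException])
async def call_external_api(self, url: str) -> dict:
    # httpx.get() is synchronous; use AsyncClient inside a coroutine.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    return response.json()
# Retries at: 2s, 4s, 8s, 16s, 32s (with jitter)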
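The schedule in the comment above follows from `backoff_base ** attempt`. The sketch below reproduces it; the jitter formula (a symmetric fraction of each delay) is an assumption, since the exact jitter Gravtory applies is not stated here, and `backoff_delays` is a hypothetical helper.

```python
import random

def backoff_delays(retries, base=2.0, jitter=0.0, rng=None):
    """Return the delay in seconds before each retry attempt."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(1, retries + 1):
        delay = base ** attempt
        # Spread each delay by up to +/- `jitter` (a fraction of the delay).
        delay *= 1 + rng.uniform(-jitter, jitter)
        delays.append(delay)
    return delays

plain = backoff_delays(5)  # no jitter: 2, 4, 8, 16, 32
jittered = backoff_delays(5, jitter=0.1, rng=random.Random(0))
```

Jitter keeps many workers that failed at the same moment from retrying in lockstep against the same external service.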
Distribution
No message broker required. Workers coordinate through the database.
# Scale from 1 to N workers on a single machine
grav = Gravtory("postgresql://localhost/mydb", workers=8)
# Scale across machines -- same code, different hosts
# Machine A:
grav = Gravtory("postgresql://shared-db/workflows", workers=8, node_id="a")
# Machine B:
grav = Gravtory("postgresql://shared-db/workflows", workers=8, node_id="b")
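Coordinating through the database means each worker must claim a task atomically so that no two workers run the same one. The sketch below shows one way to do that with a conditional `UPDATE` in SQLite, chosen only because it runs without a server. The `tasks` table and `claim_next` are hypothetical; per the Backends table, the PostgreSQL and MySQL backends rely on SKIP LOCKED rather than this approach.

```python
import sqlite3

def claim_next(conn, node_id):
    """Atomically claim one pending task for `node_id`; return its id or None."""
    with conn:  # commit the select + conditional update together
        row = conn.execute(
            "SELECT id FROM tasks WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        cur = conn.execute(
            "UPDATE tasks SET status = 'running', owner = ? "
            "WHERE id = ? AND status = 'pending'",
            (node_id, row[0]),
        )
        # rowcount is 0 if another worker claimed the task first.
        return row[0] if cur.rowcount == 1 else None

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, status TEXT, owner TEXT)"
)
conn.executemany("INSERT INTO tasks (status) VALUES (?)", [("pending",)] * 3)
conn.commit()

# Two nodes take turns asking for work; the fourth request finds nothing.
claims = [claim_next(conn, node) for node in ("a", "b", "a", "b")]
owners = conn.execute("SELECT owner FROM tasks ORDER BY id").fetchall()
```

The `AND status = 'pending'` guard is what makes the claim safe: a worker that loses the race updates zero rows and moves on.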
Observability
grav = Gravtory(
    "postgresql://localhost/mydb",
    dashboard=True,               # Web UI at :7777
    otel_endpoint="jaeger:4317",  # OpenTelemetry traces
    metrics_port=9090,            # Prometheus metrics
)

# Introspection API
state = await grav.inspect("order-ord_123")
print(state.status)                # "completed"
print(state.steps[1].output)       # {"charge_id": "ch_xyz"}
print(state.steps[1].duration_ms)  # 142

# Failure hooks
@grav.on_failure
async def alert(ctx):
    await slack.send(f"Workflow {ctx.workflow_run_id} failed: {ctx.error}")
Testing
No database required. The built-in test runner operates entirely in memory.
from gravtory.testing import WorkflowTestRunner

async def test_order_workflow():
    runner = WorkflowTestRunner()  # In-memory
    runner.mock(OrderWorkflow.charge_card, return_value={"charge_id": "test"})
    runner.mock(OrderWorkflow.reserve_inventory, return_value={"ok": True})
    runner.mock(OrderWorkflow.send_notification, return_value=None)
    result = await runner.run(OrderWorkflow, order_id="test_123")
    assert result.status == "completed"

    # Simulate crash and verify resume
    runner.simulate_crash_after(step=1)
    result = await runner.run(OrderWorkflow, order_id="test_456")
    result = await runner.resume("order-test_456")
    assert result.steps[1].was_replayed  # Not re-executed
CLI
gravtory list --status=failed
gravtory inspect order-ord_123
gravtory retry order-ord_123
gravtory signal expense-42 approval '{"approved": true}'
gravtory dlq list
gravtory dashboard
gravtory workers start --count=4
Installation
# Core + PostgreSQL (recommended for production)
pip install gravtory[postgres]
# Core + SQLite (local development)
pip install gravtory[sqlite]
# Core + MySQL
pip install gravtory[mysql]
# Core + MongoDB
pip install gravtory[mongodb]
# All backends and optional extras
pip install gravtory[all]
Requirements: Python 3.10+
Gravtory ships with a py.typed marker (PEP 561). Full type annotations work out of the box with mypy, pyright, and other type checkers.
Backends
| Backend | Best For | Distribution | Signals |
|---|---|---|---|
| PostgreSQL | Production | SKIP LOCKED | LISTEN/NOTIFY |
| SQLite | Development, testing | File locks | Polling |
| MySQL 8+ | Enterprise | SKIP LOCKED | Polling |
| MongoDB | Document-heavy workloads | findOneAndUpdate | Change Streams |
| Redis | High-throughput | Lua scripts | Pub/Sub |
Framework Integration
FastAPI
from contextlib import asynccontextmanager
from fastapi import FastAPI
from gravtory import Gravtory

grav = Gravtory("postgresql://localhost/mydb")

@asynccontextmanager
async def lifespan(app):
    await grav.start()
    yield
    await grav.shutdown()

app = FastAPI(lifespan=lifespan)

@app.post("/orders/{order_id}")
async def create_order(order_id: str):
    run_id = await grav.run(OrderWorkflow, order_id=order_id, background=True)
    return {"run_id": run_id}
Django
Django integration is planned for a future release. Progress is tracked in GitHub Issues.
Coming from Celery?
# Before (Celery)
@app.task(bind=True, max_retries=3)
def charge_card(self, order_id):
    try:
        result = stripe.charge(order_id)
    except Exception as exc:
        raise self.retry(exc=exc)
    return result
# If this crashes: task may be lost. Result may be lost.
# If charge succeeds but ack fails: card charged twice.

# After (Gravtory)
@step(1, retries=3, backoff="exponential")
async def charge_card(self, order_id: str) -> dict:
    return await stripe.charge(order_id)
# If this crashes: checkpoint guarantees at-least-once with idempotent replay.
# On resume: if charge completed, it's loaded from DB. Never re-executed.
License
Gravtory is released under the GNU Affero General Public License v3.0 (AGPL-3.0-or-later).
For commercial licensing inquiries, contact vatryok@protonmail.com.
Contributing
Contributions are welcome. Please read CONTRIBUTING.md for development setup, coding standards, and submission guidelines.
Support
If you find Gravtory useful, consider supporting its continued development on Ko-Fi.