Python bindings for libpetri
libpetri
Python bindings for the libpetri Coloured Time Petri Net engine.
Compose typed Petri nets in Python and run them on the Rust executor through PyO3. The bindings have full surface parity with the Java, TypeScript, and Rust implementations: the same arc types, timing modes, composition primitives, formal-verification API, debug protocol, and DOT export.
Install
pip install libpetri
Wheels are published for CPython 3.11, 3.12, 3.13 on Linux / macOS / Windows (x86-64 and arm64).
Quick example
import libpetri as lp

request = lp.Place("Request")
response = lp.Place("Response")

def process(ctx: lp.TransitionContext) -> None:
    req = ctx.input("Request")
    ctx.output("Response", f"Processed: {req}")

net = (
    lp.Net("Example")
    .transition(
        lp.Transition("Process")
        .input(lp.one(request))
        .output(lp.out(response))
        .timing(lp.deadline(5000))
        .action(process)
        .build()
    )
    .build()
)

result = lp.run_sync(net, initial={request: ["hello"]})
print(result.first(response))  # → "Processed: hello"
Async callbacks work the same way; lp.run_async drives async def actions
on tokio, releasing the GIL between awaits:
import asyncio
import libpetri as lp

async def main() -> None:
    incoming = lp.Place("incoming")
    approved = lp.Place("approved")

    async def approve(ctx: lp.TransitionContext) -> None:
        order = ctx.input("incoming")
        await asyncio.sleep(0)  # any awaitable works
        ctx.output("approved", {**order, "approved": True})

    net = (
        lp.Net("orders")
        .transition(
            lp.Transition("approve")
            .input(lp.one(incoming))
            .output(lp.out(approved))
            .action(approve)
            .build()
        )
        .build()
    )

    result = await lp.run_async(net, initial={incoming: [{"id": 1}]})
    print(result.first(approved))  # → {'id': 1, 'approved': True}

asyncio.run(main())
Writing async actions
Libpetri drives Python async def actions from a tokio worker thread.
Awaits inside the coroutine resolve against the asyncio loop that was
running when you called lp.run_async / lp.start_async, but the worker
thread itself does not run an asyncio loop. As a result, asyncio APIs that
look up the running loop of the current thread raise RuntimeError: no running event loop when called inside an action:
async def action(ctx):
    # ❌ all of these raise inside a libpetri action:
    await asyncio.gather(tool_a(), tool_b())
    asyncio.create_task(work())
    loop = asyncio.get_running_loop()
    await asyncio.wait_for(slow(), timeout=1.0)
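The error itself is ordinary asyncio behaviour and can be reproduced with the standard library alone. The sketch below does not use libpetri; it assumes only that a plain worker thread (started here with asyncio.to_thread) is a reasonable stand-in for a thread that has no event loop of its own. The helper name probe_from_worker is hypothetical.

```python
import asyncio

def probe_from_worker() -> str:
    """Runs in a plain thread, where no asyncio loop is running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        # This is the same error message quoted above.
        return str(exc)
    return "loop found"

async def main() -> str:
    # The main thread has a running loop; the worker thread does not.
    return await asyncio.to_thread(probe_from_worker)

message = asyncio.run(main())
print(message)  # → no running event loop
```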
Use structural fan-out as the primary parallelism pattern: fire N transitions in parallel from one FanOut and let the marking act as the join:
# Net structure: FanOut → (tool_a_transition || tool_b_transition) → Join
# Each tool transition is its own action; the executor schedules them
# in parallel. No asyncio.gather needed.
For the case where you genuinely need to await N coroutines inside one
action (e.g. dispatching to LangChain BaseTool._arun calls), use
lp.action_gather:
import libpetri as lp

async def dispatch_tools(ctx):
    calls = ctx.input("TOOL_CALLS")
    # action_gather schedules on the captured asyncio loop, where
    # gather() works. Real parallelism: three 300ms tools finish in
    # ~300ms, not 900ms.
    results = await lp.action_gather(*(_arun(c) for c in calls))
    ctx.output("TOOL_RESULTS", results)
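To make the idea of scheduling on a captured loop concrete, here is a standard-library sketch of the general technique: a thread without an event loop hands a gather() call to a loop running elsewhere via asyncio.run_coroutine_threadsafe. This is an illustration under stated assumptions, not libpetri's implementation; the names tool, gather_all, and worker are hypothetical.

```python
import asyncio
import time

async def tool(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done"

async def gather_all(*coros):
    # gather() is called on the loop's own thread, where it works.
    return await asyncio.gather(*coros)

def worker(loop: asyncio.AbstractEventLoop) -> list[str]:
    # This thread has no running loop, so it submits the work to the
    # captured loop and blocks until the combined result is ready.
    future = asyncio.run_coroutine_threadsafe(
        gather_all(tool("a", 0.2), tool("b", 0.2), tool("c", 0.2)),
        loop,
    )
    return future.result(timeout=5)

async def main():
    loop = asyncio.get_running_loop()  # capture the loop up front
    start = time.perf_counter()
    results = await asyncio.to_thread(worker, loop)
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results)  # → ['a done', 'b done', 'c done']
print(f"{elapsed:.2f}s")  # roughly one delay, not three
```

The three coroutines run concurrently on the captured loop, so the wall-clock time is close to a single delay rather than the sum.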
For blocking sync work (file I/O, blocking SDK calls), use
lp.action_to_thread:
from pathlib import Path

import libpetri as lp

async def write_file(ctx):
    data = ctx.input("data")
    await lp.action_to_thread(Path("out.bin").write_bytes, data)
    ctx.output("done", True)
Exceptions raised by awaited coroutines are thrown back into your action
via coro.throw, so try/except inside the action works the same as
inside a regular asyncio task.
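The coro.throw mechanism is plain Python and can be seen without libpetri. In the sketch below a coroutine is driven by hand with send() and throw(); the helper suspend and the hand-written driver are hypothetical stand-ins for whatever the executor does internally.

```python
import types

@types.coroutine
def suspend(tag):
    # Hand control back to whoever is driving the coroutine.
    result = yield tag
    return result

async def action(log):
    try:
        await suspend("call-tool")
    except ValueError as exc:
        # The driver's throw() surfaces here, like a normal exception.
        log.append(f"caught: {exc}")
        return "recovered"
    return "ok"

log = []
coro = action(log)
request = coro.send(None)  # run until the first suspension point
try:
    coro.throw(ValueError("tool failed"))  # inject the failure
except StopIteration as stop:
    outcome = stop.value  # the coroutine's return value

print(request, outcome, log)
# → call-tool recovered ['caught: tool failed']
```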
Streaming chunks with ctx.flush()
By default, all ctx.output(...) calls within one action firing are
buffered and published together when the action returns. For long-running
async actions that need to stream tokens (LLM chunks, byte streams), call
ctx.flush() to publish the buffered outputs now:
import asyncio

async def stream_chunks(ctx):
    async for chunk in llm.astream(request):
        ctx.output("TOKEN_STREAM", chunk)
        ctx.flush()
        await asyncio.sleep(0)  # yield so downstream transitions can run
Each ctx.flush() is its own published event boundary: already-flushed
tokens stay in the marking even if the action later raises. Calling
ctx.flush() from a sync action (run_sync) raises RuntimeError; use async
execution instead. The await asyncio.sleep(0) after each flush is the
recommended cooperative yield, so the executor's main loop can process the
flush and let downstream transitions run while you're still streaming.
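The buffering semantics can be pictured with a toy model. The class below is a hypothetical stand-in written for illustration only, not libpetri code; in particular, it assumes that outputs still buffered when an action raises are discarded, which the text above implies but does not state outright.

```python
class ToyContext:
    """Hypothetical model of buffered outputs with an explicit flush."""

    def __init__(self) -> None:
        self.marking: dict[str, list] = {}
        self._pending: list[tuple[str, object]] = []

    def output(self, place: str, token: object) -> None:
        self._pending.append((place, token))  # buffered, not yet visible

    def flush(self) -> None:
        for place, token in self._pending:
            self.marking.setdefault(place, []).append(token)
        self._pending.clear()

def fire(action, ctx: ToyContext) -> bool:
    try:
        action(ctx)
    except Exception:
        ctx._pending.clear()  # assumption: unflushed outputs are dropped
        return False
    ctx.flush()  # normal return publishes whatever is still buffered
    return True

def streaming_action(ctx: ToyContext) -> None:
    for chunk in ["a", "b"]:
        ctx.output("TOKEN_STREAM", chunk)
        ctx.flush()  # each chunk is published immediately
    ctx.output("TOKEN_STREAM", "c")  # buffered, never flushed
    raise RuntimeError("boom")

ctx = ToyContext()
ok = fire(streaming_action, ctx)
print(ok, ctx.marking)  # → False {'TOKEN_STREAM': ['a', 'b']}
```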
What you get
- Full runtime: sync + async execution, environment-place injection, all five arc kinds (input / output / inhibitor / read / reset), all five timing modes, priority + FIFO scheduling.
- Composition: SubnetDef with typed ports + channels, compose(...) via structural rewrite, port bindings, instance prefixes.
- ν-nets (correlated identity): mint a fresh opaque name with ctx.fresh_name() on a fork and join sibling tokens by name equality via lp.match_spec([...]); a bounded Budget place keeps the correlated fragment decidable.
- Formal verification: SMT/IC3 properties (deadlock-free, mutual exclusion, place bound, unreachable) through Z3 when the wheel ships with the z3 system library available.
- Debug protocol: same JSON wire format as the Java / TypeScript implementations; pair with the libpetri debug-ui for live inspection.
- DOT / Graphviz export: lp.dot_export(net).
- Typed and IDE-friendly: ships with .pyi stubs and py.typed; IDE autocomplete and mypy --strict work out of the box.
A note on token typing
The Java, TypeScript, and Rust implementations enforce Place[T] at compile
time. The Python binding stores tokens as Py<PyAny> across the FFI boundary
— net structure (arcs, transitions, composition) is still validated, but
token runtime types are not. A place named "order" will accept dicts,
integers, or strings interchangeably. This is intentional: Python has no
static generics across the FFI. Validate at your boundary (Pydantic,
dataclasses, isinstance) and only put validated values into markings.
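As one possible way to validate at the boundary, the sketch below converts raw inputs into a frozen dataclass before they would reach a marking. It uses only the standard library; the Order type and the validate_order helper are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Order:
    id: int
    approved: bool = False

def validate_order(raw: object) -> Order:
    """Reject anything that is not a dict with an integer id."""
    if not isinstance(raw, dict):
        raise TypeError(f"expected dict, got {type(raw).__name__}")
    order_id = raw.get("id")
    # bool is a subclass of int, so exclude it explicitly.
    if not isinstance(order_id, int) or isinstance(order_id, bool):
        raise ValueError("order 'id' must be an int")
    return Order(id=order_id, approved=bool(raw.get("approved", False)))

raw_inputs = [{"id": 1}, {"id": "2"}, 42]
valid, rejected = [], []
for raw in raw_inputs:
    try:
        valid.append(validate_order(raw))
    except (TypeError, ValueError) as exc:
        rejected.append(str(exc))

print(valid)     # → [Order(id=1, approved=False)]
print(rejected)  # → ["order 'id' must be an int", 'expected dict, got int']
```

Only the entries in valid would then be used as initial tokens, for example initial={incoming: valid} with the incoming place from the async example above.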
File details
Details for the file libpetri-2.12.0.tar.gz.
File metadata
- Download URL: libpetri-2.12.0.tar.gz
- Upload date:
- Size: 1.4 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5bc7c1d683ef322117157691f5302b21b0e5afc0f777cd6e4e40c1c9cf01c757 |
| MD5 | a7da6448d76a6f0ab21405e8b2a60089 |
| BLAKE2b-256 | 83ef83c0d8f31490824364eda97da3679fc91998f8f933449f234dd0e3b267ce |
File details
Details for the file libpetri-2.12.0-cp312-cp312-manylinux_2_39_x86_64.whl.
File metadata
- Download URL: libpetri-2.12.0-cp312-cp312-manylinux_2_39_x86_64.whl
- Upload date:
- Size: 1.5 MB
- Tags: CPython 3.12, manylinux: glibc 2.39+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 47e1ed60cda79a767cec04427af940ad6449d503ece3e50bdd9f294653228d3c |
| MD5 | 5069e63fb8100d38b25464aee034a9d0 |
| BLAKE2b-256 | 9c738b4168a18faadd42bd0481bf7944a58940c4d95b2f8d7401c1eeedcab882 |