Skip to main content

Python bindings for libpetri

Project description

libpetri

PyPI Python License

Python bindings for the libpetri Coloured Time Petri Net engine.

Compose typed Petri nets in Python, run them on the Rust executor through PyO3. Full surface parity with the Java, TypeScript, and Rust implementations — same arc types, timing modes, composition primitives, formal-verification API, debug protocol, and DOT export.

Install

pip install libpetri

Wheels are published for CPython 3.11, 3.12, 3.13 on Linux / macOS / Windows (x86-64 and arm64).

Quick example

import libpetri as lp

request  = lp.Place("Request")
response = lp.Place("Response")

def process(ctx: lp.TransitionContext) -> None:
    req = ctx.input("Request")
    ctx.output("Response", f"Processed: {req}")

net = (
    lp.Net("Example")
    .transition(
        lp.Transition("Process")
        .input(lp.one(request))
        .output(lp.out(response))
        .timing(lp.deadline(5000))
        .action(process)
        .build()
    )
    .build()
)

result = lp.run_sync(net, initial={request: ["hello"]})
print(result.first(response))  # → "Processed: hello"

Async callbacks work the same way; lp.run_async drives async def actions on tokio, releasing the GIL between awaits:

import asyncio, libpetri as lp

async def main() -> None:
    incoming = lp.Place("incoming")
    approved = lp.Place("approved")

    async def approve(ctx: lp.TransitionContext) -> None:
        order = ctx.input("incoming")
        await asyncio.sleep(0)  # any awaitable works
        ctx.output("approved", {**order, "approved": True})

    net = (
        lp.Net("orders")
        .transition(
            lp.Transition("approve")
            .input(lp.one(incoming))
            .output(lp.out(approved))
            .action(approve)
            .build()
        )
        .build()
    )

    result = await lp.run_async(net, initial={incoming: [{"id": 1}]})
    print(result.first(approved))  # → {'id': 1, 'approved': True}

asyncio.run(main())

Writing async actions

Libpetri drives Python async def actions from a tokio worker thread. Awaits inside the coroutine resolve against the asyncio loop that was running when you called lp.run_async / lp.start_async, but the worker thread itself does not run an asyncio loop. That means synchronous asyncio APIs called inside an action raise RuntimeError: no running event loop:

async def action(ctx):
    # ❌ all of these raise inside a libpetri action:
    await asyncio.gather(tool_a(), tool_b())
    asyncio.create_task(work())
    loop = asyncio.get_running_loop()
    await asyncio.wait_for(slow(), timeout=1.0)

Use structural fan-out as the primary parallelism pattern — fire N transitions in parallel from one FanOut, let the marking be the join:

# Net structure: FanOut → (tool_a_transition || tool_b_transition) → Join
# Each tool transition is its own action; the executor schedules them
# in parallel. No asyncio.gather needed.

For the case where you genuinely need to await N coroutines inside one action (e.g. dispatching to LangChain BaseTool._arun calls), use lp.action_gather:

import libpetri as lp

async def dispatch_tools(ctx):
    calls = ctx.input("TOOL_CALLS")
    # action_gather schedules on the captured asyncio loop, where
    # gather() works. Real parallelism: three 300ms tools finish in
    # ~300ms, not 900ms.
    results = await lp.action_gather(*(_arun(c) for c in calls))
    ctx.output("TOOL_RESULTS", results)

For blocking sync work (file I/O, blocking SDK calls), use lp.action_to_thread:

async def write_file(ctx):
    data = ctx.input("data")
    await lp.action_to_thread(Path("out.bin").write_bytes, data)
    ctx.output("done", True)

Exceptions raised by awaited coroutines are thrown back into your action via coro.throw, so try/except inside the action works the same as inside a regular asyncio task.

Streaming chunks with ctx.flush()

By default, all ctx.output(...) calls within one action firing are buffered and published together when the action returns. For long-running async actions that need to stream tokens (LLM chunks, byte streams), call ctx.flush() to publish the buffered outputs now:

async def stream_chunks(ctx):
    async for chunk in llm.astream(request):
        ctx.output("TOKEN_STREAM", chunk)
        ctx.flush()
        await asyncio.sleep(0)  # yield so downstream transitions can run

Each ctx.flush() is its own published event boundary — already-flushed tokens stay in the marking even if the action later raises. ctx.flush() raises RuntimeError from a sync action (run_sync); use async execution. The await asyncio.sleep(0) after each flush is the recommended cooperative yield so the executor's main loop can process the flush and let downstream transitions run while you're still streaming.

What you get

  • Full runtime — sync + async execution, environment-place injection, all five arc kinds (input / output / inhibitor / read / reset), all five timing modes, priority + FIFO scheduling.
  • CompositionSubnetDef with typed ports + channels, compose(...) via structural rewrite, port bindings, instance prefixes.
  • ν-nets (correlated identity) — mint a fresh opaque name with ctx.fresh_name() on a fork and join sibling tokens by name equality via lp.match_spec([...]); a bounded Budget place keeps the correlated fragment decidable.
  • Formal verification — SMT/IC3 properties (deadlock-free, mutual exclusion, place bound, unreachable) through Z3 when the wheel ships with the z3 system library available.
  • Debug protocol — same JSON wire format as the Java / TypeScript implementations; pair with the libpetri debug-ui for live inspection.
  • DOT / Graphviz exportlp.dot_export(net).
  • Typed and IDE-friendly — ships with .pyi stubs and py.typed; IDE autocomplete and mypy --strict work out of the box.

A note on token typing

The Java, TypeScript, and Rust implementations enforce Place[T] at compile time. The Python binding stores tokens as Py<PyAny> across the FFI boundary — net structure (arcs, transitions, composition) is still validated, but token runtime types are not. A place named "order" will accept dicts, integers, or strings interchangeably. This is intentional: Python has no static generics across the FFI. Validate at your boundary (Pydantic, dataclasses, isinstance) and only put validated values into markings.

Links

Apache License 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

libpetri-2.13.0.tar.gz (1.4 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

libpetri-2.13.0-cp312-cp312-manylinux_2_39_x86_64.whl (1.5 MB view details)

Uploaded CPython 3.12manylinux: glibc 2.39+ x86-64

File details

Details for the file libpetri-2.13.0.tar.gz.

File metadata

  • Download URL: libpetri-2.13.0.tar.gz
  • Upload date:
  • Size: 1.4 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for libpetri-2.13.0.tar.gz
Algorithm Hash digest
SHA256 2eb43849c2fef36ea9565d813c22837a62c7fc4bf7f7ac90b701593453c308ff
MD5 37ffe69f0c7357ffade4bc88000855ec
BLAKE2b-256 294ceea859611cc6c000b9547c0b30255582cb1fa663bb0bd16ed72b2f0c11b8

See more details on using hashes here.

File details

Details for the file libpetri-2.13.0-cp312-cp312-manylinux_2_39_x86_64.whl.

File metadata

File hashes

Hashes for libpetri-2.13.0-cp312-cp312-manylinux_2_39_x86_64.whl
Algorithm Hash digest
SHA256 8952684d3bbf5fa4f9fad67817765f2117a2db7a02ffe2e351a83c970d60d7c6
MD5 d5f3aba3a8856e494c0c6cd3b3cfbe35
BLAKE2b-256 1b8d0bb5ef325c93aacfab6f9a3300ab7e91929de8d06a669302c22cf9340e5c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page