Project description

fluxio

Streamable, durable pipeline runtime for Python backend services.

📖 Documentation: English · Russian (Русский)

  • Composable stages with auto-detected sync/async/stream types
  • Immutable HAMT context with O(1) fork / conflict-aware merge (sketched just after this list)
  • Auto-parallelism from declared reads/writes
  • Durable execution with checkpoints and explicit resume
  • Conditional routing via Send and dict blocks
  • Middleware chain: retry, cache, circuit breaker, rate limit
  • Observability via callbacks (Langfuse integration included)
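
Because the context is an immutable HAMT, ctx.set returns a new context rather than mutating in place. A tiny illustrative sketch, assuming only the mapping-style ctx.set / ctx[...] API used in the examples below:

from fluxio import stage

@stage
async def tag_user(ctx):
    updated = ctx.set("tier", "pro")  # set() returns a NEW context; ctx itself is unchanged
    return updated                    # the runtime carries the returned context forward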

Install

pip install fluxio              # core
pip install fluxio[redis]       # RedisStore
pip install fluxio[langfuse]    # LangfuseCallback

Requires Python 3.12+.

Minimal example

import asyncio
from fluxio import Pipeline, stage

@stage
async def fetch_user(ctx):
    return ctx.set("user", {"id": ctx["user_id"], "name": "Alice"})

@stage
async def greet(ctx):
    return ctx.set("greeting", f"Hello {ctx['user']['name']}")

async def main():
    async with Pipeline([fetch_user, greet]) as pipe:
        result = await pipe.invoke({"user_id": 1})
        print(result["greeting"])

asyncio.run(main())
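
Stage kind is detected from the function's shape (see the feature list above): an async def is an async stage and an async generator becomes a stream stage. The sketch below also assumes a plain def is accepted as a sync stage, which the auto-detection bullet implies but the examples here do not show:

from fluxio import stage

@stage
def normalize_name(ctx):             # plain function -> sync stage (assumed from the auto-detection bullet)
    return ctx.set("name", ctx["user"]["name"].strip().title())

@stage
async def name_tokens(ctx):          # async generator -> stream stage
    for token in ctx["name"].split():
        yield token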

Production example

from fluxio import (
    Pipeline, Parallel, stage, Send,
    RetryMiddleware, CacheMiddleware, CircuitBreakerMiddleware,
    InMemoryStore, LoggingCallback,
)

@stage(reads=frozenset({"user_id"}), writes=frozenset({"user"}), timeout=5.0)
async def fetch_user(ctx): ...

@stage(reads=frozenset({"user"}), writes=frozenset({"profile"}))
async def enrich_profile(ctx): ...

@stage(reads=frozenset({"user_id"}), writes=frozenset({"orders"}))
async def fetch_orders(ctx): ...

@stage
async def route(ctx):
    return Send("premium" if ctx["user"]["tier"] == "pro" else "standard")

@stage
async def stream_response(ctx):
    # `llm` stands in for whatever async streaming client you use; it is not part of fluxio
    async for chunk in llm.stream(ctx["prompt"]):
        yield chunk

# All real code goes inside an async function:
async def serve(prompt: str, user_id: int):
    async with Pipeline(
        [
            fetch_user,
            Parallel([enrich_profile, fetch_orders]),  # or declare reads/writes and let auto-parallel kick in
            route,
            {
                "premium":  [stream_response],
                "standard": [stream_response],
            },
        ],
        middleware=[
            CircuitBreakerMiddleware(failure_threshold=5),
            RetryMiddleware(max_attempts=3, backoff="exponential"),
            CacheMiddleware(ttl=60),
        ],
        callbacks=[LoggingCallback()],
        checkpoint_store=InMemoryStore(),
        durable=True,
    ) as pipe:
        return await pipe.invoke(
            {"user_id": user_id, "prompt": prompt},
            run_id="req-abc-123",
        )

# After a crash, resume from the last checkpoint in a NEW process by
# constructing the same Pipeline and calling:
#     await pipe.invoke({}, run_id="req-abc-123", resume=True)
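
A minimal sketch of that resume path, continuing from the imports and stages above. It reuses InMemoryStore purely for brevity; resuming in a genuinely new process needs a store that outlives it, such as the RedisStore extra (constructor options not shown here):

async def resume_request():
    # Rebuild the SAME pipeline shape, point it at the same checkpoint store,
    # and re-invoke with the original run_id and resume=True.
    async with Pipeline(
        [
            fetch_user,
            Parallel([enrich_profile, fetch_orders]),
            route,
            {"premium": [stream_response], "standard": [stream_response]},
        ],
        checkpoint_store=InMemoryStore(),  # in a new process, use a persistent store instead
        durable=True,
    ) as pipe:
        return await pipe.invoke({}, run_id="req-abc-123", resume=True)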

How the pipeline above executes

flowchart TD
    Input["invoke({user_id: 42})"] --> FetchUser["fetch_user<br/>reads: user_id<br/>writes: user<br/>timeout=5s"]
    FetchUser -->|checkpoint| Fork(("fork"))
    Fork --> Enrich["enrich_profile<br/>writes: profile"]
    Fork --> Orders["fetch_orders<br/>writes: orders"]
    Enrich --> Join(("merge"))
    Orders --> Join
    Join --> Route["route<br/>returns Send"]
    Route -->|Send premium| Premium["stream_response<br/>async for chunk → yield"]
    Route -->|Send standard| Standard["stream_response<br/>async for chunk → yield"]
    Premium --> End["final ctx"]
    Standard --> End

    subgraph Middleware["middleware chain (per stage, outermost → innermost)"]
        direction LR
        CB[CircuitBreaker] --> Retry[Retry 3x exp] --> Cache[Cache TTL=60s]
    end

    subgraph Observability["side channels"]
        direction LR
        Store[(InMemoryStore<br/>checkpoints)]
        Logger[LoggingCallback]
    end

    FetchUser -.-> Store
    FetchUser -.-> Logger
    Route -.-> Logger
    Premium -.-> Logger
    Standard -.-> Logger

    classDef stage fill:#e3f2fd,stroke:#1976d2,color:#0d47a1
    classDef stream fill:#fff3e0,stroke:#f57c00,color:#e65100
    classDef junction fill:#eeeeee,stroke:#424242,color:#212121
    classDef side fill:#f3e5f5,stroke:#7b1fa2,color:#4a148c
    class FetchUser,Enrich,Orders,Route stage
    class Premium,Standard stream
    class Fork,Join junction
    class Store,Logger,CB,Retry,Cache side

  • Solid arrows = data flow between stages (each step passes a new immutable Context).
  • Dotted arrows = side channels: checkpoints and callbacks, invisible to stage logic.
  • The fork / merge pair is an implicit Parallel block — branches run concurrently and their writes are merged back (with conflict detection); see the auto-parallel sketch after this list.
  • Send("premium") from route drives the dict branch selection; only one route body runs per invocation.
  • STREAM stages (orange) bypass RetryMiddleware and CacheMiddleware automatically so chunks aren't duplicated or frozen in cache.
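
The same fork / merge shape can come from the declared reads/writes alone, as the comment in the production example hints. A hedged sketch of that auto-parallel form, reusing the stages defined above: since enrich_profile and fetch_orders declare disjoint writes and neither reads what the other writes, listing them sequentially should still let the scheduler run them concurrently (assuming the dependency analysis described in the feature list).

async def serve_auto(prompt: str, user_id: int):
    # No explicit Parallel block: the declared reads/writes are expected to let
    # fluxio schedule enrich_profile and fetch_orders concurrently.
    async with Pipeline(
        [
            fetch_user,
            enrich_profile,
            fetch_orders,
            route,
            {"premium": [stream_response], "standard": [stream_response]},
        ]
    ) as pipe:
        return await pipe.invoke({"user_id": user_id, "prompt": prompt})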

Streaming

# Runs inside an async function; `websocket` stands for your framework's connection object.
async def stream_to_client(websocket):
    async with Pipeline([fetch_user, stream_response]) as pipe:
        async for chunk in pipe.stream({"user_id": 42}):
            await websocket.send(chunk)

Testing

from fluxio.testing.harness import StepHarness

# fetch_user is the stage from the minimal example above.
# Run with an async-capable test runner (e.g. pytest-asyncio).
async def test_fetch_user():
    harness = StepHarness(fetch_user)
    result = await harness.run({"user_id": 1})
    assert result["user"]["name"] == "Alice"
    harness.close()
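
The same harness works for any stage. A second sketch, assuming the greet stage from the minimal example and only the StepHarness.run / close calls shown above:

async def test_greet():
    harness = StepHarness(greet)
    result = await harness.run({"user": {"id": 1, "name": "Alice"}})
    assert result["greeting"] == "Hello Alice"
    harness.close()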

Layout

fluxio/
  api/          # Pipeline, Parallel, @stage, primitives
  compiler/     # bytecode + static analysis
  context/      # immutable HAMT
  runtime/      # interpreter, scheduler, middleware, cache
  observability/# callbacks: Base, Logging, Langfuse
  store/        # CheckpointStore: InMemory, Redis
  testing/      # StepHarness, make_ctx

License

MIT

Download files

Download the file for your platform.

Source Distribution

fluxio-0.1.0.tar.gz (122.0 kB)

Built Distribution

fluxio-0.1.0-py3-none-any.whl (36.3 kB)

File details

Details for the file fluxio-0.1.0.tar.gz.

File metadata

  • Download URL: fluxio-0.1.0.tar.gz
  • Upload date:
  • Size: 122.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for fluxio-0.1.0.tar.gz
Algorithm Hash digest
SHA256 01309594893ac0f7d007e8b0b365f7e4673032e04c7662730edd0f1df6ba1ed3
MD5 a26116b38fa7059a8cd918c53682505d
BLAKE2b-256 147ab05fb8bb53f433a4118e9e44053389ca6da378284085111c1491b7670043


File details

Details for the file fluxio-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: fluxio-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 36.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for fluxio-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 947f43002034cbbac10ce5f841c5a22100d0ad2a051cb48b91130bd16d65cbdb
MD5 5eee8fb740c96e240f59ce0d8bb34fc5
BLAKE2b-256 8639f56ce25fec792d7abce2e619e14e1951db4d167911f9cfe1dc06f45f8c56

