
A minimal async pipeline library

flowlet

flowlet is a small async pipeline library for transforming streams with bounded stage-level parallelism.

Requirements

flowlet requires Python 3.13 or newer.

Installation

uv add flowlet

or:

pip install flowlet

Pipeline API

The pipeline API is the default interface. It uses method chaining over a lazy async stream; nothing runs until the pipeline is consumed with async for or a terminal method such as .collect() or .drain().

from flowlet import pipe

results = await (
    pipe(urls)
    .map(fetch, concurrency=20)
    .flat_map(extract_links)
    .filter(is_valid)
    .map(normalize)
    .collect()
)
  • pipe(source) accepts an Iterable[T] or AsyncIterable[T].
  • .map(fn) transforms one input into one output.
  • .flat_map(fn) transforms one input into zero or more outputs.
  • .filter(pred) keeps or drops each input.
  • .batch(size) collects items into lists of up to size.
  • .through(flow) appends a reusable Flow fragment.
  • .collect() consumes the pipeline into a list.
  • .drain() consumes the pipeline when outputs are intentionally ignored.

Functions may be sync or async.

Use in_thread(...) for blocking synchronous work that should not run on the event loop:

from flowlet import in_thread, pipe

items = await pipe(keys).map(in_thread(load_s3), concurrency=64).collect()

in_thread(fn, limit=16) adds a wrapper-level throttle. concurrency still controls how many items the pipeline stage may have in flight. With .map(in_thread(fn, limit=16), concurrency=64), up to 64 items may be active in the stage while at most 16 wrapped calls are submitted to the executor at once. That limit is enforced per wrapped callable per event loop, so the same wrapper can be reused safely across separate asyncio.run(...) calls.
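
To make the difference between the stage bound and the wrapper bound concrete, here is a minimal standard-library sketch. It is not flowlet's implementation: limited_to_thread and PeakTracker are hypothetical names, and the per-event-loop semaphore is only one plausible way to get the behavior described above. Many awaiting callers may be in flight, but only limit of them hold a worker thread at once.

```python
import asyncio
import threading
import time
import weakref


def limited_to_thread(fn, limit):
    """Hypothetical wrapper-level throttle (an assumption, not flowlet's code)."""
    # One semaphore per event loop, so the wrapper survives separate asyncio.run() calls.
    semaphores = weakref.WeakKeyDictionary()

    async def wrapper(*args):
        loop = asyncio.get_running_loop()
        sem = semaphores.get(loop)
        if sem is None:
            sem = semaphores[loop] = asyncio.Semaphore(limit)
        async with sem:  # at most `limit` calls are handed to a thread at once
            return await asyncio.to_thread(fn, *args)

    return wrapper


class PeakTracker:
    """Blocking callable that records how many calls ran at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def __call__(self, key):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(0.01)  # stand-in for blocking I/O
        with self._lock:
            self.active -= 1
        return key * 2


async def main():
    tracker = PeakTracker()
    load = limited_to_thread(tracker, limit=4)
    # 32 callers are "in flight" (like a stage's concurrency), but only 4 run at once.
    results = await asyncio.gather(*(load(k) for k in range(32)))
    return results, tracker.peak


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch the 32 gathered calls play the role of the stage's concurrency, and the semaphore plays the role of limit.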

Cancellation stops waiting for a threaded call's result, but it does not interrupt synchronous code that is already running in a worker thread.
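
The same limitation can be seen with plain asyncio, independent of flowlet. The sketch below uses asyncio.to_thread and asyncio.wait_for as stand-ins; slow_blocking_call is a made-up function. The awaiting side gives up after the timeout, yet the worker thread still runs to the end.

```python
import asyncio
import threading
import time


async def main():
    finished = threading.Event()

    def slow_blocking_call():
        time.sleep(0.2)   # synchronous work that nothing can interrupt
        finished.set()
        return "done"

    try:
        await asyncio.wait_for(asyncio.to_thread(slow_blocking_call), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True  # we stopped waiting for the result

    still_running = not finished.is_set()  # the thread has not been interrupted
    await asyncio.sleep(0.3)               # give the thread time to finish on its own
    return timed_out, still_running, finished.is_set()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking work must be stoppable, it usually needs its own cooperative signal, such as a flag the function checks between steps.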

If you pass both executor=pool and limit=16, both bounds apply: limit throttles this wrapper, while the pool's max_workers setting remains the actual thread-pool-wide cap.

If multiple stages should share one thread pool, pass the same executor to each wrapper:

from concurrent.futures import ThreadPoolExecutor
from flowlet import in_thread, pipe

with ThreadPoolExecutor(max_workers=16) as pool:
    items = await (
        pipe(keys)
        .map(in_thread(load_s3, executor=pool, limit=8), concurrency=32)
        .map(in_thread(parse_blob, executor=pool, limit=4), concurrency=32)
        .collect()
    )

When you pass executor=..., it must be a ThreadPoolExecutor. in_thread(...) is for blocking thread-based work, not process pools.

Use in_process(...) for CPU-bound synchronous work that should run in a process pool:

from concurrent.futures import ProcessPoolExecutor
from flowlet import in_process, pipe

with ProcessPoolExecutor(max_workers=8) as pool:
    items = await pipe(keys).map(in_process(crunch, executor=pool), concurrency=8).collect()

Unlike in_thread(...), in_process(...) has no default executor. in_thread(...) can use asyncio's default thread pool, but process pools must be explicitly started and shut down, so in_process(fn, executor=pool, limit=...) always requires a ProcessPoolExecutor. For the usual case, set concurrency to the process pool size and omit limit; use limit only when this wrapper should submit fewer calls than the stage or executor would otherwise allow.

The wrapped function, its arguments, and its return values must be pickleable for cross-platform process-pool code. The default start method varies by platform and Python version: fork on Linux before Python 3.14 (where local functions and lambdas work), forkserver on Linux from Python 3.14 onwards, and spawn on macOS and Windows. Both forkserver and spawn require importable module-level functions. Use module-level functions for portable in_process(...) pipelines.
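
A quick way to check a callable before handing it to a process pool is to try pickling it. This sketch uses only the standard pickle module; can_pickle and make_local are hypothetical helpers, and math.sqrt stands in for any importable module-level function.

```python
import math
import pickle


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def make_local():
    # A function defined inside another function cannot be found by import path.
    def local_double(x):
        return x * 2

    return local_double


if __name__ == "__main__":
    print(can_pickle(math.sqrt))          # importable module-level function
    print(can_pickle(lambda x: x * 2))    # lambda
    print(can_pickle(make_local()))       # local function
```

The importable function pickles; the lambda and the local function do not, which is why module-level functions are the portable choice.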

in_process(...) does not propagate contextvars into workers. in_thread(...) can copy context into another thread in the same process, but contextvars.Context is not pickleable and cannot be sent to worker processes. Cancellation stops waiting for the process-pool result, but a task that is already running in a ProcessPoolExecutor cannot be cancelled individually; use concurrency, limit, or the executor's worker count as the practical throttles.
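
The context difference can be shown with the standard library alone. In this sketch asyncio.to_thread stands in for a thread wrapper that copies context (it is documented to do so); request_id and read_request_id are illustrative names, not part of flowlet.

```python
import asyncio
import contextvars
import pickle

request_id = contextvars.ContextVar("request_id", default="unset")


def read_request_id():
    # Runs in a worker thread; sees the caller's context because to_thread copies it.
    return request_id.get()


async def main():
    request_id.set("req-42")
    seen_in_thread = await asyncio.to_thread(read_request_id)

    # A Context object cannot be serialized, so it cannot travel to another process.
    try:
        pickle.dumps(contextvars.copy_context())
        context_pickles = True
    except (TypeError, pickle.PicklingError):
        context_pickles = False

    return seen_in_thread, context_pickles


if __name__ == "__main__":
    print(asyncio.run(main()))
```

For process workers, pass any needed values explicitly as function arguments instead of relying on context variables.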

Reusable Flows

Flow is the reusable sourceless pipeline fragment type. Use it when you want to name and reuse a transform.

from flowlet import Flow, pipe

extract: Flow[Page, str] = (
    Flow[Page]()
    .flat_map(find_links)
    .filter(is_internal)
    .map(normalize_url)
)

links = await pipe(pages).through(extract).collect()

Flow[T]() starts a fragment whose input and current output type are both T. Starting from bare Flow() is allowed, but with no source to inspect, type checkers cannot infer the fragment's input type. Prefer Flow[T]() for typed reusable fragments.

The | syntax is optional sugar for .through(...):

links = await (pipe(pages) | extract).collect()

Async Iteration

Pipelines are async iterables. Use async for when you do not want to collect every item, especially for unbounded streams.

async for link in pipe(events).map(parse).filter(is_interesting):
    await handle(link)

Stop the loop normally when you have enough items:

items = []

async for item in pipe(events).map(parse):
    items.append(item)
    if len(items) == 100:
        break

Operator Syntax

The op namespace constructs single-step Flows for compact reusable composition. It is secondary to method chaining and does not change execution behavior.

from flowlet import op, pipe

items = await (
    pipe(pages)
    | op.flat_map(find_links)
    | op.filter(is_internal)
    | op.map(normalize_url)
).collect()

This is equivalent to:

items = await (
    pipe(pages)
    .flat_map(find_links)
    .filter(is_internal)
    .map(normalize_url)
    .collect()
)

pipe(source) | flow is equivalent to pipe(source).through(flow).

Use op when you want compact sourceless flow composition:

from flowlet import op, pipe

extract = (
    op.flat_map(find_links)
    | op.filter(is_internal)
    | op.map(normalize_url)
)

links = await (pipe(pages) | extract).collect()

Functional API

flowlet.functional is a lower-level API. It exposes the curried stream operators that power Pipeline, Flow, and op.

import flowlet.functional as F

pipeline = F.chain(
    F.map(fetch, concurrency=20),
    F.flat_map(extract_links),
    F.filter(is_valid),
    F.map(normalize),
)

items = await F.collect(pipeline(urls))

Each functional operator returns a reusable stream transformer:

fetch_pages = F.map(fetch, concurrency=20)
pages = fetch_pages(urls)

Most users should prefer the pipeline API. Use flowlet.functional when you specifically want to build or pass around stream-transformer functions.
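
For readers unfamiliar with the curried style, the idea can be sketched with plain async generators. This is an illustration only: map_op, filter_op, chain_ops, collect_all, and from_iterable are hypothetical names, they are not flowlet.functional, and the sketch ignores concurrency and async stage functions.

```python
import asyncio


async def from_iterable(items):
    """Adapt a plain iterable into an async stream."""
    for item in items:
        yield item


def map_op(fn):
    """Configure first, apply to a stream later: returns a stream transformer."""
    def transform(stream):
        async def gen():
            async for item in stream:
                yield fn(item)
        return gen()
    return transform


def filter_op(pred):
    def transform(stream):
        async def gen():
            async for item in stream:
                if pred(item):
                    yield item
        return gen()
    return transform


def chain_ops(*transforms):
    """Compose transformers left to right into one transformer."""
    def transform(stream):
        for t in transforms:
            stream = t(stream)
        return stream
    return transform


async def collect_all(stream):
    return [item async for item in stream]


async def main():
    pipeline = chain_ops(map_op(lambda x: x * x), filter_op(lambda x: x % 2 == 0))
    return await collect_all(pipeline(from_iterable(range(6))))


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 4, 16]
```

Because each operator is just a function from stream to stream, a configured transformer can be stored, passed around, and applied to several sources.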

Error Behavior

The default error policy is fail-fast. Exceptions from sources or stages propagate to the caller.

In a concurrent stage, if one in-flight item raises, the pipeline raises and pending sibling tasks in that stage are cancelled. There is no skip-or-recover API yet. To convert failures into values, use an explicit try/except inside your stage function; to drop failed items, return an empty iterable from a .flat_map(...) stage.
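
The two workarounds can be written as ordinary stage functions. The sketch below is plain Python with made-up names (parse_or_none, parse_or_skip, expand); expand only imitates what a flat_map-style stage would do with the returned lists so the example runs without flowlet.

```python
def parse_or_none(raw):
    """Convert a failure into a value: suitable for a one-to-one stage."""
    try:
        return int(raw)
    except ValueError:
        return None


def parse_or_skip(raw):
    """Drop a failure: return zero outputs, suitable for a flat_map-style stage."""
    try:
        return [int(raw)]
    except ValueError:
        return []


def expand(items, fn):
    """Stand-in for flat_map: flatten each returned iterable into one list."""
    return [out for item in items for out in fn(item)]


if __name__ == "__main__":
    raw_values = ["1", "oops", "3"]
    print([parse_or_none(v) for v in raw_values])  # [1, None, 3]
    print(expand(raw_values, parse_or_skip))       # [1, 3]
```

Catch only the exceptions you expect; anything else should still fail fast.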

Concurrency

Concurrency is configured per stage. A pipeline with two concurrency=20 stages can have work in flight in both stages at the same time, as far as downstream consumption allows.

Concurrent stages emit values in completion order. If you need input order, use concurrency=1.

items = await pipe(urls).map(fetch, concurrency=20).collect()

With concurrency > 1, a faster later item can be yielded before a slower earlier item. Source items are pulled lazily, up to the stage concurrency and downstream demand.

This applies to every concurrent stage, including filters:

items = await pipe([1, 2, 3]).filter(async_pred, concurrency=3).collect()
# May return [3, 1, 2], not necessarily [1, 2, 3].
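
Completion order is easy to reproduce with plain asyncio. This sketch uses asyncio.as_completed as a stand-in for a concurrent stage; slow_echo and the delays are invented for the demonstration.

```python
import asyncio


async def slow_echo(item, delay):
    await asyncio.sleep(delay)
    return item


async def main():
    # Item 1 is the slowest and item 3 the fastest.
    delays = {1: 0.15, 2: 0.10, 3: 0.05}
    tasks = [asyncio.create_task(slow_echo(i, d)) for i, d in delays.items()]
    # as_completed yields results as they finish, not in submission order.
    return [await fut for fut in asyncio.as_completed(tasks)]


if __name__ == "__main__":
    print(asyncio.run(main()))  # [3, 2, 1]
```

If results must be matched back to inputs, one option is to carry the input (or its index) through the stage alongside the output.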

Cardinality

Use the method that matches the stage cardinality.

pipe(items).map(fn)       # one input -> one output
pipe(items).filter(pred)  # one input -> zero or one output
pipe(items).flat_map(fn)  # one input -> zero or more outputs
pipe(items).batch(size)   # many inputs -> one output (list)

flat_map(fn) accepts a sync or async function returning an Iterable[U] or AsyncIterable[U]. It streams each returned iterable; an async expansion can yield values without first finishing the whole expansion. It expects each input to expand into a finite, reasonably small iterable or async iterable. Outputs are buffered internally; very large or infinite expansions may consume unbounded memory. It does not accept a single scalar output; use .map(fn) for one-to-one transforms.

With flat_map(in_thread(fn)), prefer returning a materialized collection such as list or tuple. If fn returns a lazy generator, the generator object is created in the worker thread but iterated later on the event-loop thread.
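
The thread hand-off can be observed directly. In this sketch asyncio.to_thread stands in for in_thread, and lazy_rows and eager_rows are hypothetical functions that record which thread actually runs their body.

```python
import asyncio
import threading


def lazy_rows(log):
    def gen():
        log.append(threading.get_ident())  # runs only when the generator is iterated
        yield 1
        yield 2
    return gen()  # creating the generator does not run its body


def eager_rows(log):
    log.append(threading.get_ident())      # runs immediately, in the worker thread
    return [1, 2]


async def main():
    loop_thread = threading.get_ident()
    lazy_log, eager_log = [], []

    lazy = await asyncio.to_thread(lazy_rows, lazy_log)
    lazy_items = list(lazy)  # the generator body executes here, on the event-loop thread

    eager_items = await asyncio.to_thread(eager_rows, eager_log)

    return (
        lazy_log[0] == loop_thread,   # lazy body ran on the event-loop thread
        eager_log[0] == loop_thread,  # eager body ran elsewhere
        lazy_items,
        eager_items,
    )


if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, False, [1, 2], [1, 2])
```

With a lazy generator, any blocking work in the generator body ends up back on the event loop, which defeats the purpose of the thread wrapper.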

batch(size) collects items into lists of up to size. The last emitted list may be shorter when the source is exhausted with a partial group. This is useful for batching API calls, database inserts, or any operation that benefits from processing items in chunks:

await pipe(records).batch(100).map(bulk_insert).drain()
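
The grouping rule, including the shorter final list, can be sketched as a small async generator. This is an illustration of the semantics described above, not flowlet's implementation; batch_stream and numbers are hypothetical names.

```python
import asyncio


async def numbers(n):
    for i in range(n):
        yield i


async def batch_stream(stream, size):
    """Group items into lists of up to `size`; the last list may be shorter."""
    if size < 1:
        raise ValueError("size must be at least 1")
    group = []
    async for item in stream:
        group.append(item)
        if len(group) == size:
            yield group
            group = []
    if group:  # source exhausted with a partial group
        yield group


async def main():
    return [group async for group in batch_stream(numbers(7), 3)]


if __name__ == "__main__":
    print(asyncio.run(main()))  # [[0, 1, 2], [3, 4, 5], [6]]
```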

None is treated as normal data. Filtering is explicit.

Source Contract

pipe(source) and F.collect(source) accept iterables and async iterables, and consume the source lazily. One-shot iterators and generators remain one-shot: once a pipeline run has consumed them, a later run over the same object sees only what is left, which is usually nothing.
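
The one-shot behavior is a property of Python iterators rather than of flowlet, as this plain-Python sketch shows. readings and demo are made-up names.

```python
def readings():
    yield from (1, 2, 3)


def demo():
    source = readings()
    first_run = list(source)        # consumes the generator
    second_run = list(source)       # same object: already exhausted
    fresh_run = list(readings())    # a new generator starts over
    return first_run, second_run, fresh_run


if __name__ == "__main__":
    print(demo())  # ([1, 2, 3], [], [1, 2, 3])
```

To run a pipeline more than once, pass a re-iterable source such as a list, or create a fresh generator for each run.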

.collect() on an infinite source never completes because it waits to build a complete list. Use async iteration or .drain() for unbounded streams.

Use .drain() when the side effects happen inside the pipeline stages and the terminal needs no per-item action; the pipeline simply runs to completion.

await pipe(events).map(write_to_log, concurrency=20).drain()
