Skip to main content

Write pipeline code once — runs sync or async automatically. Transparent sync/async bridge for Python. Pure Python, zero runtime dependencies on Python 3.11+.

Project description

quent

Write it once. Run it sync or async.


PyPI   Python   License   CI   Ruff   Downloads   Docs


A transparent sync/async bridge for Python.
Define a pipeline once — quent handles the rest.


  • One definition, two worlds — a single chain works for both sync and async callers. Zero code duplication.
  • Zero ceremony — no decorators, no base classes, no type wrappers. Just chain your functions.
  • Drop-in migration — unify existing sync and async implementations into one pipeline. Stop maintaining two versions.
  • Pure Python — zero runtime dependencies. Fully typed (PEP 561). Compatible with asyncio, trio, and curio — async event loop detection uses sys.modules lookups, adding zero overhead when those libraries are not loaded.
  • Focused — every feature exists because removing it would force separate sync and async code paths.

The Problem

Any codebase that supports both sync and async callers ends up maintaining two versions of the same logic:

# Without quent -- the same pipeline, written twice

def process_sync(data):
  validated = validate_sync(data)
  transformed = transform_sync(validated)
  return save_sync(transformed)

async def process_async(data):
  validated = await validate_async(data)
  transformed = await transform_async(validated)
  return await save_async(transformed)

Every function, every pipeline, every utility — duplicated. When a bug is fixed in one version, the other falls out of sync. When a new step is added, it must be added in both places.


The Solution

# With quent -- write it once

pipeline = Chain().then(validate).then(transform).then(save)

result = pipeline.run(data)          # sync if all steps are sync
result = await pipeline.run(data)    # async if any step is async

One definition. The chain starts executing synchronously. The moment any step returns an awaitable, execution seamlessly transitions to async and stays there. The caller decides whether to await.


Installation

pip install quent

Requires Python 3.10+. Supports 3.10 through 3.14, including free-threaded builds. Zero runtime dependencies on Python 3.11+ (typing_extensions on 3.10).


Quick Start

from quent import Chain

# Basic pipeline
result = Chain(5).then(lambda x: x * 2).then(lambda x: x + 1).run()
print(result)  # 11

# Side effects -- do() runs the function but passes the value through
result = Chain(42).then(lambda x: x * 2).do(print).then(str).run()  # prints: 84
print(result)  # '84'

# Works with any callable
result = Chain('  hello  ').then(str.strip).then(str.upper).run()
print(result)  # HELLO

The same chain works whether your functions are sync, async, or a mix:

pipeline = Chain().then(fetch_data).then(validate).then(normalize)

# Sync context
result = pipeline.run(id)

# Async context -- same chain, no changes
result = await pipeline.run(id)

Features

Build pipelines fluently. Every builder method returns self for chaining.

from quent import Chain

result = (
  Chain(fetch_user, user_id)       # fetch user by id
  .then(validate)                  # transform
  .do(log)                         # side-effect
  .foreach(normalize_field)        # per-element
  .gather(enrich, score)           # concurrent
  .then(merge)                     # combine
  .if_(has_premium).then(upgrade)  # conditional
  .except_(handle_error)           # error handling
  .finally_(cleanup)               # cleanup
  .run()                           # execute
)
Collection Operations — foreach, foreach_do
# foreach -- transform each element, collect results
Chain([1, 2, 3]).foreach(lambda x: x ** 2).run()  # [1, 4, 9]

# foreach_do -- side-effect per element, keep originals
Chain([1, 2, 3]).foreach_do(print).run()  # prints 1, 2, 3; returns [1, 2, 3]

# filter via list comprehension
Chain([1, 2, 3, 4, 5]).then(lambda xs: [x for x in xs if x % 2 == 0]).run()  # [2, 4]
Concurrent Execution — gather, concurrency parameter

Run multiple functions on the same value concurrently:

Chain('hello').gather(str.upper, len).run()  # ('HELLO', 5)

Limit concurrency on collection operations with the concurrency parameter. Uses ThreadPoolExecutor for sync callables and asyncio.Semaphore + TaskGroup for async:

# Process up to 10 items at a time
Chain(urls).foreach(fetch, concurrency=10).run()

# Limit concurrent gather branches
Chain(data).gather(analyze, compress, upload, concurrency=5).run()

Pass a custom executor for sync concurrent operations:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as pool:
  Chain(urls).foreach(fetch, concurrency=4, executor=pool).run()
Conditionals — if_ / else_
Chain(5).if_(lambda x: x > 0).then(lambda x: x * 2).run()  # 10
Chain(-5).if_(lambda x: x > 0).then(str).else_(abs).run()   # 5

# When predicate is omitted, uses truthiness of the current value
Chain('hello').if_().then(str.upper).run()                     # 'HELLO'
Chain('').if_().then(str.upper).else_(lambda _: 'empty').run() # 'empty'

# Literal predicate -- truthiness used directly
Chain(value).if_(is_admin).then(grant_access).run()

# Side-effect conditional branch
Chain(user).if_(is_premium).do(log_premium_access).then(next_step).run()
Context Managers — with_ / with_do

Transparently handles both sync and async context managers:

Chain(open('data.txt')).with_(lambda f: f.read()).run()

# Side-effect variant (result discarded, original value passes through)
Chain(open('log.txt', 'w')).with_do(lambda f: f.write('done')).run()
Error Handling — except_ / finally_

One exception handler and one finally handler per chain:

from quent import Chain, ChainExcInfo

Chain(0).then(lambda x: 1 / x).except_(lambda ei: -1).run()  # -1

Chain(url)
  .then(fetch)
  .then(parse)
  .except_(handle_error, exceptions=ConnectionError)
  .finally_(cleanup)
  .run()

except_ catches Exception by default. The handler receives a ChainExcInfo(exc, root_value) as its current value. Use reraise=True to re-raise after handling (handler runs for side-effects only). finally_ always runs and receives the chain's root value.

Control Flow — return_ / break_
# Early return -- skips all remaining steps
Chain(5) \
  .then(lambda x: Chain.return_(x * 10) if x > 0 else x) \
  .then(str) \
  .run()  # 50 (str step is skipped)

# Break from iteration -- break value is appended to partial results
Chain([1, 2, 3, 4, 5]).foreach(lambda x: Chain.break_(x) if x == 3 else x * 2).run()
# [2, 4, 3]
Composition — clone, decorator

clone — fork-and-extend without modifying the original:

base = Chain().then(validate).then(normalize)
for_api = base.clone().then(to_json)    # base is untouched
for_db  = base.clone().then(to_record)  # independent copy

decorator — wrap a chain as a function decorator:

@Chain().then(lambda x: x.strip()).then(str.upper).decorator()
def get_name():
  return '  alice  '

get_name()  # 'ALICE'
Iteration — iterate / iterate_do

Dual sync/async generators over chain output:

for item in Chain(range(5)).iterate(lambda x: x ** 2):
  print(item)  # 0, 1, 4, 9, 16

async for item in Chain(async_source).iterate(transform):
  print(item)  # works with async sources too

Calling Conventions

How arguments flow through the pipeline is determined by two rules, checked in priority order:

Condition Behavior
Explicit args/kwargs provided Call fn(*args, **kwargs) -- current value NOT passed
No args (default) Call fn(current_value), fn() if no value, or return value as-is if non-callable
Chain(5).then(str).run()                    # str(5) -- current value passed
Chain(5).then(print, 'hello').run()         # print('hello') -- explicit args used

Enhanced Tracebacks

When an exception occurs inside a chain, quent injects a visualization directly into the traceback showing exactly which step failed:

Traceback (most recent call last):
  ...
  File "<quent>", line 1, in
    Chain(fetch_data)
    .then(validate)
    .then(transform) <----
    .do(log)
  ...
ZeroDivisionError: division by zero

The <---- marker points to the step that raised. Internal quent frames are cleaned from the traceback. On Python 3.11+, a concise exception note is also attached.

Opt out by setting QUENT_NO_TRACEBACK=1 before importing quent.


API Reference

Constructor

Chain(v=<no value>, /, *args, **kwargs)

Pipeline Building

All methods return self for fluent chaining.

Method Description
.then(v, /, *args, **kwargs) Append step; result replaces current value
.do(fn, /, *args, **kwargs) Side-effect step; fn must be callable, result discarded
.foreach(fn, /, *, concurrency=None, executor=None) Transform each element, collect results
.foreach_do(fn, /, *, concurrency=None, executor=None) Side-effect per element, keep originals
.gather(*fns, concurrency=-1, executor=None) Run multiple fns on current value, collect results as tuple
.with_(fn, /, *args, **kwargs) Enter current value as context manager, call fn
.with_do(fn, /, *args, **kwargs) Same as with_, but fn result discarded
.if_(predicate=None, /, *args, **kwargs) Begin conditional; must be followed by .then() or .do()
.if_(...).then(fn, /, *args, **kwargs) Conditional transform -- runs fn if predicate is truthy, result replaces current value
.if_(...).do(fn, /, *args, **kwargs) Conditional side-effect -- runs fn if predicate is truthy, result discarded
.else_(v, /, *args, **kwargs) Else branch (must follow .then() or .do())
.else_do(fn, /, *args, **kwargs) Side-effect else branch (result discarded)
.except_(fn, /, *args, exceptions=None, reraise=False, **kwargs) Exception handler (one per chain)
.finally_(fn, /, *args, **kwargs) Cleanup handler (one per chain)
.name(label) Assign a label for traceback identification

Execution

Method Description
.run(v=Null, /, *args, **kwargs) Execute the chain; returns value or coroutine
chain(...) Alias for .run()

Reuse and Iteration

Method Description
.decorator() Wrap chain as a function decorator
.iterate(fn=None) Dual sync/async generator over output
.iterate_do(fn=None) Like iterate, fn results discarded
.clone() Deep copy for fork-and-extend

Control Flow

Class methods

Method Description
Chain.return_(v=Null, /, *args, **kwargs) Signal early return from chain
Chain.break_(v=Null, /, *args, **kwargs) Signal break from iteration; value is appended to partial results

Exports and Instrumentation

Name Description
Chain Main pipeline class
ChainExcInfo NamedTuple (exc, root_value) passed to except handlers
ChainIterator Type alias for .iterate() / .iterate_do() return values
QuentException Exception type for quent-specific errors
__version__ Package version string
Chain.on_step Optional callback (chain, step_name, input_value, result, elapsed_ns) for instrumentation

Note: Chain objects cannot be pickled (security measure -- see Troubleshooting). Define chains at module level and reference by name instead of serializing.


Examples

See the examples/ directory for complete, runnable recipes covering ETL pipelines, API gateways, fan-out/fan-in patterns, retry with backoff, and testing chains.


Testing

quent's correctness rests on a single guarantee: any pipeline step can be swapped between sync and async without changing the result. The test suite is purpose-built to prove this exhaustively.

Scale

  • 1,342 test methods across 20 test modules and 286 test classes
  • 21 CI matrix combinations — 3 OSes (Ubuntu, macOS, Windows) × 5 Python versions (3.10–3.14), plus free-threaded builds (3.13t, 3.14t)
  • Security scanningpip-audit for dependency vulnerabilities, bandit SAST for source code

Exhaustive Bridge Testing

The core testing infrastructure proves the sync/async bridge contract across a 7-axis combinatorial space:

  1. Operation type — 96 "bricks" covering every chain operation × every calling convention
  2. Chain length — pipelines of 1 to N steps
  3. Operation order — every permutation of operations (with repetition)
  4. Sync/async per position — each step independently sync or async (2N combinations per pipeline)
  5. Error injection — exceptions, base exceptions, and control flow signals at each position
  6. Concurrency — sequential and concurrent variants
  7. Handler configuration — 18 error handler combinations (except/finally/both, sync/async, consuming/reraising/failing)

For each configuration, all 2N sync/async permutations run and must produce identical results. No expected values are precomputed — the invariant is that all permutations agree with each other. Correctness is independently verified by composing pure-Python oracle functions.

Additional Testing Strategies

  • Transition matrix — all 17,576 triplets of 26 atomic operations verify that every method adjacency produces correct results in all sync/async variants
  • Property-based testing — Hypothesis generates random inputs for 179 property and fuzz tests, including CWE-117 repr sanitization with adversarial ANSI escape sequences
  • Thread safety — 30–50 concurrent threads with barrier synchronization verify safe concurrent execution of fully constructed chains
  • Oracle validation — each of the 96 bricks has an independent oracle function; oracles are verified against quent before being used in bridge assertions
  • Warning validation — all warnings emitted during exhaustive runs are captured and validated against expected patterns

Running Tests

# Full suite (format + lint + type check + tests)
./run_tests.sh

# Tests only (parallel -- wall-clock time = slowest module)
python scripts/run_tests_parallel.py

# Single module
python -m unittest tests.bridge_tests

Documentation

Full documentation — including guides, advanced usage, recipes, and framework integration examples — is available at quent.readthedocs.io.


Contributing

See the contributing guide for setup instructions, code style, and PR guidelines.

git clone https://github.com/drukmano/quent.git
cd quent
uv sync --group dev       # or: pip install -e . && pip install ruff mypy
bash scripts/run_tests.sh

Docs   •   GitHub   •   PyPI   •   Getting Started   •   Changelog

MIT — Copyright (c) 2023–2026 Ohad Drukman

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

quent-5.3.0.tar.gz (93.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

quent-5.3.0-py3-none-any.whl (78.9 kB view details)

Uploaded Python 3

File details

Details for the file quent-5.3.0.tar.gz.

File metadata

  • Download URL: quent-5.3.0.tar.gz
  • Upload date:
  • Size: 93.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for quent-5.3.0.tar.gz
Algorithm Hash digest
SHA256 df2f5472a25ee025f06f8ef04c5e56efbade88090b32ede11c997034a350a1a9
MD5 c5cd032c44ac410628ac002286345fee
BLAKE2b-256 f5fd581551c45ebb4fe9a520772c9d882fce426f86f029284b782f217ec5112f

See more details on using hashes here.

Provenance

The following attestation bundles were made for quent-5.3.0.tar.gz:

Publisher: release.yml on drukmano/quent

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file quent-5.3.0-py3-none-any.whl.

File metadata

  • Download URL: quent-5.3.0-py3-none-any.whl
  • Upload date:
  • Size: 78.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for quent-5.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 3f4e78a00fa7694cea5f1ddfd72f5fab7c9ff46d7c78a4af3bd0509b19adb24a
MD5 b6188ad0563be039073eb8b7cf637f90
BLAKE2b-256 174726b54b42da6d3b030bf6aff0e68965e2943670698ae704e58a5b3c4fa879

See more details on using hashes here.

Provenance

The following attestation bundles were made for quent-5.3.0-py3-none-any.whl:

Publisher: release.yml on drukmano/quent

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page