Write pipeline code once — runs sync or async automatically. Transparent sync/async bridge for Python. Pure Python, zero runtime dependencies on Python 3.11+.
A transparent sync/async bridge for Python.
Define a pipeline once — quent handles the rest.
- One definition, two worlds — a single chain works for both sync and async callers. Zero code duplication.
- Zero ceremony — no decorators, no base classes, no type wrappers. Just chain your functions.
- Drop-in migration — unify existing sync and async implementations into one pipeline. Stop maintaining two versions.
- Pure Python — zero runtime dependencies on Python 3.11+. Fully typed (PEP 561).
- Focused — every feature exists because removing it would force separate sync and async code paths.
The Problem
Any codebase that supports both sync and async callers ends up maintaining two versions of the same logic:
# Without quent -- the same pipeline, written twice
def process_sync(data):
    validated = validate_sync(data)
    transformed = transform_sync(validated)
    return save_sync(transformed)

async def process_async(data):
    validated = await validate_async(data)
    transformed = await transform_async(validated)
    return await save_async(transformed)
Every function, every pipeline, every utility — duplicated. When a bug is fixed in one version, the other falls out of sync. When a new step is added, it must be added in both places.
The Solution
# With quent -- write it once
pipeline = Chain().then(validate).then(transform).then(save)
result = pipeline.run(data) # sync if all steps are sync
result = await pipeline.run(data) # async if any step is async
One definition. The chain starts executing synchronously. The moment any step returns an awaitable, execution seamlessly transitions to async and stays there. The caller decides whether to await.
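The switch from sync to async described above can be sketched in plain Python. This is an illustration of the general technique only, not quent's implementation; `run_steps` and `_finish_async` are hypothetical names, and only the standard library is used.

```python
import asyncio
import inspect


def run_steps(value, steps):
    """Apply steps in order; switch to a coroutine at the first awaitable result."""
    for index, step in enumerate(steps):
        value = step(value)
        if inspect.isawaitable(value):
            # Hand the pending awaitable and the remaining steps to a coroutine.
            return _finish_async(value, steps[index + 1:])
    return value  # every step was sync, so the caller gets a plain value


async def _finish_async(pending, remaining):
    """Await the pending result, then finish the remaining steps in async mode."""
    value = await pending
    for step in remaining:
        value = step(value)
        if inspect.isawaitable(value):
            value = await value
    return value


def double(x):
    return x * 2


async def add_one(x):
    await asyncio.sleep(0)
    return x + 1


# All steps are sync: a plain value comes back.
sync_result = run_steps(5, [double, double])

# One step is async: a coroutine comes back and the caller awaits it.
async_result = asyncio.run(run_steps(5, [double, add_one, double]))
```

The caller never states which mode it wants; the return type follows from what the steps return.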
Installation
pip install quent
Requires Python 3.10+. Supports 3.10 through 3.14, including free-threaded builds. Zero runtime dependencies on Python 3.11+ (typing_extensions on 3.10).
Quick Start
from quent import Chain
# Basic pipeline
result = Chain(5).then(lambda x: x * 2).then(lambda x: x + 1).run()
print(result) # 11
# Side effects -- do() runs the function but passes the value through
result = Chain(42).then(lambda x: x * 2).do(print).then(str).run() # prints: 84
print(result) # '84'
# Works with any callable
result = Chain(' hello ').then(str.strip).then(str.upper).run()
print(result) # HELLO
The same chain works whether your functions are sync, async, or a mix:
pipeline = Chain().then(fetch_data).then(validate).then(normalize)
# Sync context
result = pipeline.run(id)
# Async context -- same chain, no changes
result = await pipeline.run(id)
Features
Build pipelines fluently. Every builder method returns self for chaining.
from quent import Chain
result = (
    Chain(fetch_user)                    # root callable
    .then(validate)                      # transform
    .do(log)                             # side-effect
    .foreach(normalize_field)            # per-element
    .gather(enrich, score)               # concurrent
    .then(merge)                         # combine
    .if_(has_premium).then(upgrade)      # conditional
    .except_(handle_error)               # error handling
    .finally_(cleanup)                   # cleanup
    .run(user_id)                        # execute
)
Collection Operations — foreach, foreach_do
# foreach -- transform each element, collect results
Chain([1, 2, 3]).foreach(lambda x: x ** 2).run() # [1, 4, 9]
# foreach_do -- side-effect per element, keep originals
Chain([1, 2, 3]).foreach_do(print).run() # prints 1, 2, 3; returns [1, 2, 3]
# filter via list comprehension
Chain([1, 2, 3, 4, 5]).then(lambda xs: [x for x in xs if x % 2 == 0]).run() # [2, 4]
Concurrent Execution — gather, concurrency parameter
Run multiple functions on the same value concurrently:
Chain('hello').gather(str.upper, len).run() # ('HELLO', 5)
Limit concurrency on collection operations with the concurrency parameter. Sync callables run on a ThreadPoolExecutor; async callables are bounded with asyncio.Semaphore and TaskGroup:
# Process up to 10 items at a time
Chain(urls).foreach(fetch, concurrency=10).run()
# Limit concurrent gather branches
Chain(data).gather(analyze, compress, upload, concurrency=5).run()
Pass a custom executor for sync concurrent operations:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as pool:
    Chain(urls).foreach(fetch, concurrency=4, executor=pool).run()
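For the async side, a semaphore-bounded map gives a feel for what a concurrency limit means. The sketch below is an assumption-laden stand-in, not quent's code: `bounded_map` and `fake_fetch` are hypothetical, and it uses `asyncio.gather` in place of `TaskGroup` so that it also runs on Python 3.10.

```python
import asyncio


async def bounded_map(fn, items, limit):
    """Run fn over items with at most `limit` calls in flight; keep input order."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item):
        async with semaphore:  # wait here while `limit` calls are already running
            return await fn(item)

    return await asyncio.gather(*(guarded(item) for item in items))


in_flight = 0  # calls currently running
peak = 0       # highest number of simultaneous calls observed


async def fake_fetch(n):
    """Hypothetical async step that records how many calls overlap."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return n * 10


results = asyncio.run(bounded_map(fake_fetch, range(6), limit=2))
```

Results come back in input order even though the calls overlap, and `peak` never exceeds the limit.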
Conditionals — if_ / else_
Chain(5).if_(lambda x: x > 0).then(lambda x: x * 2).run() # 10
Chain(-5).if_(lambda x: x > 0).then(str).else_(abs).run() # 5
# When predicate is omitted, uses truthiness of the current value
Chain('hello').if_().then(str.upper).run() # 'HELLO'
Chain('').if_().then(str.upper).else_(lambda _: 'empty').run() # 'empty'
# Literal predicate -- truthiness used directly
Chain(value).if_(is_admin).then(grant_access).run()
# Side-effect conditional branch
Chain(user).if_(is_premium).do(log_premium_access).then(next_step).run()
Context Managers — with_ / with_do
Transparently handles both sync and async context managers:
Chain(open('data.txt')).with_(lambda f: f.read()).run()
# Side-effect variant (result discarded, original value passes through)
Chain(open('log.txt', 'w')).with_do(lambda f: f.write('done')).run()
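One way to handle both kinds of context manager from a single entry point is to check for `__aenter__` and return a coroutine only in that case. The following is a minimal sketch of that idea, not quent's implementation; `use_context` and the two resource helpers are hypothetical.

```python
import asyncio
import contextlib


def use_context(cm, fn):
    """Enter cm and call fn with the entered value.

    Returns a plain value for sync managers and a coroutine for async ones.
    """
    if hasattr(cm, "__aenter__"):
        async def run_async():
            async with cm as entered:
                return fn(entered)
        return run_async()
    with cm as entered:
        return fn(entered)


@contextlib.contextmanager
def sync_resource():
    yield "sync"


@contextlib.asynccontextmanager
async def async_resource():
    yield "async"


sync_value = use_context(sync_resource(), str.upper)
async_value = asyncio.run(use_context(async_resource(), str.upper))
```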
Error Handling — except_ / finally_
One exception handler and one finally handler per chain:
from quent import Chain, ChainExcInfo
Chain(0).then(lambda x: 1 / x).except_(lambda ei: -1).run() # -1
(
    Chain(url)
    .then(fetch)
    .then(parse)
    .except_(handle_error, exceptions=ConnectionError)
    .finally_(cleanup)
    .run()
)
except_ catches Exception by default. The handler receives a ChainExcInfo(exc, root_value) as its current value. Use reraise=True to re-raise after handling (handler runs for side-effects only). finally_ always runs and receives the chain's root value.
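The semantics in the paragraph above map closely onto an ordinary try/except/finally. The sketch below mirrors them in plain Python under stated assumptions: `ExcInfo` is a stand-in for ChainExcInfo, and `run_guarded` is a hypothetical helper, not part of quent.

```python
from typing import Any, NamedTuple


class ExcInfo(NamedTuple):
    """Stand-in for ChainExcInfo: the exception plus the root value."""
    exc: BaseException
    root_value: Any


def run_guarded(root, steps, on_error, on_finally, reraise=False):
    """Run steps on root; route failures to on_error, always call on_finally."""
    value = root
    try:
        for step in steps:
            value = step(value)
        return value
    except Exception as exc:
        handled = on_error(ExcInfo(exc, root))
        if reraise:
            raise  # handler ran for its side effects only
        return handled  # handler result becomes the overall result
    finally:
        on_finally(root)  # cleanup always sees the root value


cleaned = []
result = run_guarded(0, [lambda x: 1 / x], lambda info: -1, cleaned.append)
```

Here `result` is the handler's return value and `cleaned` holds the root value that the cleanup callback received.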
Control Flow — return_ / break_
# Early return -- skips all remaining steps
Chain(5) \
.then(lambda x: Chain.return_(x * 10) if x > 0 else x) \
.then(str) \
.run() # 50 (str step is skipped)
# Break from iteration -- break value is appended to partial results
Chain([1, 2, 3, 4, 5]).foreach(lambda x: Chain.break_(x) if x == 3 else x * 2).run()
# [2, 4, 3]
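The break behaviour (the break value is appended to the partial results) can be reproduced with a small signal exception. This is an assumed mechanism chosen for illustration; how quent signals a break internally is not stated here, and `Break`, `stop_with`, and `foreach` are hypothetical names.

```python
class Break(Exception):
    """Signal that iteration should stop, carrying a final value."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value


def stop_with(value):
    """Raise the break signal from inside an expression."""
    raise Break(value)


def foreach(fn, items):
    """Map fn over items; on Break, append the break value and stop."""
    results = []
    for item in items:
        try:
            results.append(fn(item))
        except Break as signal:
            results.append(signal.value)
            break
    return results


partial = foreach(lambda x: stop_with(x) if x == 3 else x * 2, [1, 2, 3, 4, 5])
```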
Composition — clone, decorator
clone — fork-and-extend without modifying the original:
base = Chain().then(validate).then(normalize)
for_api = base.clone().then(to_json) # base is untouched
for_db = base.clone().then(to_record) # independent copy
decorator — wrap a chain as a function decorator:
@Chain().then(lambda x: x.strip()).then(str.upper).decorator()
def get_name():
    return ' alice '

get_name() # 'ALICE'
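Judging from the example above, the decorated function's return value is fed through the chain's steps. A plain-Python sketch of that behaviour follows, assuming nothing beyond what the example shows; `pipeline_decorator` is a hypothetical helper, not quent's API.

```python
import functools


def pipeline_decorator(*steps):
    """Build a decorator that pipes a function's return value through steps."""
    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            value = fn(*args, **kwargs)
            for step in steps:
                value = step(value)
            return value
        return wrapper
    return decorate


@pipeline_decorator(str.strip, str.upper)
def get_name():
    return "  alice  "


name = get_name()
```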
Iteration — iterate / iterate_do
Dual sync/async generators over chain output:
for item in Chain(range(5)).iterate(lambda x: x ** 2):
    print(item) # 0, 1, 4, 9, 16

async for item in Chain(async_source).iterate(transform):
    print(item) # works with async sources too
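An object can support both `for` and `async for` by defining `__iter__` and `__aiter__` side by side. The class below is a minimal sketch of that pattern with a sync source only; `DualIterator` is a hypothetical name and this is not how ChainIterator is necessarily built.

```python
import asyncio


class DualIterator:
    """Iterable usable with both `for` and `async for`."""

    def __init__(self, source, fn):
        self._source = source
        self._fn = fn

    def __iter__(self):
        # Generator method: used by a plain `for` loop.
        for item in self._source:
            yield self._fn(item)

    async def __aiter__(self):
        # Async generator method: used by `async for`.
        for item in self._source:
            await asyncio.sleep(0)  # give other tasks a chance to run
            yield self._fn(item)


squares = list(DualIterator(range(5), lambda x: x ** 2))


async def collect():
    return [item async for item in DualIterator(range(5), lambda x: x ** 2)]


async_squares = asyncio.run(collect())
```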
Calling Conventions
How arguments flow through the pipeline is determined by two rules, checked in priority order:
| Condition | Behavior |
|---|---|
| Explicit args/kwargs provided | Call fn(*args, **kwargs) -- current value NOT passed |
| No args (default) | Call fn(current_value), fn() if no value, or return value as-is if non-callable |
Chain(5).then(str).run() # str(5) -- current value passed
Chain(5).then(print, 'hello').run() # print('hello') -- explicit args used
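The two rules in the table can be written as a small dispatch function. This is a sketch of the rules as stated, not quent's code; `call_step` is hypothetical and `_MISSING` stands in for the Null sentinel.

```python
_MISSING = object()  # stand-in for quent's Null sentinel


def call_step(step, current=_MISSING, args=(), kwargs=None):
    """Invoke one step according to the two calling rules."""
    kwargs = kwargs or {}
    if args or kwargs:
        # Rule 1: explicit arguments win; the current value is not passed.
        return step(*args, **kwargs)
    if not callable(step):
        # Rule 2, non-callable: the value is returned as-is.
        return step
    if current is _MISSING:
        # Rule 2, no current value: call with no arguments.
        return step()
    # Rule 2, default: pass the current value.
    return step(current)


as_text = call_step(str, 5)              # str(5)
largest = call_step(max, 5, args=(1, 9)) # max(1, 9); the 5 is ignored
empty = call_step(list)                  # list()
literal = call_step(42, 5)               # 42 returned unchanged
```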
Enhanced Tracebacks
When an exception occurs inside a chain, quent injects a visualization directly into the traceback showing exactly which step failed:
Traceback (most recent call last):
...
File "<quent>", line 0, in Chain(fetch_data).then(validate).then(transform) <----.do(log)
...
ZeroDivisionError: division by zero
The <---- marker points to the step that raised. Internal quent frames are cleaned from the traceback. On Python 3.11+, a concise exception note is also attached.
Opt out by setting QUENT_NO_TRACEBACK=1 before importing quent.
API Reference
Constructor
Chain(v=Null, /, *args, **kwargs)
Pipeline Building
All methods return self for fluent chaining.
| Method | Description |
|---|---|
| .then(v, /, *args, **kwargs) | Append step; result replaces current value |
| .do(fn, /, *args, **kwargs) | Side-effect step; fn must be callable, result discarded |
| .foreach(fn, /, *, concurrency=None, executor=None) | Transform each element, collect results |
| .foreach_do(fn, /, *, concurrency=None, executor=None) | Side-effect per element, keep originals |
| .gather(*fns, concurrency=-1, executor=None) | Run multiple fns on current value, collect results as tuple |
| .with_(fn, /, *args, **kwargs) | Enter current value as context manager, call fn |
| .with_do(fn, /, *args, **kwargs) | Same as with_, but fn result discarded |
| .if_(predicate=None, /, *args, **kwargs) | Begin conditional; must be followed by .then() or .do() |
| .if_(...).then(fn, /, *args, **kwargs) | Conditional transform -- runs fn if predicate is truthy, result replaces current value |
| .if_(...).do(fn, /, *args, **kwargs) | Conditional side-effect -- runs fn if predicate is truthy, result discarded |
| .else_(v, /, *args, **kwargs) | Else branch (must follow .then() or .do()) |
| .else_do(fn, /, *args, **kwargs) | Side-effect else branch (result discarded) |
| .except_(fn, /, *args, exceptions=None, reraise=False, **kwargs) | Exception handler (one per chain) |
| .finally_(fn, /, *args, **kwargs) | Cleanup handler (one per chain) |
| .name(label) | Assign a label for traceback identification |
Execution
| Method | Description |
|---|---|
| .run(v=Null, /, *args, **kwargs) | Execute the chain; returns value or coroutine |
| chain(...) | Alias for .run() |
Reuse and Iteration
| Method | Description |
|---|---|
| .decorator() | Wrap chain as a function decorator |
| .iterate(fn=None) | Dual sync/async generator over output |
| .iterate_do(fn=None) | Like iterate, fn results discarded |
| .clone() | Deep copy for fork-and-extend |
Control Flow
Class methods
| Method | Description |
|---|---|
| Chain.return_(v=Null, /, *args, **kwargs) | Signal early return from chain |
| Chain.break_(v=Null, /, *args, **kwargs) | Signal break from iteration; value is appended to partial results |
Exports and Instrumentation
| Name | Description |
|---|---|
| Chain | Main pipeline class |
| ChainExcInfo | NamedTuple (exc, root_value) passed to except handlers |
| ChainIterator | Type alias for .iterate() / .iterate_do() return values |
| Null | Sentinel for "no value provided" (distinct from None) |
| QuentException | Exception type for quent-specific errors |
| __version__ | Package version string |
| Chain.on_step | Optional callback (chain, step_name, input_value, result, elapsed_ns) for instrumentation |
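A callback matching the on_step signature listed above might look like the sketch below. The parameter names come from the table; how the callback is installed is an assumption (shown only in a comment), and `record_step` and `timings` are hypothetical names.

```python
timings = []  # (step_name, elapsed milliseconds) pairs


def record_step(chain, step_name, input_value, result, elapsed_ns):
    """Collect per-step timings, converting nanoseconds to milliseconds."""
    timings.append((step_name, elapsed_ns / 1_000_000))


# Assumption: the callback is installed by assigning it to the class attribute,
# for example `Chain.on_step = record_step`. Check the quent documentation first.

# Direct call with made-up values, just to show the shape of the data.
record_step(None, "then", 5, 10, 2_500_000)
```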
Note: Chain objects cannot be pickled (security measure -- see Troubleshooting). Define chains at module level and reference by name instead of serializing.
Examples
See the examples/ directory for complete, runnable recipes covering ETL pipelines, API gateways, fan-out/fan-in patterns, retry with backoff, and testing chains.
Documentation
Full documentation — including guides, advanced usage, recipes, and framework integration examples — is available at quent.readthedocs.io.
Contributing
See the contributing guide for setup instructions, code style, and PR guidelines.
git clone https://github.com/drukmano/quent.git
cd quent
uv sync --group dev # or: pip install -e . && pip install coverage ruff mypy
bash scripts/run_tests.sh
Docs • GitHub • PyPI • Getting Started • Changelog
MIT — Copyright (c) 2023–2026 Ohad Drukman