Fluent chain interface for Python with transparent sync/async handling. Write pipeline code once -- Quent automatically detects coroutines and handles them. Pure Python, zero dependencies.
quent
Write it once. Run it sync or async.
A transparent sync/async bridge for Python. Define a pipeline once and run it with sync callables, async callables, or any mix of both -- quent detects awaitables at runtime and handles the transition automatically. Pure Python, zero dependencies.
Why quent?
The Problem
Processing pipelines start simple but quickly become hard to read:
# Nested calls -- read inside-out, hard to follow
result = send_data(normalize_data(validate_data(fetch_data(id))))
# Intermediate variables -- verbose and repetitive
data = fetch_data(id)
data = validate_data(data)
data = normalize_data(data)
result = send_data(data)
# Async makes it worse -- now you maintain two versions
data = await fetch_data(id)
data = validate_data(data)
data = await normalize_data(data)
result = await send_data(data)
And the real cost: if you need to support both sync and async callers, you write the same logic twice. Two functions. Same steps. They drift apart. A bug fix in one gets forgotten in the other.
The Solution
from quent import Chain
pipeline = (
    Chain()
    .then(fetch_data)
    .then(validate_data)
    .then(normalize_data)
    .then(send_data)
)
result = pipeline.run(id) # sync caller
result = await pipeline.run(id) # async caller
One definition. Both worlds. quent inspects each return value at runtime -- if it's an awaitable, execution seamlessly transitions to async mode. No annotations, no wrappers, no ceremony.
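The runtime detection described above can be illustrated without quent at all. The following is a minimal sketch, not quent's actual implementation: `run_steps` and `_finish_async` are hypothetical names, and the only assumption is that each return value is checked with the standard library's `inspect.isawaitable`.

```python
import asyncio
import inspect


def run_steps(value, steps):
    """Apply steps in order; hand off to a coroutine at the first awaitable."""
    for index, step in enumerate(steps):
        value = step(value)
        if inspect.isawaitable(value):
            # From here on the caller receives a coroutine and must await it.
            return _finish_async(value, steps[index + 1:])
    return value  # every step was sync, so a plain value is returned


async def _finish_async(pending, remaining):
    """Await the pending result, then run the remaining steps in async mode."""
    value = await pending
    for step in remaining:
        value = step(value)
        if inspect.isawaitable(value):
            value = await value
    return value


def double(x):
    return x * 2


async def add_one_async(x):
    await asyncio.sleep(0)
    return x + 1


sync_result = run_steps(5, [double, double])
async_result = asyncio.run(run_steps(5, [double, add_one_async, double]))
```

The same `run_steps` call returns a plain value when every step is sync and a coroutine as soon as one step is async, which is the behavior the two `pipeline.run(id)` lines rely on.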
Write your code once, use it everywhere:
def process_data(data_source):
    """Works with both sync and async data sources."""
    return (
        Chain(data_source.fetch)
        .then(validate)
        .then(transform)
        .then(data_source.save)
        .run()
    )
result = process_data(sync_database) # process_data returns a value
result = await process_data(async_database) # process_data returns a coroutine; await it
Installation
pip install quent
Requires Python 3.10+. Zero runtime dependencies.
Quick Start
from quent import Chain
# Build a pipeline with .then() and .do()
result = Chain(42).then(lambda x: x * 2).do(print).then(str).run()
#> 84
#> '84'
# Collection operations
result = Chain([1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 0).map(lambda x: x ** 2).run()
#> [4, 16]
# Async transparency -- same chain, sync or async callables
chain = Chain().then(fetch_user).then(serialize)
result = chain.run(user_id) # run() returns the value if fetch_user is sync
result = await chain.run(user_id) # run() returns a coroutine if fetch_user is async
Features
Pipeline Building
All pipeline methods return self for fluent chaining.
Chain(data).then(transform).do(log).then(save).run()
- .then(v) -- result replaces the current pipeline value
- .do(fn) -- side-effect; result is discarded, current value passes through
- .map(fn) -- apply fn to each element, collect results into a list
- .foreach(fn) -- apply fn to each element as side-effect, keep originals
- .filter(fn) -- keep elements where fn returns truthy
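To make the difference between these operations concrete, here is a plain-Python sketch of the semantics as described in the list above. The helper functions are hypothetical stand-ins, not part of quent, they cover the sync case only, and returning the originals as a list from `foreach` is an assumption.

```python
def then(value, fn):
    return fn(value)              # the result replaces the current value


def do(value, fn):
    fn(value)                     # the result is discarded
    return value                  # the current value passes through


def map_(values, fn):
    return [fn(v) for v in values]          # collect results into a list


def foreach(values, fn):
    items = list(values)
    for item in items:
        fn(item)                  # side-effect only
    return items                  # the original elements are kept


def filter_(values, fn):
    return [v for v in values if fn(v)]     # keep elements where fn is truthy


seen = []
doubled = then(42, lambda x: x * 2)
passed_through = do(doubled, seen.append)
evens = filter_([1, 2, 3, 4, 5], lambda x: x % 2 == 0)
squares = map_(evens, lambda x: x ** 2)
originals = foreach(evens, seen.append)
```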
Concurrent Execution
Run multiple functions on the current value in parallel. If any returns an awaitable, all are gathered concurrently via asyncio.gather.
results = Chain(data).gather(validate, enrich, score).run()
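The "gather if any result is awaitable" rule can be sketched with the standard library alone. This is an illustration under assumptions, not quent's code: `gather_on` and `_gather_async` are hypothetical names, and returning a list is a guess at the result container.

```python
import asyncio
import inspect


def gather_on(value, *fns):
    """Call every function with the same value; gather if any result is awaitable."""
    results = [fn(value) for fn in fns]
    if any(inspect.isawaitable(r) for r in results):
        return _gather_async(results)   # the caller must await this coroutine
    return results


async def _gather_async(results):
    async def resolve(result):
        # Plain values are passed through; awaitables are awaited.
        return await result if inspect.isawaitable(result) else result

    return await asyncio.gather(*(resolve(r) for r in results))


def plus_one(x):
    return x + 1


async def slow_square(x):
    await asyncio.sleep(0)
    return x * x


sync_results = gather_on(3, plus_one, lambda x: x * 2)
mixed_results = asyncio.run(gather_on(3, plus_one, slow_square))
```

In this sketch the all-sync case simply runs the functions one after another; only awaitable results are actually awaited concurrently.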
Context Managers
Works with both sync and async context managers.
content = Chain("data.txt").then(open).with_(lambda f: f.read()).run()
- .with_(fn) -- enter current value as context manager, call fn with the context value
- .with_do(fn) -- same, but fn's result is discarded
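The two variants differ only in what flows on to the next step. A sync-only sketch of that difference follows; the helper names are hypothetical, and the assumption that `.with_do` passes the original current value (the context manager itself) through is mine, not confirmed by the text above.

```python
import io


def with_(manager, fn):
    """Enter the manager, call fn with the context value, return fn's result."""
    with manager as ctx:
        return fn(ctx)


def with_do(manager, fn):
    """Same as with_, but fn's result is discarded (assumed pass-through)."""
    with manager as ctx:
        fn(ctx)
    return manager


content = with_(io.StringIO("hello"), lambda f: f.read())

buffer = io.StringIO("world")
chunks = []
returned = with_do(buffer, lambda f: chunks.append(f.read()))
```

In both cases the context manager is exited before the next step runs, so `buffer` is already closed when `with_do` returns.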
Conditional Logic
result = (
    Chain(value)
    .if_(lambda x: x > 0, then=process_positive)
    .else_(process_negative)
    .run()
)
Error Handling and Retry
One exception handler and one finally handler per chain. Retry re-executes the entire chain from scratch.
result = (
    Chain(url)
    .then(fetch)
    .then(parse)
    .except_(handle_error, exceptions=ConnectionError)
    .finally_(cleanup)
    .retry(max_attempts=3, backoff=lambda attempt: 2 ** attempt)
    .run()
)
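"Re-executes the entire chain from scratch" means a failed attempt does not resume at the failing step. A minimal sketch of that policy is shown below. `run_with_retry` is a hypothetical helper, not quent's API, and two details are assumptions: attempts are numbered from 1, and the last failure is re-raised.

```python
import time


def run_with_retry(pipeline, max_attempts, backoff,
                   exceptions=Exception, sleep=time.sleep):
    """Call pipeline() from the start until it succeeds or attempts run out."""
    for attempt in range(1, max_attempts + 1):   # assumption: 1-based attempts
        try:
            return pipeline()                    # the whole pipeline runs again
        except exceptions:
            if attempt == max_attempts:
                raise                            # out of attempts: propagate
            sleep(backoff(attempt))              # wait before the next attempt


calls = {"count": 0}
delays = []


def flaky_pipeline():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# delays.append stands in for time.sleep so the example runs instantly.
result = run_with_retry(
    flaky_pipeline,
    max_attempts=3,
    backoff=lambda attempt: 2 ** attempt,
    exceptions=ConnectionError,
    sleep=delays.append,
)
```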
Control Flow
Early return and iteration break, usable inside any step.
Chain.return_(value) # exit the chain early with a value
Chain.break_(value) # break from a map/foreach/filter iteration
Iteration
Yields each element of the chain's output. Supports both for and async for.
for item in Chain(fetch_all).iterate():
    process(item)
async for item in Chain(fetch_all).iterate():
    await process(item)
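An object can support both `for` and `async for` by implementing `__iter__` and `__aiter__`. The sketch below shows one way this could work; `DualIterator` is a hypothetical class, not quent's implementation.

```python
import asyncio
import inspect


class DualIterator:
    """Iterate over a producer's output with either `for` or `async for`."""

    def __init__(self, producer):
        self._producer = producer

    def __iter__(self):
        # Sync path: the producer must return an iterable directly.
        yield from self._producer()

    async def __aiter__(self):
        # Async path: await the producer's result if it is awaitable.
        result = self._producer()
        if inspect.isawaitable(result):
            result = await result
        for item in result:
            yield item


def fetch_all():
    return [1, 2, 3]


async def fetch_all_async():
    await asyncio.sleep(0)
    return [4, 5, 6]


async def collect():
    return [item async for item in DualIterator(fetch_all_async)]


sync_items = list(DualIterator(fetch_all))
async_items = asyncio.run(collect())
```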
Reusable Chains
Clone a chain to create independent copies for fork-and-extend patterns:
base = Chain().then(validate).then(normalize)
for_api = base.clone().then(serialize_json)
for_db = base.clone().then(serialize_sql)
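Because pipeline methods return self, extending a chain mutates it, which is why forking needs a copy. A small sketch of that reasoning follows, using a hypothetical `StepList` class as a stand-in for a chain (not quent's implementation).

```python
class StepList:
    """Hypothetical stand-in for a chain: an ordered, mutable list of steps."""

    def __init__(self, steps=()):
        self._steps = list(steps)

    def then(self, fn):
        self._steps.append(fn)      # mutates in place and returns self
        return self

    def clone(self):
        return StepList(self._steps)    # independent copy of the step list

    def run(self, value):
        for step in self._steps:
            value = step(value)
        return value


base = StepList().then(str.strip).then(str.lower)
for_api = base.clone().then(lambda s: s + ".json")
for_db = base.clone().then(lambda s: s + ".sql")
```

Without `clone()`, both `.then(...)` calls would append to `base` itself and the two variants would share one growing step list.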
Nest chains inside other chains -- inner chains execute as a single step:
validate_chain = Chain().then(check_schema).then(check_permissions)
pipeline = Chain(request).then(validate_chain).then(handle).run()
Wrap a chain as a function decorator:
pipeline = Chain().then(validate).then(transform)
@pipeline.decorator()
def handle(request):
    return parse(request)
Enhanced Tracebacks
When an exception occurs inside a chain, quent injects a visualization into the traceback showing the full pipeline and exactly which step failed:
Traceback (most recent call last):
  File "example.py", line 28, in <module>
    .run()
    ^^^^^
  File "<quent>", line 1, in
    Chain(fetch_data, 42) = {'id': 42, 'value': 100}
    .then(validate) <----
    .then(transform)
    .then(save)
  File "example.py", line 11, in validate
    raise ValueError("Value too large")
ValueError: Value too large
The <---- marker points to the step that raised the exception. Intermediate values are shown next to each step so you can trace exactly what data flowed through the pipeline. Internal quent frames are automatically cleaned from the traceback.
Set QUENT_NO_TRACEBACK=1 to disable.
Real-World Example
A Redis pipeline wrapper that works identically with both redis (sync) and redis.asyncio (async):
class RedisBatch:
    def __init__(self, r, transaction=False):
        self.r = r
        self.transaction = transaction
        self.operations = []

    def add(self, op):
        self.operations.append(op)

    def flush(self):
        pipe = self.r.pipeline(transaction=self.transaction)
        for op in self.operations:
            op(pipe)
        return (
            Chain(pipe.execute)
            .then(self.process_results)
            .finally_(pipe.reset, ...) # always reset; ... means "call with no args"
            .run()
        )
The same flush() method works whether self.r is a sync redis.Redis client or an async redis.asyncio.Redis client. No if/else, no duplication, no async def variant. The ... (Ellipsis) in .finally_(pipe.reset, ...) tells quent to call pipe.reset() with no arguments, overriding the default behavior of passing the current value.
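The Ellipsis convention amounts to a small dispatch rule when a step is invoked. The sketch below covers only the two cases described above; `call_step` is a hypothetical helper, not quent's internal function.

```python
def call_step(fn, current_value, marker=None):
    """Invoke a step, honoring the Ellipsis "no arguments" marker."""
    if marker is ...:
        return fn()                 # explicit request: call with no arguments
    return fn(current_value)        # default: pass the current value


default_call = call_step(len, [1, 2, 3])       # len([1, 2, 3])
no_arg_call = call_step(list, "ignored", ...)  # list()
```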
Documentation
Full documentation is available at quent.readthedocs.io.
License
MIT -- Copyright (c) 2023 Ohad Drukman
File details
Details for the file quent-4.0.0.tar.gz.
File metadata
- Download URL: quent-4.0.0.tar.gz
- Upload date:
- Size: 36.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f4f099466ca0854d4ca2bee2afe25016e93d72cf2444289e74021fcd1d0efe86 |
| MD5 | 12f47ab52907cef4bbb12e166e55c67b |
| BLAKE2b-256 | 254e01f4c8c272cb05f5534f49548ff4be98a732052a81a6ec465529fbdf0dd4 |
File details
Details for the file quent-4.0.0-py3-none-any.whl.
File metadata
- Download URL: quent-4.0.0-py3-none-any.whl
- Upload date:
- Size: 38.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0c42ceab7a00bc84a697ca88eda782c6d935404d8b1377b81f5d95d595270ae4 |
| MD5 | de47a3884ed44cb4d3dd1a101e4602f4 |
| BLAKE2b-256 | ee4260a40a3890bb5aedb3aef061aa14ee63f7eb3effd5c38f3e53af4818df23 |