Skip to main content

Fluent chain interface for Python with transparent sync/async handling. Write pipeline code once -- Quent automatically detects coroutines and handles them. Pure Python, zero dependencies.

Project description


quent
Write it once. Run it sync or async.

PyPI Python License CI Coverage Typed

A transparent sync/async bridge for Python. Define a pipeline once and run it with sync callables, async callables, or any mix of both -- quent detects awaitables at runtime and handles the transition automatically. Pure Python, zero dependencies.


Why quent?

The Problem

Processing pipelines start simple but quickly become hard to read:

# Nested calls -- read inside-out, hard to follow
result = send_data(normalize_data(validate_data(fetch_data(id))))

# Intermediate variables -- verbose and repetitive
data = fetch_data(id)
data = validate_data(data)
data = normalize_data(data)
result = send_data(data)

# Async makes it worse -- now you maintain two versions
data = await fetch_data(id)
data = validate_data(data)
data = await normalize_data(data)
result = await send_data(data)

And the real cost: if you need to support both sync and async callers, you write the same logic twice. Two functions. Same steps. They drift apart. A bug fix in one gets forgotten in the other.

The Solution

from quent import Chain

pipeline = (
    Chain()
    .then(fetch_data)
    .then(validate_data)
    .then(normalize_data)
    .then(send_data)
)

result = pipeline.run(id)            # sync caller
result = await pipeline.run(id)      # async caller

One definition. Both worlds. quent inspects each return value at runtime -- if it's an awaitable, execution seamlessly transitions to async mode. No annotations, no wrappers, no ceremony.

Write your code once, use it everywhere:

def process_data(data_source):
    """Works with both sync and async data sources."""
    return (
        Chain(data_source.fetch)
        .then(validate)
        .then(transform)
        .then(data_source.save)
        .run()
    )

result = process_data(sync_database)          # returns a value
result = await process_data(async_database)   # returns a coroutine

Installation

pip install quent

Requires Python 3.10+. Zero runtime dependencies.

Quick Start

from quent import Chain

# Build a pipeline with .then() and .do()
result = Chain(42).then(lambda x: x * 2).do(print).then(str).run()
#> 84
#> '84'

# Collection operations
result = Chain([1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 0).map(lambda x: x ** 2).run()
#> [4, 16]

# Async transparency -- same chain, sync or async callables
chain = Chain().then(fetch_user).then(serialize)
result = chain.run(user_id)          # returns value if fetch_user is sync
result = await chain.run(user_id)    # returns coroutine if fetch_user is async

Features

Pipeline Building

All pipeline methods return self for fluent chaining.

Chain(data).then(transform).do(log).then(save).run()
  • .then(v) -- result replaces the current pipeline value
  • .do(fn) -- side-effect; result is discarded, current value passes through
  • .map(fn) -- apply fn to each element, collect results into a list
  • .foreach(fn) -- apply fn to each element as side-effect, keep originals
  • .filter(fn) -- keep elements where fn returns truthy

Concurrent Execution

Run multiple functions on the current value in parallel. If any returns an awaitable, all are gathered concurrently via asyncio.gather.

results = Chain(data).gather(validate, enrich, score).run()

Context Managers

Works with both sync and async context managers.

content = Chain("data.txt").then(open).with_(lambda f: f.read()).run()
  • .with_(fn) -- enter current value as context manager, call fn with the context value
  • .with_do(fn) -- same, but fn's result is discarded

Conditional Logic

result = (
    Chain(value)
    .if_(lambda x: x > 0, then=process_positive)
    .else_(process_negative)
    .run()
)

Error Handling and Retry

One exception handler and one finally handler per chain. Retry re-executes the entire chain from scratch.

result = (
    Chain(url)
    .then(fetch)
    .then(parse)
    .except_(handle_error, exceptions=ConnectionError)
    .finally_(cleanup)
    .retry(max_attempts=3, backoff=lambda attempt: 2 ** attempt)
    .run()
)

Control Flow

Early return and iteration break, usable inside any step.

Chain.return_(value)  # exit the chain early with a value
Chain.break_(value)   # break from a map/foreach/filter iteration

Iteration

Yields each element of the chain's output. Supports both for and async for.

for item in Chain(fetch_all).iterate():
    process(item)

async for item in Chain(fetch_all).iterate():
    await process(item)

Reusable Chains

Clone a chain to create independent copies for fork-and-extend patterns:

base = Chain().then(validate).then(normalize)

for_api = base.clone().then(serialize_json)
for_db = base.clone().then(serialize_sql)

Nest chains inside other chains -- inner chains execute as a single step:

validate_chain = Chain().then(check_schema).then(check_permissions)
pipeline = Chain(request).then(validate_chain).then(handle).run()

Wrap a chain as a function decorator:

pipeline = Chain().then(validate).then(transform)

@pipeline.decorator()
def handle(request):
    return parse(request)

Enhanced Tracebacks

When an exception occurs inside a chain, quent injects a visualization into the traceback showing the full pipeline and exactly which step failed:

Traceback (most recent call last):
  File "example.py", line 28, in <module>
    .run()
     ^^^^^
  File "<quent>", line 1, in
    Chain(fetch_data, 42) = {'id': 42, 'value': 100}
    .then(validate) <----
    .then(transform)
    .then(save)
  File "example.py", line 11, in validate
    raise ValueError("Value too large")
ValueError: Value too large

The <---- marker points to the step that raised the exception. Intermediate values are shown next to each step so you can trace exactly what data flowed through the pipeline. Internal quent frames are automatically cleaned from the traceback.

Set QUENT_NO_TRACEBACK=1 to disable.

Real-World Example

A Redis pipeline wrapper that works identically with both redis (sync) and redis.asyncio (async):

class RedisBatch:
    def __init__(self, r, transaction=False):
        self.r = r
        self.transaction = transaction
        self.operations = []

    def add(self, op):
        self.operations.append(op)

    def flush(self):
        pipe = self.r.pipeline(transaction=self.transaction)
        for op in self.operations:
            op(pipe)
        return (
            Chain(pipe.execute)
            .then(self.process_results)
            .finally_(pipe.reset, ...)  # always reset; ... means "call with no args"
            .run()
        )

The same flush() method works whether self.r is a sync redis.Redis client or an async redis.asyncio.Redis client. No if/else, no duplication, no async def variant. The ... (Ellipsis) in .finally_(pipe.reset, ...) tells quent to call pipe.reset() with no arguments, overriding the default behavior of passing the current value.

Documentation

Full documentation is available at quent.readthedocs.io.

License

MIT -- Copyright (c) 2023 Ohad Drukman

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

quent-4.0.0.tar.gz (36.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

quent-4.0.0-py3-none-any.whl (38.0 kB view details)

Uploaded Python 3

File details

Details for the file quent-4.0.0.tar.gz.

File metadata

  • Download URL: quent-4.0.0.tar.gz
  • Upload date:
  • Size: 36.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.3

File hashes

Hashes for quent-4.0.0.tar.gz
Algorithm Hash digest
SHA256 f4f099466ca0854d4ca2bee2afe25016e93d72cf2444289e74021fcd1d0efe86
MD5 12f47ab52907cef4bbb12e166e55c67b
BLAKE2b-256 254e01f4c8c272cb05f5534f49548ff4be98a732052a81a6ec465529fbdf0dd4

See more details on using hashes here.

File details

Details for the file quent-4.0.0-py3-none-any.whl.

File metadata

  • Download URL: quent-4.0.0-py3-none-any.whl
  • Upload date:
  • Size: 38.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.3

File hashes

Hashes for quent-4.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0c42ceab7a00bc84a697ca88eda782c6d935404d8b1377b81f5d95d595270ae4
MD5 de47a3884ed44cb4d3dd1a101e4602f4
BLAKE2b-256 ee4260a40a3890bb5aedb3aef061aa14ee63f7eb3effd5c38f3e53af4818df23

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page