Yet Another Chain Interface.

Quent

Installation

pip install quent

Suggestions and contributions are more than welcome.

Introduction

Quent is an enhanced chain interface implementation for Python, designed to handle coroutines transparently. The interface and usage of Quent remain exactly the same whether you feed it synchronous or asynchronous objects - it can handle almost any use case.

Every documented API supports both regular functions and coroutines; a coroutine function works exactly the same way a regular function would in its place. Quent automatically awaits any coroutine it encounters, even one returned by the function passed to .foreach().

Quent is written in C (using Cython) to minimize its overhead as much as possible.

As a basic example, take this function:

async def handle_request(id):
  data = await fetch_data(id)
  data = validate_data(data)
  data = normalize_data(data)
  return await send_data(data)

It uses intermediate variables that only serve to make the code more readable, as opposed to:

async def handle_request(id):
  return await send_data(normalize_data(validate_data(await fetch_data(id))))

With Quent, we can chain these operations:

from quent import Chain

def handle_request(id):
  return Chain(fetch_data, id).then(validate_data).then(normalize_data).then(send_data).run()

Upon evaluation (calling .run()), if an awaitable object is detected, Quent wraps it in a Task and returns it. The task is automatically scheduled for execution and the chain evaluation continues within the task. As Task objects need not be await-ed in order to run, you may or may not await it, depending on your needs.
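The hand-off to a Task can be pictured with a small stdlib-only sketch. This is not Quent's implementation: run_steps and _continue are hypothetical names, and the sketch assumes an event loop is already running whenever a step returns an awaitable.

```python
import asyncio
import inspect


async def _continue(awaitable, remaining):
    # Runs inside the Task: await the pending value, then finish the steps.
    value = await awaitable
    for step in remaining:
        value = step(value)
        if inspect.isawaitable(value):
            value = await value
    return value


def run_steps(value, *steps):
    # Apply steps synchronously until one of them returns an awaitable.
    for index, step in enumerate(steps):
        value = step(value)
        if inspect.isawaitable(value):
            # Schedule the rest of the work; this needs a running event loop.
            return asyncio.ensure_future(_continue(value, steps[index + 1:]))
    return value


async def double(v):
    return v * 2


async def demo():
    task = run_steps(1, double, lambda v: v + 1)
    return isinstance(task, asyncio.Task), await task


# Fully synchronous steps return a plain value.
sync_result = run_steps(1, lambda v: v + 1, lambda v: v * 10)
# A coroutine step turns the rest of the evaluation into a Task.
got_task, async_result = asyncio.run(demo())
```

The caller sees a plain value in the first case and a Task in the second, which is the behavior described above.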

Besides Chain, Quent provides the Cascade class which implements the fluent interface.

Quent aims to provide all the necessary tools to handle every use case. See the full capabilities of Quent in the API Section.

Real World Example

This snippet is taken from a thin Redis wrapper I wrote, which supports both the sync and async versions of redis without having a separate implementation for the async version.

def flush(self) -> Any | Coroutine:
  """ Execute the current pipeline and return the results, excluding
      the results of inline 'expire' commands.
  """
  pipe = self.r.pipeline(transaction=self.transaction)
  # this applies a bunch of Redis operations onto the `pipe` object.
  self.apply_operations(pipe)
  return (
    Chain(pipe.execute, raise_on_error=True)
    .then(self.remove_ignored_commands)
    .finally_(pipe.reset, ...)
    .run()
  )

Once the chain runs, it will execute the pipeline commands, remove the unwanted results, and return the rest of them. Finally, it will reset the pipe object. Any function passed to .finally_() will always be invoked, even if an exception has been raised during the execution of the chain. The purpose of the ... here is explained in the Ellipsis section.

pipe.execute and pipe.reset both perform a network request to Redis. With an async Redis object they are coroutines and have to be awaited. Notice that I return without an explicit await - if the user of this wrapper has initialized the class with an async Redis instance, they will know that they need to await the result. This allows me to focus on the actual logic, without caring about sync vs async.

Some would say that this pattern can cause unexpected behavior, since it isn't clear whether it will return a Task or not. I see it no differently than any undocumented code - with proper and clear documentation (be it external documentation or just a simple docstring), there shouldn't be any truly unexpected behavior (barring any unknown bugs).

Details & Examples

Literal Values

You don't have to pass a callable as a chain item - literal values work just as well.

Chain(fetch_data, id).then(True).run()

will execute fetch_data(id), and then return True.

Custom Arguments

You may provide args or kwargs to a chain item - by doing so, Quent assumes that the item is a callable and will evaluate it with the provided arguments, instead of evaluating it with the current value.

Chain(fetch_data, id).then(fetch_data, another_id, password=password).run()

will execute fetch_data(id), and then fetch_data(another_id, password=password).

Ellipsis

The Ellipsis / ... is a special case - if the first argument for most functions that register a chain item or a callback is ..., the item will be evaluated without any arguments.

Chain(fetch_data, id).then(do_something, ...).run()

will execute fetch_data(id), and then do_something().

Flow Modifiers

While the default operation of a chain is to, well, chain operations (using .then()), there are cases where you may want to break out of this flow. For this, Chain provides the functions .root() and .ignore(). They both behave like .then(), but with a small difference:

  • .root() evaluates the item using the root value, instead of the current value.
  • .ignore() evaluates the item with the current value but will not propagate its result forwards.

There is also a .root_ignore() which is the combination of .root() and .ignore().
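To make the difference between the four methods concrete, here is a toy, synchronous model of the value flow. MiniChain is a hypothetical class written only for this illustration; it takes a literal root value and does not reproduce Quent's evaluation rules, argument handling, or coroutine support.

```python
class MiniChain:
    def __init__(self, root_value):
        self._root = root_value
        self._steps = []  # tuples of (fn, use_root, keep_result)

    def _add(self, fn, use_root, keep_result):
        self._steps.append((fn, use_root, keep_result))
        return self

    def then(self, fn):
        return self._add(fn, use_root=False, keep_result=True)

    def root(self, fn):
        return self._add(fn, use_root=True, keep_result=True)

    def ignore(self, fn):
        return self._add(fn, use_root=False, keep_result=False)

    def root_ignore(self, fn):
        return self._add(fn, use_root=True, keep_result=False)

    def run(self):
        current = self._root
        for fn, use_root, keep_result in self._steps:
            result = fn(self._root if use_root else current)
            if keep_result:
                current = result
        return current


seen = []
result = (
    MiniChain(42)
    .then(lambda v: v / 10)    # current value becomes 4.2
    .ignore(seen.append)       # receives 4.2, result is discarded
    .root_ignore(seen.append)  # receives 42, result is discarded
    .root(lambda v: v == 42)   # evaluated with the root value
    .run()
)
```

After the run, result is True and seen holds the two values that the ignoring steps received.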

Reusing A Chain

You may reuse a chain as many times as you wish.

chain = Chain(fetch_data, id).then(validate_data).then(normalize_data).then(send_data)
chain.run()
chain.run()
...

There are many cases where you may need to apply the same sequence of operations but with different inputs. Take our previous example:

Chain(fetch_data, id).then(validate_data).then(normalize_data).then(send_data).run()

Instead, we can create a template chain and reuse it, passing different values to .run():

handle_data = Chain().then(validate_data).then(normalize_data).then(send_data)

for id in list_of_ids:
  handle_data.run(fetch_data, id)

Re-using a Chain object will significantly reduce its overhead, as most of the performance hit is due to the creation of a new Chain instance. Nonetheless, the performance hit is negligible and not worth sacrificing readability for. So unless it makes sense (or you really need to squeeze out performance), it's better to create a new Chain instance.
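The template idea is close to building a composed function once and calling it with different roots. The sketch below shows that idea in plain Python; make_pipeline is a hypothetical helper, and fetch_data and normalize_data are stand-ins invented for the example, not part of Quent.

```python
from functools import reduce


def make_pipeline(*steps):
    # Build the step sequence once; each call supplies a fresh root.
    def run(root_fn, *args, **kwargs):
        root_value = root_fn(*args, **kwargs)
        return reduce(lambda value, step: step(value), steps, root_value)
    return run


# Hypothetical stand-ins for the functions used in the text.
def fetch_data(id):
    return {"id": id, "name": " Ada "}


def normalize_data(data):
    return {**data, "name": data["name"].strip()}


handle_data = make_pipeline(normalize_data, lambda data: data["name"])
names = [handle_data(fetch_data, id) for id in (1, 2)]
```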

Nesting A Chain

You can nest a Chain object within another Chain object:

(
  Chain(fetch_data, id)
  .then(Chain().then(validate_data).then(normalize_data))
  .then(send_data)
  .run()
)

A nested chain must always be a template chain (i.e. initialized without arguments).

A nested chain will be evaluated with the current value of the parent chain passed to its .run() method.

Pipe Syntax

Pipe syntax is supported:

from quent import Chain, run

(Chain(fetch_data) | process_data | normalize_data | send_data).run()
Chain(fetch_data) | process_data | normalize_data | send_data | run()

You can also use Pipe with Quent:

from pipe import where

Chain(get_items).then(where(lambda item: item.is_valid()))
Chain(get_items) | where(lambda item: item.is_valid())

Safety Callbacks

The usage of except and finally is supported:

Chain(open('<file-path>')).then(do_something_with_file).finally_(lambda f: f.close())

Read more about it in Callbacks.

Comparisons

Most basic comparison operations are supported:

Chain(get_key).in_(list_of_keys)  # == get_key() in list_of_keys

See the full list of operations in Conditionals.

Iterators

You can easily iterate over the result of something:

Chain(fetch_keys).foreach(do_something_with_key)

The full details of .foreach() are explained in the Foreach section.

Contexts

You can execute a function (or do pretty much anything you want) inside a context:

Chain(get_lock, id).with_(fetch_data, id)

The full details of .with_() are explained in the With section.

API

Value Evaluation

Most of the methods in the following section receive value, args, and kwargs. Unless explicitly stated otherwise, the evaluation of value in all of those methods is roughly equivalent to:

# `args` may be empty, so check it before looking at the first element.
if args and args[0] is Ellipsis:
  return value()

elif args or kwargs:
  return value(*args, **kwargs)

elif callable(value):
  return value(current_value)

else:
  return value

The evaluate_value function contains the full evaluation logic.
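The rules above can be tried out with a small standalone function. This is a sketch of the documented behavior only, not the actual evaluate_value from Quent; the name evaluate and its signature are assumptions made for the example.

```python
def evaluate(value, current_value, args=(), kwargs=None):
    kwargs = kwargs or {}
    if args and args[0] is Ellipsis:
        # `...` as the first argument: call with no arguments at all.
        return value()
    if args or kwargs:
        # Explicit arguments replace the current value.
        return value(*args, **kwargs)
    if callable(value):
        # Default: feed the current value to the callable.
        return value(current_value)
    # Literal values are returned as-is.
    return value


called_with_current = evaluate(str.upper, "abc")
literal = evaluate(True, "abc")
custom_args = evaluate(int, "abc", ("7",))
no_args = evaluate(list, "abc", (...,))
```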

Core

__init__(value: Any = None, *args, **kwargs)

Creates a new chain with value as the chain's root item. value can be anything - a literal value, a function, a class, etc. If args or kwargs are provided, value is assumed to be a callable and will be evaluated with those arguments. Otherwise, a check is performed to determine whether value is a callable. If it is, it is called without any arguments.

Not passing a value will create a template chain (see: Reusing A Chain). You can still normally use it, but then you must call .run() with a value (see the next section).

A few examples:

Chain(42)
Chain(fn, True)
Chain(cls, name='foo')
Chain(lambda v: v*10, 4.2)

run(value: Any = None, *args, **kwargs) -> Any | asyncio.Task

Evaluates the chain and returns the result, or a Task if there are any coroutines in the chain.

If the chain is a template chain (initialized without a value), you must call .run() with a value, which will act as the root item of the chain.

Conversely, if .run() is called with a value and the chain is a non-template chain, then an exception will be raised. The only case where you can both create a template chain and run it without a value is for the Cascade class, which is documented below in Cascade - Void Mode.

Similarly to the examples above,

Chain().run(42)
Chain().run(fn, True)
Chain().run(cls, name='foo')
Chain().run(lambda v: v*10, 2)

then(value: Any, *args, **kwargs) -> Chain

Adds value to the chain as a chain item. value can be anything - a literal value, a function, a class, etc.

Returns the evaluation of value.

This is the main and default way of adding items to the chain.

(see: Ellipsis if you need to invoke value without arguments)

Chain(fn).then(False)
Chain(42).then(verify_result)
Chain('<uuid>').then(uuid.UUID)

root(value: Any = None, *args, **kwargs) -> Chain

Like .then(), but it first sets the root value as the current value, and then it evaluates value by the default evaluation procedure.

Calling .root() without a value simply returns the root value.

Read more in Flow Modifiers.

Chain(42).then(lambda v: v/10).root(lambda v: v == 42)

ignore(value: Any, *args, **kwargs) -> Chain

Like .then(), but keeps the current value unchanged.

In other words, this function does not affect the flow of the chain.

Read more in Flow Modifiers.

Chain(fetch_data, id).ignore(print).then(validate_data)

root_ignore(value: Any, *args, **kwargs) -> Chain

The combination of .root() and .ignore().

Chain(fetch_data, id).then(validate_data).root_ignore(print).then(normalize_data)

attr(name: str) -> Chain

Like .then(), but evaluates to getattr(current_value, name).

class A:
  @property
  def a1(self):
    # I return something important
    pass

Chain(A()).attr('a1')
ChainAttr(A()).a1

attr_fn(name: str, *args, **kwargs) -> Chain

Like .attr(), but evaluates to getattr(current_value, name)(*args, **kwargs).

class A:
  def a1(self, foo=None):
    # I do something important
    pass

Chain(A()).attr_fn('a1', foo=1)
ChainAttr(A()).a1(2)

Foreach

foreach(fn: Callable) -> Chain

Iterates over the current value and invokes fn(element) for each element. Returns a list that is the result of fn(element) for each element.

If the iterator implements __aiter__, async for ... will be used.

Example:

Chain(list_of_ids).foreach(validate_id).run()

will iterate over list_of_ids, and return a list that is equivalent to [validate_id(id) for id in list_of_ids].
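The switch between for and async for can be sketched as follows. The foreach function here is a hypothetical standalone helper, not Quent's code, and it does not await a coroutine returned by fn, which Quent is documented to do.

```python
import asyncio


def foreach(iterable, fn):
    if hasattr(iterable, "__aiter__"):
        # Async iterables are consumed with `async for` inside a coroutine.
        async def collect():
            return [fn(element) async for element in iterable]
        return collect()
    return [fn(element) for element in iterable]


async def async_ids():
    for id in (1, 2, 3):
        yield id


sync_result = foreach([1, 2, 3], str)
async_result = asyncio.run(foreach(async_ids(), str))
```

Both calls produce the same list; only the second one has to be awaited (or run by an event loop).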

foreach_do(fn: Callable) -> Chain

Like .foreach(), but returns nothing. In other words, this is the combination of .foreach() and .ignore().

Example:

(
  Chain(list_of_ids)
  .foreach_do(Chain().then(fetch_data).then(validate_data).then(normalize_data).then(send_data))
  .run()
)

will iterate over list_of_ids, invoke the nested chain with each different id, and then return list_of_ids.

With

with_(value: Any | Callable = None, *args, **kwargs) -> Chain

Executes with current_value as ctx and evaluates value inside the context block, with ctx as the current value, and returns the result. If value is not provided, returns ctx. This method follows the default evaluation procedure, so passing args or kwargs is perfectly valid.

Depending on value (and args/kwargs), this is roughly equivalent to

with current_value as ctx:
  return value(ctx)

If the context object implements __aenter__, async with ... will be used.

Example:

Chain(get_lock, id).with_(fetch_data, id).run()

is roughly equivalent to:

with get_lock(id) as lock:
  # `lock` is not used here since we passed a custom argument `id`.
  return fetch_data(id)

with_do(value: Any | Callable, *args, **kwargs) -> Chain

Like .with_(), but returns nothing. In other words, this is the combination of .with_() and .ignore().

Class Methods

Chain.from_(*args) -> Chain

Creates a Chain template, and registers args as chain items.

Chain.from_(validate_data, normalize_data, send_data).run(fetch_data, id)
# is the same as doing
Chain().then(validate_data).then(normalize_data).then(send_data).run(fetch_data, id)

Callbacks

except_(fn: Callable | str, *args, **kwargs) -> Chain

Register a callback that will be called if an exception is raised anytime during the chain's evaluation. The callback is evaluated with the root value, or with args and kwargs if provided.

If fn is a string, then it is assumed to be an attribute method of the root value.

Chain(fetch_data).then(validate_data).except_(discard_data)

finally_(fn: Callable | str, *args, **kwargs) -> Chain

Register a callback that will always be called after the chain's evaluation. The callback is evaluated with the root value, or with args and kwargs if provided.

If fn is a string, then it is assumed to be an attribute method of the root value.

Chain(get_id).then(acquire_lock).root(fetch_data).finally_(release_lock)
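In plain synchronous Python, the two callbacks map roughly onto a try / except / finally block in which both handlers receive the root value. The sketch below is only an analogy: run_with_callbacks is a hypothetical helper, and re-raising the exception after the except callback is an assumption of the sketch, not something the text above states about Quent.

```python
def run_with_callbacks(root_fn, steps, on_except=None, on_finally=None):
    root_value = None
    try:
        root_value = current = root_fn()
        for step in steps:
            current = step(current)
        return current
    except Exception:
        if on_except is not None:
            on_except(root_value)
        raise  # assumption: the sketch re-raises after the callback
    finally:
        if on_finally is not None:
            on_finally(root_value)


log = []


def fail(_):
    raise ValueError("invalid data")


try:
    run_with_callbacks(
        lambda: "data",
        [fail],
        on_except=lambda root: log.append(("except", root)),
        on_finally=lambda root: log.append(("finally", root)),
    )
except ValueError:
    log.append(("raised", None))
```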

Conditionals

if_(on_true: Any | Callable, *args, **kwargs) -> Chain

Evaluates the truthiness of the current value (bool(current_value)). If on_true is provided and the result is True, evaluates on_true and returns the result. If on_true is not provided, simply returns the truthiness result (bool).

on_true may be anything and follows the default evaluation procedure as described above.

Chain(get_random_number).then(lambda n: n > 5).if_(you_win, prize=1)

else_(on_false: Any | Callable, *args, **kwargs) -> Chain

If a previous conditional result is falsy, evaluates on_false and returns the result.

on_false may be anything and follows the default evaluation procedure as described above.

Can only be called immediately following a conditional.

Chain(get_random_number).then(lambda n: n > 5).if_(you_win, prize=1).else_(you_lose, cost=10)

not_() -> Chain

  • not current_value

This method currently does not support the on_true argument since it looks confusing. I might add it in the future.

Chain(is_valid, 'something').not_()

eq(value: Any, on_true: Any | Callable = None, *args, **kwargs) -> Chain

  • current_value == value

Chain(420).then(lambda v: v/10).eq(42)
Chain(420).then(lambda v: v/10).eq(40).else_(on_fail)

neq(value: Any, on_true: Any | Callable = None, *args, **kwargs) -> Chain

  • current_value != value

Chain(420).then(lambda v: v/10).neq(40)

is_(value: Any, on_true: Any | Callable = None, *args, **kwargs) -> Chain

  • current_value is value

Chain(object()).is_(1)

is_not(value: Any, on_true: Any | Callable = None, *args, **kwargs) -> Chain

  • current_value is not value

Chain(object()).is_not(object())

in_(value: Any, on_true: Any | Callable = None, *args, **kwargs) -> Chain

  • current_value in value

Chain('sub').in_('subway')

not_in(value: Any, on_true: Any | Callable = None, *args, **kwargs) -> Chain

  • current_value not in value

Chain('bus').then(lambda s: s[::-1]).not_in('subway')

Cascade

Although considered unpythonic, in some cases the cascade design can be very helpful. The Cascade class is identical to Chain, except that during the chain's evaluation, each chain item is evaluated using the root value as an argument (or in other words, the current value is always the chain's root value). The return value of Cascade.run() is always its root value.

from quent import Cascade

fetched_data = (
  Cascade(fetch_data, id)
  .then(send_data_to_backup)
  .then(lambda data: send_data(data, to_id=1))
  .then(print)
  .run()
)

will execute fetch_data(id), then send_data_to_backup(data), then send_data(data, to_id=1), and then print(data).
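Stripped of everything else, the cascade rule is "call every step with the root value, discard each result, return the root value". A minimal synchronous sketch of that rule, with a hypothetical cascade function and made-up steps:

```python
def cascade(root_value, *steps):
    for step in steps:
        step(root_value)  # the step's return value is discarded
    return root_value


calls = []
data = {"id": 7}
result = cascade(
    data,
    lambda d: calls.append(("backup", d["id"])),
    lambda d: calls.append(("send", d["id"])),
)
```

Every step sees the same data object, and the call returns that same object.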

You can also use Cascade to make existing classes behave the same way:

from quent import CascadeAttr


class Foo:
  def foo(self):
    ...
  async def bar(self):
    ...
  def baz(self):
    ...

async def get_foo():
  f = Foo()
  f.foo()
  await f.bar()
  f.baz()
  return f

def better_get_foo():
  return CascadeAttr(Foo()).foo().bar().baz().run()

Cascade works for any kind of object:

from quent import CascadeAttr

CascadeAttr([]).append(1).append(2).append(3).run() == [1, 2, 3]

Cascade - Void Mode

In some cases you may want to run a bunch of independent operations. Using Cascade, you can achieve this by simply not passing a root value to the constructor or to .run(). None of the chain items will receive any arguments (other than explicitly provided args / kwargs).

await (
  Cascade()
  .then(foo, False)
  .then(bar)
  .then(baz)
  .run()
)

will execute foo(False), then bar(), then baz().

A void Cascade will always return None.

Direct Attribute Access

Both Chain and Cascade can support "direct" attribute access via the ChainAttr and CascadeAttr classes. See the Cascade section above to see an example of CascadeAttr usage. The same principle holds for ChainAttr. Accessing attributes without using the Attr subclass is possible using .attr() and .attr_fn().

I decided to separate this functionality from the main classes because it requires overriding __getattr__, which drastically increases the overhead of both creating an instance and accessing any properties / methods. And since I don't think this kind of usage will be common, I decided to keep this functionality opt-in.
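For readers unfamiliar with the technique, this is roughly what a __getattr__-based proxy looks like. MiniCascadeAttr is a hypothetical, synchronous toy written for this illustration; it is not how CascadeAttr is implemented and it only handles method calls.

```python
class MiniCascadeAttr:
    def __init__(self, root_value):
        self._root = root_value
        self._calls = []  # recorded (method name, args, kwargs)

    def __getattr__(self, name):
        # Only reached for names that are not real attributes of the proxy,
        # so `run`, `_root` and `_calls` never end up here.
        def record(*args, **kwargs):
            self._calls.append((name, args, kwargs))
            return self
        return record

    def run(self):
        # Replay every recorded call on the root value, then return it.
        for name, args, kwargs in self._calls:
            getattr(self._root, name)(*args, **kwargs)
        return self._root


result = MiniCascadeAttr([]).append(1).append(2).append(3).run()
```

Every attribute lookup on the proxy goes through __getattr__ and allocates a new closure, which hints at where the extra overhead mentioned above comes from.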

Important Notes

Asynchronous except and finally callbacks

If an except/finally callback is a coroutine function, and an exception is raised before the first coroutine of the chain has been evaluated, or if there aren't any coroutines in the chain - each callback will be invoked inside a new Task, which you won't have access to. This is due to the fact that we cannot return the new Task(s) from the except/finally clauses.

This shouldn't be an issue in most use cases, but it is important to be aware of. A RuntimeWarning will be emitted in such a case.

As an example, suppose that fetch_data is synchronous, and report_usage is asynchronous.

Chain(fetch_data).then(raise_exception, ...).finally_(report_usage).run()

will execute fetch_data(), then raise_exception(), and then report_usage(data). But report_usage(data) is a coroutine, while fetch_data and raise_exception are not, so Quent will wrap report_usage(data) in a Task and "forget" about it.

If you must, you can "force" an async chain by giving it a dummy coroutine:

async def fn(v):
  return v

await Chain(fetch_data).then(fn).then(raise_exception, ...).finally_(report_usage).run()

This will ensure that report_usage() will be awaited properly.
