Skip to main content

Run async workflows using pytest-fixtures-style dependency injection

Project description

asyncinject

PyPI Changelog License

Run async workflows using pytest-fixtures-style dependency injection

Installation

Install this library using pip:

$ pip install asyncinject

Usage

This library is inspired by pytest fixtures.

The idea is to simplify executing parallel asyncio operations by allowing them to be defined using a collection of functions, where the function arguments represent dependent functions that need to be executed first.

The library can then create and execute a plan for executing the required functions in parallel in the most efficient sequence possible.

Here's an example, using the httpx HTTP library.

from asyncinject import Registry
import httpx


async def get(url):
    async with httpx.AsyncClient() as client:
        return (await client.get(url)).text

async def example():
    return await get("http://www.example.com/")

async def simonwillison():
    return await get("https://simonwillison.net/search/?tag=empty")

async def both(example, simonwillison):
    return example + "\n\n" + simonwillison

registry = Registry(example, simonwillison, both)
combined = await registry.resolve(both)
print(combined)

If you run this in ipython or python -m asyncio (to enable top-level await in the console) you will see output that combines HTML from both of those pages.

The HTTP requests to www.example.com and simonwillison.net will be performed in parallel.

The library notices that both() takes two arguments which are the names of other registered async def functions, and will construct an execution plan that executes those two functions in parallel, then passes their results to the both() method.

Registry.from_dict()

Passing a list of functions to the Registry constructor will register each function under their introspected function name, using fn.__name__.

You can set explicit names instead using a dictionary:

registry = Registry.from_dict({
    "example": example,
    "simonwillison": simonwillison,
    "both": both
})

Those string names will be used to match parameters, so each function will need to accept parameters named after the keys used in that dictionary.

Registering additional functions

Functions that are registered can be regular functions or async def functions.

In addition to registering functions by passing them to the constructor, you can also add them to a registry using the .register() method:

async def another():
    return "another"

registry.register(another)

To register them with a name other than the name of the function, pass the name= argument:

async def another():
    return "another 2"

registry.register(another, name="another_2")

Resolving an unregistered function

You don't need to register the final function that you pass to .resolve() - if you pass an unregistered function, the library will introspect the function's parameters and resolve them directly.

This works with both regular and async functions:

async def one():
    return 1

async def two():
    return 2

registry = Registry(one, two)

# async def works here too:
def three(one, two):
    return one + two

print(await registry.resolve(three))
# Prints 3

Parameters are passed through

Your dependent functions can require keyword arguments which have been passed to the .resolve() call:

async def get_param_1(param1):
    return await get(param1)

async def get_param_2(param2):
    return await get(param2)

async def both(get_param_1, get_param_2):
    return get_param_1 + "\n\n" + get_param_2


combined = await Registry(get_param_1, get_param_2, both).resolve(
    both,
    param1 = "http://www.example.com/",
    param2 = "https://simonwillison.net/search/?tag=empty"
)
print(combined)

Parameters with default values are ignored

You can opt a parameter out of the dependency injection mechanism by assigning it a default value:

async def go(calc1, x=5):
    return calc1 + x

async def calc1():
    return 5

print(await Registry(calc1, go).resolve(go))
# Prints 10

Tracking with a timer

You can pass a timer= callable to the Registry constructor to gather timing information about executed tasks.. Your function should take three positional arguments:

  • name - the name of the function that is being timed
  • start - the time that it started executing, using time.perf_counter() (perf_counter() docs)
  • end - the time that it finished executing

You can use print here too:

combined = await Registry(
    get_param_1, get_param_2, both, timer=print
).resolve(
    both,
    param1 = "http://www.example.com/",
    param2 = "https://simonwillison.net/search/?tag=empty"
)

This will output:

get_param_1 436633.584580685 436633.797921747
get_param_2 436633.641832699 436634.196364347
both 436634.196570217 436634.196575639

Turning off parallel execution

By default, functions that can run in parallel according to the execution plan will run in parallel using asyncio.gather().

You can disable this parallel exection by passing parallel=False to the Registry constructor, or by setting registry.parallel = False after the registry object has been created.

This is mainly useful for benchmarking the difference between parallel and serial execution for your project.

Development

To contribute to this library, first checkout the code. Then create a new virtual environment:

cd asyncinject
python -m venv venv
source venv/bin/activate

Now install the dependencies and test dependencies:

pip install -e '.[test]'

To run the tests:

pytest

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncinject-0.6.tar.gz (13.5 kB view details)

Uploaded Source

Built Distribution

asyncinject-0.6-py3-none-any.whl (12.3 kB view details)

Uploaded Python 3

File details

Details for the file asyncinject-0.6.tar.gz.

File metadata

  • Download URL: asyncinject-0.6.tar.gz
  • Upload date:
  • Size: 13.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.11

File hashes

Hashes for asyncinject-0.6.tar.gz
Algorithm Hash digest
SHA256 ee8e879e3c18eaa5fd0b6962ac447ee80f21b201f5d12986f59fa6e0638ba774
MD5 8412d6066226d30a550a7ac08cc798e3
BLAKE2b-256 e26962b417803ee72f9cd5d159e52b29067c57f8a5260493ea90ff402e4be056

See more details on using hashes here.

File details

Details for the file asyncinject-0.6-py3-none-any.whl.

File metadata

  • Download URL: asyncinject-0.6-py3-none-any.whl
  • Upload date:
  • Size: 12.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.11

File hashes

Hashes for asyncinject-0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 2f1c9d03e50a51877bdc1b5907a206c47ed9bd1ddca25aee4d62ed2297b2e90a
MD5 92a9605b3a91a70b3e81128a30c2b029
BLAKE2b-256 f229cf95e0a15079d7a3d16a71cfd1654f40838541d556b95d86f35ca97ca3b2

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page