Qaviton IO

Qaviton IO
is a package with a simple API that uses Python's async & multiprocessing
facilities to run many asyncable operations quickly.
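
As a rough illustration of the general idea, many blocking calls can be overlapped with the standard library alone. This sketch does not use Qaviton IO and makes no claim about how the package works internally; `blocking_call`, `run_all` and `timed_run` are hypothetical names, and `time.sleep` stands in for something like an HTTP request.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call(i):
    # hypothetical stand-in for a blocking operation such as an HTTP request
    time.sleep(0.1)
    return i * 2


async def run_all(n):
    # hand each blocking call to a worker thread and await them together
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [loop.run_in_executor(pool, blocking_call, i) for i in range(n)]
        return await asyncio.gather(*futures)


def timed_run(n):
    # run n calls concurrently and measure the wall-clock duration
    start = time.time()
    results = asyncio.run(run_all(n))
    return results, time.time() - start


results, elapsed = timed_run(20)
print(f'{len(results)} calls took {round(elapsed, 3)}s')
```

Run one after another, the 20 calls would need at least 2 seconds; overlapped, the total should be much closer to the duration of a single call.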


pip install qaviton-io -U


  • Python 3.6+


  • async task manager
  • process task manager
  • task logger


async manager:

from time import time
from requests import get  # let's make use of requests to make async http calls
from qaviton_io import AsyncManager, task

# let's create an async manager
m = AsyncManager()

# first we make a simple function to make an http call.
# we want to log the result,
# and make sure that in case of an exception
# the manager won't stop
def task(): return get("")

# this will run async tasks and measure their duration
def run(tasks):
    t = time()
    t = time() - t
    print(f'took {round(t, 3)}s')

# let's run our task once and see how long it takes
run([task for _ in range(1)])

# now let's run our task 20 times and see how long it takes
run([task for _ in range(20)])

# we can assert the collected results here
assert len(m.results) == 21
for r in m.results:
    assert r.status_code == 200

# let's view the results in the log report

process manager:

Make sure your tasks are defined at the module level,
so they can be pickled by multiprocessing.

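
The pickling constraint can be checked with the standard library before handing a task to any process-based runner. The sketch below is independent of Qaviton IO; `module_level_task`, `make_nested_task` and `can_pickle` are hypothetical names used only for illustration.

```python
import pickle


def module_level_task(x):
    # defined at module level, so pickle can refer to it by its qualified name
    return x + 1


def make_nested_task():
    # the inner function only exists inside this call, so pickle cannot look it up
    def nested_task(x):
        return x + 1
    return nested_task


def can_pickle(obj):
    # report whether pickle accepts the object instead of raising
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


print('module level:', can_pickle(module_level_task))
print('nested:', can_pickle(make_nested_task()))
print('lambda:', can_pickle(lambda x: x + 1))
```

When run as a script, the module-level function is expected to pickle, while the nested function and the lambda are rejected.
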
from time import time
from requests import get
from qaviton_io.types import Tasks
from qaviton_io import ProcessManager, task
from traceback import format_exc

# now we make some tasks
# this is a nested task
# we don't want to handle any exceptions
# so in case of failure the parent will not proceed
def task1(url):
    r = get(url)

# this is the parent task
# we want to handle all exceptions
# so in case of failure the next task will execute
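def multi_task():
    for url in [
        "",  # make sure you enter a valid address
        "https://qaviton.com1",  # make sure you enter a valid address
    ]:
        # assumption: the loop body is missing from the source;
        # calling the nested task for each address is the likely intent
        task1(url)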

# let's create a function to execute tasks
def execute_tasks(tasks: Tasks, timeout):
    manager = ProcessManager()
    t = time()
    try:
        manager.run_until_complete(tasks, timeout=timeout)
        timed_out = None
    except TimeoutError:
        timed_out = format_exc()
    t = time() - t
    print(f'took {round(t, 3)}s\n')
    return timed_out

# now all that's left is to run the tasks
if __name__ == "__main__":
    timeouts = [
        execute_tasks([multi_task for _ in range(1)], timeout=3),
        execute_tasks([multi_task for _ in range(20)], timeout=6),
        execute_tasks([multi_task for _ in range(80)], timeout=9),
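    ]
    for timeout in timeouts:
        if timeout:
            # assumption: the body is missing from the source;
            # printing the captured traceback is the likely intent
            print(timeout)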


  • For good performance and easy usage,
    you should probably stick with the AsyncManager.

  • The ProcessManager uses async operations as well as multiprocessing.
    It distributes tasks across CPUs, and those tasks are executed using the AsyncManager.
    If you want maximum efficiency, consider using the ProcessManager.

  • The ProcessManager uses the multiprocessing module
    and is subject to its restrictions & limitations.

  • The ProcessManager gets stuck easily,
    so make sure to use timeouts when using it.
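
The advice about timeouts applies to pooled work in general. The following sketch shows one way to bound the wait with `concurrent.futures`; it does not use Qaviton IO. It uses threads as a portable stand-in, since `wait(..., timeout=...)` is called the same way for futures from a `ProcessPoolExecutor`. `slow_task` and `run_with_timeout` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_task(seconds):
    # hypothetical task that takes a known amount of time
    time.sleep(seconds)
    return seconds


def run_with_timeout(durations, timeout):
    # submit every task, then wait at most `timeout` seconds for the whole batch
    pool = ThreadPoolExecutor(max_workers=len(durations))
    futures = [pool.submit(slow_task, d) for d in durations]
    done, not_done = wait(futures, timeout=timeout)
    # do not block on stragglers; report them to the caller instead
    pool.shutdown(wait=False)
    return sorted(f.result() for f in done), len(not_done)


finished, unfinished = run_with_timeout([0.01, 0.02, 1.5], timeout=0.5)
print(f'finished: {finished}, still running: {unfinished}')
```

The caller gets back whatever completed in time plus a count of unfinished tasks, so a stuck task delays the batch by at most the timeout rather than indefinitely.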
