Qaviton IO
Qaviton IO is a package with a simple API that uses Python's async and multiprocessing features to enable fast execution of many asyncable operations.
Installation
```
pip install qaviton-io -U
```
Requirements
- Python 3.6+
Features
- async task manager
- process task manager
- task logger
Usage
async manager:
```python
from time import time
from requests import get  # let's make use of requests to make async http calls
from qaviton_io import AsyncManager, task

# let's create an async manager
m = AsyncManager()


# first we make a simple function to make an http call.
# we want to log the result,
# and make sure that in case of an exception
# the manager won't stop
@task(exceptions=Exception)
def task():
    return get("https://qaviton.com")


# this will run async tasks and measure their duration
def run(tasks):
    t = time()
    m.run(tasks)
    t = time() - t
    print(f'took {round(t, 3)}s')


# let's run our task once and see how long it takes
run([task for _ in range(1)])

# now let's run our task 20 times and see how long it takes
run([task for _ in range(20)])

# we can assert the collected results here
assert len(m.results) == 21
for r in m.results:
    assert r.status_code == 200

# let's view the results in the log report
m.report()
```
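The manager runs plain callables, so if a task needs arguments you can bind them up front. Below is a minimal sketch using functools.partial; it assumes m.run accepts any argument-less callable and that the task decorator preserves the wrapped function's arguments, and the fetch task and URL list are illustrative only:

```python
from functools import partial
from requests import get
from qaviton_io import AsyncManager, task

m = AsyncManager()


# a parameterized task: the decorator logs each call, as in the example above
@task(exceptions=Exception)
def fetch(url):
    return get(url)


# bind a different url to each task before handing them to the manager
urls = ["https://qaviton.com", "https://www.python.org"]
m.run([partial(fetch, url) for url in urls])
m.report()
```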
process manager:
""" make sure your tasks are defined at the module level, so they can be pickled by multiprocessing """ from time import time from requests import get from qaviton_io.types import Tasks from qaviton_io import ProcessManager, task from traceback import format_exc # now we make some tasks # this is a nested task # we don't want to handle any exceptions # so in case of failure the parent will not proceed @task() def task1(url): r = get(url) r.raise_for_status() # this is the prent task # we want to handle all exceptions # so in case of failure the next task will execute @task(exceptions=Exception) def multi_task(): for url in [ "https://qaviton.com", "https://qaviton.co.il", # make sure you enter a valid address "https://qaviton.com1", # make sure you enter a valid address ]: task1(url) # let's create a function to execute tasks def execute_tasks(tasks: Tasks, timeout): manager = ProcessManager() t = time() try: manager.run_until_complete(tasks, timeout=timeout) timed_out = None except TimeoutError: timed_out = format_exc() t = time() - t manager.report() print(f'took {round(t, 3)}s\n') manager.log.clear() return timed_out # now all that's left is to run the tasks if __name__ == "__main__": timeouts = [ execute_tasks([multi_task for _ in range(1)], timeout=3), execute_tasks([multi_task for _ in range(20)], timeout=6), execute_tasks([multi_task for _ in range(80)], timeout=9), ] for timeout in timeouts: if timeout: print(timeout)
notes:
- For good performance and easy usage you should probably stick with the AsyncManager.
- The ProcessManager uses async operations as well as multiprocessing. It distributes tasks across CPUs, and those tasks are executed using the AsyncManager. If you want maximum efficiency, consider using the ProcessManager.
- The ProcessManager uses the multiprocessing module and should be treated with its restrictions & limitations accordingly.
- The ProcessManager gets stuck easily, so make sure to use timeouts when using it (see the sketch below).
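As a minimal sketch of that last note (reusing run_until_complete and TimeoutError from the usage example above; the helper name and default timeout value are illustrative):

```python
from qaviton_io import ProcessManager


def run_with_timeout(tasks, timeout=10):
    manager = ProcessManager()
    try:
        # raises TimeoutError if the tasks don't finish in time,
        # so a stuck run never blocks forever
        manager.run_until_complete(tasks, timeout=timeout)
    except TimeoutError:
        pass  # handle or log the timeout instead of hanging
    manager.report()
```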