Qaviton IO
Qaviton IO is a package with a simple API that uses Python's async and multiprocessing
to run many asyncable operations quickly.
Installation
pip install qaviton-io -U
Requirements
- Python 3.6+
Features
- async task manager
- process task manager
- task logger
Usage
async manager:
from time import time
from requests import get  # let's use requests for the http calls that the manager will run asynchronously
from qaviton_io import AsyncManager, task
# let's create an async manager
m = AsyncManager()
# first we make a simple function to make an http call.
# we want to log the result,
# and make sure that in case of an exception
# the manager won't stop
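# note: defining a function named `task` rebinds the name,
# so the imported decorator is shadowed from this point on
@task(exceptions=Exception)
def task(): return get("https://qaviton.com")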
# this will run async tasks and measure their duration
def run(tasks):
    t = time()
    m.run(tasks)
    t = time() - t
    print(f'took {round(t, 3)}s')
# let's run our task once and see how long it takes
run([task for _ in range(1)])
# now let's run our task 20 times and see how long it takes
run([task for _ in range(20)])
# we can assert the collected results here
assert len(m.results) == 21
for r in m.results:
    assert r.status_code == 200
# let's view the results in the log report
m.report()
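To see why running the task 20 times need not take 20 times as long, here is a minimal sketch using only the standard library (Python 3.7+). It is not how qaviton_io is implemented; `fake_request` and `run_all` are hypothetical names, and a short `time.sleep` stands in for the blocking `requests.get` call so the example needs no network.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request():
    """Hypothetical stand-in for a blocking HTTP call such as requests.get."""
    time.sleep(0.1)
    return 200


async def run_all(funcs):
    """Run blocking callables concurrently on a thread pool and gather the results."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=len(funcs)) as pool:
        futures = [loop.run_in_executor(pool, f) for f in funcs]
        return await asyncio.gather(*futures)


start = time.time()
results = asyncio.run(run_all([fake_request for _ in range(20)]))
elapsed = time.time() - start
print(f"{len(results)} calls took {round(elapsed, 3)}s")
```

Because the 20 calls wait at the same time instead of one after another, the total should stay close to the duration of a single call rather than the sum of all of them.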
process manager:
"""
make sure your tasks are defined at the module level,
so they can be pickled by multiprocessing
"""
from time import time
from requests import get
from qaviton_io.types import Tasks
from qaviton_io import ProcessManager, task
from traceback import format_exc
# now we make some tasks
# this is a nested task
# we don't want to handle any exceptions
# so in case of failure the parent will not proceed
@task()
def task1(url):
    r = get(url)
    r.raise_for_status()
# this is the parent task
# we want to handle all exceptions
# so in case of failure the next task will execute
@task(exceptions=Exception)
def multi_task():
    for url in [
        "https://qaviton.com",
        "https://qaviton.co.il",  # make sure you enter a valid address
        "https://qaviton.com1",  # make sure you enter a valid address
    ]:
        task1(url)
# let's create a function to execute tasks
def execute_tasks(tasks: Tasks, timeout):
    manager = ProcessManager()
    t = time()
    try:
        manager.run_until_complete(tasks, timeout=timeout)
        timed_out = None
    except TimeoutError:
        timed_out = format_exc()
    t = time() - t
    manager.report()
    print(f'took {round(t, 3)}s\n')
    manager.log.clear()
    return timed_out
# now all that's left is to run the tasks
if __name__ == "__main__":
    timeouts = [
        execute_tasks([multi_task for _ in range(1)], timeout=3),
        execute_tasks([multi_task for _ in range(20)], timeout=6),
        execute_tasks([multi_task for _ in range(80)], timeout=9),
    ]
    for timeout in timeouts:
        if timeout:
            print(timeout)
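The comments in the example above describe two behaviours: a nested task that handles no exceptions stops its parent, while a parent that handles all exceptions lets the next task run. The sketch below illustrates that idea with a plain decorator. It is an assumption about the general pattern, not qaviton_io's code; `sketch_task`, `log`, `check` and `check_all` are hypothetical names.

```python
import functools

log = []  # hypothetical in-memory log; not qaviton_io's logger


def sketch_task(exceptions=()):
    """Hypothetical decorator factory: record each call, swallow only `exceptions`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except exceptions as e:
                # handled: record the failure and let the caller continue
                log.append((func.__name__, "failed", repr(e)))
                return None
            log.append((func.__name__, "passed", None))
            return result
        return wrapper
    return decorator


@sketch_task()  # nothing is handled, so errors propagate to the caller
def check(value):
    if value < 0:
        raise ValueError(f"negative value: {value}")
    return value


@sketch_task(exceptions=Exception)  # everything is handled, so the next task still runs
def check_all():
    for value in [1, 2, -3, 4]:
        check(value)


check_all()  # stops at -3, records the failure, does not raise
check_all()  # the second run still executes
for entry in log:
    print(entry)
```

In this sketch the failing nested call aborts the loop inside `check_all`, so the value 4 is never checked, but the second `check_all` call still runs because the parent swallowed the error.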
notes:
- For good performance and easy usage, you should probably stick with the AsyncManager.
- The ProcessManager uses async operations as well as multiprocessing. It distributes tasks across CPUs, and those tasks are executed using the AsyncManager. If you want maximum efficiency, consider using the ProcessManager.
- The ProcessManager uses the multiprocessing module and should be treated according to its restrictions and limitations.
- The ProcessManager gets stuck easily, so make sure to use timeouts when using it.
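One multiprocessing restriction worth checking up front is the one the process manager example mentions: tasks need to be picklable, which in practice means defined at the module level. The following is a small standard-library sketch for testing that before handing a callable to a process pool; `is_picklable` and `make_local_task` are hypothetical helpers, not part of qaviton_io.

```python
import json
import pickle


def is_picklable(obj):
    """Return True if `obj` survives pickle.dumps, which multiprocessing
    generally needs when sending a callable to a worker process."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_local_task():
    # a function defined inside another function has no importable name
    def local_task():
        return "done"
    return local_task


print(is_picklable(json.dumps))         # module-level function
print(is_picklable(make_local_task()))  # nested function
print(is_picklable(lambda: "done"))     # lambda
```

Functions are pickled by reference to their importable name, which is why the nested function and the lambda are rejected while the module-level one is accepted.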