qaviton io
Qaviton IO is a package with a simple API that uses Python's async and multiprocessing
to enable fast execution of many asyncable operations.
Installation
pip install qaviton-io -U
Requirements
- Python 3.6+
Features
- async task manager
- process task manager
- task logger
Usage
async manager:
from time import time
from requests import get  # let's make use of requests to make async http calls
from qaviton_io import AsyncManager, task

# let's create an async manager
m = AsyncManager()

# first we make a simple function to make an http call.
# we want to log the result,
# and make sure that in case of an exception
# the manager won't stop
@task(exceptions=Exception)
def task(): return get("https://qaviton.com")

# this will run async tasks and measure their duration
def run(tasks):
    t = time()
    m.run(tasks)
    t = time() - t
    print(f'took {round(t, 3)}s')

# let's run our task once and see how long it takes
run([task for _ in range(1)])

# now let's run our task 20 times and see how long it takes
run([task for _ in range(20)])

# we can assert the collected results here
# (1 result from the first run + 20 from the second)
assert len(m.results) == 21
for r in m.results:
    assert r.status_code == 200

# let's view the results in the log report
m.report()
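For comparison, here is a minimal standard-library sketch of the same pattern: running a blocking function many times concurrently while keeping one failure from stopping the rest. It is not how qaviton_io is implemented. The names `fake_request`, `failing_request`, `run_blocking` and `run` are hypothetical and exist only for this illustration, and `asyncio.run` requires Python 3.7+.

```python
import asyncio
import time


def fake_request(delay=0.1):
    """Hypothetical stand-in for a blocking HTTP call such as requests.get."""
    time.sleep(delay)
    return 200


def failing_request():
    """Hypothetical task that always fails, to show error capture."""
    raise ConnectionError("simulated network failure")


async def run_blocking(funcs):
    loop = asyncio.get_running_loop()
    # None selects the event loop's default thread pool executor
    futures = [loop.run_in_executor(None, f) for f in funcs]
    # return_exceptions=True returns raised exceptions as results
    # instead of propagating the first one
    return await asyncio.gather(*futures, return_exceptions=True)


def run(funcs):
    """Run all callables concurrently and return (results, duration)."""
    start = time.time()
    results = asyncio.run(run_blocking(funcs))
    return results, time.time() - start


if __name__ == "__main__":
    results, took = run([fake_request] * 20 + [failing_request])
    print(f"took {round(took, 3)}s")
```

Results come back in the order the callables were submitted, so a failed task shows up as an exception object at its own position in the list.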
process manager:
"""
make sure your tasks are defined at the module level,
so they can be pickled by multiprocessing
"""
from time import time
from requests import get
from qaviton_io.types import Tasks
from qaviton_io import ProcessManager, task
from traceback import format_exc

# now we make some tasks

# this is a nested task
# we don't want to handle any exceptions
# so in case of failure the parent will not proceed
@task()
def task1(url):
    r = get(url)
    r.raise_for_status()

# this is the parent task
# we want to handle all exceptions
# so in case of failure the next task will execute
@task(exceptions=Exception)
def multi_task():
    for url in [
        "https://qaviton.com",
        "https://qaviton.co.il",  # make sure you enter a valid address
        "https://qaviton.com1",  # make sure you enter a valid address
    ]:
        task1(url)

# let's create a function to execute tasks
def execute_tasks(tasks: Tasks, timeout):
    manager = ProcessManager()
    t = time()
    try:
        manager.run_until_complete(tasks, timeout=timeout)
        timed_out = None
    except TimeoutError:
        timed_out = format_exc()
    t = time() - t
    manager.report()
    print(f'took {round(t, 3)}s\n')
    manager.log.clear()
    return timed_out

# now all that's left is to run the tasks
if __name__ == "__main__":
    timeouts = [
        execute_tasks([multi_task for _ in range(1)], timeout=3),
        execute_tasks([multi_task for _ in range(20)], timeout=6),
        execute_tasks([multi_task for _ in range(80)], timeout=9),
    ]
    for timeout in timeouts:
        if timeout:
            print(timeout)
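The process manager example asks for tasks defined at the module level so that multiprocessing can pickle them. The sketch below uses only the standard `pickle` module to show which kinds of callables that rule excludes. The helper names (`can_pickle`, `module_level_task`, `make_nested_task`) are hypothetical, and the sketch says nothing about how qaviton_io serializes tasks internally.

```python
import pickle


def module_level_task():
    """Defined at module level, so pickle can find it by name."""
    return "ok"


def make_nested_task():
    """Returns a function defined inside another function."""
    def nested_task():
        return "ok"
    return nested_task


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # the exact exception type depends on the Python version
        return False


if __name__ == "__main__":
    print("module-level function:", can_pickle(module_level_task))
    print("nested function:", can_pickle(make_nested_task()))
    print("lambda:", can_pickle(lambda: "ok"))
```

Run as a script, the module-level function should pickle while the nested function and the lambda should not, which is why tasks handed to a process pool are usually plain top-level functions.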
notes:
- For good performance and easy usage, you should probably stick with the AsyncManager.
- The ProcessManager uses async operations as well as multiprocessing. It distributes tasks across CPUs, and those tasks are executed using the AsyncManager. If you want maximum efficiency, consider using the ProcessManager.
- The ProcessManager uses the multiprocessing module and is subject to its restrictions and limitations.
- The ProcessManager gets stuck easily, so make sure to use timeouts when using it.
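The notes above mention distributing tasks across CPUs and guarding against hangs with timeouts. Below is a rough standard-library sketch of both ideas, assuming a round-robin split and a `multiprocessing.Pool`. How the ProcessManager actually splits work is not documented here, so `split_into_batches`, `run_batch`, `run_with_timeout` and `quick_task` are hypothetical names used only for illustration.

```python
import multiprocessing as mp


def quick_task():
    """Hypothetical module-level task (module level so it can be pickled)."""
    return "done"


def split_into_batches(tasks, n_batches):
    """Round-robin split: batch i gets tasks i, i + n, i + 2n, ..."""
    n_batches = max(1, min(n_batches, len(tasks)))
    return [tasks[i::n_batches] for i in range(n_batches)]


def run_batch(batch):
    """Run every task of one batch inside a single worker process."""
    return [t() for t in batch]


def run_with_timeout(tasks, timeout, processes=2):
    """Run tasks across worker processes; return None if the timeout expires."""
    batches = split_into_batches(tasks, processes)
    with mp.Pool(processes=processes) as pool:
        pending = pool.map_async(run_batch, batches)
        try:
            return pending.get(timeout=timeout)
        except mp.TimeoutError:
            # leaving the with block terminates the worker processes
            return None
```

To try it, call `run_with_timeout([quick_task] * 6, timeout=5)` under an `if __name__ == "__main__":` guard, as in the process manager example. The guard matters on platforms where multiprocessing starts workers by re-importing the main module.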