aiomisc - miscellaneous utils for asyncio
Project description
Miscellaneous utils for asyncio.
Installation
Installing from pypi:
pip3 install aiomisc
Installing from github.com:
pip3 install git+https://github.com/mosquito/aiomisc.git
Quick Start
Async entrypoint with logging and useful arguments:
import argparse
import asyncio
import os
import logging
from aiomisc.entrypoint import entrypoint, LogFormat
parser = argparse.ArgumentParser()
parser.add_argument(
"-L", "--log-level", help="Log level",
default=os.getenv('LOG_LEVEL', 'info'),
choices=(
'critical', 'fatal', 'error', 'warning',
'warn', 'info', 'debug', 'notset'
),
)
parser.add_argument(
"--log-format", help="Log format",
default=os.getenv('LOG_FORMAT', 'color'),
choices=LogFormat.choices(),
metavar='LOG_FORMAT',
)
parser.add_argument(
"-D", "--debug", action='store_true',
help="Run loop and application in debug mode"
)
parser.add_argument(
"--pool-size", help="Thread pool size",
default=os.getenv('THREAD_POOL'), type=int,
)
log = logging.getLogger(__name__)
async def main():
log.info('Starting')
await asyncio.sleep(3)
log.info('Exiting')
if __name__ == '__main__':
arg = parser.parse_args()
with entrypoint(log_level=arg.log_level,
log_format=arg.log_format) as loop:
loop.run_until_complete(main())
Install event loop on program start:
import asyncio
from aiomisc.utils import new_event_loop
# Installing uvloop event loop
# and set `aiomisc.thread_pool.ThreadPoolExecutor`
# as default executor
new_event_loop()
async def main():
await asyncio.sleep(3)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Close current event loop and install a new one:
import asyncio
from aiomisc.utils import new_event_loop
async def main():
await asyncio.sleep(3)
if __name__ == '__main__':
loop = new_event_loop()
loop.run_until_complete(main())
Overview:
entrypoint
In generic case the entrypoint helper creates event loop and cancels already running coroutines on exit.
import asyncio
from aiomisc.entrypoint import entrypoint
async def main():
await asyncio.sleep(1)
with entrypoint() as loop:
loop.run_until_complete(main())
Services
Services are abstractions to help organize lots of different tasks in one process. Each service must implement start() method and can implement stop() method.
Service instance should be passed to the entrypoint, and will be started after event loop has been created.
Method start() creates as a separate task that can run forever. Anyway, during graceful shutdown method stop() will be called first, and after that all running tasks will be cancelled (including start()).
This package contains some useful base classes for simple services writing.
TCPServer
TCPServer - it’s a base class for writing TCP servers. Just implement handle_client(reader, writer) to use it.
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while True:
writer.write(await reader.readline())
with entrypoint(EchoServer(address='::1', port=8901)) as loop:
loop.run_forever()
UDPServer
UDPServer - it’s a base class for writing UDP servers. Just implement handle_datagram(data, addr) to use it.
class UDPPrinter(UDPServer):
async def handle_datagram(self, data: bytes, addr):
print(addr, '->', data)
with entrypoint(UDPPrinter(address='::1', port=3000)) as loop:
loop.run_forever()
TLSServer
TLSServer - it’s a base class for writing TCP servers with TLS. Just implement handle_client(reader, writer) to use it.
class SecureEchoServer(TLSServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while True:
writer.write(await reader.readline())
service = SecureEchoServer(
address='::1',
port=8900,
ca='ca.pem',
cert='cert.pem',
key='key.pem',
verify=False,
)
with entrypoint(service) as loop:
loop.run_forever()
Multiple services
Pass several service instances to the entrypoint to run all of them. After exiting the entrypoint service instances will be gracefully shut down.
import asyncio
from aiomisc.entrypoint import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer
class LoggingService(Service):
async def start(self):
while True:
print('Hello from service', self.name)
await asyncio.sleep(1)
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while True:
writer.write(await reader.readline())
class UDPPrinter(UDPServer):
async def handle_datagram(self, data: bytes, addr):
print(addr, '->', data)
services = (
LoggingService(name='#1'),
EchoServer(address='::1', port=8901),
UDPPrinter(address='::1', port=3000),
)
with entrypoint(*services) as loop:
loop.run_forever()
Configuration
Service metaclass accepts all kwargs and will set it to self as attributes.
import asyncio
from aiomisc.entrypoint import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer
class LoggingService(Service):
# required kwargs
__required__ = frozenset({'name'})
# default value
delay: int = 1
async def start(self):
while True:
# attribute ``name`` from kwargs
# must be defined when instance initializes
print('Hello from service', self.name)
# attribute ``delay`` from kwargs
await asyncio.sleep(self.delay)
services = (
LoggingService(name='#1'),
LoggingService(name='#2', delay=3),
)
with entrypoint(*services) as loop:
loop.run_forever()
Context
Services can require each others data. In this case you should use Context.
Context is a repository associated with the running entrypoint.
Context-object will be created when entrypoint starts and linked to the running event loop.
Cross dependent services might await or set each others data via the context.
For service instances self.context is available since entrypoint started. In other cases get_context() function returns current context.
import asyncio
from random import random, randint
from aiomisc.entrypoint import entrypoint, get_context
from aiomisc.service import Service
class LoggingService(Service):
async def start(self):
context = get_context()
wait_time = await context['wait_time']
print('Wait time is', wait_time)
while True:
print('Hello from service', self.name)
await asyncio.sleep(wait_time)
class RemoteConfiguration(Service):
async def start(self):
# querying from remote server
await asyncio.sleep(random())
self.context['wait_time'] = randint(1, 5)
services = (
LoggingService(name='#1'),
LoggingService(name='#2'),
LoggingService(name='#3'),
RemoteConfiguration()
)
with entrypoint(*services) as loop:
loop.run_forever()
aiohttp service
aiohttp application can be started as a service:
import aiohttp.web
import argparse
from aiomisc.entrypoint import entrypoint
from aiomisc.service.aiohttp import AIOHTTPService
parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')
group.add_argument("-l", "--address", default="::",
help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
help="Listen HTTP port")
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return aiohttp.web.Response(text=text)
class REST(AIOHTTPService):
async def create_application(self):
app = aiohttp.web.Application()
app.add_routes([
aiohttp.web.get('/', handle),
aiohttp.web.get('/{name}', handle)
])
return app
arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)
with entrypoint(service) as loop:
loop.run_forever()
Class AIOHTTPSSLService is similar to AIOHTTPService but creates HTTPS server. You must pass SSL-required options (see TLSServer class).
Memory Tracer
Simple and useful service for logging large python objects allocated in memory.
import asyncio
import os
from aiomisc.entrypoint import entrypoint
from aiomisc.service import MemoryTracer
async def main():
leaking = []
while True:
leaking.append(os.urandom(128))
await asyncio.sleep(0)
with entrypoint(MemoryTracer(interval=1, top_results=5)) as loop:
loop.run_until_complete(main())
Output example:
[T:[1] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
Objects | Obj.Diff | Memory | Mem.Diff | Traceback
12 | 12 | 1.9KiB | 1.9KiB | aiomisc/periodic.py:40
12 | 12 | 1.8KiB | 1.8KiB | aiomisc/entrypoint.py:93
6 | 6 | 1.1KiB | 1.1KiB | aiomisc/thread_pool.py:71
2 | 2 | 976.0B | 976.0B | aiomisc/thread_pool.py:44
5 | 5 | 712.0B | 712.0B | aiomisc/thread_pool.py:52
[T:[6] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
Objects | Obj.Diff | Memory | Mem.Diff | Traceback
43999 | 43999 | 7.1MiB | 7.1MiB | scratches/scratch_8.py:11
47 | 47 | 4.7KiB | 4.7KiB | env/bin/../lib/python3.7/abc.py:143
33 | 33 | 2.8KiB | 2.8KiB | 3.7/lib/python3.7/tracemalloc.py:113
44 | 44 | 2.4KiB | 2.4KiB | 3.7/lib/python3.7/tracemalloc.py:185
14 | 14 | 2.4KiB | 2.4KiB | aiomisc/periodic.py:40
timeout decorator
Decorator that ensures the execution time limit for decorated function is met.
from aiomisc.timeout import timeout
@timeout(1)
async def bad_func():
await asyncio.sleep(2)
Async backoff
Abstraction:
attempt_timeout is maximum execution time for one execution attempt.
deadline is maximum execution time for all execution attempts.
pause is time gap between execution attempts.
Decorator that ensures that attempt_timeout and deadline time limits are met by decorated function.
In case of exception function will be called again with similar arguments after pause seconds.
from aiomisc.backoff import asyncbackoff
attempt_timeout = 0.1
deadline = 1
pause = 0.1
@asyncbackoff(attempt_timeout, deadline, pause)
async def db_fetch():
...
@asyncbackoff(0.1, 1, 0.1)
async def db_save(data: dict):
...
asynchronous file operations
Asynchronous files operations. Based on thread-pool under the hood.
from aiomisc.io import async_open
async def db_fetch():
async with async_open('/tmp/test.txt', 'w+') as afp:
await afp.write("Hello")
await afp.write(" ")
await afp.write("world")
await afp.seek(0)
print(await afp.read())
Threaded decorator
Wraps blocking function and runs it in the current thread pool.
import asyncio
import time
from aiomisc.utils import new_event_loop
from aiomisc.thread_pool import threaded
@threaded
def blocking_function():
time.sleep(1)
async def main():
# Running in parallel
await asyncio.gather(
blocking_function(),
blocking_function(),
)
if __name__ == '__main__':
loop = new_event_loop()
loop.run_until_complete(main())
Fast ThreadPoolExecutor
This is a simple thread pool implementation.
Setting as a default thread pool:
import asyncio
from aiomisc.thread_pool import ThreadPoolExecutor
loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor(4, loop=loop)
loop.set_default_executor(thread_pool)
Bind socket
Bind socket and set setblocking(False) for just created socket. This detects address format and select socket family automatically.
from aiomisc.utils import bind_socket
# IPv4 socket
sock = bind_socket(address="127.0.0.1", port=1234)
# IPv6 socket (on Linux IPv4 socket will be bind too)
sock = bind_socket(address="::1", port=1234)
Periodic callback
Runs coroutine function periodically.
import asyncio
import time
from aiomisc.utils import new_event_loop
from aiomisc.periodic import PeriodicCallback
async def periodic_function():
print("Hello")
if __name__ == '__main__':
loop = new_event_loop()
periodic = PeriodicCallback(periodic_function)
# Call it each second
periodic.start(1)
loop.run_forever()
Logging configuration
Color
Setting up colorized logs:
import logging
from aiomisc.log import basic_config
# Configure logging
basic_config(level=logging.INFO, buffered=False, log_format='color')
JSON
Setting up json logs:
import logging
from aiomisc.log import basic_config
# Configure logging
basic_config(level=logging.INFO, buffered=False, log_format='json')
Buffered log handler
Parameter buffered=True enables memory buffer that flushes logs in a thread.
import logging
from aiomisc.log import basic_config
from aiomisc.periodic import PeriodicCallback
from aiomisc.utils import new_event_loop
# Configure logging globally
basic_config(level=logging.INFO, buffered=False, log_format='json')
async def write_log(loop):
logging.info("Hello %f", loop.time())
if __name__ == '__main__':
loop = new_event_loop()
# Configure
basic_config(
level=logging.INFO,
buffered=True,
log_format='color',
flush_interval=2
)
periodic = PeriodicCallback(write_log, loop)
periodic.start(0.3)
loop.run_forever()
Versioning
This software follows Semantic Versioning
How to develop?
Should be installed:
virtualenv
GNU Make as make
Python 3.5+ as python3
For setting up developer environment just type
make develop
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file aiomisc-2.1.7.tar.gz
.
File metadata
- Download URL: aiomisc-2.1.7.tar.gz
- Upload date:
- Size: 18.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.12.1 pkginfo/1.5.0.1 requests/2.21.0 setuptools/39.0.1 requests-toolbelt/0.8.0 tqdm/4.29.1 CPython/3.6.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | e1acbd92184ff7dc0ccb08ad165845c62bbb6a8a9e2114b5e17ecd80a354530c |
|
MD5 | 4d8c8bb55641535884058fff3996380d |
|
BLAKE2b-256 | 95424f73075b0cd0ba77bab75178eb0502152c12a389614d7536c00b950838ce |
File details
Details for the file aiomisc-2.1.7-py3-none-any.whl
.
File metadata
- Download URL: aiomisc-2.1.7-py3-none-any.whl
- Upload date:
- Size: 29.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.12.1 pkginfo/1.5.0.1 requests/2.21.0 setuptools/39.0.1 requests-toolbelt/0.8.0 tqdm/4.29.1 CPython/3.6.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c9d1712ecf148d3a00b1da1358bcbcbae370707ddb50d128d2a76f5c91efefef |
|
MD5 | fcd4491e9f841e36c098a22c09c1159d |
|
BLAKE2b-256 | 55e4f2d940f9fdc0203a241594789c6ba9d37179db4baa9560ca1307a08c0cc8 |