Skip to main content

aiomisc - miscellaneous utils for asyncio

Project description

aiomisc - miscellaneous utils for asyncio

Coveralls Travis CI Latest Version https://img.shields.io/pypi/wheel/aiomisc.svg https://img.shields.io/pypi/pyversions/aiomisc.svg https://img.shields.io/pypi/l/aiomisc.svg

Miscellaneous utils for asyncio.

Installation

Installing from pypi:

pip3 install aiomisc

Installing from github.com:

pip3 install git+https://github.com/mosquito/aiomisc.git

Quick Start

Async entrypoint with logging and useful arguments.

import argparse
import asyncio
import os
import logging

from aiomisc.entrypoint import entrypoint, LogFormat


parser = argparse.ArgumentParser()

parser.add_argument(
    "-L", "--log-level", help="Log level",
    default=os.getenv('LOG_LEVEL', 'info'),
    choices=(
        'critical', 'fatal', 'error', 'warning',
        'warn', 'info', 'debug', 'notset'
    ),
)

parser.add_argument(
    "--log-format", help="Log format",
    default=os.getenv('LOG_FORMAT', 'color'),
    choices=LogFormat.choices(),
    metavar='LOG_FORMAT',
)

parser.add_argument(
    "-D", "--debug", action='store_true',
    help="Run loop and application in debug mode"
)


parser.add_argument(
    "--pool-size", help="Thread pool size",
    default=os.getenv('THREAD_POOL'), type=int,
)


log = logging.getLogger(__name__)


async def main():
    log.info('Starting')
    await asyncio.sleep(3)
    log.info('Exiting')


if __name__ == '__main__':
    arg = parser.parse_args()

    with entrypoint(log_level=arg.log_level,
                    log_format=arg.log_format) as loop:
        loop.run_until_complete(main())

Install event loop on the program starts.

import asyncio
from aiomisc.utils import new_event_loop


# Installing uvloop event loop
# and set `aiomisc.thread_pool.ThreadPoolExecutor`
# as default executor
new_event_loop()


async def main():
    await asyncio.sleep(3)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Close current event loop and install the new one:

import asyncio
from aiomisc.utils import new_event_loop


async def main():
    await asyncio.sleep(3)


if __name__ == '__main__':
    loop = new_event_loop()
    loop.run_until_complete(main())

entrypoint

Running and graceful shutdown multiple services in one process.

import asyncio
from aiomisc.entrypoint import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer


class LoggingService(Service):
    async def start(self):
        while True:
            print('Hello from service', self.name)
            await asyncio.sleep(1)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while True:
            writer.write(await reader.readline())


class UDPPrinter(UDPServer):
    async def handle_datagram(self, data: bytes, addr):
        print(addr, '->', data)


services = (
    LoggingService(name='#1'),
    EchoServer(address='::1', port=8901),
    UDPPrinter(address='::1', port=3000),
)


with entrypoint(*services) as loop:
    loop.run_forever()

Service for aiohttp

Installed aiohttp required.

import aiohttp.web
from aiomisc.entrypoint import entrypoint
from aiomisc.service.aiohttp import AIOHTTPService


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return aiohttp.web.Response(text=text)


class REST(AIOHTTPService):
    async def create_application(self):
        app = aiohttp.web.Application()

        app.add_routes([
            aiohttp.web.get('/', handle),
            aiohttp.web.get('/{name}', handle)
        ])

        return app


service = REST(address='127.0.0.1', port=8080)


with entrypoint(service) as loop:
    loop.run_forever()

threaded decorator

Wraps blocking function and run it on the thread pool.

import asyncio
import time
from aiomisc.utils import new_event_loop
from aiomisc.thread_pool import threaded


@threaded
def blocking_function():
    time.sleep(1)


async def main():
    # Running in parallel
    await asyncio.gather(
        blocking_function(),
        blocking_function(),
    )


if __name__ == '__main__':
    loop = new_event_loop()
    loop.run_until_complete(main())

Fast ThreadPoolExecutor

This is the simple thread pool implementation.

Installation as a default thread pool:

import asyncio
from aiomisc.thread_pool import ThreadPoolExecutor

loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor(4, loop=loop)
loop.set_default_executor(thread_pool)

Bind socket

from aiomisc.utils import bind_socket

# IPv4 socket
sock = bind_socket(address="127.0.0.1", port=1234)

# IPv6 socket (on Linux IPv4 socket will be bind too)
sock = bind_socket(address="::1", port=1234)

Periodic callback

Runs coroutine function periodically

import asyncio
import time
from aiomisc.utils import new_event_loop
from aiomisc.periodic import PeriodicCallback


async def periodic_function():
    print("Hello")


if __name__ == '__main__':
    loop = new_event_loop()

    periodic = PeriodicCallback(periodic_function)

    # Call it each second
    periodic.start(1)

    loop.run_forever()

Logging configuration

Setting up colorized logs:

import logging
from aiomisc.log import basic_config


# Configure logging
basic_config(level=logging.INFO, buffered=False, log_format='color')

Setting up json logs:

import logging
from aiomisc.log import basic_config


# Configure logging
basic_config(level=logging.INFO, buffered=False, log_format='json')

Buffered log handler

Parameter buffered=True enables memory buffer which flushing logs in thread.

import logging
from aiomisc.log import basic_config
from aiomisc.periodic import PeriodicCallback
from aiomisc.utils import new_event_loop


# Configure logging globally
basic_config(level=logging.INFO, buffered=False, log_format='json')

async def write_log(loop):
    logging.info("Hello %f", loop.time())

if __name__ == '__main__':
    loop = new_event_loop()

    # Configure
    basic_config(
        level=logging.INFO,
        buffered=True,
        log_format='color',
        flush_interval=2
    )

    periodic = PeriodicCallback(write_log, loop)
    periodic.start(0.3)

    loop.run_forever()

Useful services

Memory Tracer

Simple and useful service for logging the largest python objects allocated in memory.

import asyncio
import os
from aiomisc.entrypoint import entrypoint
from aiomisc.service import MemoryTracer


async def main():
    leaking = []

    while True:
        leaking.append(os.urandom(128))
        await asyncio.sleep(0)


with entrypoint(MemoryTracer(interval=1, top_results=5)) as loop:
    loop.run_until_complete(main())

This example will be log something like this each second.

[T:[1] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
      12 |       12 |   1.9KiB |   1.9KiB | aiomisc/periodic.py:40
      12 |       12 |   1.8KiB |   1.8KiB | aiomisc/entrypoint.py:93
       6 |        6 |   1.1KiB |   1.1KiB | aiomisc/thread_pool.py:71
       2 |        2 |   976.0B |   976.0B | aiomisc/thread_pool.py:44
       5 |        5 |   712.0B |   712.0B | aiomisc/thread_pool.py:52

[T:[6] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
   43999 |    43999 |   7.1MiB |   7.1MiB | scratches/scratch_8.py:11
      47 |       47 |   4.7KiB |   4.7KiB | env/bin/../lib/python3.7/abc.py:143
      33 |       33 |   2.8KiB |   2.8KiB | 3.7/lib/python3.7/tracemalloc.py:113
      44 |       44 |   2.4KiB |   2.4KiB | 3.7/lib/python3.7/tracemalloc.py:185
      14 |       14 |   2.4KiB |   2.4KiB | aiomisc/periodic.py:40

Versioning

This software follows Semantic Versioning

How to develop?

Should be installed:

  • virtualenv

  • GNU Make as make

  • Python 3.5+ as python3

For setting up developer environment just type:

make develop

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiomisc-0.10.6.tar.gz (14.4 kB view details)

Uploaded Source

Built Distribution

aiomisc-0.10.6-py3-none-any.whl (20.4 kB view details)

Uploaded Python 3

File details

Details for the file aiomisc-0.10.6.tar.gz.

File metadata

  • Download URL: aiomisc-0.10.6.tar.gz
  • Upload date:
  • Size: 14.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.11.0 pkginfo/1.4.2 requests/2.18.4 setuptools/33.1.1 requests-toolbelt/0.8.0 tqdm/4.20.0 CPython/3.6.5

File hashes

Hashes for aiomisc-0.10.6.tar.gz
Algorithm Hash digest
SHA256 e1f0bc69a1e920f189cc5dfe7dca342f7794504d4357e84ac2301a750cad55ed
MD5 4c7a62545e94b9e983e5060225e0f617
BLAKE2b-256 bb28702b9a4c95904c9a5d30d08d812a8b601569dc2c8288cae82e39811d0bd8

See more details on using hashes here.

File details

Details for the file aiomisc-0.10.6-py3-none-any.whl.

File metadata

  • Download URL: aiomisc-0.10.6-py3-none-any.whl
  • Upload date:
  • Size: 20.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.11.0 pkginfo/1.4.2 requests/2.18.4 setuptools/33.1.1 requests-toolbelt/0.8.0 tqdm/4.20.0 CPython/3.6.5

File hashes

Hashes for aiomisc-0.10.6-py3-none-any.whl
Algorithm Hash digest
SHA256 90d85bad05bf5b2da332596ef9b9f14ccb9be5299f166fcddc5ba475833392a0
MD5 ca2cf760c5fa18bc25227e791b55a785
BLAKE2b-256 77694c5b0244f0317c5c28d5b81b968c37c770b790c30270fa5780b1998612c1

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page