Skip to main content

Builder for performance-efficient prediction.

Project description

Framework for performance-efficient prediction.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service

  • All optimisations in one library

  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""

    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here

class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)


class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())

Batching

Aqueduct supports the ability to process tasks with batches. To use batching you simply should specify batch_size parameter that indicates, how many tasks could be in a batch. You can determine correct batch_size for your handler by manually measuring handler performance for different batch sizes.

import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
        def __init__(self, array: np.array, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self.array = array
                self.result = None


class CatDetector:
        """GPU model emulator that predicts the presence of the cat in the image."""
        IMAGE_PROCESS_TIME = 0.01
        BATCH_REDUCTION_FACTOR = 0.7
        OVERHEAD_TIME = 0.02
        BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

        def predict(self, images: np.array) -> np.array:
                """Always says that there is a cat in the image.

                The image is represented by a one-dimensional array.
                The model spends less time for processing batch of images due to GPU optimizations. It's emulated
                with BATCH_REDUCTION_FACTOR coefficient.
                """
                batch_size = images.shape[0]
                if batch_size == 1:
                        time.sleep(self.IMAGE_PROCESS_TIME)
                else:
                        time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
                return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
        def handle(self, *tasks: ArrayFieldTask):
                images = np.array([task.array for task in tasks])
                predicts = CatDetector().predict(images)
                for task, predict in zip(tasks, predicts):
                        task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
        return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
        await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
        process_tasks(flow_with_batch_handler, tasks_batch),
        timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
        FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
        process_tasks(flow_with_batch_handler, tasks_batch),
        timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Aqueduct (by default) does not guaranty that handler would always be getting exact batch size. It may be less than batch_size, but newer greater. That is because we are not waiting for batch to be fully collected. This allows as to avoid overhead for low load scenarios, and on the other hand, if input requests would be frequent enough, real batch would always be equal batch_size. If you find that your handler performs better with specific, exact batch size, you can use additional batch_timeout parameter to specify time to wait for full batch to be collected.

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with __Sentry__, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
        dsn = os.getenv('SENTRY_DSN')
        sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
        log.addHandler(sentry_handler)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aqueduct-1.10.1.tar.gz (38.7 kB view details)

Uploaded Source

File details

Details for the file aqueduct-1.10.1.tar.gz.

File metadata

  • Download URL: aqueduct-1.10.1.tar.gz
  • Upload date:
  • Size: 38.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.2 CPython/3.9.6

File hashes

Hashes for aqueduct-1.10.1.tar.gz
Algorithm Hash digest
SHA256 21148e7a2c7c470f8875f5929876222d60be2dd299bbccd2d150bb268e0ca931
MD5 861214a34c0d5524ef7f0575d2740be9
BLAKE2b-256 75b9c8d2880a13808db3f225c83efdfa04bf6bf4223047ef61a2b70046097948

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page