Async distributed process pool using asyncio

Introduction

Distex offers a distributed process pool to utilize multiple CPUs or machines. It uses asyncio to efficiently manage the worker processes.

Features:

  • Scales from 1 to thousands of processors;

  • Handles on the order of 50,000 small tasks per second;

  • Easy to use with SSH (secure shell) hosts;

  • Full async support;

  • Maps over unbounded iterables;

  • Compatible with concurrent.futures.ProcessPoolExecutor (PEP 3148).
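
Compatibility with the concurrent.futures interface suggests that code written against the standard Executor methods can accept a distex pool as well. A minimal sketch, assuming only Executor.map is needed; sum_of_squares is a hypothetical helper, and a standard-library ThreadPoolExecutor stands in for the pool so that the snippet runs without distex:

```python
from concurrent.futures import Executor, ThreadPoolExecutor


def square(x):
    return x * x


def sum_of_squares(executor: Executor, numbers):
    # Relies only on Executor.map, so any PEP 3148 style executor should fit.
    return sum(executor.map(square, numbers))


# Stand-in executor (assumption): swap in a distex Pool to distribute the work.
with ThreadPoolExecutor(max_workers=4) as executor:
    total = sum_of_squares(executor, range(10))
```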

Installation

pip3 install -U distex

When using remote hosts, distex must be installed on them as well. Make sure that the distex_proc script can be found in the path on each host.

For SSH hosts: authentication must be done with SSH keys, since passwords are not supported. The remote installation can be tested with:

ssh <host> distex_proc
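
The same checks can be scripted. A small sketch, assuming only the standard library; find_distex_proc and remote_check_command are hypothetical helper names, and the second one only builds the argument list for the remote test shown above without running it:

```python
import shutil


def find_distex_proc():
    # Full path of the distex_proc script, or None if it is not on the PATH.
    return shutil.which("distex_proc")


def remote_check_command(host):
    # Argument list for the remote test; pass it to subprocess.run to execute.
    return ["ssh", host, "distex_proc"]


local_path = find_distex_proc()
check_cmd = remote_check_command("maxi")
```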

Dependencies:

  • Python version 3.6 or higher;

  • On Unix the uvloop package is recommended: pip3 install uvloop

  • SSH client and server (optional).

Examples

A process pool can have local and remote workers. Here is a pool that uses 4 local workers:

from distex import Pool

def f(x):
    return x*x

pool = Pool(4)
for y in pool.map(f, range(100)):
    print(y)

To create a pool that also uses 8 workers on host maxi, using ssh:

pool = Pool(4, 'ssh://maxi/8')
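
Going by the sentence above, the host string reads as a URL with the scheme, the host name and the number of workers as its parts. The sketch below only illustrates that reading with the standard library; it is not how distex itself parses the string:

```python
from urllib.parse import urlsplit

parts = urlsplit("ssh://maxi/8")

scheme = parts.scheme                      # connection method
host = parts.hostname                      # remote host name
num_workers = int(parts.path.lstrip("/"))  # workers to start on that host
```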

To use a pool in combination with eventkit:

from distex import Pool
import eventkit as ev
import bz2

pool = Pool()
# await pool  # un-comment in Jupyter
data = [b'A' * 1000000] * 1000

pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

print(pipe.run())  # in Jupyter: print(await pipe)
pool.shutdown()

Asynchronous functions, asynchronous iterators and asynchronous context managers are all supported:

import asyncio
from distex import Pool

def init():
    # pool initializer: set the start time for every worker
    import time
    import builtins
    builtins.t0 = time.time()

async def timer(i=0):
    # async code running in the pool
    import time
    import asyncio
    await asyncio.sleep(1)
    return time.time() - t0

async def ait():
    # async iterator running on the user side
    for i in range(20):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async with Pool(4, initializer=init, qsize=1) as pool:
        async for t in pool.map_async(timer, ait()):
            print(t)
        print(await pool.run_on_all_async(timer))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
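
Mapping over an asynchronous iterator that may never end requires that only a bounded number of tasks is pending at any time. The following is a standard-library sketch of that idea, not distex's implementation; bounded_map and its limit parameter are hypothetical names, and asyncio.run needs Python 3.7 or higher:

```python
import asyncio
from collections import deque


async def numbers():
    # Async iterator on the user side; it could just as well be unbounded.
    for i in range(5):
        await asyncio.sleep(0)
        yield i


async def double(x):
    await asyncio.sleep(0)
    return 2 * x


async def bounded_map(func, aiterable, limit=2):
    # Keep at most `limit` tasks in flight and yield results in input order.
    pending = deque()
    async for item in aiterable:
        pending.append(asyncio.ensure_future(func(item)))
        if len(pending) >= limit:
            yield await pending.popleft()
    while pending:
        yield await pending.popleft()


async def collect():
    return [y async for y in bounded_map(double, numbers())]


results = asyncio.run(collect())
```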

High level architecture

Distex does not use remote ‘task servers’. Instead it works the other way around: a local server is started first; then the local and remote workers are started, and each of them connects back to the server on its own. When all workers have connected, the pool is ready for duty.
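
The connection order can be sketched with plain asyncio. This is an illustration under simplifying assumptions, not distex code: the workers are coroutines in the same process rather than separate processes, TCP on localhost is used instead of a Unix socket, and all names are hypothetical.

```python
import asyncio


async def demo(num_workers=3):
    connected = []
    all_connected = asyncio.Event()

    async def on_connect(reader, writer):
        # Server side: register each worker that connects back.
        name = (await reader.readline()).decode().strip()
        connected.append(name)
        if len(connected) == num_workers:
            all_connected.set()
        writer.close()

    async def worker(name, port):
        # Worker side: connect to the running server and announce itself.
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(f"{name}\n".encode())
        await writer.drain()
        writer.close()

    # 1. The local server is started first (port 0 picks a free port).
    server = await asyncio.start_server(on_connect, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    # 2. The workers are started and connect back on their own.
    await asyncio.gather(
        *(worker(f"worker-{i}", port) for i in range(num_workers)))

    # 3. The pool is ready once every worker has connected.
    await asyncio.wait_for(all_connected.wait(), timeout=5)
    server.close()
    await server.wait_closed()
    return sorted(connected)


ready_workers = asyncio.run(demo())
```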

Each worker consists of a single-threaded process that is running an asyncio event loop. This loop is used both for communication and for running asynchronous tasks. Synchronous tasks are run in a blocking fashion.
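
The difference between the two kinds of task can be shown in a few lines. A minimal sketch, assuming a hypothetical run_task helper; it is not the worker code that distex ships:

```python
import asyncio
import inspect


async def run_task(func, *args):
    # A plain function runs to completion right here and blocks the loop;
    # a coroutine function returns an awaitable that the loop can interleave.
    result = func(*args)
    if inspect.isawaitable(result):
        result = await result
    return result


def square(x):
    return x * x


async def slow_square(x):
    await asyncio.sleep(0.01)
    return x * x


async def worker_demo():
    return [await run_task(square, 3), await run_task(slow_square, 4)]


task_results = asyncio.run(worker_demo())
```

Because a synchronous task blocks the loop of its worker, that worker cannot communicate or run other tasks until the task returns.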

When using ssh, a remote (or ‘reverse’) tunnel is created from a remote Unix socket to the local Unix socket that the local server is listening on. Multiple workers on a remote machine will use the same Unix socket and share the same ssh tunnel.
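
OpenSSH can forward a remote Unix socket to a local one with the -R remote_socket:local_socket option. The sketch below builds such a command line; the socket paths and the reverse_tunnel_command helper are hypothetical, and the exact invocation that distex uses is not given here:

```python
def reverse_tunnel_command(host, remote_socket, local_socket, remote_command):
    # -R remote_socket:local_socket forwards connections made to the remote
    # Unix socket back to the local Unix socket.
    return ["ssh", "-R", f"{remote_socket}:{local_socket}",
            host, remote_command]


# Hypothetical socket paths, for illustration only.
cmd = reverse_tunnel_command(
    "maxi", "/tmp/remote.sock", "/tmp/local.sock", "distex_proc")
```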

The plain ssh executable is used instead of much nicer solutions such as AsyncSSH. This is to keep the CPU usage of encrypting/decrypting outside of the event loop and offload it to the ssh process(es).

Documentation

Distex documentation

author:

Ewald de Wit <ewald.de.wit@gmail.com>
