A Python 3.5+ library that integrates the multiprocessing module with asyncio.

aioprocessing

aioprocessing provides asynchronous, asyncio-compatible coroutine versions of many blocking instance methods on objects in the multiprocessing library. To use dill for universal pickling, install with pip install aioprocessing[dill]. Here's an example demonstrating the aioprocessing versions of Event, Queue, and Lock:

import time
import asyncio
import aioprocessing


def func(queue, event, lock, items):
    """ Demo worker function.

    This worker function runs in its own process, and uses
    normal blocking calls to aioprocessing objects, exactly
    the way you would use ordinary multiprocessing objects.

    """
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()


async def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
    p.start()
    while True:
        result = await queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    await p.coro_join()

async def example2(queue, event, lock):
    await event.coro_wait()
    async with lock:
        await queue.coro_put(78)
        await queue.coro_put(None)  # Sentinel: ends the read loop in example()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [
        asyncio.ensure_future(example(queue, event, lock)), 
        asyncio.ensure_future(example2(queue, event, lock)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

The aioprocessing objects can be used just like their multiprocessing equivalents - as they are in func above - but they can also be seamlessly used inside of asyncio coroutines, without ever blocking the event loop.

What's new

v2.0.0

  • Add support for universal pickling using dill, installable with pip install aioprocessing[dill]. The library will now attempt to import multiprocess, falling back to stdlib multiprocessing. Force stdlib behaviour by setting a non-empty environment variable AIOPROCESSING_DILL_DISABLED=1. This can be used to avoid errors when attempting to combine aioprocessing[dill] with stdlib multiprocessing based objects like concurrent.futures.ProcessPoolExecutor.
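The fallback described above can be pictured with a small sketch. This is not the library's actual code, only an illustration of the described behaviour; the function name select_backend and its environ parameter are hypothetical, introduced here for illustration.

```python
import importlib
import os


def select_backend(environ=os.environ):
    """Return the multiprocessing-compatible module to build on.

    Hypothetical helper: prefers the dill-based `multiprocess` package,
    unless AIOPROCESSING_DILL_DISABLED is set to a non-empty value or
    `multiprocess` is not installed.
    """
    if not environ.get("AIOPROCESSING_DILL_DISABLED"):
        try:
            return importlib.import_module("multiprocess")
        except ImportError:
            pass  # dill extra not installed: fall through to stdlib
    return importlib.import_module("multiprocessing")


if __name__ == "__main__":
    print(select_backend().__name__)
```

Passing the environment as a parameter is only a convenience for trying both branches without changing the real process environment.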

How does it work?

In most cases, this library makes blocking calls to multiprocessing methods asynchronous by executing the call in a ThreadPoolExecutor, using the event loop's run_in_executor() method. It does not re-implement multiprocessing using asynchronous I/O. This means there is extra overhead added when you use aioprocessing objects instead of multiprocessing objects, because each one is generally introducing a ThreadPoolExecutor containing at least one threading.Thread. It also means that all the normal risks you get when you mix threads with fork apply here, too (see http://bugs.python.org/issue6721 for more info).
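As a rough illustration of that approach, the sketch below uses only the standard library and assumes Python 3.7+ (for asyncio.get_running_loop and asyncio.run). The helper name coro_get is chosen to mirror the library's naming, but this is not the library's implementation. It shows that the blocking Queue.get() occupies a worker thread while the event loop keeps running other work.

```python
import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor


def coro_get(queue, executor):
    """Run the blocking queue.get() in a worker thread; returns an awaitable."""
    loop = asyncio.get_running_loop()
    return loop.run_in_executor(executor, queue.get)


async def main():
    queue = multiprocessing.Queue()
    with ThreadPoolExecutor(max_workers=1) as executor:
        # The worker thread is now blocked in queue.get(); the loop is not.
        getter = coro_get(queue, executor)
        ticks = 0
        for _ in range(3):
            await asyncio.sleep(0.01)  # the event loop stays responsive
            ticks += 1
        queue.put("hello")  # unblocks the worker thread
        result = await getter
    return ticks, result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The dedicated single-thread executor mirrors the overhead mentioned above: each wrapped object carries at least one extra thread.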

The one exception to this is aioprocessing.AioPool, which makes use of the existing callback and error_callback keyword arguments in the various Pool.*_async methods to run them as asyncio coroutines. Note that multiprocessing.Pool is actually using threads internally, so the thread/fork mixing caveat still applies.
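A minimal sketch of the callback-based idea follows. The helper coro_apply here is hypothetical and is not AioPool's actual code. To keep the sketch runnable without spawning processes, it uses multiprocessing.pool.ThreadPool as a stand-in, which exposes the same apply_async(callback=..., error_callback=...) interface as multiprocessing.Pool.

```python
import asyncio
from multiprocessing.pool import ThreadPool  # stand-in with the Pool API


def coro_apply(pool, func, args=()):
    """Hypothetical helper: bridge apply_async callbacks to an asyncio future."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # The pool invokes these callbacks from one of its own threads, so the
    # future must be completed via call_soon_threadsafe.
    def on_result(value):
        loop.call_soon_threadsafe(future.set_result, value)

    def on_error(exc):
        loop.call_soon_threadsafe(future.set_exception, exc)

    pool.apply_async(func, args, callback=on_result, error_callback=on_error)
    return future


async def main():
    with ThreadPool(processes=2) as pool:
        value = await coro_apply(pool, pow, (2, 10))
        try:
            await coro_apply(pool, divmod, (1, 0))
        except ZeroDivisionError:
            failed = True
        else:
            failed = False
    return value, failed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

No extra executor thread is needed in this pattern, because the pool's own result-handling thread delivers the outcome to the event loop.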

Each multiprocessing class is replaced by an equivalent aioprocessing class, distinguished by the Aio prefix. So, Pool becomes AioPool, etc. All methods that could block on I/O also have a coroutine version that can be used with asyncio. For example, multiprocessing.Lock.acquire() can be replaced with aioprocessing.AioLock.coro_acquire(). You can pass an asyncio EventLoop object to any coro_* method using the loop keyword argument. For example, lock.coro_acquire(loop=my_loop).
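To make the naming convention concrete, here is a hypothetical wrapper, CoroProxy, that derives a coro_<name>() coroutine from any blocking <name>() method and accepts an optional loop keyword argument. It is only a sketch of the convention under stated assumptions (Python 3.7+, standard library only), not how aioprocessing is implemented. With the real library you would call aioprocessing.AioLock().coro_acquire() directly.

```python
import asyncio
import functools
import multiprocessing
from concurrent.futures import ThreadPoolExecutor


class CoroProxy:
    """Hypothetical wrapper: adds coro_<name>() for each blocking <name>()."""

    def __init__(self, target):
        self._target = target
        self._executor = ThreadPoolExecutor(max_workers=1)

    def __getattr__(self, name):
        # Only reached for attributes not defined on the proxy itself.
        if name.startswith("coro_"):
            blocking = getattr(self._target, name[len("coro_"):])

            async def coro(*args, loop=None, **kwargs):
                loop = loop or asyncio.get_running_loop()
                call = functools.partial(blocking, *args, **kwargs)
                return await loop.run_in_executor(self._executor, call)

            return coro
        # Everything else behaves like the wrapped multiprocessing object.
        return getattr(self._target, name)


async def main():
    lock = CoroProxy(multiprocessing.Lock())
    first = await lock.coro_acquire()  # coroutine version of acquire()
    lock.release()                     # plain blocking API still available
    second = await lock.coro_acquire(loop=asyncio.get_running_loop())
    lock.release()
    return first, second


if __name__ == "__main__":
    print(asyncio.run(main()))
```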

Note that you can also use the aioprocessing synchronization primitives as replacements for their equivalent threading primitives, in single-process, multi-threaded programs that use asyncio.

What parts of multiprocessing are supported?

Most of them! Each of the following objects has an aioprocessing equivalent that extends the multiprocessing version by adding coroutine versions of all methods that could do blocking I/O.

  • Pool
  • Process
  • Pipe
  • Lock
  • RLock
  • Semaphore
  • BoundedSemaphore
  • Event
  • Condition
  • Barrier
  • connection.Connection
  • connection.Listener
  • connection.Client
  • Queue
  • JoinableQueue
  • SimpleQueue
  • All managers.SyncManager Proxy versions of the items above (SyncManager.Queue(), SyncManager.Lock(), etc.).

What versions of Python are compatible?

aioprocessing will work out of the box on Python 3.5+.

Gotchas

Keep in mind that, while the API exposes coroutines for interacting with multiprocessing APIs, internally they are almost always delegated to a ThreadPoolExecutor. This means the caveats of using ThreadPoolExecutor with asyncio apply: namely, you won't be able to cancel any of the coroutines, because the work being done in the worker thread can't be interrupted.
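The caveat can be observed with the standard library alone. In this sketch (assuming Python 3.7+; blocking_work is a hypothetical stand-in for any blocking multiprocessing call), cancelling the awaiting side raises CancelledError in the coroutine, yet the function running in the worker thread still runs to completion.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


async def main():
    finished = threading.Event()

    def blocking_work():
        # Stand-in for a blocking call such as Queue.get() or Lock.acquire().
        time.sleep(0.2)
        finished.set()

    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = loop.run_in_executor(executor, blocking_work)
        await asyncio.sleep(0.05)  # let the worker thread start the call
        future.cancel()            # cancels only the asyncio-side future
        try:
            await future
        except asyncio.CancelledError:
            cancelled = True
        else:
            cancelled = False
        # Leaving the with-block waits for the worker thread to finish.
    return cancelled, finished.is_set()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both values come back True: the coroutine saw the cancellation, and the thread finished its work anyway.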
