A Python 3.3+ library that integrates the multiprocessing module with asyncio.

aioprocessing
=============
[![Build Status](https://travis-ci.org/dano/aioprocessing.svg?branch=master)](https://travis-ci.org/dano/aioprocessing)


`aioprocessing` provides asynchronous, [`asyncio`](https://docs.python.org/3/library/asyncio.html) compatible, coroutine
versions of many blocking instance methods on objects in the [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)
library. Here's an example demonstrating the `aioprocessing` versions of
`Event`, `Queue`, and `Lock`:

```python
import time
import asyncio
import aioprocessing
import multiprocessing


def func(queue, event, lock, items):
    """ Demo worker function.

    This worker function runs in its own process, and uses
    normal blocking calls to aioprocessing objects, exactly
    the way you would use ordinary multiprocessing objects.

    """
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [
        asyncio.ensure_future(example(queue, event, lock)),
        asyncio.ensure_future(example2(queue, event, lock)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
```

The `async`/`await` syntax introduced in Python 3.5 is supported, too. This means
the `example2` function above could look like this:

```python
async def example2(queue, event, lock):
    await event.coro_wait()
    async with lock:
        await queue.coro_put(78)
        await queue.coro_put(None) # Shut down the worker
```

The aioprocessing objects can be used just like their multiprocessing
equivalents - as they are in `func` above - but they can also be
seamlessly used inside of `asyncio` coroutines, without ever blocking
the event loop.


How does it work?
-----------------

In most cases, this library makes blocking calls to `multiprocessing` methods
asynchronous by executing the call in a [`ThreadPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor), using
[`loop.run_in_executor()`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.BaseEventLoop.run_in_executor).
It does *not* re-implement `multiprocessing` using asynchronous I/O. This means
there is extra overhead when you use `aioprocessing` objects instead of
`multiprocessing` objects, because each one generally introduces a
`ThreadPoolExecutor` containing at least one [`threading.Thread`](https://docs.python.org/3/library/threading.html#thread-objects). It also means
that all the normal risks of mixing threads with `fork` apply here, too
(see http://bugs.python.org/issue6721 for more info).
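
To make the executor approach concrete, here is a minimal sketch of the general technique. It is not `aioprocessing`'s actual implementation: the `SketchLock` class and its `shutdown()` method are hypothetical names invented for illustration, and the code assumes Python 3.7+ for `asyncio.run()` and `asyncio.get_running_loop()`.

```python
import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor


class SketchLock:
    """Hypothetical wrapper, for illustration only (not aioprocessing code)."""

    def __init__(self):
        self._lock = multiprocessing.Lock()
        # One worker thread dedicated to this object's blocking calls.
        self._executor = ThreadPoolExecutor(max_workers=1)

    async def coro_acquire(self):
        loop = asyncio.get_running_loop()
        # The blocking acquire() runs in the worker thread, so the event
        # loop stays free to run other tasks until the call returns.
        return await loop.run_in_executor(self._executor, self._lock.acquire)

    def release(self):
        # A multiprocessing.Lock may be released from any thread or process.
        self._lock.release()

    def shutdown(self):
        self._executor.shutdown()


async def demo():
    lock = SketchLock()
    acquired = await lock.coro_acquire()
    lock.release()
    lock.shutdown()
    return acquired


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The per-object executor in this sketch is where the extra overhead described above comes from: every wrapped object carries at least one thread of its own.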

The one exception to this is `aioprocessing.AioPool`, which makes use of the
existing `callback` and `error_callback` keyword arguments in the various
[`Pool.*_async`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async) methods to run them as `asyncio` coroutines. Note that
`multiprocessing.Pool` is actually using threads internally, so the thread/fork
mixing caveat still applies.
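
The callback-based approach can be sketched roughly as follows. This is an assumption about the general shape of such a bridge, not the library's own code: `coro_apply` is a hypothetical helper, and the demo uses `multiprocessing.pool.ThreadPool` (which shares the `apply_async` signature of `Pool`) so that it runs without spawning processes. It assumes Python 3.7+.

```python
import asyncio
from multiprocessing.pool import ThreadPool


def coro_apply(pool, func, args=()):
    """Hypothetical helper: return an asyncio future for pool.apply_async()."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def set_result(value):
        # Pool callbacks run in a pool-internal thread, so hand the result
        # back to the event loop thread before touching the future.
        loop.call_soon_threadsafe(future.set_result, value)

    def set_exception(exc):
        loop.call_soon_threadsafe(future.set_exception, exc)

    pool.apply_async(func, args, callback=set_result,
                     error_callback=set_exception)
    return future


async def demo():
    with ThreadPool(2) as pool:
        # Awaiting the future suspends this coroutine without blocking
        # the event loop while the pool does the work.
        return await coro_apply(pool, pow, (2, 10))


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

No extra executor is needed here, because the pool's own result-handling thread invokes the callbacks.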

Each `multiprocessing` class is replaced by an equivalent `aioprocessing` class,
distinguished by the `Aio` prefix. So, `Pool` becomes `AioPool`, etc. All methods
that could block on I/O also have a coroutine version that can be used with
`asyncio`. For example, `multiprocessing.Lock.acquire()` can be replaced with
`aioprocessing.AioLock.coro_acquire()`. You can pass an `asyncio` event loop
object to any `coro_*` method using the `loop` keyword argument. For example,
`lock.coro_acquire(loop=my_loop)`.

Note that you can also use the `aioprocessing` synchronization primitives as replacements
for their equivalent `threading` primitives, in single-process, multi-threaded programs
that use `asyncio`.


What parts of multiprocessing are supported?
--------------------------------------------

Most of them! Each of the following objects has an `aioprocessing` equivalent
that extends the `multiprocessing` version with a coroutine version of every
method that could do blocking I/O.

- `Pool`
- `Process`
- `Pipe`
- `Lock`
- `RLock`
- `Semaphore`
- `BoundedSemaphore`
- `Event`
- `Condition`
- `Barrier`
- `connection.Connection`
- `connection.Listener`
- `connection.Client`
- `Queue`
- `JoinableQueue`
- `SimpleQueue`
- All `managers.SyncManager` `Proxy` versions of the items above (`SyncManager.Queue()`, `SyncManager.Lock()`, etc.).


What versions of Python are compatible?
---------------------------------------

`aioprocessing` will work out of the box on Python 3.4+.


