A Python 3.5+ library that integrates the multiprocessing module with asyncio.
aioprocessing
aioprocessing provides asynchronous, asyncio-compatible coroutine versions of many blocking instance methods on objects in the multiprocessing library. Here's an example demonstrating the aioprocessing versions of Event, Queue, and Lock:
import time
import asyncio
import aioprocessing


def func(queue, event, lock, items):
    """ Demo worker function.

    This worker function runs in its own process, and uses
    normal blocking calls to aioprocessing objects, exactly
    the way you would use ordinary multiprocessing objects.

    """
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item + 5)
    queue.close()


async def example(queue, event, lock):
    l = [1, 2, 3, 4, 5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
    p.start()
    while True:
        result = await queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    await p.coro_join()


async def example2(queue, event, lock):
    await event.coro_wait()
    async with lock:
        await queue.coro_put(78)
        await queue.coro_put(None)  # Shut down the worker


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [
        asyncio.ensure_future(example(queue, event, lock)),
        asyncio.ensure_future(example2(queue, event, lock)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
The aioprocessing objects can be used just like their multiprocessing equivalents, as they are in func above, but they can also be used seamlessly inside asyncio coroutines, without ever blocking the event loop.
How does it work?
In most cases, this library makes blocking calls to multiprocessing methods asynchronous by executing the call in a ThreadPoolExecutor, using the asyncio event loop's run_in_executor() method. It does not re-implement multiprocessing using asynchronous I/O. This means there is extra overhead when you use aioprocessing objects instead of multiprocessing objects, because each one generally introduces a ThreadPoolExecutor containing at least one threading.Thread. It also means that all the normal risks of mixing threads with fork apply here, too (see http://bugs.python.org/issue6721 for more info).
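The executor approach described above can be sketched with the standard library alone. This is a minimal illustration, not aioprocessing's actual implementation: coro_call and demo are hypothetical names, and asyncio.run() and asyncio.get_running_loop() assume Python 3.7 or newer. A blocking multiprocessing.Event.wait() runs in a worker thread while a second coroutine keeps making progress on the event loop.

```python
import asyncio
import multiprocessing
import threading
from concurrent.futures import ThreadPoolExecutor


async def coro_call(executor, blocking_func, *args):
    """Run blocking_func(*args) in the executor and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_func, *args)


async def demo():
    event = multiprocessing.Event()
    executor = ThreadPoolExecutor(max_workers=1)
    ticks = []

    async def ticker():
        # Runs while the wait is pending, showing the loop is not blocked.
        for i in range(3):
            ticks.append(i)
            await asyncio.sleep(0.01)

    # Set the event from another thread after a short delay.
    threading.Timer(0.1, event.set).start()
    was_set, _ = await asyncio.gather(
        coro_call(executor, event.wait, 5),  # blocking wait, 5 s timeout
        ticker(),
    )
    executor.shutdown()
    return was_set, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The cost mentioned above is visible here: every wrapped object needs an executor, and each pending blocking call occupies one of its threads.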
The one exception to this is aioprocessing.AioPool, which makes use of the existing callback and error_callback keyword arguments in the various Pool.*_async methods to run them as asyncio coroutines. Note that multiprocessing.Pool is actually using threads internally, so the thread/fork mixing caveat still applies.
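One way the callback bridge could look is sketched below. This is an assumption-laden illustration rather than AioPool's real code: coro_apply is a hypothetical helper, and the demo uses multiprocessing.pool.ThreadPool, which shares the apply_async signature with multiprocessing.Pool, so that the sketch runs without spawning processes. Pool callbacks fire on an internal handler thread, so the result is handed back to the loop with call_soon_threadsafe().

```python
import asyncio
from multiprocessing.pool import ThreadPool


def coro_apply(pool, func, args=()):
    """Return an asyncio future resolved by apply_async's callbacks.

    Must be called while an event loop is running.
    """
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def set_result(value):
        # Called on the pool's handler thread; hop back to the loop thread.
        loop.call_soon_threadsafe(future.set_result, value)

    def set_exception(exc):
        loop.call_soon_threadsafe(future.set_exception, exc)

    pool.apply_async(func, args, callback=set_result,
                     error_callback=set_exception)
    return future


async def demo():
    with ThreadPool(2) as pool:
        ok = await coro_apply(pool, pow, (2, 10))
        try:
            await coro_apply(pool, divmod, (1, 0))
        except ZeroDivisionError:
            failed = True
        else:
            failed = False
    return ok, failed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

No extra executor is needed in this pattern, because the pool already provides the thread that delivers the result.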
Each multiprocessing class is replaced by an equivalent aioprocessing class, distinguished by the Aio prefix. So, Pool becomes AioPool, etc. All methods that could block on I/O also have a coroutine version that can be used with asyncio. For example, multiprocessing.Lock.acquire() can be replaced with aioprocessing.AioLock.coro_acquire(). You can pass an asyncio EventLoop object to any coro_* method using the loop keyword argument. For example, lock.coro_acquire(loop=my_loop).
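To make the naming convention concrete, here is a hypothetical stand-in class. SketchAioLock is not part of aioprocessing and only mimics the shape described above: the blocking acquire() and release() stay available, and a coro_acquire() coroutine accepts an optional loop keyword. It assumes Python 3.7 or newer for asyncio.run() and asyncio.get_running_loop().

```python
import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor


class SketchAioLock:
    """Hypothetical illustration of an Aio-prefixed class."""

    def __init__(self):
        self._lock = multiprocessing.Lock()
        self._executor = ThreadPoolExecutor(max_workers=1)

    def acquire(self, *args, **kwargs):
        # Blocking version, same as multiprocessing.Lock.acquire().
        return self._lock.acquire(*args, **kwargs)

    def release(self):
        self._lock.release()

    async def coro_acquire(self, *args, loop=None):
        # Coroutine version: the blocking call runs in a worker thread.
        loop = loop or asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor, self._lock.acquire, *args
        )


async def demo():
    lock = SketchAioLock()
    first = await lock.coro_acquire()
    # The lock is already held, so a non-blocking attempt fails.
    my_loop = asyncio.get_running_loop()
    second = await lock.coro_acquire(False, loop=my_loop)
    lock.release()
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```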
Note that you can also use the aioprocessing synchronization primitives as replacements for their equivalent threading primitives, in single-process, multi-threaded programs that use asyncio.
What parts of multiprocessing are supported?
Most of them! All methods that could do blocking I/O in the following objects have equivalent versions in aioprocessing that extend the multiprocessing versions by adding coroutine versions of all the blocking methods.
- Pool
- Process
- Pipe
- Lock
- RLock
- Semaphore
- BoundedSemaphore
- Event
- Condition
- Barrier
- connection.Connection
- connection.Listener
- connection.Client
- Queue
- JoinableQueue
- SimpleQueue
- All managers.SyncManager Proxy versions of the items above (SyncManager.Queue, SyncManager.Lock(), etc.).
What versions of Python are compatible?
aioprocessing will work out of the box on Python 3.5+.
File details
Details for the file aioprocessing-1.1.0.tar.gz.
File metadata
- Download URL: aioprocessing-1.1.0.tar.gz
- Upload date:
- Size: 23.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.6.1 requests/2.25.0 setuptools/51.0.0 requests-toolbelt/0.9.1 tqdm/4.54.1 CPython/3.8.3
File hashes
Algorithm | Hash digest
---|---
SHA256 | 4603c86ff3fea673d4c643ad3adc519988cd778771b75079bc3be9e5ed4c5b66
MD5 | b4bcffc4283aec295338bbd36ecfd8cc
BLAKE2b-256 | 695795688231f241f327122d53c8fc633a4adc10521b2891506247fa5a57f5bb
File details
Details for the file aioprocessing-1.1.0-py3-none-any.whl.
File metadata
- Download URL: aioprocessing-1.1.0-py3-none-any.whl
- Upload date:
- Size: 14.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.6.1 requests/2.25.0 setuptools/51.0.0 requests-toolbelt/0.9.1 tqdm/4.54.1 CPython/3.8.3
File hashes
Algorithm | Hash digest
---|---
SHA256 | 21a61c9c3d9089349e4847206028308ba2146e081f5ecd9195cca3e12ed45345
MD5 | eb08764520895beff49a9fc5e2ee948f
BLAKE2b-256 | d9d0f8282cd77b8eb88ed581c7b7bdbe19fe9d834f5661b92c8a8327a511fac7