aiopipe -- Multiprocess communication pipes for asyncio

This package wraps the os.pipe simplex communication pipe so it can be used as part of the non-blocking asyncio event loop. A duplex pipe is also provided, which allows reading and writing on both ends.
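To make the idea of wrapping os.pipe concrete, here is a minimal sketch that uses only the standard library. It attaches the read end of an os.pipe to the running event loop with loop.connect_read_pipe and reads one line without blocking. It is an illustration of the general technique on a Unix-like system, not a description of aiopipe's internals; the function name read_line_from_pipe is made up for this example.

```python
import asyncio
import os


async def read_line_from_pipe() -> bytes:
    """Read one line from an os.pipe through the asyncio event loop.

    Hypothetical helper for illustration; assumes a Unix-like platform
    where the event loop supports pipe file descriptors.
    """
    rfd, wfd = os.pipe()
    loop = asyncio.get_running_loop()

    # Attach the read end to the event loop as a StreamReader.
    reader = asyncio.StreamReader()
    transport, _ = await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader),
        os.fdopen(rfd, "rb", buffering=0),
    )

    # Write from the same process to keep the sketch self-contained.
    os.write(wfd, b"hello through os.pipe\n")
    os.close(wfd)

    line = await reader.readline()
    transport.close()  # also closes the read end of the pipe
    return line


if __name__ == "__main__":
    print(asyncio.run(read_line_from_pipe()))
```

In real use the write end would live in another process, which is the case the examples below cover.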

Simplex example

The following example opens a pipe with the write end in the child process and the read end in the parent process.

>>> from multiprocessing import Process
>>> import asyncio
>>>
>>> from aiopipe import aiopipe
>>>
>>> async def main():
...     rx, tx = aiopipe()
...
...     with tx.detach() as tx:
...         proc = Process(target=childproc, args=(tx,))
...         proc.start()
...
...     # The write end is now available in the child process
...     # and detached from the parent process.
...
...     async with rx.open() as rx:
...         msg = await rx.readline()
...
...     proc.join()
...     return msg
>>>
>>> def childproc(tx):
...     asyncio.run(childtask(tx))
>>>
>>> async def childtask(tx):
...     async with tx.open() as tx:
...         tx.write(b"hi from the child process\n")
>>>
>>> asyncio.run(main())
b'hi from the child process\n'
>>>
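The comment in the example notes that the write end is detached from the parent once the child has started. The sketch below shows, with plain os.pipe and os.fork on a Unix-like system, why that matters: the reader only sees end-of-file once every copy of the write end is closed, including the parent's. This is a standard-library illustration under those assumptions, not aiopipe code; read_all and parent_and_child are hypothetical names.

```python
import asyncio
import os


async def read_all(rfd: int) -> bytes:
    """Read from a pipe file descriptor until end-of-file."""
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    transport, _ = await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader),
        os.fdopen(rfd, "rb", buffering=0),
    )
    data = await reader.read()  # returns only when all write ends are closed
    transport.close()
    return data


def parent_and_child() -> bytes:
    """Fork a child that writes to the pipe; the parent reads (Unix only)."""
    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: keep only the write end, send data, exit at once.
        os.close(rfd)
        os.write(wfd, b"line 1\nline 2\n")
        os._exit(0)

    # Parent process: drop its copy of the write end. Without this,
    # reader.read() above would never see end-of-file.
    os.close(wfd)
    data = asyncio.run(read_all(rfd))
    os.waitpid(pid, 0)
    return data


if __name__ == "__main__":
    print(parent_and_child())
```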

Duplex example

The following example shows a parent and child process sharing a duplex pipe to exchange messages.

>>> from multiprocessing import Process
>>> import asyncio
>>>
>>> from aiopipe import aioduplex
>>>
>>> async def main():
...     mainpipe, chpipe = aioduplex()
...
...     with chpipe.detach() as chpipe:
...         proc = Process(target=childproc, args=(chpipe,))
...         proc.start()
...
...     # The second pipe is now available in the child process
...     # and detached from the parent process.
...
...     async with mainpipe.open() as (rx, tx):
...         req = await rx.read(5)
...         tx.write(req + b" world\n")
...         msg = await rx.readline()
...
...     proc.join()
...     return msg
>>>
>>> def childproc(pipe):
...     asyncio.run(childtask(pipe))
>>>
>>> async def childtask(pipe):
...     async with pipe.open() as (rx, tx):
...         tx.write(b"hello")
...         rep = await rx.readline()
...         tx.write(rep.upper())
>>>
>>> asyncio.run(main())
b'HELLO WORLD\n'
>>>
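Conceptually, a duplex channel can be built from two simplex pipes, one per direction. The sketch below assumes that model and uses only the standard library on a Unix-like system; it does not claim to mirror how aioduplex is implemented. Both endpoints run in one process to keep it short, and the helpers open_reader, open_writer and demo are hypothetical names. Note that asyncio.streams.FlowControlMixin is an undocumented asyncio helper, used here only so that a StreamWriter can be built on a pipe.

```python
import asyncio
import os


async def open_reader(fd: int):
    """Wrap the read end of a pipe; returns (StreamReader, transport)."""
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    transport, _ = await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader),
        os.fdopen(fd, "rb", buffering=0),
    )
    return reader, transport


async def open_writer(fd: int) -> asyncio.StreamWriter:
    """Wrap the write end of a pipe in a StreamWriter."""
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin,  # undocumented helper, see lead-in
        os.fdopen(fd, "wb", buffering=0),
    )
    return asyncio.StreamWriter(transport, protocol, None, loop)


async def demo() -> bytes:
    # Two simplex pipes: one carries A -> B, the other B -> A.
    a_to_b_r, a_to_b_w = os.pipe()
    b_to_a_r, b_to_a_w = os.pipe()

    # Endpoint A reads from B -> A and writes to A -> B; B is the mirror.
    a_rx, a_rx_transport = await open_reader(b_to_a_r)
    a_tx = await open_writer(a_to_b_w)
    b_rx, b_rx_transport = await open_reader(a_to_b_r)
    b_tx = await open_writer(b_to_a_w)

    a_tx.write(b"ping\n")            # A sends a request
    request = await b_rx.readline()  # B receives it
    b_tx.write(request.upper())      # B replies
    reply = await a_rx.readline()    # A receives the reply

    for closable in (a_tx, b_tx, a_rx_transport, b_rx_transport):
        closable.close()
    await asyncio.sleep(0)  # give the loop a turn to finish closing
    return reply


if __name__ == "__main__":
    print(asyncio.run(demo()))
```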

Installation

This package requires Python 3.7+ and can be installed with pip:

pip install aiopipe
