A simple async, Android-compatible process pool implementation (currently not thread safe), including a (mostly) `concurrent.futures.Executor` / `concurrent.futures.ProcessPoolExecutor` compliant `Executor`.

aio_process_pool

A simple async, Android-compatible process pool and a (mostly) concurrent.futures.Executor / ProcessPoolExecutor compliant Executor.

Not thread safe.

Installation

pip install aio_process_pool

Usage

import asyncio

from aio_process_pool import ProcessPool, Executor

def foo(x=7):
    return x

# sync
executor = Executor()
assert executor.map(foo, [1, 2, 3]) == [1, 2, 3]
executor.shutdown()

# async
async def example():
    pool = ProcessPool()
    executor = Executor()

    # run a function in the pool
    assert await pool.run(foo, 2) == 2
    # map using a process pool
    assert await pool.map(foo, [1, 2, 3]) == [1, 2, 3]

    # use the executor with run_in_executor
    # (get_running_loop is the idiomatic call inside a coroutine)
    loop = asyncio.get_running_loop()
    assert await loop.run_in_executor(executor, foo) == 7

    # use submit
    concurrent_future = executor.submit(foo, 3)
    assert await asyncio.wrap_future(concurrent_future) == 3

    # map again
    assert await executor.map_async(foo, [1, 2, 3]) == [1, 2, 3]

    pool.shutdown()
    await executor.shutdown_async()

asyncio.run(example())

Executor

The Executor is mostly concurrent.futures.Executor compliant and can therefore be used as a replacement for concurrent.futures.ProcessPoolExecutor.

It is possible to monkey patch this executor into an environment:

import concurrent.futures, aio_process_pool
concurrent.futures.ProcessPoolExecutor = aio_process_pool.Executor

This is handy if you have code that uses a ProcessPoolExecutor and want to run it on Android (with buildozer).
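If the patch should only apply to part of a program, it can be scoped. The following is a minimal sketch using only the standard library; `patched_process_pool_executor` is a hypothetical helper name and not part of aio_process_pool. Note that code which already ran `from concurrent.futures import ProcessPoolExecutor` before the patch keeps its own reference to the original class and is not affected.

```python
import concurrent.futures
from contextlib import contextmanager


@contextmanager
def patched_process_pool_executor(replacement):
    """Temporarily replace concurrent.futures.ProcessPoolExecutor.

    Hypothetical helper for illustration; pass aio_process_pool.Executor
    (or any other Executor class) as `replacement`.
    """
    original = concurrent.futures.ProcessPoolExecutor
    concurrent.futures.ProcessPoolExecutor = replacement
    try:
        yield replacement
    finally:
        # Always restore the original class, even if the body raised.
        concurrent.futures.ProcessPoolExecutor = original
```

Inside a `with patched_process_pool_executor(aio_process_pool.Executor):` block, any lookup of `concurrent.futures.ProcessPoolExecutor` resolves to the replacement; outside of it the standard class is back in place.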

shutdown behaviour / deadlock under certain conditions

Since this package is based on asyncio, I was not able to implement the specified shutdown behaviour under certain conditions.

If there are tasks pending and the wait parameter is True, shutdown is supposed to block until all pending tasks are done. Since the execution of those tasks depends on the event loop, blocking would produce a deadlock.

To handle this situation, this implementation raises a RuntimeError in that case instead of deadlocking.

If possible, use shutdown_async instead. shutdown_async should behave in a concurrent.futures.Executor compliant way.
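The difference between the two shutdown paths can be illustrated with a small stand-in. This is only a sketch of the idea described above, built on plain asyncio tasks; `SketchExecutor` and its internals are hypothetical and do not reflect how aio_process_pool is actually implemented.

```python
import asyncio


class SketchExecutor:
    """Hypothetical stand-in, NOT the aio_process_pool implementation."""

    def __init__(self):
        self._tasks = set()

    def submit(self, coro):
        # Must be called while an event loop is running.
        task = asyncio.ensure_future(coro)
        self._tasks.add(task)
        # Forget the task as soon as it has finished.
        task.add_done_callback(self._tasks.discard)
        return task

    def shutdown(self, wait=True):
        # Blocking here would stall the loop that has to run the pending
        # tasks, so refuse instead of deadlocking.
        if wait and self._tasks:
            raise RuntimeError("tasks pending; use shutdown_async() instead")

    async def shutdown_async(self, wait=True):
        # Awaiting hands control back to the loop, so the tasks can finish.
        if wait and self._tasks:
            await asyncio.gather(*self._tasks)


async def demo():
    exe = SketchExecutor()
    exe.submit(asyncio.sleep(0.01, result="done"))
    try:
        exe.shutdown()  # a task is still pending
        refused = False
    except RuntimeError:
        refused = True
    await exe.shutdown_async()  # waits for the pending task
    return refused, len(exe._tasks)
```

In this sketch the synchronous call is refused while work is pending, whereas the awaited variant lets the loop finish the work first.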

map from within the loop

Since map is, according to concurrent.futures.Executor, supposed to be a sync function, it uses asyncio.new_event_loop to get a new event loop and runs map_async in that loop. This is only possible if we're not already inside a running loop. In that respect, map is not fully concurrent.futures.Executor compliant.

If possible use map_async instead.

If anyone knows how to improve this, suggestions are welcome.
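The approach described above can be sketched with the standard library alone. The names `map_sync` and `map_async` below are stand-ins chosen for this example, and the stand-in `map_async` runs work in the loop's default thread pool rather than in worker processes; the real implementation may differ.

```python
import asyncio


async def map_async(fn, iterable):
    # Stand-in for the async map: uses the loop's default (thread) executor
    # purely for illustration, not a process pool.
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, fn, item) for item in iterable]
    return await asyncio.gather(*futures)


def map_sync(fn, iterable):
    # A new loop cannot be run from a thread that already runs one,
    # so detect that case up front and fail with a clear message.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # no running loop in this thread: safe to start our own
    else:
        raise RuntimeError("called from a running loop; await map_async()")

    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(map_async(fn, iterable))
    finally:
        # Clean up the helper threads before closing the temporary loop.
        loop.run_until_complete(loop.shutdown_default_executor())
        loop.close()
```

Called from ordinary synchronous code, `map_sync` behaves like a blocking map; called from inside a coroutine, it raises, which is why the async variant is the better choice there.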

Demo

import asyncio
from aio_process_pool import Executor

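def fib(n):
    # naive recursive Fibonacci, deliberately CPU-bound
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

def fib_wrapper(n):
    # print before and after so progress is visible while the workers run
    print(f"fib({n}) = .....")
    result = fib(n)
    print(f"fib({n}) = {result}")

async def watch_htop_and_output_while_execution():
    exe = Executor()
    await exe.map_async(fib_wrapper, range(45))
    # inside a coroutine, prefer the async shutdown
    await exe.shutdown_async()

asyncio.run(watch_htop_and_output_while_execution())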

License

aio_process_pool is distributed under the terms of the MIT license.
