Preemptive concurrency and parallelism for sporadic workloads
Poniol, CC BY-SA 3.0, via Wikimedia Commons
Asyncpal
Table of contents
- Overview
- Examples
- Embarrassingly parallel workloads
- Initializers, finalizers, and the BrokenPoolError exception
- The peculiar case of daemons
- Application programming interface
- Testing and contributing
- Installation
Overview
Asyncpal is a Python library designed for preemptive concurrency and parallelism. It achieves concurrency using the thread pool design pattern that it extends with processes to enable parallelism.
Designed for sporadic workloads
Although a thread pool is the right tool for the problems it solves, its creation and usage involve the allocation of resources that must be properly released. For this reason, it is recommended to use a thread pool with a context manager to ensure that resources are properly released.
However, this strategy can introduce overhead in programs that sporadically submit tasks to a thread pool, as multiple pools may be created and destroyed throughout the execution of these programs.
Maintaining one or a few thread pools for the entire duration of a program could be an effective solution, especially when the thread pool can automatically shrink after being idle for a short amount of time defined by the programmer.
Asyncpal offers the ability to set an idle timeout for workers, allowing the pool to which they belong to shrink when they are not in use.
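To make the idea of an idle timeout concrete, here is a minimal sketch built only on Python's standard library. It is not Asyncpal's implementation: the `worker` function and the 0.2 second timeout are assumptions chosen for illustration. The worker waits for tasks on a queue and simply ends when nothing arrives within the timeout.

```python
import queue
import threading

def worker(tasks, idle_timeout):
    """Run callables taken from `tasks`; exit after `idle_timeout` idle seconds."""
    while True:
        try:
            task = tasks.get(timeout=idle_timeout)
        except queue.Empty:
            return  # idle for too long: let the thread end
        task()

tasks = queue.Queue()
results = []
tasks.put(lambda: results.append(6 * 7))

# Hypothetical 0.2 s idle timeout, short enough to observe quickly
thread = threading.Thread(target=worker, args=(tasks, 0.2))
thread.start()
thread.join(timeout=5)  # returns once the worker has timed out
```

A real pool also has to respawn workers when new tasks arrive after a shrink, which this sketch leaves out.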
Learn how Asyncpal ensures a graceful shutdown of open pools when an uncaught exception occurs.
Supplied with advanced capabilities
Asyncpal pools provide methods to manage embarrassingly parallel workloads, allowing for lazy or eager execution and optional workload splitting into large chunks, with or without preserving their original order.
Some level of introspection is achievable directly from the pool interface, such as counting busy workers or pending tasks. Additionally, a Future class (never directly instantiated by the user) is provided, whose objects allow a task to be cancelled or its result to be collected. Furthermore, the pending time and running duration of a task can be obtained directly from a Future object.
Overall, the characteristics of Asyncpal make it suitable for both implicit use in the background through higher-level abstractions provided by frameworks or libraries, and for explicit use with or without a context manager.
Featuring a familiar interface
Asyncpal is inspired by the great preemptive concurrency and parallelism packages provided in Python and Java.
For instance, the chunk_size option for map operations is borrowed from Python's multiprocessing and concurrent.futures packages, while the fixed-size pools, such as the SingleThreadPool class, are inspired by Java's java.util.concurrent.Executors.newSingleThreadExecutor static method.
Examples
The following code snippets are adapted from examples provided by Python's concurrent.futures documentation page.
Thread pool example
The following code snippet is adapted from ThreadPoolExecutor example provided by Python's concurrent.futures documentation page.
import urllib.request
from asyncpal import ThreadPool, as_done

URLS = ["https://ubuntu.com/",
        "https://github.com/pyrustic/asyncpal/",
        "https://youtu.be/xLi83prR5fg",
        "https://news.ycombinator.com/",
        "https://nonexistant-subdomain.python.org/"]

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# Thread pool with a context manager (not mandatory, tho)
with ThreadPool(max_workers=5) as pool:
    # Start the load operations and mark each future with its URL
    future_to_url = {pool.submit(load_url, url, 60): url for url in URLS}
    for future in as_done(future_to_url):
        url = future_to_url[future]
        try:
            data = future.collect()  # collect the result or raise the exception
        except Exception as exc:
            print("%r generated an exception: %s" % (url, exc))
        else:
            print("%r page is %d bytes" % (url, len(data)))
The function as_done accepts a list of Future objects and also the ordered and timeout keyword arguments.
Process pool example
The following code snippet is adapted from ProcessPoolExecutor example provided by Python's concurrent.futures documentation page.
import math
from asyncpal import ProcessPool

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with ProcessPool() as pool:
        # The 'map' method is lazy and slower than 'map_all'.
        # For very long iterables, 'map_all' may cause high memory usage.
        for number, prime in zip(PRIMES, pool.map(is_prime, PRIMES)):
            print("%d is prime: %s" % (number, prime))

if __name__ == "__main__":
    main()
The method map also accepts these keyword arguments: chunk_size, buffer_size, ordered, and timeout.
Embarrassingly parallel workloads
Asyncpal pool classes provide four methods to perform Map operations. For cases where control is more important than convenience, there are public functions for manually splitting a task into subtasks to submit to the pool.
Pool class methods to perform Map operations
Pool class methods to perform Map operations are map, map_all, starmap, and starmap_all.
from itertools import starmap
from asyncpal import ThreadPool

def add(x, y):
    return x + y

with ThreadPool(4) as pool:
    # numbers go from 0 to 99
    numbers = range(100)

    # The 'map' method is lazy and slower than 'map_all'.
    # Keyword arguments: chunk_size, buffer_size, ordered, timeout
    iterator = pool.map(add, numbers, numbers, chunk_size=25)
    assert tuple(iterator) == tuple(map(add, numbers, numbers))

    # For very long iterables, 'map_all' may cause high memory usage.
    # Keyword arguments: chunk_size, ordered, timeout
    iterator = pool.map_all(add, numbers, numbers, chunk_size=25)
    assert tuple(iterator) == tuple(map(add, numbers, numbers))

    # The 'starmap' method is lazy and slower than 'starmap_all'.
    # Keyword arguments: chunk_size, buffer_size, ordered, timeout
    iterator = pool.starmap(add, zip(numbers, numbers), chunk_size=25)
    assert tuple(iterator) == tuple(starmap(add, zip(numbers, numbers)))

    # For very long iterables, 'starmap_all' may cause high memory usage.
    # Keyword arguments: chunk_size, ordered, timeout
    iterator = pool.starmap_all(add, zip(numbers, numbers), chunk_size=25)
    assert tuple(iterator) == tuple(starmap(add, zip(numbers, numbers)))
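The difference between a lazy and an eager map can be sketched with the standard library alone. The `lazy_map` helper below is hypothetical and is not how Asyncpal implements `map`. It assumes that "lazy" means keeping only a bounded number of tasks in flight, with `buffer_size` as that bound, which is one plausible reading of the keyword argument of the same name.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def lazy_map(executor, func, iterable, buffer_size=4):
    """Yield func(item) in input order with at most buffer_size tasks in flight."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    items = iter(iterable)
    # Nothing is submitted until the caller starts iterating
    window = deque(executor.submit(func, item)
                   for item in islice(items, buffer_size))
    while window:
        result = window.popleft().result()  # wait for the oldest task
        for item in islice(items, 1):       # top up the window by one task
            window.append(executor.submit(func, item))
        yield result

def square(n):
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    lazy = list(lazy_map(executor, square, range(10), buffer_size=3))
    # The standard 'map' submits every task up front (eager submission)
    eager = list(executor.map(square, range(10)))
```

The bounded window keeps memory usage flat for very long iterables, at the cost of less work being queued ahead of the consumer.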
Useful functions
The split_map_task and split_starmap_task functions let you manually split a task into subtasks. There are also the wait, collect, and as_done functions, which are intended to be applied to sequences of Future objects.
from asyncpal import (ThreadPool, split_map_task, split_starmap_task,
                      wait, collect, as_done)

def add(x, y):
    return x + y

with ThreadPool(4) as pool:
    # numbers go from 0 to 99
    numbers = range(100)

    # Manually split a 'map' task into 4 subtasks
    futures = list()
    for subtask in split_map_task(add, numbers, numbers, chunk_size=25):
        future = pool.submit(subtask)
        futures.append(future)

    # We could've used 'split_starmap_task'
    for subtask in split_starmap_task(add, zip(numbers, numbers)):
        pass

    # We can block the current thread, waiting for the results to be available
    wait(futures, timeout=42)  # 42 seconds !

    # Or we can just collect results (beware, an exception may be raised)
    result = list()
    for sub_result in collect(futures, timeout=42):
        result.extend(sub_result)
    assert tuple(result) == tuple(map(add, numbers, numbers))

    # We could've used 'as_done' to filter out futures as they are done.
    # Note that by default, the keyword argument 'ordered' is False !
    for future in as_done(futures, timeout=42):
        pass
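For readers curious about what splitting a map task into chunks may look like, here is a small standard-library sketch. The `split_into_subtasks` and `run_chunk` helpers are hypothetical names and do not reflect Asyncpal's actual code. Each subtask is a zero-argument callable that processes one chunk and returns a list of results.

```python
from functools import partial
from itertools import islice

def run_chunk(func, chunk):
    """Apply func to every argument tuple of one chunk."""
    return [func(*args) for args in chunk]

def split_into_subtasks(func, *iterables, chunk_size=25):
    """Hypothetical helper: yield one zero-argument callable per chunk."""
    rows = zip(*iterables)
    while True:
        chunk = list(islice(rows, chunk_size))
        if not chunk:
            return
        yield partial(run_chunk, func, chunk)

def add(x, y):
    return x + y

numbers = range(100)
subtasks = list(split_into_subtasks(add, numbers, numbers, chunk_size=25))
# Run the subtasks in the current thread and flatten their results
result = [value for subtask in subtasks for value in subtask()]
```

Building subtasks with `functools.partial` around a module-level function keeps them picklable, which matters if they are meant to be sent to worker processes.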
Initializers, finalizers, and the BrokenPoolError exception
At the creation of a pool, the programmer can provide an initializer and/or a finalizer. Consequently, each time the pool spawns a worker (whether it is a thread or a process), the initializer is executed at startup, and the finalizer is executed right before the worker shuts down.
Any exception raised during initialization, finalization, or in between will be caught by the pool, which will then enter a "broken" state. Once the pool is broken, it will shut down other workers, cancel pending tasks, and make them available via the cancelled_tasks property. It will also raise a BrokenPoolError exception whenever the programmer attempts to submit new tasks.
Asyncpal offers a way to reduce the risk of encountering a BrokenPoolError exception at an inconvenient time by testing the pool beforehand. All pool classes provide a test method that replicates the pool with its configuration, performs some computation on it, then closes it, letting any exception propagate to the top.
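The same "test it beforehand" idea can be tried with Python's own concurrent.futures, whose thread pool also becomes broken when an initializer fails. The `probe` helper below is hypothetical and only an analogy for the test method described above. It covers initializers only, since the standard executor has no finalizer.

```python
from concurrent.futures import BrokenExecutor, ThreadPoolExecutor

def probe(initializer=None, initargs=()):
    """Hypothetical helper: run a trivial task on a throwaway pool.

    A failing initializer surfaces here as BrokenExecutor instead of
    later, in the middle of real work.
    """
    with ThreadPoolExecutor(max_workers=1, initializer=initializer,
                            initargs=initargs) as pool:
        return pool.submit(sum, (1, 2)).result(timeout=10)

def good_initializer():
    pass

def bad_initializer():
    raise RuntimeError("cannot open resource")

healthy = probe(good_initializer)

try:
    probe(bad_initializer)
except BrokenExecutor:
    broken = True
else:
    broken = False
```

Note that concurrent.futures also logs the initializer's traceback to stderr when this happens.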
The peculiar case of daemons
In Python, a thread can be flagged as a daemon thread. The significance of this flag is that the entire Python program exits when only daemon threads are left.
Prior to Python 3.9, concurrent.futures used daemon threads as workers for its thread pool and relied on atexit hooks to gracefully shut down the pools that had not been explicitly closed. For compatibility with subinterpreters, which do not support daemon threads, it was decided to remove the daemon flag. However, simply removing the daemon flag would have been problematic.
The fix for this issue involved stopping the use of atexit hooks and instead relying on an internal threading atexit hook. Asyncpal does not use the daemon flag either. Instead of relying on some internal Python function that might disappear without warning, it implements its own workaround. This workaround involves a single thread for the entire program, started by asyncpal.pool.GlobalShutdown, whose job is to join the main thread and, once joined, run the shutdown handlers registered by the pools.
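The "join a thread, then run handlers" pattern can be sketched as follows. This is a simplified illustration and not the code behind asyncpal.pool.GlobalShutdown: the `run_when_finished` helper is a hypothetical name, and a short-lived stand-in thread replaces the main thread so that the sketch can run to completion.

```python
import threading

def run_when_finished(thread, handlers):
    """Start a non-daemon watcher that joins `thread`, then runs the handlers."""
    def watch():
        thread.join()
        for handler in handlers:
            handler()
    watcher = threading.Thread(target=watch, name="shutdown-watcher")
    watcher.start()
    return watcher

log = []

# Stand-in for the main thread; a real program would pass
# threading.main_thread() to run_when_finished instead.
stand_in = threading.Thread(target=lambda: log.append("work done"))
stand_in.start()

watcher = run_when_finished(stand_in, [lambda: log.append("pools closed")])
watcher.join(timeout=5)
```

Because the watcher is not a daemon thread, the interpreter waits for it, so the handlers get a chance to run after the watched thread ends.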
Feel free to open an issue to criticize this workaround or to suggest a better idea.
Application programming interface
This section describes the API and refers to the API reference for more details.
Asyncpal consists of three key components: the Pool, the Worker, and the Future. From the programmer's perspective, the pool represents the main interface of a system that spawns workers as needed and returns Future objects.
Preemptive concurrency is achieved with the ThreadPool class while parallelism is handled by the ProcessPool class. Under the hood, the thread pool spawns Python's threading.Thread as workers and the process pool spawns Python's multiprocessing.Process as workers.
ThreadPool class
Preemptive concurrency is achieved with the ThreadPool class. Under the hood, the thread pool spawns Python's threading.Thread as workers.
For convenience, the following four derived classes are provided:
- SingleThreadPool: spawns only 1 worker
- DualThreadPool: spawns up to 2 workers
- TripleThreadPool: spawns up to 3 workers
- QuadThreadPool: spawns up to 4 workers
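A fixed-size pool can be pictured as a thin subclass that pins the worker limit. The sketch below uses Python's concurrent.futures rather than Asyncpal, and the `SingleThreadExecutor` class is a hypothetical stand-in, so it says nothing about how the derived classes above are actually written.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class SingleThreadExecutor(ThreadPoolExecutor):
    """Hypothetical fixed-size pool: never more than one worker."""

    def __init__(self, **kwargs):
        super().__init__(max_workers=1, **kwargs)

with SingleThreadExecutor() as pool:
    # Every task reports the name of the thread that ran it
    names = set(pool.map(lambda _: threading.current_thread().name, range(5)))
```

With a single worker, tasks run one after another in submission order, which is handy when a resource must not be used concurrently.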
from asyncpal import ThreadPool
from asyncpal.errors import BrokenPoolError, InitializerError, FinalizerError

def add(x, y):
    return x + y

def initializer(*args, **kwargs):
    pass

def finalizer(*args, **kwargs):
    pass

# all these arguments are optional
pool = ThreadPool(max_workers=4, name="my-pool", idle_timeout=60,
                  initializer=initializer, init_args=(1, 2),
                  init_kwargs={"arg": 1}, finalizer=finalizer,
                  final_args=(3, 4), max_tasks_per_worker=None)

# submit a task
future = pool.submit(add, 10, 2)

# test the pool
try:
    pool.test()
# exception coming from the initializer
except InitializerError as e:
    e.__cause__  # the cause
# exception coming from the finalizer
except FinalizerError:
    pass
# exception coming from the initializer
# or the finalizer
except BrokenPoolError:
    pass

# calling this will raise RuntimeError if the pool is closed
# or BrokenPoolError (or its subclass)
pool.check()

# retrieve useful data
pool.count_workers()
pool.count_busy_workers()
pool.count_free_workers()
pool.count_pending_tasks()

# manually spawn workers
pool.spawn_workers(2)  # 2 extra workers

# join all workers
pool.join(timeout=42)

# gracefully shut down the pool
pool.shutdown()
assert pool.terminated

# list of cancelled tasks
pool.cancelled_tasks
Check out the API reference for asyncpal.ThreadPool.
ProcessPool class
Parallelism is achieved with the ProcessPool class. Under the hood, the process pool spawns Python's multiprocessing.Process as workers with the spawn context.
The ProcessPool class is similar to the ThreadPool class.
For convenience, the following four derived classes are provided:
- SingleProcessPool: spawns only 1 worker
- DualProcessPool: spawns up to 2 workers
- TripleProcessPool: spawns up to 3 workers
- QuadProcessPool: spawns up to 4 workers
Note that you must guard your process pool with if __name__ == '__main__' and also avoid writing multiprocessing code directly in the __main__ module of your projects.
Check out the API reference for asyncpal.ProcessPool.
Future class
A Future object is not meant to be instantiated by the programmer but rather returned by the submit method of pools.
from asyncpal import ThreadPool

def divide(x, y):
    return x // y

with ThreadPool(4) as pool:
    # submit a task
    future = pool.submit(divide, 10, 2)

    # add a callback that accepts the future as argument
    # and that will be called when the future is done
    future.add_callback(lambda f: None)

    # safely collect the result (by default, it blocks)
    try:
        # blocks (max 42s) until the Future is done
        result = future.collect(timeout=42)
    except ZeroDivisionError as e:
        pass
    else:
        assert result == 5

    # get duration (in seconds)
    pending_time, running_time = future.duration

    # cancel the future (it is a bit too late, but ok)
    future.cancel()

    # we could've waited for the Future to be done (it blocks)
    future.wait(timeout=42)  # 42s !

    # get the result (returns None if the Future isn't done)
    result = future.result

    # get the exception (returns None if the Future isn't done)
    exc = future.exception

    # some useful properties
    future.cancel_flag  # boolean set to True after cancel() is called
    future.cancelled    # boolean that confirms cancellation
    future.done         # is True when Completed, Cancelled, or Failed
    future.pending      # True while task is pending
    future.running      # True while task is running
    # etc...
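To give an intuition for the pending time and running time of a task, here is a rough sketch that measures both around a standard concurrent.futures executor. The `timed` wrapper is a hypothetical helper and not how Asyncpal computes `future.duration`: it assumes the pending time runs from the moment the task is wrapped for submission to the moment a worker starts it.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def timed(func):
    """Wrap func so it returns (result, pending_seconds, running_seconds)."""
    created = time.monotonic()  # taken when the task is about to be submitted

    def wrapper(*args, **kwargs):
        started = time.monotonic()  # taken when a worker picks the task up
        result = func(*args, **kwargs)
        return result, started - created, time.monotonic() - started

    return wrapper

def divide(x, y):
    return x // y

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(timed(divide), 10, 2)
    result, pending_time, running_time = future.result(timeout=10)
```

A long pending time with a short running time usually means the pool is saturated rather than the task being slow.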
Check out the API reference for asyncpal.Future.
Miscellaneous functions and classes
Check out the API reference for asyncpal.
Testing and contributing
Feel free to open an issue to report a bug, suggest some changes, show some useful code snippets, or discuss anything related to this project. You can also directly email me.
Set up your development environment
Follow these instructions to set up your development environment:
# create and activate a virtual environment
python -m venv venv
source venv/bin/activate
# clone the project then change into its directory
git clone https://github.com/pyrustic/asyncpal.git
cd asyncpal
# install the package locally (editable mode)
pip install -e .
# run tests
python -m tests
# deactivate the virtual environment
deactivate
Installation
Asyncpal is cross-platform. It is built on Ubuntu and should work on Python 3.8 or newer.
Create and activate a virtual environment
python -m venv venv
source venv/bin/activate
Install for the first time
pip install asyncpal
Upgrade the package
pip install asyncpal --upgrade --upgrade-strategy eager
Deactivate the virtual environment
deactivate
About the author
Hello world, I'm Alex, a tech enthusiast! Feel free to get in touch with me!