Xqtor - a more flexible concurrent.futures.ThreadPoolExecutor
The original ThreadPoolExecutor schedules tasks based on:
- An integer number of workers.
- Each submitted task uses exactly one worker.
Xqtor schedules tasks based on general resource units:
- Define the total quantity of resources available.
- Each submitted task specifies the quantity of resources it requires.
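The difference between the two models comes down to the check made before a task starts. The sketch below is illustrative only: both function names are hypothetical and are not part of xqtor or concurrent.futures.

```python
def can_start_by_workers(busy_workers, max_workers):
    """Worker-count model: a task may start whenever a worker is idle."""
    return busy_workers < max_workers


def can_start_by_resources(available, in_use, required):
    """Resource model: a task may start only if its requirement fits in what is left."""
    return available - in_use >= required


# Worker-count model: with both workers busy, the next task has to wait.
print(can_start_by_workers(busy_workers=2, max_workers=2))  # False

# Resource model: 4.0 of 5.5 units are in use, so 0.5 fits but 2.0 does not.
print(can_start_by_resources(5.5, 4.0, 0.5))  # True
print(can_start_by_resources(5.5, 4.0, 2.0))  # False
```

The resource check only uses - and >=, which is why any value type supporting those operators can stand in for the plain numbers used here.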
Quick start
Install
pip install xqtor
TL;DR
- Set available to indicate the total available resources when constructing Xqtor.
- submit tasks as a tuple pairing the task with its resource quantity, i.e. (task, resource_quantity) as in the examples below.
- The rest works the same as a regular ThreadPoolExecutor.
- Constructor parameters (except available) are passed to an underlying ThreadPoolExecutor, i.e. you can still set max_workers or initializer if needed.
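Because the constructor parameters are forwarded, max_workers and initializer keep their standard-library meaning. As a reminder of what they do, the sketch below uses concurrent.futures.ThreadPoolExecutor directly rather than Xqtor; the helper names init_worker and describe are made up for the illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Per-thread storage that the initializer fills in once for each worker thread.
worker_state = threading.local()


def init_worker(prefix):
    # Runs once in every worker thread before it picks up its first task.
    worker_state.label = f"{prefix}-{threading.current_thread().name}"


def describe(n):
    # Each task can read the state prepared by the initializer.
    return n, worker_state.label


# max_workers caps the number of threads; initializer/initargs set up each one.
with ThreadPoolExecutor(max_workers=2, initializer=init_worker, initargs=("pool",)) as pool:
    results = list(pool.map(describe, range(4)))

for n, label in results:
    print(n, label)
```

At most two distinct labels appear in the output, since the pool never creates more than max_workers threads.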
Example: using a float to represent CPU cores
from xqtor import Xqtor
import time

def task_4_cpu():
    print("running task_4_cpu")
    time.sleep(1)

def task_2_cpu():
    print("running task_2_cpu")
    time.sleep(1)

def task_half_cpu():
    print("running task_half_cpu")
    time.sleep(1)

executor = Xqtor(available=5.5)
futures = []
futures.append(executor.submit((task_4_cpu, 4.0)))
futures.append(executor.submit((task_2_cpu, 2.0)))
futures.append(executor.submit((task_half_cpu, 0.5)))
for future in futures:
    future.result()
Output:
# T=0
running task_4_cpu
running task_half_cpu
# T=1
running task_2_cpu
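The timeline above can be reproduced with a small simulation that does not use xqtor at all. It assumes a simple greedy policy (at each time step, start every waiting task that still fits, in submission order), which is consistent with the output shown but has not been checked against xqtor's actual implementation. The function simulate_start_times is hypothetical.

```python
def simulate_start_times(capacity, tasks, duration=1):
    """Return {task_name: start_time} under an assumed greedy policy.

    tasks is a list of (name, required) pairs in submission order.
    Every task is assumed to run for the same `duration`.
    """
    for name, required in tasks:
        if not capacity >= required:
            raise ValueError(f"{name} can never fit into the total capacity")

    waiting = list(tasks)
    running = []  # (finish_time, required) for tasks currently holding resources
    starts = {}
    now = 0
    while waiting:
        # Drop tasks that have finished by `now`, which frees their resources.
        running = [(end, req) for end, req in running if end > now]
        remaining = capacity - sum(req for _, req in running)
        still_waiting = []
        for name, required in waiting:
            if remaining >= required:
                starts[name] = now
                running.append((now + duration, required))
                remaining -= required
            else:
                still_waiting.append((name, required))
        waiting = still_waiting
        now += duration
    return starts


tasks = [("task_4_cpu", 4.0), ("task_2_cpu", 2.0), ("task_half_cpu", 0.5)]
print(simulate_start_times(5.5, tasks))
# {'task_4_cpu': 0, 'task_half_cpu': 0, 'task_2_cpu': 1}
```

At T=0 the 4.0 and 0.5 tasks together use 4.5 of the 5.5 units, leaving 1.0, which is not enough for the 2.0 task until the first two finish.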
What can I use to represent resources?
For simple cases, use a number (float or int). For example, each task can declare that it needs X CPU cores or Y GB of RAM.
For more complex cases, define a custom resource. Resource values should support +, - and >=:
Example: CPU + memory compound resource
from xqtor import Xqtor

class CPUAndMemory:
    def __init__(self, cpu, memory):
        self.cpu = cpu
        self.memory = memory

    def __add__(self, other):
        return CPUAndMemory(self.cpu + other.cpu, self.memory + other.memory)

    def __sub__(self, other):
        return CPUAndMemory(self.cpu - other.cpu, self.memory - other.memory)

    def __ge__(self, other):
        return self.cpu >= other.cpu and self.memory >= other.memory

executor = Xqtor(available=CPUAndMemory(16, 64))
executor.submit((lambda: print("running task"), CPUAndMemory(4, 16)))
executor.submit((lambda: print("running task"), CPUAndMemory(0, 16)))
executor.submit((lambda: print("running task"), CPUAndMemory(8, 0)))
executor.submit((lambda: print("running task"), CPUAndMemory(4, 32)))
# The 4 tasks above will run immediately
executor.submit((lambda: print("running task"), CPUAndMemory(4, 16)))
# This one will run when one of the first 4 tasks finishes
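The accounting behind those two comments can be checked by hand with nothing but the three operators. The sketch below is a standalone variation, not xqtor code: it rewrites CPUAndMemory as a frozen dataclass (an assumption made here only to get a readable repr) and replays the five requests against the 16 CPU / 64 memory budget.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CPUAndMemory:
    cpu: int
    memory: int

    def __add__(self, other):
        return CPUAndMemory(self.cpu + other.cpu, self.memory + other.memory)

    def __sub__(self, other):
        return CPUAndMemory(self.cpu - other.cpu, self.memory - other.memory)

    def __ge__(self, other):
        # Component-wise: both dimensions must be sufficient.
        return self.cpu >= other.cpu and self.memory >= other.memory


remaining = CPUAndMemory(16, 64)
requests = [
    CPUAndMemory(4, 16),
    CPUAndMemory(0, 16),
    CPUAndMemory(8, 0),
    CPUAndMemory(4, 32),
    CPUAndMemory(4, 16),
]

admitted = []
for request in requests:
    fits = remaining >= request
    admitted.append(fits)
    if fits:
        # Reserve the resources for the admitted request.
        remaining = remaining - request

print(admitted)   # [True, True, True, True, False]
print(remaining)  # CPUAndMemory(cpu=0, memory=0)

# Component-wise comparison means two values can be incomparable.
a, b = CPUAndMemory(8, 0), CPUAndMemory(0, 16)
print(a >= b, b >= a)  # False False
```

The first four requests add up to exactly (16, 64), so the fifth has to wait. The last line shows why a compound resource should only be relied on for "does this fit?" checks: neither value is >= the other, so such resources do not form a total order.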
Contributions
Contributions are welcome! Some possible areas (not exhaustive):
- Add support for ProcessPoolExecutor (currently only ThreadPoolExecutor is supported)
- Benchmarks (which may suggest perf improvement tasks)
- Test coverage
- Pluggable scheduling strategies
- General bugs
- General code quality improvements
License
MIT