Xqtor - a more flexible ThreadPoolExecutor

Xqtor - a more flexible concurrent.futures.ThreadPoolExecutor

The original ThreadPoolExecutor schedules tasks in terms of workers:

  • The pool has an integer number of workers.
  • Each submitted task uses exactly one worker.

Xqtor schedules tasks in terms of general resource units:

  • You define the total quantity of resource available.
  • Each submitted task specifies the quantity of resource it requires.
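
To make the idea concrete, here is a minimal sketch of resource-based admission built only on the standard library. It is not Xqtor's implementation: ResourceGate and run_with_resources are hypothetical names invented for this illustration, and the sketch simply blocks a worker thread until the requested quantity is free.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ResourceGate:
    """Hypothetical helper: tracks how much of a resource is currently free."""

    def __init__(self, available):
        self._free = available
        self._cond = threading.Condition()

    def acquire(self, amount):
        with self._cond:
            # Block until the free quantity covers the request.
            self._cond.wait_for(lambda: self._free >= amount)
            self._free = self._free - amount

    def release(self, amount):
        with self._cond:
            self._free = self._free + amount
            # Wake every waiter so each can re-check its own request.
            self._cond.notify_all()


def run_with_resources(pool, gate, fn, amount):
    """Submit fn to pool; fn holds `amount` of the gate while it runs."""

    def wrapper():
        gate.acquire(amount)
        try:
            return fn()
        finally:
            gate.release(amount)

    return pool.submit(wrapper)


with ThreadPoolExecutor(max_workers=4) as pool:
    gate = ResourceGate(available=5.5)
    futures = [
        run_with_resources(pool, gate, lambda: "done", amount)
        for amount in (4.0, 2.0, 0.5)
    ]
    results = [future.result() for future in futures]
```

In this sketch a waiting task still occupies a worker thread, and a request larger than the total would wait forever; a real scheduler would likely handle both cases more carefully.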

Quick start

Install

pip install xqtor

TL;DR

  • Set available to the total quantity of resources when constructing Xqtor.
  • Submit each task as a tuple of (task, resource_quantity).
  • Everything else works the same as a regular ThreadPoolExecutor.
  • All constructor parameters except available are passed to the underlying ThreadPoolExecutor, i.e. you can still set max_workers or initializer if needed.
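
As a reminder of what the forwarded parameters do, the sketch below uses max_workers and initializer on a plain concurrent.futures.ThreadPoolExecutor. It deliberately does not import Xqtor; init_worker and whoami are hypothetical names for this illustration, and whether Xqtor also forwards initargs is an assumption based on the statement above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()


def init_worker(prefix):
    # Runs once in each worker thread before that thread executes any task.
    _local.name = f"{prefix}-{threading.current_thread().name}"


def whoami():
    # Reads the per-thread value that the initializer stored.
    return _local.name


with ThreadPoolExecutor(
    max_workers=2, initializer=init_worker, initargs=("worker",)
) as pool:
    names = [pool.submit(whoami).result() for _ in range(4)]
```

With Xqtor the same keyword arguments would presumably be given alongside available, for example Xqtor(available=5.5, max_workers=2, initializer=...).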

Example: using a float to represent CPU cores

from xqtor import Xqtor
import time


def task_4_cpu():
    print("running task_4_cpu")
    time.sleep(1)


def task_2_cpu():
    print("running task_2_cpu")
    time.sleep(1)


def task_half_cpu():
    print("running task_half_cpu")
    time.sleep(1)


# Total budget: 5.5 CPU cores
executor = Xqtor(available=5.5)

# Each task declares how many cores it needs
futures = []
futures.append(executor.submit((task_4_cpu, 4.0)))
futures.append(executor.submit((task_2_cpu, 2.0)))
futures.append(executor.submit((task_half_cpu, 0.5)))

# Wait for all tasks to finish
for future in futures:
    future.result()

Output:

# T=0
running task_4_cpu
running task_half_cpu

# T=1
running task_2_cpu
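
The timeline above follows from simple bookkeeping: 4.0 + 0.5 fits into 5.5, while 4.0 + 2.0 does not. The sketch below reproduces it with a small deterministic simulation. It assumes that a later task which fits may overtake an earlier one that does not (as the output suggests) and that every task runs for the same duration; simulate is a hypothetical helper, not part of Xqtor.

```python
def simulate(available, tasks, duration=1):
    """Return {start_time: [task names]} for tasks given as (name, quantity).

    Assumption: tasks are tried in submission order, and a task that does
    not fit is skipped until resources are released.
    """
    free = available
    pending = list(tasks)
    running = []  # (finish_time, quantity) of tasks currently running
    timeline = {}
    now = 0
    while pending:
        # Give back the resources of every task that has finished by now.
        still_running = []
        for finish, quantity in running:
            if finish <= now:
                free += quantity
            else:
                still_running.append((finish, quantity))
        running = still_running

        # Start every pending task that fits into what is free.
        waiting = []
        for name, quantity in pending:
            if free >= quantity:
                free -= quantity
                running.append((now + duration, quantity))
                timeline.setdefault(now, []).append(name)
            else:
                waiting.append((name, quantity))
        pending = waiting

        if pending:
            if not running:
                raise ValueError("a task requests more than the total available")
            # Jump to the next moment at which resources are released.
            now = min(finish for finish, _ in running)
    return timeline


timeline = simulate(
    5.5,
    [("task_4_cpu", 4.0), ("task_2_cpu", 2.0), ("task_half_cpu", 0.5)],
)
# → {0: ["task_4_cpu", "task_half_cpu"], 1: ["task_2_cpu"]}
```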

What can I use to represent resources?

For simple cases, use a number (float or int), e.g. each task declares that it needs X CPU cores or Y GB of RAM.

For more complex cases, define a custom resource type. Resource values should support +, - and >=:

Example: CPU + memory compound resource

from xqtor import Xqtor


class CPUAndMemory:
    def __init__(self, cpu, memory):
        self.cpu = cpu
        self.memory = memory

    def __add__(self, other):
        return CPUAndMemory(self.cpu + other.cpu, self.memory + other.memory)

    def __sub__(self, other):
        return CPUAndMemory(self.cpu - other.cpu, self.memory - other.memory)

    def __ge__(self, other):
        return self.cpu >= other.cpu and self.memory >= other.memory


executor = Xqtor(available=CPUAndMemory(16, 64))
executor.submit((lambda: print("running task"), CPUAndMemory(4, 16)))
executor.submit((lambda: print("running task"), CPUAndMemory(0, 16)))
executor.submit((lambda: print("running task"), CPUAndMemory(8, 0)))
executor.submit((lambda: print("running task"), CPUAndMemory(4, 32)))
# The 4 tasks above run immediately: together they use exactly CPUAndMemory(16, 64)
executor.submit((lambda: print("running task"), CPUAndMemory(4, 16)))
# This one waits until enough CPU and memory are free again,
# e.g. when the first or the fourth task finishes
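
The arithmetic behind this example can be checked without running an executor. The sketch below redefines CPUAndMemory as a frozen dataclass (an illustrative variant, so that values can be compared with ==) and works out which of the first four tasks would free enough resources for the fifth. It only exercises the +, - and >= operations and makes no claim about how Xqtor schedules internally.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CPUAndMemory:
    cpu: int
    memory: int

    def __add__(self, other):
        return CPUAndMemory(self.cpu + other.cpu, self.memory + other.memory)

    def __sub__(self, other):
        return CPUAndMemory(self.cpu - other.cpu, self.memory - other.memory)

    def __ge__(self, other):
        # Enough only if BOTH dimensions are covered.
        return self.cpu >= other.cpu and self.memory >= other.memory


total = CPUAndMemory(16, 64)
first_four = [
    CPUAndMemory(4, 16),
    CPUAndMemory(0, 16),
    CPUAndMemory(8, 0),
    CPUAndMemory(4, 32),
]
fifth = CPUAndMemory(4, 16)

# Admit the first four requests one by one.
free = total
for request in first_four:
    assert free >= request
    free = free - request
# free is now CPUAndMemory(0, 0), so the fifth request cannot start yet.

# Which single completion would free enough for the fifth request?
unblockers = [
    index for index, done in enumerate(first_four) if free + done >= fifth
]
# → [0, 3]
```

Because >= here requires every dimension to be sufficient, the comparison is only a partial order: the second task returns memory but no CPU and the third returns CPU but no memory, so neither alone is enough for the fifth task.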

Contributions

Contributions are welcome! Some possible areas (not exhaustive):

  • Add support for ProcessPoolExecutor (currently only ThreadPoolExecutor is supported)
  • Benchmarks (which may suggest perf improvement tasks)
  • Test coverage
  • Pluggable scheduling strategies
  • General bugs
  • General code quality improvements

License

MIT
