An advanced executor library for Python with progress tracking, throttling, and more.

ExecutorX

ExecutorX is an advanced executor library for Python that extends concurrent.futures.Executor with new capabilities, including task progress tracking, submission throttling, modular lifecycle hooks, and improved support for worker initialization. Whether you're running CPU-bound tasks in processes, IO-bound tasks in threads, or debugging in the main thread, ExecutorX makes parallelism more flexible and powerful.


Features

Enhanced Executors

  • ProcessPoolExecutor and ThreadPoolExecutor:
    • Advanced executors with support for addons, task tracking, and flexible worker initialization.
  • ImmediateExecutor:
    • Executes tasks in the main thread synchronously for debugging and testing.

Advanced Features

  1. Progress Tracking: Monitor task completion and pending tasks in real time.
  2. Submission Throttling: Limit the number of concurrent tasks or the submission rate to avoid overloading workers.
  3. Worker Initialization: Define custom initialization logic for workers (e.g., resource setup).
  4. Worker Identification: Assign and track unique worker IDs.
  5. Lifecycle Addons: Extend executor behavior with modular addons for logging, monitoring, or custom task logic.
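
To make the idea behind progress tracking concrete, here is a minimal sketch built only on the standard library's concurrent.futures. It is not ExecutorX's ProgressAddon; the ProgressCounter class and its track method are hypothetical names used for illustration. It counts a future as completed from a done-callback, which is one common way to derive "completed" and "pending" numbers.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ProgressCounter:
    """Hypothetical helper: counts submitted and completed futures."""

    def __init__(self):
        self._lock = threading.Lock()
        self.submitted = 0
        self.completed = 0

    def track(self, future):
        # Register the future, then count it as completed once it finishes.
        with self._lock:
            self.submitted += 1
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, future):
        # Called by concurrent.futures when the future finishes or is cancelled.
        with self._lock:
            self.completed += 1

    @property
    def pending(self):
        with self._lock:
            return self.submitted - self.completed


def square(x):
    return x * x


progress = ProgressCounter()
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [progress.track(pool.submit(square, i)) for i in range(10)]
    results = [f.result() for f in futures]
```

While the pool is running, progress.pending can be polled from another thread; once the with block exits, every callback has run and pending is zero.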

Utilities

  • Support for spawn Context:
    • Ensures compatibility with platforms like Windows, which lack support for the fork multiprocessing context.
  • Graceful Shutdown:
    • Ensures all tasks complete before the executor shuts down, even when using custom addons.
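
For reference, the standard library executors already give this guarantee through shutdown(wait=True), and the sketch below shows that baseline behavior with a plain concurrent.futures.ThreadPoolExecutor. It does not use ExecutorX and says nothing about how ExecutorX coordinates shutdown with addons; slow_square is a hypothetical task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # Simulate a task that takes a little while to finish.
    time.sleep(0.05)
    return x * x


pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(slow_square, i) for i in range(6)]

# Blocks until every submitted task has finished, including queued ones.
pool.shutdown(wait=True)

all_done = all(f.done() for f in futures)
results = [f.result() for f in futures]
```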

Installation

pip install ExecutorX

Quick Start

Process Pool with Progress Tracking

from executorx.addons.progress import ProgressAddon
from executorx.futures.executors import ProcessPoolExecutor


def my_function(x):
    return x * x


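# Guard the entry point: with the spawn context, worker processes import
# this module, and unguarded pool creation would run again in each worker.
if __name__ == "__main__":
    # Initialize a process pool with progress tracking
    executor = ProcessPoolExecutor(max_workers=4, addons=[ProgressAddon])

    # Submit tasks
    futures = [executor.submit(my_function, i) for i in range(10)]

    # Collect results
    for future in futures:
        print(future.result())

    executor.shutdown()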

Immediate Execution Mode (Single-Threaded)

The ImmediateExecutor runs all tasks in the main thread synchronously. This is useful for debugging or environments where parallelism isn't required.

from executorx.futures.executors import ImmediateExecutor


def debug_task(x):
    print(f"Processing {x} in the main thread")
    return x * x


executor = ImmediateExecutor()

# Submit tasks
futures = [executor.submit(debug_task, i) for i in range(5)]

# Collect results
for future in futures:
    print(future.result())

executor.shutdown()
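
To illustrate what "runs all tasks in the main thread synchronously" means in terms of the Executor interface, here is a minimal sketch that subclasses the standard library's concurrent.futures.Executor. SynchronousExecutor is a hypothetical name, and this is not how ExecutorX implements ImmediateExecutor; it only shows the general technique of running the callable inline and returning an already-finished Future.

```python
from concurrent.futures import Executor, Future


class SynchronousExecutor(Executor):
    """Hypothetical stand-in: runs each task inline in the calling thread."""

    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            # The task runs here, before submit() returns.
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Store the error so it is raised by future.result(), as in a pool.
            future.set_exception(exc)
        return future


def debug_task(x):
    return x * x


with SynchronousExecutor() as executor:
    futures = [executor.submit(debug_task, i) for i in range(5)]
    results = [f.result() for f in futures]
```

Because every task has finished by the time submit() returns, breakpoints and stack traces point straight at the task code, which is what makes this style convenient for debugging.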

Throttling Task Submissions

Control the number of concurrent or submitted tasks using the ThrottleAddon.

from executorx.addons.throttle import ThrottleAddon
from executorx.futures.executors import ThreadPoolExecutor

executor = ThreadPoolExecutor(
    max_workers=4, addons=[ThrottleAddon(max_concurrent_tasks=2)]
)

# Submit tasks (at most 2 will run concurrently)
futures = [executor.submit(print, f"Task {i}") for i in range(10)]

executor.shutdown()
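
The idea of capping concurrency independently of the worker count can be sketched with the standard library alone. The example below is an illustration under stated assumptions, not ThrottleAddon's implementation: throttled_submit, MAX_CONCURRENT, and the peak counter are hypothetical names. A semaphore slot is taken before each submission and released from a done-callback, so at most MAX_CONCURRENT tasks are in flight even though the pool has four workers.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT = 2  # hypothetical limit, mirroring max_concurrent_tasks=2 above
slots = threading.BoundedSemaphore(MAX_CONCURRENT)
lock = threading.Lock()
running = 0
peak = 0  # highest number of tasks observed running at the same time


def task(i):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.02)
    with lock:
        running -= 1
    return i


def throttled_submit(pool, fn, *args):
    # Block the submitting thread until a slot is free.
    slots.acquire()
    try:
        future = pool.submit(fn, *args)
    except BaseException:
        slots.release()
        raise
    # Free the slot once the task has finished.
    future.add_done_callback(lambda _: slots.release())
    return future


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [throttled_submit(pool, task, i) for i in range(10)]
    results = [f.result() for f in futures]
```

Blocking in the submitter is a deliberate choice here: it applies backpressure to the producer instead of letting an unbounded queue of pending tasks build up.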

Worker Initialization

Set up custom initialization logic for workers, such as loading shared resources.

from executorx.futures.executors import ProcessPoolExecutor


def initialize_worker():
    print("Initializing worker...")


# The guard is required when worker processes are started with spawn.
if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=4, initializer=initialize_worker)

    executor.submit(print, "Task executed with worker initialization")
    executor.shutdown()
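
Worker initialization and worker identification often go together: the initializer is a natural place to give each worker an ID. The sketch below shows that pattern with the standard library's ThreadPoolExecutor and thread-local storage. It is not ExecutorX's worker ID mechanism, and initialize_worker, which_worker, and the ID counter are hypothetical names chosen for the example.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor

_worker_ids = itertools.count()
_id_lock = threading.Lock()
_local = threading.local()  # per-thread storage for worker state


def initialize_worker():
    # Runs once in each worker thread before it executes its first task.
    with _id_lock:
        _local.worker_id = next(_worker_ids)


def which_worker(x):
    # Tasks can read the state that the initializer prepared for their worker.
    return (_local.worker_id, x * x)


with ThreadPoolExecutor(max_workers=3, initializer=initialize_worker) as pool:
    results = list(pool.map(which_worker, range(6)))

worker_ids = {worker_id for worker_id, _ in results}
squares = [value for _, value in results]
```

With a process pool the same pattern would store the ID in a module-level global instead of thread-local storage, since each worker process has its own copy of the module.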

Addons: Extending Executors

Addons provide modular hooks into the executor lifecycle, allowing custom logic for task submission, initialization, and shutdown.

Creating a Custom Addon

from executorx.futures.addon import PoolExecutorAddon


class MyCustomAddon(PoolExecutorAddon):
    def before_submit(self):
        print("Preparing to submit task")

    def after_submit(self, future):
        print("Task submitted")


# Use the addon
from executorx.futures.executors import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4, addons=[MyCustomAddon])
executor.submit(print, "Hello from ExecutorX!")
executor.shutdown()
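
To show the general shape of submission hooks, here is a minimal sketch that wraps submit() on a standard library ThreadPoolExecutor. It is an assumption-laden illustration, not ExecutorX's addon machinery: HookedThreadPool, RecordingHooks, and the hooks parameter are hypothetical, and only the method names before_submit and after_submit are borrowed from the example above.

```python
from concurrent.futures import ThreadPoolExecutor


class RecordingHooks:
    """Hypothetical hook object that records the order of hook calls."""

    def __init__(self):
        self.events = []

    def before_submit(self):
        self.events.append("before_submit")

    def after_submit(self, future):
        self.events.append("after_submit")


class HookedThreadPool(ThreadPoolExecutor):
    """Hypothetical subclass that calls hooks around every submission."""

    def __init__(self, *args, hooks=(), **kwargs):
        super().__init__(*args, **kwargs)
        self._hooks = list(hooks)

    def submit(self, fn, /, *args, **kwargs):
        for hook in self._hooks:
            hook.before_submit()
        future = super().submit(fn, *args, **kwargs)
        for hook in self._hooks:
            hook.after_submit(future)
        return future


hooks = RecordingHooks()
with HookedThreadPool(max_workers=2, hooks=[hooks]) as pool:
    result = pool.submit(pow, 2, 5).result()
```

Both hooks run in the submitting thread, so they can safely touch state owned by the caller without extra locking as long as only one thread submits.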

Why ImmediateExecutor?

While setting max_workers=0 in a process or thread pool allows single-threaded execution, ImmediateExecutor provides distinct advantages:

  1. Code Clarity:
    • It explicitly communicates the intent to disable parallelism while keeping the API consistent with other executors.
  2. Debugging Support:
    • Addons, worker initialization, and task tracking remain functional, making it easy to debug parallelized code in a single-threaded environment.
  3. Feature Parity:
    • ImmediateExecutor retains all features of other executors (e.g., addons, lifecycle management), ensuring that testing environments mirror production.

Contributing

Contributions are welcome! To get started:

  1. Fork the repository.
  2. Create a feature branch (git checkout -b feature-name).
  3. Commit your changes (git commit -am 'Add feature-name').
  4. Push to the branch (git push origin feature-name).
  5. Create a pull request.

License

ExecutorX is licensed under the GNU Lesser General Public License (LGPL) v2.1. See the LICENSE file for details.


Future Roadmap

  • Picklable Executors: Allow executors to be serialized and passed across processes.
