Skip to main content

A simple wrapper to build queue multiprocessing pipelines

Project description

QueueAutomator

A Python library that wraps the multiprocessing package. It provides a simple to use API to build queue-based multiprocessing pipelines.

instalation

Run pip install queue-automator (use python >=3.6 )

Summary

QueueAutomator provides a clean decorator API to get started with multiprocessing and queues.

  • This library offers an easy interface to parallelize consecutive tasks that take a long time to finish.

  • As it is build on top of the native python-multiprocessing library, you can run compute intensive tasks without locking the main process

  • All the code that manages queues, spawning and joining processes is already implemented, so you can focus on the task at hand.

How it works:

QueueDiagram

Example

Using only 1 worker function

from queue_automator import QueueAutomator
from time import sleep

# Create an instance of QueueAutomator()
automator = QueueAutomator()

# Register a worker function (if input_queue_name and output_queue_name are not provided
# they will default to 'input' and 'output' respectively). 'input' and 'output'
# are necessary  to mark the start and ending of your pipeline

@automator.register_as_worker_function(process_count=2)
def do_work(item: int) -> int:
    sleep(2)
    result = item*2
    print(f'{item} times two {result}')
    return result

if __name__ == '__main__':
    input_data = range(30)

    # Always set your input data before calling .run()
    automator.set_input_data(input_data)
    results = automator.run()
    print(results)

Using more than 1 worker function

from queue_automator import QueueAutomator
from time import sleep

automator = QueueAutomator()

@automator.register_as_worker_function(output_queue_name='square_queue', process_count=2)
def do_work(item: int) -> int:
    sleep(2)
    result = item*2
    print(f'{item} times two {result}')
    return result

@automator.register_as_worker_function(input_queue_name='square_queue', output_queue_name='cube_queue', process_count=2)
def do_work_2(item: int) -> int:
    sleep(2)
    result = item**2
    print(f'{item} squared {result}')
    return result

@automator.register_as_worker_function(input_queue_name='cube_queue', process_count=2)
def do_work_3(item: int) -> int:
    sleep(2)
    result = item**3
    print(f'{item} cubed {result}')
    return result

# Note that the first and last functions in the pipeline do not need to
# declare the input and output queue names respectively.

if __name__ == '__main__':
    input_data = range(30)

    # Always set your input data before calling .run()
    automator.set_input_data(input_data)
    results = automator.run()
    print(results)

Cautions

As with anything, this is not a silver bullet that gets rid of all problems using python multiprocessing

There are some caveats when using this library:

  • Launching processes in python is an expensive operation, as it spawns a separate instance of the interpreter. The performance gains could be offset by the time it takes to spawn a process

  • Try to keep the number of processes in line with your CPU cores, spawning a ton of them could result in slower performance overall.

  • The input objects of every worker function need to be serializable or pickable. This is a limitation of python multiprocessing. If you are dealing with complex objects try to convert them to a suitable format before processing, or implement the __reduce__, __repr__ or __dict__ methods in your classes.

  • It is important that you try to keep your worker functions pure, which means that they should not have side effects.

  • The .run() method should be called from your main entry point or a function that is called at your main entry point, (this is another limitation of python's multiprocessing)

  • Try to optimize the number of process depending of how long a task takes, prioritize longer running tasks.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queue-automator-0.0.1.tar.gz (7.2 kB view details)

Uploaded Source

Built Distribution

queue_automator-0.0.1-py3-none-any.whl (6.8 kB view details)

Uploaded Python 3

File details

Details for the file queue-automator-0.0.1.tar.gz.

File metadata

  • Download URL: queue-automator-0.0.1.tar.gz
  • Upload date:
  • Size: 7.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.10.0 pkginfo/1.8.2 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.9

File hashes

Hashes for queue-automator-0.0.1.tar.gz
Algorithm Hash digest
SHA256 96c1a74f6b4ffae1b7c194a1ec492613afffb24ff7914966eb044410ab784743
MD5 f505660e6c14b65e7b4eb7b2ca654a64
BLAKE2b-256 1d1dc262d55d7b9a5d7c72037a1ad6d8c8e5f3def0b62d5ec037fc3e1f317b79

See more details on using hashes here.

File details

Details for the file queue_automator-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: queue_automator-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 6.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.10.0 pkginfo/1.8.2 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.9

File hashes

Hashes for queue_automator-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 d434fa67de687908934a5ec2706c3304c743625c4132d23ba8010004f84b287d
MD5 6faa39e5a903847fe742f86229e48b50
BLAKE2b-256 5692379ed02b3e7de07403f6f912e2d098e17a98f63ce9e0bbd8a95973b5866c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page