
A simple wrapper to build queue multiprocessing pipelines

Project description

QueueAutomator

A Python library that wraps the multiprocessing package. It provides a simple-to-use API for building queue-based multiprocessing pipelines.

Installation

Run pip install queue-automator (requires Python >= 3.6)

Summary

QueueAutomator provides a clean decorator API to get started with multiprocessing and queues.

  • This library offers an easy interface to parallelize consecutive tasks that take a long time to finish.

  • As it is built on top of the native Python multiprocessing library, you can run compute-intensive tasks without blocking the main process.

  • All the code that manages queues, spawning and joining processes is already implemented, so you can focus on the task at hand.

How it works:

(Queue pipeline diagram)
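To make the diagram concrete, the underlying pattern can be sketched with the standard library alone: a worker process pulls items from an input queue, processes them, and pushes results to an output queue, stopping when it sees a sentinel. This is a minimal illustration of the queue-pipeline idea, not QueueAutomator's internal implementation.

```python
from multiprocessing import Process, Queue


def worker(input_queue: Queue, output_queue: Queue) -> None:
    """Pull items from input_queue, process them, push results to output_queue."""
    while True:
        item = input_queue.get()
        if item is None:  # sentinel: no more work
            break
        output_queue.put(item * 2)


if __name__ == '__main__':
    input_queue, output_queue = Queue(), Queue()
    for i in range(5):
        input_queue.put(i)
    input_queue.put(None)  # one sentinel per worker process

    p = Process(target=worker, args=(input_queue, output_queue))
    p.start()
    p.join()

    results = [output_queue.get() for _ in range(5)]
    print(sorted(results))  # [0, 2, 4, 6, 8]
```

QueueAutomator's decorator API (below) generates this plumbing for you, including the queues, the sentinels, and the process spawning and joining.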

Example

Using only 1 worker function

from queue_automator import QueueAutomator
from time import sleep

# Create an instance of QueueAutomator()
automator = QueueAutomator()

# Register a worker function. If input_queue_name and output_queue_name are not
# provided, they default to 'input' and 'output' respectively; 'input' and
# 'output' mark the start and end of your pipeline.

@automator.register_as_worker_function(process_count=2)
def do_work(item: int) -> int:
    sleep(2)
    result = item*2
    print(f'{item} times two {result}')
    return result

if __name__ == '__main__':
    input_data = range(30)

    # Always set your input data before calling .run()
    automator.set_input_data(input_data)
    results = automator.run()
    print(results)

Using more than 1 worker function

from queue_automator import QueueAutomator
from time import sleep

automator = QueueAutomator()

@automator.register_as_worker_function(output_queue_name='square_queue', process_count=2)
def do_work(item: int) -> int:
    sleep(2)
    result = item*2
    print(f'{item} times two {result}')
    return result

@automator.register_as_worker_function(input_queue_name='square_queue', output_queue_name='cube_queue', process_count=2)
def do_work_2(item: int) -> int:
    sleep(2)
    result = item**2
    print(f'{item} squared {result}')
    return result

@automator.register_as_worker_function(input_queue_name='cube_queue', process_count=2)
def do_work_3(item: int) -> int:
    sleep(2)
    result = item**3
    print(f'{item} cubed {result}')
    return result

# Note that the first and last functions in the pipeline do not need to
# declare the input and output queue names respectively.

if __name__ == '__main__':
    input_data = range(30)

    # Always set your input data before calling .run()
    automator.set_input_data(input_data)
    results = automator.run()
    print(results)

Cautions

As with anything, this is not a silver bullet that eliminates every problem of Python multiprocessing.

There are some caveats when using this library:

  • Launching processes in Python is an expensive operation, as each one spawns a separate instance of the interpreter. The performance gains can be offset by the time it takes to spawn the processes.

  • Keep the number of processes in line with your CPU cores; spawning far more than you have cores can result in slower overall performance.

  • The input objects of every worker function need to be serializable (picklable). This is a limitation of Python multiprocessing. If you are dealing with complex objects, convert them to a suitable format before processing, or implement the __reduce__ or __getstate__/__setstate__ methods in your classes.

  • Keep your worker functions pure: they should not have side effects.

  • The .run() method should be called from your main entry point, or from a function invoked there (another limitation of Python's multiprocessing).

  • Tune the process count of each worker to how long its task takes; give longer-running tasks more processes.
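As a sketch of the picklability caveat above: a class holding an unpicklable resource (an open file handle, a socket, a database connection) can be made picklable by implementing __getstate__ and __setstate__. Connection here is a hypothetical example class, not part of QueueAutomator's API.

```python
class Connection:
    """Hypothetical class holding an unpicklable resource (an open file handle)."""

    def __init__(self, path: str):
        self.path = path
        self.handle = open(path)  # file objects cannot be pickled

    def __getstate__(self):
        # Drop the unpicklable handle; keep only the data needed to rebuild it.
        state = self.__dict__.copy()
        del state['handle']
        return state

    def __setstate__(self, state):
        # Reopen the resource when the object is reconstructed in the worker.
        self.__dict__.update(state)
        self.handle = open(self.path)
```

With this in place, instances of Connection can cross process boundaries: pickle serializes the path, and the receiving process reopens the file itself.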

Download files

Download the file for your platform.

Source Distribution

queue-automator-0.0.2.tar.gz (7.2 kB view details)

Uploaded Source

Built Distribution

queue_automator-0.0.2-py3-none-any.whl (6.8 kB view details)

Uploaded Python 3

File details

Details for the file queue-automator-0.0.2.tar.gz.

File metadata

  • Download URL: queue-automator-0.0.2.tar.gz
  • Size: 7.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.10.0 pkginfo/1.8.2 requests/2.27.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.9

File hashes

Hashes for queue-automator-0.0.2.tar.gz

  • SHA256: 972da168dfe66f5bbd7396c912487281578c61dd8cc8556acfeb3062e19c8796
  • MD5: b4cb9f509cb4dcb23ee036f4d1f25ae1
  • BLAKE2b-256: 4424d70d66f5ea02a2cf59dfde754f9a406c51ffc4c5f22ffed2ac7e1809482f


File details

Details for the file queue_automator-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: queue_automator-0.0.2-py3-none-any.whl
  • Size: 6.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.10.0 pkginfo/1.8.2 requests/2.27.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.9

File hashes

Hashes for queue_automator-0.0.2-py3-none-any.whl

  • SHA256: ed2bb8e730a849777db5ecbb113b1054fb23b16ca6583d7d9d716b10aa829e4b
  • MD5: 4b712fd4fdd9297e36b92adb7c00177d
  • BLAKE2b-256: a328be4e655af1d4021776ff6b6aa4d7715071914c064ff80de5472328c04779

