
Déjà Queue – A fast multiprocessing queue for Python

Project description


Déjà Queue

A fast alternative to multiprocessing.Queue. It is faster because it uses a shared-memory ring buffer (rather than slow pipes) and pickle protocol 5 out-of-band data to minimize copies. dejaq.DejaQueue supports any picklable Python object, including numpy arrays and nested dictionaries with mixed content.
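For background, the following minimal sketch (standard-library pickle plus numpy, not DejaQueue's internal code) illustrates what protocol 5 out-of-band buffers do: large buffers are handed to a callback instead of being copied into the pickle byte stream, so a queue can place them directly into shared memory.

import pickle
import numpy as np

arr = np.random.randn(256, 256)
buffers = []
# With protocol 5, large buffers go to buffer_callback instead of being
# serialized into the byte stream itself.
payload = pickle.dumps(arr, protocol=5, buffer_callback=buffers.append)
# `payload` stays small; `buffers` holds PickleBuffer views of the array data,
# which can be written to (and read back from) shared memory without extra copies.
restored = pickle.loads(payload, buffers=buffers)
assert np.array_equal(arr, restored)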

The speed advantage of DejaQueue becomes substantial for items larger than about 1 MB. It enables efficient inter-job communication in big-data processing pipelines, which can be implemented in a few lines of code with dejaq.Parallel.
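To get a feel for the difference, a rough comparison along the following lines can be run (a hedged sketch, not part of the package: the ~80 MB item size, the 500 MB buffer_bytes, and the None sentinel are arbitrary choices, and absolute timings depend on your machine):

import time
import numpy as np
from multiprocessing import Process, Queue
from dejaq import DejaQueue

def produce(queue, n_items):
    item = np.random.randn(100, 1000, 100)  # ~80 MB per item
    for _ in range(n_items):
        queue.put(item)
    queue.put(None)  # sentinel marking the end of the stream

def drain(queue):
    t0 = time.perf_counter()
    while queue.get() is not None:
        pass
    return time.perf_counter() - t0

if __name__ == '__main__':
    for queue in (Queue(), DejaQueue(buffer_bytes=500e6)):
        producer = Process(target=produce, args=(queue, 20))
        producer.start()
        elapsed = drain(queue)
        producer.join()
        print(f'{type(queue).__name__}: {elapsed:.2f} s for 20 x ~80 MB items')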

Auto-generated (minimal) API documentation: https://danionella.github.io/dejaq

Installation

  • conda install danionella::dejaq

  • or, if you prefer pip: pip install dejaq

  • for development, clone this repository, navigate to the root directory and type pip install -e .

Examples

dejaq.DejaQueue

import numpy as np
from multiprocessing import Process
from dejaq import DejaQueue

def produce(queue):
    for i in range(10):
        arr = np.random.randn(100,200,300)
        data = dict(array=arr, i=i)
        queue.put(data)
        print(f'produced {type(arr)} {arr.shape} {arr.dtype}; meta: {i}; hash: {hash(arr.tobytes())}\n', flush=True)

def consume(queue, pid):
    while True:
        data = queue.get()
        array, i = data['array'], data['i']
        print(f'consumer {pid} consumed {type(array)} {array.shape} {array.dtype}; index: {i}; hash: {hash(array.tobytes())}\n', flush=True)

queue = DejaQueue(buffer_bytes=100e6)  # shared-memory ring buffer of ~100 MB
producer = Process(target=produce, args=(queue,))
consumers = [Process(target=consume, args=(queue, pid)) for pid in range(3)]
for c in consumers:
    c.start()
producer.start()

dejaq.Parallel

The following examples show how to use dejaq.Parallel to parallelize a function or a class, and how to create job pipelines.

Here we execute a function and map iterable inputs across 10 workers. To enable pipelining, the results of each stage are provided as an iterable generator. Use the .compute() method to get the final result (note that each stage pre-fetches results from n_workers calls, so some of the execution starts before .compute() is called). Results are always ordered.

from time import sleep
from dejaq import Parallel

def slow_function(arg):
    sleep(1.0)
    return arg + 5

input_iterable = range(100)
parallel_function = Parallel(n_workers=10)(slow_function)  # wrap without shadowing the original name
stage = parallel_function(input_iterable)
result = stage.compute()  # or list(stage)
# or shorter:
result = Parallel(n_workers=10)(slow_function)(input_iterable).compute()

You can also use Parallel as a function decorator:

@Parallel(n_workers=10)
def slow_function_decorated(arg):
    sleep(1.0)
    return arg + 5

result = slow_function_decorated(input_iterable).compute()

Similarly, you can decorate a class. It will be instantiated within a worker. Iterable items will be fed to the __call__ method. Note how the additional init arguments are provided:

@Parallel(n_workers=1)
class Reader:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, item):
        return item + self.arg1

result = Reader(arg1=0.5)(input_iterable).compute()

Finally, you can create pipelines of chained jobs. In this example, we have a single-worker producer and consumer, but a parallel processing stage (an example use case is sequentially reading a file, compressing chunks in parallel, and then sequentially writing to an output file):

@Parallel(n_workers=1)
class Producer:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, item):
        return item + self.arg1

@Parallel(n_workers=10)
class Processor:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, arg):
        sleep(1.0) #simulating a slow function
        return arg * self.arg1

@Parallel(n_workers=1)
class Consumer:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, arg):
        return arg - self.arg1

input_iterable = range(100)
stage1 = Producer(0.5)(input_iterable)
stage2 = Processor(10.0)(stage1)
stage3 = Consumer(1000)(stage2)
result = stage3.compute()

# or:
result = Consumer(1000)(Processor(10.0)(Producer(0.5)(input_iterable))).compute()
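As a concrete illustration of the read/compress/write use case mentioned above, here is a hedged sketch; the file names, chunk size, zlib compression and the Writer class are illustrative choices, not part of dejaq, and it relies only on the Parallel behavior shown above (in particular that results stay ordered):

import zlib
from dejaq import Parallel

CHUNK_BYTES = 4 * 1024 * 1024  # 4 MB chunks (arbitrary)

def read_chunks(path):
    # plain generator: runs in the main process and feeds the first stage
    with open(path, 'rb') as f:
        while chunk := f.read(CHUNK_BYTES):
            yield chunk

@Parallel(n_workers=8)
def compress(chunk):
    return zlib.compress(chunk, level=6)

@Parallel(n_workers=1)
class Writer:
    def __init__(self, path):
        self.fh = open(path, 'wb')  # opened inside the single writer worker
    def __call__(self, compressed):
        self.fh.write(compressed)
        return len(compressed)

# chunks are compressed by 8 workers, but written sequentially and in order
# (closing the output file is omitted for brevity)
written = Writer('output.zlib')(compress(read_chunks('input.bin'))).compute()
print(f'wrote {sum(written)} compressed bytes')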


