Déjà Queue – A fast multiprocessing queue for Python

Déjà Queue

A fast alternative to multiprocessing.Queue. It is faster because it uses a shared-memory ring buffer (rather than slow pipes) and pickle protocol 5 out-of-band data to minimize copies. dejaq.DejaQueue supports any picklable Python object, including numpy arrays and nested dictionaries with mixed content.
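To illustrate what "pickle protocol 5 out-of-band data" means, here is a minimal sketch using only the standard pickle module and NumPy. It is not dejaq's internal code. With a buffer_callback, the pickler hands large contiguous buffers (such as a NumPy array's data) to the callback instead of copying them into the pickle byte stream. A transport can then move those buffers separately, for example into shared memory, and pass them back to pickle.loads through its buffers argument.

```python
import pickle

import numpy as np

arr = np.random.randn(1000, 1000)        # ~8 MB of float64
item = dict(array=arr, i=0)              # nested, mixed-content item

buffers = []                             # out-of-band buffers are collected here
# list.append returns None (a false value), which tells pickle to keep
# each buffer out-of-band instead of serializing it in-band.
header = pickle.dumps(item, protocol=5, buffer_callback=buffers.append)

# The in-band pickle only holds metadata (dict keys, dtype, shape, ...);
# the array's raw bytes are exposed as a PickleBuffer, not copied.
print(len(header), [b.raw().nbytes for b in buffers])

# The receiving side must supply the same buffers, in the same order.
restored = pickle.loads(header, buffers=buffers)
print(np.array_equal(restored["array"], arr), restored["i"])
```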

The speed advantage of DejaQueue becomes substantial for items larger than 1 MB. It enables efficient inter-job communication in big-data processing pipelines built with dejaq.Actor or dejaq.stream.

Features:

  • Fast, low-latency, high-throughput inter-process communication
  • Supports any picklable Python object, including numpy arrays and nested dictionaries
  • Zero-copy data transfer with pickle protocol 5 out-of-band data
  • Picklable queue instances (the queue object itself can be passed between processes)
  • Peekable (non-destructive read)
  • Actor class for remote method calls and attribute access in a separate process (see dejaq.Actor)
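The shared-memory ring buffer and the "peekable" read mentioned above can be pictured with a toy single-producer/single-consumer byte ring. This is a hypothetical sketch (ToyRingBuffer and its methods are illustrative names), not dejaq's layout: its read/write counters are ordinary Python ints, so it only works within one process, whereas a real cross-process queue would also keep the counters and some synchronization in shared memory.

```python
import struct
from multiprocessing import shared_memory


class ToyRingBuffer:
    """Hypothetical SPSC byte ring buffer whose data lives in shared memory."""

    LEN = struct.Struct("<Q")  # 8-byte little-endian length prefix per message

    def __init__(self, capacity):
        self.shm = shared_memory.SharedMemory(create=True, size=capacity)
        self.capacity = capacity  # the segment may be larger; use what we asked for
        self.head = 0             # total bytes ever written (monotonic)
        self.tail = 0             # total bytes ever read (monotonic)

    def _write(self, pos, data):
        start = pos % self.capacity
        first = min(len(data), self.capacity - start)
        self.shm.buf[start:start + first] = data[:first]      # up to the physical end...
        if first < len(data):
            self.shm.buf[:len(data) - first] = data[first:]   # ...then wrap to the front

    def _read(self, pos, n):
        start = pos % self.capacity
        first = min(n, self.capacity - start)
        return bytes(self.shm.buf[start:start + first]) + bytes(self.shm.buf[:n - first])

    def put(self, payload):
        frame = self.LEN.pack(len(payload)) + payload
        if len(frame) > self.capacity - (self.head - self.tail):
            raise BufferError("ring buffer is full")
        self._write(self.head, frame)
        self.head += len(frame)

    def peek(self):
        """Return the oldest message without consuming it (non-destructive read)."""
        if self.head == self.tail:
            raise IndexError("ring buffer is empty")
        (n,) = self.LEN.unpack(self._read(self.tail, self.LEN.size))
        return self._read(self.tail + self.LEN.size, n)

    def get(self):
        payload = self.peek()
        self.tail += self.LEN.size + len(payload)  # advance past the whole frame
        return payload

    def close(self):
        self.shm.close()
        self.shm.unlink()
```

Keeping head and tail as ever-growing byte counts (and taking them modulo the capacity only when indexing) makes "full" and "empty" unambiguous without a separate flag.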

Auto-generated (minimal) API documentation: https://danionella.github.io/dejaq

Installation

  • conda install conda-forge::dejaq

  • or, if you prefer pip: pip install dejaq

  • for development, clone this repository, navigate to the root directory and type pip install -e .

Examples

dejaq.DejaQueue

import numpy as np
from multiprocessing import Process
from dejaq import DejaQueue

N_CONSUMERS = 3

def produce(queue):
    for i in range(10):
        arr = np.random.randn(100, 200, 300)  # ~48 MB of float64 per item
        data = dict(array=arr, i=i)
        queue.put(data)
        print(f'produced {type(arr)} {arr.shape} {arr.dtype}; meta: {i}; hash: {hash(arr.tobytes())}\n', flush=True)
    for _ in range(N_CONSUMERS):
        queue.put(None)  # one stop signal per consumer

def consume(queue, pid):
    while True:
        data = queue.get()
        if data is None:  # stop signal from the producer
            break
        array, i = data['array'], data['i']
        print(f'consumer {pid} consumed {type(array)} {array.shape} {array.dtype}; index: {i}; hash: {hash(array.tobytes())}\n', flush=True)

if __name__ == '__main__':  # required with the 'spawn' start method (default on Windows and macOS)
    queue = DejaQueue(buffer_bytes=100e6)
    producer = Process(target=produce, args=(queue,))
    consumers = [Process(target=consume, args=(queue, pid)) for pid in range(N_CONSUMERS)]
    for c in consumers:
        c.start()
    producer.start()
    producer.join()
    for c in consumers:
        c.join()

dejaq.Actor and ActorDecorator

dejaq.Actor allows you to run a class instance in a separate process and call its methods or access its attributes remotely, as if it were local. This is useful for isolating heavy computations, stateful services, or legacy code in a separate process, while keeping a simple Pythonic interface.

Example: Using Actor directly

from dejaq import Actor

class Counter:
    def __init__(self, start=0):
        self.value = start
    def increment(self, n=1):
        self.value += n
        return self.value
    def get(self):
        return self.value

# Start the actor in a separate process
counter = Actor(Counter, start=10)

print(counter.get())         # 10
print(counter.increment())   # 11
print(counter.increment(5))  # 16
print(counter.get())         # 16

counter.close()  # Clean up the process

Example: Using ActorDecorator

from dejaq import ActorDecorator

@ActorDecorator
class Greeter:
    def __init__(self, name):
        self.name = name
    def greet(self):
        return f"Hello, {self.name}!"

greeter = Greeter("Alice")
print(greeter.greet())  # "Hello, Alice!"
greeter.close()

Features

  • Remote method calls: Call methods as if the object were local.
  • Remote attribute access: Get/set attributes of the remote object.
  • Async support: Call method_async() to get a Future for non-blocking calls.
  • Tab completion: Works in Jupyter and most IDEs.
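To make the actor idea concrete, here is a toy version of the pattern built only from the standard library: the instance lives in a child process, and a proxy forwards method calls over a pipe. It is not how dejaq.Actor is implemented. ToyActor and _serve are hypothetical names, only method calls are forwarded (no attribute access, no async Futures), and it assumes a POSIX system where the "fork" start method is available.

```python
import multiprocessing as mp


def _serve(cls, args, kwargs, conn):
    """Child-process loop: own the instance and execute calls sent over the pipe."""
    obj = cls(*args, **kwargs)          # the instance only exists in the child
    while True:
        msg = conn.recv()
        if msg is None:                 # shutdown request from close()
            break
        name, a, kw = msg
        try:
            conn.send((True, getattr(obj, name)(*a, **kw)))
        except Exception as exc:        # ship exceptions back to the caller
            conn.send((False, exc))
    conn.close()


class ToyActor:
    def __init__(self, cls, *args, **kwargs):
        ctx = mp.get_context("fork")    # assumption: POSIX; cls is not pickled
        self._conn, child_conn = ctx.Pipe()
        self._proc = ctx.Process(target=_serve, args=(cls, args, kwargs, child_conn), daemon=True)
        self._proc.start()

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself: forward them.
        def remote_call(*args, **kwargs):
            self._conn.send((name, args, kwargs))
            ok, value = self._conn.recv()   # block until the child replies
            if not ok:
                raise value
            return value
        return remote_call

    def close(self):
        self._conn.send(None)
        self._proc.join()


class Counter:
    def __init__(self, start=0):
        self.value = start
    def increment(self, n=1):
        self.value += n
        return self.value
    def get(self):
        return self.value


if __name__ == "__main__":
    counter = ToyActor(Counter, start=10)
    print(counter.increment(5), counter.get())
    counter.close()
```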

dejaq.Parallel

The following examples show how to use dejaq.Parallel to parallelize a function or a class, and how to create job pipelines.

Here we execute a function and map the items of an iterable across 10 workers. To enable pipelining, the results of each stage are provided as an iterable generator. Use .run() (or .compute() for backwards compatibility) to get the final result. Results are always returned in input order.

from time import sleep
from dejaq import Parallel

def slow_function(arg):
    sleep(1.0)
    return arg + 5

input_iterable = range(100)
parallel_function = Parallel(n_workers=10)(slow_function)  # keep the original function unwrapped
stage = parallel_function(input_iterable)
result = stage.run() # or list(stage)
# or shorter:
result = Parallel(n_workers=10)(slow_function)(input_iterable).run()
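For intuition about "results are always ordered", the standard library has a comparable ordered, lazy map: multiprocessing.Pool.imap yields results in input order even when later items finish first. This is only an analogy, not what dejaq.Parallel does internally; ordered_parallel_map and uneven_function are hypothetical names, and the "fork" start method is an assumption (POSIX only).

```python
import multiprocessing as mp
from time import sleep


def uneven_function(arg):
    sleep(0.05 * (arg % 3))  # uneven work: later items may finish before earlier ones
    return arg + 5


def ordered_parallel_map(fn, iterable, n_workers=4):
    ctx = mp.get_context("fork")  # assumption: POSIX platform
    with ctx.Pool(processes=n_workers) as pool:
        # imap is lazy and, unlike imap_unordered, yields results in input order
        return list(pool.imap(fn, iterable))


if __name__ == "__main__":
    print(ordered_parallel_map(uneven_function, range(12)))
```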

You can also use Parallel as a function decorator:

@Parallel(n_workers=10)
def slow_function_decorated(arg):
    sleep(1.0)
    return arg + 5

result = slow_function_decorated(input_iterable).run()

Similarly, you can decorate a class. The class is instantiated within a worker, and the iterable's items are fed to its __call__ method. Note how the constructor arguments are passed:

@Parallel(n_workers=1)
class Reader:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, item):
        return item + self.arg1

result = Reader(arg1=0.5)(input_iterable).compute()
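The "instantiated within a worker" behaviour can be approximated with a pool initializer that builds one instance per worker process, after which every item is routed to that instance's __call__. This sketch is an assumption-laden illustration, not dejaq's mechanism: map_with_class, _init_worker and _call_worker_instance are hypothetical helpers, and it relies on the POSIX "fork" start method.

```python
import multiprocessing as mp


class Reader:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, item):
        return item + self.arg1


_worker_instance = None  # set once in each worker process by the initializer


def _init_worker(cls, init_args, init_kwargs):
    global _worker_instance
    _worker_instance = cls(*init_args, **init_kwargs)  # runs inside the worker


def _call_worker_instance(item):
    return _worker_instance(item)  # feed the item to the worker-local __call__


def map_with_class(cls, iterable, n_workers=1, init_args=(), init_kwargs=None):
    ctx = mp.get_context("fork")  # assumption: POSIX platform
    with ctx.Pool(n_workers, initializer=_init_worker,
                  initargs=(cls, init_args, init_kwargs or {})) as pool:
        return list(pool.imap(_call_worker_instance, iterable))


if __name__ == "__main__":
    print(map_with_class(Reader, range(5), init_args=(0.5,)))
```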

Finally, you can create pipelines of chained jobs. In this example, the producer and consumer stages each use a single worker, while the processing stage in between runs in parallel (an example use case is reading a file sequentially, compressing chunks in parallel, and then writing them sequentially to an output file):

@Parallel(n_workers=1)
class Producer:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, item):
        return item + self.arg1

@Parallel(n_workers=10)
class Processor:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, arg):
        sleep(1.0) #simulating a slow function
        return arg * self.arg1

@Parallel(n_workers=1)
class Consumer:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, arg):
        return arg - self.arg1

input_iterable = range(100)
stage1 = Producer(0.5)(input_iterable)
stage2 = Processor(10.0)(stage1)
stage3 = Consumer(1000)(stage2)
result = stage3.run()

# or:
result = Consumer(1000)(Processor(10.0)(Producer(0.5)(input_iterable))).run()
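The chaining idea (each stage lazily consumes the previous stage's iterator, and only the middle stage is parallel) can be sketched with generators and one order-preserving pool. This is a simplified analogy under stated assumptions: the sequential stages here are plain generators in the parent process rather than separate workers, pipeline and process are hypothetical names, and the "fork" start method (POSIX) is assumed.

```python
import multiprocessing as mp
from time import sleep


def process(arg):
    sleep(0.01)  # stand-in for the slow, parallel middle stage
    return arg * 10.0


def pipeline(input_iterable, n_workers=4):
    ctx = mp.get_context("fork")  # assumption: POSIX platform
    with ctx.Pool(n_workers) as pool:
        stage1 = (item + 0.5 for item in input_iterable)  # sequential producer
        stage2 = pool.imap(process, stage1)               # parallel, order-preserving
        stage3 = (arg - 1000 for arg in stage2)           # sequential consumer
        return list(stage3)                               # drain the pipeline


if __name__ == "__main__":
    print(pipeline(range(10)))
```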

Download files

Source Distribution

dejaq-0.7.0.tar.gz (41.4 kB)

Uploaded Source

Built Distribution

dejaq-0.7.0-py3-none-any.whl (32.8 kB)

Uploaded Python 3

File details

Details for the file dejaq-0.7.0.tar.gz.

File metadata

  • Download URL: dejaq-0.7.0.tar.gz
  • Upload date:
  • Size: 41.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dejaq-0.7.0.tar.gz
Algorithm Hash digest
SHA256 faa71e5abf3d6cbbe94954bc6c99456f046284083f0c2ad43fb40bee7e6c77a3
MD5 29ca7d528c77606c0e641c62588c44aa
BLAKE2b-256 a4dc692ab33df519d12bbb69c743b39f389941e5d6a70458fd4b754cf150b6c2

File details

Details for the file dejaq-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: dejaq-0.7.0-py3-none-any.whl
  • Upload date:
  • Size: 32.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dejaq-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1b5396b3f46786abca3bd946f8952a21859eb32b9aff3186aae9fe7f4c46b272
MD5 91f184ee5da3f8e3a79e888be7d65349
BLAKE2b-256 9235dbcf38509ce91c0475a5ef5d38cc4bff0b2993a5dba575a458ca96a69a7c
