Skip to main content

A collection of helpful queue utilities. Pipes, timers, tickers, multiplexors, multicasters and queue select.

Project description

queue-utilities

Let's make using Queues great again! Queue utilities and conveniences for those using sync libraries.

Currently implements using threads and threading queues only, multiprocessing queues and process support will be implemented soon.

This utilities package contains the following classes:

  1. Pipe - Pipe messages from one queue to another.
  2. Timer - Threaded timer that emits time on internal or provided queue after given wait time period. Can be cancelled.
  3. Ticker - Same as timer but emits time at regular intervals until stopped.
  4. Multiplex - Many-to-One (fan-in) queue management helper.
  5. Multicast - One-to-Many (fan-out) queue management helper.
  6. Select - Like Multiplex but output payload contains message source queue to be used in dynamic message based switching. Inspired by Golangs select statements using channels.

Note that this package is early stages of development.

Installation

python3 -m pip install queue-utilities

Usage

Pipe

from queue_utilities import Pipe

original_q, output_q = _queue.Queue(), _queue.Queue()

p = Pipe(original_q, output_q)

# put an item into the original queue
original_q.put(1)

# get the message off the output queue
recv = output_q.get()
print(recv)  # 1

# don't forget to stop the pipe after you've finished.
p.stop()

Timer

from queue_utilities import Timer

# emit time after 5 seconds
five_seconds = Timer(5)
five_seconds.get()

# cancel a timer
to_cancel = Timer(60)
print(to_cancel._is_finished) # False
to_cancel.stop()
print(to_cancel._is_finished) # True

Ticker

from queue_utilities import Ticker

# print the time every 5 seconds 4 times
tick = Ticker(5)
for _ in range(4):
    print(f"The time is: {tick.get()}")

# cancel the ticker thread
tick.stop()

Multiplex

from queue_utilities import Multiplex

# TODO

Multicast

from queue_utilities import Multicast

# TODO

Select

Timeout a function with Timer

from threading import Thread
import time
from queue import Queue
from queue_utilities import Select, Timer


def do_something_slow(q: Queue) -> None:
    # do something slow
    time.sleep(3)
    q.put("Done")


output_q = Queue()
Thread(target=do_something_slow, args=(output_q,)).start()

timeout = Timer(2)

select = Select(output_q, timeout._output_q)

for (which_q, result) in select:
    if which_q is output_q:
        print("Received result", result)
    else:
        print("Function timed out!")
    break

select.stop()

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for queue-utilities, version 0.0.2
Filename, size File type Python version Upload date Hashes
Filename, size queue_utilities-0.0.2-py3-none-any.whl (7.3 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size queue_utilities-0.0.2.tar.gz (4.4 kB) File type Source Python version None Upload date Hashes View

Supported by

Pingdom Pingdom Monitoring Google Google Object Storage and Download Analytics Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page