Skip to main content

A ring buffer (circular buffer, circular queue, cyclic buffer) implemented in pure python. Includes flexible locking, waiting, and Disruptor variants.

Project description

PyRing

A pure python implementation of a ring buffer with optional factory for alternate memory allocation. Variants included are Blocking (with a read cursor) and Locked (all manipulations are secured with a lock) ring buffers.

You may not call it a ring buffer, they also go by other names like circular buffer, circular queue or cyclic buffer.

Installation

python3 -m pip install pyring

Usage

Included are several variations of a ring buffer:

  1. RingBuffer - The most basic, non-blocking ring, supports optional RLocks as constructor argument.
  2. LockedRingBuffer - The same as RingBuffer but secured by a lock (handy for multithread / multiproc)
  3. BlockingRingBuffer - Extension of RingBuffer with a read method next() which increments a read cursor and the writer cannot advance past the read cursor
  4. BlockingLockedRingBuffer - The same as BlockingRingBuffer but secured by a lock (handy for multithread / multiproc)
  5. WaitingBlockingRingBuffer - The same as BlockingRingBuffer but calls to next and put block and wait (with optional timeout arg)
  6. SingleProducerDisruptor - The same as WaitingBlockingRingBuffer but allows multiple subscribers to a single ring buffer by calling subscribe() which returns a DisruptorSubscriber object. No next method on disruptor, instead it is on the DisruptorSubscriber objects.

Basic usage with default size and factory

import pyring

# create ring buffer
ring_buffer = pyring.RingBuffer()

# add to ring
ring_buffer.put("Something new!")

# get latest from ring
sequence, value = ring_buffer.get_latest()
print(sequence, value) # 0 Something new!

Custom ring size

import pyring

ring_buffer = pyring.RingBuffer(size=128) # size must be a power of 2

Custom factory

import pyring

# custom factory that takes lists of ints and returns the sum
# must declare the get and set methods
class CustomSumFactory(pyring.RingFactory):
    value: typing.List[int] = []

    def get(self):
        return sum(self.value)

    def set(self, values):
        self.value = values

ring_buffer = pyring.RingBuffer(factory=CustomSumFactory)

ring_buffer.put([1,2,3,4,5])

sequence, value = ring_buffer.get_latest()

print(sequence, value) # 0 15

Multiprocess C-Types Example

The below is a demonstration of using a RingBuffer across multiple processes, this requires firstly a LockedRingBuffer (with a multiproc lock), in addition to a custom cursor_position_value to increment sequence counts across processes.

This is more a demonstration on how flexible this ring buffer implementation can be rather than where you should use it, the below approach would most likely (with caveats on datasize) be better handled with threads reading off the ring buffer and passing messages via queues to worker processes.

import multiprocessing as mp
import time
from pyring import LockedRingBuffer, RingFactory

# using r lock due to the reuse for the cursor_position_value
mp_lock = mp.RLock()

# note if using the same lock it must be recursive lock otherwise you will get deadlocks
cursor_position_value = mp.Value("i", 0, lock=mp_lock)

# using multiproc compatible c-types.
class MultiprocFactory(RingFactory):
    def __init__(self):
        self.value = mp.Value("i")

    def set(self, v: int):
        self.value.value = v

    def get(self):
        return self.value.value


ring_buffer = LockedRingBuffer(
    factory=MultiprocFactory, lock=mp_lock, cursor_position_value=cursor_position_value
)


def worker_routine(worker_ring_buffer: LockedRingBuffer):
    for i in range(10):
        worker_ring_buffer.put(i)


proc = mp.Process(target=worker_routine, args=(ring_buffer,))
proc.start()
time.sleep(0.01)

for i in range(10):
    sequence, value = ring_buffer.get(i)
    print(sequence, value)

proc.join()

Single Producer Disruptor (Multiple Subscribers)

from pyring import SingleProducerDisruptor

disruptor = SingleProducerDisruptor()

subscriber_one = disruptor.subscribe()
subscriber_two = disruptor.subscribe()

for i in range(100):
    disruptor.put(i ** 2)
    sequence_one, res_one = subscriber_one.next()
    sequence_two, res_two = subscriber_two.next()

# releases the subscribers barriers and allows disruptor to continue
subscriber_one.unregister()
subscriber_two.unregister()

Examples of Usage

COMING SOON

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyring-0.0.12.tar.gz (10.0 kB view details)

Uploaded Source

Built Distribution

pyring-0.0.12-py3-none-any.whl (12.3 kB view details)

Uploaded Python 3

File details

Details for the file pyring-0.0.12.tar.gz.

File metadata

  • Download URL: pyring-0.0.12.tar.gz
  • Upload date:
  • Size: 10.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.0.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.7.7

File hashes

Hashes for pyring-0.0.12.tar.gz
Algorithm Hash digest
SHA256 c3819a26dee5cb167271d4adaa84790d8f7f2ba80d7d80f1309ac749155124b0
MD5 30d8b3ed9c95da160c6311bd7054e2a5
BLAKE2b-256 8a31dc65b00d12e8d324b635377307140b5c2fccb69b9a06f67e80e0f398ef52

See more details on using hashes here.

File details

Details for the file pyring-0.0.12-py3-none-any.whl.

File metadata

  • Download URL: pyring-0.0.12-py3-none-any.whl
  • Upload date:
  • Size: 12.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.0.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.7.7

File hashes

Hashes for pyring-0.0.12-py3-none-any.whl
Algorithm Hash digest
SHA256 611f1af3234c2acb1f3411c096c8f6d3f083020412af056d1d3c08cbb89daf5a
MD5 3492d7499ac7dfbbac65499a0a6f640a
BLAKE2b-256 000331ebf9aa930e060313c3c449e88d4ddb36a5e53243e19f03c8d22f9bbf3b

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page