buffered_pipe

Buffered pipe through shared memory.

core features

  • Pipe: alias for Generic_Pipe
  • Generic_Pipe: variable-length data; accepts bytes or any picklable object
  • Static_Pipe: fixed-length data; accepts bytes only
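
To get a feel for what "picklable" means and how large a message becomes once serialized, the following sketch uses only the standard pickle module. It assumes Generic_Pipe serializes non-bytes objects with pickle, which the feature list suggests but does not spell out. The helpers is_picklable and pickled_size are hypothetical names, not part of buffered_pipe, and the source does not say how a message larger than buffer_size is handled.

```python
import pickle


def is_picklable(obj) -> bool:
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


def pickled_size(obj) -> int:
    """Return how many bytes obj occupies once pickled (hypothetical helper)."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


if __name__ == "__main__":
    print(is_picklable({"a": [1, 2, 3]}))  # plain containers can be pickled
    print(is_picklable(lambda x: x))       # lambdas cannot
    print(pickled_size("Hello, world!"))   # payload bytes plus pickle framing
```

The pickled size is always somewhat larger than the raw payload, which may be worth keeping in mind when choosing buffer_size.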

install


pip install buffered-pipe

usage


  • Generic_Pipe
import multiprocessing
import threading

from buffered_pipe import Generic_Pipe


def foo(barrier, pipe_send):
    pipe_send.register()
    barrier.wait()
    pipe_send.send("Hello, world!")


def bar(barrier, pipe_recv):
    pipe_recv.register()
    barrier.wait()
    print(pipe_recv.recv())


if __name__ == "__main__":
    barrier = multiprocessing.Barrier(3)
    pipe_recv, pipe_send = Generic_Pipe(
        buffer_size=2048, duplex=False
    )
    P = multiprocessing.Process(target=foo, args=(barrier, pipe_send))
    T = threading.Thread(target=bar, args=(barrier, pipe_recv))
    P.start()
    T.start()
    barrier.wait()
    P.join()
    T.join()
  • Static_Pipe
import multiprocessing
import threading

from buffered_pipe import Static_Pipe


def foo(barrier, pipe_send):
    pipe_send.register()
    barrier.wait()
    # Static_Pipe carries fixed-length bytes, so pad the message with spaces
    # to exactly object_size (32) bytes before sending.
    ret_string = "Hello, world!".ljust(32).encode()
    pipe_send.send(ret_string)


def bar(barrier, pipe_recv):
    pipe_recv.register()
    barrier.wait()
    ret_string = pipe_recv.recv()
    print(ret_string.decode())


if __name__ == "__main__":
    barrier = multiprocessing.Barrier(3)
    pipe_recv, pipe_send = Static_Pipe(object_size=32, object_count=16, duplex=False)
    P = multiprocessing.Process(target=foo, args=(barrier, pipe_send))
    T = threading.Thread(target=bar, args=(barrier, pipe_recv))
    P.start()
    T.start()
    barrier.wait()
    P.join()
    T.join()
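
The padding step in the Static_Pipe example counts characters, which only matches the byte count for ASCII text. A minimal sketch of padding by encoded length instead is shown below. pack_fixed, unpack_fixed and OBJECT_SIZE are hypothetical names introduced for illustration, not part of buffered_pipe, and NUL padding is an assumed convention (the example above pads with spaces).

```python
OBJECT_SIZE = 32  # assumed to match the object_size passed to Static_Pipe


def pack_fixed(text: str, size: int = OBJECT_SIZE) -> bytes:
    """Encode text as UTF-8 and right-pad with NUL bytes to exactly size bytes."""
    data = text.encode("utf-8")
    if len(data) > size:
        raise ValueError(f"message is {len(data)} bytes, limit is {size}")
    return data.ljust(size, b"\x00")


def unpack_fixed(record: bytes) -> str:
    """Strip the trailing NUL padding and decode back to text."""
    return record.rstrip(b"\x00").decode("utf-8")


if __name__ == "__main__":
    record = pack_fixed("Hello, world!")
    print(len(record))           # always OBJECT_SIZE
    print(unpack_fixed(record))  # original text without padding
```

With these helpers, foo would send pack_fixed("Hello, world!") and bar would print unpack_fixed(pipe_recv.recv()). A message that itself ends in NUL bytes would lose them on unpacking, so a length prefix is the safer choice for arbitrary binary data.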
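  • Call register() on the pipe end inside each process or thread that uses it, as foo and bar do above.
  • buffer_size of Generic_Pipe is given in bytes.
  • object_size of Static_Pipe is given in bytes.
  • object_count of Static_Pipe is the maximum number of objects the buffer can hold at once.
  • duplex has the same meaning as the duplex argument of multiprocessing.Pipe; with duplex=False, as in the examples above, the first returned end receives and the second sends.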
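
Since duplex is described in terms of multiprocessing.Pipe, the sketch below shows what that flag does in the standard library itself. It does not import buffered_pipe, and whether buffered_pipe raises the same exception on a wrong-direction send is not stated in the source. demo_simplex and demo_duplex are hypothetical names.

```python
import multiprocessing


def demo_simplex():
    """duplex=False: the first end only receives, the second end only sends."""
    recv_end, send_end = multiprocessing.Pipe(duplex=False)
    send_end.send("ping")
    message = recv_end.recv()
    try:
        recv_end.send("wrong direction")
        one_way = False
    except OSError:
        # The standard library rejects sends on the receive-only end.
        one_way = True
    recv_end.close()
    send_end.close()
    return message, one_way


def demo_duplex():
    """duplex=True: both ends can send and receive."""
    end_a, end_b = multiprocessing.Pipe(duplex=True)
    end_a.send("to b")
    end_b.send("to a")
    result = (end_b.recv(), end_a.recv())
    end_a.close()
    end_b.close()
    return result


if __name__ == "__main__":
    print(demo_simplex())
    print(demo_duplex())
```

This is why the usage examples unpack the result as pipe_recv, pipe_send when duplex=False.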

tests


# Requires bash: `> file` truncates the result file, `&>>` appends stdout and stderr.
> benchmark_result.txt
for f in benchmarks/*.py; do python3 "$f" &>> benchmark_result.txt; done
> test_result.txt
for f in tests/*.py; do python3 "$f" &>> test_result.txt; done


Source Distribution

buffered_pipe-0.1.0.tar.gz (11.5 kB)

Uploaded Source

File details

Details for the file buffered_pipe-0.1.0.tar.gz.

File metadata

  • Download URL: buffered_pipe-0.1.0.tar.gz
  • Upload date:
  • Size: 11.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.0 CPython/3.9.12

File hashes

Hashes for buffered_pipe-0.1.0.tar.gz
Algorithm Hash digest
SHA256 bebe659c1709c7042fb5dd3a5e6794e634340fc027edec607306e3bdb0d0a6d7
MD5 48fd5875984962fa4e9844c59588ae17
BLAKE2b-256 13ca88ac9ba03ccba7bf74989682e015030e5bbfff40d8160b665390e158f524
