Robust, high-volume, message based communication made easy
Project description
kiwiPy
kiwiPy is a library that makes remote messaging using RabbitMQ (and possibly other message brokers) EASY. It was designed to support high-throughput workflows in big-data and computational science settings and is currently used by AiiDA for computational materials research around the world. That said, kiwiPy is entirely general and can be used anywhere where high-throughput and robust messaging are needed.
Here’s what you get:
RPC
Broadcast (with filters)
Task queue messages
Let’s dive in, with some examples taken from the rmq tutorial. To see more detail head over to the documentation.
RPC
The client:
import kiwipy
with kiwipy.connect('amqp://localhost') as comm:
# Send an RPC message
print(" [x] Requesting fib(30)")
response = comm.rpc_send('fib', 30).result()
print((" [.] Got %r" % response))
The server:
import threading
import kiwipy
def fib(comm, num):
if num == 0:
return 0
if num == 1:
return 1
return fib(comm, num - 1) + fib(comm, num - 2)
with kiwipy.connect('amqp://127.0.0.1') as comm:
# Register an RPC subscriber with the name 'fib'
comm.add_rpc_subscriber(fib, 'fib')
# Now wait indefinitely for fibonacci calls
threading.Event().wait()
Worker
Create a new task:
import sys
import kiwipy
message = ' '.join(sys.argv[1:]) or "Hello World!"
with rmq.connect('amqp://localhost') as comm:
comm.task_send(message)
And the worker:
import time
import threading
import kiwipy
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(_comm, task):
print((" [x] Received %r" % task))
time.sleep(task.count(b'.'))
print(" [x] Done")
try:
with kiwipy.connect('amqp://localhost') as comm:
comm.add_task_subscriber(callback)
threading.Event().wait()
except KeyboardInterrupt:
pass
Versioning
This software follows Semantic Versioning
Contributing
Want a new feature? Found a bug? Want to contribute more documentation or a translation perhaps?
Help is always welcome, get started with the contributing guide.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for kiwipy-0.6.1-py2.py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | cb0f6e5daa51df1c1f52972d8dc8b091a6925d0ed60521f319ae42ae41972b7b |
|
MD5 | 19e10985f4ccd02561b8b4d4f31390f0 |
|
BLAKE2b-256 | 2a600cd2c543f6bb9662f4bdf0b68ac83bd5be1491d065c9c5b0098750c8309b |