
Robust, high-volume, message based communication made easy

Project description

kiwiPy


kiwiPy is a library that makes remote messaging using RabbitMQ (and possibly other message brokers) EASY. It was designed to support high-throughput workflows in big-data and computational science settings and is currently used by AiiDA for computational materials research around the world. That said, kiwiPy is entirely general and can be used wherever high-throughput, robust messaging is needed.

Here’s what you get:

  • RPC
  • Broadcast (with filters)
  • Task queue messages

Let’s dive in with some examples taken from the rmq tutorial. For more detail, head over to the documentation.

RPC

The client:

import kiwipy

with kiwipy.connect('amqp://localhost') as comm:
    # Send an RPC message
    print(" [x] Requesting fib(30)")
    response = comm.rpc_send('fib', 30).result()
    print(" [.] Got %r" % response)

(rmq_rpc_client.py source)
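
In the client above, `rpc_send` hands back a future-like object and `.result()` blocks until the reply arrives. The sketch below illustrates that request/future pattern using only the standard library, so it runs without a broker. The `LocalRpc` class is hypothetical and exists only for this illustration; it is not kiwiPy's implementation, and the iterative `fib` is swapped in for the recursive one simply to keep the example fast.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class LocalRpc:
    """Hypothetical in-process stand-in for a communicator (illustration only)."""

    def __init__(self):
        self._subscribers = {}
        self._pool = ThreadPoolExecutor(max_workers=2)

    def add_rpc_subscriber(self, subscriber, identifier):
        # Remember which callable answers requests sent to this identifier
        self._subscribers[identifier] = subscriber

    def rpc_send(self, recipient_id, msg) -> Future:
        # Run the subscriber in a worker thread and return a future straight away
        return self._pool.submit(self._subscribers[recipient_id], self, msg)

    def close(self):
        self._pool.shutdown()


def fib(_comm, num):
    # Iterative Fibonacci: after `num` steps, `a` holds fib(num)
    a, b = 0, 1
    for _ in range(num):
        a, b = b, a + b
    return a


rpc = LocalRpc()
rpc.add_rpc_subscriber(fib, 'fib')
future = rpc.rpc_send('fib', 30)  # does not wait for the answer
response = future.result()        # blocks until the subscriber has finished
print(" [.] Got %r" % response)
rpc.close()
```

Keeping the future around instead of calling `.result()` immediately lets a client issue several requests before waiting on any of them.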

The server:

import threading
import kiwipy

def fib(comm, num):
    if num == 0:
        return 0
    if num == 1:
        return 1

    return fib(comm, num - 1) + fib(comm, num - 2)

with kiwipy.connect('amqp://127.0.0.1') as comm:
    # Register an RPC subscriber with the name 'fib'
    comm.add_rpc_subscriber(fib, 'fib')
    # Now wait indefinitely for fibonacci calls
    threading.Event().wait()

(rmq_rpc_server.py source)

Worker

Create a new task:

import sys
import kiwipy

message = ' '.join(sys.argv[1:]) or "Hello World!"

with kiwipy.connect('amqp://localhost') as comm:
    comm.task_send(message)

(rmq_new_task.py source)

And the worker:

import time
import threading
import kiwipy

print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(_comm, task):
    print(" [x] Received %r" % task)
    # Simulate work: sleep one second for every '.' in the message text
    time.sleep(task.count('.'))
    print(" [x] Done")


try:
    with kiwipy.connect('amqp://localhost') as comm:
        comm.add_task_subscriber(callback)
        threading.Event().wait()
except KeyboardInterrupt:
    pass

(rmq_worker.py source)
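
The worker simulates load by sleeping one second for every dot in the message. A minimal sketch of that rule as a standalone helper follows; the name `work_seconds` is hypothetical, and the helper assumes the task body arrives either as text or as UTF-8 encoded bytes.

```python
def work_seconds(task):
    """Return the number of seconds of simulated work: one per '.' in the task."""
    if isinstance(task, bytes):
        # Assumption: byte payloads are UTF-8 encoded text
        task = task.decode('utf-8')
    return task.count('.')


print(work_seconds("Hello..."))
```

With such a helper, the callback could call `time.sleep(work_seconds(task))`, so a message like `Hello...` keeps a worker busy for three seconds.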

Versioning

This software follows Semantic Versioning.

Contributing

Want a new feature? Found a bug? Want to contribute more documentation or a translation perhaps?

Help is always welcome. Get started with the contributing guide.

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for kiwipy, version 0.6.1

  • kiwipy-0.6.1-py2.py3-none-any.whl (27.6 kB): wheel, Python version py2.py3
  • kiwipy-0.6.1.tar.gz (44.0 kB): source
