A Python remote communications library

kiwipy

kiwipy is a library that makes remote messaging with RabbitMQ (and any other protocol for which a backend is written) EASY. I don’t know about you, but I find RabbitMQ HARD. It’s all too easy to make a configuration mistake that is then difficult to debug. With kiwipy there’s none of this: just messaging, made simple, with all the nice properties and guarantees of AMQP.

Here’s what you get:

  • RPC
  • Broadcast (with filters)
  • Task queue messages
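
Broadcast filtering is not covered by the examples that follow, so here is a rough illustration of the idea only. It is a toy, in-process sketch built on the standard library: ToyBroadcaster, subscribe and publish are hypothetical names, not kiwipy's API, and glob-style subject matching is an assumption chosen for the illustration. The real library delivers messages through a broker rather than a Python list.

```python
import fnmatch


class ToyBroadcaster:
    """Hypothetical in-memory stand-in for a broadcast channel (NOT kiwipy's API)."""

    def __init__(self):
        self._subscribers = []  # list of (subject_pattern, callback) pairs

    def subscribe(self, callback, subject='*'):
        # Only messages whose subject matches the glob pattern reach the callback
        self._subscribers.append((subject, callback))

    def publish(self, body, subject=''):
        # Deliver to every subscriber whose filter accepts the subject
        delivered = 0
        for pattern, callback in self._subscribers:
            if fnmatch.fnmatchcase(subject, pattern):
                callback(body, subject)
                delivered += 1
        return delivered


received = []
bus = ToyBroadcaster()
bus.subscribe(lambda body, subject: received.append(('all', body)))
bus.subscribe(lambda body, subject: received.append(('purchases', body)), subject='purchase.*')

bus.publish('Bought a car', subject='purchase.car')      # matches both subscribers
bus.publish('Weather is fine', subject='news.weather')   # matches only the unfiltered one
```

The point of the filter is that a subscriber states up front which subjects it cares about, so its callback never has to inspect and discard unrelated messages.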

Let’s dive in, with some examples taken from the rmq tutorial.

RPC

The client:

from kiwipy import rmq

communicator = rmq.RmqThreadCommunicator.connect(connection_params={'url': 'amqp://localhost'})

# Send an RPC message
print(" [x] Requesting fib(30)")
response = communicator.rpc_send('fib', 30).result()
print(" [.] Got %r" % response)

(rmq_rpc_client.py source)

The server:

import threading

from kiwipy import rmq


def fib(comm, num):
    if num == 0:
        return 0
    if num == 1:
        return 1

    return fib(comm, num - 1) + fib(comm, num - 2)


communicator = rmq.RmqThreadCommunicator.connect(connection_params={'url': 'amqp://localhost'})

# Register an RPC subscriber with the name 'fib'
communicator.add_rpc_subscriber(fib, 'fib')
# Now wait indefinitely for fibonacci calls
threading.Event().wait()

(rmq_rpc_server.py source)
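
The recursive fib in the server listing takes exponential time, so answering fib(30) costs over a million recursive calls, and a negative argument never reaches a base case. As a sketch of an alternative, the version below keeps the same (communicator, argument) signature but runs in linear time and rejects negative input. The validation behaviour is an addition made here, not part of the tutorial.

```python
def fib(_comm, num):
    """Iterative Fibonacci with the same (communicator, argument) signature."""
    if num < 0:
        raise ValueError("fib is undefined for negative numbers")
    a, b = 0, 1
    # After i iterations, a holds the i-th Fibonacci number
    for _ in range(num):
        a, b = b, a + b
    return a
```

It can be registered exactly as in the listing, with communicator.add_rpc_subscriber(fib, 'fib'), and the client code does not need to change.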

Worker

Create a new task:

import sys

from kiwipy import rmq

message = ' '.join(sys.argv[1:]) or "Hello World!"

with rmq.RmqThreadCommunicator.connect(connection_params={'url': 'amqp://localhost'}) as communicator:
    communicator.task_send(message)

(rmq_new_task.py source)

And the worker:

import time
import threading

from kiwipy import rmq

print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(_comm, task):
    print(" [x] Received %r" % task)
    # The task is the text sent by the client, so count '.' as text, not bytes
    time.sleep(task.count('.'))
    print(" [x] Done")


try:
    with rmq.RmqThreadCommunicator.connect(connection_params={'url': 'amqp://localhost'}) as communicator:
        communicator.add_task_subscriber(callback)
        threading.Event().wait()
except KeyboardInterrupt:
    pass

(rmq_worker.py source)
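
The worker simulates work by sleeping one second for every dot in the task body. As a small sketch, the helper below isolates that calculation so it can be checked without a broker. work_seconds is a hypothetical name, and accepting bytes as well as text is an assumption made here for robustness, since the type of the received body depends on how the message was encoded.

```python
def work_seconds(task):
    """Return the simulated duration: one second per '.' in the task body."""
    if isinstance(task, bytes):
        # Assumption: byte bodies are UTF-8 encoded text
        task = task.decode('utf-8')
    return task.count('.')
```

Inside the callback this would be used as time.sleep(work_seconds(task)).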

Versioning

This software follows Semantic Versioning.

Files for kiwipy, version 0.5.1

Filename                            Size      File type  Python version
kiwipy-0.5.1-py2.py3-none-any.whl   21.3 kB   Wheel      py2.py3
kiwipy-0.5.1.tar.gz                 36.2 kB   Source     None
