Skip to main content

Durable messaging for distributed processing

Project description

muppet is Python implementation of mutual. muppet provides RemoteChannel for simple messaging across process or machine boundaries and DurableChannel for durable messaging across process or machine boundaries. Both RemoteChannel and DurableChannel use Redis for message store.

Remote Channel

Remote Channel follows a pub-sub model where every message sent on a channel is broadcast to all the subscribers listening on the channel.

Usage:

from muppet import RemoteChannel

# define the callback to receive messages
def callback(message):
  print("received:", message)
  # we are done with the receiver
  receiver.end()

# redis server details
redis_options = {"redis": {"host": "127.0.0.1", "port": 6379}}
# create a remote channel to send messages
sender = RemoteChannel("greeting", redis_options)
# create a remote channel to receive messages
receiver = RemoteChannel("greeting", redis_options)
# listen for messages by passing the callback
receiver.listen(callback)
# send a message
sender.send("hello")
# we are done with the sender
sender.end()

Durable Channel

Durable Channel follows a queue model, where a message sent on a channel is picked up by any one of the receivers listening on the channel. Using DurableChannel, senders can send messages with a timeout, so they are informed when a message is not replied to within the specified timeout. Every message is guaranteed to be replied to within a specified timeout, if not, sender is informed via a callback.

Usage:

from muppet import DurableChannel

def timeout_callback(message):
  print "timed out:", message
  # we are done with the worker
  worker.end()
  # we are done with dispatcher
  dispatcher.end()

# redis server details
redis_options = {"redis": {"host": "127.0.0.1", "port": 6379}}
# create a durable channel to dispatch messages
dispatcher = DurableChannel("dispatcher.1", redis_options)
# create a durable channel to receive messages, note the 3rd argument which is the callback for handling timeouts
worker = DurableChannel("worker.1", redis_options, timeout_callback)

# dispatch a message to worker.1
dispatcher.send(content="task", to="worker.1")

# receive the message
message = worker.receive()
print "received message:", message["content"]
# reply to the message
worker.reply(message=message, response="reply", timeout=5000)

# receive the reply
reply = dispatcher.receive()
print "received reply:", reply["content"]

# we are happy with the reply
dispatcher.close(reply)

# we are done with dispatcher and worker
worker.end()
dispatcher.end()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

muppet-0.1.6.tar.gz (4.3 kB view details)

Uploaded Source

File details

Details for the file muppet-0.1.6.tar.gz.

File metadata

  • Download URL: muppet-0.1.6.tar.gz
  • Upload date:
  • Size: 4.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No

File hashes

Hashes for muppet-0.1.6.tar.gz
Algorithm Hash digest
SHA256 705cbb73d037cb57072c011083caef5d20eb22fb19cce6e94fe3c225ea1c3494
MD5 d7954473ea15d60dedd51e743df381ee
BLAKE2b-256 f134d2e0922acb70d7c53cd26231bc88d2dc95c276e1dcb10a17e2b702f15343

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page