Skip to main content

asyncronous pubsub in django using redis

Project description Code Climate

asyncronous subscription distrobution for django (with websocket support!!!!).


You’ll first need to create some publishable models.


class Correspondence(PublishableModel):

    participants = models.ManyToManyField(settings.AUTH_USER_MODEL, related_name="correspondences")

    def save(self, *args, **kwargs):
        super(Correspondence, self).save(*args, **kwargs)
        # add subscribe all the users to the channel
        channel =
        for subscriber in self.participants:

class Message(PublishableModel):

    correspondence = models.ForeignKey("Correspondence", related_name="messages")
    author = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="sent_messages")
    body = models.TextField()

    def save(self, *args, **kwargs):
        if not hasattr(self, "channel"):
        super(Message, self).save(*args, **kwargs)


def send_message(request, correspondence, *args, **kwargs):
    message = Message.objects.create(
    return render_to_response(request, "messages.html", {"message": message})


def read_messages(ws, params, user, manager):
    subscription = user.subscriptions.get(channel__name="something:unique".format(user.username))
    reader = subscription.get_reader(manager=manager)

    def send_message_alert(channel_name, model):
        alert = json.dumps({"message": "new message from {0}".format(
        return True

    listener = yield from reader.listen()
    yield from listener

In the above example, a client who has established a websocket connection to the handler in will receive alerts as long as the websocket connection remains open. When another client sends a POST request to the send_message view in the message will be published and received by the read_messages.send_message_alert callback where further processing/serialization can occur.


If you choose to use redis_pubsub.contrib.websockets there are additional packages that you will need to install:

$ pip install aiohttp aiohttp_wsgi

Websocket handlers belong in module in your application by the name of This module should export a handlerconf, which is a list of the names of the handlers in the module

@websocket("/")  # this handler will be at
def myhandler(ws, params, **kwargs):

handlerconf = ["myhandler", ]

Websocket requests are handled with the excellent aiohttp package which takes care of the encoding/decoding, handshake, and cleanup of a websocket session. Handlers for websocket requests are coroutines decorated with either the redis_pubsub.contrib.websockets.websocket or redis_pubsub.contrib.websockets.websocket_pubsub wrappers. These wrappers handle converting your handler to a coroutine and passing arguments to your handler. A simple handler that echo’s a message back to the client would look like this

def echo(ws, params, **kwargs):
    message = yield from ws.receive()

The former example shows a websocket handler that waits for a message from a connected client, echo’s the message back to the client and closes the connection.

Websocket Authentication

If you choose to use authenticated websockets you will need to either install djangorestframework and use the rest_framework.authtoken.models.Token object as your authentication method or simply use rest_framework_jwt to distribute and challenge JTW’s provided by your client. to configure authentication with one of these methods (or your own token authentication method) add the module path to the REDIS_PUBSUB config:

    "tokenauth_method": "redis_pubsub.auth.authjwt_method",  # defaults to "redis_pubsub.auth.authtoken_method"

If you do decide to roll your own tokenauth_method, this method must accept a single argument (the token string) and return either None if the token is not valid or an instance of AUTH_USER_MODEL if the token is valid.

Websocket Pubsub

You can access the Pubsub methods provided by redis_pubsub in your websocket handlers by decorating your handler with the redis.pubsub.contrib.websockets.websocket_pubsub wrapper. This wrapper provides an additional argument manager to your handler. The manager can be used to keep track of subscription channels and stop them if necessary


@websocket_pubsub("/messages", authenticate=True)
def message_pusher(ws, params, manager, user, **kwargs):
    subscription = user.subscriptions.get(channel__name="messages")
    reader = subscription.get_reader(manager=manager)

    def callback(channel_name, message):
        to_client = {
            channel_name: {
                "body": message.body
        return True

    listener = yield from reader.listen()
    yield from listener

This example shows the main purpose of the redis_pubsub package, which is to listen for updates on a redis channel and push the publication to a client. Lets break it down line by line

  1. retreive the users subscription
  2. create a managed ChannelReader object for this subscription
  3. register a callback to be executed whenever a new publication is received
  4. begin listening for changes
  5. listen until the channel is closed

The most fruitful method offerd by a SubscriptionManager is listen_to_all_subscriptions which takes two arguments, a subscriber and a callback, and publishes subscriptions as they arrive


@websocket_pubsub("/subscriptions", authenticate=True)
def subscriptions(ws, params, manager, user, **kwargs):

    def callback(channel_name, message):
        return True

    manager.listen_to_all_subscriptions(user, callback)

    while True:
        message = yield from ws.receive()
        if not in (MsgType.error, MsgType.close):
            message = json.loads(message)
            if message["action"] == "unsubscribe":
                subscription = user.subscriptions.get(channel__name=message["channe"])
       = False
                reader = manager.readers[message["channel"]]
                yield from manager.remove(reader)
            elif message["action"] == "subscribe":
                channel = Channel.objects.get(name=message["channel"])
                reader = channel.subscribe(user).get_reader(manager=manager)
                yield from reader.listen()

The callback in this example will keep all subscription channels open and push messages to a client until the websocket has closed. This code provides a simple means of managing users with a multitude of subscriptions. The while loop here also handles unsubscribing and subscribing to new channels


A callback function should never receive from a websocket or else a RuntimeError will be raised.


when deploying an application with websockets/aiohttp you will not be able to use the normal django deployment proceedures. Since your django application will be a component of an AioHttp application object, you will have to use Gunicorn as an application server. Using utilities from the redis_pubsub.contrib.websockets module you can create a deployment file simply


import asyncio

from django.core.wsgi import get_wsgi_application

from aiohttp_wsgi import WSGIHandler

from redis_pubsub.contrib.websockets import setup

wsgi_app = get_wsgi_application()  # django.setup() is called here
wsgi_handler = WSGIHandler(wsgi_app)

loop = asyncio.get_event_loop()
application = setup(loop=loop)
# any url patterns not matched by the Websocket app go to the django app for handling
application.router.add_route("*", "/{path_info:.*}", wsgi_handler.handle_request)

you can then start gunicorn by running:

$ gunicorn deployment:application --bind localhost:8080 --worker-class aiohttp.worker.GunicornWebWorker

Project details

Release history Release notifications | RSS feed

This version


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django-redis-pubsub-1.0.tar.gz (13.0 kB view hashes)

Uploaded source

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring Facebook / Instagram Facebook / Instagram PSF Sponsor Fastly Fastly CDN Google Google Object Storage and Download Analytics Huawei Huawei PSF Sponsor Microsoft Microsoft PSF Sponsor NVIDIA NVIDIA PSF Sponsor Pingdom Pingdom Monitoring Salesforce Salesforce PSF Sponsor Sentry Sentry Error logging StatusPage StatusPage Status page