Skip to main content

Message handling for Django channels

Project description

Envelope

Note: While some of this may be work in progress, we'll make sure to bump major version number whenever we change anything that might break stuff.

We haven't written docs for most things, but the source code should hopefully be clear enough.

Introduction

While channels handles routing of messages within the async domain, it's very open in terms of what gets transported.

Envelope is basically a system to structure and handle what different messages mean and what they do.

Some core principles:

  • Readability above speed Messages are serialized and deserialized as real objects that contain metadata about their origins and have methods on them. If you need to process 1M messages / sec you shouldn't use channels anyway.
  • Keep the async and sync world as separate as possible If something needs to work with the database, defer it to a queue. Mixing these makes testing very hard and causes a lot of unexpected errors.
  • Pluggable Allow other applications to build on this and inject functionality.
  • Predictability and type checking We use Pydantic and Pythons type annotations. All valid message types must be registered as valid within each specific communication channel to be valid.
  • Message validation in different steps First the basic message structure, then the payload itself and any expensive validations (like db queries) outside the async consumer.
  • No surprises If a payload doesn't conform to an actual message that's supposed to be communicated through that channel, it will be dropped.

Prerequisites

  • Basic Django knowledge.
  • Read up on Django Channels and what it does - especially consumers.

Dependencies - and why

  • RQ with django_rq handles queues and deferred actions.
  • async-signals for exactly what the package says. Since Django 5 this works in Djangos default signals, we may refactor later.
  • pydantic for all schemas, but with a bit of refactoring it would be possible to use any serializer.

Core concepts

Envelope

Keeps track of what kind of messages to accept and how to handle them. Performs serialization/deserialization and basic validation of message payload. The registered envelopes always have a single direction via a single transport type, for instance incoming websocket message, or outgoing websocket message.

Envelopes have short keys where only t is required.

Envelope schema - keys

  • t Type of message. Must exist within a message registry as key.
  • i Message trace id. Any error or response caused by this message will be returned with the same trace id. It's a good idea to pass this along to whatever action this message will cause. (A process queue for instance)
  • p Payload. Can be blank, normally a dict, but it's up to the envelopes schema to define this.
  • s State - an outbound message that's caused by another message usually has state to indicate the result of the action. By using state and i together it's possible to build request/response functionality.

Default message states

  • a Acknowledged - message received and passed at least basic validation.
  • q Queued - Waiting within a process queue.
  • r Running - process worker started working on this task. (It may be a good idea to send periodic updates with this marker if it's a long-running task)
  • s Success. Any kind of thumbs up reply, doesn't have to be more dramatic than a ping/pong reply.
  • f Failed (Through exception or something else)

Message

A class that knows what to do with a specific message. It may have other actions it will perform when it's received, and it may define a Pydantic schema for its payload.

Deserialized messages also have metadata that keeps track of their origin and possible trace id.

Message registry

Not much more than a dict where the key is a string corresponding to message type, and the value is a message class.

Registries have names that correspond to their communication channel. They're always one direction, but messages can be added to different registries.

Use names that explain the direction, for instance 'websocket_incoming'.

Connection signals

The async signals consumer_connected and consumer_closed fires as soon as a connection is accepted or closed. They in turn will que a job with the sync signals connection_created and connection_closed. That way the sync code won't block the connection.

>>> from envelope.async_signals import consumer_connected, consumer_closed
>>> from envelope.signals import connection_created, connection_closed

Message signals

Any recipient of a message will generate an event that other parts of the application can react to. That way functionality can be added to messages. For instance any message that inherits envelope.core.message.AsyncRunnable will call the method run.

Each message direction has its own async signal.

>>> from envelope.async_signals import incoming_websocket_message
>>> from envelope.async_signals import outgoing_websocket_message
>>> from envelope.async_signals import outgoing_websocket_error

Settings

Most of the required settings will be handled by django_rq and channels. But there are some aspects of envelope that can be tweaked.

Remember that message registries will contain some mesasges by default - if you don't want that, make sure to clear them.

ENVELOPE_CONNECTIONS_QUEUE (str) - default: None

: Name of the RQ queue to use for jobs that will create Connection objects. None disables functionality.

ENVELOPE_TIMESTAMP_QUEUE (str) - default: None

: Name of the RQ queue to use for timestamp updates for Connection objects. None disables functionality.

ENVELOPE_CONNECTION_UPDATE_INTERVAL (int) - in seconds, default: 180

: How often should a timestamp job be queued? None disables functionality.

ENVELOPE_BATCH_MESSAGE (str) - default: envelope.messages.common.BatchMessage

: Which class to use for batch messages.

ENVELOPE_SENDER_UTIL (str) - default: envelope.utils.SenderUtil

: Which class to use for sender util.

ENVELOPE_USER_CHANNEL_SEND_SUBSCRIBE (bool) - default: False

: Send a subscribe message to the consumer when user connects. This behaves the same way as if the client sent a subscribe message manually and will transmit app_state.

ENVELOPE_ALLOW_UNAUTHENTICATED (bool) - default: False

: Experimental

Usage examples

Sending messages when content is changed

In this example, we'll signal the users own channel when the user object is changed. A user that has several tabs connected to the same server will see the change in all tabs instantly.

Make sure envelope.app.user_channel is in INSTALLED_APPS for this example to work.

>>> from pydantic import BaseModel
>>> from django.contrib.auth import get_user_model
>>> from django.dispatch import receiver
>>> from django.db.models.signals import post_save
>>> from envelope.core.message import Message
>>> from envelope.app.user_channel.channel import UserChannel
>>> from envelope import WS_OUTGOING
>>> from envelope.decorators import add_message
    
>>> User = get_user_model()
    
>>> class UserSchema(BaseModel):
...     username: str
...     first_name: str = ""
...     last_name: str = ""
...     email: str = ""
    
    
>>> @add_message(WS_OUTGOING)
... class UserDetails(Message):
...     name = "user.details"
...     schema = UserSchema
    
    
>>> @receiver(post_save, sender=User)
... def send_user_details_on_change(instance: User, **kwargs):
...     data = {k: getattr(instance, k) for k in UserSchema.schema()['properties']}
...     msg = UserDetails(**data)
...     channel = UserChannel.from_instance(instance)
...     channel.sync_publish(msg)
    
# We'll mock the channels layer to catch the message
# By default, sync messages will only be sent when the transaction commits
# to avoid sending messages for things that may not happen.

>>> from json import loads
>>> from unittest.mock import patch
>>> from channels.layers import get_channel_layer
>>> layer = get_channel_layer()

# Test here is Djangos instance of TestCase
>>> with patch.object(layer, 'group_send') as mock_send:
...     with test.captureOnCommitCallbacks(execute=True):
...         new_user = User.objects.create(username="jane", first_name="Jane", last_name="Doe")
...         first = mock_send.called  # No message yet since db hasn't committed!
...     second = mock_send.called


>>> first
False

>>> second
True

>>> mock_send.mock_calls[0].args[0] == f"user_{new_user.pk}"
True

>>> data = loads(mock_send.mock_calls[0].args[1]['text_data'])
>>> data['p']
{'username': 'jane', 'first_name': 'Jane', 'last_name': 'Doe', 'email': ''}

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

channels_envelope-1.1.0.tar.gz (34.6 kB view details)

Uploaded Source

Built Distribution

channels_envelope-1.1.0-py3-none-any.whl (48.7 kB view details)

Uploaded Python 3

File details

Details for the file channels_envelope-1.1.0.tar.gz.

File metadata

  • Download URL: channels_envelope-1.1.0.tar.gz
  • Upload date:
  • Size: 34.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.3

File hashes

Hashes for channels_envelope-1.1.0.tar.gz
Algorithm Hash digest
SHA256 2742e474df64560d317deb700ea4559e1d254b4be5f4ac83b70be40c0d7bd0ca
MD5 66aa87f1ad7134032c83abeeb2da84e2
BLAKE2b-256 ef220199bd0c0e2c88dc63ae47f6041407489d6680ebd19376306d5b45ae3a4e

See more details on using hashes here.

File details

Details for the file channels_envelope-1.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for channels_envelope-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2cd2fa58eb7b100bfa035bb53626763910955638d5515c911e23f78a419ba511
MD5 1b4f3188cc820b4b6a1453699520a273
BLAKE2b-256 4615d60ad5c6ce418d79b65046606d9cc3b88f5374a302209902a9d28b59f7d4

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page