Django CQRS data synchronisation

Django CQRS

django-cqrs is a Django application that implements CQRS data synchronisation between several Django microservices.


In Connect we have a rather complex domain model. There are many microservices that are decomposed by subdomain and follow the database-per-service pattern. These microservices have rich and consistent APIs. They are deployed in a cloud k8s cluster and scale automatically under load. Many of these services aggregate data from other ones, and usually API Composition is enough. However, some services work too slowly with API joins, so another pattern needs to be applied.

The pattern that solves this issue is called CQRS (Command Query Responsibility Segregation). The core idea behind this pattern is that view databases (replicas) are defined for efficient querying and DB joins. Applications keep their replicas up to date by subscribing to domain events published by the service that owns the data. Data is eventually consistent, and that's okay for non-critical business transactions.
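
To make the idea concrete, here is a minimal in-memory sketch of the pattern described above. It does not use django-cqrs or RabbitMQ; the Broker, AccountService and ReportingService names are hypothetical and exist only for illustration. Delivery here is synchronous, whereas a real transport delivers events asynchronously, which is where eventual consistency comes from.

```python
from collections import defaultdict


class Broker:
    """In-memory stand-in for a message transport (hypothetical, not RabbitMQ)."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    def publish(self, topic, event):
        # Deliver the event to every handler subscribed to the topic.
        for handler in self._subscribers[topic]:
            handler(event)


class AccountService:
    """Owns the account data and publishes a domain event on every change."""

    def __init__(self, broker):
        self._broker = broker
        self._accounts = {}

    def save(self, account_id, name):
        self._accounts[account_id] = {'id': account_id, 'name': name}
        # Publish a copy so subscribers cannot mutate the owner's state.
        self._broker.publish('account', dict(self._accounts[account_id]))


class ReportingService:
    """Keeps a local replica so that queries need no cross-service API call."""

    def __init__(self, broker):
        self.account_replica = {}
        broker.subscribe('account', self._on_account_event)

    def _on_account_event(self, event):
        # Upsert: the latest event for an id replaces the previous state.
        self.account_replica[event['id']] = event

    def account_name(self, account_id):
        return self.account_replica[account_id]['name']


broker = Broker()
owner = AccountService(broker)
reporting = ReportingService(broker)
owner.save(1, 'Acme')
owner.save(1, 'Acme Ltd')
print(reporting.account_name(1))  # Acme Ltd
```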


Full documentation is available at


You can find an example project here


  • Setup RabbitMQ
  • Install django-cqrs
  • Apply changes to master service, according to RabbitMQ settings

from django.db import models
from dj_cqrs.mixins import MasterMixin, RawMasterMixin


class Account(MasterMixin, models.Model):
    CQRS_ID = 'account'
    CQRS_PRODUCE = True  # set this to False to prevent sending instances to Transport


class Author(MasterMixin, models.Model):
    CQRS_ID = 'author'
    CQRS_SERIALIZER = 'app.api.AuthorSerializer'


# For diamond multiple inheritance or Django proxy models, the following approach can be used:
from mptt.models import MPTTModel
from dj_cqrs.metas import MasterMeta


class ComplexInheritanceModel(MPTTModel, RawMasterMixin):
    CQRS_ID = 'diamond'


class BaseModel(RawMasterMixin):
    CQRS_ID = 'base'


class ProxyModel(BaseModel):
    class Meta:
        proxy = True


# Models built on RawMasterMixin are registered explicitly.
MasterMeta.register(ComplexInheritanceModel)
MasterMeta.register(ProxyModel)

CQRS = {
    'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'password': RABBITMQ_PASSWORD,
}

  • Apply changes to replica service, according to RabbitMQ settings
from django.db import models
from dj_cqrs.mixins import ReplicaMixin


class AccountRef(ReplicaMixin, models.Model):
    CQRS_ID = 'account'

    id = models.IntegerField(primary_key=True)


class AuthorRef(ReplicaMixin, models.Model):
    CQRS_ID = 'author'

    @classmethod
    def cqrs_create(cls, sync, mapped_data, previous_data=None, meta=None):
        # Override here
        pass

    def cqrs_update(self, sync, mapped_data, previous_data=None, meta=None):
        # Override here
        pass

CQRS = {
    'transport': 'dj_cqrs.transport.RabbitMQTransport',
    'queue': 'account_replica',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'password': RABBITMQ_PASSWORD,
}

  • Apply migrations on both services
  • Run the consumer worker on the replica service. Management command: python manage.py cqrs_consume -w 2
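
The settings in the steps above reference RABBITMQ_HOST, RABBITMQ_PORT and RABBITMQ_PASSWORD without defining them. One possible way to supply them, sketched here as an assumption rather than a project requirement, is to read them from environment variables in the Django settings module. The environment variable names are hypothetical; the fallbacks are the stock RabbitMQ defaults and are suitable for local development only.

```python
import os

# Hypothetical environment variable names; adjust them to your deployment.
RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'localhost')
RABBITMQ_PORT = int(os.environ.get('RABBITMQ_PORT', '5672'))
RABBITMQ_PASSWORD = os.environ.get('RABBITMQ_PASSWORD', 'guest')

# Same keys as the master service settings shown in the steps above.
CQRS = {
    'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'password': RABBITMQ_PASSWORD,
}
```

Casting the port to int keeps the value numeric even though environment variables are always strings.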


  • When there are master models with related entities in CQRS_SERIALIZER, it's important to perform operations within atomic transactions. CQRS sync will happen on transaction commit.
  • Avoid saving different instances of the same entity within one transaction. This reduces sync traffic and potential race conditions on the replica side.
  • Updating a related model won't trigger automatic CQRS synchronization for the master model. This needs to be done manually.
  • By default, saving with update_fields doesn't trigger CQRS logic, but this can be overridden for the whole application in settings:
settings.CQRS = {
    'master': {
        'CQRS_AUTO_UPDATE_FIELDS': True,
    },
}

or a special flag can be used in each place where the CQRS flow must be triggered:
instance.save(update_fields=['name'], update_cqrs_fields=True)
  • When only some instances need to be synchronized, override the is_sync_instance method to set the filtering rule. Note that CQRS counting works even without syncing, and the rule is applied every time the model is updated.


class FilteredSimplestModel(MasterMixin, models.Model):
    CQRS_ID = 'filter'

    name = models.CharField(max_length=200)

    def is_sync_instance(self):
        return len(str(self.name)) > 2
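
The interaction between the filter and the counter can be illustrated with a small plain-Python sketch. It is not the library's implementation: FilteredRecord, its revision attribute and the publish callable are hypothetical stand-ins, and the filter is assumed to accept names longer than two characters. The point it shows is that the counter advances on every save, while data is sent only when the rule passes.

```python
class FilteredRecord:
    """Hypothetical stand-in for a master model with a sync filter."""

    def __init__(self, name, publish):
        self.name = name
        self.revision = 0        # stand-in for the per-instance CQRS counter
        self._publish = publish  # callable that would hand data to the transport

    def is_sync_instance(self):
        # Assumed rule: only names longer than 2 characters are synchronized.
        return len(str(self.name)) > 2

    def save(self):
        # The counter advances on every save, whether or not data is sent.
        self.revision += 1
        # The rule is evaluated on every save.
        if self.is_sync_instance():
            self._publish({'name': self.name, 'revision': self.revision})


sent = []
record = FilteredRecord('ab', sent.append)
record.save()         # filtered out, but the revision becomes 1
record.name = 'abc'
record.save()         # passes the filter and is sent with revision 2
print(sent)           # [{'name': 'abc', 'revision': 2}]
```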

Django Admin

Add an action to synchronize master items from the Django Admin page.

from django.db import models
from django.contrib import admin

from dj_cqrs.admin_mixins import CQRSAdminMasterSyncMixin

class AccountAdmin(CQRSAdminMasterSyncMixin, admin.ModelAdmin):
    ...


admin.site.register(Account, AccountAdmin)
  • If necessary, override _cqrs_sync_queryset from CQRSAdminMasterSyncMixin to adjust the QuerySet and use it for synchronization.


Bulk synchronizer without transport (for example, for initial configuration). It may be used during planned downtime.

  • On master service: python manage.py cqrs_bulk_dump --cqrs-id=author -> author.dump
  • On replica service: python manage.py cqrs_bulk_load -i=author.dump
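
Conceptually, a transport-free bulk sync is a file handoff: the master writes its records to a dump and the replica reads them back. The sketch below illustrates that idea only. The actual dump file format of cqrs_bulk_dump is not described here, so the sketch assumes one JSON object per line, and the dump_records and load_records helpers are hypothetical.

```python
import io
import json


def dump_records(records, stream):
    """Master side: write one JSON object per line (assumed format)."""
    for record in records:
        stream.write(json.dumps(record) + '\n')


def load_records(stream, store):
    """Replica side: upsert every dumped record into the local store."""
    loaded = 0
    for line in stream:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines in the dump
        record = json.loads(line)
        store[record['id']] = record
        loaded += 1
    return loaded


# An in-memory buffer plays the role of the author.dump file.
buffer = io.StringIO()
dump_records([{'id': 1, 'name': 'Ann'}, {'id': 2, 'name': 'Bob'}], buffer)
buffer.seek(0)

replica_store = {}
count = load_records(buffer, replica_store)
print(count, sorted(replica_store))  # 2 [1, 2]
```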

Filter synchronizer over transport (usage example: sync some specific records to a given replica). Can be used dynamically.

  • To sync all replicas: python manage.py cqrs_sync --cqrs-id=author -f='{"id__in": [1, 2]}'
  • To sync all instances only with one replica: python manage.py cqrs_sync --cqrs-id=author -f={} -q=replica
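
The -f option carries a JSON object, which is easy to get wrong when typed by hand because of shell quoting. A minimal sketch of building the command from Python follows. The build_sync_command helper is hypothetical, and running the command through manage.py is an assumption based on the usual Django entry point; only the option names come from the examples above.

```python
import json
import shlex


def build_sync_command(cqrs_id, filter_kwargs, queue=None):
    """Return an argv list for cqrs_sync (hypothetical helper)."""
    argv = [
        'python', 'manage.py', 'cqrs_sync',
        f'--cqrs-id={cqrs_id}',
        # json.dumps guarantees a valid JSON filter with double-quoted keys.
        f'-f={json.dumps(filter_kwargs)}',
    ]
    if queue is not None:
        argv.append(f'-q={queue}')
    return argv


argv = build_sync_command('author', {'id__in': [1, 2]})
# shlex.join quotes each argument so the line can be pasted into a POSIX shell.
print(shlex.join(argv))
```

Passing the list directly to subprocess.run(argv) avoids shell quoting altogether.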

Set of diff synchronization tools:

  • To get diff and synchronize master service with replica service in K8S:
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=author |
    kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_diff_replica |
    kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_sync
  • If it's important to check sync and clean up deleted objects within replica service in K8S:
kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_deleted_diff_replica --cqrs-id=author |
    kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_deleted_diff_master |
    kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_deleted_sync_replica
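
The three-stage pipelines above follow a common shape: one side lists what it has, the other side reports what differs, and the first side repairs the difference. The following sketch shows that shape for the master-to-replica case. It is a conceptual illustration, not the commands' real wire format: the function names, the (id, revision) pairs and the dictionary stores are all assumptions.

```python
def master_diff(master_rows):
    """Stage 1 (master): list (id, revision) pairs for every instance."""
    return [(row['id'], row['revision']) for row in master_rows.values()]


def replica_diff(pairs, replica_rows):
    """Stage 2 (replica): report ids that are missing or behind the master."""
    stale = []
    for pk, revision in pairs:
        local = replica_rows.get(pk)
        if local is None or local['revision'] < revision:
            stale.append(pk)
    return stale


def diff_sync(stale_ids, master_rows, replica_rows):
    """Stage 3 (master): resend the current state of every stale instance."""
    for pk in stale_ids:
        replica_rows[pk] = dict(master_rows[pk])


master = {
    1: {'id': 1, 'revision': 3},
    2: {'id': 2, 'revision': 1},
    3: {'id': 3, 'revision': 2},
}
replica = {
    1: {'id': 1, 'revision': 3},  # up to date
    2: {'id': 2, 'revision': 0},  # behind the master
}                                 # id 3 is missing entirely

stale = replica_diff(master_diff(master), replica)
print(stale)  # [2, 3]
diff_sync(stale, master, replica)
```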


  1. Python >= 3.8
  2. Install dependencies from requirements/dev.txt
  3. We use the isort library to order and format our imports, and black to format the code. We check both with the flake8-isort and flake8-black libraries (automatically on each flake8 run).
    For convenience, you may run isort . && black . to format the code.


Unit testing

  1. Python >= 3.8
  2. Install dependencies from requirements/test.txt
  3. export PYTHONPATH=/your/path/to/django-cqrs/

Run tests with various RDBMS:

  • cd integration_tests
  • DB=postgres docker-compose -f docker-compose.yml -f rdbms.yml run app_test
  • DB=mysql docker-compose -f docker-compose.yml -f rdbms.yml run app_test

Check code style: flake8

Run tests: pytest

Test reports are generated in tests/reports.

  • out.xml - JUnit test results
  • coverage.xml - Coverage xml results

To generate HTML coverage reports use: --cov-report html:tests/reports/cov_html

Integration testing

  1. docker-compose
  2. cd integration_tests
  3. docker-compose run master
