Skip to main content

Django CQRS data synchronisation

Project description

Django CQRS

pyversions PyPi Status codecov Build Status PyPI status Quality Gate Status

django-cqrs is an Django application, that implements CQRS data synchronisation between several Django microservices.

CQRS

In Connect we have a rather complex Domain Model. There are many microservices, that are decomposed by subdomain and which follow database-per-service pattern. These microservices have rich and consistent APIs. They are deployed in cloud k8s cluster and scale automatically under load. Many of these services aggregate data from other ones and usually API Composition is totally enough. But, some services are working too slowly with API JOINS, so another pattern needs to be applied.

The pattern, that solves this issue is called CQRS - Command Query Responsibility Segregation. Core idea behind this pattern is that view databases (replicas) are defined for efficient querying and DB joins. Applications keep their replicas up to data by subscribing to Domain events published by the service that owns the data. Data is eventually consistent and that's okay for non-critical business transactions.

Examples

Integration

  • Setup RabbitMQ
  • Install django-cqrs
  • Apply changes to master service, according to RabbitMQ settings
# models.py

from django.db import models
from dj_cqrs.mixins import MasterMixin, RawMasterMixin


class Account(MasterMixin, models.Model):
    CQRS_ID = 'account'
    CQRS_PRODUCE = True  # set this to False to prevent sending instances to Transport


class Author(MasterMixin, models.Model):
    CQRS_ID = 'author'
    CQRS_SERIALIZER = 'app.api.AuthorSerializer'


# For cases of Diamond Multiinheritance the following approach could be used:
from mptt.models import MPTTModel
from dj_cqrs.metas import MasterMeta

class ComplexInheritanceModel(MPTTModel, RawMasterMixin):
    pass

MasterMeta.register(ComplexInheritanceModel)
# settings.py

CQRS = {
    'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'user': RABBITMQ_USERNAME,
    'password': RABBITMQ_PASSWORD,
}
  • Apply changes to replica service, according to RabbitMQ settings
from django.db import models
from dj_cqrs.mixins import ReplicaMixin


class AccountRef(ReplicaMixin, models.Model):
    CQRS_ID = 'account'

    id = models.IntegerField(primary_key=True)


class AuthorRef(ReplicaMixin, models.Model):
    CQRS_ID = 'author'
    CQRS_CUSTOM_SERIALIZATION = True

    @classmethod
    def cqrs_create(cls, sync, **mapped_data):
        # Override here
        pass

    def cqrs_update(self, sync, **mapped_data):
        # Override here
        pass
# settings.py

CQRS = {
    'transport': 'dj_cqrs.transport.RabbitMQTransport',
    'queue': 'account_replica',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'user': RABBITMQ_USERNAME,
    'password': RABBITMQ_PASSWORD,
}
  • Apply migrations on both services
  • Run consumer worker on replica service. Management command: python manage.py cqrs_consume -w 2

Notes

When there are master models with related entities in CQRS_SERIALIZER, it's important to have operations within atomic transactions. CQRS sync will happen on transaction commit. Please, avoid saving master model within transaction more then once to reduce syncing and potential racing on replica side. Updating of related model won't trigger CQRS automatic synchronization for master model. This needs to be done manually.

Example:

with transaction.atomic():
    publisher = models.Publisher.objects.create(id=1, name='publisher')
    author = models.Author.objects.create(id=1, name='author', publisher=publisher)

with transaction.atomic():
    publisher.name = 'new'
    publisher.save()

    author.save()

When only needed instances need to be synchronized, there is a method is_sync_instance to set filtering rule. It's important to understand, that CQRS counting works even without syncing and rule is applied every time model is updated.

Example:

class FilteredSimplestModel(MasterMixin, models.Model):
    CQRS_ID = 'filter'

    name = models.CharField(max_length=200)

    def is_sync_instance(self):
        return len(str(self.name)) > 2

Utilities

Bulk synchronizer without transport (usage example: it may be used for initial configuration). May be used at planned downtime.

  • On master service: python manage.py cqrs_bulk_dump --cqrs-id=author -> author.dump
  • On replica service: python manage.py cqrs_bulk_load -i=author.dump

Filter synchronizer over transport (usage example: sync some specific records to a given replica). Can be used dynamically.

  • To sync all replicas: python manage.py cqrs_sync --cqrs-id=author -f={"id__in": [1, 2]}
  • To sync all instances only with one replica: python manage.py cqrs_sync --cqrs-id=author -f={} -q=replica

Set of diff synchronization tools ()

  • To get diff and synchronize master service with replica service in K8S:
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=author | 
    kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_diff_replica |
    kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_sync

Development

  1. Python 3.5+
  2. Install dependencies requirements/dev.txt

Testing

Unit testing

  1. Python 3.5+
  2. Install dependencies requirements/test.txt
  3. export PYTHONPATH=/your/path/to/django-cqrs/

Check code style: flake8 Run tests: pytest

Tests reports are generated in tests/reports.

  • out.xml - JUnit test results
  • coverage.xml - Coverage xml results

To generate HTML coverage reports use: --cov-report html:tests/reports/cov_html

Integrational testing

  1. docker-compose
  2. cd integration_tests
  3. docker-compose run master

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django-cqrs-enkonix-1.3.2.tar.gz (45.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

django_cqrs_enkonix-1.3.2-py3-none-any.whl (67.4 kB view details)

Uploaded Python 3

File details

Details for the file django-cqrs-enkonix-1.3.2.tar.gz.

File metadata

  • Download URL: django-cqrs-enkonix-1.3.2.tar.gz
  • Upload date:
  • Size: 45.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.48.0 CPython/3.8.3

File hashes

Hashes for django-cqrs-enkonix-1.3.2.tar.gz
Algorithm Hash digest
SHA256 2e7e690f07886ddfd8065618fa83179601436984d926fece5c968cef0ded6b76
MD5 fe42f599e99545df921b21971d94848a
BLAKE2b-256 3d4ea59905fec02764a373d0b8bd5864d01bc1e8ce759bfb7bcb60e32c20862b

See more details on using hashes here.

File details

Details for the file django_cqrs_enkonix-1.3.2-py3-none-any.whl.

File metadata

  • Download URL: django_cqrs_enkonix-1.3.2-py3-none-any.whl
  • Upload date:
  • Size: 67.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.48.0 CPython/3.8.3

File hashes

Hashes for django_cqrs_enkonix-1.3.2-py3-none-any.whl
Algorithm Hash digest
SHA256 f01a3970f6a69a08520564d45ca048551f340b6c15a89b6c27dc398afc37a261
MD5 9073ee34d889a86d1466e31df2e05e63
BLAKE2b-256 3372547d61aa549bb4e53a4f69e58c4f0e7598ef9ee3188ec6b0191d80c59e69

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page