Remote access from one system to models and functions of another one using Celery machinery.

Relies on three outstanding Python projects: Celery, Django and Django REST framework.

Main features

Client and server are designed to:

  • filter models with Django ORM lookups, Q-objects and excludes;
  • change model state (create, update, update or create, delete);
  • change model state in bulk mode (more than one object per request);
  • get and set model state atomically, with bulk mode support;
  • call functions.

The client does not require Django.


pip install djangoceleryrpc

Basic Configuration

The default configuration of django-celery-rpc must be overridden in settings.py by CELERY_RPC_CONFIG. CELERY_RPC_CONFIG is a dict which must contain at least two keys: BROKER_URL and CELERY_RESULT_BACKEND. Any other Celery config params are also permitted (see Configuration and defaults).
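The two required keys can be checked before a worker or client starts. The following is a minimal sketch and not part of django-celery-rpc: `missing_rpc_keys` is a hypothetical helper, and only the key names BROKER_URL and CELERY_RESULT_BACKEND come from the paragraph above.

```python
# Hypothetical helper for illustration; django-celery-rpc does not ship it.
REQUIRED_KEYS = ("BROKER_URL", "CELERY_RESULT_BACKEND")


def missing_rpc_keys(config):
    """Return the required keys that are absent from a CELERY_RPC_CONFIG dict."""
    return [key for key in REQUIRED_KEYS if key not in config]


CELERY_RPC_CONFIG = {
    "BROKER_URL": "amqp://guest:guest@rabbitmq:5672//",
    "CELERY_RESULT_BACKEND": "redis://redis:6379/0",
}

# Fail early with a readable message instead of a connection error later on.
missing = missing_rpc_keys(CELERY_RPC_CONFIG)
if missing:
    raise ValueError("CELERY_RPC_CONFIG is missing: " + ", ".join(missing))
```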

server span

# minimal required configuration
CELERY_RPC_CONFIG = {
    'BROKER_URL': 'amqp://guest:guest@rabbitmq:5672//',
    'CELERY_RESULT_BACKEND': 'redis://redis:6379/0',
}

server eggs

# alternate request queue and routing key
CELERY_RPC_CONFIG = {
    'BROKER_URL': 'amqp://guest:guest@rabbitmq:5672/',
    'CELERY_RESULT_BACKEND': 'amqp://guest:guest@rabbitmq:5672/',
    'CELERY_DEFAULT_QUEUE': 'celery_rpc.requests.alter_queue',
    'CELERY_DEFAULT_ROUTING_KEY': 'celery_rpc.alter_routing_key',
}


# these settings will be used in clients by default
CELERY_RPC_CONFIG = {
    'BROKER_URL': 'amqp://guest:guest@rabbitmq:5672/',
    'CELERY_RESULT_BACKEND': 'redis://redis:6379/0',
}

# 'eggs' alternative configuration will be explicitly passed to the client constructor
CELERY_RPC_EGGS_CLIENT = {
    # BROKER_URL will be used by default from section above
    'CELERY_RESULT_BACKEND': 'amqp://guest:guest@rabbitmq:5672/',
    'CELERY_DEFAULT_QUEUE': 'celery_rpc.requests.alter_queue',
    'CELERY_DEFAULT_ROUTING_KEY': 'celery_rpc.alter_routing_key',
}
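The comment above says that the 'eggs' client takes BROKER_URL from the default section. One way to picture that fallback is a plain dict merge. This is a sketch of the idea only: how django-celery-rpc combines the two dicts internally is an assumption here, and `effective_config` is a hypothetical name.

```python
# Default client settings, as in the section above.
DEFAULT_CLIENT_CONFIG = {
    "BROKER_URL": "amqp://guest:guest@rabbitmq:5672/",
    "CELERY_RESULT_BACKEND": "redis://redis:6379/0",
}

# Alternative settings for the 'eggs' server; BROKER_URL is deliberately absent.
EGGS_CLIENT_CONFIG = {
    "CELERY_RESULT_BACKEND": "amqp://guest:guest@rabbitmq:5672/",
    "CELERY_DEFAULT_QUEUE": "celery_rpc.requests.alter_queue",
    "CELERY_DEFAULT_ROUTING_KEY": "celery_rpc.alter_routing_key",
}


def effective_config(defaults, overrides):
    """Hypothetical helper: overrides win, missing keys fall back to defaults."""
    merged = dict(defaults)
    merged.update(overrides)
    return merged


eggs_config = effective_config(DEFAULT_CLIENT_CONFIG, EGGS_CLIENT_CONFIG)
```

Under this assumption the 'eggs' client shares the broker with the default client but uses its own result backend, queue and routing key.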


  2. different servers must serve different request queues with different routing keys, or must work with different exchanges

from celery_rpc.client import Client
from django.conf import settings

# create client with default settings
span_client = Client()

# create client for `eggs` server
eggs_client = Client(settings.CELERY_RPC_EGGS_CLIENT)

Using client

You can find more examples in tests.


Simple filtering example

span_client.filter('app.models:MyModel', kwargs=dict(filters={'a__exact':'a'}))

Filtering with Q object

from django.db.models import Q
span_client.filter('app.models:MyModel', kwargs=dict(filters_Q=(Q(a='1') | Q(b='1'))))

Also, we can use both Q and lookups

span_client.filter('app.models:MyModel', kwargs=dict(filters={'c__exact':'c'}, filters_Q=(Q(a='1') | Q(b='1'))))

Exclude is supported too

span_client.filter('app.models:MyModel', kwargs=dict(exclude={'c__exact':'c'}, exclude_Q=(Q(a='1') | Q(b='1'))))

You can mix filters and excludes, Q-objects and lookups. Try it yourself. ;)

Full list of available kwargs:

filters - dict of terms compatible with Django lookup fields
offset - offset from which to start returning results
limit - max number of results
fields - list of serializer fields which will be returned
exclude - lookups for excluding matched models
order_by - order of results (list, tuple or string);
    a minus ('-') prefix reverses the order, default = []
filters_Q - Django Q-object for filtering models
exclude_Q - Django Q-object for excluding matched models
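Because these options travel as a plain dict, a misspelled key is easy to miss. Below is a small sketch of a guard that rejects unknown option names before the request is sent. `filter_kwargs` is a hypothetical helper and not part of the library; only the option names are taken from the list above.

```python
# Option names copied from the list above.
ALLOWED_FILTER_KWARGS = frozenset([
    "filters", "offset", "limit", "fields",
    "exclude", "order_by", "filters_Q", "exclude_Q",
])


def filter_kwargs(**options):
    """Hypothetical helper: return options unchanged, or raise on unknown names."""
    unknown = sorted(set(options) - ALLOWED_FILTER_KWARGS)
    if unknown:
        raise TypeError("unknown filter options: " + ", ".join(unknown))
    return options


kwargs = filter_kwargs(filters={"a__exact": "a"}, order_by="-id", offset=10, limit=5)
# The resulting dict would then be passed on, for example:
# span_client.filter('app.models:MyModel', kwargs=kwargs)
```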

List of all MyModel objects with high priority

span_client.filter('app.models:MyModel', high_priority=True)


Create one object

span_client.create('apps.models:MyModel', data={"a": "a"})

Bulk creating

span_client.create('apps.models:MyModel', data=[{"a": "a"}, {"a": "b"}])


Update one object by PK field name

span_client.update('apps.models:MyModel', data={"id": 1, "a": "a"})

Update one object by the special alias 'pk', which is matched automatically to the PK field

span_client.update('apps.models:MyModel', data={"pk": 1, "a": "a"})

Attention! Magic area! Update one object by any field you wish

span_client.update('apps.models:MyModel', data={"alternative_key_field": 42, "a": "a"},
                   kwargs={'identity': 'alternative_key_field'})

Update or create, Delete and so on

All cases are very similar. Try it in your console!

Full list of supported model methods

  • filter - select models
  • create - create new models, raise exception if model exists
  • update - update existing models
  • update_or_create - update if the model exists, otherwise create a new one
  • delete - delete existing models
  • getset - set new state and return old state atomically
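The getset semantics, storing the new state and returning the old one as a single step, can be illustrated in plain Python. This is only an in-memory analogy that uses a lock; how the server achieves atomicity is not covered here, and `InMemoryStore` is a hypothetical class that does not exist in django-celery-rpc.

```python
import threading


class InMemoryStore:
    """Hypothetical stand-in for a model table, keyed by primary key."""

    def __init__(self):
        self._lock = threading.Lock()
        self._rows = {}

    def getset(self, pk, new_state):
        """Apply new_state to row pk and return a copy of the previous state."""
        with self._lock:
            # Read and write happen under one lock, so no other caller
            # can observe or modify the row in between.
            old_state = dict(self._rows.get(pk, {}))
            self._rows.setdefault(pk, {}).update(new_state)
            return old_state


store = InMemoryStore()
first = store.getset(1, {"a": "x"})
second = store.getset(1, {"a": "y"})
```

The first call returns an empty dict because no row existed yet; the second returns the state written by the first.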

All methods support these options:

  • fields - shrink result fields
  • serializer_cls - fully qualified symbol name of a DRF serializer class on the server
  • identity - field name which will be used rather than the PK field (meaningless for filter)


It's possible to pipeline tasks, so they will be executed in one transaction.

p = span_client.pipe()
p = p.create('apps.models:MyModel', data={"a": "a"})
p = p.create('apps.models:MyAnotherModel', data={"b": "b"})

You can pass some arguments from previous task to the next.

Suppose you have those models on the server

class MyModel(models.Model):
    a = models.CharField()

class MyAnotherModel(models.Model):
    fk = models.ForeignKey(MyModel)
    b = models.CharField()

You need to create an instance of MyModel and an instance of MyAnotherModel which refers to MyModel

p = span_client.pipe()
p = p.create('apps.models:MyModel', data={"a": "a"})
p = p.translate({"fk": "id"}, defaults={"b": "b"})
p = p.create('apps.models:MyAnotherModel')

In this example the translate task:

  • takes the result of the previous create task
  • extracts the value of the "id" field from it
  • adds this value to "defaults" under the key "fk"

After that, the next create task takes the result of translate as its input data
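The translate step described above can be mimicked in plain Python to make the data flow visible. This is a sketch of the described behaviour, not the library's implementation: the `translate` function below is a local stand-in, and the shape of `previous_result` is an assumption about what a create task returns.

```python
def translate(mapping, result, defaults=None):
    """Build input data for the next task from the previous task's result.

    mapping maps a target key to the name of the source field in result.
    """
    data = dict(defaults or {})
    for target_key, source_field in mapping.items():
        data[target_key] = result[source_field]
    return data


# Assumed result of the first create task.
previous_result = {"id": 7, "a": "a"}

# Same arguments as in the pipeline example above.
next_data = translate({"fk": "id"}, previous_result, defaults={"b": "b"})
```

Here `next_data` holds both the default "b" value and the "fk" value copied from the "id" of the previous result, which is what the second create task would receive as its data.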

Add/delete m2m relations

Let's take these models:

class MyModel(models.Model):
    str = models.CharField()

class MyManyToManyModel(models.Model):
    m2m = models.ManyToManyField(MyModel, null=True)

Add relation between existing objects

my_models = span_client.create('apps.models:MyModel',
                               [{'str': 'monty'}, {'str': 'python'}])
m2m_model = span_client.create('apps.models:MyManyToManyModel',
                               {'m2m': [my_models[0]['id']]})

# Will add 'python' to m2m_model.m2m where 'monty' already is
data = {'mymodel': my_models[1]['id'], 'mymanytomanymodel': m2m_model['id']}
through = span_client.create('apps.models:MyManyToManyModel.m2m.through', data)

Then delete some of the existing relations

# The next `pipe` will eliminate all relations where `mymodel__str` equals 'monty'
p = span_client.pipe()
p = p.filter('apps.models:MyManyToManyModel.m2m.through', {'mymodel__str': 'monty'})
p = p.delete('apps.models:MyManyToManyModel.m2m.through')

Run server instance

celery worker -A

Server with support for prioritized task consumption

celery multi start 2 -A -Q:1 celery_rpc.requests.high_priority

Note: you must replace 'celery_rpc.requests' with the actual value of the config param CELERY_DEFAULT_QUEUE

The command will start two instances. The first instance will consume from the high priority queue only. The second instance will serve both queues.

For daemonization see Running the worker as a daemon

Run tests

python django-celery-rpc/celery_rpc/runtests/

More Configuration

Overriding base task class

    'ModelTask': 'package.module.MyModelTask',
    'ModelChangeTask': 'package.module.MyModelChangeTask',
    'FunctionTask': 'package.module.MyFunctionTask'

Supported class names: ModelTask, ModelChangeTask, FunctionTask

Handling remote exceptions individually

# Both server and client

After enabling remote exception wrapping, the client will raise the same errors that happened on the server side. If the client side has no such error defined (i.e. the package is not installed), Client.RemoteError will be raised. Client.RemoteError is also the base class for all exceptions on the client side.

For unknown exceptions this code is valid:

try:
    result = rpc_client.call("remote_func")
except rpc_client.errors.SomeUnknownError as e:
    # here a stub for remote SomeUnknownError is handled
    print(e.args)

For known exceptions both variants work:

try:
    result = rpc_client.call("remote_func")
except rpc_client.errors.MultipleObjectsReturned as e:
    # django.core.exceptions.MultipleObjectsReturned
    pass
except django.core.exceptions.ObjectDoesNotExist as e:
    # django.core.exceptions.ObjectDoesNotExist
    pass

If original exception hierarchy is needed:

SomeBaseError = rpc_client.errors.SomeBaseError

DerivedError = rpc_client.errors.subclass(SomeBaseError, "DerivedError")
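To show how stubs for unknown remote exceptions and a rebuilt hierarchy could work in principle, here is a self-contained sketch. It is not the code behind `rpc_client.errors`: `ErrorRegistry` and the local `RemoteError` are hypothetical stand-ins, and only the `subclass(base, name)` call shape is taken from the example above.

```python
class RemoteError(Exception):
    """Stand-in for Client.RemoteError, the base of all client-side errors."""


class ErrorRegistry:
    """Hypothetical registry that creates exception stubs on first access."""

    def __init__(self):
        self._stubs = {}

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        if name.startswith("_"):
            raise AttributeError(name)
        return self.subclass(RemoteError, name)

    def subclass(self, base, name):
        """Return the stub called name, creating it as a subclass of base."""
        if name not in self._stubs:
            self._stubs[name] = type(name, (base,), {})
        return self._stubs[name]


errors = ErrorRegistry()
SomeBaseError = errors.SomeBaseError
DerivedError = errors.subclass(SomeBaseError, "DerivedError")

try:
    raise DerivedError("boom")
except SomeBaseError as e:
    # The derived stub is caught through its declared base class.
    caught = e.args
```

Stubs are cached by name, so `errors.DerivedError` returns the same class that `subclass` created, and every stub can still be caught as `RemoteError`.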


  • Set default non-generic model serializer.
  • Test support for RPC result backend from Celery.
  • Token auth and permissions support (like DRF).
  • Resource map and strict mode.
  • ...


Thanks to all who have contributed to this project:

