Skip to main content

Celery integration for django-tenant-schemas and django-tenants

Project description

tenant-schemas-celery

Celery application implementation that allows celery tasks to cooperate with multi-tenancy provided by django-tenant-schemas and django-tenants packages.

This project might not seem frequently updated, but it just has all the functionality needed. Issues and questions are answered quickly.

Installation

   $ pip install tenant-schemas-celery
   $ pip install django-tenants

Usage

  • Define a celery app using given CeleryApp class.
   import os
   os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

   from django.conf import settings

   from tenant_schemas_celery.app import CeleryApp as TenantAwareCeleryApp

   app = TenantAwareCeleryApp()
   app.config_from_object('django.conf:settings')
   app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

This assumes a fresh Celery 5.2.0 application. For previous versions, the key is to create a new CeleryApp instance that will be used to access task decorator from.

  • Replace your @task decorator with @app.task
   from celery import shared_task
   from django.db import connection
   from myproject.celery import app
   from tenant_schemas_celery.task import TenantTask

   @app.task
   def my_task():
      print(connection.schema_name)

   @shared_task(base=TenantTask, bind=True)
   def my_shared_task():
      print("foo")
  • Run celery worker (myproject.celery is where you've defined the app variable)
    $ celery worker -A myproject.celery
  • Post registered task. The schema name will get automatically added to the task's arguments.
   from myproject.tasks import my_task
   my_task.delay()

The TenantTask class transparently inserts current connection's schema into the task's kwargs. The schema name is then popped from the task's kwargs in task_prerun signal handler, and the connection's schema is changed accordingly.

Multiple databases support

New in 2.0.0.

Inside your celery tasks you might be working with multiple databases. You might want to change the schema for all of the connections, or just a subset of them.

You can now use the CELERY_TASK_TENANT_CACHE_SECONDS django setting, or TASK_TENANT_CACHE_SECONDS celery setting, or the tenant_databases attribute of the TenantTask to a list of database names (the key in the settings.DATABASES dictionary).

If not set, the settings defaults to ["default"].

Tenant objects cache

New in 0.3.0.

Every time a celery task is executed, the tenant object of the connection object is being refetched. For some use cases, this can introduce significant performance hit.

In such scenarios, you can pass tenant_cache_seconds argument to the @app.task() decorator. This will cause the tenant objects to be cached for given period of time. 0 turns this off. You can also enable cache globally by setting celery's TASK_TENANT_CACHE_SECONDS (app-specific, usually it's CELERY_TASK_TENANT_CACHE_SECONDS).

@app.task(tenant_cache_seconds=30)
def some_task():
    ...

Celery beat integration

In order to run celery beat tasks in a multi-tenant environment, you've got two options:

  • Use a dispatching task that will send a task for each tenant
  • Use a custom scheduler

i.e: Let's say that you would like to run a reset_remaining_jobs tasks periodically, for every tenant that you have.

Dispatcher task pattern

You can schedule one dispatcher task that will iterate over all schemas and send that task within the schema's context:

from django_tenants.utils import get_tenant_model, tenant_context
from django_tenant_schemas.utils import get_tenant_model, tenant_context

@app.task
def reset_remaining_jobs_in_all_schemas():
    for tenant in get_tenant_model().objects.exclude(schema_name='public'):
        with tenant_context(tenant):
            reset_remaining_jobs_in_schema.delay()

@app.task
def reset_remaining_jobs_in_schema():
    <do some logic>

The reset_remaining_jobs_in_all_schemas task (called the dispatch task) should be registered in your celery beat schedule. The reset_remaining_jobs_in_schema task should be called from the dispatch task.

That way you have full control over which schemas the task should be scheduled in.

Custom scheduler

If you are using the standard Scheduler or PersistentScheduler classes provided by celery, you can transition to using this package's TenantAwareScheduler or TenantAwarePersistentScheduler classes. You should then specify the scheduler you want to use in the celery beat config or your invocation to beat. i.e:

celery -A proj beat --scheduler=tenant_schemas_celery.scheduler.TenantAwareScheduler

Caveats

  • There's a chance that celery beat will try to run a task for a newly created tenant before its migrations are ready, which could potentially lead to deadlocks. This is specially true with big projects with a lot of migrations and very frequent tasks (i.e: every minute). In order to mitigate it, one could do the following:

    1. Subclass any of TenantAwareScheduler or TenantAwarePersistentScheduler and override the get_queryset method to match your definition of "ready" tenants. For example, imagine that you had a ready flag in your tenant model. You could do the following:
    # tenants_app/scheduler.py
    class MyTenantAwareScheduler(TenantAwareScheduler):
        @classmethod
        def get_queryset(cls):
            return super().get_queryset().filter(ready=True)
    
    1. Use the new scheduler in your celery beat config or invocation:
    celery -A proj beat --scheduler=tenants_app.scheduler.MyTenantAwareScheduler
    
  • TenantAwareSchedulerMixin uses a subclass of SchedulerEntry that allows the user to provide specific schemas to run a task on. This might prove useful if you have a task you only want to run in the public schema or to a subset of your tenants. In order to use set that, you must configure tenant_schemas in the tasks definition as such:

app.conf.beat_schedule = {
    "my-task": {
        "task": "myapp.tasks.my_task",
        "schedule": schedules.crontab(minute="*/5"),
        "tenant_schemas": ["public"]
    }
}

Compatibility changes

The >=2.1 series drop support for tenant-schemas. It hasn't been maintainted for a long time.

The 2.x series support Python>=3.7.

The 1.x series support Python>=3.6. Python 3.6 reached EOL 2021-12.

The 0.x series are the last one to support Python<3.6.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tenant_schemas_celery-3.0.0.tar.gz (16.0 kB view details)

Uploaded Source

Built Distribution

tenant_schemas_celery-3.0.0-py3-none-any.whl (18.1 kB view details)

Uploaded Python 3

File details

Details for the file tenant_schemas_celery-3.0.0.tar.gz.

File metadata

  • Download URL: tenant_schemas_celery-3.0.0.tar.gz
  • Upload date:
  • Size: 16.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.12

File hashes

Hashes for tenant_schemas_celery-3.0.0.tar.gz
Algorithm Hash digest
SHA256 6be3ae1a5826f262f0f3dd343c6a85a34a1c59b89e04ae37de018f36562fed55
MD5 5d2a854392b6a413a6965b25bf077204
BLAKE2b-256 d0fecfe19eb7cc3ad8e39d7df7b7c44414bf665b6ac6660c998eb498f89d16c6

See more details on using hashes here.

File details

Details for the file tenant_schemas_celery-3.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for tenant_schemas_celery-3.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ca0f69e78ef698eb4813468231df5a0ab6a660c08e657b65f5ac92e16887eec8
MD5 3d788c51638989bb8ed63a7e6c65864a
BLAKE2b-256 db2c376e1e641ad08b374c75d896468a7be2e6906ce3621fd0c9f9dc09ff1963

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page