Skip to main content

Django app for PostgreSQL LISTEN/NOTIFY with persistence and playback

Project description

django-pgwatch

A Django app that provides PostgreSQL LISTEN/NOTIFY functionality with persistence and playback capabilities. This solves the problem of missed notifications when consumers are disconnected by storing all notifications in a database table and providing automatic playback functionality.

Table of Contents

Quick Start

Get up and running in 3 steps:

  1. Install and migrate:

    # Add 'django_pgwatch' to INSTALLED_APPS
    python manage.py migrate django_pgwatch
    
  2. Send a notification:

    from django_pgwatch.utils import smart_notify
    smart_notify('my_channel', {'event': 'test', 'user_id': 123})
    
  3. Create and run a consumer:

    # myapp/consumers.py
    from django_pgwatch.consumer import BaseConsumer, NotificationHandler
    
    class MyConsumer(BaseConsumer):
        consumer_id = 'my_consumer'
        channels = ['my_channel']
        
        def handle_notification(self, handler: NotificationHandler):
            print(f"Received: {handler.data}")
    
    python manage.py pgwatch_listen
    

That's it! Your consumer will process all notifications and stay running for new ones.

Features

  • Guaranteed Delivery: All notifications are persisted to a database table
  • Playback Capability: Consumers can catch up on missed notifications after reconnecting
  • Multiple Consumers: Track which consumers have processed each notification
  • Large Payload Support: Automatically handles payloads larger than PostgreSQL's 8KB limit
  • Gap Detection: Automatically detects and fills missed notifications during operation
  • Consumer Management: Track consumer progress and handle consumer-specific processing
  • Django Integration: Native Django models, admin interface, and management commands

Installation

  1. Add django_pgwatch to your INSTALLED_APPS:

    INSTALLED_APPS = [
        # ... other apps
        'django_pgwatch',
    ]
    
  2. Run migrations (automatically installs PostgreSQL functions):

    python manage.py migrate django_pgwatch
    

Basic Usage

1. Sending Notifications

from django_pgwatch.utils import smart_notify

# Send a simple notification
notification_log_id = smart_notify('my_channel', {
    'event_type': 'user_login',
    'user_id': 123,
    'timestamp': '2024-01-01T10:00:00Z'
})

# The notification is automatically persisted and sent via PostgreSQL NOTIFY

2. Creating a Consumer

Create a consumers.py file in your Django app:

# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler

class MyConsumer(BaseConsumer):
    # Class attributes for auto-discovery
    consumer_id = 'my_consumer'
    channels = ['my_channel', 'data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        print(f"Received: {handler.data}")
        
        # Access notification details
        print(f"Notification Log ID: {handler.notification_log_id}")
        print(f"Channel: {handler.channel}")
        print(f"Is replay: {handler.is_replay}")
        
        # For database change notifications
        if handler.get_table() == 'users':
            if handler.is_insert():
                print(f"New user: {handler.get_new_data()}")
            elif handler.is_update():
                print(f"User updated: {handler.get_old_data()} -> {handler.get_new_data()}")
            elif handler.is_delete():
                print(f"User deleted: {handler.get_old_data()}")

3. Running Consumers

The management command automatically discovers all consumers from your INSTALLED_APPS:

# Run all discovered consumers
python manage.py pgwatch_listen

# List all discoverable consumers
python manage.py pgwatch_listen --list-consumers

# Run consumers from specific apps only
python manage.py pgwatch_listen --apps myapp otherapp

# Run specific consumers by ID
python manage.py pgwatch_listen --consumers my_consumer webhook_sender

# Exclude specific consumers
python manage.py pgwatch_listen --exclude-consumers heavy_processor

Database Change Notifications

Setting up Triggers

The app provides a trigger function that automatically sends notifications for database changes.

Recommended Approach: Django Migrations (Preferred)

The best way to create database triggers is using Django migrations with RunSQL:

# In your app's migration file (e.g., migrations/0002_create_triggers.py)
from django.db import migrations

class Migration(migrations.Migration):
    dependencies = [
        ('your_app', '0001_initial'),
        # Ensure notify_data_change() function exists
        ('django_pgwatch', '0002_install_pg_functions'),
    ]

    operations = [
        migrations.RunSQL(
            # Forward migration - create trigger
            sql="""
            CREATE TRIGGER notify_users_changes
                AFTER INSERT OR UPDATE OR DELETE ON users
                FOR EACH ROW 
                EXECUTE FUNCTION notify_data_change();
            """,
            # Reverse migration - drop trigger
            reverse_sql="""
            DROP TRIGGER IF EXISTS notify_users_changes ON users;
            """
        ),
    ]

Benefits of using migrations:

  • ✅ Automatic deployment with python manage.py migrate
  • ✅ Version controlled and reversible
  • ✅ Consistent across environments
  • ✅ No manual intervention required

Alternative Approaches

Direct SQL:

-- Create a trigger on any table
CREATE TRIGGER notify_users_changes
    AFTER INSERT OR UPDATE OR DELETE ON users
    FOR EACH ROW EXECUTE FUNCTION notify_data_change();

Python helper function:

from django_pgwatch.examples import create_trigger_for_table

# Create trigger for the users table
create_trigger_for_table('users')
create_trigger_for_table('orders', 'order_events')  # Custom channel

Note: The Python helper and direct SQL approaches require manual execution and are not version controlled. Use migrations for production applications.

Consuming Database Changes

# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler

class DatabaseChangeConsumer(BaseConsumer):
    consumer_id = 'database_changes'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        table = handler.get_table()
        action = handler.get_action()  # INSERT, UPDATE, DELETE
        
        if table == 'users' and action == 'INSERT':
            user_data = handler.get_new_data()
            self.send_welcome_email(user_data)
        elif table == 'orders' and action == 'UPDATE':
            old_data = handler.get_old_data()
            new_data = handler.get_new_data()
            
            if old_data['status'] != new_data['status']:
                self.send_status_update(new_data)

Common Patterns

Cache Invalidation

class CacheInvalidationConsumer(BaseConsumer):
    consumer_id = 'cache_invalidator'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        if handler.is_database_change():
            cache_key = f"{handler.get_table()}:{handler.get_record_id()}"
            cache.delete(cache_key)

Webhook Integration

class WebhookConsumer(BaseConsumer):
    consumer_id = 'webhook_sender'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        requests.post(settings.WEBHOOK_URL, json=handler.data)

Analytics Tracking

class AnalyticsConsumer(BaseConsumer):
    consumer_id = 'analytics_processor'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        analytics.track(handler.get_record_id(), handler.get_action(), handler.data)

Management Commands

pgwatch_listen

Basic usage:

# Run all discovered consumers
python manage.py pgwatch_listen

# List available consumers
python manage.py pgwatch_listen --list-consumers

Common filtering options:

# Run specific consumers
python manage.py pgwatch_listen --consumers webhook_sender cache_invalidator

# Run consumers from specific apps
python manage.py pgwatch_listen --apps myapp otherapp

# Exclude heavy processors
python manage.py pgwatch_listen --exclude-consumers analytics_processor

Advanced options:

  • --timeout=30: Listening timeout in seconds
  • --max-batch-size=100: Playback batch size
  • --skip-playback: Skip missed notifications, only process new ones
  • --reconnect-delay=5: Delay before reconnecting after error

pgwatch_cleanup

# Clean up old notifications (keep 7 days)
python manage.py pgwatch_cleanup --days=7

# Preview what will be deleted
python manage.py pgwatch_cleanup --days=7 --dry-run

Deployment Patterns

Single process (default):

python manage.py pgwatch_listen  # All consumers in one process

Parallel processes:

# Split heavy consumers into separate processes
python manage.py pgwatch_listen --consumers cache_invalidator &
python manage.py pgwatch_listen --consumers webhook_sender &

Load balancing:

# Create multiple worker consumers
class Worker1Consumer(BaseConsumer):
    consumer_id = 'worker_1'
    channels = ['work_queue']
    
class Worker2Consumer(BaseConsumer):
    consumer_id = 'worker_2'
    channels = ['work_queue']

Advanced Usage

Error Handling

class RobustConsumer(BaseConsumer):
    consumer_id = 'robust_consumer'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        try:
            self.process_notification(handler)
        except RetryableError as e:
            logger.error(f"Retryable error: {e}")
            raise  # Will retry
        except PermanentError as e:
            logger.error(f"Permanent error: {e}")
            # Don't re-raise - marks as processed

Filtering Notifications

class FilteredConsumer(BaseConsumer):
    consumer_id = 'filtered_consumer'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        # Filter by table
        if handler.get_table() not in ['users', 'orders']:
            return
        
        # Skip replayed notifications
        if handler.is_replay:
            return
            
        self.process_notification(handler)

Custom Notifications

from django_pgwatch.examples import send_custom_notification

send_custom_notification('user_events', 'password_reset', {
    'user_id': 123,
    'ip_address': '192.168.1.1',
    'requested_at': '2024-01-01T10:00:00Z'
})

Admin Interface

The app provides a Django admin interface for monitoring notifications:

  • View all notification logs
  • See which consumers have processed each notification
  • Clean up old notifications
  • Reprocess notifications (clear consumer tracking)
  • View summary statistics

Access at /admin/django_pgwatch/notificationlog/

Database Functions

smart_notify(channel_name, payload_data) - Persists and sends notifications:

SELECT smart_notify('my_channel', '{"event": "test"}'::jsonb);

notify_data_change() - Trigger function for database changes:

CREATE TRIGGER my_table_changes
    AFTER INSERT OR UPDATE OR DELETE ON my_table
    FOR EACH ROW EXECUTE FUNCTION notify_data_change();

Architecture & Performance

How It Works

  1. Notification Storage: All notifications stored in notification_log table
  2. Consumer Tracking: Each consumer tracks processed notifications
  3. Playback: On startup, process missed notifications
  4. Real-time: Listen for new notifications via PostgreSQL LISTEN/NOTIFY
  5. Gap Detection: Automatic detection of missed notifications

Performance Features

  • Batch Processing: Configurable batch sizes for playback
  • Parallel Processing: Multiple consumers process simultaneously
  • Database Optimization: Indexes on channel and timestamps
  • Large Payloads: Automatic handling of payloads >8KB
  • Cleanup: Configurable retention periods

Monitoring

  • Django admin interface for notification tracking
  • Consumer progress and error monitoring
  • Database table size monitoring
  • Alerting for unprocessed notifications

Testing

TODO: Test Coverage Needed

The following areas need comprehensive test coverage:

Auto-Discovery Tests:

  • Test consumer discovery from multiple apps
  • Test filtering by app names (--apps)
  • Test filtering by consumer IDs (--consumers)
  • Test excluding consumers (--exclude-consumers)
  • Test error handling for non-existent apps/consumers
  • Test --list-consumers output formatting

Consumer Base Class Tests:

  • Test BaseConsumer class attribute inheritance
  • Test constructor parameter override of class attributes
  • Test validation error when no consumer_id provided
  • Test channel configuration precedence

Database Integration Tests:

  • Test trigger creation and notification sending
  • Test consumer playback of missed notifications
  • Test real-time notification processing
  • Test consumer restart and gap detection
  • Test large payload handling

Error Handling Tests:

  • Test consumer exception handling (retryable vs permanent)
  • Test database connection failures and reconnection
  • Test malformed notification payloads
  • Test consumer shutdown and cleanup

Management Command Tests:

  • Test command argument parsing and validation
  • Test signal handling for graceful shutdown
  • Test consumer process lifecycle management
  • Test output formatting and logging

Multi-Consumer Tests:

  • Test multiple consumers processing same notifications
  • Test consumer isolation (one failure doesn't affect others)
  • Test consumer-specific progress tracking
  • Test parallel vs sequential execution modes

Contributing

This is an internal Avela Education tool. For issues or feature requests, contact the development team.

License

Internal use only - Avela Education

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django_pgwatch-1.0.0.tar.gz (30.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

django_pgwatch-1.0.0-py3-none-any.whl (25.4 kB view details)

Uploaded Python 3

File details

Details for the file django_pgwatch-1.0.0.tar.gz.

File metadata

  • Download URL: django_pgwatch-1.0.0.tar.gz
  • Upload date:
  • Size: 30.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.0

File hashes

Hashes for django_pgwatch-1.0.0.tar.gz
Algorithm Hash digest
SHA256 c20b1f66993c67529ac5ff9a4d193808bf6176fc728b9e0f48f0d9132ac79181
MD5 356ca8a2b3fe9989e92e0d0d40290f38
BLAKE2b-256 1b7ffe741c728db100985a85691447434c566ae38d2364b5ddb195e98b9a7340

See more details on using hashes here.

File details

Details for the file django_pgwatch-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: django_pgwatch-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 25.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.0

File hashes

Hashes for django_pgwatch-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ed4b23f75efd7a8dfc1b12de8a4898334d1fb20552e16031657ec7be3e11a8b3
MD5 2a674dd90a82427fe9046150ced3c5db
BLAKE2b-256 83bdc081f8425fb1bfd6956dc78fb3b51cbdf930bb24647b789bb0e1f93d63da

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page