Django app for PostgreSQL LISTEN/NOTIFY with persistence and playback
Project description
django-pgwatch
A Django app that provides PostgreSQL LISTEN/NOTIFY functionality with persistence and playback capabilities. This solves the problem of missed notifications when consumers are disconnected by storing all notifications in a database table and providing automatic playback functionality.
Table of Contents
- Quick Start
- Features
- Installation
- Basic Usage
- Database Change Notifications
- Management Commands
- Common Patterns
- Advanced Usage
- Admin Interface
- Architecture & Performance
- Testing
Quick Start
Get up and running in 3 steps:
-
Install and migrate:
# Add 'django_pgwatch' to INSTALLED_APPS python manage.py migrate django_pgwatch
-
Send a notification:
from django_pgwatch.utils import smart_notify smart_notify('my_channel', {'event': 'test', 'user_id': 123})
-
Create and run a consumer:
# myapp/consumers.py from django_pgwatch.consumer import BaseConsumer, NotificationHandler class MyConsumer(BaseConsumer): consumer_id = 'my_consumer' channels = ['my_channel'] def handle_notification(self, handler: NotificationHandler): print(f"Received: {handler.data}")
python manage.py pgwatch_listen
That's it! Your consumer will process all notifications and stay running for new ones.
Features
- Guaranteed Delivery: All notifications are persisted to a database table
- Playback Capability: Consumers can catch up on missed notifications after reconnecting
- Multiple Consumers: Track which consumers have processed each notification
- Large Payload Support: Automatically handles payloads larger than PostgreSQL's 8KB limit
- Gap Detection: Automatically detects and fills missed notifications during operation
- Consumer Management: Track consumer progress and handle consumer-specific processing
- Django Integration: Native Django models, admin interface, and management commands
Installation
-
Add
django_pgwatchto yourINSTALLED_APPS:INSTALLED_APPS = [ # ... other apps 'django_pgwatch', ]
-
Run migrations (automatically installs PostgreSQL functions):
python manage.py migrate django_pgwatch
Basic Usage
1. Sending Notifications
from django_pgwatch.utils import smart_notify
# Send a simple notification
notification_log_id = smart_notify('my_channel', {
'event_type': 'user_login',
'user_id': 123,
'timestamp': '2024-01-01T10:00:00Z'
})
# The notification is automatically persisted and sent via PostgreSQL NOTIFY
2. Creating a Consumer
Create a consumers.py file in your Django app:
# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler
class MyConsumer(BaseConsumer):
# Class attributes for auto-discovery
consumer_id = 'my_consumer'
channels = ['my_channel', 'data_change']
def handle_notification(self, handler: NotificationHandler):
print(f"Received: {handler.data}")
# Access notification details
print(f"Notification Log ID: {handler.notification_log_id}")
print(f"Channel: {handler.channel}")
print(f"Is replay: {handler.is_replay}")
# For database change notifications
if handler.get_table() == 'users':
if handler.is_insert():
print(f"New user: {handler.get_new_data()}")
elif handler.is_update():
print(f"User updated: {handler.get_old_data()} -> {handler.get_new_data()}")
elif handler.is_delete():
print(f"User deleted: {handler.get_old_data()}")
3. Running Consumers
The management command automatically discovers all consumers from your INSTALLED_APPS:
# Run all discovered consumers
python manage.py pgwatch_listen
# List all discoverable consumers
python manage.py pgwatch_listen --list-consumers
# Run consumers from specific apps only
python manage.py pgwatch_listen --apps myapp otherapp
# Run specific consumers by ID
python manage.py pgwatch_listen --consumers my_consumer webhook_sender
# Exclude specific consumers
python manage.py pgwatch_listen --exclude-consumers heavy_processor
Database Change Notifications
Setting up Triggers
The app provides a trigger function that automatically sends notifications for database changes.
Recommended Approach: Django Migrations (Preferred)
The best way to create database triggers is using Django migrations with RunSQL:
# In your app's migration file (e.g., migrations/0002_create_triggers.py)
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('your_app', '0001_initial'),
# Ensure notify_data_change() function exists
('django_pgwatch', '0002_install_pg_functions'),
]
operations = [
migrations.RunSQL(
# Forward migration - create trigger
sql="""
CREATE TRIGGER notify_users_changes
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW
EXECUTE FUNCTION notify_data_change();
""",
# Reverse migration - drop trigger
reverse_sql="""
DROP TRIGGER IF EXISTS notify_users_changes ON users;
"""
),
]
Benefits of using migrations:
- ✅ Automatic deployment with
python manage.py migrate - ✅ Version controlled and reversible
- ✅ Consistent across environments
- ✅ No manual intervention required
Alternative Approaches
Direct SQL:
-- Create a trigger on any table
CREATE TRIGGER notify_users_changes
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION notify_data_change();
Python helper function:
from django_pgwatch.examples import create_trigger_for_table
# Create trigger for the users table
create_trigger_for_table('users')
create_trigger_for_table('orders', 'order_events') # Custom channel
Note: The Python helper and direct SQL approaches require manual execution and are not version controlled. Use migrations for production applications.
Consuming Database Changes
# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler
class DatabaseChangeConsumer(BaseConsumer):
consumer_id = 'database_changes'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
table = handler.get_table()
action = handler.get_action() # INSERT, UPDATE, DELETE
if table == 'users' and action == 'INSERT':
user_data = handler.get_new_data()
self.send_welcome_email(user_data)
elif table == 'orders' and action == 'UPDATE':
old_data = handler.get_old_data()
new_data = handler.get_new_data()
if old_data['status'] != new_data['status']:
self.send_status_update(new_data)
Common Patterns
Cache Invalidation
class CacheInvalidationConsumer(BaseConsumer):
consumer_id = 'cache_invalidator'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
if handler.is_database_change():
cache_key = f"{handler.get_table()}:{handler.get_record_id()}"
cache.delete(cache_key)
Webhook Integration
class WebhookConsumer(BaseConsumer):
consumer_id = 'webhook_sender'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
requests.post(settings.WEBHOOK_URL, json=handler.data)
Analytics Tracking
class AnalyticsConsumer(BaseConsumer):
consumer_id = 'analytics_processor'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
analytics.track(handler.get_record_id(), handler.get_action(), handler.data)
Management Commands
pgwatch_listen
Basic usage:
# Run all discovered consumers
python manage.py pgwatch_listen
# List available consumers
python manage.py pgwatch_listen --list-consumers
Common filtering options:
# Run specific consumers
python manage.py pgwatch_listen --consumers webhook_sender cache_invalidator
# Run consumers from specific apps
python manage.py pgwatch_listen --apps myapp otherapp
# Exclude heavy processors
python manage.py pgwatch_listen --exclude-consumers analytics_processor
Advanced options:
--timeout=30: Listening timeout in seconds--max-batch-size=100: Playback batch size--skip-playback: Skip missed notifications, only process new ones--reconnect-delay=5: Delay before reconnecting after error
pgwatch_cleanup
# Clean up old notifications (keep 7 days)
python manage.py pgwatch_cleanup --days=7
# Preview what will be deleted
python manage.py pgwatch_cleanup --days=7 --dry-run
Deployment Patterns
Single process (default):
python manage.py pgwatch_listen # All consumers in one process
Parallel processes:
# Split heavy consumers into separate processes
python manage.py pgwatch_listen --consumers cache_invalidator &
python manage.py pgwatch_listen --consumers webhook_sender &
Load balancing:
# Create multiple worker consumers
class Worker1Consumer(BaseConsumer):
consumer_id = 'worker_1'
channels = ['work_queue']
class Worker2Consumer(BaseConsumer):
consumer_id = 'worker_2'
channels = ['work_queue']
Advanced Usage
Error Handling
class RobustConsumer(BaseConsumer):
consumer_id = 'robust_consumer'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
try:
self.process_notification(handler)
except RetryableError as e:
logger.error(f"Retryable error: {e}")
raise # Will retry
except PermanentError as e:
logger.error(f"Permanent error: {e}")
# Don't re-raise - marks as processed
Filtering Notifications
class FilteredConsumer(BaseConsumer):
consumer_id = 'filtered_consumer'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
# Filter by table
if handler.get_table() not in ['users', 'orders']:
return
# Skip replayed notifications
if handler.is_replay:
return
self.process_notification(handler)
Custom Notifications
from django_pgwatch.examples import send_custom_notification
send_custom_notification('user_events', 'password_reset', {
'user_id': 123,
'ip_address': '192.168.1.1',
'requested_at': '2024-01-01T10:00:00Z'
})
Admin Interface
The app provides a Django admin interface for monitoring notifications:
- View all notification logs
- See which consumers have processed each notification
- Clean up old notifications
- Reprocess notifications (clear consumer tracking)
- View summary statistics
Access at /admin/django_pgwatch/notificationlog/
Database Functions
smart_notify(channel_name, payload_data) - Persists and sends notifications:
SELECT smart_notify('my_channel', '{"event": "test"}'::jsonb);
notify_data_change() - Trigger function for database changes:
CREATE TRIGGER my_table_changes
AFTER INSERT OR UPDATE OR DELETE ON my_table
FOR EACH ROW EXECUTE FUNCTION notify_data_change();
Architecture & Performance
How It Works
- Notification Storage: All notifications stored in
notification_logtable - Consumer Tracking: Each consumer tracks processed notifications
- Playback: On startup, process missed notifications
- Real-time: Listen for new notifications via PostgreSQL LISTEN/NOTIFY
- Gap Detection: Automatic detection of missed notifications
Performance Features
- Batch Processing: Configurable batch sizes for playback
- Parallel Processing: Multiple consumers process simultaneously
- Database Optimization: Indexes on channel and timestamps
- Large Payloads: Automatic handling of payloads >8KB
- Cleanup: Configurable retention periods
Monitoring
- Django admin interface for notification tracking
- Consumer progress and error monitoring
- Database table size monitoring
- Alerting for unprocessed notifications
Testing
TODO: Test Coverage Needed
The following areas need comprehensive test coverage:
Auto-Discovery Tests:
- Test consumer discovery from multiple apps
- Test filtering by app names (
--apps) - Test filtering by consumer IDs (
--consumers) - Test excluding consumers (
--exclude-consumers) - Test error handling for non-existent apps/consumers
- Test
--list-consumersoutput formatting
Consumer Base Class Tests:
- Test BaseConsumer class attribute inheritance
- Test constructor parameter override of class attributes
- Test validation error when no consumer_id provided
- Test channel configuration precedence
Database Integration Tests:
- Test trigger creation and notification sending
- Test consumer playback of missed notifications
- Test real-time notification processing
- Test consumer restart and gap detection
- Test large payload handling
Error Handling Tests:
- Test consumer exception handling (retryable vs permanent)
- Test database connection failures and reconnection
- Test malformed notification payloads
- Test consumer shutdown and cleanup
Management Command Tests:
- Test command argument parsing and validation
- Test signal handling for graceful shutdown
- Test consumer process lifecycle management
- Test output formatting and logging
Multi-Consumer Tests:
- Test multiple consumers processing same notifications
- Test consumer isolation (one failure doesn't affect others)
- Test consumer-specific progress tracking
- Test parallel vs sequential execution modes
Contributing
This is an internal Avela Education tool. For issues or feature requests, contact the development team.
License
Internal use only - Avela Education
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file django_pgwatch-1.0.0.tar.gz.
File metadata
- Download URL: django_pgwatch-1.0.0.tar.gz
- Upload date:
- Size: 30.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c20b1f66993c67529ac5ff9a4d193808bf6176fc728b9e0f48f0d9132ac79181
|
|
| MD5 |
356ca8a2b3fe9989e92e0d0d40290f38
|
|
| BLAKE2b-256 |
1b7ffe741c728db100985a85691447434c566ae38d2364b5ddb195e98b9a7340
|
File details
Details for the file django_pgwatch-1.0.0-py3-none-any.whl.
File metadata
- Download URL: django_pgwatch-1.0.0-py3-none-any.whl
- Upload date:
- Size: 25.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ed4b23f75efd7a8dfc1b12de8a4898334d1fb20552e16031657ec7be3e11a8b3
|
|
| MD5 |
2a674dd90a82427fe9046150ced3c5db
|
|
| BLAKE2b-256 |
83bdc081f8425fb1bfd6956dc78fb3b51cbdf930bb24647b789bb0e1f93d63da
|