Django NATS Consumer

django-nats-consumer

NATS + Django = ⚡️

A powerful Django integration for NATS JetStream with decorator-based message handlers, support for both Push and Pull consumers, native NATS retry mechanisms, and flexible wildcard subject routing.

Features

  • 🎯 Decorator-Based Handlers: Use @handle decorator for explicit, clean handler registration
  • 🌟 Wildcard Support: Full support for * and > wildcards in subject patterns
  • 🚀 Push & Pull Consumers: Support for both JetStream consumer types
  • 🔄 Native NATS Retry: Built-in NATS retry with exponential backoff
  • 🎛️ Smart Routing: Automatic message routing with exact and wildcard matching
  • 🛡️ Error Handling: Configurable error acknowledgment behaviors
  • 📊 Monitoring: Built-in success/error counters and logging
  • ⚡ Performance: Optional uvloop support for better performance
  • 🔧 Django Integration: Seamless integration with Django management commands
  • 🧪 Well Tested: Comprehensive test suite with 52 passing tests
  • 🔒 Production Ready: Used in production environments with robust error handling

What's New

Version 2.0 - Decorator-Based Handlers

  • ✅ @handle decorator for explicit handler registration
  • ✅ Wildcard patterns (* and >) fully supported
  • ✅ Multiple subjects per handler - one method can handle multiple subjects
  • ✅ No naming conventions - name your methods however you want
  • ✅ Native NATS retry with exponential backoff
  • ✅ Cleaner API - no more __init__ with subjects list

Installation

This library is in Beta status with comprehensive test coverage and production-ready features. The API is stable but may evolve based on community feedback.

# Install from PyPI
pip install oxnats

Optional Performance Enhancement

# For better performance on Unix-like systems
pip install oxnats[uvloop]

Usage

settings.py

INSTALLED_APPS = [
    ...
    "nats_consumer",
    ...
]

NATS_CONSUMER = {
    "connect_args": {
        "servers": ["nats://localhost:4222"],
        "allow_reconnect": True,
        "max_reconnect_attempts": 5,
        "reconnect_time_wait": 1,
        "connect_timeout": 10,
    },
}

Subject Naming Convention (IMPORTANT)

⚠️ STRONGLY RECOMMENDED: Use dot notation for all subjects

NATS subjects should follow a hierarchical dot notation pattern:

# ✅ RECOMMENDED: Dot notation (hierarchical)
'orders.created'
'orders.updated'
'users.profile.updated'
'payments.completed'
'notifications.email.sent'

# ❌ NOT RECOMMENDED: Other separators
'orders-created'      # Hyphen separator
'orders_created'      # Underscore separator
'orderscreated'       # No separator

Why dot notation?

  • ✅ Standard NATS convention - Industry best practice
  • ✅ Wildcard support - Works seamlessly with * and > wildcards
  • ✅ Hierarchical clarity - Clear domain.entity.action structure
  • ✅ Better routing - Easier to filter and route messages
  • ✅ Consistency - Matches NATS ecosystem patterns

Examples of good subject hierarchies:

# E-commerce domain
'orders.created'
'orders.updated'
'orders.payment.completed'
'orders.shipment.dispatched'

# User management
'users.registered'
'users.profile.updated'
'users.password.reset'

# Notifications
'notifications.email.sent'
'notifications.sms.sent'
'notifications.push.delivered'

Quick Start

1. Define Your Handler with @handle Decorator

# {app_name}/consumers.py
from nats_consumer import ConsumerHandler, handle
import logging
import json

logger = logging.getLogger(__name__)

class OrderHandler(ConsumerHandler):
    """Handler using @handle decorator for explicit registration"""
    
    @handle('orders.created')
    async def on_order_created(self, message):
        """Handle new order creation"""
        data = json.loads(message.data.decode())
        logger.info(f"New order created: {data}")
    
    @handle('orders.updated', 'orders.modified')
    async def on_order_changed(self, message):
        """Handle order updates - multiple subjects, one handler!"""
        data = json.loads(message.data.decode())
        logger.info(f"Order changed: {data}")
    
    @handle('orders.cancelled')
    async def on_order_cancelled(self, message):
        """Handle order cancellation"""
        data = json.loads(message.data.decode())
        logger.info(f"Order cancelled: {data}")
    
    @handle('orders.*')  # Wildcard support!
    async def on_any_order_event(self, message):
        """Catch-all for any order event"""
        logger.debug(f"Order event: {message.subject}")
    
    async def fallback_handle(self, msg, reason="unknown"):
        """Custom fallback for unhandled messages"""
        logger.error(f"Unhandled: {msg.subject} (reason: {reason})")
        await msg.nak()  # NAK for redelivery (default behavior)

2. Create Your Consumer

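import logging

from nats_consumer import JetstreamPushConsumer, operations, ErrorAckBehavior
from nats_consumer.operations import api

# OrderHandler is the handler class defined in step 1.
logger = logging.getLogger(__name__)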

class OrderConsumer(JetstreamPushConsumer):
    stream_name = "orders"
    subjects = ["orders.created", "orders.updated", "orders.cancelled"]
    
    # Native NATS retry configuration
    max_deliver = 3  # Max delivery attempts
    ack_wait = 30  # Seconds to wait for ACK
    backoff_delays = [1.0, 5.0, 10.0]  # Exponential backoff delays
    
    # Error handling behavior after max retries
    handle_error_ack_behavior = ErrorAckBehavior.NAK

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.handler = OrderHandler()

    async def setup(self):
        """Setup stream before starting consumer"""
        return [
            # ✅ PRODUCTION: Use CreateOrUpdateStream for flexibility
            # Automatically updates stream config when it changes
            operations.CreateOrUpdateStream(
                name=self.stream_name,
                subjects=self.subjects,
                retention=api.RetentionPolicy.LIMITS,
                max_age=3600,  # 1 hour retention
            ),
        ]

    async def handle_message(self, message):
        """Route message to handler"""
        await self.handler.handle(message)
    
    async def handle_error(self, message, error, delivery_count):
        """Called when max_deliver is reached"""
        logger.error(f"Max retries reached after {delivery_count} attempts: {error}")
        # Send to DLQ, alert monitoring, etc.

3. Run Your Consumer

# Setup stream and start consumer
python manage.py nats_consumer OrderConsumer --setup

# Development mode with auto-reload
python manage.py nats_consumer OrderConsumer --reload

💡 Production Best Practice: Stream Setup

Use CreateOrUpdateStream instead of CreateStream in production:

# ⚠️ CreateStream - Only creates, doesn't update
async def setup(self):
    return [
        operations.CreateStream(
            name=self.stream_name,
            subjects=self.subjects,
        ),
    ]
    # If stream exists: logs warning, doesn't update configuration
    # If stream missing: creates it

# ✅ CreateOrUpdateStream - Creates OR updates (RECOMMENDED)
async def setup(self):
    return [
        operations.CreateOrUpdateStream(
            name=self.stream_name,
            subjects=self.subjects,
            retention=api.RetentionPolicy.LIMITS,
            max_age=3600,
        ),
    ]
    # If stream exists: updates configuration
    # If stream missing: creates it

Why CreateOrUpdateStream for production?

  • ✅ Flexible - Updates stream configuration when it changes
  • ✅ Idempotent - Safe to run multiple times
  • ✅ CI/CD friendly - Applies configuration changes automatically
  • ✅ Zero downtime - Updates streams without recreation
  • ✅ Version control - Stream config evolves with your code

When to use each:

# CreateStream - Simple cases
# - Only creates stream if missing
# - Logs warning if stream exists (doesn't update)
# - Good for static configurations that never change

# CreateOrUpdateStream - Production (RECOMMENDED)
# - Creates stream if missing
# - Updates stream configuration if exists
# - Perfect when stream config evolves over time
# - Ideal for CI/CD pipelines

# DeleteStream - Maintenance only
# - Use nats_delete_stream command
# - Requires confirmation

# UpdateStream - Manual updates only  
# - Use nats_update_stream command
# - For specific one-time configuration changes

Key Features Explained

🎯 Decorator-Based Handler Registration

The @handle decorator provides explicit, clean handler registration:

from nats_consumer import ConsumerHandler, handle

class MyHandler(ConsumerHandler):
    # ✅ One subject, one handler
    @handle('orders.created')
    async def on_created(self, msg):
        pass
    
    # ✅ Multiple subjects, one handler
    @handle('orders.updated', 'orders.modified', 'orders.changed')
    async def on_updated(self, msg):
        pass
    
    # ✅ Wildcard patterns
    @handle('orders.*')  # Match orders.created, orders.updated, etc.
    async def on_any_order(self, msg):
        pass
    
    @handle('notifications.>')  # Match notifications.email, notifications.sms.sent, etc.
    async def on_any_notification(self, msg):
        pass
    
    # ✅ Name methods however you want!
    @handle('payments.completed')
    async def process_payment_completion(self, msg):
        pass

Benefits:

  • Explicit: Clear which methods handle which subjects
  • Flexible: One method can handle multiple subjects
  • No conventions: Name methods however you want
  • Wildcards: Full support for * and > patterns
  • Type-safe: Easy to understand and maintain
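
To make the registration idea concrete, here is a minimal, self-contained sketch of how a decorator of this kind can record subjects on methods and how a base class can collect them. It is not the library's implementation: this `handle`, the `SketchHandler` class, and the `_subjects` attribute are all hypothetical names used only for illustration.

```python
from typing import Callable, Dict


def handle(*subjects: str) -> Callable:
    """Hypothetical stand-in for @handle: tag a function with its subjects."""
    def decorator(func: Callable) -> Callable:
        func._subjects = tuple(subjects)  # stored on the function object
        return func
    return decorator


class SketchHandler:
    """Builds a subject -> method-name registry from tagged methods."""

    def __init__(self) -> None:
        self.registry: Dict[str, str] = {}
        # Walk base classes first so subclass definitions win.
        for klass in reversed(type(self).__mro__):
            for name, attr in vars(klass).items():
                for subject in getattr(attr, "_subjects", ()):
                    self.registry[subject] = name


class OrderHandler(SketchHandler):
    @handle("orders.created")
    def on_created(self, msg):
        return "created"

    @handle("orders.updated", "orders.modified")
    def on_changed(self, msg):
        return "changed"


registry = OrderHandler().registry
```

Because the subjects live on the function object, method names carry no meaning, which is what allows one method to serve several subjects.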

🔄 Native NATS Retry Mechanism

Uses NATS JetStream's built-in retry with exponential backoff:

class MyConsumer(JetstreamPushConsumer):
    max_deliver = 5  # Maximum delivery attempts
    ack_wait = 30  # Seconds to wait for ACK before retry
    backoff_delays = [1.0, 2.0, 4.0, 8.0, 16.0]  # Exponential backoff

How it works:

  1. Message fails → NAK sent to NATS
  2. NATS waits backoff_delays[attempt] seconds
  3. NATS redelivers message
  4. After max_deliver attempts → handle_error() called
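
The four steps above can be traced without a NATS server. The sketch below is a plain-Python simulation and does not use the library: `simulate_deliveries` is a hypothetical helper, and reusing the last delay when the list is shorter than the number of attempts is an assumption made for the example.

```python
from typing import Callable, List, Tuple


def simulate_deliveries(
    handler: Callable[[int], None],
    max_deliver: int,
    backoff_delays: List[float],
) -> Tuple[int, List[float], bool]:
    """Return (attempts made, delays waited, whether handle_error would fire)."""
    waited: List[float] = []
    for attempt in range(1, max_deliver + 1):
        try:
            handler(attempt)
            return attempt, waited, False  # success: the message would be ACKed
        except Exception:
            if attempt == max_deliver:
                break  # no deliveries left
            if backoff_delays:
                # Assumption: reuse the last delay once the list runs out.
                index = min(attempt - 1, len(backoff_delays) - 1)
                waited.append(backoff_delays[index])
    return max_deliver, waited, True


def always_fails(attempt: int) -> None:
    raise RuntimeError("boom")


def succeeds_on_third(attempt: int) -> None:
    if attempt < 3:
        raise RuntimeError("not yet")


exhausted = simulate_deliveries(always_fails, 3, [1.0, 5.0, 10.0])
recovered = simulate_deliveries(succeeds_on_third, 5, [1.0, 2.0])
```

In this simulation a message that keeps failing uses one fewer delay than `max_deliver`, since nothing is waited after the final attempt.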

🌟 Wildcard Subject Matching

Exact match takes priority:

class MyHandler(ConsumerHandler):
    @handle('orders.created')  # Exact match
    async def on_created(self, msg):
        # This is called for orders.created
        pass
    
    @handle('orders.*')  # Wildcard
    async def on_any_order(self, msg):
        # This is called for orders.updated, orders.deleted, etc.
        # But NOT for orders.created (exact match wins)
        pass

Wildcard patterns:

  • * - Matches exactly one token: orders.* matches orders.created but not orders.payment.completed
  • > - Matches one or more tokens: orders.> matches orders.created and orders.payment.completed
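
The matching rules above can be expressed in a few lines of token comparison. This is an illustrative sketch of the NATS wildcard semantics together with exact-match priority, not the library's router; `subject_matches` and `pick_pattern` are hypothetical names.

```python
from typing import List, Optional


def subject_matches(pattern: str, subject: str) -> bool:
    """Token-wise match: '*' is exactly one token, '>' is one or more trailing tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least one token left.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def pick_pattern(patterns: List[str], subject: str) -> Optional[str]:
    """Prefer an exact match, then fall back to the first matching wildcard."""
    if subject in patterns:
        return subject
    for pattern in patterns:
        if subject_matches(pattern, subject):
            return pattern
    return None
```

For example, with the patterns `orders.*` and `orders.created` registered, `orders.created` resolves to the exact entry and `orders.updated` falls through to the wildcard.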

Consumer Types

Push Consumer (Event-Driven)

Best for low-latency, real-time event processing:

from nats_consumer import JetstreamPushConsumer
from nats_consumer.operations import CreateOrUpdateStream, api

class RealtimeOrderConsumer(JetstreamPushConsumer):
    stream_name = "orders"
    subjects = ["orders.*"]  # All order events
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.handler = OrderHandler()
    
    async def setup(self):
        """Use CreateOrUpdateStream for production"""
        return [
            CreateOrUpdateStream(
                name=self.stream_name,
                subjects=self.subjects,
                retention=api.RetentionPolicy.LIMITS,
            ),
        ]
    
    async def handle_message(self, message):
        await self.handler.handle(message)

Pull Consumer (Batch Processing)

Best for high-throughput batch processing:

from nats_consumer import JetstreamPullConsumer
from nats_consumer.operations import CreateOrUpdateStream, api

class BatchOrderConsumer(JetstreamPullConsumer):
    stream_name = "orders"
    subjects = ["orders.*"]
    batch_size = 100  # Process 100 messages at a time
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.handler = OrderHandler()
    
    async def setup(self):
        """Use CreateOrUpdateStream for production"""
        return [
            CreateOrUpdateStream(
                name=self.stream_name,
                subjects=self.subjects,
                retention=api.RetentionPolicy.LIMITS,
            ),
        ]
    
    async def handle_message(self, message):
        await self.handler.handle(message)

Advanced Patterns

Multiple Handlers for Different Concerns

class OrderValidationHandler(ConsumerHandler):
    @handle('orders.created')
    async def validate_order(self, msg):
        # Validation logic
        pass

class OrderNotificationHandler(ConsumerHandler):
    @handle('orders.created', 'orders.updated')
    async def send_notification(self, msg):
        # Notification logic
        pass

class OrderConsumer(JetstreamPushConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.validation_handler = OrderValidationHandler()
        self.notification_handler = OrderNotificationHandler()
    
    async def handle_message(self, message):
        # Chain handlers
        await self.validation_handler.handle(message)
        await self.notification_handler.handle(message)

Custom Fallback Handling

class RobustHandler(ConsumerHandler):
    @handle('orders.created')
    async def on_created(self, msg):
        pass
    
    async def fallback_handle(self, msg, reason="unknown"):
        """
        Called when no handler matches the subject.
        
        Reasons:
        - "no_handler": No handler registered for this subject
        
        Default behavior: NAK (message will be redelivered)
        """
        if reason == "no_handler":
            # Log and discard unknown subjects
            logger.warning(f"Unknown subject: {msg.subject}")
            await msg.ack()  # ACK to prevent redelivery
        else:
            # NAK for potential retry
            logger.error(f"Handler issue: {msg.subject} ({reason})")
            await msg.nak()

Publishing Messages

publish.py

import asyncio
import json
from nats_consumer import get_nats_client

async def publish_messages():
    ns = await get_nats_client()
    js = ns.jetstream()
    
    # ✅ ALWAYS use dot notation for subjects
    for i in range(5):
        data = {"id": i, "name": f"Order {i}", "status": "created"}
        data_b = json.dumps(data).encode("utf-8")
        print(f"Publishing message {i}...")
        
        # ✅ Good: dot notation
        await js.publish("orders.created", data_b)
        
        # ❌ Bad: other separators
        # await js.publish("orders-created", data_b)  # DON'T DO THIS
        # await js.publish("orders_created", data_b)  # DON'T DO THIS
    
    await ns.close()

if __name__ == "__main__":
    asyncio.run(publish_messages())

Running Consumers

Basic Usage

# Run a single consumer with setup
python manage.py nats_consumer OrderConsumer --setup

# Run multiple specific consumers
python manage.py nats_consumer OrderConsumer BatchOrderConsumer

# Run all registered consumers
python manage.py nats_consumer

Development Options

# Enable auto-reload for development (watches for file changes)
python manage.py nats_consumer --reload

# Run with specific batch size for Pull consumers
python manage.py nats_consumer BatchOrderConsumer --batch-size 50

Production Considerations

# Run with uvloop for better performance
python manage.py nats_consumer --event-loop uvloop

# Run with custom timeout
python manage.py nats_consumer --timeout 30

Advanced Configuration

Error Handling Behaviors

from nats_consumer import ErrorAckBehavior

class MyConsumer(JetstreamPushConsumer):
    # Choose error acknowledgment behavior:
    handle_error_ack_behavior = ErrorAckBehavior.ACK  # Acknowledge and move on
    handle_error_ack_behavior = ErrorAckBehavior.NAK  # Negative ack for redelivery
    handle_error_ack_behavior = ErrorAckBehavior.IMPLEMENTED_BY_HANDLE_ERROR  # Custom handling

Retry Configuration

class MyConsumer(JetstreamPushConsumer):
    max_retries = 5  # Maximum retry attempts
    initial_retry_delay = 2.0  # Initial delay in seconds
    max_retry_delay = 120.0  # Maximum delay in seconds
    backoff_factor = 2.0  # Exponential backoff multiplier
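
This README does not spell out how these four settings combine. Assuming they describe a capped exponential schedule (each delay is the previous one multiplied by `backoff_factor`, never exceeding `max_retry_delay`), the resulting delays could be computed as in the sketch below. `exponential_delays` is a hypothetical helper, not part of the library.

```python
from typing import List


def exponential_delays(
    max_retries: int,
    initial_retry_delay: float,
    max_retry_delay: float,
    backoff_factor: float,
) -> List[float]:
    """Capped exponential schedule: initial * factor**n, limited to the maximum."""
    return [
        min(initial_retry_delay * backoff_factor ** attempt, max_retry_delay)
        for attempt in range(max_retries)
    ]


schedule = exponential_delays(5, 2.0, 120.0, 2.0)
```

A list produced this way has the same shape as the `backoff_delays` values shown elsewhere in this document.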

Performance Optimization

For production environments, uvloop provides better performance on Unix-like systems:

pip install oxnats[uvloop]

settings.py

NATS_CONSUMER = {
    "event_loop_policy": "uvloop.EventLoopPolicy",
    "connect_args": {
        "servers": ["nats://localhost:4222"],
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 2,
        "connect_timeout": 10,
    },
    "default_durable_name": "my-app",  # Default durable name for consumers
}

Monitoring

Consumers provide built-in metrics:

class MyConsumer(JetstreamPushConsumer):
    async def handle_message(self, message):
        # Access metrics
        print(f"Success count: {self.total_success_count}")
        print(f"Error count: {self.total_error_count}")
        print(f"Is running: {self.is_running}")
        print(f"Is connected: {self.is_connected}")

Best Practices

📋 Handler Design

✅ DO:

  • Use @handle decorator for all handlers
  • Use descriptive method names (no naming conventions required)
  • Implement fallback_handle() for unhandled messages
  • Use wildcards for catch-all handlers
  • Handle one concern per handler class

❌ DON'T:

  • Don't forget to call super().__init__() in your handler
  • Don't register the same subject in multiple handlers (causes warnings)
  • Don't mix handler logic with consumer logic

🎯 Subject Design

✅ ALWAYS DO:

  • Use dot notation exclusively: orders.created, users.profile.updated
  • Use hierarchical structure: domain.entity.action or domain.action
  • Use lowercase: orders.created not Orders.Created
  • Use descriptive names: orders.payment.completed not orders.pc
  • Use wildcards strategically: orders.*, notifications.>

❌ NEVER DO:

  • Don't use hyphens: ❌ orders-created → ✅ orders.created
  • Don't use underscores: ❌ orders_created → ✅ orders.created
  • Don't mix separators: ❌ orders.created + users-updated
  • Don't use overly deep hierarchies: ❌ a.b.c.d.e.f.g (max 3-4 levels)
  • Don't use spaces or special chars: ❌ orders created or orders@created

Subject naming patterns:

# Pattern 1: domain.action (simple)
'orders.created'
'payments.completed'
'users.registered'

# Pattern 2: domain.entity.action (detailed)
'orders.payment.completed'
'users.profile.updated'
'notifications.email.sent'

# Pattern 3: domain.subdomain.entity.action (complex)
'ecommerce.orders.payment.completed'
'platform.users.profile.updated'
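
These conventions are easy to check mechanically, for instance in a unit test over the subjects a project publishes. The sketch below is one possible checker, not something the library ships: `check_subject` is a hypothetical name, and allowing digits inside tokens is an assumption that the guidelines above do not address.

```python
import re
from typing import List

# Assumption: a token is lowercase letters and digits only.
_TOKEN = re.compile(r"[a-z0-9]+")


def check_subject(subject: str, max_levels: int = 4) -> List[str]:
    """Return a list of convention violations; an empty list means the subject is fine."""
    problems: List[str] = []
    tokens = subject.split(".")
    if len(tokens) < 2:
        problems.append("expected at least two dot-separated tokens")
    if len(tokens) > max_levels:
        problems.append(f"hierarchy deeper than {max_levels} levels")
    for token in tokens:
        if not _TOKEN.fullmatch(token):
            problems.append(f"invalid token: {token!r}")
    return problems
```

The checker targets concrete subjects used for publishing, so wildcard patterns such as `orders.*` are reported as invalid on purpose.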

🔧 Retry Configuration

class MyConsumer(JetstreamPushConsumer):
    # Conservative (for critical operations)
    max_deliver = 10
    ack_wait = 60
    backoff_delays = [1, 2, 5, 10, 30, 60, 120, 300, 600, 900]
    
    # Aggressive (for non-critical operations)
    max_deliver = 3
    ack_wait = 10
    backoff_delays = [1, 5, 10]

๐Ÿ›ก๏ธ Error Handling

from nats_consumer import ErrorAckBehavior

class MyConsumer(JetstreamPushConsumer):
    # After max_deliver is reached:
    
    # Option 1: NAK (redelivery - default, safest)
    handle_error_ack_behavior = ErrorAckBehavior.NAK
    
    # Option 2: ACK (discard message)
    handle_error_ack_behavior = ErrorAckBehavior.ACK
    
    # Option 3: Custom (handle in handle_error method)
    handle_error_ack_behavior = ErrorAckBehavior.IMPLEMENTED_BY_HANDLE_ERROR
    
    async def handle_error(self, msg, error, delivery_count):
        # Send to DLQ, alert monitoring, etc.
        await send_to_dlq(msg, error)
        await msg.ack()  # Must ACK/NAK if using IMPLEMENTED_BY_HANDLE_ERROR
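
The `send_to_dlq` call in the example above is not defined in this README. One part of such a helper that can be written without any NATS dependency is the envelope that records why the message failed. The sketch below shows that part only: `build_dlq_payload` and its field names are hypothetical, and the step of publishing the result to a dead-letter subject is left out.

```python
import json


def build_dlq_payload(
    subject: str,
    data: bytes,
    error: Exception,
    delivery_count: int,
) -> bytes:
    """Wrap a failed message and its error details in a JSON envelope."""
    envelope = {
        "original_subject": subject,
        # Replace undecodable bytes rather than failing a second time.
        "payload": data.decode("utf-8", errors="replace"),
        "error_type": type(error).__name__,
        "error_message": str(error),
        "delivery_count": delivery_count,
    }
    return json.dumps(envelope).encode("utf-8")


dlq_bytes = build_dlq_payload("orders.created", b'{"id": 1}', ValueError("bad input"), 3)
```

Keeping the original subject and delivery count in the envelope makes it possible to replay or triage messages later without consulting consumer logs.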

📊 Monitoring

class MonitoredConsumer(JetstreamPushConsumer):
    async def handle_message(self, message):
        # Built-in metrics
        logger.info(f"Success: {self.total_success_count}")
        logger.info(f"Errors: {self.total_error_count}")
        logger.info(f"Running: {self.is_running}")
        logger.info(f"Connected: {self.is_connected}")
        
        await self.handler.handle(message)

Migration from v1.x

If you're upgrading from the old auto-detection approach, see DECORATOR_MIGRATION.md for a complete migration guide.

Quick summary:

# OLD (v1.x)
class OrderHandler(ConsumerHandler):
    def __init__(self):
        subjects = ["orders.created", "orders.updated"]  # ✅ Good: dot notation
        super().__init__(subjects)
    
    async def handle_created(self, msg):  # Auto-detected by name
        pass

# NEW (v2.x)
class OrderHandler(ConsumerHandler):
    @handle('orders.created')  # ✅ Explicit with decorator + dot notation
    async def on_created(self, msg):  # Name it however you want!
        pass
    
    @handle('orders.updated', 'orders.modified')  # Multiple subjects!
    async def on_updated(self, msg):
        pass

⚠️ Important: If migrating from non-dot notation subjects

If your old code used hyphens or underscores, strongly consider migrating to dot notation:

# OLD (BAD PRACTICE)
subjects = ["orders-created", "orders_updated"]  # ❌ Don't do this

# NEW (BEST PRACTICE)
subjects = ["orders.created", "orders.updated"]  # ✅ Always use dots

This may require updating your publishers and stream configurations, but it's worth it for long-term maintainability.
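
When many subjects need renaming, a small script can propose the new names for review. The sketch below is a hypothetical helper, not part of the library. It treats every hyphen, underscore, and space as a separator, so a token that legitimately contains an underscore (for example `line_items`) would also be split and should be checked by hand.

```python
import re
from typing import Dict, Iterable


def to_dot_notation(subject: str) -> str:
    """Split on hyphens, underscores, dots, and whitespace, then rejoin with dots."""
    tokens = re.split(r"[-_.\s]+", subject.strip())
    return ".".join(token.lower() for token in tokens if token)


def build_rename_map(subjects: Iterable[str]) -> Dict[str, str]:
    """Map each subject that would change to its proposed dot-notation name."""
    return {s: to_dot_notation(s) for s in subjects if s != to_dot_notation(s)}


renames = build_rename_map(["orders-created", "orders_updated", "orders.created"])
```

The resulting map can drive the publisher and stream updates mentioned above, since it lists only the subjects that actually change.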

Testing

# Run all tests
uv run pytest

# Run specific test file
uv run pytest tests/test_handler.py -v

# Run with coverage
uv run pytest --cov=nats_consumer --cov-report=html

Performance

For production environments, install with uvloop for better performance:

pip install oxnats[uvloop]

settings.py:

NATS_CONSUMER = {
    "event_loop_policy": "uvloop.EventLoopPolicy",
    "connect_args": {
        "servers": ["nats://localhost:4222"],
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
    },
}

License

This project is licensed under the MIT License.

Original Work Attribution

This project is a fork and significant enhancement of the original work by Christian Toivola (@dev360).

The original project was licensed under the BSD-3-Clause License. We gratefully acknowledge the foundational work that made this project possible.

Original Author: Christian Toivola
Original Repository: https://github.com/dev360
Original License: BSD-3-Clause

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Support

For issues, questions, or contributions, please visit the GitHub repository.
