Skip to main content

Django NATS Consumer

Project description

django-nats-consumer

NATS + Django = ⚡️

A powerful Django integration for NATS JetStream with support for both Push and Pull consumers, automatic retry mechanisms, and flexible multi-subject handling.

Features

  • 🚀 Push & Pull Consumers: Support for both JetStream consumer types
  • 🔄 Automatic Retries: Built-in exponential backoff retry mechanism
  • 🎯 Subject Filtering: Native NATS subject filtering with wildcards
  • 🎛️ Smart Handler Routing: Automatic message routing based on subjects
  • 🛡️ Error Handling: Configurable error acknowledgment behaviors
  • 📊 Monitoring: Built-in success/error counters and logging
  • Performance: Optional uvloop support for better performance
  • 🔧 Django Integration: Seamless integration with Django management commands
  • 🧪 Well Tested: Comprehensive test suite covering all features
  • 🔒 Production Ready: Used in production environments with robust error handling

What's New in This Fork

This fork adds significant enhancements to the original work:

  • ✅ Smart handler routing with ConsumerHandler
  • ✅ Subject filtering with wildcards
  • ✅ Fallback handling with collision detection
  • ✅ Comprehensive test suite with pytest
  • ✅ Multi-subject support removal (simplified architecture)s

Installation

This library is in Beta status with comprehensive test coverage and production-ready features. The API is stable but may evolve based on community feedback.

# Install directly from GitHub
pip install git+https://github.com/oxiliere/django-nats-consumer.git

Optional Performance Enhancement

# For better performance on Unix-like systems
pip install "git+https://github.com/oxiliere/django-nats-consumer.git[uvloop]"

Usage

settings.py

INSTALLED_APPS = [
    ...
    "nats_consumer",
    ...
]

NATS_CONSUMER = {
    "connect_args": {
        "servers": ["nats://localhost:4222"],
        "allow_reconnect": True,
        "max_reconnect_attempts": 5,
        "reconnect_time_wait": 1,
        "connect_timeout": 10,
    },
}

Consumer Types

Push Consumer with Smart Handler Routing

# {app_name}/consumers.py
from nats_consumer import JetstreamPushConsumer, operations, ErrorAckBehavior
from nats_consumer.handler import ConsumerHandler
import logging
import json

logger = logging.getLogger(__name__)

class OrderHandler(ConsumerHandler):
    """Smart handler with automatic method routing"""
    
    def __init__(self):
        # ✅ RECOMMENDED: Use dot notation for subjects
        subjects = [
            "orders.created",
            "orders.updated", 
            "orders.cancelled",
            "orders.shipped"
        ]
        super().__init__(subjects)

    async def handle_created(self, message):
        """Handles orders.created messages"""
        logger.info(f"New order created: {message.data}")

    async def handle_updated(self, message):
        """Handles orders.updated messages"""
        logger.info(f"Order updated: {message.data}")

    async def handle_cancelled(self, message):
        """Handles orders.cancelled messages"""
        logger.info(f"Order cancelled: {data}")

    async def handle_shipped(self, message):
        """Handles orders.shipped messages"""
        logger.info(f"Order shipped: {message.data}")

    async def fallback_handle(self, msg, reason="unknown"):
        """Custom fallback for unhandled messages"""
        logger.error(f"Unhandled message for {msg.subject} (reason: {reason}): {message.data}")
        
        # Custom behavior: send to dead letter queue
        await self.send_to_dlq(msg, reason)
        await msg.ack()  # ACK after handling

class OrderConsumer(JetstreamPushConsumer):
    stream_name = "orders"
    subjects = ["orders.created", "orders.updated", "orders.cancelled", "orders.shipped"]
    
    # ✅ RECOMMENDED: Use wildcards for filtering
    filter_subject = "orders.*"  # Filter all order events
    
    # Retry configuration
    max_retries = 3
    initial_retry_delay = 1.0
    max_retry_delay = 60.0
    backoff_factor = 2.0
    
    # Error handling behavior
    handle_error_ack_behavior = ErrorAckBehavior.NAK

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.handler = OrderHandler()

    async def setup(self):
        return [
            operations.CreateStream(
                name=self.stream_name,
                subjects=self.subjects,
                storage="file"
            ),
        ]

    async def handle_message(self, message):
        """Route message to appropriate handler method"""
        await self.handler.handle(message)
    
    async def handle_error(self, message, error, attempt):
        """Optional: Custom error handling after max retries"""
        logger.error(f"Final error after {attempt} attempts: {error}")

Pull Consumer with Subject Filtering

from nats_consumer import JetstreamPullConsumer

class BatchOrderConsumer(JetstreamPullConsumer):
    stream_name = "orders"
    subjects = ["orders.created", "orders.updated", "orders.shipped"]
    
    # ✅ RECOMMENDED: Use wildcard or single subject filtering
    filter_subject = "orders.*"  # All order events
    # OR: filter_subject = "orders.created"  # Only creation events
    # OR: No filter_subject → automatically uses "orders.created" (subjects[0])
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.handler = OrderHandler()  # Reuse the same handler
    
    async def handle_message(self, message):
        """Route to handler for batch processing"""
        await self.handler.handle(message)

Subject Filtering & Best Practices

🎯 Subject Naming (RECOMMENDED)

# ✅ GOOD: Use dot notation for hierarchical subjects
subjects = [
    "orders.created",
    "orders.updated", 
    "orders.cancelled",
    "users.profile.updated",
    "payments.completed"
]

# ❌ AVOID: Mixed separators cause handler collisions
subjects = [
    "orders.created",    # → handle_created()
    "orders-created",    # → handle_created() ⚠️ COLLISION!
    "orders_created"     # → handle_created() ⚠️ COLLISION!
]

🎛️ Subject Filtering Strategies

# Wildcard filtering (recommended for related events)
filter_subject = "orders.*"           # All order events
filter_subject = "users.profile.*"    # All profile events

# Single subject filtering (for specific processing)
filter_subject = "orders.created"     # Only creation events

# Pattern filtering (for complex hierarchies)
filter_subject = "*.error"           # All error events across domains
filter_subject = "orders.*.failed"   # All failed order operations

# No filtering (process everything)
filter_subject = ">"                 # All subjects in the stream

# Auto-filtering (if filter_subject not specified)
class MyConsumer(JetstreamPushConsumer):
    subjects = ["orders.created", "orders.updated"]
    # filter_subject automatically uses "orders.created" (subjects[0])

🔍 Collision Detection

The handler automatically detects and warns about subject collisions:

# This will generate a warning:
class ProblematicHandler(ConsumerHandler):
    def __init__(self):
        subjects = [
            "orders.created",    # → handle_created()
            "users-created",     # → handle_created() ⚠️ COLLISION!
        ]
        super().__init__(subjects)

Publishing Messages

publish.py

import asyncio
import json
from nats_consumer import get_nats_client

async def publish_messages():
    ns = await get_nats_client()
    js = ns.jetstream()
    
    for i in range(5):
        data = {"id": i, "name": f"Order {i}", "status": "created"}
        data_b = json.dumps(data).encode("utf-8")
        print(f"Publishing message {i}...")
        await js.publish("orders.created", data_b)
    
    await ns.close()

if __name__ == "__main__":
    asyncio.run(publish_messages())

Running Consumers

Basic Usage

# Run a single consumer with setup
python manage.py nats_consumer OrderConsumer --setup

# Run multiple specific consumers
python manage.py nats_consumer OrderConsumer BatchOrderConsumer

# Run all registered consumers
python manage.py nats_consumer

Development Options

# Enable auto-reload for development (watches for file changes)
python manage.py nats_consumer --reload

# Run with specific batch size for Pull consumers
python manage.py nats_consumer BatchOrderConsumer --batch-size 50

Production Considerations

# Run with uvloop for better performance
python manage.py nats_consumer --event-loop uvloop

# Run with custom timeout
python manage.py nats_consumer --timeout 30

Advanced Configuration

Error Handling Behaviors

from nats_consumer import ErrorAckBehavior

class MyConsumer(JetstreamPushConsumer):
    # Choose error acknowledgment behavior:
    handle_error_ack_behavior = ErrorAckBehavior.ACK  # Acknowledge and move on
    handle_error_ack_behavior = ErrorAckBehavior.NAK  # Negative ack for redelivery
    handle_error_ack_behavior = ErrorAckBehavior.IMPLEMENTED_BY_HANDLE_ERROR  # Custom handling

Retry Configuration

class MyConsumer(JetstreamPushConsumer):
    max_retries = 5  # Maximum retry attempts
    initial_retry_delay = 2.0  # Initial delay in seconds
    max_retry_delay = 120.0  # Maximum delay in seconds
    backoff_factor = 2.0  # Exponential backoff multiplier

Performance Optimization

For production environments, uvloop provides better performance on Unix-like systems:

pip install django-nats-consumer[uvloop]

settings.py

NATS_CONSUMER = {
    "event_loop_policy": "uvloop.EventLoopPolicy",
    "connect_args": {
        "servers": ["nats://localhost:4222"],
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 2,
        "connect_timeout": 10,
    },
    "default_durable_name": "my-app",  # Default durable name for consumers
}

Monitoring

Consumers provide built-in metrics:

class MyConsumer(JetstreamPushConsumer):
    async def handle_message(self, message):
        # Access metrics
        print(f"Success count: {self.total_success_count}")
        print(f"Error count: {self.total_error_count}")
        print(f"Is running: {self.is_running}")
        print(f"Is connected: {self.is_connected}")

Best Practices

📋 Subject Design

  • ✅ Use dot notation: orders.created instead of orders-created or orders_created
  • ✅ Use hierarchical structure: users.profile.updated, orders.payment.completed
  • ✅ Use wildcards in filters: filter_subject = "orders.*"
  • ❌ Avoid mixed separators: Don't mix ., -, and _ in the same domain

🏗️ Consumer Architecture

  • Use Push consumers for low-latency, event-driven processing
  • Use Pull consumers for high-throughput batch processing
  • Use ConsumerHandler for automatic message routing
  • Use subject filtering to process only relevant events

🔧 Configuration

  • Configure appropriate retry policies based on your use case
  • Implement proper error handling with custom handle_error methods
  • Use durable consumers in production for message persistence
  • Monitor consumer metrics for operational insights

🎯 Handler Design

  • One handler method per event type: handle_created(), handle_updated()
  • Use descriptive method names that match your subject hierarchy
  • Validate handler implementations with validate_handlers()
  • Implement fallback_handle() for robust error handling

🛡️ Fallback Handling

class RobustHandler(ConsumerHandler):
    async def fallback_handle(self, msg, reason="unknown"):
        """
        Custom fallback for unhandled messages.
        Default behavior: NAK (recommended for safety)
        
        Reasons:
        - "unhandled_subject": Subject not in handler's list
        - "no_mapping": No handler method mapping found  
        - "not_implemented": Handler method not implemented
        """
        if reason == "unhandled_subject":
            # Log and discard unknown subjects
            logger.warning(f"Unknown subject {msg.subject}, discarding")
            await msg.ack()
        else:
            # For implementation issues, NAK for redelivery
            logger.error(f"Handler issue for {msg.subject}: {reason}")
            await msg.nak()  # Default behavior

### 🎯 Filter Subject Examples
```python
# Explicit filtering (recommended)
class OrderConsumer(JetstreamPushConsumer):
    subjects = ["orders.created", "orders.updated", "orders.cancelled"]
    filter_subject = "orders.*"  # Process all order events

# Auto-filtering (uses subjects[0])
class SimpleConsumer(JetstreamPushConsumer):
    subjects = ["orders.created", "orders.updated"]
    # No filter_subject → automatically uses "orders.created"

# Single subject filtering
class CreationConsumer(JetstreamPushConsumer):
    subjects = ["orders.created", "orders.updated", "orders.cancelled"]
    filter_subject = "orders.created"  # Only creation events

License

This project is licensed under the MIT License.

Original Work Attribution

This project is a fork and significant enhancement of the original work by Christian Toivola (@dev360).

The original project was licensed under the BSD-3-Clause License. We gratefully acknowledge the foundational work that made this project possible.

Original Author: Christian Toivola
Original Repository: https://github.com/dev360
Original License: BSD-3-Clause

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

oxnats-1.0.0.tar.gz (15.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

oxnats-1.0.0-py3-none-any.whl (17.8 kB view details)

Uploaded Python 3

File details

Details for the file oxnats-1.0.0.tar.gz.

File metadata

  • Download URL: oxnats-1.0.0.tar.gz
  • Upload date:
  • Size: 15.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.8

File hashes

Hashes for oxnats-1.0.0.tar.gz
Algorithm Hash digest
SHA256 55c1ef10397e4a6325918bca88ae86a687fe6be3cfada5910029197febfc41be
MD5 bd74a6aaeaa0cf9c4719d414d585d5bd
BLAKE2b-256 02ae4892e795077824569c38ba0b560f291c82ea1f5b3b0785cee3547fd92cc2

See more details on using hashes here.

File details

Details for the file oxnats-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: oxnats-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 17.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.8

File hashes

Hashes for oxnats-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4da38ab81bce79b72fb21530a75e932c7e4bc4060c3356936809858950d1a438
MD5 a9f81544f155e36d814e2882d975eff4
BLAKE2b-256 a53f2bc41286dda94df7edc1e466067cb3b648f6aedf7f928d08a86526b109fc

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page