NATS JetStream wrapper providing a FastStream-like interface for easy message handling.
fsai-nats-app
NatsApp - NATS JetStream Wrapper
A Python wrapper that provides a FastStream-like interface for NATS JetStream, making it easy to build robust message-driven microservices with minimal boilerplate code.
✨ Features
- 🚀 FastStream-like API - Familiar decorator-based interface for easy adoption
- 🔄 Graceful Shutdown - Configurable timeout ensuring message processing completion
- 🛡️ Robust Error Handling - Automatic message acknowledgment/negative acknowledgment with retry logic
- 🔌 Auto-Reconnection - Built-in connection resilience with exponential backoff
- 📦 Pydantic Integration - Seamless validation with Pydantic models
- ⚡ Pull Subscriptions - Efficient JetStream pull-based message consumption
- 🎯 Multiple Subscribers - Support for multiple concurrent message handlers
- 📊 Monitoring - Built-in subscription statistics and health monitoring
- 🐛 Debug Mode - Comprehensive logging for development and troubleshooting
🚀 Quick Start
Installation
pip install nats-py pydantic loguru
Basic Example
import asyncio

from pydantic import BaseModel

from nats_wrapper import ConsumerConfig, PullSubConfig, StreamConfig, create_nats_app

# Define your data models
class UserMessage(BaseModel):
    user_id: int
    content: str
    timestamp: str

class ProcessedMessage(BaseModel):
    user_id: int
    processed_content: str
    status: str

# Create the NATS application
app = create_nats_app(
    servers=["nats://localhost:4222"],
    graceful_timeout=30
)

# Create a publisher
output_publisher = app.publisher(
    subject="processed.messages",
    stream=StreamConfig(name="PROCESSED_STREAM", declare=False)
)

# Define message handler with decorator
@app.subscriber(
    subject="user.messages",
    stream=StreamConfig(name="USER_STREAM", declare=False),
    durable="message_processor",
    pull_sub=PullSubConfig(batch_size=1, timeout=5),
    config=ConsumerConfig(ack_wait=30, max_deliver=3)
)
async def process_user_message(data, msg):
    # Validate incoming data
    user_msg = UserMessage(**data)
    # Process the message
    processed_content = f"Processed: {user_msg.content}"
    # Create and publish result
    result = ProcessedMessage(
        user_id=user_msg.user_id,
        processed_content=processed_content,
        status="completed"
    )
    await output_publisher.publish(result)

# Run the application
async def main():
    await app.run()

if __name__ == "__main__":
    asyncio.run(main())
📚 Documentation
Application Configuration
Create a NATS application with custom configuration:
app = create_nats_app(
    servers=["nats://server1:4222", "nats://server2:4222"],  # Multiple servers for HA
    graceful_timeout=60,            # Shutdown timeout in seconds
    allow_reconnect=True,           # Enable auto-reconnection
    reconnect_time_wait=2,          # Wait time between reconnection attempts (seconds)
    max_reconnect_attempts=10000,   # Maximum reconnection attempts (-1 for infinite)
    ping_interval=5,                # Health check ping interval (seconds)
    connect_timeout=2.0,            # Initial connection timeout (seconds)
    logger_instance=my_logger,      # Custom logger instance (optional)
    message_parser=my_parser_func   # Custom message parser function (optional)
)
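The graceful_timeout option bounds how long shutdown waits for in-flight handlers. This README does not show how the wrapper implements that, so the following is only a minimal standard-library sketch of the general pattern. The name drain_handlers is a hypothetical helper for illustration, not part of the wrapper's API.

```python
import asyncio


async def drain_handlers(tasks, graceful_timeout):
    """Wait up to graceful_timeout seconds for tasks, then cancel the rest.

    Hypothetical helper for illustration only.
    Returns (number finished, number cancelled).
    """
    if not tasks:
        return 0, 0
    done, pending = await asyncio.wait(tasks, timeout=graceful_timeout)
    for task in pending:
        task.cancel()
    # Let cancelled tasks unwind; their CancelledError is collected, not raised.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def demo():
    fast = asyncio.create_task(asyncio.sleep(0.01))  # finishes within the timeout
    slow = asyncio.create_task(asyncio.sleep(10))    # still running at the deadline
    return await drain_handlers([fast, slow], graceful_timeout=0.2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A longer timeout lets slow handlers finish and acknowledge their messages; handlers cancelled at the deadline leave their messages unacknowledged, so JetStream can redeliver them later.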
Subscriber Configuration
Configure message subscribers with fine-grained control:
@app.subscriber(
    subject="order.created",        # NATS subject to subscribe to
    stream=StreamConfig(            # JetStream configuration
        name="ORDER_EVENTS",
        declare=False               # Don't create stream (assume exists)
    ),
    durable="order_processor",      # Durable consumer name
    pull_sub=PullSubConfig(         # Pull subscription settings
        batch_size=5,               # Process up to 5 messages at once
        timeout=10                  # Timeout for message fetching
    ),
    config=ConsumerConfig(          # Consumer behavior
        ack_wait=60,                # Time to wait for ACK before retry
        max_deliver=3               # Maximum delivery attempts
    )
)
async def handle_order_created(data, msg):
    # Your message processing logic here
    order = OrderModel(**data)
    await process_order(order)
Publisher Configuration
Create publishers for sending messages:
# Simple publisher
publisher = app.publisher(
    subject="order.notifications",
    stream=StreamConfig(name="NOTIFICATION_STREAM", declare=False),
    timeout=30  # Publish timeout
)
# Publishing messages
await publisher.publish({"order_id": 123, "status": "completed"})
await publisher.publish(OrderNotification(order_id=123, status="completed")) # Pydantic model
Environment-Based Configuration
Use environment variables for deployment flexibility:
import os

app = create_nats_app(
    servers=os.getenv("NATS_SERVERS", "nats://localhost:4222").split(","),
    graceful_timeout=int(os.getenv("GRACEFUL_SHUTDOWN_TIMEOUT", "30"))
)

@app.subscriber(
    subject=os.getenv("INPUT_SUBJECT", "default.input"),
    stream=StreamConfig(name=os.getenv("INPUT_STREAM", "DEFAULT_STREAM")),
    durable=os.getenv("CONSUMER_GROUP", "default_consumer")
)
async def handler(data, msg):
    # Process message
    pass
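A plain split(",") keeps stray whitespace and empty entries when NATS_SERVERS is written as "a, b,". A small helper can normalise the value before it is passed to create_nats_app. This is a sketch; parse_servers is a hypothetical name, not something the wrapper provides.

```python
import os


def parse_servers(raw, default="nats://localhost:4222"):
    """Split a comma-separated server list, dropping blanks and whitespace."""
    servers = [part.strip() for part in (raw or "").split(",") if part.strip()]
    # Fall back to the default when the variable is unset or empty.
    return servers or [default]


servers = parse_servers(os.getenv("NATS_SERVERS"))
```

The resulting list can be passed as the servers argument in place of the inline split.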
🔧 Advanced Usage
Multiple Subscribers
Handle different message types with multiple subscribers:
@app.subscriber(
    subject="user.created",
    stream=StreamConfig(name="USER_EVENTS"),
    durable="user_created_processor"
)
async def handle_user_created(data, msg):
    user = UserCreatedEvent(**data)
    await send_welcome_email(user)

@app.subscriber(
    subject="user.updated",
    stream=StreamConfig(name="USER_EVENTS"),
    durable="user_updated_processor"
)
async def handle_user_updated(data, msg):
    user = UserUpdatedEvent(**data)
    await update_user_cache(user)

@app.subscriber(
    subject="user.deleted",
    stream=StreamConfig(name="USER_EVENTS"),
    durable="user_deleted_processor"
)
async def handle_user_deleted(data, msg):
    user = UserDeletedEvent(**data)
    await cleanup_user_data(user)
Error Handling and Retry Logic
The wrapper automatically handles errors and retries:
@app.subscriber(
    subject="payment.process",
    stream=StreamConfig(name="PAYMENT_STREAM"),
    durable="payment_processor",
    config=ConsumerConfig(
        ack_wait=30,    # 30 second timeout
        max_deliver=5   # Retry up to 5 times
    )
)
async def process_payment(data, msg):
    try:
        payment = PaymentRequest(**data)
        # This might fail and trigger retry
        result = await external_payment_service.charge(payment)
        # Publish success event
        await success_publisher.publish(PaymentSuccess(**result))
    except ValidationError as e:
        # Invalid data - don't retry
        logger.error(f"Invalid payment data: {e}")
        await msg.term()  # Terminate message (no retry)
    except PaymentServiceError as e:
        # Temporary error - will retry automatically
        logger.warning(f"Payment service error: {e}")
        raise  # Let wrapper handle retry logic
    except Exception as e:
        # Unknown error - retry
        logger.error(f"Unexpected error: {e}")
        raise
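The wrapper's internal settlement logic is not shown in this README. The sketch below illustrates the pattern the example relies on: acknowledge when the handler returns, negatively acknowledge when it raises, and leave the message alone if the handler already settled it. FakeMsg and dispatch are hypothetical stand-ins written for this illustration, not the wrapper's code.

```python
import asyncio


class FakeMsg:
    """Stand-in for a JetStream message; records the first settlement call."""

    def __init__(self):
        self.outcome = None

    async def ack(self):
        self.outcome = self.outcome or "ack"

    async def nak(self):
        self.outcome = self.outcome or "nak"

    async def term(self):
        self.outcome = self.outcome or "term"


async def dispatch(handler, data, msg):
    """Run a handler and settle the message based on how it finished."""
    try:
        await handler(data, msg)
    except Exception:
        # Handler raised: request redelivery unless it already settled the message.
        if msg.outcome is None:
            await msg.nak()
    else:
        # Handler returned normally: acknowledge unless it already settled.
        if msg.outcome is None:
            await msg.ack()
    return msg.outcome


async def ok_handler(data, msg):
    return data


async def failing_handler(data, msg):
    raise RuntimeError("temporary failure")


async def rejecting_handler(data, msg):
    await msg.term()  # permanent failure, no retry
```

Under this scheme, redelivery after a nak is still bounded by the consumer's max_deliver setting on the server side.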
Manual Message Acknowledgment
Control message acknowledgment manually when needed:
@app.subscriber(
    subject="batch.process",
    stream=StreamConfig(name="BATCH_STREAM"),
    durable="batch_processor"
)
async def process_batch(data, msg):
    batch = BatchJob(**data)
    try:
        # Start processing
        await start_batch_job(batch)
        # Don't auto-ack yet - wait for completion
        # Manually acknowledge when ready
        await msg.ack()
    except TemporaryError:
        # Negative ack for redelivery
        await msg.nak()
    except PermanentError:
        # Terminate message (permanent failure)
        await msg.term()
Monitoring and Statistics
Monitor your application's health:
# Enable debug logging
app.enable_debug_logging(True)
# Get subscription statistics
stats = app.get_subscription_stats()
print(f"Active subscribers: {stats['total_subscribers']}")
print(f"Connection status: {stats['connection_status']}")
# Log detailed statistics
app.log_subscription_stats()
Batch Processing
Process messages in batches for better throughput:
@app.subscriber(
    subject="analytics.events",
    stream=StreamConfig(name="ANALYTICS_STREAM"),
    durable="analytics_batch_processor",
    pull_sub=PullSubConfig(
        batch_size=50,  # Fetch up to 50 messages per pull
        timeout=5       # 5 second batch timeout
    )
)
async def process_analytics_batch(data, msg):
    # Up to 50 messages are fetched at once; judging by the (data, msg)
    # signature, the handler is invoked once per message in the batch
    event = AnalyticsEvent(**data)
    await store_analytics_event(event)
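How batch_size and timeout interact can be sketched with an in-memory queue: each pull collects up to batch_size messages or stops when the timeout expires, and the handler is then called once per message. This is an illustration under that assumption, using a hypothetical fetch helper rather than the wrapper's or nats-py's actual code.

```python
import asyncio


async def fetch(queue, batch_size, timeout):
    """Collect up to batch_size items, waiting at most timeout seconds overall."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    batch = []
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), remaining))
        except asyncio.TimeoutError:
            break  # nothing more arrived before the deadline
    return batch


async def demo():
    queue = asyncio.Queue()
    for i in range(7):
        queue.put_nowait({"event": i})
    sizes, handled = [], []
    while True:
        batch = await fetch(queue, batch_size=5, timeout=0.05)
        if not batch:
            break
        sizes.append(len(batch))
        for data in batch:  # the handler sees one message at a time
            handled.append(data["event"])
    return sizes, handled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With seven queued messages and a batch size of five, the first pull returns a full batch and the second returns the remaining two after waiting out the timeout.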
Custom Message Parsers
Override the default message parsing logic with a custom parser function:
from typing import Any

def custom_message_parser(message_bytes: bytes) -> Any:
    """
    Custom parser that handles special message formats.

    Args:
        message_bytes: Raw message bytes from NATS
    Returns:
        Parsed data in any format (dict, object, string, etc.)
    Raises:
        Exception: If parsing fails, will fall back to default parser
    """
    # Example: Parse Protocol Buffer messages
    # (MyProtoMessage is your generated protobuf class)
    # Any exception raised here propagates, so the default parser is used instead
    proto_message = MyProtoMessage()
    proto_message.ParseFromString(message_bytes)
    return proto_message

# Create app with custom parser
app = create_nats_app(
    servers=["nats://localhost:4222"],
    message_parser=custom_message_parser
)

@app.subscriber(
    subject="proto.messages",
    stream=StreamConfig(name="PROTO_STREAM"),
    durable="proto_processor"
)
async def handle_proto_message(data, msg):
    # data is already parsed by custom_message_parser
    # data will be a MyProtoMessage instance
    process_proto_data(data)
Common Custom Parser Use Cases
1. Protocol Buffers:
import my_proto_pb2

def protobuf_parser(message_bytes: bytes):
    message = my_proto_pb2.MyMessage()
    message.ParseFromString(message_bytes)
    return message

app = create_nats_app(
    servers=["nats://localhost:4222"],
    message_parser=protobuf_parser
)
2. MessagePack:
import msgpack

def msgpack_parser(message_bytes: bytes):
    return msgpack.unpackb(message_bytes, raw=False)

app = create_nats_app(
    servers=["nats://localhost:4222"],
    message_parser=msgpack_parser
)
3. AVRO:
import io

import avro.io

def avro_parser(message_bytes: bytes):
    # avro_schema is your parsed Avro writer schema, defined elsewhere
    bytes_reader = io.BytesIO(message_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(avro_schema)
    return reader.read(decoder)

app = create_nats_app(
    servers=["nats://localhost:4222"],
    message_parser=avro_parser
)
4. Custom Binary Format:
import struct

def custom_binary_parser(message_bytes: bytes):
    # Parse custom binary format: 4 bytes int, 8 bytes timestamp, rest is string
    message_id = struct.unpack('>I', message_bytes[:4])[0]
    timestamp = struct.unpack('>Q', message_bytes[4:12])[0]
    payload = message_bytes[12:].decode('utf-8')
    return {
        'id': message_id,
        'timestamp': timestamp,
        'payload': payload
    }

app = create_nats_app(
    servers=["nats://localhost:4222"],
    message_parser=custom_binary_parser
)
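A parser for a custom binary layout is easiest to check with a matching encoder and a round trip. The sketch below assumes the same layout as the example above (4-byte big-endian unsigned int, 8-byte big-endian unsigned int, then UTF-8 text). The names encode_message and decode_message are hypothetical, and the length check is an addition for illustration.

```python
import struct

HEADER = struct.Struct(">IQ")  # big-endian, no padding: 4 + 8 = 12 bytes


def encode_message(message_id, timestamp, payload):
    """Pack the header fields, then append the payload as UTF-8."""
    return HEADER.pack(message_id, timestamp) + payload.encode("utf-8")


def decode_message(message_bytes):
    """Inverse of encode_message; rejects inputs shorter than the header."""
    if len(message_bytes) < HEADER.size:
        raise ValueError("message shorter than 12-byte header")
    message_id, timestamp = HEADER.unpack(message_bytes[:HEADER.size])
    return {
        "id": message_id,
        "timestamp": timestamp,
        "payload": message_bytes[HEADER.size:].decode("utf-8"),
    }


raw = encode_message(7, 1700000000, "hello")
decoded = decode_message(raw)
```

Raising on truncated input matters here because the parser contract above treats an exception as the signal to fall back to the default parser.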
Default Parser Behavior
If no custom parser is provided (or if the custom parser raises an exception), the default parser will:
- Try UTF-8 decoding: Attempt to decode bytes as UTF-8 text
- Try JSON parsing: If UTF-8 successful, try parsing as JSON
- Fallback to string: If not JSON, return as plain string
- Return raw bytes: If not UTF-8, return original bytes
# Default parsing logic (automatic):
# - JSON message: {"key": "value"} → dict
# - Plain text: "hello" → str
# - Binary data that is not valid UTF-8: b'\xff\xfe' → bytes
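The fallback chain described above can be written in a few lines. This is a sketch reconstructed from the listed steps, not the wrapper's actual source, and default_parse is a hypothetical name.

```python
import json


def default_parse(message_bytes):
    """UTF-8 decode, then JSON, then plain string; raw bytes if not UTF-8."""
    try:
        text = message_bytes.decode("utf-8")
    except UnicodeDecodeError:
        return message_bytes  # not valid UTF-8: hand back the original bytes
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text  # valid text, but not JSON


as_dict = default_parse(b'{"key": "value"}')
as_text = default_parse(b"hello")
as_bytes = default_parse(b"\xff\xfe")
```

Two consequences of this logic are worth knowing: any valid JSON document is converted, so a payload of 42 comes back as an int rather than a str, and only bytes that fail UTF-8 decoding stay as bytes, so short control-byte sequences such as b'\x00\x01' decode as text.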
Error Handling with Custom Parsers
If your custom parser raises an exception, the default parser will automatically be used as a fallback:
import json

def strict_json_parser(message_bytes: bytes):
    """Only parse JSON, fail otherwise"""
    return json.loads(message_bytes.decode('utf-8'))  # Will raise on non-JSON

app = create_nats_app(
    servers=["nats://localhost:4222"],
    message_parser=strict_json_parser
)

# If strict_json_parser fails:
# - Exception is logged
# - Default parser takes over
# - Message still gets processed
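The log-then-fall-back behaviour can be sketched as a small wrapper around the two parsers. This illustrates the described behaviour only; parse_with_fallback is a hypothetical name, and the fallback used in the usage lines is a deliberately trivial stand-in for the default parser.

```python
import json
import logging

logger = logging.getLogger("parser_fallback")


def parse_with_fallback(message_bytes, custom_parser, fallback_parser):
    """Try the custom parser; on any exception, log it and use the fallback."""
    try:
        return custom_parser(message_bytes)
    except Exception:
        logger.exception("Custom parser failed; using fallback parser")
        return fallback_parser(message_bytes)


def strict_json_parser(message_bytes):
    return json.loads(message_bytes.decode("utf-8"))


def text_fallback(message_bytes):
    # Trivial stand-in for the default parser in this sketch.
    return message_bytes.decode("utf-8", errors="replace")


parsed = parse_with_fallback(b'{"a": 1}', strict_json_parser, text_fallback)
recovered = parse_with_fallback(b"not json", strict_json_parser, text_fallback)
```

One design consequence: because failures are swallowed, a handler can receive either the custom parser's type or the fallback's type, so handlers behind a strict parser should still check what they were given.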
🔄 Migration from FastStream
Migrating from FastStream is straightforward:
Before (FastStream):
from faststream import FastStream
from faststream.nats import NatsBroker, JStream, PullSub

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

@broker.subscriber(
    "input.messages",
    stream=JStream("INPUT_STREAM"),
    pull_sub=PullSub(batch_size=1)
)
async def handler(data: MyModel):
    # Process data
    pass
After (NatsApp):
from nats_wrapper import create_nats_app, StreamConfig, PullSubConfig

app = create_nats_app(servers=["nats://localhost:4222"])

@app.subscriber(
    subject="input.messages",
    stream=StreamConfig(name="INPUT_STREAM"),
    durable="my_consumer",
    pull_sub=PullSubConfig(batch_size=1)
)
async def handler(data, msg):
    # Validate data manually
    validated_data = MyModel(**data)
    # Process data
    pass
Key Migration Points:
- Handler Signature: Change from (model) to (data, msg)
- Manual Validation: Add Pydantic validation in handler
- Durable Consumer: Specify consumer name explicitly
- Message Access: Use the msg parameter for advanced operations
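When migrating many handlers, the signature change and the manual validation step can be factored into a small adapter decorator. The sketch below is not part of the wrapper: with_model is a hypothetical helper, and a dataclass stands in for a Pydantic model so the example runs with the standard library alone (both are constructed with Model(**data)).

```python
import asyncio
import functools
from dataclasses import dataclass


def with_model(model_cls):
    """Adapt a single-argument handler(model) to the (data, msg) signature."""
    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(data, msg):
            # Build the model from the parsed payload; errors propagate to the caller.
            return await handler(model_cls(**data))
        return wrapper
    return decorator


@dataclass
class MyModel:  # stand-in for a Pydantic model in this sketch
    user_id: int
    content: str


@with_model(MyModel)
async def handler(model):
    return f"{model.user_id}:{model.content}"


result = asyncio.run(handler({"user_id": 1, "content": "hi"}, None))
```

In a real service the adapter would sit directly beneath the @app.subscriber(...) decorator, so the wrapper still sees a (data, msg) callable.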
🌟 Real-World Example
Here's a complete microservice example:
import asyncio
import json
import os
from datetime import datetime

from loguru import logger
from pydantic import BaseModel, ValidationError

from nats_wrapper import create_nats_app, StreamConfig, PullSubConfig, ConsumerConfig

class InventoryServiceError(Exception):
    """Placeholder: replace with the error type raised by your inventory client."""

# Data Models
class OrderCreated(BaseModel):
    order_id: str
    user_id: str
    total_amount: float
    items: list[dict]

class OrderProcessed(BaseModel):
    order_id: str
    status: str
    processed_at: datetime
    tracking_number: str

class InventoryUpdate(BaseModel):
    item_id: str
    quantity_reserved: int

# Application Setup
app = create_nats_app(
    servers=os.getenv("NATS_SERVERS", "nats://localhost:4222").split(","),
    graceful_timeout=60,
    logger_instance=logger
)

# Publishers
order_status_publisher = app.publisher(
    subject="order.status",
    stream=StreamConfig(name="ORDER_STATUS_STREAM")
)
inventory_publisher = app.publisher(
    subject="inventory.updates",
    stream=StreamConfig(name="INVENTORY_STREAM")
)

# Order Processing Service
@app.subscriber(
    subject="order.created",
    stream=StreamConfig(name="ORDER_EVENTS"),
    durable="order_processor_service",
    pull_sub=PullSubConfig(batch_size=3, timeout=10),
    config=ConsumerConfig(ack_wait=120, max_deliver=3)
)
async def process_order(data, msg):
    """Process new orders and update inventory"""
    try:
        # Validate order data
        order = OrderCreated(**data)
        logger.info(f"Processing order {order.order_id} for user {order.user_id}")
        # Reserve inventory for each item
        for item in order.items:
            inventory_update = InventoryUpdate(
                item_id=item["id"],
                quantity_reserved=item["quantity"]
            )
            await inventory_publisher.publish(inventory_update)
        # Generate tracking number
        tracking_number = f"TRK{order.order_id}{datetime.now().strftime('%Y%m%d')}"
        # Create processed order status
        processed_order = OrderProcessed(
            order_id=order.order_id,
            status="processed",
            processed_at=datetime.now(),
            tracking_number=tracking_number
        )
        # Publish status update
        await order_status_publisher.publish(processed_order)
        logger.info(f"Successfully processed order {order.order_id} with tracking {tracking_number}")
    except ValidationError as e:
        logger.error(f"Invalid order data: {e}")
        await msg.term()  # Don't retry invalid data
    except InventoryServiceError as e:
        logger.warning(f"Inventory service error for order {order.order_id}: {e}")
        raise  # Retry on inventory service errors
    except Exception as e:
        # `order` may be unbound here if parsing failed, so it is not referenced
        logger.error(f"Unexpected error processing order: {e}")
        raise

# Health Check Endpoint
@app.subscriber(
    subject="service.health.check",
    stream=StreamConfig(name="HEALTH_STREAM"),
    durable="order_service_health"
)
async def health_check(data, msg):
    """Respond to health check requests"""
    stats = app.get_subscription_stats()
    health_response = {
        "service": "order-processor",
        "status": "healthy" if stats["connection_status"] == "connected" else "unhealthy",
        "timestamp": datetime.now().isoformat(),
        "stats": stats
    }
    if msg.reply:
        await app.nc.publish(msg.reply, json.dumps(health_response).encode())

# Service Entry Point
async def main():
    logger.info("🚀 Starting Order Processing Service")
    logger.info(f"📊 Configuration: NATS={os.getenv('NATS_SERVERS')}")
    # Run the service
    await app.run()

if __name__ == "__main__":
    asyncio.run(main())
🔧 Configuration Reference
Environment Variables
| Variable | Description | Default |
|---|---|---|
| NATS_SERVERS | Comma-separated NATS server URLs | nats://localhost:4222 |
| GRACEFUL_SHUTDOWN_TIMEOUT | Shutdown timeout in seconds | 30 |
| NATS_DEBUG | Enable debug logging | false |
| INPUT_STREAM | Input stream name | - |
| INPUT_SUBJECT | Input subject pattern | - |
| OUTPUT_STREAM | Output stream name | - |
| OUTPUT_SUBJECT | Output subject pattern | - |
| CONSUMER_GROUP | Consumer group/durable name | - |
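The variables with defaults can be collected into one typed settings object at startup. This is a sketch: Settings and load_settings are hypothetical names, and the set of strings treated as true for NATS_DEBUG is an assumption, since the accepted values are not listed here.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    servers: list
    graceful_timeout: int
    debug: bool


def load_settings(env=None):
    """Read the documented variables, applying the defaults from the table."""
    env = os.environ if env is None else env
    return Settings(
        servers=env.get("NATS_SERVERS", "nats://localhost:4222").split(","),
        graceful_timeout=int(env.get("GRACEFUL_SHUTDOWN_TIMEOUT", "30")),
        # Assumed truthy spellings; adjust to whatever your deployment uses.
        debug=env.get("NATS_DEBUG", "false").strip().lower() in {"1", "true", "yes", "on"},
    )


defaults = load_settings({})
```

Passing a plain dict instead of os.environ keeps the loader easy to test.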
Docker Environment
version: '3.8'
services:
  order-processor:
    image: my-app:latest
    environment:
      - NATS_SERVERS=nats://nats-server:4222
      - INPUT_STREAM=ORDER_EVENTS
      - INPUT_SUBJECT=order.created
      - OUTPUT_STREAM=ORDER_STATUS
      - OUTPUT_SUBJECT=order.processed
      - CONSUMER_GROUP=order_processor_v1
      - GRACEFUL_SHUTDOWN_TIMEOUT=60
      - NATS_DEBUG=false
    depends_on:
      - nats-server  # must be defined as a service in the same file
🤝 Contributing
Contributions are welcome! Please feel free to submit issues and pull requests.
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🙏 Acknowledgments
- Built on top of the excellent nats-py library
- Inspired by FastStream for the developer experience
- Uses Pydantic for data validation
- Logging powered by Loguru