
AmzurLog - Custom Logging Library

A powerful, flexible, and easy-to-use logging library for Python applications, with built-in event streaming and exception tracking.

Features

🚀 Core Features

  • Multiple log levels (DEBUG, INFO, WARNING, ERROR, CRITICAL)
  • Thread-safe logging operations
  • Structured logging with JSON support
  • Custom formatters and filters
  • File rotation and log management
  • Performance monitoring decorators
  • Context-aware logging
  • Async logging support

📡 Event Streaming

  • Real-time event streaming to monitoring platforms
  • ELK Stack integration (Elasticsearch, Logstash, Kibana)
  • Grafana Loki integration for log aggregation
  • Apache Kafka streaming support
  • Redis Streams integration
  • HTTP webhook streaming
  • Circuit breaker and rate limiting
  • Batch processing and buffering
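The circuit-breaker behaviour listed above can be sketched as a generic pattern. This is an illustrative, self-contained example of the technique, not AmzurLog's internal implementation; the class and parameter names are placeholders:

```python
import time

class CircuitBreaker:
    """Generic circuit breaker: stop sending to a failing destination
    until a recovery timeout has elapsed."""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None  # time the breaker tripped, None = closed

    def allow(self):
        """Return True if a send attempt is currently permitted."""
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.recovery_timeout:
            return True  # half-open: permit one trial call after the timeout
        return False

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=30)
breaker.record_failure()
breaker.record_failure()   # threshold reached -> breaker opens
print(breaker.allow())     # False: destination is skipped until recovery
```

After the recovery timeout the breaker allows one trial call; a success closes it again, a failure re-opens it.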

🛡️ Exception Tracking

  • Comprehensive exception capture and reporting
  • Sentry integration for error monitoring
  • Rollbar integration for error tracking
  • Custom webhook integrations
  • Exception fingerprinting and deduplication
  • Context enrichment with breadcrumbs
  • Rate limiting for exception spam prevention
  • Automatic severity classification
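Exception fingerprinting and deduplication can be illustrated with a small self-contained sketch (this shows the general idea, not the library's actual algorithm): a fingerprint derived from the exception type and the raising location lets repeated occurrences be collapsed into one report:

```python
import hashlib
import traceback

def fingerprint(exc: BaseException) -> str:
    """Derive a stable fingerprint from the exception type and the
    file/line/function where it was raised."""
    tb = traceback.extract_tb(exc.__traceback__)
    location = f"{tb[-1].filename}:{tb[-1].lineno}:{tb[-1].name}" if tb else "unknown"
    raw = f"{type(exc).__name__}|{location}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

seen = set()

def should_report(exc: BaseException) -> bool:
    """Report each distinct fingerprint only once."""
    fp = fingerprint(exc)
    if fp in seen:
        return False
    seen.add(fp)
    return True

def boom():
    raise ValueError("bad input")

for _ in range(3):
    try:
        boom()
    except ValueError as e:
        if should_report(e):
            print("reporting", fingerprint(e))  # printed only once
```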

🎨 Handlers

  • Console/stdout output with colors
  • File logging with rotation
  • Multi-handler support
  • Custom handler creation
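Custom handlers follow the familiar emit-based pattern. The sketch below is self-contained and illustrative (the `BaseHandler` and `MemoryHandler` here are hypothetical stand-ins, not AmzurLog's actual classes); a real custom handler would subclass the library's handler base and override its emit method:

```python
class BaseHandler:
    """Minimal stand-in for a handler base class."""
    def __init__(self):
        self.formatter = None
    def set_formatter(self, formatter):
        self.formatter = formatter
    def format(self, record):
        return self.formatter(record) if self.formatter else str(record)
    def emit(self, record):
        raise NotImplementedError

class MemoryHandler(BaseHandler):
    """Custom handler that keeps formatted records in a bounded list --
    handy for tests and debugging."""
    def __init__(self, capacity=100):
        super().__init__()
        self.records = []
        self.capacity = capacity
    def emit(self, record):
        self.records.append(self.format(record))
        if len(self.records) > self.capacity:
            self.records.pop(0)  # drop the oldest entry

handler = MemoryHandler(capacity=2)
handler.set_formatter(lambda r: f"[{r['level']}] {r['message']}")
for msg in ("first", "second", "third"):
    handler.emit({"level": "INFO", "message": msg})
print(handler.records)  # only the two most recent entries remain
```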

📊 Formatters

  • Simple text formatting
  • JSON structured logging
  • Colored console output
  • CSV format support
  • Template-based formatting

🔍 Filters

  • Level-based filtering
  • Pattern matching
  • Rate limiting
  • Duplicate detection
  • Thread-based filtering
  • Custom field filtering
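Custom field filtering can be sketched as a predicate over a record's structured fields. This is an illustrative, self-contained example; AmzurLog's actual `FieldFilter` API may differ:

```python
class FieldFilter:
    """Pass a record only if the given fields match the expected values."""
    def __init__(self, **expected):
        self.expected = expected
    def should_log(self, record: dict) -> bool:
        return all(record.get(k) == v for k, v in self.expected.items())

f = FieldFilter(service="payment", env="prod")
print(f.should_log({"service": "payment", "env": "prod", "message": "ok"}))  # True
print(f.should_log({"service": "auth", "env": "prod", "message": "ok"}))     # False
```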

⚡ Decorators

  • Performance logging
  • Function call tracking
  • Error/exception logging
  • Async function support

🌐 Context Management

  • Request/response context
  • Thread-local context
  • Transaction context
  • Global context variables

Quick Start

Basic Usage

import amzurlog

# Quick logging
amzurlog.info("Application started")
amzurlog.warning("This is a warning")
amzurlog.error("An error occurred")

# Get a named logger
logger = amzurlog.configure_logger("my_app")
logger.info("Hello from my_app!")

Advanced Configuration

from amzurlog import AmzurLogger, FileHandler, JSONFormatter, LevelFilter

# Create logger
logger = AmzurLogger("advanced_app")

# Add file handler with JSON formatter
handler = FileHandler("app.log")
handler.set_formatter(JSONFormatter(indent=2))
handler.add_filter(LevelFilter(min_level="INFO"))
logger.add_handler(handler)

# Log with structured data
logger.info("User login", user_id="123", ip="192.168.1.1", success=True)

Quick Setup

from amzurlog import quick_setup

# One-liner setup
logger = quick_setup(
    level='DEBUG',
    log_file='myapp.log',
    format_type='json',
    console=True
)

logger.info("Ready to go!")

Event Streaming

Stream to ELK Stack

from amzurlog import AmzurLogger, ELKStreamingHandler

# Create logger with ELK streaming
logger = AmzurLogger("my_app")
elk_handler = ELKStreamingHandler(
    elasticsearch_hosts=["localhost:9200"],
    index_prefix="myapp-logs"
)
logger.add_handler("elk", elk_handler)

# Logs are automatically streamed to Elasticsearch
logger.info("User action", user_id="123", action="login")

Stream to Grafana Loki

from amzurlog import GrafanaStreamingHandler

# Add Grafana Loki streaming
grafana_handler = GrafanaStreamingHandler(
    loki_url="http://localhost:3100",
    job_name="my-application"
)
logger.add_handler("grafana", grafana_handler)

logger.info("System status", component="database", status="healthy")

Stream to Kafka

from amzurlog import KafkaStreamingHandler

# Stream logs to Kafka
kafka_handler = KafkaStreamingHandler(
    bootstrap_servers=["localhost:9092"],
    topic="application-logs"
)
logger.add_handler("kafka", kafka_handler)

logger.error("Payment failed", transaction_id="tx-456", amount=99.99)

Stream to Redis

from amzurlog import RedisStreamingHandler

# Stream to Redis Streams
redis_handler = RedisStreamingHandler(
    redis_url="redis://localhost:6379",
    stream_name="app:logs"
)
logger.add_handler("redis", redis_handler)

logger.warning("High memory usage", memory_percent=85)

Configuration

Configuration-Based Streaming

from amzurlog import create_streaming_config

# Create streaming configuration
config = create_streaming_config(
    destinations={
        "elasticsearch": {
            "enabled": True,
            "hosts": ["localhost:9200"],
            "index_prefix": "myapp"
        },
        "kafka": {
            "enabled": True,
            "servers": ["localhost:9092"],
            "topic": "logs"
        }
    },
    circuit_breaker={
        "failure_threshold": 5,
        "recovery_timeout": 60
    },
    rate_limiting={
        "max_events_per_second": 100
    }
)

# Apply configuration to logger
logger = config.setup_streaming("my_app")

Exception Tracking

Basic Exception Tracking

from amzurlog import AmzurLogger, ExceptionTracker, ExceptionHandler

# Create logger with exception tracking
logger = AmzurLogger("my_app")
exception_handler = ExceptionHandler(auto_capture=True)
logger.add_handler("exceptions", exception_handler)

# Exceptions are automatically captured and tracked
try:
    risky_operation()
except Exception:
    logger.error("Operation failed")  # Exception details automatically captured

Sentry Integration

from amzurlog import SentryHandler

# Add Sentry integration
sentry_handler = SentryHandler(
    dsn="https://your-dsn@sentry.io/project-id",
    environment="production"
)
logger.add_handler("sentry", sentry_handler)

# Exceptions automatically sent to Sentry
logger.error("Critical error occurred", exc_info=True)

Exception Decorator

from amzurlog import track_exceptions, ExceptionTracker, ExceptionSeverity

logger = AmzurLogger("my_app")
tracker = ExceptionTracker(logger)

@track_exceptions(tracker, severity=ExceptionSeverity.HIGH, reraise=True)
def critical_function():
    # Any exception here is automatically tracked
    raise ValueError("Something went wrong")

try:
    critical_function()
except ValueError:
    pass  # Exception was tracked automatically

Global Exception Handler

from amzurlog import install_global_exception_handler, ExceptionTracker

logger = AmzurLogger("my_app")
tracker = ExceptionTracker(logger)

# Install global handler for unhandled exceptions
install_global_exception_handler(tracker)

# Any unhandled exception will be automatically tracked
raise RuntimeError("This will be tracked automatically")

Context and Breadcrumbs

from amzurlog import ExceptionTracker, ExceptionSeverity

tracker = ExceptionTracker(logger)

# Set user context
tracker.set_user_context("user_123", email="user@example.com")

# Set request context
tracker.set_request_context("req_456", method="POST", url="/api/data")

# Add breadcrumbs for debugging
tracker.add_breadcrumb("Starting data processing", "process")
tracker.add_breadcrumb("Loading configuration", "config")

try:
    process_data()
except Exception:
    # Exception will include user context, request info, and breadcrumbs
    tracker.handle_exception(severity=ExceptionSeverity.HIGH)

Custom Exception Integration

from amzurlog import WebhookExceptionIntegration, ExceptionHandler

# Custom webhook integration
webhook_integration = WebhookExceptionIntegration(
    webhook_url="https://your-monitoring.com/exceptions",
    headers={"Authorization": "Bearer your-token"}
)

exception_handler = ExceptionHandler()
exception_handler.add_integration("webhook", webhook_integration)
logger.add_handler("exceptions", exception_handler)

Exception Configuration

from amzurlog import create_exception_config

# Create exception tracking configuration
config = create_exception_config(
    integrations={
        "sentry": {
            "enabled": True,
            "dsn": "https://your-dsn@sentry.io/project-id",
            "environment": "production"
        },
        "webhook": {
            "enabled": True,
            "url": "https://monitoring.com/webhook",
            "headers": {"Authorization": "Bearer token"}
        }
    },
    rate_limiting={
        "max_exceptions": 10,
        "time_window": 60
    },
    capture_settings={
        "capture_locals": True,
        "capture_globals": False,
        "max_breadcrumbs": 50
    }
)

# Apply configuration
logger = config.setup_exception_tracking("my_app")

From Dictionary

from amzurlog import AmzurLogConfig

config = AmzurLogConfig({
    'level': 'INFO',
    'console': {'enabled': True},
    'file': 'app.log',
    'rotation': {
        'max_size': '10MB',
        'backup_count': 5
    },
    'format': 'json'
})

logger = config.setup_logger('my_app')

From JSON File

{
    "level": "INFO",
    "handlers": [
        {
            "type": "console",
            "formatter": {
                "type": "colored",
                "options": {}
            }
        },
        {
            "type": "rotating",
            "filename": "logs/app.log",
            "max_bytes": "10MB",
            "backup_count": 5,
            "formatter": {
                "type": "json",
                "options": {"indent": 2}
            }
        }
    ],
    "filters": [
        {
            "type": "level",
            "min_level": "INFO"
        }
    ]
}
Then load it:

from amzurlog import AmzurLogConfig

config = AmzurLogConfig.from_file('config.json')
logger = config.setup_logger('my_app')

From Environment Variables

export AMZURLOG_LEVEL=DEBUG
export AMZURLOG_FORMAT=json
export AMZURLOG_DIR=logs
export AMZURLOG_MAX_SIZE=50MB
export AMZURLOG_BACKUP_COUNT=10
export AMZURLOG_CONSOLE=true
Then in Python:

from amzurlog import AmzurLogConfig

config = AmzurLogConfig.from_env()
logger = config.setup_logger('my_app')

Performance Decorators

Function Performance Monitoring

from amzurlog.decorators import log_performance

@log_performance(threshold_seconds=0.1, include_memory=True)
def expensive_operation():
    # Your code here
    return "result"

# Automatically logs execution time, memory usage, CPU time
result = expensive_operation()

Function Call Logging

from amzurlog.decorators import log_calls

@log_calls(include_args=True, include_result=True)
def process_data(data, format="json"):
    return {"processed": len(data)}

# Logs function calls with parameters and return values
result = process_data([1, 2, 3], format="xml")

Error Logging

from amzurlog.decorators import log_errors

@log_errors(include_locals=True)
def risky_operation():
    # This will automatically log any exceptions
    raise ValueError("Something went wrong")

risky_operation()  # Exception details logged automatically

Context Management

Request Context

from amzurlog.context import RequestContext

with RequestContext("req-123", method="POST", path="/api/users", user_id="456"):
    logger.info("Processing request")
    logger.warning("Validation failed")
    # All logs include request context automatically

Custom Context

from amzurlog.context import log_context

with log_context(transaction_id="tx-789", operation="transfer"):
    logger.info("Starting transaction")
    # Process transaction
    logger.info("Transaction completed")

Thread-Local Context

from amzurlog.context import set_global_context

# Set context for the entire thread
set_global_context(service="payment", version="1.2.3")

logger.info("Service started")  # Includes service and version

Filters and Rate Limiting

Rate Limiting

from amzurlog.filters import RateLimitFilter

# Allow max 10 messages per minute
rate_filter = RateLimitFilter(max_rate=10, time_window=60)
handler.add_filter(rate_filter)

Pattern Filtering

from amzurlog.filters import PatternFilter

# Only log messages containing "ERROR"
error_filter = PatternFilter(r"ERROR", include=True)
handler.add_filter(error_filter)

Duplicate Prevention

from amzurlog.filters import DuplicateFilter

# Prevent duplicate messages within 5 minutes
dup_filter = DuplicateFilter(time_window=300, max_duplicates=1)
handler.add_filter(dup_filter)

Custom Formatters

Template Formatter

from amzurlog.formatters import TemplateFormatter

formatter = TemplateFormatter(
    template="[{timestamp:%Y-%m-%d %H:%M:%S}] {level} | {logger} | {message}",
    field_formatters={
        'timestamp': lambda dt: dt.strftime('%Y-%m-%d %H:%M:%S'),
        'level': lambda lvl: lvl.name.upper()
    }
)

CSV Formatter

from amzurlog.formatters import CSVFormatter

formatter = CSVFormatter(
    fields=['timestamp', 'level', 'logger', 'message', 'user_id'],
    delimiter=','
)

Async Support

from amzurlog.decorators import log_async_calls
from amzurlog.context import async_log_context

@log_async_calls(include_args=True)
async def fetch_data(url):
    async with async_log_context(operation="fetch", url=url):
        # Your async code
        return await some_async_operation()

File Rotation

Size-Based Rotation

from amzurlog.handlers import RotatingFileHandler

handler = RotatingFileHandler(
    filename="app.log",
    max_bytes=10 * 1024 * 1024,  # 10MB
    backup_count=5
)

Time-Based Rotation

from amzurlog.handlers import TimedRotatingFileHandler

# Daily rotation at midnight
daily_handler = TimedRotatingFileHandler(
    filename="daily_app.log",
    when='midnight',
    interval=1,
    backup_count=7  # Keep 7 days
)

# Hourly rotation
hourly_handler = TimedRotatingFileHandler(
    filename="hourly_app.log", 
    when='H',
    interval=1,
    backup_count=24  # Keep 24 hours
)

# Weekly rotation (every Monday)
weekly_handler = TimedRotatingFileHandler(
    filename="weekly_app.log",
    when='W0',  # 0=Monday, 1=Tuesday, etc.
    interval=1,
    backup_count=4  # Keep 4 weeks
)

# Custom time rotation (daily at 2:30 AM)
from datetime import time
custom_handler = TimedRotatingFileHandler(
    filename="custom_app.log",
    when='midnight',
    interval=1,
    backup_count=30,
    at_time=time(2, 30)  # 2:30 AM
)

Rotation Schedule Options

  • 'S' - Second-based rotation
  • 'M' - Minute-based rotation
  • 'H' - Hourly rotation
  • 'D' or 'midnight' - Daily rotation
  • 'W0'-'W6' - Weekly rotation (0=Monday, 6=Sunday)
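These `when` values mirror the Python standard library's `logging.handlers.TimedRotatingFileHandler` (note the stdlib spells the keyword `backupCount` rather than `backup_count`), so the same schedule semantics apply there. For comparison, the stdlib equivalent of a daily rotation:

```python
import logging
import logging.handlers
import os
import tempfile

# Standard-library equivalent of the daily schedule above.
logfile = os.path.join(tempfile.mkdtemp(), "app.log")
handler = logging.handlers.TimedRotatingFileHandler(
    logfile, when="midnight", interval=1, backupCount=7
)
logger = logging.getLogger("rotation_demo")
logger.addHandler(handler)
logger.propagate = False  # keep output out of the root logger
logger.warning("rotates daily at midnight, keeping 7 backups")
handler.close()
```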

Advanced Options

# UTC time rotation for global applications
utc_handler = TimedRotatingFileHandler(
    filename="global_app.log",
    when='midnight',
    utc=True,  # Use UTC instead of local time
    backup_count=30
)

# Delayed file opening (for efficiency)
delayed_handler = TimedRotatingFileHandler(
    filename="delayed_app.log", 
    when='H',
    delay=True,  # Don't create file until first log
    backup_count=24
)

Integration Examples

FastAPI Integration

from fastapi import FastAPI, Request
from amzurlog.context import RequestContext
import amzurlog
import uuid

app = FastAPI()
logger = amzurlog.configure_logger("fastapi_app")

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    request_id = str(uuid.uuid4())
    
    with RequestContext(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
        ip_address=request.client.host
    ):
        logger.info("Request started")
        response = await call_next(request)
        logger.info("Request completed", status_code=response.status_code)
        
    return response

Django Integration

# In Django settings.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'amzurlog': {
            'class': 'amzurlog.handlers.FileHandler',
            'filename': 'django.log',
            'formatter': 'json',
        },
    },
    'loggers': {
        'django': {
            'handlers': ['amzurlog'],
            'level': 'INFO',
        },
    },
}

Flask Integration

from flask import Flask, g, request
from amzurlog.context import RequestContext
import amzurlog
import uuid

app = Flask(__name__)
logger = amzurlog.configure_logger("flask_app")

@app.before_request
def before_request():
    g.request_context = RequestContext(
        request_id=str(uuid.uuid4()),
        method=request.method,
        path=request.path,
        ip_address=request.remote_addr
    )
    g.request_context.__enter__()
    logger.info("Request started")

@app.after_request
def after_request(response):
    logger.info("Request completed", status_code=response.status_code)
    g.request_context.__exit__(None, None, None)
    return response

Testing

Run the complete test suite:

cd amzurlog

# Run all tests
python test_amzurlog.py        # Core functionality tests
python test_streaming.py       # Event streaming tests  
python test_exception_tracking.py  # Exception tracking tests

# Run complete integration example
python complete_integration_example.py

Or run specific test classes:

python -m unittest test_amzurlog.TestAmzurLogger
python -m unittest test_amzurlog.TestHandlers
python -m unittest test_amzurlog.TestFormatters
python -m unittest test_streaming.TestStreamManager
python -m unittest test_exception_tracking.TestExceptionTracker

Performance

AmzurLog is designed for high performance:

  • Thread-safe operations with minimal locking
  • Efficient memory usage with lazy formatting
  • Optional performance monitoring with psutil
  • Rate limiting to prevent log spam
  • Configurable log rotation to manage disk space
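"Lazy formatting" means the message string is only built if the record will actually be emitted, so suppressed debug messages cost almost nothing. A self-contained sketch of the idea (illustrative, not AmzurLog's internals):

```python
LEVELS = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}

class LazyLogger:
    """Defer message formatting until the level check has passed."""
    def __init__(self, min_level="INFO"):
        self.min_level = LEVELS[min_level]
        self.format_calls = 0  # track how often formatting actually runs

    def log(self, level, template, *args):
        if LEVELS[level] < self.min_level:
            return None  # formatting is skipped entirely
        self.format_calls += 1
        return template % args

logger = LazyLogger(min_level="WARNING")
logger.log("DEBUG", "expensive dump: %s", "x" * 10)  # below threshold, not formatted
logger.log("ERROR", "failed after %d retries", 3)    # formatted and returned
print(logger.format_calls)  # 1
```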

Security

  • PII sanitization support (integrate with your sanitizer)
  • Secure context isolation
  • Rate limiting prevents log injection attacks
  • Thread-safe operations prevent race conditions

API Reference

Core Classes

  • AmzurLogger: Main logger class
  • LogLevel: Log level enumeration
  • LogRecord: Individual log entry representation

Handlers

  • ConsoleHandler: Output to stdout/stderr
  • FileHandler: Output to files
  • RotatingFileHandler: File output with size-based rotation
  • TimedRotatingFileHandler: File output with time-based rotation
  • MultiHandler: Multiple handler support

Formatters

  • SimpleFormatter: Basic text formatting
  • JSONFormatter: Structured JSON output
  • ColoredFormatter: Colored console output
  • CSVFormatter: CSV format output
  • TemplateFormatter: Custom template formatting

Filters

  • LevelFilter: Filter by log level
  • PatternFilter: Filter by message pattern
  • RateLimitFilter: Rate limiting
  • DuplicateFilter: Prevent duplicates
  • ThreadFilter: Filter by thread
  • FieldFilter: Filter by custom fields

Context

  • LogContext: Add context to logs
  • RequestContext: HTTP request context
  • TransactionContext: Database transaction context
  • log_context(): Context manager
  • async_log_context(): Async context manager

Decorators

  • @log_performance: Performance monitoring
  • @log_calls: Function call logging
  • @log_errors: Error logging
  • @log_async_calls: Async function logging

License

MIT License - see LICENSE file for details.

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for your changes
  4. Run the test suite
  5. Submit a pull request

Support

For questions and support, please open an issue on the GitHub repository.

Download files

Download the file for your platform.

Source Distribution

amzurlog-1.2.0.tar.gz (65.2 kB)

Built Distribution

amzurlog-1.2.0-py3-none-any.whl (68.0 kB)

File details

Details for the file amzurlog-1.2.0.tar.gz.

File metadata

  • Download URL: amzurlog-1.2.0.tar.gz
  • Size: 65.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.0

File hashes

Hashes for amzurlog-1.2.0.tar.gz:

  • SHA256: a013974c0b6c8a172e512723fe387a4e892a919674f01a5d2926e8adfe52f6f8
  • MD5: 0b5a73054f0572c6e361f9d5d19fd566
  • BLAKE2b-256: 61407690f09221d7636632e704d49f71b8e1acc4dbe095bf58c94d070f06b774


File details

Details for the file amzurlog-1.2.0-py3-none-any.whl.

File metadata

  • Download URL: amzurlog-1.2.0-py3-none-any.whl
  • Size: 68.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.0

File hashes

Hashes for amzurlog-1.2.0-py3-none-any.whl:

  • SHA256: a8884ab547c411c8207642808a4195212a7b90675e8f5f980b7dec13be5b687f
  • MD5: 8db8a858d27746eddf8d967ff85b83c6
  • BLAKE2b-256: 7b927f3934ba5fe758e5bd641c2085b8a118bccb1f3cfde3ec88df0c018d0047

