eezy-logging

A non-blocking, queue-based Python logging library for shipping logs to Elasticsearch and other destinations.

Features

  • Non-blocking: Logs are queued and processed in a background thread, never blocking your application
  • Queue-based: Choose between an in-memory queue and a Redis-backed queue for persistence
  • Batching: Logs are batched for efficient bulk writes
  • Extensible: Easy to create custom sinks and queues
  • Elasticsearch support: Built-in sink with ILM (Index Lifecycle Management) policies
  • OpenSearch support: Built-in sink with ISM (Index State Management) policies

Installation

pip install eezy-logging

With optional dependencies:

# For Elasticsearch support
pip install eezy-logging[elasticsearch]

# For OpenSearch support
pip install eezy-logging[opensearch]

# For Redis queue support
pip install eezy-logging[redis]

# All optional dependencies
pip install eezy-logging[all]

Quick Start

import logging
from eezy_logging import EezyHandler
from eezy_logging.sinks import ElasticsearchSink

# Create sink and handler
sink = ElasticsearchSink(index_prefix="myapp-logs")
handler = EezyHandler(sink=sink)

# Attach to logger
logger = logging.getLogger("myapp")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Log normally - non-blocking!
logger.info("Application started", extra={"version": "1.0.0"})
logger.info("User logged in", extra={"user_id": 123, "ip": "192.168.1.1"})

# Clean shutdown (flushes remaining logs)
handler.close()

Configuration

EezyHandler

from eezy_logging import EezyHandler

handler = EezyHandler(
    sink=sink,              # Required: where to send logs
    queue=None,             # Optional: queue backend (default: InMemoryQueue)
    batch_size=100,         # Max records per batch
    flush_interval=5.0,     # Max seconds before flushing partial batch
    level=logging.NOTSET,   # Minimum log level
    max_retries=3,          # Max retry attempts for failed writes
    retry_base_delay=1.0,   # Base delay for exponential backoff (seconds)
    serializer=None,        # Custom serializer function (default: serialize_record)
)

Note: Retries use exponential backoff (1s, 2s, 4s, ...) and are non-blocking - the worker continues consuming new logs while waiting to retry failed batches.
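
The note above can be illustrated with a small scheduler that records when each failed batch becomes eligible again instead of sleeping. This is a stdlib-only sketch, not the library's worker: the class names RetryScheduler and PendingRetry are hypothetical, and the formula base_delay * 2**attempt is an assumption chosen to match the 1s, 2s, 4s sequence.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class PendingRetry:
    ready_at: float                      # only field used for heap ordering
    attempt: int = field(compare=False)
    records: list = field(compare=False)


class RetryScheduler:
    """Hypothetical helper: tracks failed batches without blocking the worker."""

    def __init__(self, base_delay=1.0, max_retries=3):
        self.base_delay = base_delay
        self.max_retries = max_retries
        self._heap = []

    def schedule(self, records, attempt, now):
        """Queue a failed batch; return False once retries are exhausted."""
        if attempt >= self.max_retries:
            return False
        delay = self.base_delay * (2 ** attempt)  # assumed: 1s, 2s, 4s, ...
        heapq.heappush(self._heap, PendingRetry(now + delay, attempt + 1, records))
        return True

    def due(self, now):
        """Pop every batch whose delay has elapsed; never sleeps."""
        ready = []
        while self._heap and self._heap[0].ready_at <= now:
            ready.append(heapq.heappop(self._heap))
        return ready
```

A worker loop built this way would call due(time.monotonic()) between queue reads, so new records keep flowing while older batches wait out their delay.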

Custom Serializer

You can provide a custom serializer function to control how log records are converted to dictionaries:

import logging
from typing import Any
from eezy_logging import EezyHandler
from eezy_logging.serializer import serialize_record

def my_serializer(record: logging.LogRecord) -> dict[str, Any]:
    # Start with default serialization
    data = serialize_record(record)

    # Add custom fields
    data["environment"] = "production"
    data["service"] = "my-api"

    # Transform existing fields
    if "user_id" in data:
        data["user_id"] = str(data["user_id"])

    return data

handler = EezyHandler(sink=sink, serializer=my_serializer)

Or create a completely custom serializer:

def minimal_serializer(record: logging.LogRecord) -> dict[str, Any]:
    return {
        "timestamp": record.created,
        "level": record.levelname,
        "message": record.getMessage(),
        "logger": record.name,
    }

handler = EezyHandler(sink=sink, serializer=minimal_serializer)

Queue Backends

InMemoryQueue (default)

Uses Python's queue.Queue. Simple and efficient for most use cases.

from eezy_logging import EezyHandler, InMemoryQueue

queue = InMemoryQueue(max_size=10000)
handler = EezyHandler(sink=sink, queue=queue)

Note: Records are lost if the process crashes. For critical logs, use RedisQueue.

DequeQueue

Uses collections.deque. Slightly more efficient under high load, when the queue is frequently full.

from eezy_logging import DequeQueue, EezyHandler

queue = DequeQueue(max_size=10000)
handler = EezyHandler(sink=sink, queue=queue)

RedisQueue

Uses a Redis LIST for persistence, so queued logs survive process restarts.

from eezy_logging import EezyHandler, RedisQueue

# Using environment variables or defaults
queue = RedisQueue(key="myapp:logs")

# Using custom client
from redis import Redis
client = Redis(host="redis.example.com", port=6379, password="secret")
queue = RedisQueue(client=client, key="myapp:logs", max_size=50000)

handler = EezyHandler(sink=sink, queue=queue)

Environment variables (used when client is not provided):

  • EEZY_REDIS_HOST (default: localhost)
  • EEZY_REDIS_PORT (default: 6379)
  • EEZY_REDIS_PASSWORD (default: none)
  • EEZY_REDIS_DB (default: 0)

Sinks

ElasticsearchSink

For Elasticsearch clusters. Supports Index Lifecycle Management (ILM) for automatic index rollover and deletion.

from eezy_logging.sinks import ElasticsearchSink, ILMPolicy

# Using environment variables or defaults
sink = ElasticsearchSink(
    index_prefix="myapp-logs",      # Index name prefix
    index_date_format="%Y.%m.%d",   # Date suffix format (None to disable)
    setup_index_template=True,       # Create index template on setup
    setup_ilm_policy=True,           # Create ILM policy on setup
    max_retries=3,                   # Retry attempts for failed writes
    retry_delay=1.0,                 # Initial retry delay (exponential backoff)
)

# Custom ILM policy
policy = ILMPolicy(
    rollover_max_age="1d",
    rollover_max_size="10gb",
    rollover_max_docs=1000000,
    warm_after="7d",
    delete_after="30d",
)
sink = ElasticsearchSink(
    index_prefix="myapp-logs",
    ilm_policy=policy,
)

# Custom index settings
sink = ElasticsearchSink(
    index_prefix="myapp-logs",
    custom_index_settings={
        "refresh_interval": "30s",
        "number_of_shards": 3,
        "number_of_replicas": 2,
    }
)

# Custom field mappings (replaces defaults - use with custom serializer)
sink = ElasticsearchSink(
    index_prefix="myapp-logs",
    custom_mappings={
        "properties": {
            "@timestamp": {"type": "date"},
            "message": {"type": "text"},
            "level": {"type": "keyword"},
            "user_id": {"type": "keyword"},
            "request_duration_ms": {"type": "float"},
        }
    }
)

# Using custom client
from elasticsearch import Elasticsearch
client = Elasticsearch(
    ["https://es.example.com:9200"],
    api_key="your-api-key"
)
sink = ElasticsearchSink(client=client, index_prefix="myapp-logs")
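
How index_prefix and index_date_format combine into a concrete index name is not spelled out here. The sketch below shows one plausible reading, assuming the date suffix is joined to the prefix with a hyphen. The function name index_name and the hyphen separator are assumptions, and an ILM rollover setup may name indices differently.

```python
from datetime import datetime, timezone


def index_name(prefix, date_format="%Y.%m.%d", when=None):
    """Build an index name; date_format=None disables the date suffix."""
    if date_format is None:
        return prefix
    when = when or datetime.now(timezone.utc)
    # Assumption: prefix and formatted date are joined with a hyphen.
    return f"{prefix}-{when.strftime(date_format)}"
```

Under these assumptions, a record written on 6 January 2026 with the default format would land in myapp-logs-2026.01.06.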

Environment variables (used when client is not provided):

  • EEZY_ES_HOSTS (comma-separated, default: http://localhost:9200)
  • EEZY_ES_USERNAME
  • EEZY_ES_PASSWORD
  • EEZY_ES_API_KEY
  • EEZY_ES_VERIFY_CERTS (default: true)
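
As a rough illustration of how the variables above could be turned into client settings, the sketch below reads them with the documented defaults. The function name es_settings_from_env is hypothetical, and the set of strings treated as false is an assumption; the library's own parsing may differ.

```python
import os


def es_settings_from_env(env=None):
    """Read EEZY_ES_* variables, applying the documented defaults."""
    env = os.environ if env is None else env
    raw_hosts = env.get("EEZY_ES_HOSTS", "http://localhost:9200")
    hosts = [h.strip() for h in raw_hosts.split(",") if h.strip()]
    # Assumption: "0", "false" and "no" (any case) disable verification.
    raw_verify = env.get("EEZY_ES_VERIFY_CERTS", "true").strip().lower()
    return {
        "hosts": hosts,
        "username": env.get("EEZY_ES_USERNAME"),
        "password": env.get("EEZY_ES_PASSWORD"),
        "api_key": env.get("EEZY_ES_API_KEY"),
        "verify_certs": raw_verify not in {"0", "false", "no"},
    }
```

Passing a plain dict as env makes the parsing easy to check without touching the real environment.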

Supported versions: Elasticsearch 7.x, 8.x, and 9.x. The library detects the installed client version automatically and uses the appropriate APIs.

Note: For Elastic Cloud deployments, create your own client with the cloud_id parameter and pass it to the sink.

OpenSearchSink

For OpenSearch clusters. Supports Index State Management (ISM) for automatic index rollover and deletion.

from eezy_logging.sinks import OpenSearchSink, ISMPolicy

# Using environment variables or defaults
sink = OpenSearchSink(
    index_prefix="myapp-logs",       # Index name prefix (also used as alias)
    setup_index_template=True,       # Create index template on setup
    setup_ism_policy=True,           # Create ISM policy on setup
)

# Custom ISM policy - standard configuration
policy = ISMPolicy(
    rollover_min_index_age="1d",
    rollover_min_size="10gb",
    rollover_min_doc_count=1000000,
    warm_after="7d",        # Move to warm state after 7 days
    delete_after="30d",     # Delete after 30 days
)
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    ism_policy=policy,
)

# Skip warm phase (hot -> delete directly)
policy = ISMPolicy(
    rollover_min_index_age="1d",
    rollover_min_size="10gb",
    warm_after=None,        # Skip warm phase entirely
    delete_after="30d",
)
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    ism_policy=policy,
)

# Keep data forever (no deletion)
policy = ISMPolicy(
    rollover_min_index_age="7d",
    rollover_min_size="50gb",
    warm_after="30d",
    delete_after=None,      # Keep data forever
)
sink = OpenSearchSink(
    index_prefix="audit-logs",
    ism_policy=policy,
)

# Advanced: Custom JSON policy for complete control
# Useful for multi-tier architectures (hot/warm/cold/delete)
policy = ISMPolicy(
    policy_json={
        "description": "Multi-tier policy",
        "default_state": "hot",
        "states": [
            {
                "name": "hot",
                "actions": [{"rollover": {"min_index_age": "1d", "min_size": "5gb"}}],
                "transitions": [{"state_name": "warm", "conditions": {"min_index_age": "3d"}}]
            },
            {
                "name": "warm",
                "actions": [
                    {"replica_count": {"number_of_replicas": 1}},
                    {"force_merge": {"max_num_segments": 1}}
                ],
                "transitions": [{"state_name": "cold", "conditions": {"min_index_age": "7d"}}]
            },
            {
                "name": "cold",
                "actions": [
                    {"read_only": {}},
                    {"replica_count": {"number_of_replicas": 0}}
                ],
                "transitions": [{"state_name": "delete", "conditions": {"min_index_age": "90d"}}]
            },
            {
                "name": "delete",
                "actions": [{"delete": {}}],
                "transitions": []
            }
        ]
        # ism_template is added automatically if not present
    }
)
sink = OpenSearchSink(
    index_prefix="app-logs",
    ism_policy=policy,
)

# Custom index settings
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    custom_index_settings={
        "refresh_interval": "30s",
        "number_of_shards": 3,
        "number_of_replicas": 2,
    }
)

# Custom field mappings (replaces defaults - use with custom serializer)
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    custom_mappings={
        "properties": {
            "@timestamp": {"type": "date"},
            "message": {"type": "text"},
            "level": {"type": "keyword"},
            "user_id": {"type": "keyword"},
            "request_duration_ms": {"type": "float"},
        }
    }
)

# Using custom client
from opensearchpy import OpenSearch
client = OpenSearch(
    ["https://opensearch.example.com:9200"],
    http_auth=("user", "password")
)
sink = OpenSearchSink(client=client, index_prefix="myapp-logs")

ISM Policy Options:

  • Standard lifecycle: warm_after="7d" + delete_after="30d" (default: hot → warm → delete)
  • Skip warm phase: warm_after=None + delete_after="30d" (hot → delete)
  • Keep forever: delete_after=None (no deletion)
  • Custom JSON: policy_json={...} for complete control (other attributes ignored)
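
The first three options above can be pictured as different state lists in the ISM policy document. The sketch below builds such a list from warm_after and delete_after. The function name build_ism_states is hypothetical, the warm state's actions are left empty because they are not documented here, and the JSON the library actually generates may differ.

```python
def build_ism_states(rollover, warm_after="7d", delete_after="30d"):
    """Return ISM states for hot -> [warm] -> [delete]."""

    def transition(state, age):
        return [{"state_name": state, "conditions": {"min_index_age": age}}]

    hot = {"name": "hot", "actions": [{"rollover": rollover}], "transitions": []}
    states = [hot]
    last = hot
    if warm_after is not None:
        last["transitions"] = transition("warm", warm_after)
        # Assumption: warm-state actions are unknown, so none are listed.
        last = {"name": "warm", "actions": [], "transitions": []}
        states.append(last)
    if delete_after is not None:
        last["transitions"] = transition("delete", delete_after)
        states.append({"name": "delete", "actions": [{"delete": {}}], "transitions": []})
    return states
```

With warm_after=None the hot state transitions straight to delete, and with delete_after=None the last state has no outgoing transition, so indices are kept.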

Environment variables (used when client is not provided):

  • EEZY_OS_HOSTS (comma-separated, default: http://localhost:9200)
  • EEZY_OS_USERNAME
  • EEZY_OS_PASSWORD
  • EEZY_OS_VERIFY_CERTS (default: true)

Custom Sink

Implement the Sink interface to send logs anywhere:

import logging

import requests

from eezy_logging.sinks import Sink, WriteResult

class SlackSink(Sink):
    def __init__(self, webhook_url: str, min_level: int = logging.ERROR):
        self.webhook_url = webhook_url
        self.min_level = min_level

    def setup(self) -> None:
        # Optional: called once before first write
        pass

    def write_batch(self, records: list[dict]) -> WriteResult:
        failed = []
        for record in records:
            # The default serializer nests levelno under "metadata"
            levelno = record.get("metadata", {}).get("levelno", 0)
            if levelno < self.min_level:
                continue
            try:
                response = requests.post(
                    self.webhook_url,
                    json={"text": f"[{record['level']}] {record['message']}"},
                    timeout=5,  # never let a slow webhook stall the worker
                )
                response.raise_for_status()  # treat HTTP errors as failures
            except requests.RequestException:
                failed.append(record)
        if failed:
            return WriteResult.failure(failed, "Failed to post to Slack")
        return WriteResult.ok()

    def close(self) -> None:
        # Optional: cleanup resources
        pass

Note: write_batch should return WriteResult.ok() on success or WriteResult.failure(records, error) for records that should be retried. The worker handles retry scheduling with exponential backoff - sinks should NOT block on retries.

Custom Queue

Implement the Queue interface for custom queue backends:

from eezy_logging.queues import Queue

class MyQueue(Queue):
    def put(self, record: dict) -> None:
        # Add record to queue (non-blocking)
        ...

    def get_batch(self, max_size: int, timeout: float) -> list[dict]:
        # Get up to max_size records, block up to timeout seconds
        ...

    def close(self) -> None:
        # Cleanup resources
        ...

    @property
    def size(self) -> int:
        # Return approximate queue size
        ...
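
To make the get_batch contract concrete, here is a stdlib-only sketch of the "up to max_size records, block up to timeout seconds" behavior. The class SketchQueue is hypothetical and deliberately does not subclass the library's Queue, so it runs without eezy-logging installed. It is unbounded for brevity.

```python
import queue
import time


class SketchQueue:
    """Stand-in with the same method names as the Queue interface."""

    def __init__(self):
        self._q = queue.Queue()

    def put(self, record):
        self._q.put_nowait(record)  # never blocks on an unbounded queue

    def get_batch(self, max_size, timeout):
        deadline = time.monotonic() + timeout
        batch = []
        while len(batch) < max_size:
            # Wait only for the time left; 0.0 still returns ready items.
            remaining = max(deadline - time.monotonic(), 0.0)
            try:
                batch.append(self._q.get(timeout=remaining))
            except queue.Empty:
                break
        return batch

    def close(self):
        pass  # nothing to release for an in-process queue

    @property
    def size(self):
        return self._q.qsize()
```

A full batch returns immediately, while a partial batch is returned once the timeout expires, which is what lets a worker honor both batch_size and flush_interval.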

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                   Application Code                               │
│              logger.info("message", extra={...})                 │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    EezyHandler                                   │
│   - Serializes LogRecord to dict                                 │
│   - Pushes to Queue (non-blocking)                               │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Queue                                         │
│   - InMemoryQueue (queue.Queue)                                  │
│   - DequeQueue (collections.deque)                               │
│   - RedisQueue (Redis LIST)                                      │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Worker Thread                                 │
│   - Consumes batches from queue                                  │
│   - Respects batch_size and flush_interval                       │
│   - Non-blocking retry scheduling with exponential backoff       │
│   - Drains queue on shutdown                                     │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Sink                                          │
│   - ElasticsearchSink (bulk API + ILM)                           │
│   - OpenSearchSink (bulk API + ISM)                              │
│   - Custom sinks                                                 │
└─────────────────────────────────────────────────────────────────┘
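
The pipeline in the diagram can be approximated with the standard library alone. The sketch below is not the library's implementation: SketchHandler is a hypothetical name, the sink is reduced to a callable that receives each batch, and flush_interval is simplified to "idle time since the last record" rather than a strict maximum age.

```python
import logging
import queue
import threading


class SketchHandler(logging.Handler):
    """Handler -> queue -> worker thread -> sink, in miniature."""

    _STOP = object()  # sentinel that tells the worker to finish

    def __init__(self, write_batch, batch_size=100, flush_interval=5.0):
        super().__init__()
        self._write_batch = write_batch
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def emit(self, record):
        # Serialize and enqueue only; no I/O happens on the caller's thread.
        self._queue.put_nowait(
            {"level": record.levelname, "message": record.getMessage()}
        )

    def _run(self):
        batch = []
        while True:
            try:
                item = self._queue.get(timeout=self._flush_interval)
            except queue.Empty:
                item = None  # interval elapsed with no new records
            if item is self._STOP:
                break
            if item is not None:
                batch.append(item)
            if batch and (item is None or len(batch) >= self._batch_size):
                self._write_batch(batch)
                batch = []
        if batch:
            self._write_batch(batch)  # flush what is left on shutdown

    def close(self):
        self._queue.put(self._STOP)
        self._worker.join()
        super().close()
```

Passing a list's append method as write_batch is enough to observe the batching, for example SketchHandler(batches.append, batch_size=2).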

Overflow Behavior

When a queue reaches its max_size:

  • InMemoryQueue / DequeQueue: The oldest records are dropped, and a warning is logged every 100 drops
  • RedisQueue: List is trimmed to max_size, keeping newest records
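
The drop-oldest policy for the in-process queues can be sketched with a bounded collections.deque. The class name DropOldestBuffer and its counters are hypothetical; the sketch only mirrors the behavior described above and is not the library's code.

```python
import logging
from collections import deque

log = logging.getLogger("overflow-sketch")


class DropOldestBuffer:
    def __init__(self, max_size, warn_every=100):
        self._items = deque(maxlen=max_size)  # deque evicts from the left
        self._warn_every = warn_every
        self.dropped = 0
        self.warnings = 0

    def put(self, record):
        if len(self._items) == self._items.maxlen:
            # The append below will evict the oldest record.
            self.dropped += 1
            if self.dropped % self._warn_every == 0:
                self.warnings += 1
                log.warning("queue full: %d records dropped", self.dropped)
        self._items.append(record)

    def snapshot(self):
        return list(self._items)
```

Rate-limiting the warning keeps a saturated queue from flooding the application's own log output.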

Graceful Shutdown

Call handler.close() to ensure remaining logs are flushed:

import atexit

from eezy_logging import EezyHandler

handler = EezyHandler(sink=sink)
atexit.register(handler.close)  # Optional: auto-close on exit

Log Record Format

Each log record is serialized to a dictionary with a clean structure. Debugging context (hostname, line numbers, thread info, etc.) is grouped under metadata:

{
    "@timestamp": "2026-01-06T12:00:00.000000+00:00",
    "message": "User logged in",
    "level": "INFO",
    "logger": "myapp.auth",
    "metadata": {
        "hostname": "myapp-pod-abc123",  # Useful for K8s/distributed systems
        "levelno": 20,
        "pathname": "/app/auth.py",
        "filename": "auth.py",
        "module": "auth",
        "funcName": "login",
        "lineno": 42,
        "process": 12345,
        "processName": "MainProcess",
        "thread": 140234567890,
        "threadName": "MainThread",
    },
    # Extra fields from logger.info(..., extra={...})
    "user_id": 123,
    "ip": "192.168.1.1",
    # Exception info (if present)
    "exc_info": "Traceback (most recent call last):...",
}
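
For readers who want to see how a LogRecord could be mapped onto this layout, here is a stdlib-only sketch. It is not the library's serialize_record: the function name sketch_serialize is hypothetical, and detecting extra fields by comparing against a blank record's attributes is a common technique assumed here rather than confirmed.

```python
import logging
import socket
from datetime import datetime, timezone

# Attributes every LogRecord has; anything else arrived via extra={...}.
_STANDARD_ATTRS = set(logging.makeLogRecord({}).__dict__) | {"message", "asctime"}
_METADATA_KEYS = (
    "levelno", "pathname", "filename", "module", "funcName",
    "lineno", "process", "processName", "thread", "threadName",
)


def sketch_serialize(record):
    timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc)
    doc = {
        "@timestamp": timestamp.isoformat(timespec="microseconds"),
        "message": record.getMessage(),
        "level": record.levelname,
        "logger": record.name,
        "metadata": {
            "hostname": socket.gethostname(),
            **{key: getattr(record, key) for key in _METADATA_KEYS},
        },
    }
    # Extra fields are merged at the top level, as in the format above.
    doc.update(
        {k: v for k, v in record.__dict__.items() if k not in _STANDARD_ATTRS}
    )
    if record.exc_info:
        doc["exc_info"] = logging.Formatter().formatException(record.exc_info)
    return doc
```

logging.makeLogRecord is a convenient way to build a record by hand when checking a serializer without a running sink.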

License

MIT
