eezy-logging

A non-blocking, queue-based Python logging library for shipping logs to Elasticsearch and other destinations.

Features

  • Non-blocking: Logs are queued and processed in a background thread, never blocking your application
  • Queue-based: Choose between in-memory queues and a Redis-backed queue for persistence
  • Batching: Logs are batched for efficient bulk writes
  • Extensible: Easy to create custom sinks and queues
  • Elasticsearch support: Built-in sink with ILM (Index Lifecycle Management) policies
  • OpenSearch support: Built-in sink with ISM (Index State Management) policies

Installation

pip install eezy-logging

With optional dependencies:

# For Elasticsearch support
pip install eezy-logging[elasticsearch]

# For OpenSearch support
pip install eezy-logging[opensearch]

# For Redis queue support
pip install eezy-logging[redis]

# All optional dependencies
pip install eezy-logging[all]

Quick Start

import logging
from eezy_logging import EezyHandler
from eezy_logging.sinks import ElasticsearchSink

# Create sink and handler
sink = ElasticsearchSink(index_prefix="myapp-logs")
handler = EezyHandler(sink=sink)

# Attach to logger
logger = logging.getLogger("myapp")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Log normally - non-blocking!
logger.info("Application started", extra={"version": "1.0.0"})
logger.info("User logged in", extra={"user_id": 123, "ip": "192.168.1.1"})

# Clean shutdown (flushes remaining logs)
handler.close()

Configuration

EezyHandler

import logging
from eezy_logging import EezyHandler

# `sink` is any sink instance, e.g. the ElasticsearchSink from Quick Start
handler = EezyHandler(
    sink=sink,              # Required: where to send logs
    queue=None,             # Optional: queue backend (default: InMemoryQueue)
    batch_size=100,         # Max records per batch
    flush_interval=5.0,     # Max seconds before flushing partial batch
    level=logging.NOTSET,   # Minimum log level
    max_retries=3,          # Max retry attempts for failed writes
    retry_base_delay=1.0,   # Base delay for exponential backoff (seconds)
    serializer=None,        # Custom serializer function (default: serialize_record)
)

Note: Retries use exponential backoff (1s, 2s, 4s, ...) and are non-blocking - the worker continues consuming new logs while waiting to retry failed batches.
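
As a rough illustration of the note above, non-blocking retry scheduling can be sketched with a heap of due times that the worker polls between batches. The class name `RetrySchedule` and its methods are hypothetical and not part of eezy-logging; the doubling formula is assumed from the 1s, 2s, 4s sequence.

```python
import heapq
import itertools


class RetrySchedule:
    """Hypothetical sketch: remembers failed batches and when each may be retried."""

    def __init__(self, base_delay: float = 1.0):
        self.base_delay = base_delay
        self._heap = []
        self._counter = itertools.count()  # tie-breaker so batches are never compared

    def schedule(self, batch: list, attempt: int, now: float) -> None:
        # Delay doubles with every attempt: base, 2*base, 4*base, ...
        due = now + self.base_delay * (2 ** attempt)
        heapq.heappush(self._heap, (due, next(self._counter), attempt, batch))

    def pop_due(self, now: float) -> list:
        # Return (attempt, batch) pairs whose wait has elapsed, without sleeping.
        ready = []
        while self._heap and self._heap[0][0] <= now:
            _, _, attempt, batch = heapq.heappop(self._heap)
            ready.append((attempt, batch))
        return ready
```

Because `pop_due` never sleeps, a worker loop built this way can keep draining the queue and only re-send a failed batch once its due time has passed.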

Custom Serializer

You can provide a custom serializer function to control how log records are converted to dictionaries:

import logging
from typing import Any
from eezy_logging import EezyHandler
from eezy_logging.serializer import serialize_record

def my_serializer(record: logging.LogRecord) -> dict[str, Any]:
    # Start with default serialization
    data = serialize_record(record)

    # Add custom fields
    data["environment"] = "production"
    data["service"] = "my-api"

    # Transform existing fields
    if "user_id" in data:
        data["user_id"] = str(data["user_id"])

    return data

handler = EezyHandler(sink=sink, serializer=my_serializer)

Or create a completely custom serializer:

def minimal_serializer(record: logging.LogRecord) -> dict[str, Any]:
    return {
        "timestamp": record.created,
        "level": record.levelname,
        "message": record.getMessage(),
        "logger": record.name,
    }

handler = EezyHandler(sink=sink, serializer=minimal_serializer)

Queue Backends

InMemoryQueue (default)

Uses Python's queue.Queue. Simple and efficient for most use cases.

from eezy_logging import EezyHandler, InMemoryQueue

queue = InMemoryQueue(max_size=10000)
handler = EezyHandler(sink=sink, queue=queue)

Note: Records are lost if the process crashes. For critical logs, use RedisQueue.

DequeQueue

Uses collections.deque. Slightly more efficient under high load, when the queue is frequently full.

from eezy_logging import DequeQueue, EezyHandler

queue = DequeQueue(max_size=10000)
handler = EezyHandler(sink=sink, queue=queue)

RedisQueue

Uses Redis LIST for persistence. Logs survive process restarts.

from redis import Redis
from eezy_logging import EezyHandler, RedisQueue

# Using environment variables or defaults
queue = RedisQueue(key="myapp:logs")

# Using custom client
client = Redis(host="redis.example.com", port=6379, password="secret")
queue = RedisQueue(client=client, key="myapp:logs", max_size=50000)

handler = EezyHandler(sink=sink, queue=queue)

Environment variables (used when client is not provided):

  • EEZY_REDIS_HOST (default: localhost)
  • EEZY_REDIS_PORT (default: 6379)
  • EEZY_REDIS_PASSWORD (default: none)
  • EEZY_REDIS_DB (default: 0)
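
The variables and defaults listed above can be read into connection settings roughly as follows. This is a sketch only: the helper `redis_settings_from_env` is hypothetical, and how RedisQueue resolves these variables internally is an assumption.

```python
import os
from typing import Mapping, Optional


def redis_settings_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Hypothetical helper: map EEZY_REDIS_* variables to connection settings."""
    env = os.environ if env is None else env
    return {
        "host": env.get("EEZY_REDIS_HOST", "localhost"),
        "port": int(env.get("EEZY_REDIS_PORT", "6379")),
        # An unset or empty password means "no authentication"
        "password": env.get("EEZY_REDIS_PASSWORD") or None,
        "db": int(env.get("EEZY_REDIS_DB", "0")),
    }
```

The resulting keys match the keyword arguments of `redis.Redis`, so `Redis(**redis_settings_from_env())` would build an equivalent client to pass as `client=`.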

Sinks

ElasticsearchSink

For Elasticsearch clusters. Supports Index Lifecycle Management (ILM) for automatic index rollover and deletion.

from eezy_logging.sinks import ElasticsearchSink, ILMPolicy

# Using environment variables or defaults
sink = ElasticsearchSink(
    index_prefix="myapp-logs",      # Index name prefix
    index_date_format="%Y.%m.%d",   # Date suffix format (None to disable)
    setup_index_template=True,       # Create index template on setup
    setup_ilm_policy=True,           # Create ILM policy on setup
    max_retries=3,                   # Retry attempts for failed writes
    retry_delay=1.0,                 # Initial retry delay (exponential backoff)
)

# Custom ILM policy
policy = ILMPolicy(
    rollover_max_age="1d",
    rollover_max_size="10gb",
    rollover_max_docs=1000000,
    warm_after="7d",
    delete_after="30d",
)
sink = ElasticsearchSink(
    index_prefix="myapp-logs",
    ilm_policy=policy,
)

# Custom index settings
sink = ElasticsearchSink(
    index_prefix="myapp-logs",
    custom_index_settings={
        "refresh_interval": "30s",
        "number_of_shards": 3,
        "number_of_replicas": 2,
    }
)

# Custom field mappings (replaces defaults - use with custom serializer)
sink = ElasticsearchSink(
    index_prefix="myapp-logs",
    custom_mappings={
        "properties": {
            "@timestamp": {"type": "date"},
            "message": {"type": "text"},
            "level": {"type": "keyword"},
            "user_id": {"type": "keyword"},
            "request_duration_ms": {"type": "float"},
        }
    }
)

# Using custom client
from elasticsearch import Elasticsearch
client = Elasticsearch(
    ["https://es.example.com:9200"],
    api_key="your-api-key"
)
sink = ElasticsearchSink(client=client, index_prefix="myapp-logs")

Environment variables (used when client is not provided):

  • EEZY_ES_HOSTS (comma-separated, default: http://localhost:9200)
  • EEZY_ES_USERNAME
  • EEZY_ES_PASSWORD
  • EEZY_ES_API_KEY
  • EEZY_ES_VERIFY_CERTS (default: true)

Supported versions: Elasticsearch 7.x, 8.x, and 9.x. The library automatically detects the installed client version and uses the appropriate APIs.

Note: For Elastic Cloud deployments, create your own client with the cloud_id parameter and pass it to the sink.

OpenSearchSink

For OpenSearch clusters. Supports Index State Management (ISM) for automatic index rollover and deletion.

from eezy_logging.sinks import OpenSearchSink, ISMPolicy

# Using environment variables or defaults
sink = OpenSearchSink(
    index_prefix="myapp-logs",       # Index name prefix (also used as alias)
    setup_index_template=True,       # Create index template on setup
    setup_ism_policy=True,           # Create ISM policy on setup
)

# Custom ISM policy - standard configuration
policy = ISMPolicy(
    rollover_min_index_age="1d",
    rollover_min_size="10gb",
    rollover_min_doc_count=1000000,
    warm_after="7d",        # Move to warm state after 7 days
    delete_after="30d",     # Delete after 30 days
)
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    ism_policy=policy,
)

# Skip warm phase (hot -> delete directly)
policy = ISMPolicy(
    rollover_min_index_age="1d",
    rollover_min_size="10gb",
    warm_after=None,        # Skip warm phase entirely
    delete_after="30d",
)
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    ism_policy=policy,
)

# Keep data forever (no deletion)
policy = ISMPolicy(
    rollover_min_index_age="7d",
    rollover_min_size="50gb",
    warm_after="30d",
    delete_after=None,      # Keep data forever
)
sink = OpenSearchSink(
    index_prefix="audit-logs",
    ism_policy=policy,
)

# Advanced: Custom JSON policy for complete control
# Useful for multi-tier architectures (hot/warm/cold/delete)
policy = ISMPolicy(
    policy_json={
        "description": "Multi-tier policy",
        "default_state": "hot",
        "states": [
            {
                "name": "hot",
                "actions": [{"rollover": {"min_index_age": "1d", "min_size": "5gb"}}],
                "transitions": [{"state_name": "warm", "conditions": {"min_index_age": "3d"}}]
            },
            {
                "name": "warm",
                "actions": [
                    {"replica_count": {"number_of_replicas": 1}},
                    {"force_merge": {"max_num_segments": 1}}
                ],
                "transitions": [{"state_name": "cold", "conditions": {"min_index_age": "7d"}}]
            },
            {
                "name": "cold",
                "actions": [
                    {"read_only": {}},
                    {"replica_count": {"number_of_replicas": 0}}
                ],
                "transitions": [{"state_name": "delete", "conditions": {"min_index_age": "90d"}}]
            },
            {
                "name": "delete",
                "actions": [{"delete": {}}],
                "transitions": []
            }
        ]
        # ism_template is added automatically if not present
    }
)
sink = OpenSearchSink(
    index_prefix="app-logs",
    ism_policy=policy,
)

# Custom index settings
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    custom_index_settings={
        "refresh_interval": "30s",
        "number_of_shards": 3,
        "number_of_replicas": 2,
    }
)

# Custom field mappings (replaces defaults - use with custom serializer)
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    custom_mappings={
        "properties": {
            "@timestamp": {"type": "date"},
            "message": {"type": "text"},
            "level": {"type": "keyword"},
            "user_id": {"type": "keyword"},
            "request_duration_ms": {"type": "float"},
        }
    }
)

# Using custom client
from opensearchpy import OpenSearch
client = OpenSearch(
    ["https://opensearch.example.com:9200"],
    http_auth=("user", "password")
)
sink = OpenSearchSink(client=client, index_prefix="myapp-logs")

ISM Policy Options:

  • Standard lifecycle: warm_after="7d" + delete_after="30d" (default: hot → warm → delete)
  • Skip warm phase: warm_after=None + delete_after="30d" (hot → delete)
  • Keep forever: delete_after=None (no deletion)
  • Custom JSON: policy_json={...} for complete control (other attributes ignored)
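
To make the first three options concrete, the sketch below builds a state list in the same shape as the policy_json example, depending on whether `warm_after` and `delete_after` are set. The function `sketch_policy`, the fixed rollover condition, and the choice of `read_only` as the warm action are assumptions for illustration; the JSON that ISMPolicy actually generates is not shown in this document.

```python
from typing import Optional


def sketch_policy(warm_after: Optional[str] = "7d",
                  delete_after: Optional[str] = "30d") -> dict:
    """Hypothetical illustration of how the two options shape the state chain."""
    # States that follow "hot", each with the index age that triggers entry.
    later = []
    if warm_after is not None:
        later.append(("warm", warm_after, [{"read_only": {}}]))
    if delete_after is not None:
        later.append(("delete", delete_after, [{"delete": {}}]))

    states = [{
        "name": "hot",
        "actions": [{"rollover": {"min_index_age": "1d"}}],
        "transitions": [],
    }]
    for name, age, actions in later:
        # Link the previous state to this one, then append the new state.
        states[-1]["transitions"].append(
            {"state_name": name, "conditions": {"min_index_age": age}}
        )
        states.append({"name": name, "actions": actions, "transitions": []})
    return {"description": "Sketch policy", "default_state": "hot", "states": states}


def state_path(policy: dict) -> str:
    return " -> ".join(state["name"] for state in policy["states"])


print(state_path(sketch_policy()))                   # hot -> warm -> delete
print(state_path(sketch_policy(warm_after=None)))    # hot -> delete
print(state_path(sketch_policy(delete_after=None)))  # hot -> warm
```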

Environment variables (used when client is not provided):

  • EEZY_OS_HOSTS (comma-separated, default: http://localhost:9200)
  • EEZY_OS_USERNAME
  • EEZY_OS_PASSWORD
  • EEZY_OS_VERIFY_CERTS (default: true)

Custom Sink

Implement the Sink interface to send logs anywhere:

import logging

import requests

from eezy_logging.sinks import Sink, WriteResult

class SlackSink(Sink):
    def __init__(self, webhook_url: str, min_level: int = logging.ERROR):
        self.webhook_url = webhook_url
        self.min_level = min_level

    def setup(self) -> None:
        # Optional: called once before first write
        pass

    def write_batch(self, records: list[dict]) -> WriteResult:
        failed = []
        for record in records:
            # The default serializer nests levelno under "metadata"
            levelno = record.get("metadata", {}).get("levelno", 0)
            if levelno < self.min_level:
                continue
            try:
                response = requests.post(
                    self.webhook_url,
                    json={"text": f"[{record['level']}] {record['message']}"},
                    timeout=5,  # avoid stalling the worker on a slow webhook
                )
                response.raise_for_status()  # treat HTTP error statuses as failures
            except requests.RequestException:
                failed.append(record)
        if failed:
            return WriteResult.failure(failed, "Failed to post to Slack")
        return WriteResult.ok()

    def close(self) -> None:
        # Optional: cleanup resources
        pass

Note: write_batch should return WriteResult.ok() on success or WriteResult.failure(records, error) for records that should be retried. The worker handles retry scheduling with exponential backoff - sinks should NOT block on retries.

Custom Queue

Implement the Queue interface for custom queue backends:

from eezy_logging.queues import Queue

class MyQueue(Queue):
    def put(self, record: dict) -> None:
        # Add record to queue (non-blocking)
        ...

    def get_batch(self, max_size: int, timeout: float) -> list[dict]:
        # Get up to max_size records, block up to timeout seconds
        ...

    def close(self) -> None:
        # Cleanup resources
        ...

    @property
    def size(self) -> int:
        # Return approximate queue size
        ...
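
As one possible way to fill in those methods, the sketch below implements the same four members with a deque guarded by a condition variable. It deliberately does not subclass Queue so that it runs without the library installed; in real use the class would derive from eezy_logging.queues.Queue. The class name `SketchQueue` is hypothetical, the queue is unbounded for brevity, and returning as soon as any record is available is an assumption about the intended get_batch semantics.

```python
import threading
import time
from collections import deque


class SketchQueue:
    """Hypothetical in-process queue mirroring the put/get_batch/close/size interface."""

    def __init__(self) -> None:
        self._items = deque()
        self._cond = threading.Condition()

    def put(self, record: dict) -> None:
        # Appending never waits on consumers, so callers are not blocked.
        with self._cond:
            self._items.append(record)
            self._cond.notify()

    def get_batch(self, max_size: int, timeout: float) -> list[dict]:
        deadline = time.monotonic() + timeout
        with self._cond:
            # Wait until at least one record arrives or the timeout expires.
            while not self._items:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return []
                self._cond.wait(remaining)
            batch = []
            while self._items and len(batch) < max_size:
                batch.append(self._items.popleft())
            return batch

    def close(self) -> None:
        with self._cond:
            self._items.clear()

    @property
    def size(self) -> int:
        return len(self._items)
```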

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                   Application Code                               │
│              logger.info("message", extra={...})                 │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    EezyHandler                                   │
│   - Serializes LogRecord to dict                                 │
│   - Pushes to Queue (non-blocking)                               │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Queue                                         │
│   - InMemoryQueue (queue.Queue)                                  │
│   - DequeQueue (collections.deque)                               │
│   - RedisQueue (Redis LIST)                                      │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Worker Thread                                 │
│   - Consumes batches from queue                                  │
│   - Respects batch_size and flush_interval                       │
│   - Non-blocking retry scheduling with exponential backoff       │
│   - Drains queue on shutdown                                     │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Sink                                          │
│   - ElasticsearchSink (bulk API + ILM)                           │
│   - OpenSearchSink (bulk API + ISM)                              │
│   - Custom sinks                                                 │
└─────────────────────────────────────────────────────────────────┘

Overflow Behavior

When a queue reaches its max_size:

  • InMemoryQueue / DequeQueue: Oldest records are dropped, and a warning is logged every 100 drops
  • RedisQueue: List is trimmed to max_size, keeping newest records
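
The drop-oldest policy for the in-memory queues can be pictured with a bounded deque, which discards from the left when a full deque is appended to. This is a sketch of the policy only; `DropOldestBuffer` is a hypothetical name and the library's own implementation is not shown here.

```python
import logging
from collections import deque

log = logging.getLogger("overflow-sketch")


class DropOldestBuffer:
    """Hypothetical bounded buffer: keeps the newest max_size records."""

    def __init__(self, max_size: int, warn_every: int = 100):
        self._items = deque(maxlen=max_size)  # a full deque evicts its oldest item
        self._warn_every = warn_every
        self.dropped = 0

    def put(self, record) -> None:
        if len(self._items) == self._items.maxlen:
            # The append below will evict the oldest record; count it.
            self.dropped += 1
            if self.dropped % self._warn_every == 0:
                log.warning("Queue full: %d records dropped so far", self.dropped)
        self._items.append(record)

    def snapshot(self) -> list:
        return list(self._items)
```

For a Redis list that is appended to with RPUSH, `LTRIM key -max_size -1` has the same keep-newest effect; which commands RedisQueue actually issues is not stated in this document.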

Graceful Shutdown

Call handler.close() to ensure remaining logs are flushed:

import atexit

handler = EezyHandler(sink=sink)
atexit.register(handler.close)  # Optional: auto-close on exit

Log Record Format

Each log record is serialized to a dictionary with a clean structure. Debugging context (hostname, line numbers, thread info, etc.) is grouped under metadata:

{
    "@timestamp": "2026-01-06T12:00:00.000000+00:00",
    "message": "User logged in",
    "level": "INFO",
    "logger": "myapp.auth",
    "metadata": {
        "hostname": "myapp-pod-abc123",  # Useful for K8s/distributed systems
        "levelno": 20,
        "pathname": "/app/auth.py",
        "filename": "auth.py",
        "module": "auth",
        "funcName": "login",
        "lineno": 42,
        "process": 12345,
        "processName": "MainProcess",
        "thread": 140234567890,
        "threadName": "MainThread",
    },
    # Extra fields from logger.info(..., extra={...})
    "user_id": 123,
    "ip": "192.168.1.1",
    # Exception info (if present)
    "exc_info": "Traceback (most recent call last):...",
}
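
A serializer that yields this shape could look roughly like the sketch below. It is not the library's serialize_record; `sketch_serialize` is a hypothetical name, and detecting extra fields by comparing against the attributes of a blank LogRecord is a common technique assumed here for illustration.

```python
import logging
import socket
from datetime import datetime, timezone

# Attribute names every LogRecord carries; anything else came from `extra`.
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {
    "message",
    "asctime",
}
_METADATA_KEYS = (
    "levelno", "pathname", "filename", "module", "funcName", "lineno",
    "process", "processName", "thread", "threadName",
)


def sketch_serialize(record: logging.LogRecord) -> dict:
    created = datetime.fromtimestamp(record.created, tz=timezone.utc)
    data = {
        "@timestamp": created.isoformat(timespec="microseconds"),
        "message": record.getMessage(),
        "level": record.levelname,
        "logger": record.name,
        "metadata": {
            "hostname": socket.gethostname(),
            **{key: getattr(record, key) for key in _METADATA_KEYS},
        },
    }
    # Extra fields are promoted to the top level of the document.
    data.update(
        {k: v for k, v in record.__dict__.items() if k not in _STANDARD_ATTRS}
    )
    if record.exc_info:
        data["exc_info"] = logging.Formatter().formatException(record.exc_info)
    return data
```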

License

MIT
