eezy-logging
A non-blocking, queue-based Python logging library for shipping logs to Elasticsearch and other destinations.
Features
- Non-blocking: Logs are queued and processed in a background thread, never blocking your application
- Queue-based: Choose between in-memory queue or Redis for persistence
- Batching: Logs are batched for efficient bulk writes
- Extensible: Easy to create custom sinks and queues
- Elasticsearch support: Built-in sink with ILM (Index Lifecycle Management) policies
- OpenSearch support: Built-in sink with ISM (Index State Management) policies
Installation
pip install eezy-logging
With optional dependencies:
# For Elasticsearch support
pip install eezy-logging[elasticsearch]
# For OpenSearch support
pip install eezy-logging[opensearch]
# For Redis queue support
pip install eezy-logging[redis]
# All optional dependencies
pip install eezy-logging[all]
Quick Start
import logging
from eezy_logging import EezyHandler
from eezy_logging.sinks import ElasticsearchSink
# Create sink and handler
sink = ElasticsearchSink(index_prefix="myapp-logs")
handler = EezyHandler(sink=sink)
# Attach to logger
logger = logging.getLogger("myapp")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Log normally - non-blocking!
logger.info("Application started", extra={"version": "1.0.0"})
logger.info("User logged in", extra={"user_id": 123, "ip": "192.168.1.1"})
# Clean shutdown (flushes remaining logs)
handler.close()
Configuration
EezyHandler
import logging

from eezy_logging import EezyHandler

handler = EezyHandler(
    sink=sink,               # Required: where to send logs
    queue=None,              # Optional: queue backend (default: InMemoryQueue)
    batch_size=100,          # Max records per batch
    flush_interval=5.0,      # Max seconds before flushing a partial batch
    level=logging.NOTSET,    # Minimum log level
    max_retries=3,           # Max retry attempts for failed writes
    retry_base_delay=1.0,    # Base delay for exponential backoff (seconds)
    serializer=None,         # Custom serializer function (default: serialize_record)
)
Note: Retries use exponential backoff (1s, 2s, 4s, ...) and are non-blocking - the worker continues consuming new logs while waiting to retry failed batches.
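For illustration, the delay doubles with each attempt, starting from retry_base_delay. A minimal sketch of that schedule using the handler defaults above (not the library's internal code):

retry_base_delay = 1.0   # handler default
max_retries = 3          # handler default

for attempt in range(max_retries):
    delay = retry_base_delay * (2 ** attempt)
    print(f"retry {attempt + 1}: wait {delay:g}s")  # 1s, 2s, 4s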
Custom Serializer
You can provide a custom serializer function to control how log records are converted to dictionaries:
import logging
from typing import Any
from eezy_logging import EezyHandler
from eezy_logging.serializer import serialize_record
def my_serializer(record: logging.LogRecord) -> dict[str, Any]:
    # Start with default serialization
    data = serialize_record(record)

    # Add custom fields
    data["environment"] = "production"
    data["service"] = "my-api"

    # Transform existing fields
    if "user_id" in data:
        data["user_id"] = str(data["user_id"])

    return data
handler = EezyHandler(sink=sink, serializer=my_serializer)
Or create a completely custom serializer:
def minimal_serializer(record: logging.LogRecord) -> dict[str, Any]:
    return {
        "timestamp": record.created,
        "level": record.levelname,
        "message": record.getMessage(),
        "logger": record.name,
    }
handler = EezyHandler(sink=sink, serializer=minimal_serializer)
Queue Backends
InMemoryQueue (default)
Uses Python's queue.Queue. Simple and efficient for most use cases.
from eezy_logging import InMemoryQueue
queue = InMemoryQueue(max_size=10000)
handler = EezyHandler(sink=sink, queue=queue)
Note: Records are lost if the process crashes. For critical logs, use RedisQueue.
DequeQueue
Uses collections.deque. Slightly more efficient under high load when the queue is frequently full.
from eezy_logging import DequeQueue
queue = DequeQueue(max_size=10000)
handler = EezyHandler(sink=sink, queue=queue)
RedisQueue
Uses Redis LIST for persistence. Logs survive process restarts.
from eezy_logging import RedisQueue
# Using environment variables or defaults
queue = RedisQueue(key="myapp:logs")
# Using custom client
from redis import Redis
client = Redis(host="redis.example.com", port=6379, password="secret")
queue = RedisQueue(client=client, key="myapp:logs", max_size=50000)
handler = EezyHandler(sink=sink, queue=queue)
Environment variables (used when client is not provided):
- EEZY_REDIS_HOST (default: localhost)
- EEZY_REDIS_PORT (default: 6379)
- EEZY_REDIS_PASSWORD (default: none)
- EEZY_REDIS_DB (default: 0)
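For example, the connection can be configured entirely through the environment before the queue is created (hypothetical values):

import os

# Hypothetical connection settings, read when no client is passed in
os.environ["EEZY_REDIS_HOST"] = "redis.example.com"
os.environ["EEZY_REDIS_PORT"] = "6379"
os.environ["EEZY_REDIS_PASSWORD"] = "secret"
os.environ["EEZY_REDIS_DB"] = "1"

from eezy_logging import RedisQueue

queue = RedisQueue(key="myapp:logs")  # connects using the EEZY_REDIS_* variables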
Sinks
ElasticsearchSink
For Elasticsearch clusters. Supports Index Lifecycle Management (ILM) for automatic index rollover and deletion.
from eezy_logging.sinks import ElasticsearchSink, ILMPolicy
# Using environment variables or defaults
sink = ElasticsearchSink(
    index_prefix="myapp-logs",       # Index name prefix
    index_date_format="%Y.%m.%d",    # Date suffix format (None to disable)
    setup_index_template=True,       # Create index template on setup
    setup_ilm_policy=True,           # Create ILM policy on setup
    max_retries=3,                   # Retry attempts for failed writes
    retry_delay=1.0,                 # Initial retry delay (exponential backoff)
)

# Custom ILM policy
policy = ILMPolicy(
    rollover_max_age="1d",
    rollover_max_size="10gb",
    rollover_max_docs=1000000,
    warm_after="7d",
    delete_after="30d",
)
sink = ElasticsearchSink(
    index_prefix="myapp-logs",
    ilm_policy=policy,
)

# Custom index settings
sink = ElasticsearchSink(
    index_prefix="myapp-logs",
    custom_index_settings={
        "refresh_interval": "30s",
        "number_of_shards": 3,
        "number_of_replicas": 2,
    }
)

# Custom field mappings (replaces defaults - use with custom serializer)
sink = ElasticsearchSink(
    index_prefix="myapp-logs",
    custom_mappings={
        "properties": {
            "@timestamp": {"type": "date"},
            "message": {"type": "text"},
            "level": {"type": "keyword"},
            "user_id": {"type": "keyword"},
            "request_duration_ms": {"type": "float"},
        }
    }
)

# Using custom client
from elasticsearch import Elasticsearch

client = Elasticsearch(
    ["https://es.example.com:9200"],
    api_key="your-api-key"
)
sink = ElasticsearchSink(client=client, index_prefix="myapp-logs")
Environment variables (used when client is not provided):
- EEZY_ES_HOSTS (comma-separated, default: http://localhost:9200)
- EEZY_ES_USERNAME
- EEZY_ES_PASSWORD
- EEZY_ES_API_KEY
- EEZY_ES_VERIFY_CERTS (default: true)
Supported versions: Elasticsearch 7.x, 8.x, and 9.x are supported. The library automatically detects the installed client version and uses the appropriate APIs.
Note: For Elastic Cloud deployments, create your own client with the cloud_id parameter and pass it to the sink.
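A minimal sketch of that pattern (the cloud_id and api_key values are placeholders):

from elasticsearch import Elasticsearch
from eezy_logging.sinks import ElasticsearchSink

# Placeholder Elastic Cloud credentials
client = Elasticsearch(
    cloud_id="my-deployment:<base64-data>",
    api_key="your-api-key",
)
sink = ElasticsearchSink(client=client, index_prefix="myapp-logs")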
OpenSearchSink
For OpenSearch clusters. Supports Index State Management (ISM) for automatic index rollover and deletion.
from eezy_logging.sinks import OpenSearchSink, ISMPolicy
# Using environment variables or defaults
sink = OpenSearchSink(
    index_prefix="myapp-logs",       # Index name prefix
    index_date_format="%Y.%m.%d",    # Date suffix format (None to disable)
    setup_index_template=True,       # Create index template on setup
    setup_ism_policy=True,           # Create ISM policy on setup
    max_retries=3,                   # Retry attempts for failed writes
    retry_delay=1.0,                 # Initial retry delay (exponential backoff)
)
# Custom ISM policy - standard configuration
policy = ISMPolicy(
    rollover_min_index_age="1d",
    rollover_min_size="10gb",
    rollover_min_doc_count=1000000,
    warm_after="7d",      # Move to warm state after 7 days
    delete_after="30d",   # Delete after 30 days
)
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    ism_policy=policy,
)

# Skip warm phase (hot -> delete directly)
policy = ISMPolicy(
    rollover_min_index_age="1d",
    rollover_min_size="10gb",
    warm_after=None,      # Skip warm phase entirely
    delete_after="30d",
)
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    ism_policy=policy,
)

# Keep data forever (no deletion)
policy = ISMPolicy(
    rollover_min_index_age="7d",
    rollover_min_size="50gb",
    warm_after="30d",
    delete_after=None,    # Keep data forever
)
sink = OpenSearchSink(
    index_prefix="audit-logs",
    ism_policy=policy,
)
# Advanced: Custom JSON policy for complete control
# Useful for multi-tier architectures (hot/warm/cold/delete)
policy = ISMPolicy(
    policy_json={
        "description": "Multi-tier policy",
        "default_state": "hot",
        "states": [
            {
                "name": "hot",
                "actions": [{"rollover": {"min_index_age": "1d", "min_size": "5gb"}}],
                "transitions": [{"state_name": "warm", "conditions": {"min_index_age": "3d"}}]
            },
            {
                "name": "warm",
                "actions": [
                    {"replica_count": {"number_of_replicas": 1}},
                    {"force_merge": {"max_num_segments": 1}}
                ],
                "transitions": [{"state_name": "cold", "conditions": {"min_index_age": "7d"}}]
            },
            {
                "name": "cold",
                "actions": [
                    {"read_only": {}},
                    {"replica_count": {"number_of_replicas": 0}}
                ],
                "transitions": [{"state_name": "delete", "conditions": {"min_index_age": "90d"}}]
            },
            {
                "name": "delete",
                "actions": [{"delete": {}}],
                "transitions": []
            }
        ]
        # ism_template is added automatically if not present
    }
)
sink = OpenSearchSink(
    index_prefix="app-logs",
    ism_policy=policy,
)
# Custom index settings
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    custom_index_settings={
        "refresh_interval": "30s",
        "number_of_shards": 3,
        "number_of_replicas": 2,
    }
)

# Custom field mappings (replaces defaults - use with custom serializer)
sink = OpenSearchSink(
    index_prefix="myapp-logs",
    custom_mappings={
        "properties": {
            "@timestamp": {"type": "date"},
            "message": {"type": "text"},
            "level": {"type": "keyword"},
            "user_id": {"type": "keyword"},
            "request_duration_ms": {"type": "float"},
        }
    }
)
# Using custom client
from opensearchpy import OpenSearch
client = OpenSearch(
    ["https://opensearch.example.com:9200"],
    http_auth=("user", "password")
)
sink = OpenSearchSink(client=client, index_prefix="myapp-logs")
ISM Policy Options:
- Standard lifecycle: warm_after="7d" + delete_after="30d" (default: hot → warm → delete)
- Skip warm phase: warm_after=None + delete_after="30d" (hot → delete)
- Keep forever: delete_after=None (no deletion)
- Custom JSON: policy_json={...} for complete control (other attributes are ignored)
Environment variables (used when client is not provided):
- EEZY_OS_HOSTS (comma-separated, default: http://localhost:9200)
- EEZY_OS_USERNAME
- EEZY_OS_PASSWORD
- EEZY_OS_VERIFY_CERTS (default: true)
Custom Sink
Implement the Sink interface to send logs anywhere:
import logging

import requests

from eezy_logging.sinks import Sink, WriteResult

class SlackSink(Sink):
    def __init__(self, webhook_url: str, min_level: int = logging.ERROR):
        self.webhook_url = webhook_url
        self.min_level = min_level

    def setup(self) -> None:
        # Optional: called once before first write
        pass

    def write_batch(self, records: list[dict]) -> WriteResult:
        failed = []
        for record in records:
            # The default serializer nests levelno under "metadata"
            if record.get("metadata", {}).get("levelno", 0) >= self.min_level:
                try:
                    requests.post(self.webhook_url, json={
                        "text": f"[{record['level']}] {record['message']}"
                    })
                except Exception:
                    failed.append(record)
        if failed:
            return WriteResult.failure(failed, "Failed to post to Slack")
        return WriteResult.ok()

    def close(self) -> None:
        # Optional: cleanup resources
        pass
Note: write_batch should return WriteResult.ok() on success or WriteResult.failure(records, error) for records that should be retried. The worker handles retry scheduling with exponential backoff; sinks should NOT block on retries.
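Once defined, a custom sink plugs into the handler like any built-in one. A short usage sketch (the webhook URL is a placeholder):

import logging

from eezy_logging import EezyHandler

# Placeholder webhook URL; only records at ERROR and above are forwarded
sink = SlackSink(
    webhook_url="https://hooks.slack.com/services/T000/B000/XXXXXXXX",
    min_level=logging.ERROR,
)
handler = EezyHandler(sink=sink)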
Custom Queue
Implement the Queue interface for custom queue backends:
from eezy_logging.queues import Queue
class MyQueue(Queue):
    def put(self, record: dict) -> None:
        # Add record to queue (non-blocking)
        ...

    def get_batch(self, max_size: int, timeout: float) -> list[dict]:
        # Get up to max_size records, block up to timeout seconds
        ...

    def close(self) -> None:
        # Cleanup resources
        ...

    @property
    def size(self) -> int:
        # Return approximate queue size
        ...
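As a concrete sketch, here is a minimal thread-safe backend built on collections.deque that satisfies this interface (illustrative only, not part of the library):

import threading
from collections import deque

from eezy_logging.queues import Queue

class BoundedDequeQueue(Queue):
    """Illustrative Queue implementation backed by collections.deque."""

    def __init__(self, max_size: int = 10000):
        # deque with maxlen silently drops the oldest item when full
        self._records: deque = deque(maxlen=max_size)
        self._not_empty = threading.Condition()

    def put(self, record: dict) -> None:
        with self._not_empty:
            self._records.append(record)
            self._not_empty.notify()

    def get_batch(self, max_size: int, timeout: float) -> list[dict]:
        with self._not_empty:
            if not self._records:
                self._not_empty.wait(timeout)  # block up to timeout seconds
            batch = []
            while self._records and len(batch) < max_size:
                batch.append(self._records.popleft())
            return batch

    def close(self) -> None:
        pass  # nothing to release

    @property
    def size(self) -> int:
        return len(self._records)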
Architecture
┌────────────────────────────────────────────────────────────┐
│                      Application Code                      │
│            logger.info("message", extra={...})             │
└─────────────────────────────┬──────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────┐
│                        EezyHandler                         │
│  - Serializes LogRecord to dict                            │
│  - Pushes to Queue (non-blocking)                          │
└─────────────────────────────┬──────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────┐
│                           Queue                            │
│  - InMemoryQueue (queue.Queue)                             │
│  - DequeQueue (collections.deque)                          │
│  - RedisQueue (Redis LIST)                                 │
└─────────────────────────────┬──────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────┐
│                       Worker Thread                        │
│  - Consumes batches from queue                             │
│  - Respects batch_size and flush_interval                  │
│  - Non-blocking retry scheduling with exponential backoff  │
│  - Drains queue on shutdown                                │
└─────────────────────────────┬──────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────┐
│                            Sink                            │
│  - ElasticsearchSink (bulk API + ILM)                      │
│  - OpenSearchSink (bulk API + ISM)                         │
│  - Custom sinks                                            │
└────────────────────────────────────────────────────────────┘
Overflow Behavior
When a queue reaches its max_size:
- InMemoryQueue / DequeQueue: The oldest records are dropped, and a warning is logged every 100 drops (demonstrated below)
- RedisQueue: The list is trimmed to max_size, keeping the newest records
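A quick way to observe the drop-oldest behavior, using a deliberately tiny queue (a sketch; real queues should be far larger):

from eezy_logging import InMemoryQueue

# max_size=2 forces overflow after the first two records
queue = InMemoryQueue(max_size=2)
for i in range(5):
    queue.put({"message": f"record {i}"})

print(queue.size)  # 2; only the newest records remain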
Graceful Shutdown
Call handler.close() to ensure remaining logs are flushed:
import atexit
handler = EezyHandler(sink=sink)
atexit.register(handler.close) # Optional: auto-close on exit
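For short-lived scripts, an explicit try/finally gives the same guarantee (assuming the sink and logger from the Quick Start; main() is a placeholder):

handler = EezyHandler(sink=sink)
logger.addHandler(handler)
try:
    main()  # your application code
finally:
    handler.close()  # flushes any queued records before exit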
Log Record Format
Each log record is serialized to a dictionary with a clean structure. Debugging context (hostname, line numbers, thread info, etc.) is grouped under metadata:
{
    "@timestamp": "2026-01-06T12:00:00.000000+00:00",
    "message": "User logged in",
    "level": "INFO",
    "logger": "myapp.auth",
    "metadata": {
        "hostname": "myapp-pod-abc123",  # Useful for K8s/distributed systems
        "levelno": 20,
        "pathname": "/app/auth.py",
        "filename": "auth.py",
        "module": "auth",
        "funcName": "login",
        "lineno": 42,
        "process": 12345,
        "processName": "MainProcess",
        "thread": 140234567890,
        "threadName": "MainThread",
    },

    # Extra fields from logger.info(..., extra={...})
    "user_id": 123,
    "ip": "192.168.1.1",

    # Exception info (if present)
    "exc_info": "Traceback (most recent call last):...",
}
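The exc_info field is populated when an exception is logged, for example via logger.exception (continuing the Quick Start setup):

try:
    1 / 0
except ZeroDivisionError:
    # Logs at ERROR level and attaches the traceback, which the
    # default serializer emits as the "exc_info" string field
    logger.exception("Division failed", extra={"user_id": 123})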
License
MIT
Download files
File details
Details for the file eezy_logging-0.1.3.tar.gz.
File metadata
- Download URL: eezy_logging-0.1.3.tar.gz
- Upload date:
- Size: 79.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.27 {"installer":{"name":"uv","version":"0.9.27","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4efa2337f0d95115ea4eff309112ac493d180b8c86d2c0fd560d6f7c8534948a |
| MD5 | 45e2fc10acee39807e6344ded91c993d |
| BLAKE2b-256 | dbfad0a06bf96498812712b926ffb1a1deb5e29cab975462287c9a57e00da87c |
File details
Details for the file eezy_logging-0.1.3-py3-none-any.whl.
File metadata
- Download URL: eezy_logging-0.1.3-py3-none-any.whl
- Upload date:
- Size: 30.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.27 {"installer":{"name":"uv","version":"0.9.27","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6381c53e88de9279870edd770ca47ce5b7a045efc654edb2646235762071400a |
| MD5 | 770fdda4f4195b7baaf021b6f68df75b |
| BLAKE2b-256 | c69fb030f6895016dea4cd0c26a443fdf6e0188b5954336d7b7a543d944246bc |