LogTide Python SDK
Official Python SDK for LogTide with advanced features: automatic batching, retry logic, circuit breaker, query API, live streaming, and middleware support.
Features
- ✅ Automatic batching with configurable size and interval
- ✅ Retry logic with exponential backoff
- ✅ Circuit breaker pattern for fault tolerance
- ✅ Max buffer size with drop policy to prevent memory leaks
- ✅ Query API for searching and filtering logs
- ✅ Live tail with Server-Sent Events (SSE)
- ✅ Trace ID context for distributed tracing
- ✅ Global metadata added to all logs
- ✅ Structured error serialization
- ✅ Internal metrics (logs sent, errors, latency, etc.)
- ✅ Flask, Django & FastAPI middleware for auto-logging HTTP requests
- ✅ Full Python 3.8+ support with type hints
Requirements
- Python 3.8 or higher
- pip or poetry
Installation
pip install logtide-sdk
Optional dependencies
# For async support
pip install logtide-sdk[async]
# For Flask middleware
pip install logtide-sdk[flask]
# For Django middleware
pip install logtide-sdk[django]
# For FastAPI middleware
pip install logtide-sdk[fastapi]
# Install all extras
pip install logtide-sdk[async,flask,django,fastapi]
Quick Start
from logtide_sdk import LogTideClient, ClientOptions
client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
    )
)
# Send logs
client.info('api-gateway', 'Server started', {'port': 3000})
client.error('database', 'Connection failed', Exception('Timeout'))
# Graceful shutdown (automatic via atexit, but can be called manually)
client.close()
Configuration Options
Basic Options
| Option | Type | Default | Description |
|---|---|---|---|
| api_url | str | required | Base URL of your LogTide instance |
| api_key | str | required | Project API key (starts with lp_) |
| batch_size | int | 100 | Number of logs to batch before sending |
| flush_interval | int | 5000 | Interval in ms to auto-flush logs |
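The batching behaviour these two options control can be sketched in plain Python. This is illustrative only: the class name LogBatcher is hypothetical, and the real SDK flushes on a background thread rather than only inside add().

```python
import time
from typing import Callable, List

class LogBatcher:
    """Sketch of size/interval batching: buffer entries and flush when
    either batch_size is reached or flush_interval_ms has elapsed."""

    def __init__(self, send: Callable[[List[dict]], None],
                 batch_size: int = 100, flush_interval_ms: int = 5000):
        self.send = send
        self.batch_size = batch_size
        self.flush_interval_ms = flush_interval_ms
        self.buffer: List[dict] = []
        self._last_flush = time.monotonic()

    def add(self, entry: dict) -> None:
        self.buffer.append(entry)
        elapsed_ms = (time.monotonic() - self._last_flush) * 1000
        if len(self.buffer) >= self.batch_size or elapsed_ms >= self.flush_interval_ms:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.send(self.buffer)  # one request per batch instead of one per log
            self.buffer = []
        self._last_flush = time.monotonic()

# Usage: with batch_size=3, the third add() triggers a flush
sent = []
b = LogBatcher(send=sent.append, batch_size=3, flush_interval_ms=60000)
for i in range(3):
    b.add({'message': f'log {i}'})
```

Batching trades a little delivery latency for far fewer HTTP requests, which is why both a size and a time trigger exist: busy services flush on size, quiet services flush on the interval.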
Advanced Options
| Option | Type | Default | Description |
|---|---|---|---|
| max_buffer_size | int | 10000 | Max logs in buffer (prevents memory leaks) |
| max_retries | int | 3 | Max retry attempts on failure |
| retry_delay_ms | int | 1000 | Initial retry delay (exponential backoff) |
| circuit_breaker_threshold | int | 5 | Failures before opening the circuit |
| circuit_breaker_reset_ms | int | 30000 | Time before retrying after the circuit opens |
| enable_metrics | bool | True | Track internal metrics |
| debug | bool | False | Enable debug logging to the console |
| global_metadata | dict | {} | Metadata added to all logs |
| auto_trace_id | bool | False | Auto-generate trace IDs for logs |
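As a worked example of the retry options, here is the backoff schedule the defaults imply, assuming the delay doubles after each failed attempt (as the 1s → 2s → 4s note in the full-configuration example below suggests):

```python
def retry_delays_ms(max_retries: int = 3, retry_delay_ms: int = 1000):
    """Exponential-backoff schedule: the wait before attempt N is
    retry_delay_ms * 2**N, for max_retries attempts in total."""
    return [retry_delay_ms * (2 ** attempt) for attempt in range(max_retries)]

print(retry_delays_ms())  # -> [1000, 2000, 4000]
```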
Example: Full Configuration
import os

client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
        # Batching
        batch_size=100,
        flush_interval=5000,
        # Buffer management
        max_buffer_size=10000,
        # Retry with exponential backoff (1s → 2s → 4s)
        max_retries=3,
        retry_delay_ms=1000,
        # Circuit breaker
        circuit_breaker_threshold=5,
        circuit_breaker_reset_ms=30000,
        # Metrics & debugging
        enable_metrics=True,
        debug=True,
        # Global context
        global_metadata={
            'env': os.getenv('APP_ENV'),
            'version': '1.0.0',
            'hostname': os.uname().nodename,  # POSIX-only; use socket.gethostname() on Windows
        },
        # Auto trace IDs
        auto_trace_id=False,
    )
)
Logging Methods
Basic Logging
from logtide_sdk import LogLevel
client.debug('service-name', 'Debug message')
client.info('service-name', 'Info message', {'userId': 123})
client.warn('service-name', 'Warning message')
client.error('service-name', 'Error message', {'custom': 'data'})
client.critical('service-name', 'Critical message')
Error Logging with Auto-Serialization
The SDK automatically serializes Exception objects:
try:
    raise RuntimeError('Database timeout')
except Exception as e:
    # Automatically serializes the error with its stack trace
    client.error('database', 'Query failed', e)
Generated log metadata:
{
  "error": {
    "name": "RuntimeError",
    "message": "Database timeout",
    "stack": "Traceback (most recent call last):\n ..."
  }
}
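A minimal sketch of how such a payload can be produced with the stdlib traceback module (the SDK's internal serializer may differ in detail):

```python
import traceback

def serialize_error(exc: BaseException) -> dict:
    """Turn an exception into the {'error': {name, message, stack}} shape
    shown above, using the exception's attached traceback."""
    return {
        'error': {
            'name': type(exc).__name__,
            'message': str(exc),
            'stack': ''.join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ),
        }
    }

try:
    raise RuntimeError('Database timeout')
except RuntimeError as e:
    meta = serialize_error(e)

print(meta['error']['name'])  # RuntimeError
```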
Trace ID Context
Track requests across services with trace IDs.
Manual Trace ID
client.set_trace_id('request-123')
client.info('api', 'Request received')
client.info('database', 'Querying users')
client.info('api', 'Response sent')
client.set_trace_id(None) # Clear context
Scoped Trace ID (Context Manager)
with client.with_trace_id('request-456'):
    client.info('api', 'Processing in context')
    client.warn('cache', 'Cache miss')
# Trace ID automatically restored after the context exits
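Conceptually, a scoped trace-ID context can be built on contextvars. This standalone sketch (a hypothetical module-level with_trace_id, not the client method itself) shows the set-then-restore pattern:

```python
import contextlib
import contextvars

# Holds the trace ID for the current execution context
_trace_id: contextvars.ContextVar = contextvars.ContextVar('trace_id', default=None)

@contextlib.contextmanager
def with_trace_id(trace_id: str):
    """Set the trace ID on entry; restore the previous value on exit,
    even if the body raises."""
    token = _trace_id.set(trace_id)
    try:
        yield
    finally:
        _trace_id.reset(token)

with with_trace_id('request-456'):
    inner = _trace_id.get()  # visible inside the block
outer = _trace_id.get()      # previous value restored afterwards
```

Because ContextVar is task- and thread-aware, this pattern keeps concurrent requests from clobbering each other's trace IDs.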
Auto-Generated Trace ID
with client.with_new_trace_id():
    client.info('worker', 'Background job started')
    client.info('worker', 'Job completed')
Query API
Search and retrieve logs programmatically.
Basic Query
from datetime import datetime, timedelta
from logtide_sdk import QueryOptions, LogLevel
result = client.query(
    QueryOptions(
        service='api-gateway',
        level=LogLevel.ERROR,
        from_time=datetime.now() - timedelta(hours=24),
        to_time=datetime.now(),
        limit=100,
        offset=0,
    )
)
print(f"Found {result.total} logs")
for log in result.logs:
    print(log)
Full-Text Search
result = client.query(QueryOptions(q='timeout', limit=50))
Get Logs by Trace ID
logs = client.get_by_trace_id('trace-123')
print(f"Trace has {len(logs)} logs")
Aggregated Statistics
from datetime import datetime, timedelta
from logtide_sdk import AggregatedStatsOptions
stats = client.get_aggregated_stats(
    AggregatedStatsOptions(
        from_time=datetime.now() - timedelta(days=7),
        to_time=datetime.now(),
        interval='1h',
    )
)
for service in stats.top_services:
    print(f"{service['service']}: {service['count']} logs")
Live Streaming (SSE)
Stream logs in real-time using Server-Sent Events.
def handle_log(log):
    print(f"[{log['time']}] {log['level']}: {log['message']}")

def handle_error(error):
    print(f"Stream error: {error}")

client.stream(
    on_log=handle_log,
    on_error=handle_error,
    filters={
        'service': 'api-gateway',
        'level': 'error',
    }
)
# Note: This blocks. Run in separate thread for production.
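Under the hood, SSE is a line-based protocol. A minimal, illustrative parser for the data: lines of such a stream (ignoring the protocol's event:/id: fields) might look like this:

```python
import json

def iter_sse_events(lines):
    """Collect consecutive 'data:' lines until a blank line ends the
    event, then yield the decoded JSON payload."""
    data = []
    for line in lines:
        if line.startswith('data:'):
            data.append(line[5:].strip())
        elif line == '' and data:
            yield json.loads('\n'.join(data))
            data = []

# Usage with a captured fragment of an event stream:
raw = [
    'data: {"level": "error", "message": "Connection failed"}',
    '',
]
events = list(iter_sse_events(raw))
```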
Metrics
Track SDK performance and health.
metrics = client.get_metrics()
print(f"Logs sent: {metrics.logs_sent}")
print(f"Logs dropped: {metrics.logs_dropped}")
print(f"Errors: {metrics.errors}")
print(f"Retries: {metrics.retries}")
print(f"Avg latency: {metrics.avg_latency_ms}ms")
print(f"Circuit breaker trips: {metrics.circuit_breaker_trips}")
# Get circuit breaker state
print(client.get_circuit_breaker_state()) # CLOSED|OPEN|HALF_OPEN
# Reset metrics
client.reset_metrics()
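The three circuit-breaker states can be illustrated with a small standalone state machine (a sketch of the pattern, not the SDK's internal implementation):

```python
import time

class CircuitBreaker:
    """CLOSED -> OPEN after `threshold` consecutive failures;
    OPEN -> HALF_OPEN once `reset_ms` has elapsed, allowing a probe;
    a success in any state closes the circuit again."""

    def __init__(self, threshold: int = 5, reset_ms: int = 30000):
        self.threshold = threshold
        self.reset_ms = reset_ms
        self.failures = 0
        self.opened_at = None

    def state(self) -> str:
        if self.opened_at is None:
            return 'CLOSED'
        if (time.monotonic() - self.opened_at) * 1000 >= self.reset_ms:
            return 'HALF_OPEN'  # let one probe request through
        return 'OPEN'

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

# Usage: two failures trip a breaker with threshold=2
cb = CircuitBreaker(threshold=2, reset_ms=50)
cb.record_failure()
s1 = cb.state()   # still CLOSED: below the threshold
cb.record_failure()
s2 = cb.state()   # OPEN: threshold reached
time.sleep(0.06)
s3 = cb.state()   # HALF_OPEN: reset window elapsed
```

While the circuit is OPEN the client can fail fast instead of hammering an unhealthy backend, which is what keeps log delivery from amplifying an outage.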
Middleware
Flask Middleware
Auto-log all HTTP requests and responses.
from flask import Flask
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideFlaskMiddleware
app = Flask(__name__)
client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
    )
)

LogTideFlaskMiddleware(
    app,
    client=client,
    service_name='flask-api',
    log_requests=True,
    log_responses=True,
    skip_paths=['/metrics'],
)
Logged automatically:
- Request: GET /api/users
- Response: GET /api/users 200 (45ms)
- Errors: Request error: Internal Server Error
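The general idea behind such middleware can be sketched as a framework-agnostic WSGI wrapper (illustrative only; RequestLogMiddleware is hypothetical, and LogTideFlaskMiddleware hooks into Flask directly):

```python
import time

class RequestLogMiddleware:
    """Wrap a WSGI app and log method, path, status, and duration for
    each request, skipping configured paths."""

    def __init__(self, app, log, skip_paths=()):
        self.app = app
        self.log = log
        self.skip_paths = set(skip_paths)

    def __call__(self, environ, start_response):
        path = environ.get('PATH_INFO', '/')
        if path in self.skip_paths:
            return self.app(environ, start_response)
        method = environ.get('REQUEST_METHOD', 'GET')
        start = time.monotonic()
        captured = {}

        def capturing_start_response(status, headers, exc_info=None):
            captured['status'] = status.split()[0]  # '200 OK' -> '200'
            return start_response(status, headers, exc_info)

        result = self.app(environ, capturing_start_response)
        duration_ms = int((time.monotonic() - start) * 1000)
        self.log(f"{method} {path} {captured['status']} ({duration_ms}ms)")
        return result

# Usage with a trivial WSGI app:
def app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'ok']

lines = []
wrapped = RequestLogMiddleware(app, log=lines.append, skip_paths=['/metrics'])
wrapped({'REQUEST_METHOD': 'GET', 'PATH_INFO': '/api/users'},
        lambda s, h, e=None: None)
```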
Django Middleware
# settings.py
MIDDLEWARE = [
    'logtide_sdk.middleware.LogTideDjangoMiddleware',
]

from logtide_sdk import LogTideClient, ClientOptions

LOGTIDE_CLIENT = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
    )
)
LOGTIDE_SERVICE_NAME = 'django-api'
FastAPI Middleware
from fastapi import FastAPI
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideFastAPIMiddleware
app = FastAPI()
client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
    )
)

app.add_middleware(
    LogTideFastAPIMiddleware,
    client=client,
    service_name='fastapi-api',
)
Examples
See the examples/ directory for complete working examples:
- basic.py - Simple usage
- advanced.py - All advanced features
- flask_example.py - Flask integration
- fastapi_example.py - FastAPI integration
API Reference
LogTideClient
Constructor
client = LogTideClient(options: ClientOptions)
Logging Methods
- log(entry: LogEntry) -> None
- debug(service: str, message: str, metadata: dict = None) -> None
- info(service: str, message: str, metadata: dict = None) -> None
- warn(service: str, message: str, metadata: dict = None) -> None
- error(service: str, message: str, metadata_or_error: dict | Exception = None) -> None
- critical(service: str, message: str, metadata_or_error: dict | Exception = None) -> None
Context Methods
- set_trace_id(trace_id: str | None) -> None
- get_trace_id() -> str | None
- with_trace_id(trace_id: str) → context manager
- with_new_trace_id() → context manager
Query Methods
- query(options: QueryOptions) -> LogsResponse
- get_by_trace_id(trace_id: str) -> list[dict]
- get_aggregated_stats(options: AggregatedStatsOptions) -> AggregatedStatsResponse
Streaming
stream(on_log: callable, on_error: callable = None, filters: dict = None) -> None
Metrics
- get_metrics() -> ClientMetrics
- reset_metrics() -> None
- get_circuit_breaker_state() -> CircuitState
Lifecycle
- flush() -> None
- close() -> None
Best Practices
1. Always Close on Shutdown
import atexit
# Automatic cleanup (already registered by client)
# Or manually:
atexit.register(client.close)
2. Use Global Metadata
client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
        global_metadata={
            'env': os.getenv('ENV'),
            'version': '1.0.0',
            'region': 'us-east-1',
        },
    )
)
3. Enable Debug Mode in Development
client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
        debug=os.getenv('ENV') == 'development',
    )
)
4. Monitor Metrics in Production
import time
import threading
def monitor_metrics():
    while True:
        metrics = client.get_metrics()
        if metrics.logs_dropped > 0:
            print(f"⚠️ Logs dropped: {metrics.logs_dropped}")
        if metrics.circuit_breaker_trips > 0:
            print(f"🔴 Circuit breaker has tripped {metrics.circuit_breaker_trips} time(s)")
        time.sleep(60)

# Run in a background thread
monitor_thread = threading.Thread(target=monitor_metrics, daemon=True)
monitor_thread.start()
Development
Setup
# Clone repository
git clone https://github.com/logtide-dev/python-sdk.git
cd logtide-sdk-python
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dev dependencies
pip install -e ".[dev]"
Testing
# Run tests
pytest tests/
# Type checking
mypy logtide_sdk/
# Code formatting
black logtide_sdk/ tests/ examples/
# Linting
ruff check logtide_sdk/
License
MIT
Contributing
Contributions are welcome! Please open an issue or PR on GitHub.
Support
- Documentation: https://logtide.dev/docs
- Issues: GitHub Issues
File details
Details for the file logtide_sdk-0.1.1.tar.gz.
File metadata
- Download URL: logtide_sdk-0.1.1.tar.gz
- Upload date:
- Size: 21.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 98770a4e155b6466eaa7e66ed7c6b426e1376b11fd1cf17d88f4dac16c8ecd96 |
| MD5 | bcfe1ae378bd713ad0e52a6cc7ed45a0 |
| BLAKE2b-256 | 8421ca148252325430290a0979ac5a788742c249a1c0e338ec0003e6be7d86c7 |
File details
Details for the file logtide_sdk-0.1.1-py3-none-any.whl.
File metadata
- Download URL: logtide_sdk-0.1.1-py3-none-any.whl
- Upload date:
- Size: 19.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9479dd626f5ca1a7c8f46d13c1aee27be349d1ed7eb478d9f6caeb425eb82c2d |
| MD5 | 051df6974a197713aef8c748cce6e422 |
| BLAKE2b-256 | 1c24726e134c2224b93e47b0b46a971dc8fbeef5af2a5807e22889c582f8c733 |