Asynchronous logger for high performance applications that incorporates multiple observers, logs in batches, and includes automatic retries and fallback if services are not available.

Async ITS Logger

ITS Logger is an asynchronous logging library that uses the Observer pattern to send each log entry to several destinations at once. It works in both asynchronous and synchronous applications. Supported destinations are the console, local files, AWS S3, MongoDB, Redis, and Datadog; each one is enabled or disabled in a YAML configuration file.

Features

  • Asynchronous Logging: Efficient logging that runs in the background without blocking your application.
  • Observer Pattern: Supports multiple logging destinations (observers) simultaneously.
  • Flexible Configuration: Easily configure log batching, retry delays, and more using a simple YAML file.
  • Dual Environment Support: Works with both asynchronous and synchronous applications.
  • Extensible: Add or remove logging observers as needed.
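
As a rough illustration of the Observer pattern named above, the following sketch fans one log entry out to every registered destination. `SketchLogger`, `ListObserver`, and the `notify` method are hypothetical names chosen for this example; they are not the library's actual classes, and the real observers may expose a different interface.

```python
from typing import List


class ListObserver:
    """Hypothetical observer that keeps entries in memory.

    Stands in for a real destination such as a file or a remote service.
    """

    def __init__(self) -> None:
        self.entries: List[dict] = []

    def notify(self, entry: dict) -> None:
        self.entries.append(entry)


class SketchLogger:
    """Hypothetical logger that forwards each entry to all observers."""

    def __init__(self) -> None:
        self._observers: List[ListObserver] = []

    def register_observer(self, observer: ListObserver) -> None:
        self._observers.append(observer)

    def remove_observer(self, observer: ListObserver) -> None:
        self._observers.remove(observer)

    def log(self, message: str, status: str = "INFO") -> None:
        entry = {"status": status, "message": message}
        for observer in self._observers:
            observer.notify(entry)


first, second = ListObserver(), ListObserver()
logger = SketchLogger()
logger.register_observer(first)
logger.register_observer(second)

logger.log("Request started.")           # delivered to both observers
logger.remove_observer(second)
logger.log("Request failed.", "ERROR")   # delivered to `first` only
```

Because the logger only depends on a `notify`-style method, destinations can be added or removed without changing the code that emits log entries.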

Installation

From PyPI

The distribution is published on PyPI as async_its_logger (the import package is its_logger). Install it with pip:

pip install async_its_logger

From Source

Clone the repository:

git clone https://github.com/yourusername/its_logger.git
cd its_logger

Install using pip:

pip install .

Requirements

Python 3.7 or higher

Configuration

ITS Logger is configured using a YAML file (e.g., config.yaml). Below is an example configuration file with placeholder values. Replace the placeholders with your own configuration details.

logging:
  enable_console: false
  enable_local_file: false
  enable_datadog: true
  enable_s3: false
  enable_mongodb: false
  enable_redis: false

  batch_interval: 1           # Interval in seconds to send logs
  batch_size: 10              # Maximum number of logs to send in one batch
  retry_delay: 1              # Initial retry delay (in seconds)
  max_retries: 5              # Maximum number of retries
  max_wait_time: 10           # Maximum seconds logs can wait before being flushed

  log_level: "INFO"
  local_log_path: "./logs"

datadog:
  api_key: "<YOUR_DATADOG_API_KEY>"
  app_key: "<YOUR_DATADOG_APP_KEY>"
  endpoint: "https://http-intake.logs.<YOUR_REGION>.datadoghq.com"

aws:
  s3_bucket_name: "<YOUR_S3_BUCKET_NAME>"
  s3_prefix: "logs"

mongodb:
  uri: "mongodb://localhost:27017"
  database: "logging"
  collection: "logs"
  execution_times_collection: "execution_times"

redis:
  host: "localhost"
  port: 6379
  db: 0
  redis_list: "log_entries"
  execution_time_key: "execution_times"
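
To make the batching and retry settings above more concrete, here is a minimal sketch of how `batch_size`, `max_wait_time`, `retry_delay`, and `max_retries` could interact. It is not the library's implementation: `BatchBuffer`, `tick`, `retry_delays`, and the injectable `clock` argument are hypothetical, and the doubling of the retry delay is an assumption (the configuration only says the delay is "initial").

```python
import time
from typing import Callable, List, Optional


class BatchBuffer:
    """Hypothetical buffer: flushes when full or when entries get too old."""

    def __init__(self, flush: Callable[[List[str]], None],
                 batch_size: int = 10, max_wait_time: float = 10.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._flush = flush
        self._batch_size = batch_size
        self._max_wait_time = max_wait_time
        self._clock = clock
        self._entries: List[str] = []
        self._oldest: Optional[float] = None  # arrival time of oldest entry

    def add(self, entry: str) -> None:
        if not self._entries:
            self._oldest = self._clock()
        self._entries.append(entry)
        if len(self._entries) >= self._batch_size:
            self.flush_now()

    def tick(self) -> None:
        """Meant to be called periodically, e.g. every batch_interval seconds."""
        if self._entries and self._clock() - self._oldest >= self._max_wait_time:
            self.flush_now()

    def flush_now(self) -> None:
        batch, self._entries = self._entries, []
        self._oldest = None
        if batch:
            self._flush(batch)


def retry_delays(retry_delay: float = 1.0, max_retries: int = 5) -> List[float]:
    """Delay before each retry, ASSUMING the delay doubles every attempt."""
    return [retry_delay * (2 ** attempt) for attempt in range(max_retries)]


sent: List[List[str]] = []
fake_now = [0.0]  # fake clock so the example does not have to sleep
buffer = BatchBuffer(sent.append, batch_size=3, max_wait_time=10,
                     clock=lambda: fake_now[0])

for i in range(4):
    buffer.add(f"log {i}")   # the third entry triggers a size-based flush

fake_now[0] = 10.0
buffer.tick()                # "log 3" has waited 10 s, so it is flushed too
```

With these inputs, `sent` ends up holding two batches: the first three entries, then the single leftover entry once it has waited `max_wait_time` seconds.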

Note:

  • For Datadog, make sure you use the appropriate values for your account and region.
  • You can enable or disable individual observers by setting their flags to true or false under the logging section.
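
The enable flags can also be read programmatically. The helper below is a small sketch, not part of the library: `OBSERVER_FLAGS` and `enabled_observers` are hypothetical names, and the dictionary passed in is assumed to have the same shape that `yaml.safe_load` would return for the configuration file above.

```python
from typing import Dict, List

# Maps each flag in the `logging` section to a short destination name.
OBSERVER_FLAGS: Dict[str, str] = {
    "enable_console": "console",
    "enable_local_file": "local_file",
    "enable_datadog": "datadog",
    "enable_s3": "s3",
    "enable_mongodb": "mongodb",
    "enable_redis": "redis",
}


def enabled_observers(config: dict) -> List[str]:
    """Return the destinations whose flag is true; missing flags count as off."""
    logging_section = config.get("logging", {})
    return [name for flag, name in OBSERVER_FLAGS.items()
            if logging_section.get(flag, False)]


# Same flag values as the example configuration above.
example_config = {
    "logging": {
        "enable_console": False,
        "enable_local_file": False,
        "enable_datadog": True,
        "enable_s3": False,
        "enable_mongodb": False,
        "enable_redis": False,
    }
}

active = enabled_observers(example_config)  # ["datadog"]
```

Treating a missing flag as disabled is a design choice of this sketch; the library itself may handle missing keys differently.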

Usage Example

Below is a comprehensive example demonstrating how to use ITS Logger in your application:

import time
import yaml
import asyncio
from its_logger.async_its_logger import AsyncITSLogger
from its_logger.observers.mongodb_observer import MongoDBObserver
from its_logger.observers.redis_observer import RedisObserver
from its_logger.observers.datadog_observer import DatadogObserver
from its_logger.observers.s3_observer import S3Observer
from its_logger.observers.local_file_observer import LocalFileObserver
from its_logger.observers.console_observer import ConsoleObserver

# Load configuration
with open("config.yaml", "r") as file:
    config = yaml.safe_load(file)

# Initialize logger (which creates its own event loop in a background thread)
logger = AsyncITSLogger()

# Register enabled observers dynamically based on the config
if config["logging"]["enable_datadog"]:
    print("Datadog observer enabled.")
    datadog_observer = DatadogObserver()
    logger.register_observer(datadog_observer)

if config["logging"]["enable_s3"]:
    print("S3 observer enabled.")
    s3_observer = S3Observer()
    logger.register_observer(s3_observer)

if config["logging"]["enable_local_file"]:
    print("Local file observer enabled.")
    local_file_observer = LocalFileObserver()
    logger.register_observer(local_file_observer)

if config["logging"]["enable_console"]:
    print("Console observer enabled.")
    console_observer = ConsoleObserver()
    logger.register_observer(console_observer)

if config["logging"]["enable_mongodb"]:
    print("MongoDB observer enabled.")
    mongodb_observer = MongoDBObserver()
    logger.register_observer(mongodb_observer)
    # Start auto-flush for MongoDB observer using the logger's loop
    asyncio.run_coroutine_threadsafe(mongodb_observer.auto_flush(), logger.loop)

if config["logging"]["enable_redis"]:
    print("Redis observer enabled.")
    redis_observer = RedisObserver()
    logger.register_observer(redis_observer)
    # Start auto-flush for Redis observer using the logger's loop
    asyncio.run_coroutine_threadsafe(redis_observer.auto_flush(), logger.loop)

# Example usage: simulate a request processing function
def process_request():
    task_id = logger.generate_task_id()
    print(f"Generated Task ID: {task_id}")

    start_time = time.time()
    logger.log("MyApp", "RequestHandler", task_id=task_id, message="Request started.")

    try:
        for i in range(3):
            logger.log("MyApp", f"ProcessingStep{i+1}", task_id=task_id, message=f"Processing step {i+1}...")
            time.sleep(1)  # Simulate processing time
        # Deliberately fail so the error-logging path below is exercised
        raise Exception("Something went wrong!")
    except Exception as e:
        logger.log(
            "MyApp",
            "ErrorHandler",
            task_id=task_id,
            status="ERROR",
            message="An error occurred while processing the request",
            exception=e
        )

    end_time = time.time()
    execution_time = round(end_time - start_time, 2)
    logger.log(
        "MyApp",
        "RequestHandler",
        task_id=task_id,
        status="INFO",
        message=f"Request completed in {execution_time} seconds."
    )

# Run simulated requests
for _ in range(1):
    process_request()
    time.sleep(2)

print("Main application finished.")
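
The example above hands the observers' `auto_flush()` coroutines to the logger's loop with `asyncio.run_coroutine_threadsafe`. The following standalone sketch shows that mechanism using only the standard library. The loop, thread, and `flush_once` coroutine here are stand-ins created for illustration; how `AsyncITSLogger` actually sets up `logger.loop` is not shown in this document.

```python
import asyncio
import threading

# An event loop running in a background thread, similar in spirit to
# what the usage example describes for the logger.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


async def flush_once(batch):
    """Hypothetical coroutine standing in for an observer's flush work."""
    await asyncio.sleep(0.01)  # simulate a short I/O wait
    return len(batch)


# Schedule the coroutine from the main (synchronous) thread.
# The call returns a concurrent.futures.Future immediately.
future = asyncio.run_coroutine_threadsafe(flush_once(["a", "b"]), loop)
flushed = future.result(timeout=5)  # block until the coroutine finishes

# Stop the loop from outside its thread, then clean up.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

This is why synchronous code, such as `process_request()` above, can trigger asynchronous work without running an event loop of its own.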

Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository.
  2. Create a new branch for your feature or bugfix.
  3. Make your changes and commit them with clear messages.
  4. Submit a pull request describing your changes.

License

This project is licensed under the MIT License.

Contact

For questions or support, please contact engineering@inmediatum.com.
