Asynchronous logger for high-performance applications. It supports multiple observers, sends logs in batches, and retries automatically with a fallback when a service is unavailable.

Async ITS Logger

ITS Logger is an easy-to-integrate asynchronous logging library that uses the Observer Pattern so your application can log to several destinations at once. It works in both asynchronous and synchronous environments. Supported destinations are Console, Local Files, AWS S3, MongoDB, Redis, and Datadog, all configurable through a YAML configuration file.

Features

  • Asynchronous Logging: Efficient logging that runs in the background without blocking your application.
  • Observer Pattern: Supports multiple logging destinations (observers) simultaneously.
  • Flexible Configuration: Easily configure log batching, retry delays, and more using a simple YAML file.
  • Dual Environment Support: Works with both asynchronous and synchronous applications.
  • Extensible: Add or remove logging observers as needed.
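
The Observer Pattern mentioned above can be illustrated with a minimal, library-independent sketch. The names `Observer`, `ListObserver`, `Subject`, `notify`, and `remove_observer` are hypothetical and chosen for illustration only; they are not ITS Logger's actual classes, and the real observers may expose a different interface.

```python
from typing import Dict, List


class Observer:
    """Hypothetical base class: one logging destination."""

    def notify(self, entry: Dict[str, str]) -> None:
        raise NotImplementedError


class ListObserver(Observer):
    """Collects entries in memory; stands in for Console, S3, Redis, etc."""

    def __init__(self) -> None:
        self.entries: List[Dict[str, str]] = []

    def notify(self, entry: Dict[str, str]) -> None:
        self.entries.append(entry)


class Subject:
    """Hypothetical logger core that fans each entry out to every observer."""

    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def register_observer(self, observer: Observer) -> None:
        self._observers.append(observer)

    def remove_observer(self, observer: Observer) -> None:
        self._observers.remove(observer)

    def log(self, message: str) -> None:
        entry = {"message": message}
        for observer in self._observers:
            observer.notify(entry)


first, second = ListObserver(), ListObserver()
subject = Subject()
subject.register_observer(first)
subject.register_observer(second)
subject.log("both destinations receive this")
subject.remove_observer(second)
subject.log("only the first destination receives this")
```

Because the subject only knows the `notify` method, destinations can be added or removed without touching the code that produces log entries.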

Installation

From PyPI

Install ITS Logger from PyPI with pip. The distribution is published as async-its-logger, while the import package is its_logger:

pip install async-its-logger

From Source

Clone the repository:

git clone https://github.com/yourusername/its_logger.git
cd its_logger

Install using pip:

pip install .

Requirements

Python 3.7 or higher

Configuration

ITS Logger is configured using a YAML file (e.g., config.yaml). Below is an example configuration file with placeholder values. Replace the placeholders with your own configuration details.

logging:
  enable_console: false
  enable_local_file: false
  enable_datadog: true
  enable_s3: false
  enable_mongodb: false
  enable_redis: false

  batch_interval: 1           # Interval in seconds to send logs
  batch_size: 10              # Maximum number of logs to send in one batch
  retry_delay: 1              # Initial retry delay (in seconds)
  max_retries: 5              # Maximum number of retries
  max_wait_time: 10           # Maximum seconds logs can wait before being flushed

  log_level: "INFO"
  local_log_path: "./logs"

datadog:
  api_key: "<YOUR_DATADOG_API_KEY>"
  app_key: "<YOUR_DATADOG_APP_KEY>"
  endpoint: "https://http-intake.logs.<YOUR_REGION>.datadoghq.com"

aws:
  s3_bucket_name: "<YOUR_S3_BUCKET_NAME>"
  s3_prefix: "logs"

mongodb:
  uri: "mongodb://localhost:27017"
  database: "logging"
  collection: "logs"
  execution_times_collection: "execution_times"

redis:
  host: "localhost"
  port: 6379
  db: 0
  redis_list: "log_entries"
  execution_time_key: "execution_times"

Note:

  • For Datadog, make sure you use the appropriate values for your account and region.
  • You can enable or disable individual observers by setting their flags to true or false under the logging section.
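
To make the batching and retry settings more concrete, here is a small sketch of how such values could be interpreted. It is not taken from ITS Logger's source: the helper names `retry_delays` and `should_flush` are hypothetical, and the doubling of the delay after each failed attempt is an assumption suggested by the phrase "initial retry delay", not confirmed behavior.

```python
from typing import List


def retry_delays(retry_delay: float, max_retries: int) -> List[float]:
    """Delay before each retry, ASSUMING the delay doubles after every failure."""
    return [retry_delay * (2 ** attempt) for attempt in range(max_retries)]


def should_flush(pending: int, oldest_age: float, batch_size: int, max_wait_time: float) -> bool:
    """Flush when the batch is full or the oldest entry has waited too long."""
    if pending == 0:
        return False
    return pending >= batch_size or oldest_age >= max_wait_time


# Values copied from the example configuration.
settings = {"batch_size": 10, "retry_delay": 1, "max_retries": 5, "max_wait_time": 10}

delays = retry_delays(settings["retry_delay"], settings["max_retries"])
print(delays)  # [1, 2, 4, 8, 16]
```

Under these assumptions, a batch of three entries would still be sent once its oldest entry had waited 10 seconds, even though batch_size has not been reached.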

Usage Example

Below is a comprehensive example demonstrating how to use ITS Logger in your application:

import time
import yaml
import asyncio
from its_logger.async_its_logger import AsyncITSLogger
from its_logger.observers.mongodb_observer import MongoDBObserver
from its_logger.observers.redis_observer import RedisObserver
from its_logger.observers.datadog_observer import DatadogObserver
from its_logger.observers.s3_observer import S3Observer
from its_logger.observers.local_file_observer import LocalFileObserver
from its_logger.observers.console_observer import ConsoleObserver

# Load configuration
with open("config.yaml", "r") as file:
    config = yaml.safe_load(file)

# Initialize logger (which creates its own event loop in a background thread)
logger = AsyncITSLogger()

# Register enabled observers dynamically based on the config
if config["logging"]["enable_datadog"]:
    print("Datadog observer enabled.")
    datadog_observer = DatadogObserver()
    logger.register_observer(datadog_observer)

if config["logging"]["enable_s3"]:
    print("S3 observer enabled.")
    s3_observer = S3Observer()
    logger.register_observer(s3_observer)

if config["logging"]["enable_local_file"]:
    print("Local file observer enabled.")
    local_file_observer = LocalFileObserver()
    logger.register_observer(local_file_observer)

if config["logging"]["enable_console"]:
    print("Console observer enabled.")
    console_observer = ConsoleObserver()
    logger.register_observer(console_observer)

if config["logging"]["enable_mongodb"]:
    print("MongoDB observer enabled.")
    mongodb_observer = MongoDBObserver()
    logger.register_observer(mongodb_observer)
    # Start auto-flush for MongoDB observer using the logger's loop
    asyncio.run_coroutine_threadsafe(mongodb_observer.auto_flush(), logger.loop)

if config["logging"]["enable_redis"]:
    print("Redis observer enabled.")
    redis_observer = RedisObserver()
    logger.register_observer(redis_observer)
    # Start auto-flush for Redis observer using the logger's loop
    asyncio.run_coroutine_threadsafe(redis_observer.auto_flush(), logger.loop)

# Example usage: simulate a request processing function
def process_request():
    task_id = logger.generate_task_id()
    print(f"Generated Task ID: {task_id}")

    start_time = time.time()
    logger.log("MyApp", "RequestHandler", task_id=task_id, message="Request started.")

    try:
        for i in range(3):
            logger.log("MyApp", f"ProcessingStep{i+1}", task_id=task_id, message=f"Processing step {i+1}...")
            time.sleep(1)  # Simulate processing time
        # Simulate a failure to demonstrate error logging
        raise Exception("Something went wrong!")
    except Exception as e:
        logger.log(
            "MyApp",
            "ErrorHandler",
            task_id=task_id,
            status="ERROR",
            message="An error occurred while processing the request",
            exception=e
        )

    end_time = time.time()
    execution_time = round(end_time - start_time, 2)
    logger.log(
        "MyApp",
        "RequestHandler",
        task_id=task_id,
        status="INFO",
        message=f"Request completed in {execution_time} seconds."
    )

# Run simulated requests
for _ in range(1):
    process_request()
    time.sleep(2)

print("Main application finished.")
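
The example above relies on a logger that runs its own event loop in a background thread, which is what lets synchronous code call it without blocking. The mechanism can be sketched with the standard library alone. `BackgroundLoop`, `submit`, `stop`, and `deliver` are hypothetical names used for illustration; this is not ITS Logger's implementation.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper: owns an asyncio event loop running in a daemon thread."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def submit(self, coro):
        """Schedule a coroutine from any thread; returns a concurrent.futures.Future."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self) -> None:
        """Ask the loop to stop, wait for the thread to exit, then close the loop."""
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def deliver(message: str) -> str:
    await asyncio.sleep(0.01)  # stands in for network or disk I/O
    return f"delivered: {message}"


background = BackgroundLoop()
future = background.submit(deliver("hello"))  # returns immediately
result = future.result(timeout=5)             # block only when the result is needed
background.stop()
```

This is the same `asyncio.run_coroutine_threadsafe` call the usage example uses to start `auto_flush` on the logger's loop.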

Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository.
  2. Create a new branch for your feature or bugfix.
  3. Make your changes and commit them with clear messages.
  4. Submit a pull request describing your changes.

License

This project is licensed under the MIT License.

Contact

For questions or support, please contact engineering@inmediatum.com.
