Skip to main content

The Python logger for all KiLM projects

Project description

KiLM Logger library

PyPI version

The KiLM Logger library provides convenient logging for metrics, monitoring, and structured logs.

Installation

pip install kilmlogger

Usage

Use the KiLM Logger's Default Configuration

import os

from kilmlogger import KilmLoggerConfiguration


# Base directory for log files and the application name, both read from the
# environment. os.getenv returns None when the variable is not set.
LOG_PATH = os.getenv("LOG_PATH")
APP_NAME = os.getenv("APP_NAME")

# Logging level of the records that will be sent to Scribe.
# NOTE(review): placeholder - assign a value here; the bare name alone raises NameError.
SCRIBE_LOGGING_LEVEL

# Logging level of the records that are only shown on the console.
# NOTE(review): placeholder - assign a value here; the bare name alone raises NameError.
CONSOLE_LOGGING_LEVEL

# Levels whose records are accepted for sending metrics.
ACCEPTED_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]


# Setup your file to store logging records
def get_logging_file_path() -> str:
    """Return the path of the application's log file, creating its directory.

    The file is located at ``<LOG_PATH>/<APP_NAME>/<APP_NAME>.log``. Only the
    directory is created here; the log file itself is not.

    Returns:
        The full path of the log file.
    """
    log_dir: str = os.path.join(LOG_PATH, APP_NAME)
    # exist_ok=True removes the check-then-create race that made a separate
    # os.path.exists() test fail when two processes start at the same time.
    os.makedirs(log_dir, exist_ok=True)
    return os.path.join(log_dir, f"{APP_NAME}.log")


# Setup logging level, file, scribe level,...
def use_default_logging():
    """Create a KilmLoggerConfiguration from the module settings and apply it.

    The configuration receives the log file path, the Scribe and console
    logging levels and the accepted levels, then its default configuration
    is activated.
    """
    # use_async=True is intentionally left disabled in this example.
    KilmLoggerConfiguration(
        logging_filename=get_logging_file_path(),
        logging_level=SCRIBE_LOGGING_LEVEL,
        console_logging_level=CONSOLE_LOGGING_LEVEL,
        accepted_levels=ACCEPTED_LEVELS,
    ).use_default_configuration()

Send metrics to Scribe

import functools
import kilmlogger as logging

from enum import Enum
from typing import Awaitable
from datetime import datetime as dt


# Module-level logger obtained from kilmlogger under the name "kilmlogger".
logger = logging.get_logger("kilmlogger")

# Scribe category used as the default `category` by the metric decorators below.
# NOTE(review): placeholder - assign a value here; the bare name alone raises NameError.
SCRIBE_LOGGING_CATEGORY


class MetricCommandEnum(int, Enum):
    """Integer codes passed as the ``command`` of a metric record."""

    METRIC_COMMAND = 1111
    # Declare the remaining project-specific commands here.



class MetricSubCommandEnum(int, Enum):
    """Integer codes passed as the ``sub_command`` of a metric record."""

    METRIC_SUB_COMMAND = 0
    # Declare the remaining project-specific sub-commands here.


class MetricResultEnum(int, Enum):
    """Integer codes passed as the ``result`` of a metric record."""

    METRIC_SUCCESS = 0
    METRIC_FAILURE = 1
    # Declare any further project-specific results here.


def async_send_metric(func: Awaitable):
    """Decorate an async callable so that every call emits a metric record.

    The metric settings are read from the keyword arguments of each call:
    ``command``, ``category`` (defaults to SCRIBE_LOGGING_CATEGORY) and
    ``sub_command``. All keyword arguments, including these, are still
    forwarded to ``func`` unchanged.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An async wrapper that awaits ``func``, logs an error and re-raises
        when it fails, and always logs a metric-only debug record afterwards.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        command: MetricCommandEnum | None = kwargs.get("command")
        category: str = kwargs.get("category", SCRIBE_LOGGING_CATEGORY)
        sub_command: MetricSubCommandEnum | None = kwargs.get("sub_command")
        # Bound before the try block so the names read in `finally` always exist.
        start_time: int = int(dt.now().timestamp() * 1000)
        # The members declared on MetricResultEnum are METRIC_SUCCESS/METRIC_FAILURE.
        result: MetricResultEnum = MetricResultEnum.METRIC_SUCCESS
        try:
            return await func(*args, **kwargs)
        except Exception:
            result = MetricResultEnum.METRIC_FAILURE
            logger.error(
                f"Error when executing function {func.__name__}",
            )
            raise
        finally:
            # `command` is optional at call time; fall back to the function
            # name so a missing command cannot mask the original exception.
            command_name = command.name if command is not None else func.__name__
            logger.debug(
                f"Finish executing {command_name}",
                start_time=start_time,
                category=category,
                command=command,
                sub_command=sub_command,
                result=result,
                metric_only=True,
            )

    return wrapper


def async_metric_sender(
    command: MetricCommandEnum,
    category: str = SCRIBE_LOGGING_CATEGORY,
    sub_command: MetricSubCommandEnum = MetricSubCommandEnum.METRIC_SUB_COMMAND,
):
    """Build a decorator that emits a metric record for each call of an async callable.

    Args:
        command: Command recorded with the metric; its name appears in the log message.
        category: Category recorded with the metric.
        sub_command: Sub-command recorded with the metric.

    Returns:
        A decorator whose wrapper awaits the wrapped callable, re-raises any
        exception, and always logs a metric-only debug record afterwards.
        BaseApplicationException propagates without marking the metric as a
        failure; any other Exception is logged and recorded as a failure.
    """

    def decorator(func: Awaitable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # dt.now() instead of dt.utcnow(): utcnow() returns a naive datetime
            # that timestamp() interprets as local time, which shifts the epoch
            # value by the UTC offset. Bound before `try` so `finally` can read it.
            start_time: int = int(dt.now().timestamp() * 1000)
            # The members declared on MetricResultEnum are METRIC_SUCCESS/METRIC_FAILURE.
            result: MetricResultEnum = MetricResultEnum.METRIC_SUCCESS
            try:
                return await func(*args, **kwargs)
            # NOTE(review): BaseApplicationException is not defined or imported
            # in this snippet - import it from the application before use.
            except BaseApplicationException:
                raise
            except Exception:
                result = MetricResultEnum.METRIC_FAILURE
                logger.error(
                    f"Error when executing function {func.__name__}",
                )
                raise
            finally:
                logger.debug(
                    f"Finish executing {command.name}",
                    start_time=start_time,
                    category=category,
                    command=command,
                    sub_command=sub_command,
                    result=result,
                    metric_only=True,
                )

        return wrapper

    return decorator


def sync_metric_sender(
    command: MetricCommandEnum,
    category: str = SCRIBE_LOGGING_CATEGORY,
    sub_command: MetricSubCommandEnum = MetricSubCommandEnum.METRIC_SUB_COMMAND,
):
    """Build a decorator that emits a metric record for each call of a sync callable.

    Args:
        command: Command recorded with the metric; its name appears in the log message.
        category: Category recorded with the metric.
        sub_command: Sub-command recorded with the metric.

    Returns:
        A decorator whose wrapper calls the wrapped callable, re-raises any
        exception, and always logs a metric-only debug record afterwards.
        BaseApplicationException propagates without marking the metric as a
        failure; any other Exception is logged and recorded as a failure.
        Both log calls pass an empty ``correlation_id``.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # dt.now() instead of dt.utcnow(): utcnow() returns a naive datetime
            # that timestamp() interprets as local time, which shifts the epoch
            # value by the UTC offset. Bound before `try` so `finally` can read it.
            start_time: int = int(dt.now().timestamp() * 1000)
            # The members declared on MetricResultEnum are METRIC_SUCCESS/METRIC_FAILURE.
            result: MetricResultEnum = MetricResultEnum.METRIC_SUCCESS
            try:
                return func(*args, **kwargs)
            # NOTE(review): BaseApplicationException is not defined or imported
            # in this snippet - import it from the application before use.
            except BaseApplicationException:
                raise
            except Exception:
                result = MetricResultEnum.METRIC_FAILURE
                logger.error(
                    f"Error when executing function {func.__name__}",
                    correlation_id="",
                )
                raise
            finally:
                logger.debug(
                    f"Finish executing {command.name}",
                    start_time=start_time,
                    category=category,
                    command=command,
                    sub_command=sub_command,
                    result=result,
                    metric_only=True,
                    correlation_id="",
                )

        return wrapper

    return decorator

Examples:

import asyncio
import kilmlogger as logging

from httpx import AsyncClient, Response, Timeout, TimeoutException

from app.utils.metric import async_send_metric


# Module-level logger obtained from kilmlogger under the name "kilmlogger".
logger = logging.get_logger("kilmlogger")


@async_send_metric
async def make_request(
    *,
    client: AsyncClient,
    url: str,
    method: str = "GET",
    data: dict | None = None,
    json: dict | None = None,
    params: dict | None = None,
    headers: dict | None = None,
    timeout: float | None = 5.0,
    connect_timeout: float | None = 5.0,
    **kwargs,
) -> Response:
    """Send one HTTP request through ``client`` and return its response.

    All parameters are keyword-only. ``timeout`` and ``connect_timeout`` are
    combined into a single httpx ``Timeout``. Extra keyword arguments are
    accepted but not forwarded to ``client.request``.

    Raises:
        TimeoutException: Logged, then re-raised, when the request times out.
        Exception: Any other error is logged, then re-raised.
    """
    try:
        request_timeout = Timeout(timeout=timeout, connect=connect_timeout)
        response: Response = await client.request(
            method=method,
            url=url,
            data=data,
            json=json,
            params=params,
            headers=headers,
            timeout=request_timeout,
        )
    except TimeoutException as timeout_error:
        logger.error(f"Timeout when request {url} with error: {type(timeout_error)}")
        raise timeout_error
    except Exception as error:
        logger.error(f"Error when request {url} with error: {error}")
        raise error
    else:
        return response


# Example invocation. The request arguments are elided as a comment so the
# snippet stays syntactically valid Python.
# NOTE(review): MetricCommandEnum is not imported in this snippet and
# RETRIEVE_DOC must be one of the members declared on it.
asyncio.run(
    make_request(
        # ... pass client=, url= and any other request arguments here
        command=MetricCommandEnum.RETRIEVE_DOC,
    )
)

This example uses the decorator as a middle layer that wraps the make_request() function to send metrics to the Scribe log.

We also need some environment configuration:

ENVIRONMENT=staging
SCRIBE_HOST=kiki-scribelog-forward-grpc-headless-svc.kiki-infras
SCRIBE_PORT=9080
# DP Cate Name
DP_CATEGORY=KILM_EVENT_LOG
# To push log to Central Log (Data Platform Cate)
# The order needs to be the same as the DP Cate order
DP_LOG=app_name,app_mode,event_category,correlation_id,message,metrics,extra_data

or ArgoCD env:

...
  - name: ENVIRONMENT
    value: staging
  - name: SCRIBE_HOST
    value: "kiki-scribelog-forward-grpc-headless-svc.kiki-infras"
  - name: SCRIBE_PORT
    value: "9080"
  - name: DP_CATEGORY
    value: "KILM_EVENT_LOG"
  - name: DP_LOG
    value: "app_name,app_mode,event_category,correlation_id,message,metrics,extra_data"

Requirements

Python 3.12 or higher.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kilmlogger-0.0.30.tar.gz (11.8 kB view details)

Uploaded Source

Built Distribution

kilmlogger-0.0.30-py3-none-any.whl (14.8 kB view details)

Uploaded Python 3

File details

Details for the file kilmlogger-0.0.30.tar.gz.

File metadata

  • Download URL: kilmlogger-0.0.30.tar.gz
  • Upload date:
  • Size: 11.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.5

File hashes

Hashes for kilmlogger-0.0.30.tar.gz
Algorithm Hash digest
SHA256 c1893671d49b5b89b039202d9da93c2505874f8152b03f6095da236b198e9347
MD5 9e437a8d892882a7c82b87287a1cecc7
BLAKE2b-256 c7cbbbfca727b181b4157f6a1c6468cab4d8945cc66440a071f137bd93034447

See more details on using hashes here.

File details

Details for the file kilmlogger-0.0.30-py3-none-any.whl.

File metadata

  • Download URL: kilmlogger-0.0.30-py3-none-any.whl
  • Upload date:
  • Size: 14.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.5

File hashes

Hashes for kilmlogger-0.0.30-py3-none-any.whl
Algorithm Hash digest
SHA256 3189dbe2d5cb4eb0dd61cff18ae9a8aa854461f518a3be8e64025e7dadcdfab3
MD5 2af974a3e336ba3403cdd33cefda3e39
BLAKE2b-256 37b0ff26b6eac9f07236e1878af1f1ec504e83bcbfdf082e3bfd0911a72731c0

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page