Tremors is a library for logging while collecting metrics.

Project description

Tremors is a library for logging while collecting metrics. Tremors loggers are drop-in replacements for standard loggers, but they also have metrics collectors that run whenever a message is logged. The loggers are also context managers: the library maintains a hierarchy of nested contexts in which all logs and metrics are grouped together. You can create a new hierarchy at any time to group related logs.

Installation

pip install tremors

Usage

A function can be wrapped in a logger context with the logged decorator. If you call the function without a logger argument, one will automatically be injected into it.

import logging

import tremors
from tremors import collector


@tremors.logged
def fn(*, logger: tremors.Logger = tremors.from_logged) -> None:
    logger.info("hello")


logging.basicConfig(
    format="Tremors > %(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
)
fn()

The context automatically logs entered and exited messages before and after each function call. By default, the logger uses the configured standard root logger to log the messages.

Tremors > INFO:root:entered: fn
Tremors > INFO:root:hello
Tremors > INFO:root:exited: fn
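To make the entered and exited behavior concrete, here is a minimal standard-library sketch of a decorator that wraps a call in those two messages and injects a logger when none is passed. It is not Tremors' implementation: `sketch_logged` and the `sketch` logger name are hypothetical, and the real decorator also manages contexts and collectors.

```python
import functools
import io
import logging
from typing import Optional


def sketch_logged(fn):
    """Hypothetical stand-in for a logged-style decorator (not Tremors' code)."""

    @functools.wraps(fn)
    def wrapper(*args, logger: Optional[logging.Logger] = None, **kwargs):
        # Inject a logger when the caller does not pass one.
        if logger is None:
            logger = logging.getLogger("sketch")
        logger.info("entered: %s", fn.__name__)
        try:
            return fn(*args, logger=logger, **kwargs)
        finally:
            # In this sketch the exited message is logged even on an exception.
            logger.info("exited: %s", fn.__name__)

    return wrapper


@sketch_logged
def fn(*, logger: logging.Logger) -> None:
    logger.info("hello")


# Capture output in memory so the sketch does not touch the root logger.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
sketch = logging.getLogger("sketch")
sketch.addHandler(handler)
sketch.setLevel(logging.INFO)
sketch.propagate = False

fn()
lines = stream.getvalue().splitlines()
print(lines)
```

The sketch only shows the ordering of the three messages; it says nothing about how Tremors resolves `tremors.from_logged`.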

You may specify a standard logger by name for the Tremors logger to use as its underlying logger.

@tremors.logged(logger_name=__name__)
def fn2(*, logger: tremors.Logger = tremors.from_logged) -> None:
    logger.info("hello")


fn2()

The messages are logged by the specified underlying logger. Based on our standard logging configuration, the messages propagate from the underlying logger to the standard root logger, which emits them.

Tremors > INFO:__main__:entered: fn2
Tremors > INFO:__main__:hello
Tremors > INFO:__main__:exited: fn2
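The propagation described above is standard logging behavior and can be reproduced without Tremors. The sketch below uses a hypothetical logger name, `my_package.my_module`, with no handlers of its own; its records travel up to a handler attached to the root logger while keeping the original logger name.

```python
import io
import logging

# Attach an in-memory handler to the root logger for the demonstration.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))

root = logging.getLogger()
previous_level = root.level
root.addHandler(handler)
root.setLevel(logging.INFO)

# Hypothetical named logger: no handlers, level inherited from the root.
named = logging.getLogger("my_package.my_module")
named.info("hello")

# Restore the root logger so the sketch leaves no configuration behind.
root.removeHandler(handler)
root.setLevel(previous_level)

lines = stream.getvalue().splitlines()
print(lines)
```

The `%(name)s` field shows the logger that created the record, not the logger whose handler emitted it, which is why the output above reads `__main__` rather than `root`.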

Next, let’s use a collector to measure the elapsed time since the function started each time a message is logged. When a message is logged, the logger runs the collector and adds its updated state to the message’s LogRecord. We use a standard logging filter to inspect and modify the record before it is emitted: the filter formats the collector state, then stores the formatted state in a custom elapsed attribute on the record. Finally, we configure the root logger’s formatter to include the elapsed attribute.

import copy
import time


def flt(record: logging.LogRecord) -> logging.LogRecord:
    record = copy.copy(record)
    elapsed = collector.elapsed.formatter(record)
    record.elapsed = f"{elapsed} " if elapsed else ""
    return record


@tremors.logged(collector.elapsed.factory())
def fn3(*, logger: tremors.Logger = tremors.from_logged) -> None:
    logger.info("sleeping for 1s...")
    time.sleep(1)


logging.basicConfig(
    format="%(elapsed)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
fn3()

The messages contain elapsed information, according to the formatter configuration, that is sourced from the record’s elapsed custom attribute.

0.000 INFO:root:entered: fn3
0.000 INFO:root:sleeping for 1s...
1.000 INFO:root:exited: fn3
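The filter-plus-format mechanism is plain standard logging, so it can be tried in isolation. The sketch below uses a hypothetical `ElapsedFilter` that measures time itself instead of reading Tremors collector state, and it sets the attribute in place. Returning a replacement LogRecord from a filter, as `flt` does, is only honored from Python 3.12 onward; earlier versions treat the return value as a plain truth value.

```python
import io
import logging
import time


class ElapsedFilter(logging.Filter):
    """Hypothetical filter: stores seconds since its creation on the record."""

    def __init__(self) -> None:
        super().__init__()
        self.start = time.monotonic()

    def filter(self, record: logging.LogRecord) -> bool:
        # Set the custom attribute in place so the formatter can use it.
        record.elapsed = f"{time.monotonic() - self.start:.3f} "
        return True


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter("%(elapsed)s%(levelname)s:%(name)s:%(message)s")
)
handler.addFilter(ElapsedFilter())

log = logging.getLogger("elapsed_sketch")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.info("sleeping for 0.05s...")
time.sleep(0.05)
log.info("done")

lines = stream.getvalue().splitlines()
print(lines)
```

Attaching the filter to the handler, rather than to a logger, means every record that reaches the handler gets the attribute, so the `%(elapsed)s` placeholder always resolves.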

A Logger can have any number of collectors. Here, in addition to the elapsed collector from the previous example, we add a counter collector. A collector has a level, and it only runs if the message is logged at that level or higher. Our counter level is ERROR. We can also control which custom record attribute holds the formatted collector state via the collector’s name. This is useful if you have several collectors of the same kind on a single logger. Here, we name the counter errors, so record.errors will contain a formatted string with the running total of errors logged by a single function call. Finally, we can control the format of the counter state via the fmt argument of the counter’s formatter.

def flt2(record: logging.LogRecord) -> logging.LogRecord:
    record = copy.copy(record)
    errors = collector.counter.formatter(
        record, name="errors", fmt="errors={counter}"
    )
    record.errors = f"{errors} " if errors else ""
    elapsed = collector.elapsed.formatter(record)
    record.elapsed = f"{elapsed} " if elapsed else ""
    return record


@tremors.logged(
    collector.elapsed.factory(),
    collector.counter.factory(name="errors", level=logging.ERROR),
)
def fn4(*, logger: tremors.Logger = tremors.from_logged) -> None:
    logger.info("hello")
    time.sleep(1)
    logger.error("uh-ho!")


logging.basicConfig(
    format="%(elapsed)s%(errors)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt2)
fn4()

The messages contain information from both collectors.

0.000 errors=0 INFO:root:entered: fn4
0.000 errors=0 INFO:root:hello
1.001 errors=1 ERROR:root:uh-ho!
1.001 errors=1 INFO:root:exited: fn4
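The level rule can be illustrated with a small standard-library sketch: a counter that increments only for records at or above its level, but still reports its current total on every record. `CounterFilter` and its `attr` parameter are hypothetical names and do not reflect how Tremors collectors are written.

```python
import io
import logging


class CounterFilter(logging.Filter):
    """Hypothetical counter: counts records logged at `level` or higher."""

    def __init__(self, level: int, attr: str) -> None:
        super().__init__()
        self.level = level
        self.attr = attr
        self.count = 0

    def filter(self, record: logging.LogRecord) -> bool:
        # Only records at or above the configured level update the count.
        if record.levelno >= self.level:
            self.count += 1
        # Every record carries the current total under the chosen name.
        setattr(record, self.attr, f"{self.attr}={self.count} ")
        return True


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(errors)s%(levelname)s:%(message)s"))
handler.addFilter(CounterFilter(logging.ERROR, "errors"))

log = logging.getLogger("counter_sketch")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.info("hello")
log.error("uh-ho!")
log.info("bye")

lines = stream.getvalue().splitlines()
print(lines)
```

As in the output above, the INFO records show the total without changing it, and only the ERROR record increments it.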

Passing a collector factory, as in the previous example, will result in a new counter collector being used each time the function is called. Let’s reuse the same collector to keep a tally of errors across all calls to the function by passing a collector instance that we get by calling the factory’s create method.

fn_errors = collector.counter.factory(
    name="errors", level=logging.ERROR
).create()


@tremors.logged(fn_errors)
def fn5(*, logger: tremors.Logger = tremors.from_logged) -> None:
    logger.error("uh-ho!")


fn5()
fn5()

The error count doesn’t reset in the second function call.

errors=0 INFO:root:entered: fn5
errors=1 ERROR:root:uh-ho!
errors=1 INFO:root:exited: fn5
errors=1 INFO:root:entered: fn5
errors=2 ERROR:root:uh-ho!
errors=2 INFO:root:exited: fn5
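The difference between passing a factory and passing an instance can be sketched in a few lines of plain Python. The classes below are hypothetical and only mirror the idea of a factory with a create method; they are not Tremors types.

```python
from dataclasses import dataclass


@dataclass
class Counter:
    """Hypothetical collector that keeps a running total."""

    count: int = 0

    def run(self) -> int:
        self.count += 1
        return self.count


class CounterFactory:
    """Hypothetical factory: each create() call returns a fresh Counter."""

    def create(self) -> Counter:
        return Counter()


def call_with(source) -> int:
    """Simulate one decorated call that logs a single error."""
    # A factory yields a new collector per call; an instance is reused as-is.
    counter = source.create() if isinstance(source, CounterFactory) else source
    return counter.run()


factory = CounterFactory()
shared = factory.create()

per_call = [call_with(factory), call_with(factory)]
across_calls = [call_with(shared), call_with(shared)]
print(per_call, across_calls)
```

With the factory, each simulated call starts from zero; with the shared instance, the total carries over, which matches the two `fn5` calls above.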

Another way we can tally the count across all function calls is to pass the same logger with each call.

def fn6(*, logger: tremors.Logger) -> None:
    logger.error("uh-ho!")


with tremors.Logger(
    collector.counter.factory(name="errors", level=logging.ERROR),
    name="context",
) as logger:
    fn6(logger=logger)
    fn6(logger=logger)

We only get entered and exited messages for the context block. But the single logger used in both function calls maintains its state between calls.

errors=0 INFO:root:entered: context
errors=1 ERROR:root:uh-ho!
errors=2 ERROR:root:uh-ho!
errors=2 INFO:root:exited: context

Collectors may be inherited by descendant loggers. Let’s count errors across nested loggers.

@tremors.logged(
    collector.counter.factory(
        name="errors", level=logging.ERROR, inherit=True
    ),
    enter_msg=False,
    exit_msg=False,
)
def parent(*, logger: tremors.Logger = tremors.from_logged) -> None:
    logger.error("uh-ho!")
    child()
    child()


@tremors.logged(enter_msg=False, exit_msg=False)
def child(*, logger: tremors.Logger = tremors.from_logged) -> None:
    logger.error("doh!")
    grandchild()


@tremors.logged(enter_msg=False, exit_msg=False)
def grandchild(*, logger: tremors.Logger = tremors.from_logged) -> None:
    logger.info("so far, so good")
    logger.error("spoke too soon!")


parent()

We’ve disabled the entered and exited messages with the enter_msg and exit_msg parameters. The parent counter is used in the child and grandchild functions.

errors=1 ERROR:root:uh-ho!
errors=2 ERROR:root:doh!
errors=2 INFO:root:so far, so good
errors=3 ERROR:root:spoke too soon!
errors=4 ERROR:root:doh!
errors=4 INFO:root:so far, so good
errors=5 ERROR:root:spoke too soon!

Asynchronous functions and methods may be decorated with async_logged.

import asyncio


@tremors.async_logged(collector.elapsed.factory())
async def async_work_long(
    *, logger: tremors.Logger = tremors.from_logged
) -> None:
    logger.info("long work starting")
    await asyncio.sleep(1)


@tremors.async_logged(collector.elapsed.factory())
async def async_work_short(
    *, logger: tremors.Logger = tremors.from_logged
) -> None:
    logger.info("short work starting")
    await asyncio.sleep(0.1)


@tremors.async_logged(collector.elapsed.factory())
async def async_fn(*, logger: tremors.Logger = tremors.from_logged) -> None:
    coros = async_work_long(), async_work_short()
    logger.info("awaiting %d works", len(coros))
    await asyncio.gather(*coros)


logging.basicConfig(
    format="%(elapsed)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
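asyncio.run(async_fn())

The two works run concurrently, and each logger reports the time elapsed since its own function was entered.

0.000 INFO:root:entered: async_fn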
0.000 INFO:root:awaiting 2 works
0.000 INFO:root:entered: async_work_long
0.000 INFO:root:long work starting
0.000 INFO:root:entered: async_work_short
0.000 INFO:root:short work starting
0.101 INFO:root:exited: async_work_short
1.002 INFO:root:exited: async_work_long
1.003 INFO:root:exited: async_fn
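Concurrent coroutines need their nested contexts kept apart. With the standard library this falls out of `contextvars`, because each task created by `asyncio.gather` runs in its own copy of the caller's context. The sketch below demonstrates that property only; whether Tremors relies on it is an assumption, and `current` and `work` are hypothetical names.

```python
import asyncio
import contextvars

# Path of the context that is currently active in this task.
current = contextvars.ContextVar("current", default="root")
events = []


async def work(name: str, delay: float) -> None:
    # Each task sees the caller's value and extends it privately.
    token = current.set(f"{current.get()}/{name}")
    try:
        events.append(f"entered: {current.get()}")
        await asyncio.sleep(delay)
        events.append(f"exited: {current.get()}")
    finally:
        current.reset(token)


async def main() -> None:
    token = current.set("main")
    try:
        # gather wraps each coroutine in a task with a copied context.
        await asyncio.gather(work("long", 0.05), work("short", 0.01))
        # The caller's own value is untouched by the child tasks.
        events.append(f"exited: {current.get()}")
    finally:
        current.reset(token)


asyncio.run(main())
print(events)
```

The short work exits before the long one, as in the output above, and neither task's context path leaks into the other or into the caller.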

See the collector module in the full documentation for how to define your own collectors and bundles.
