
Tremors is a library for logging while collecting metrics.

Project description

Tremors is a library for logging while collecting metrics. Tremors loggers are drop-in replacements for standard loggers, with the addition of metrics collectors that run whenever a message is logged. The loggers are also context managers. The library maintains a hierarchy of nested contexts in which all logs and metrics are grouped together. You can create a new hierarchy at any time to group related logs.

Installation

pip install tremors

Usage

A function can be wrapped in a logger context with the logged decorator. If you call the function without a logger argument, one will automatically be injected into it.

import logging

from tremors import from_logged, logged


@logged
def fn(*, logger=from_logged):
    logger.info("hello")


logging.basicConfig(
    format="Tremors > %(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
)
fn()

The context automatically logs an entered message before each function call and an exited message after it. By default, the logger uses the configured standard root logger to log the messages.

Tremors > INFO:root:entered: fn
Tremors > INFO:root:hello
Tremors > INFO:root:exited: fn
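
To make the entered/exited pattern concrete, here is a minimal sketch of the same idea using only the standard library. It is not how Tremors is implemented: log_calls, ListHandler, and the logger name log_calls_demo are hypothetical names chosen for illustration.

```python
import functools
import logging


def log_calls(fn):
    """Hypothetical decorator: log entered/exited around each call."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Inject a logger only when the caller did not pass one.
        logger = kwargs.setdefault(
            "logger", logging.getLogger("log_calls_demo")
        )
        logger.info("entered: %s", fn.__name__)
        try:
            return fn(*args, **kwargs)
        finally:
            # Runs whether the call returns or raises.
            logger.info("exited: %s", fn.__name__)

    return wrapper


class ListHandler(logging.Handler):
    """Hypothetical handler that keeps messages in a list for inspection."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
demo_logger = logging.getLogger("log_calls_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(handler)


@log_calls
def greet(*, logger=None):
    logger.info("hello")


greet()
print(handler.messages)
```

The sketch only covers logger injection and the two bracketing messages; it has no collectors and no context hierarchy.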

You may specify a standard logger by name for the Tremors logger to use as its underlying logger.

@logged(logger_name=__name__)
def fn(*, logger=from_logged):
    logger.info("hello")


fn()

The messages are logged by the specified underlying logger. Based on our standard logging configuration, the messages propagate from the underlying logger to the standard root logger, which emits them.

Tremors > INFO:__main__:entered: fn
Tremors > INFO:__main__:hello
Tremors > INFO:__main__:exited: fn
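
The propagation described above is ordinary standard-library behaviour and can be checked without Tremors. The following sketch assumes a made-up logger name, app.worker, and a hypothetical ListHandler that stores formatted lines instead of printing them.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that stores formatted messages in a list."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


root_handler = ListHandler()
root_handler.setFormatter(
    logging.Formatter("Tremors > %(levelname)s:%(name)s:%(message)s")
)
root = logging.getLogger()
root.addHandler(root_handler)
root.setLevel(logging.INFO)

# The named logger has no handlers of its own, so its records propagate
# to the root logger's handlers while keeping the originating name.
named = logging.getLogger("app.worker")
named.info("hello")

print(root_handler.lines)
```

The name in the emitted line is that of the logger that created the record, not the root logger whose handler emitted it.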

Next, let’s use a collector to measure the time elapsed since the function started each time a message is logged. When a message is logged, the logger runs the collector and adds its updated state to the message’s LogRecord. We use a standard logging filter to inspect and modify the record before it is emitted. We format the collector state, then store the formatted state in the record’s custom elapsed attribute. Finally, we configure the root logger’s formatter to incorporate the elapsed attribute.

import copy
import time

from tremors.collector import elapsed


def flt(record):
    own_record = copy.copy(record)
    elapsed_msg = elapsed.formatter(own_record)
    own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
    return own_record


@logged(elapsed.factory())
def fn(*, logger=from_logged):
    logger.info("sleeping for 1s...")
    time.sleep(1)


logging.basicConfig(
    format="%(elapsed)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
fn()

The messages contain elapsed information, according to the formatter configuration, that is sourced from the record’s elapsed custom attribute.

0.000 INFO:root:entered: fn
0.000 INFO:root:sleeping for 1s...
1.000 INFO:root:exited: fn
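
The filter-plus-formatter mechanism can also be seen in isolation. The sketch below uses only the standard library: ElapsedFilter, ListHandler, and the logger name elapsed_demo are hypothetical, and the timing is done by the filter itself rather than by a Tremors collector. Unlike flt above, which returns a copy of the record (something the standard library accepts from filters as of Python 3.12), this filter sets the attribute in place and returns True so that it also runs on earlier versions.

```python
import logging
import time


class ElapsedFilter(logging.Filter):
    """Hypothetical filter that stamps seconds since its construction."""

    def __init__(self):
        super().__init__()
        self.t0 = time.perf_counter()

    def filter(self, record):
        # Set the custom attribute that the format string refers to.
        record.elapsed = f"{time.perf_counter() - self.t0:.3f} "
        return True  # keep the record


class ListHandler(logging.Handler):
    """Hypothetical handler that stores formatted messages in a list."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


handler = ListHandler()
handler.setFormatter(
    logging.Formatter("%(elapsed)s%(levelname)s:%(name)s:%(message)s")
)
handler.addFilter(ElapsedFilter())

demo = logging.getLogger("elapsed_demo")
demo.setLevel(logging.INFO)
demo.propagate = False
demo.addHandler(handler)

demo.info("sleeping briefly...")
time.sleep(0.05)
demo.info("done")

for line in handler.lines:
    print(line)
```

Handler filters run before the handler formats the record, which is why the formatter can rely on the elapsed attribute being present.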

A Logger can have any number of collectors. Here, in addition to the elapsed collector from the previous example, we add a counter collector. A collector has a level, and will only run if the message is being logged at that level or higher. Our counter level is ERROR. We can also control which custom record attribute holds the formatted collector state via the collector’s name. This is useful if you have several collectors of the same kind on a single logger. Here, we name the counter errors, so record.errors will contain a formatted string with the running total of errors logged by a single function call. Finally, we can control the format of the counter state via the fmt argument of the counter’s formatter.

from tremors.collector import counter


def flt(record):
    own_record = copy.copy(record)
    errors = counter.formatter(
        own_record, name="errors", fmt="errors={count}"
    )
    own_record.errors = f"{errors} " if errors else ""
    elapsed_msg = elapsed.formatter(own_record)
    own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
    return own_record


@logged(
    elapsed.factory(), counter.factory(name="errors", level=logging.ERROR)
)
def fn(*, logger=from_logged):
    logger.info("hello")
    time.sleep(1)
    logger.error("uh-ho!")


logging.basicConfig(
    format="%(elapsed)s%(errors)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
fn()

The messages contain information from both collectors.

0.000 errors=0 INFO:root:entered: fn
0.000 errors=0 INFO:root:hello
1.001 errors=1 ERROR:root:uh-ho!
1.001 errors=1 INFO:root:exited: fn
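
The level rule explains why the count stays at 1 on the final INFO line: the counter does not run for that message, but its last state is still reported. A minimal sketch of that gating, using a hypothetical LevelCounter class that stands in for the real collector and is not part of Tremors:

```python
import logging


class LevelCounter:
    """Hypothetical stand-in for a level-gated counter collector."""

    def __init__(self, level):
        self.level = level
        self.count = 0

    def collect(self, levelno):
        # Only run when the message is at the collector's level or higher.
        if levelno >= self.level:
            self.count += 1
        # The current state is reported either way.
        return self.count


errors = LevelCounter(logging.ERROR)

# Same sequence of levels as the example: entered, hello, uh-ho!, exited.
seen = [
    errors.collect(levelno)
    for levelno in (logging.INFO, logging.INFO, logging.ERROR, logging.INFO)
]
print(seen)
```

The resulting sequence mirrors the errors= values in the output above.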

Passing a collector factory, as in the previous example, will result in a new counter collector being used each time the function is called. Let’s reuse the same collector to keep a tally of errors across all calls to the function by passing a collector instance that we get by calling the factory’s create method.

fn_errors = counter.factory(name="errors", level=logging.ERROR).create()


@logged(fn_errors)
def fn(*, logger=from_logged):
    logger.error("uh-ho!")


fn()
fn()

The error count doesn’t reset in the second function call.

errors=0 INFO:root:entered: fn
errors=1 ERROR:root:uh-ho!
errors=1 INFO:root:exited: fn
errors=1 INFO:root:entered: fn
errors=2 ERROR:root:uh-ho!
errors=2 INFO:root:exited: fn
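
The difference between passing a factory and passing an instance can be modelled in a few lines of plain Python. Everything here (Counter, CounterFactory, resolve, call) is a hypothetical model of the behaviour described above, not the Tremors implementation.

```python
class Counter:
    """Hypothetical collector holding a running count."""

    def __init__(self):
        self.count = 0

    def bump(self):
        self.count += 1
        return self.count


class CounterFactory:
    """Hypothetical factory that builds a fresh Counter on demand."""

    def create(self):
        return Counter()


def resolve(collector_or_factory):
    """Build a new collector from a factory; reuse an instance as-is."""
    if isinstance(collector_or_factory, CounterFactory):
        return collector_or_factory.create()
    return collector_or_factory


def call(collector_or_factory):
    """Simulate one decorated call that logs a single error."""
    return resolve(collector_or_factory).bump()


factory = CounterFactory()

# Passing the factory: every call starts from a fresh collector.
per_call = [call(factory), call(factory)]

# Passing one instance: the state carries over between calls.
shared = factory.create()
across_calls = [call(shared), call(shared)]

print(per_call, across_calls)
```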

Another way we can tally the count across all function calls is to pass the same logger with each call.

from tremors import Logger


def fn(*, logger):
    logger.error("uh-ho!")


with Logger(
    counter.factory(name="errors", level=logging.ERROR), name="context"
) as logger:
    fn(logger=logger)
    fn(logger=logger)

We only get entered and exited messages for the context block. But the single logger used in both function calls maintains its state between calls.

errors=0 INFO:root:entered: context
errors=1 ERROR:root:uh-ho!
errors=2 ERROR:root:uh-ho!
errors=2 INFO:root:exited: context

Collectors may be inherited by descendant loggers. Let’s count errors across nested loggers.

@logged(
    counter.factory(name="errors", level=logging.ERROR, inherit=True),
    enter_msg=False,
    exit_msg=False,
)
def parent(*, logger=from_logged):
    logger.error("uh-ho!")
    child()
    child()


@logged(enter_msg=False, exit_msg=False)
def child(*, logger=from_logged):
    logger.error("doh!")
    grandchild()


@logged(enter_msg=False, exit_msg=False)
def grandchild(*, logger=from_logged):
    logger.info("so far, so good")
    logger.error("spoke too soon!")


parent()

We’ve disabled the entered and exited messages with the enter_msg and exit_msg parameters. The parent counter is used in the child and grandchild functions.

errors=1 ERROR:root:uh-ho!
errors=2 ERROR:root:doh!
errors=2 INFO:root:so far, so good
errors=3 ERROR:root:spoke too soon!
errors=4 ERROR:root:doh!
errors=4 INFO:root:so far, so good
errors=5 ERROR:root:spoke too soon!

Asynchronous functions and methods may be decorated with async_logged.

import asyncio

from tremors import async_logged


@async_logged(elapsed.factory())
async def async_work_long(*, logger=from_logged):
    logger.info("long work starting")
    await asyncio.sleep(1)


@async_logged(elapsed.factory())
async def async_work_short(*, logger=from_logged):
    logger.info("short work starting")
    await asyncio.sleep(0.1)


@async_logged(elapsed.factory())
async def async_fn(*, logger=from_logged):
    coros = async_work_long(), async_work_short()
    logger.info("awaiting %d works", len(coros))
    await asyncio.gather(*coros)


logging.basicConfig(
    format="%(elapsed)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
asyncio.run(async_fn())

The two works run concurrently, and each logger reports its own elapsed time.

0.000 INFO:root:entered: async_fn
0.000 INFO:root:awaiting 2 works
0.000 INFO:root:entered: async_work_long
0.000 INFO:root:long work starting
0.000 INFO:root:entered: async_work_short
0.000 INFO:root:short work starting
0.101 INFO:root:exited: async_work_short
1.002 INFO:root:exited: async_work_long
1.003 INFO:root:exited: async_fn
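
The interleaving in this output comes from asyncio.gather running both coroutines concurrently. The sketch below reproduces the ordering with the standard library only; work and main are hypothetical helpers, and the delays are shortened.

```python
import asyncio
import time


async def work(name, delay, events):
    """Record entry and exit, and return this coroutine's own duration."""
    t0 = time.perf_counter()
    events.append(f"entered: {name}")
    await asyncio.sleep(delay)
    events.append(f"exited: {name}")
    return time.perf_counter() - t0


async def main():
    events = []
    # gather starts both coroutines; the shorter one finishes first.
    durations = await asyncio.gather(
        work("long", 0.2, events),
        work("short", 0.05, events),
    )
    return events, durations


events, durations = asyncio.run(main())
print(events)
```

Each coroutine measures from its own start, which is why the short work reports a smaller duration even though both were started at nearly the same moment.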

We can capture standard logs and run collectors on them using enrich_std_record, which runs the current logger’s collectors with a LogItem that includes the standard log record. Collectors may inspect this record and update their state accordingly. flt calls enrich_std_record to run the collectors on any standard records it receives.

For Tremors loggers, collectors are run before a record is created, using the data that will be used to create the record. The resultant extras are then added to the record by the standard logging framework. For standard logs, on the other hand, the record already exists, so a filter or handler may run the current logger’s collectors on the record itself.

from tremors import enrich_std_record
from tremors.collector import identifier


def flt(record):
    own_record = copy.copy(record)
    enrich_std_record(own_record)
    elapsed_msg = elapsed.formatter(own_record)
    own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
    ident_msg = identifier.formatter(own_record)
    own_record.identifier = f"{ident_msg} " if ident_msg else ""
    return own_record


@logged(elapsed.factory(), identifier.factory())
def fn(*, logger=from_logged):
    logger.info("attempting to capture a standard log")
    std()


def std():
    time.sleep(0.1)
    logging.getLogger().info("capture me")
    std2()


def std2():
    time.sleep(0.1)
    logging.getLogger().info("capture me too")


logging.basicConfig(
    format="%(elapsed)s%(identifier)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
fn()

The identifier collector uses the record from the standard logs to append a path component identifying the standard logger. The elapsed collector doesn’t need any additional information from the record to mark the duration.

0.000 [p:... t:... g:...] fn INFO:root:entered: fn
0.000 [p:... t:... g:...] fn INFO:root:attempting to capture a standard log
0.101 [p:... t:... g:...] fn/std:root INFO:root:capture me
0.201 [p:... t:... g:...] fn/std:root INFO:root:capture me too
0.201 [p:... t:... g:...] fn INFO:root:exited: fn

Let’s finish up with a full example that demonstrates how to turn log messages with collector state into structured JSON that can be persisted in a log store.

import copy
import logging
import time
import uuid

import pydantic
import tremors
from tremors import collector


class AppLogMessage(pydantic.BaseModel):
    """The model for app logs."""

    msg: str
    timestamp: float
    group_id: uuid.UUID | None
    path_list: list[str] | None = pydantic.Field(exclude=True)
    count: int | None
    t0: float | None = pydantic.Field(exclude=True)
    t: float | None = pydantic.Field(exclude=True)
    tt0: float | None = pydantic.Field(exclude=True)
    tt: float | None = pydantic.Field(exclude=True)

    @pydantic.computed_field
    @property
    def path(self) -> str | None:
        if self.path_list is None:
            return None
        return "/".join(self.path_list)

    @pydantic.computed_field
    @property
    def elapsed_ms(self) -> int | None:
        if self.t is None or self.t0 is None:
            return None
        return round((self.t - self.t0) / 1e6)

    @pydantic.computed_field
    @property
    def total_elapsed_ms(self) -> int | None:
        if self.tt is None or self.tt0 is None:
            return None
        return round((self.tt - self.tt0) / 1e6)


def flt(record: logging.LogRecord) -> logging.LogRecord:
    """Convert records with multiple collector states into structured logs.

    Also capture standard logs.
    """
    own_record = copy.copy(record)
    tremors.enrich_std_record(own_record)

    if not hasattr(own_record, tremors.EXTRA_KEY):
        return own_record

    if not hasattr(own_record, "message"):
        own_record.message = own_record.getMessage()

    ident_state = collector.get_state(
        collector.identifier.state, "identifier", own_record
    )
    count_state = collector.get_state(
        collector.enter_counter.state, "enter_counter", own_record
    )
    elapsed_state = collector.get_state(
        collector.elapsed.state, "elapsed", own_record
    )
    tot_elapsed_state = collector.get_state(
        collector.elapsed.state, "total_elapsed", own_record
    )
    group_id, path_list = (
        (ident_state.group_id, ident_state.path)
        if ident_state
        else (None, None)
    )
    count = count_state.count if count_state else None
    t0, t = (
        (elapsed_state.t0, elapsed_state.t)
        if elapsed_state
        else (None, None)
    )
    tt0, tt = (
        (tot_elapsed_state.t0, tot_elapsed_state.t)
        if tot_elapsed_state
        else (None, None)
    )
    try:
        msg = AppLogMessage(
            msg=own_record.message,
            timestamp=own_record.created,
            group_id=group_id,
            path_list=path_list,
            count=count,
            t0=t0,
            t=t,
            tt0=tt0,
            tt=tt,
        )
    except pydantic.ValidationError:
        return own_record
    else:
        own_record.app_log_message = msg
        return own_record


class TremorsHandler(logging.Handler):
    """A handler to persist structured logs in a store."""

    def emit(self, record: logging.LogRecord) -> None:
        """Queue a structured log for persistence to the log store.

        This is just a fake that only prints the JSON to stdout.
        """
        msg = getattr(record, "app_log_message", None)

        if not msg:
            return

        msg_json = msg.model_dump_json(indent=4)
        print(f"queue for log store >>> \033[0;34m{msg_json}\033[0m")  # noqa: T201


global_elapsed = collector.elapsed.factory(
    "total_elapsed", inherit=True
).create()
"""A global elapsed collector instance.

This collector can be added to all loggers to track the elapsed time since the
very first log.
"""

root_logged = tremors.logged(
    collector.identifier.factory(name="identifier", inherit=True),
    collector.enter_counter.factory(name="enter_counter"),
    collector.elapsed.factory(name="elapsed"),
    global_elapsed,
    is_root=True,
)
"""A canned decorator for creating root loggers."""

child_logged = tremors.logged(
    collector.enter_counter.factory(name="enter_counter"),
    collector.elapsed.factory(name="elapsed"),
)
"""A canned decorator for creating child loggers."""


@root_logged
def run_app(logger: tremors.Logger = tremors.from_logged) -> None:
    """The application entry point.

    This function has a root logger. The logger's identifiers collector will be
    inherited by all descendants, so they will all share a group_id, and a
    common path.
    """
    logger.info("app initialized")
    do_something()
    do_something_else()
    do_something_else()
    some_library()


@child_logged
def do_something(logger: tremors.Logger = tremors.from_logged) -> None:
    """This is a function you maintain, and can use a tremors logger.

    Its logger inherits its parent's identifiers collector, and the global
    elapsed collector.
    """
    logger.info("doing something")
    do_something_something()


@child_logged
def do_something_something(
    logger: tremors.Logger = tremors.from_logged,
) -> None:
    """This is another function you maintain, and can use a tremors logger.

    Its logger also inherits its parent's identifiers collector, and the global
    elapsed collector.
    """
    logger.info("doing something, something")
    time.sleep(0.1)


@root_logged
def do_something_else(logger: tremors.Logger = tremors.from_logged) -> None:
    """This is a function you maintain, and has a tremors root logger.

    Its logger gets its own identifiers collector, along with a new group_id,
    and begins a new root path. All of its descendants will share this new
    group_id, and have paths starting from this logger's path. However, the
    logger also gets the shared global elapsed collector.
    """
    logger.info("doing something else")
    time.sleep(0.1)


def some_library() -> None:
    """This is a function you don't maintain that uses standard logging.

    As long as this function's logs flow to the standard root logger, its logs
    will be enriched by the current tremors logger's collectors, and structured
    logs will be persisted to the store. These logs will share the current
    logger's group_id, and have a path based on the current logger's path.
    """
    logging.getLogger("some_library").info("doing library stuff")
    time.sleep(0.1)
    another_library()


def another_library() -> None:
    """This is another function you don't maintain that uses standard logging.

    Its logs will be treated similarly to those of ``some_library``. However,
    path information for loggers between this function, and the current logger
    in the call stack will not be recorded. E.g., the path for this function's
    logs will be ``run_app/std:another_library``, and not
    ``run_app/std:some_library/std:another_library``.
    """
    logging.getLogger("another_library").info("doing other library stuff")
    time.sleep(0.1)


if __name__ == "__main__":
    handler = TremorsHandler()
    handler.addFilter(flt)
    logging.basicConfig(level=logging.INFO, handlers=(handler,), force=True)
    run_app()

You can see from the output that logs descended from the same root logger share a group_id, and the path follows the call stack. The count increases by one each time a tremors logged function is called, and separate counts are maintained for each function. The duration of each function is recorded, as well as the total time that has elapsed since the entry point started. Standard logs are also captured as if they were descended from the closest tremors logged function up the call stack.

queue for log store >>> {
    "msg": "entered: run_app",
    "timestamp": 1772951375.6724658,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "app initialized",
    "timestamp": 1772951375.6726198,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "entered: do_something",
    "timestamp": 1772951375.6726973,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "doing something",
    "timestamp": 1772951375.6727448,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "entered: do_something_something",
    "timestamp": 1772951375.6727943,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something/do_something_something",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "doing something, something",
    "timestamp": 1772951375.672832,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something/do_something_something",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "exited: do_something_something",
    "timestamp": 1772951375.773108,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something/do_something_something",
    "elapsed_ms": 100,
    "total_elapsed_ms": 101
}
queue for log store >>> {
    "msg": "exited: do_something",
    "timestamp": 1772951375.7734556,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something",
    "elapsed_ms": 101,
    "total_elapsed_ms": 101
}
queue for log store >>> {
    "msg": "entered: do_something_else",
    "timestamp": 1772951375.7737124,
    "group_id": "17a2ac21-f3cf-4384-bb85-0f49c644947b",
    "count": 1,
    "path": "do_something_else",
    "elapsed_ms": 0,
    "total_elapsed_ms": 101
}
queue for log store >>> {
    "msg": "doing something else",
    "timestamp": 1772951375.7738376,
    "group_id": "17a2ac21-f3cf-4384-bb85-0f49c644947b",
    "count": 1,
    "path": "do_something_else",
    "elapsed_ms": 0,
    "total_elapsed_ms": 101
}
queue for log store >>> {
    "msg": "exited: do_something_else",
    "timestamp": 1772951375.8741887,
    "group_id": "17a2ac21-f3cf-4384-bb85-0f49c644947b",
    "count": 1,
    "path": "do_something_else",
    "elapsed_ms": 100,
    "total_elapsed_ms": 202
}
queue for log store >>> {
    "msg": "entered: do_something_else",
    "timestamp": 1772951375.874693,
    "group_id": "45f2a590-2107-4e6d-8f64-8e0f5e462fc3",
    "count": 2,
    "path": "do_something_else",
    "elapsed_ms": 0,
    "total_elapsed_ms": 202
}
queue for log store >>> {
    "msg": "doing something else",
    "timestamp": 1772951375.874823,
    "group_id": "45f2a590-2107-4e6d-8f64-8e0f5e462fc3",
    "count": 2,
    "path": "do_something_else",
    "elapsed_ms": 0,
    "total_elapsed_ms": 202
}
queue for log store >>> {
    "msg": "exited: do_something_else",
    "timestamp": 1772951375.9751017,
    "group_id": "45f2a590-2107-4e6d-8f64-8e0f5e462fc3",
    "count": 2,
    "path": "do_something_else",
    "elapsed_ms": 100,
    "total_elapsed_ms": 303
}
queue for log store >>> {
    "msg": "doing library stuff",
    "timestamp": 1772951375.975387,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/std:some_library",
    "elapsed_ms": 303,
    "total_elapsed_ms": 303
}
queue for log store >>> {
    "msg": "doing other library stuff",
    "timestamp": 1772951376.0755894,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/std:another_library",
    "elapsed_ms": 403,
    "total_elapsed_ms": 403
}
queue for log store >>> {
    "msg": "exited: run_app",
    "timestamp": 1772951376.176055,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app",
    "elapsed_ms": 504,
    "total_elapsed_ms": 504
}
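
The record-to-JSON step does not depend on pydantic. As a rough, dependency-free analogue, the sketch below builds a small dataclass from a record and serialises it with the json module. StructuredLog, JsonListHandler, the logger name json_demo, and the path_list extra are all hypothetical; the real example above reads its values from collector state instead.

```python
import dataclasses
import json
import logging
from typing import Optional


@dataclasses.dataclass
class StructuredLog:
    """Hypothetical, simplified analogue of AppLogMessage."""

    msg: str
    timestamp: float
    path: Optional[str]


class JsonListHandler(logging.Handler):
    """Hypothetical handler that collects JSON payloads in a list."""

    def __init__(self):
        super().__init__()
        self.payloads = []

    def emit(self, record):
        # path_list is an assumed custom attribute passed via `extra`.
        path_list = getattr(record, "path_list", None)
        log = StructuredLog(
            msg=record.getMessage(),
            timestamp=record.created,
            path="/".join(path_list) if path_list else None,
        )
        self.payloads.append(json.dumps(dataclasses.asdict(log)))


handler = JsonListHandler()
demo = logging.getLogger("json_demo")
demo.setLevel(logging.INFO)
demo.propagate = False
demo.addHandler(handler)

demo.info(
    "doing %s",
    "something",
    extra={"path_list": ["run_app", "do_something"]},
)
demo.info("no path here")

for payload in handler.payloads:
    print(payload)
```

Records without the custom attribute still produce a payload, with path set to null, which is similar in spirit to how the optional fields of AppLogMessage default to None.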

See the collector module in the full documentation for how to define your own collectors and bundles.
