Skip to main content

Python SDK for Kamori — self-hosted log ingestion

Project description

kamori-sdk

Python SDK for Kamori — self-hosted log ingestion with MCP support.

Events are buffered in memory and flushed to your Kamori server on a background daemon thread, so logging never blocks your application.


Installation

pip install kamori-sdk

Quick start

import atexit
from kamori_sdk import KamoriClient

client = KamoriClient(
    url="https://your-kamori-server.com",
    token="your-log-token",   # matches INGEST_TOKEN on the server
)
atexit.register(client.shutdown)  # flush buffered events on process exit

client.log({"level": "info", "service": "api", "message": "Server started"})
client.log({"level": "error", "service": "payments", "message": "Stripe timeout", "duration_ms": 5001})

Python logging integration

One-liner (recommended)

from kamori_sdk import install_logging_handler

handler = install_logging_handler(
    url="https://your-kamori-server.com",
    token="your-log-token",
)

import logging
logging.getLogger("api").info("Server started")
logging.getLogger("payments").error("Stripe timeout")

Each LogRecord is converted to a structured dict with level, message, logger, module, funcName, and lineno fields. Exception info is serialised as exc_text so stack traces are full-text-searchable in Kamori.

Explicit handler

import logging
from kamori_sdk.logging_handler import KamoriHandler

handler = KamoriHandler(
    url="https://your-kamori-server.com",
    token="your-log-token",
    level=logging.WARNING,   # only WARNING and above
)
logging.getLogger().addHandler(handler)

Django

In settings.py:

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "kamori": {
            "class": "kamori_sdk.logging_handler.KamoriHandler",
            "url": "https://your-kamori-server.com",
            "token": env("INGEST_TOKEN", default=""),
        },
    },
    "root": {"handlers": ["kamori"], "level": "INFO"},
}

FastAPI

from contextlib import asynccontextmanager
from fastapi import FastAPI
from kamori_sdk import install_logging_handler

@asynccontextmanager
async def lifespan(app: FastAPI):
    handler = install_logging_handler(
        url=settings.KAMORI_URL,
        token=settings.INGEST_TOKEN,
    )
    yield
    handler.client.shutdown()   # flush before shutdown

app = FastAPI(lifespan=lifespan)

structlog

import structlog
from kamori_sdk import KamoriClient

client = KamoriClient(url="https://your-kamori-server.com", token="your-log-token")

def kamori_sink(logger, method, event_dict):
    client.log(event_dict)
    return event_dict

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        kamori_sink,
        structlog.dev.ConsoleRenderer(),
    ]
)

loguru

from loguru import logger
from kamori_sdk import KamoriClient

client = KamoriClient(url="https://your-kamori-server.com", token="your-log-token")

def kamori_sink(message):
    record = message.record
    client.log({
        "level":    record["level"].name.lower(),
        "message":  record["message"],
        "module":   record["module"],
        "function": record["function"],
        "line":     record["line"],
    })

logger.add(kamori_sink)

Scoped clients

Add default fields to every log call without repeating them:

def make_scoped(client: KamoriClient, **defaults):
    """Return a log function that merges defaults into every event."""
    def log(event):
        client.log({**defaults, **event})
    return log

api_log = make_scoped(client, service="api", version="2.1.0")
api_log({"level": "info", "message": "Request started", "path": "/checkout"})
api_log({"level": "error", "message": "DB timeout"})

flush_on_exit

The background thread is a daemon thread — it dies automatically when the main thread exits. For guaranteed delivery, register shutdown() via atexit:

import atexit
atexit.register(client.shutdown)   # flush + stop background thread (up to 5s)

Or call explicitly:

client.shutdown(timeout=5.0)   # blocks up to 5 seconds

For an immediate flush without stopping the thread (useful in request handlers):

client.flush()   # blocks until the queue drains

on_drop callback

Called when a batch is permanently dropped after all retry attempts:

import logging

def handle_drop(events):
    logging.warning("Kamori dropped %d events — check server connectivity", len(events))

client = KamoriClient(
    url="https://your-kamori-server.com",
    token="your-log-token",
    on_drop=handle_drop,
)

Retry behaviour

Failed requests are retried up to three times with exponential back-off:

Attempt Delay
1st retry 0.25 s
2nd retry 1 s
3rd retry 4 s

4xx responses are not retried (client error — bad token, oversized batch). The client never raises or crashes the calling thread.


Configuration reference

Parameter Default Description
url Base URL of your Kamori server (required)
token None Auth token (sent as Authorization: Bearer). Omit to skip auth.
batch_size 50 Flush automatically when buffer reaches this size
flush_interval 2.0 Max seconds between background flushes
max_queue 0 Max events in the in-memory queue. 0 = unlimited. New events are dropped (calling on_drop) when the limit is reached.
on_drop None Callable[[list[dict]], None] — called when a batch is dropped

Requirements

  • Python 3.9+
  • No external dependencies (uses urllib.request and threading from the standard library)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kamori_sdk-0.1.1.tar.gz (18.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kamori_sdk-0.1.1-py3-none-any.whl (9.4 kB view details)

Uploaded Python 3

File details

Details for the file kamori_sdk-0.1.1.tar.gz.

File metadata

  • Download URL: kamori_sdk-0.1.1.tar.gz
  • Upload date:
  • Size: 18.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for kamori_sdk-0.1.1.tar.gz
Algorithm Hash digest
SHA256 3e23c05f5fc24662f76f3afa5adec53cfad42abea8c56007a245e8e5f0257d39
MD5 7ded1e39fd75e9674ae4182d4f9bb94e
BLAKE2b-256 805665baa4d63b1fb5468e9328bb9d6a7f3005d480f0f8c8201291256a1b5c31

See more details on using hashes here.

Provenance

The following attestation bundles were made for kamori_sdk-0.1.1.tar.gz:

Publisher: release.yml on usekamori/kamori-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file kamori_sdk-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: kamori_sdk-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 9.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for kamori_sdk-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 0bde0099f49468a187a7adc219425550c0e4f591104bc4ad3321ff5d0e3ae902
MD5 289d0e56fe8103394e4a7824f6515c78
BLAKE2b-256 4adccf03af9f36a10cc3111b09c685d1481dc04d479427f0f214ce6b9619c86b

See more details on using hashes here.

Provenance

The following attestation bundles were made for kamori_sdk-0.1.1-py3-none-any.whl:

Publisher: release.yml on usekamori/kamori-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page