sotkalib

Async-first utility library for Python 3.13+. Reusable building blocks for web applications with database, caching, and HTTP support.

Most base classes (RedisLRU, DistributedLock, HTTPSession, Database, etc.) are designed to be declared once as a constant and then shallow-copied with per-use overrides via builder methods. This lets you define a baseline configuration in one place and derive specialized variants without mutating the original.

Install

pip install sotkalib

Modules


config -- Environment-based configuration

Type-safe settings read from environment variables (with .env support). Only immutable primitives are allowed (int, float, complex, str, bool, None). Attribute names must be UPPER_SNAKE_CASE by default.

import secrets
from sotkalib.config import AppSettings, SettingsField

class Config(AppSettings):
    BOT_TOKEN: str = SettingsField(nullable=False)
    POSTGRES_USER: str = SettingsField(default="pg_user")
    POSTGRES_PASSWORD: str = SettingsField(factory=lambda: secrets.token_urlsafe(8))
    SECRET: str = SettingsField(factory="computed_secret")  # resolved from a @property

    @property
    def computed_secret(self) -> str:
        return "derived-from-other-fields"

settings = Config()                                    # loads from env / .env
prod_settings = Config(dotenv_path=".env.prod", strict=True)  # strict rejects mutable types

SettingsField options:

  • default -- static fallback value
  • factory -- callable, or a str referencing a @property on the class (resolved after all other fields)
  • nullable -- if True, missing env vars become None instead of raising

sqla -- SQLAlchemy database management

Database

Manages sync and async SQLAlchemy engines and session factories. Sessions come in two flavors: unsafe (raw sessionmaker context manager) and safe (auto-commit on success, rollback on exception, close on exit). The implicit_safe setting (default True) makes .session / .asession return safe wrappers.

from sotkalib.sqla import Database, DatabaseSettings, BasicDBM

db = Database(DatabaseSettings(
    uri="postgresql://user:pass@localhost:5432/mydb",
    async_driver="psycopg",   # set to None to disable async
    enable_sync_engine=False,
    pool_size=10,
    echo=False,
    expire_on_commit=False,
    implicit_safe=True,        # .asession returns safe wrapper
))

# async usage -- auto-commits, rolls back on exception
async with db as d:  # would raise with async_driver=None
    async with d.asession as session:
        session.add(row)

# sync usage
with db as d:  # would raise with enable_sync_engine=False
    with d.session as session:
        session.add(row)

# explicit unsafe if needed
async with db.asession_unsafe as session:
    ...

# create tables from declarative base
db = Database(DatabaseSettings(uri=..., decl_base=BasicDBM))
db.create()
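
The "safe" session behavior described above (commit on success, rollback on exception, close on exit) can be sketched without a database. `FakeSession` and `safe_session` are hypothetical stand-ins used only to show the control flow. They are not sotkalib or SQLAlchemy classes.

```python
from contextlib import contextmanager


class FakeSession:
    """Stand-in for a session that records which methods were called."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def commit(self) -> None:
        self.calls.append("commit")

    def rollback(self) -> None:
        self.calls.append("rollback")

    def close(self) -> None:
        self.calls.append("close")


@contextmanager
def safe_session(session: FakeSession):
    """Commit on success, roll back on exception, always close."""
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


ok = FakeSession()
with safe_session(ok):
    pass  # body succeeds

failed = FakeSession()
try:
    with safe_session(failed):
        raise RuntimeError("boom")
except RuntimeError:
    pass  # the original exception still propagates to the caller

print(ok.calls)      # ['commit', 'close']
print(failed.calls)  # ['rollback', 'close']
```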

BasicDBM

Declarative base with .dict() for converting ORM models to plain dicts, optionally filtered through a Pydantic model's fields.

from sqlalchemy import Column, Integer, String
from sotkalib.sqla import BasicDBM

class User(BasicDBM):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

user.dict()                                       # all columns
user.dict(pydantic_model=UserSchema)              # only fields in UserSchema
user.dict(explicitly_include=["id", "name"])      # only specified fields
user.dict(name="not_that_model_has")              # inject/override keys

user.is_loaded(attr="email")                      # check if attr is loaded (useful with lazy relationships)

PydanticJSON

Column type that stores Pydantic models as JSON (JSONB on PostgreSQL).

from pydantic import BaseModel
from sqlalchemy import Column, Integer
from sotkalib.sqla import BasicDBM, PydanticJSON

class Profile(BaseModel):
    bio: str
    links: list[str]

class User(BasicDBM):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    profile = Column(PydanticJSON(Profile))  # stored as JSONB in postgres, JSON elsewhere

Use flag_pydantic_changes to detect mutations during updates:

from sqlalchemy import event
from sotkalib.sqla import flag_pydantic_changes

event.listen(User, "before_update", flag_pydantic_changes)

redis -- Distributed caching and locking

All Redis utilities take an AbstractAsyncContextManager[Redis] as their first argument -- typically a RedisPool instance, but you can use any AsyncContextManager that yields a Redis client (standard redis.asyncio.Redis is supported). Base instances are immutable; builder methods return shallow copies so you can derive variants from one constant.

RedisPool

Connection pool wrapper. Implements AbstractAsyncContextManager[Redis] -- use as a context manager to get a Redis client.

from sotkalib.redis import RedisPool, RedisPoolSettings

pool = RedisPool(RedisPoolSettings(
    uri="redis://localhost:6379",
    db_num=4,
    max_connections=50,
    health_check_interval=30,
))

async with pool as rc:
    await rc.set("key", "value")

RedisLRU

Async function cache backed by Redis. Define a base instance, then derive variants with different TTLs, versions, serializers, or key functions.

from sotkalib.redis import RedisLRU, LRUSettings

# base instance -- define once, import everywhere
LRU = RedisLRU(pool, LRUSettings(ttl=600, version=1))

@LRU.version(2).ttl(60)
async def get_user(user_id: int) -> User: ...

@LRU.ttl(300).version(3).serializer(JSONSerializer)
async def get_session(token: str) -> Session: ...

Builder methods: .ttl(), .version(), .serializer(), .keyfunc()

The default serializer is B64Pickle (base64-encoded pickle). A SecurityWarning is emitted unless you set LRU_CACHE_ALLOW_PICKLE=yes or provide a different serializer.

Custom serializers implement the Serializer protocol:

from typing import Any, Protocol

class Serializer(Protocol):
    @staticmethod
    def marshal(data: Any) -> bytes: ...
    @staticmethod
    def unmarshal(raw_data: bytes) -> Any: ...
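
As an illustration of the protocol above, here is a minimal JSON-based serializer built on the standard library. `CompactJSON` is a hypothetical name, and the protocol is restated locally so the sketch runs without sotkalib. Presumably such a class would be passed to `.serializer(...)` the same way `JSONSerializer` is in the earlier example.

```python
import json
from typing import Any, Protocol


class Serializer(Protocol):
    @staticmethod
    def marshal(data: Any) -> bytes: ...
    @staticmethod
    def unmarshal(raw_data: bytes) -> Any: ...


class CompactJSON:
    """Hypothetical serializer: compact UTF-8 JSON instead of pickle."""

    @staticmethod
    def marshal(data: Any) -> bytes:
        # separators=(",", ":") drops the default whitespace
        return json.dumps(data, separators=(",", ":")).encode("utf-8")

    @staticmethod
    def unmarshal(raw_data: bytes) -> Any:
        return json.loads(raw_data.decode("utf-8"))


serializer: type[Serializer] = CompactJSON  # structural match, no inheritance
raw = serializer.marshal({"id": 1, "tags": ["a"]})
print(raw)  # b'{"id":1,"tags":["a"]}'
```

Unlike pickle, JSON only round-trips plain data (dicts, lists, strings, numbers, booleans, None), so cached functions would need to return such values.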

DistributedLock

Redis-based distributed lock with three acquisition phases: spin (tight loop, no delay), single attempt, and wait (with backoff). Define a base, derive per-use variants.

from sotkalib.redis import DistributedLock, DLSettings, exponential_delay

# base instance
Locker = DistributedLock(pool, DLSettings())

# use
async with Locker.wait(timeout=30.0, backoff=exponential_delay(0.1, 2)).acquire("resource:123", ttl=10):
    ...  # lock held

# shorthand
async with Locker.spin(attempts=100).no_wait().acq("resource:123", timeout=5):
    ...

Builder methods: .wait(), .no_wait(), .spin(), .if_taken(), .exc()

Backoff helpers:

from sotkalib.redis import plain_delay, additive_delay, exponential_delay

plain_delay(0.5)               # constant 0.5s
additive_delay(0.1, 0.1)      # 0.1, 0.2, 0.3, ...
exponential_delay(0.1, 2)     # 0.1, 0.2, 0.4, 0.8, ...
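
The delay sequences in the comments above can be reproduced with plain generators. This is a sketch of the arithmetic only: `plain`, `additive`, and `exponential` are hypothetical names, and the real helpers may expose a different interface.

```python
from collections.abc import Iterator
from itertools import islice


def plain(delay: float) -> Iterator[float]:
    """Yield the same delay forever."""
    while True:
        yield delay


def additive(start: float, step: float) -> Iterator[float]:
    """Yield start, start + step, start + 2 * step, ..."""
    current = start
    while True:
        yield current
        current += step


def exponential(start: float, factor: float) -> Iterator[float]:
    """Yield start, start * factor, start * factor ** 2, ..."""
    current = start
    while True:
        yield current
        current *= factor


first_four = list(islice(exponential(0.1, 2), 4))
print(first_four)  # [0.1, 0.2, 0.4, 0.8]
```

Additive sequences accumulate floating-point error (0.1 + 0.1 + 0.1 is not exactly 0.3), which is harmless for sleep intervals but worth remembering when comparing values.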

Raises ContextLockError on failure. The .can_retry attribute indicates whether the caller should retry.


http -- HTTP client with retry and middleware

aiohttp wrapper with configurable retry logic, status code handling, exception routing, and a middleware pipeline. Like the Redis utilities, define a base HTTPSession and derive variants via settings or middleware.

from http import HTTPStatus

from aiohttp import ContentTypeError, ServerDisconnectedError
from sotkalib.http import HTTPSession, ClientSettings, StatusSettings, ExceptionSettings

# base config -- define once
client = HTTPSession(ClientSettings(
    timeout=10.0,
    maximum_retries=3,
    base=1.0,              # base delay
    backoff=2.0,           # exponential backoff factor
    status_settings=StatusSettings(
        to_retry={HTTPStatus.TOO_MANY_REQUESTS, HTTPStatus.SERVICE_UNAVAILABLE},
        to_raise={HTTPStatus.FORBIDDEN},
        not_found_as_none=True,
        unspecified="retry",   # or "raise"
    ),
    exception_settings=ExceptionSettings(
        to_retry=(TimeoutError, ServerDisconnectedError),
        to_raise=(ContentTypeError,),
        unspecified="retry",
    ),
))

# branch from base config with |
base = ClientSettings(timeout=10.0, maximum_retries=3)

async with (
    base 
    | ClientSettings(maximum_retries=5) 
    | StatusSettings(not_found_as_none=False)
) as http:
    resp = await http.get("https://api.example.com/users/1")
    data = await http.post("https://api.example.com/users", json=payload)

Middleware

Middleware wraps the request pipeline. Each middleware receives a RequestContext and a next callable. .use() is generic -- a middleware can change the session's return type. The base HTTPSession returns aiohttp.ClientResponse | None; a middleware that deserializes JSON can produce an HTTPSession[dict], etc.

async def auth_middleware(ctx: RequestContext, next):
    ctx.merge_headers({"Authorization": "Bearer ..."})
    return await next(ctx)

async def logging_middleware(ctx: RequestContext, next):
    result = await next(ctx)
    print(f"{ctx.method} {ctx.url} -> {ctx.status} in {ctx.elapsed:.2f}s")
    return result

# passthrough middleware -- return type unchanged (HTTPSession[ClientResponse | None])
client = HTTPSession().use(auth_middleware).use(logging_middleware)

# type-transforming middleware -- return type becomes HTTPSession[dict]
async def json_middleware(ctx: RequestContext, next) -> dict:
    resp = await next(ctx)
    return await resp.json() if resp else {}

json_client: HTTPSession[dict] = HTTPSession().use(auth_middleware).use(json_middleware)

async with json_client as http:
    data: dict = await http.get("https://api.example.com/users/1")

RequestContext exposes: method, url, attempt, max_attempts, response, elapsed, attempt_elapsed, is_retry, status, errors, last_error, state (arbitrary dict for middleware to share data).
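
The middleware model described in this section (each layer receives a context and a `next` callable) can be sketched independently of aiohttp. `Ctx`, `compose`, and `send` are hypothetical names. Running the first middleware in the list outermost is an assumption made for this illustration and may not match the order `.use()` applies.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Ctx:
    """Hypothetical, minimal stand-in for a request context."""
    url: str
    headers: dict[str, str] = field(default_factory=dict)
    log: list[str] = field(default_factory=list)


Handler = Callable[[Ctx], Awaitable[Any]]
Middleware = Callable[[Ctx, Handler], Awaitable[Any]]


def compose(middlewares: list[Middleware], core: Handler) -> Handler:
    """Wrap `core` so the first middleware in the list runs outermost."""
    handler = core
    for mw in reversed(middlewares):
        # Default arguments bind the current mw/handler for this layer.
        def wrapped(ctx: Ctx, _mw: Middleware = mw, _next: Handler = handler):
            return _mw(ctx, _next)
        handler = wrapped
    return handler


async def send(ctx: Ctx) -> dict:
    # Pretend to perform the request; echo back what would be sent.
    return {"url": ctx.url, "headers": dict(ctx.headers)}


async def auth(ctx: Ctx, next: Handler):
    ctx.headers["Authorization"] = "Bearer token"
    return await next(ctx)


async def trace(ctx: Ctx, next: Handler):
    ctx.log.append("before")
    result = await next(ctx)
    ctx.log.append("after")
    return result


ctx = Ctx(url="https://api.example.com/users/1")
result = asyncio.run(compose([auth, trace], send)(ctx))
```

Because each layer returns whatever it likes, a middleware that post-processes the value from `next` changes the type seen by the caller, which is the idea behind the type-transforming example above.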


exceptions -- Structured API errors

APIError

Structured error with status code, error code, description, and context. Serializes to JSON via an ErrorSchema Pydantic model.

from sotkalib.exceptions import APIError

raise APIError(
    status=409,
    code="CONFLICT",
    desc="Username already taken",
    ctx={"field": "username"},
)

Exception handlers

Decorators that catch exceptions, log them, and re-raise as ArgsIncludedError with the caller's local variables captured from the stack (useful for debugging).

from sotkalib.exceptions import exception_handler, aexception_handler

@exception_handler
def sync_fn(): ...

@aexception_handler
async def async_fn(): ...

traceback_from

from sotkalib.exceptions import traceback_from

try:
    ...
except Exception as e:
    tb_str = traceback_from(e)  # formatted traceback string

func -- Functional utilities

type guards

from sotkalib.func import or_raise, type_or_raise

user = or_raise(maybe_none, "user not found")     # raises ValueError if None
port = type_or_raise(value, int, "port must be int")  # raises TypeError

suppress

Context manager with two modes:

from sotkalib.func import suppress

with suppress():  # suppresses all exceptions
	risky()

with suppress(mode="exact", exact_types=[KeyError, IndexError]):  # only these types (exact match, not subclasses)
	d["missing"]
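
The difference between exact matching and the usual subclass matching can be shown with a small context manager. `suppress_exact` is a hypothetical re-creation of the idea. It is not the library's `suppress`.

```python
from contextlib import contextmanager


@contextmanager
def suppress_exact(*exact_types: type[BaseException]):
    """Swallow an exception only if its class is exactly one of exact_types."""
    try:
        yield
    except Exception as exc:
        if type(exc) not in exact_types:
            raise  # subclasses and unrelated types propagate


with suppress_exact(KeyError):
    {}["missing"]  # KeyError is listed exactly: suppressed

leaked = False
try:
    with suppress_exact(LookupError):
        {}["missing"]  # KeyError subclasses LookupError but is not an exact match
except KeyError:
    leaked = True

print(leaked)  # True
```

By contrast, the standard library's `contextlib.suppress(LookupError)` would swallow the `KeyError`, because it matches subclasses.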

Async checks

from sotkalib.func import asyncfn, asyncfn_or_raise

def my_func(): ...

asyncfn(my_func)            # True if coroutine function
asyncfn_or_raise(my_func)   # raises TypeError if not

Deferred awaiting

from sotkalib.func import defer, defer_ok, defer_exc, defer_exc_mute

async def coro(): ...

# awaited in finally (always runs)
async with defer(coro()):     
    ...

# awaited only if no exception
async with defer_ok(coro()):   
    ...

# awaited only if exception raised
# -> bubbles exception up
async with defer_exc(coro()):  
    ...

# awaited only if exception raised
# -> silences exception
async with defer_exc_mute(coro()):
    ...
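
The first two variants can be approximated with `contextlib.asynccontextmanager`. This is a sketch of the described semantics under hypothetical names (`defer_sketch`, `defer_ok_sketch`) and is not how sotkalib necessarily implements them.

```python
import asyncio
import inspect
from collections.abc import Awaitable
from contextlib import asynccontextmanager


@asynccontextmanager
async def defer_sketch(aw: Awaitable[object]):
    """Await `aw` on exit, whether or not the body raised."""
    try:
        yield
    finally:
        await aw


@asynccontextmanager
async def defer_ok_sketch(aw: Awaitable[object]):
    """Await `aw` only if the body finished without an exception."""
    try:
        yield
    except BaseException:
        if inspect.iscoroutine(aw):
            aw.close()  # avoid a "coroutine was never awaited" warning
        raise
    await aw


events: list[str] = []


async def cleanup(label: str) -> None:
    events.append(label)


async def main() -> None:
    async with defer_sketch(cleanup("always")):
        events.append("body 1")
    async with defer_ok_sketch(cleanup("on success")):
        events.append("body 2")
    try:
        async with defer_ok_sketch(cleanup("skipped")):
            raise ValueError("body failed")
    except ValueError:
        events.append("error propagated")


asyncio.run(main())
print(events)
# ['body 1', 'always', 'body 2', 'on success', 'error propagated']
```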

json -- Safe serialization

Recursively serializes Python objects to JSON bytes via orjson. Handles datetime, Decimal, UUID, Enum, bytes, Pydantic models, and nested structures. Falls back to str() for unknown types. Depth-limited to prevent infinite recursion.

from sotkalib.json import safe_serialize
from sotkalib.time import now
from decimal import Decimal
from pydantic import BaseModel

class PM(BaseModel):
    a: bytes = b"2"
    b: int = 3

pydantic_model = PM()

raw: bytes = safe_serialize(
    {
        "created": now(),
        "amount": Decimal("19.99"),
        "profile": pydantic_model,
    }
)
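
The recursive, depth-limited conversion described above can be approximated with the standard library. This sketch uses `json` rather than orjson, and the concrete encodings (ISO 8601 for datetimes, strings for Decimal and UUID) are assumptions and may differ from `safe_serialize`. `to_jsonable` is a hypothetical name.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal
from enum import Enum
from typing import Any
from uuid import UUID


def to_jsonable(obj: Any, depth: int = 0, max_depth: int = 8) -> Any:
    """Recursively convert obj into values the json module can encode."""
    if depth > max_depth:
        return str(obj)  # stop descending instead of recursing forever
    if isinstance(obj, Enum):  # checked first: StrEnum members are also str
        return to_jsonable(obj.value, depth + 1, max_depth)
    if obj is None or isinstance(obj, (bool, int, float, str)):
        return obj
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, (Decimal, UUID)):
        return str(obj)
    if isinstance(obj, bytes):
        return obj.decode("utf-8", errors="replace")
    if isinstance(obj, dict):
        return {str(k): to_jsonable(v, depth + 1, max_depth) for k, v in obj.items()}
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [to_jsonable(v, depth + 1, max_depth) for v in obj]
    return str(obj)  # fallback for unknown types


payload = {
    "created": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "amount": Decimal("19.99"),
    "tags": ("a", "b"),
}
encoded: bytes = json.dumps(to_jsonable(payload)).encode("utf-8")
```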

log -- Loguru logging

Cached loguru logger factory. Names are transformed for readability ("myapp.db.session" -> "myapp -> db -> session").

from sotkalib.log import get_logger

log = get_logger("myapp.service")
log.info("starting")

enum -- Enum mixins

All mixins extend StrEnum.

  • UppercaseMixin -- auto-uppercases member values
  • ValidatorMixin -- .validate(val=..., req=True/False), .get(val, default), .in_(*members), .values()
  • ValuesMixin -- .values_list(), .values_set(), .names_list(), .names_set()

from enum import auto

from sotkalib.enum import ValidatorMixin, UppercaseMixin, ValuesMixin

class Color(ValidatorMixin, UppercaseMixin):
    RED = auto()
    GREEN = auto()

col: Color = Color.RED
Color.validate(val="RED", req=True)   # Color.RED
Color.get("blue", default=Color.RED)  # Color.RED
col.in_(Color.RED, Color.GREEN) # True

time -- UTC helpers

from sotkalib.time import utcnow, now
from datetime import timezone, timedelta

tz = timezone(offset=timedelta(hours=12))
utcnow() # datetime.now(UTC)
now()    # datetime.now() with local tz
now(tz)  # datetime.now(tz)

type -- Sentinel types and runtime protocol checking

Unset sentinel

Unset singleton to distinguish "not provided" from None.

from sotkalib.type import Unset, unset, UnsetT


def update(name: str | UnsetT = Unset):
	if not unset(name):
		...

implements -- Runtime protocol checking

Check if a class implements a Protocol at runtime, with optional signature and type hint validation.

from typing import Protocol
from sotkalib.type import implements, DoesNotImplementError


class UserGetter(Protocol):
	def get_user(self, user_id: int) -> str: ...


class DB:
	def get_user(self, user_id: int) -> str:
		return "user"


# Check without raising
if implements(DB, UserGetter, early=True):
	print("DB implements UserGetter")

# Strict check with raising
try:
	implements(DB, UserGetter, disallow_extra=True)
except DoesNotImplementError as e:
	print(e.violations)  # list of what failed

Options:

  • signatures -- compare callable signatures (default: True)
  • type_hints -- compare type annotations (default: True)
  • disallow_extra -- flag extra parameters not in protocol (default: False)
  • early -- return bool instead of raising (default: False)

CheckableProtocol -- Protocol with % operator

A Protocol subclass that supports the % operator for concise runtime checks.

from sotkalib.type import CheckableProtocol

class UserGetter(CheckableProtocol):
    @property
    def val(self) -> int: ...

class Impl:
    val: int

class Impl2:
    val: str  # wrong type

print(Impl % UserGetter)   # True
print(Impl2 % UserGetter) # False (val is str, not int)

dict -- Dictionary utilities

Extended dict (mod_dict) with attribute access and chainable filter methods. Works with the Unset sentinel.

from sotkalib.dict.util import mod_dict, valid, not_none
from sotkalib.type import Unset

d = mod_dict(a=1, b=None, c=Unset)

v = d.a  # 1 (attribute access)
d.valid()  # mod_dict(a=1, b=None)  -- strips Unset values
d.not_none()  # mod_dict(a=1, c=Unset) -- strips None values
d.keys_()  # ["a", "b", "c"] as a list

Standalone functions: valid(d), unset(d), not_none(d)
