Shared FastAPI building blocks: base schemas, ORM model, async repository, exceptions, pagination and settings — the conventions used across Tempest projects.

tempest-fastapi-sdk

Shared FastAPI/SQLAlchemy/Pydantic building blocks used across Tempest projects: base schemas, ORM model, async repository, pagination, settings, exceptions, Alembic helper, FastStream/TaskIQ broker managers, Redis cache, Server-Sent Events, Web Push and the utility classes (PasswordUtils, JWTUtils, EmailUtils, UploadUtils, MetricsUtils, LogUtils).

The goal is to start every new backend with the same opinionated foundation already in place — no copy-pasting BaseModel, no rewriting the same CRUD repository, no re-inventing the exception envelope.



Install

pip install tempest-fastapi-sdk

Via pyproject.toml:

dependencies = [
    "tempest-fastapi-sdk>=0.7.1",
]

Requires Python >=3.11.

Optional extras

Feature-rich helpers pull in third-party dependencies that you only need when you actually use the helper. Pick the extras the service consumes:

| Extra | Pulls in | Unlocks |
| --- | --- | --- |
| [auth] | bcrypt, PyJWT | PasswordUtils, JWTUtils |
| [email] | aiosmtplib | EmailUtils |
| [upload] | aiofiles, python-multipart | UploadUtils |
| [cache] | redis | AsyncRedisManager |
| [webpush] | pywebpush, cryptography | WebPushDispatcher |
| [metrics] | psutil, nvidia-ml-py | MetricsUtils |
| [queue] | faststream[rabbit] | AsyncBrokerManager (FastStream) |
| [tasks] | taskiq, taskiq-aio-pika | AsyncTaskBrokerManager (TaskIQ) |
| [all] | everything above | every helper |

pip install "tempest-fastapi-sdk[auth,upload]"   # only what the service uses
pip install "tempest-fastapi-sdk[all]"           # or pull everything

Since 0.7.1 every optional dependency is imported lazily at first instantiation, so import tempest_fastapi_sdk works with any subset of extras — instantiating a helper whose extra is missing raises ImportError with a clear hint pointing at the right one.
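
The lazy-import behaviour described above can be sketched in plain Python. This is only an illustration of the pattern, not the SDK's actual code: `require_extra`, `LazyHelper` and the module name `not_a_real_module` are hypothetical, and the stdlib `json` module stands in for a real optional dependency.

```python
import importlib
from types import ModuleType


def require_extra(module_name: str, extra: str) -> ModuleType:
    """Import an optional dependency, or raise ImportError naming the extra."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is not installed. "
            f'Install it with: pip install "tempest-fastapi-sdk[{extra}]"'
        ) from exc


class LazyHelper:
    """Hypothetical helper: the dependency is resolved at instantiation,
    so importing the module that defines this class never fails."""

    def __init__(self, module_name: str, extra: str) -> None:
        self._backend: ModuleType = require_extra(module_name, extra)


# A dependency that exists (stdlib `json` stands in for a real extra).
helper = LazyHelper("json", extra="cache")

# A dependency that is missing: the error names the extra to install.
try:
    LazyHelper("not_a_real_module", extra="cache")
except ImportError as error:
    hint = str(error)
```

Deferring the import to `__init__` keeps the cost of a missing extra local to the one helper that needs it.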


What's inside

| Module | Exports |
| --- | --- |
| tempest_fastapi_sdk.schemas | BaseSchema, BaseResponseSchema, BasePaginationFilterSchema, BasePaginationSchema[T], CursorPaginationFilterSchema, CursorPaginationSchema, encode_cursor, decode_cursor, build_pagination_link_header |
| tempest_fastapi_sdk.db | BaseModel, BaseRepository[ModelType], AsyncDatabaseManager, AlembicHelper, NAMING_CONVENTION, AuditMixin, SoftDeleteMixin |
| tempest_fastapi_sdk.exceptions | AppException, NotFoundException, ConflictException, ValidationException, UnauthorizedException, ForbiddenException, InvalidTokenException, ExpiredTokenException, FileTooLargeException, InvalidFileTypeException |
| tempest_fastapi_sdk.settings | BaseAppSettings, ServerSettings, LogSettings, DatabaseSettings, RedisSettings, RabbitMQSettings, JWTSettings, CORSSettings, EmailSettings, UploadSettings, TokenSettings, WebPushSettings, TaskIQSettings |
| tempest_fastapi_sdk.api | register_exception_handlers, app_exception_handler, apply_cors, make_health_router, make_tool_spec_router, make_token_dependency, make_bearer_token_dependency, make_jwt_user_dependency, make_role_dependency, make_permission_dependency, require_x_token, run_server, RequestIDMiddleware, RateLimitMiddleware, WebhookSignatureVerifier, HealthCheck |
| tempest_fastapi_sdk.controllers | BaseController |
| tempest_fastapi_sdk.services | BaseService |
| tempest_fastapi_sdk.core | configure_logging, JSONFormatter, get_request_id/set_request_id/clear_request_id, request_id_ctx |
| tempest_fastapi_sdk.sse | EventStream, ServerSentEvent, sse_response |
| tempest_fastapi_sdk.cache (extra: [cache]) | AsyncRedisManager, cached |
| tempest_fastapi_sdk.webpush (extra: [webpush]) | WebPushDispatcher, WebPushError, WebPushGoneError, WebPushSubscriptionSchema, WebPushKeysSchema, WebPushPayloadSchema |
| tempest_fastapi_sdk.queue (extra: [queue]) | AsyncBrokerManager (FastStream lifecycle wrapper) |
| tempest_fastapi_sdk.tasks (extra: [tasks]) | AsyncTaskBrokerManager (TaskIQ lifecycle wrapper), AsyncTaskScheduler (periodic / cron tasks) |
| tempest_fastapi_sdk.utils | to_utc, utcnow, modify_dict, LogUtils, PasswordUtils (extra: [auth]), JWTUtils (extra: [auth]), EmailUtils (extra: [email]), UploadUtils (extra: [upload]), MetricsUtils/CPUMetrics/MemoryMetrics/DiskMetrics/GPUMetrics/SystemMetrics (extra: [metrics]), BR regex helpers (CPF, CNPJ, CPFOrCNPJ, PhoneBR, CEP, is_valid_*, normalize_*, only_digits, *_PATTERN) |

Core primitives are re-exported from tempest_fastapi_sdk at the top level — from tempest_fastapi_sdk import BaseModel, BaseRepository, AppException always works. The extras-gated managers in tempest_fastapi_sdk.cache, tempest_fastapi_sdk.queue and tempest_fastapi_sdk.tasks must be imported from their own submodule (from tempest_fastapi_sdk.queue import AsyncBrokerManager).


Architecture overview

The SDK assumes a layered architecture where each layer has a single, narrow responsibility:

HTTP request
    │
    ▼
┌─────────────┐    receive HTTP, validate input via schemas,
│   Router    │    call service, return response schema.
└──────┬──────┘    No business logic, no DB access.
       │
       ▼
┌─────────────┐    orchestrate use case across one or more services;
│ Controller  │    handle cross-service coordination only.
└──────┬──────┘    Optional — skip for simple CRUD.
       │
       ▼
┌─────────────┐    business rules, validation beyond Pydantic,
│   Service   │    domain decisions. Calls one or more repositories.
└──────┬──────┘    No HTTP types, no SQLAlchemy types.
       │
       ▼
┌─────────────┐    raw data access via SQLAlchemy. CRUD, filters,
│ Repository  │    pagination. Translates between ORM and schemas
└──────┬──────┘    via map_to_* methods. No business decisions.
       │
       ▼
┌─────────────┐    SQLAlchemy AsyncSession on top of asyncpg/aiosqlite.
│  Database   │
└─────────────┘

The SDK ships BaseModel, BaseRepository, BaseSchema and the exception/settings primitives. Routers, services and controllers are your code — the SDK gives you the conventions so they all look the same across projects.


Tutorial — building the Users feature

We'll build a complete Users feature from scratch, end to end. Every file below is something you write in your project; SDK primitives are imported.

1. Project layout

Every Python service built on this SDK should adopt the layout below. main.py is a one-liner; src/server.py exposes both run() and the importable app (or re-exports it from src/api/app.py); api/dependencies/ is always a package (auth + factory providers); controllers/ is mandatory even when it is only a thin pass-through; and repositories/ lives under db/.

my-service/
├── main.py                       # one-liner: from src.server import run; run()
└── src/
    ├── __init__.py               # re-exports `run` from src.server
    ├── server.py                 # programmatic uvicorn.run(...) + module-level `app`
    ├── core/
    │   ├── __init__.py
    │   ├── settings.py           # Settings(BaseAppSettings, mixins...)
    │   └── exceptions.py         # domain exceptions (UserNotFoundError, ...)
    ├── db/
    │   ├── __init__.py           # re-exports BaseModel + every model
    │   ├── models/
    │   │   ├── __init__.py
    │   │   └── user.py           # UserModel(BaseModel)
    │   └── repositories/
    │       ├── __init__.py
    │       └── user.py           # UserRepository(BaseRepository[UserModel])
    ├── schemas/
    │   ├── __init__.py
    │   └── user.py               # UserCreate/Update/Response/Filter
    ├── services/
    │   ├── __init__.py
    │   └── user.py               # UserService — business logic
    ├── controllers/
    │   ├── __init__.py
    │   └── user.py               # UserController — orchestration (thin pass-through OK)
    └── api/
        ├── __init__.py
        ├── app.py                # create_app() — middleware, CORS, exception handlers, routers
        ├── routers/
        │   ├── __init__.py
        │   └── users.py
        └── dependencies/         # ALWAYS a package, never a flat module
            ├── __init__.py
            ├── auth.py           # X-Token / current_user / require_role dependencies
            └── controllers.py    # get_<X>_controller / get_<X>_service factories

Each __init__.py re-exports every public symbol from its directory so consumers always do from src.schemas import UserCreateSchema (not from src.schemas.user import UserCreateSchema). This keeps refactors painless — move files around without breaking imports.

If your service has no controllers/services/repositories yet, still ship empty packages with the right names — uniformity matters more than skipping a directory. Drop db/, utils/, queue/ or tasks/ only when the service genuinely doesn't need persistence/utilities/messaging.

2. Settings, server, app factory & entry point

Four files map onto four responsibilities:

| File | Responsibility |
| --- | --- |
| src/core/settings.py | Settings(BaseAppSettings, ...mixins): one source of truth for env vars. |
| src/api/app.py | create_app() factory + middleware + CORS + exception handlers + router includes + module-level app instance. |
| src/server.py | run() invoking uvicorn.run("src.api.app:app", ...) programmatically, plus re-exports app so external runners (gunicorn, uvicorn CLI) can import it. |
| main.py | Process entry point: a single line under if __name__ == "__main__": calling run(). |

# src/core/settings.py
from tempest_fastapi_sdk import BaseAppSettings, DatabaseSettings, ServerSettings


class Settings(ServerSettings, DatabaseSettings, BaseAppSettings):
    """All environment-driven configuration lives here.

    BaseAppSettings ships `env_file=.env`, `extra=ignore`,
    `case_sensitive=True`, `frozen=True` and `str_strip_whitespace=True`.
    ServerSettings adds SERVER_HOST/PORT/RELOAD, DatabaseSettings adds
    DATABASE_URL/ECHO/POOL_*.
    """

    JWT_SECRET: str
    JWT_ALGORITHM: str = "HS256"
    JWT_TTL_HOURS: int = 1

    SMTP_HOST: str = "localhost"
    SMTP_PORT: int = 587
    SMTP_USERNAME: str | None = None
    SMTP_PASSWORD: str | None = None
    SMTP_FROM_ADDR: str = "noreply@example.com"

    UPLOAD_DIR: str = "./var/uploads"


settings = Settings()
# src/api/app.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI

from tempest_fastapi_sdk import (
    AsyncDatabaseManager,
    RequestIDMiddleware,
    make_health_router,
    register_exception_handlers,
)

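from src.core.settings import settings


# Create `db` before importing the routers: src/api/dependencies imports `db`
# from this module, so importing the routers first is a circular import.
db = AsyncDatabaseManager(settings.DATABASE_URL)

from src.api.routers import users  # noqa: E402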


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Connect on startup, dispose on shutdown."""
    await db.connect()
    try:
        yield
    finally:
        await db.disconnect()


def create_app() -> FastAPI:
    """Build and configure the FastAPI app."""
    app = FastAPI(title="my-service", version="0.1.0", lifespan=lifespan)

    app.add_middleware(RequestIDMiddleware)
    register_exception_handlers(app)

    # Meta endpoints sit at the root prefix.
    app.include_router(make_health_router(db=db, version="0.1.0"))

    # Business endpoints sit under /api/<domain>.
    app.include_router(users.router, prefix="/api")
    return app


app = create_app()
# src/server.py
from tempest_fastapi_sdk import run_server

from src.api.app import app  # noqa: F401 — re-exported for external runners
from src.core.settings import settings


def run() -> None:
    """Start the API server programmatically."""
    run_server("src.api.app:app", settings=settings)


__all__: list[str] = ["app", "run"]

run_server reads SERVER_HOST / SERVER_PORT / SERVER_RELOAD from settings (falling back to 127.0.0.1 / 8000 / False) and forwards any extra kwargs (workers=, log_config=, ssl_*=) verbatim to uvicorn.run. See the Programmatic server entry point recipe.
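
The fallback-and-forward behaviour described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the SDK's implementation: `resolve_server_options` is a hypothetical name, and letting extra keyword arguments win on a key clash is a choice made for this sketch only.

```python
from types import SimpleNamespace
from typing import Any


def resolve_server_options(settings: Any, **extra: Any) -> dict[str, Any]:
    """Build the keyword arguments a runner could pass to uvicorn.run."""
    options: dict[str, Any] = {
        # Each value falls back to a default when the setting is absent.
        "host": getattr(settings, "SERVER_HOST", "127.0.0.1"),
        "port": getattr(settings, "SERVER_PORT", 8000),
        "reload": getattr(settings, "SERVER_RELOAD", False),
    }
    # Extra keyword arguments (workers=, log_config=, ...) pass through as-is.
    options.update(extra)
    return options


# Only the port is configured; host and reload use the fallbacks.
partial_settings = SimpleNamespace(SERVER_PORT=9000)
options = resolve_server_options(partial_settings, workers=2)
```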

# src/__init__.py
from src.server import run

__all__: list[str] = ["run"]
# main.py
from src.server import run

if __name__ == "__main__":
    run()

Bind defaults: 127.0.0.1 for internal services (the SDK's ServerSettings.SERVER_HOST default), 0.0.0.0 only when the service is consumed by a separate origin (e.g. a frontend dev server). Never start uvicorn via subprocess.run(["uvicorn", ...]) — always go through run_server (or uvicorn.run("src.api.app:app", ...) directly) so reload, signal handling and graceful shutdown behave correctly.

3. ORM model

# src/db/models/user.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

from tempest_fastapi_sdk import BaseModel


class UserModel(BaseModel):
    """One row per registered user.

    Inherits from BaseModel, so it automatically gets:
    - id (UUID v4, cross-DB portable via sqlalchemy.Uuid)
    - is_active (bool, soft-delete flag)
    - created_at, updated_at (timezone-aware TIMESTAMP, set by Python AND
      the DB so the instance attribute is populated right after flush)
    - __tablename__ = "user" (auto: class name without "Model" suffix,
      snake-cased; override by assigning __tablename__ explicitly)
    - __eq__/__hash__ by (type, id) so the same row across sessions
      compares equal
    - to_dict(exclude, include, remove_none) and
      update_from_dict(data, allowed_fields) helpers
    """

    name: Mapped[str] = mapped_column(String(64), nullable=False)
    email: Mapped[str] = mapped_column(String(128), unique=True, nullable=False)
    password_hash: Mapped[str] = mapped_column(String(128), nullable=False)

Re-export it:

# src/db/models/__init__.py
from src.db.models.user import UserModel

__all__: list[str] = ["UserModel"]
# src/db/__init__.py
from src.db.models import UserModel
from tempest_fastapi_sdk import BaseModel

__all__: list[str] = ["BaseModel", "UserModel"]

Tip: Always import models in src/db/__init__.py. SQLAlchemy needs to "see" every model before BaseModel.metadata is complete, so Alembic autogenerate and create_tables() work correctly.
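
The automatic `__tablename__` rule from the UserModel docstring (strip the "Model" suffix, then snake-case) can be approximated like this. `derive_tablename` is a hypothetical helper written for illustration; the SDK's own rule may treat edge cases such as acronyms differently.

```python
import re


def derive_tablename(class_name: str) -> str:
    """Drop a trailing "Model" suffix, then snake-case the remainder."""
    stem = class_name.removesuffix("Model")
    # Insert "_" before every capital letter that is not the first character,
    # e.g. "OrderItem" becomes "Order_Item", then lowercase the result.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", stem).lower()


user_table = derive_tablename("UserModel")
order_item_table = derive_tablename("OrderItemModel")
```

When the derived name does not suit the schema, assigning `__tablename__` explicitly on the model is the override the docstring mentions.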

4. Schemas

The recommended naming pattern: one *Create, *Update, *Response and *Filter schema per resource.

# src/schemas/user.py
from pydantic import EmailStr, Field

from tempest_fastapi_sdk import (
    BasePaginationFilterSchema,
    BaseResponseSchema,
    BaseSchema,
)


class UserCreateSchema(BaseSchema):
    """Payload for POST /users."""

    name: str = Field(min_length=1, max_length=64)
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)


class UserUpdateSchema(BaseSchema):
    """Partial payload for PATCH /users/{id}. Every field optional."""

    name: str | None = Field(default=None, min_length=1, max_length=64)
    email: EmailStr | None = None


class UserResponseSchema(BaseResponseSchema):
    """Outbound representation.

    Inherits id/is_active/created_at/updated_at from BaseResponseSchema
    (timestamps already normalized to UTC by the field validator).
    """

    name: str
    email: EmailStr


class UserFilterSchema(BasePaginationFilterSchema):
    """Query-string filters for GET /users.

    Inherits page/size/order_by/ascending/is_active from
    BasePaginationFilterSchema. Add domain-level filters below.
    """

    name: str | None = None              # ILIKE %name% search
    email: EmailStr | None = None        # exact-match filter
# src/schemas/__init__.py
from src.schemas.user import (
    UserCreateSchema,
    UserFilterSchema,
    UserResponseSchema,
    UserUpdateSchema,
)

__all__: list[str] = [
    "UserCreateSchema",
    "UserFilterSchema",
    "UserResponseSchema",
    "UserUpdateSchema",
]
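
The UTC normalization mentioned in the UserResponseSchema docstring amounts to a conversion like the one below. This is a sketch only: `to_utc_sketch` is a hypothetical stand-in for the SDK's `to_utc`, and treating naive datetimes as already being in UTC is an assumption of this example.

```python
from datetime import datetime, timedelta, timezone


def to_utc_sketch(value: datetime) -> datetime:
    """Return a timezone-aware datetime expressed in UTC.

    Assumption for this sketch: naive values are already in UTC.
    """
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


# 09:00 at a fixed UTC-3 offset is 12:00 in UTC.
utc_minus_3 = timezone(timedelta(hours=-3))
local = datetime(2024, 1, 1, 9, 0, tzinfo=utc_minus_3)
normalized = to_utc_sketch(local)
```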

5. Domain exceptions

The SDK ships generic NotFoundException, ConflictException, etc. Subclass them per domain so error responses have a useful code field:

# src/core/exceptions.py
from typing import ClassVar

from tempest_fastapi_sdk import ConflictException, NotFoundException


class UserNotFoundError(NotFoundException):
    message: str = "User not found"
    code: ClassVar[str] = "USER_NOT_FOUND"


class UserEmailAlreadyTakenError(ConflictException):
    message: str = "A user with this e-mail already exists"
    code: ClassVar[str] = "USER_EMAIL_TAKEN"

The SDK's exception handler (register_exception_handlers) serializes them to:

{
    "detail": "User not found",
    "code": "USER_NOT_FOUND",
    "details": {}
}

The frontend branches on code, not on the (potentially translated) message.

6. Repository

# src/db/repositories/user.py
from typing import ClassVar

from tempest_fastapi_sdk import AppException, BaseRepository

from src.core.exceptions import UserNotFoundError
from src.db.models import UserModel
from src.schemas import UserResponseSchema


class UserRepository(BaseRepository[UserModel]):
    """Data-access layer for users."""

    model: type[UserModel] = UserModel
    not_found_exception: ClassVar[type[AppException]] = UserNotFoundError

    def map_to_schema(self, instance: UserModel) -> UserResponseSchema:
        return UserResponseSchema.model_validate(instance)

    def map_to_response(self, instance: UserModel) -> UserResponseSchema:
        return self.map_to_schema(instance)

Per-domain error messages (optional but recommended in real apps):

from sqlalchemy.ext.asyncio import AsyncSession


class UserRepository(BaseRepository[UserModel]):
    model: type[UserModel] = UserModel
    not_found_exception: ClassVar[type[AppException]] = UserNotFoundError

    def __init__(self, session: AsyncSession) -> None:
        super().__init__(
            session,
            not_found_message="User not found",
            create_conflict_message="A user with this e-mail already exists",
            update_conflict_message="Conflict while updating user",
        )

The base repo gives you 17 methods for free — see the reference table below. Add custom methods on top:

async def get_by_email(self, email: str) -> UserModel:
    """Look up a user by email. Raises UserNotFoundError on miss."""
    return await self.get({"email": email})

7. Service

The service is where business rules live. It calls one or more repositories and never touches HTTP or SQLAlchemy types directly.

# src/services/user.py
from uuid import UUID

from tempest_fastapi_sdk import (
    BasePaginationSchema,
    PasswordUtils,
)

from src.core.exceptions import UserEmailAlreadyTakenError
from src.db.repositories import UserRepository
from src.schemas import (
    UserCreateSchema,
    UserFilterSchema,
    UserResponseSchema,
    UserUpdateSchema,
)


class UserService:
    """Business logic for the user domain."""

    def __init__(
        self,
        repository: UserRepository,
        passwords: PasswordUtils,
    ) -> None:
        """Initialize the service.

        Args:
            repository (UserRepository): User-domain repository.
            passwords (PasswordUtils): Shared bcrypt helper.
        """
        self.repo: UserRepository = repository
        self.passwords: PasswordUtils = passwords

    async def create(self, data: UserCreateSchema) -> UserResponseSchema:
        if await self.repo.exists({"email": data.email}):
            raise UserEmailAlreadyTakenError()
        user = self.repo.map_to_model(
            {
                **data.to_dict(exclude=["password"]),
                "password_hash": self.passwords.hash(data.password),
            }
        )
        user = await self.repo.add(user)
        return self.repo.map_to_response(user)

    async def get(self, user_id: UUID) -> UserResponseSchema:
        user = await self.repo.get_by_id(user_id)
        return self.repo.map_to_response(user)

    async def update(
        self,
        user_id: UUID,
        data: UserUpdateSchema,
    ) -> UserResponseSchema:
        user = await self.repo.get_by_id(user_id)
        user.update_from_dict(
            data.to_dict(),
            allowed_fields={"name", "email"},   # prevents mass-assignment
        )
        user = await self.repo.update(user)
        return self.repo.map_to_response(user)

    async def soft_delete(self, user_id: UUID) -> None:
        await self.repo.soft_delete(user_id)

    async def list_paginated(
        self,
        filters: UserFilterSchema,
    ) -> BasePaginationSchema[UserResponseSchema]:
        page = await self.repo.paginate(
            filters=filters.get_conditions(),
            page=filters.page,
            page_size=filters.size,
            order_by=filters.order_by,
            ascending=filters.ascending,
        )
        return BasePaginationSchema[UserResponseSchema](
            items=[self.repo.map_to_response(u) for u in page["items"]],
            total=page["total"],
            page=page["page"],
            size=page["size"],
            pages=page["pages"],
        )
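
The `allowed_fields` argument in `update` above is what guards against mass-assignment. The idea can be sketched without the ORM; `Row` and `update_from_dict_sketch` are hypothetical stand-ins, and returning the ignored keys is an addition made here for clarity, not documented SDK behaviour.

```python
from typing import Any


class Row:
    """Stand-in for an ORM instance with one sensitive attribute."""

    def __init__(self) -> None:
        self.name = "Ana"
        self.email = "ana@example.com"
        self.is_admin = False


def update_from_dict_sketch(
    obj: Any,
    data: dict[str, Any],
    allowed_fields: set[str],
) -> list[str]:
    """Copy only whitelisted keys onto obj and return the keys that were ignored."""
    ignored: list[str] = []
    for key, value in data.items():
        if key in allowed_fields:
            setattr(obj, key, value)
        else:
            ignored.append(key)
    return ignored


row = Row()
# The payload tries to escalate privileges; only "name" is applied.
ignored = update_from_dict_sketch(
    row,
    {"name": "Ana Lima", "is_admin": True},
    allowed_fields={"name", "email"},
)
```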

8. Controller

Even when there's no orchestration to do, controllers/ exists as a thin pass-through so the import graph stays uniform across services. The day a use case needs to coordinate two services (or fan out to a queue), the controller is already there.

# src/controllers/user.py
from uuid import UUID

from tempest_fastapi_sdk import BasePaginationSchema

from src.schemas import (
    UserCreateSchema,
    UserFilterSchema,
    UserResponseSchema,
    UserUpdateSchema,
)
from src.services.user import UserService


class UserController:
    """Orchestrate user use cases.

    Today every method is a thin pass-through to ``UserService``. As
    soon as a use case needs to coordinate more than one service —
    e.g. signup also sends a welcome email and enqueues a CRM sync —
    the orchestration lives here, not in the router and not in the
    service.
    """

    def __init__(self, service: UserService) -> None:
        self.service: UserService = service

    async def create(self, data: UserCreateSchema) -> UserResponseSchema:
        return await self.service.create(data)

    async def get(self, user_id: UUID) -> UserResponseSchema:
        return await self.service.get(user_id)

    async def update(
        self,
        user_id: UUID,
        data: UserUpdateSchema,
    ) -> UserResponseSchema:
        return await self.service.update(user_id, data)

    async def soft_delete(self, user_id: UUID) -> None:
        await self.service.soft_delete(user_id)

    async def list_paginated(
        self,
        filters: UserFilterSchema,
    ) -> BasePaginationSchema[UserResponseSchema]:
        return await self.service.list_paginated(filters)
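
For contrast with the pass-through above, this is roughly what a controller looks like once a use case spans two services. Everything here is hypothetical (`FakeUserService`, `FakeMailService`, `SignupController`); it only illustrates where the cross-service step would live.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeUserService:
    """Stand-in for UserService: records the users it creates."""

    created: list[str] = field(default_factory=list)

    async def create(self, email: str) -> str:
        self.created.append(email)
        return email


@dataclass
class FakeMailService:
    """Stand-in for a mail service: records the welcome emails it sends."""

    sent: list[str] = field(default_factory=list)

    async def send_welcome(self, email: str) -> None:
        self.sent.append(email)


class SignupController:
    """Coordinates two services; neither service knows about the other."""

    def __init__(self, users: FakeUserService, mail: FakeMailService) -> None:
        self.users = users
        self.mail = mail

    async def signup(self, email: str) -> str:
        user = await self.users.create(email)  # business rules stay in the service
        await self.mail.send_welcome(user)     # cross-service step lives here
        return user


users, mail = FakeUserService(), FakeMailService()
result = asyncio.run(SignupController(users, mail).signup("ana@example.com"))
```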

9. Dependency providers

api/dependencies/ is always a package. auth.py hosts the shared-secret / current-user dependencies; controllers.py (or services.py when there is no controller layer yet) hosts the factory providers the routers depend on. Never construct controllers or services inline inside the router file.

# src/api/dependencies/controllers.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from tempest_fastapi_sdk import PasswordUtils

from src.api.app import db
from src.controllers.user import UserController
from src.db.repositories import UserRepository
from src.services.user import UserService


# Stateless utilities — instantiate once per process.
_passwords: PasswordUtils = PasswordUtils()


def get_user_controller(
    session: AsyncSession = Depends(db.session_dependency),
) -> UserController:
    """Wire repository → service → controller for a single request."""
    repository = UserRepository(session)
    service = UserService(repository=repository, passwords=_passwords)
    return UserController(service=service)
# src/api/dependencies/__init__.py
from src.api.dependencies.controllers import get_user_controller

__all__: list[str] = ["get_user_controller"]

10. Router

Routers receive controllers via FastAPI Depends — no inline construction, no business logic, no DB calls. Business endpoints sit under /api/<domain> (the prefix is added at the include site in src/api/app.py); meta endpoints (/health, /tool-spec) stay at the root prefix.

# src/api/routers/users.py
from uuid import UUID

from fastapi import APIRouter, Depends, status

from tempest_fastapi_sdk import BasePaginationSchema

from src.api.dependencies import get_user_controller
from src.controllers.user import UserController
from src.schemas import (
    UserCreateSchema,
    UserFilterSchema,
    UserResponseSchema,
    UserUpdateSchema,
)


router = APIRouter(prefix="/users", tags=["users"])


@router.post(
    "",
    response_model=UserResponseSchema,
    status_code=status.HTTP_201_CREATED,
)
async def create_user(
    data: UserCreateSchema,
    controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
    return await controller.create(data)


@router.get("/{user_id}", response_model=UserResponseSchema)
async def get_user(
    user_id: UUID,
    controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
    return await controller.get(user_id)


@router.patch("/{user_id}", response_model=UserResponseSchema)
async def update_user(
    user_id: UUID,
    data: UserUpdateSchema,
    controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
    return await controller.update(user_id, data)


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: UUID,
    controller: UserController = Depends(get_user_controller),
) -> None:
    await controller.soft_delete(user_id)


@router.get("", response_model=BasePaginationSchema[UserResponseSchema])
async def list_users(
    filters: UserFilterSchema = Depends(),
    controller: UserController = Depends(get_user_controller),
) -> BasePaginationSchema[UserResponseSchema]:
    return await controller.list_paginated(filters)

11. Pagination

The pagination contract is enforced end-to-end by SDK primitives:

  • UserFilterSchema(BasePaginationFilterSchema) parses ?page=&size=&order_by=&ascending=&is_active=&name= from the query string and exposes .get_conditions() returning only the domain-level filters (without pagination keys).
  • UserRepository.paginate(...) runs the query with the filter dict + ordering + offset/limit + count, returning {items, total, page, size, pages}.
  • BasePaginationSchema[UserResponseSchema] wraps the result so OpenAPI documents the response shape correctly.
GET /api/users?page=2&size=20&order_by=name&ascending=true&is_active=true&name=ana

Returns:

{
    "items": [
        {"id": "...", "name": "Ana ...", "email": "...", ...},
        ...
    ],
    "total": 142,
    "page": 2,
    "size": 20,
    "pages": 8
}
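
The numbers in that response follow from simple arithmetic, sketched below together with the idea behind `get_conditions()`. Both functions are hypothetical illustrations: whether the SDK keeps `is_active` as a filter and drops unset values is assumed here, not confirmed.

```python
import math


def page_window(page: int, size: int, total: int) -> tuple[int, int, int]:
    """Return (offset, limit, pages) for 1-based page numbers."""
    offset = (page - 1) * size
    pages = math.ceil(total / size) if total else 0
    return offset, size, pages


# Keys that steer pagination and ordering rather than filter rows.
PAGINATION_KEYS = {"page", "size", "order_by", "ascending"}


def get_conditions_sketch(query: dict[str, object]) -> dict[str, object]:
    """Keep filter keys only, dropping pagination keys and unset values."""
    return {
        key: value
        for key, value in query.items()
        if key not in PAGINATION_KEYS and value is not None
    }


# page=2, size=20 over 142 rows: skip 20 rows, take 20, 8 pages in total.
window = page_window(page=2, size=20, total=142)
conditions = get_conditions_sketch(
    {
        "page": 2,
        "size": 20,
        "order_by": "name",
        "ascending": True,
        "is_active": True,
        "name": "ana",
        "email": None,
    }
)
```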

Recipes

Authentication recipe

End-to-end signup + login + protected route using PasswordUtils and JWTUtils. Requires the [auth] extra.

Wire the utility singletons

# src/core/security.py
from datetime import timedelta

from tempest_fastapi_sdk import JWTUtils, PasswordUtils

from src.core.settings import settings


passwords = PasswordUtils(rounds=12)

tokens = JWTUtils(
    secret=settings.JWT_SECRET,
    algorithm=settings.JWT_ALGORITHM,
    default_ttl=timedelta(hours=settings.JWT_TTL_HOURS),
    issuer="my-app",
)

Signup

Reuse the UserService.create defined in the tutorial — it already hashes the password.

Login

# src/schemas/auth.py
from pydantic import EmailStr

from tempest_fastapi_sdk import BaseSchema


class LoginSchema(BaseSchema):
    email: EmailStr
    password: str


class TokenResponseSchema(BaseSchema):
    access_token: str
    token_type: str = "bearer"
# src/services/auth.py
from sqlalchemy.ext.asyncio import AsyncSession

from tempest_fastapi_sdk import JWTUtils, PasswordUtils, UnauthorizedException

from src.db.repositories import UserRepository
from src.schemas.auth import LoginSchema, TokenResponseSchema


class AuthService:
    def __init__(
        self,
        session: AsyncSession,
        passwords: PasswordUtils,
        tokens: JWTUtils,
    ) -> None:
        self.repo = UserRepository(session)
        self.passwords = passwords
        self.tokens = tokens

    async def login(self, data: LoginSchema) -> TokenResponseSchema:
        user = await self.repo.get_or_none({"email": data.email})
        if user is None or not self.passwords.verify(
            data.password, user.password_hash
        ):
            # Same error for both cases — don't leak which one failed.
            raise UnauthorizedException(message="Invalid e-mail or password")
        token = self.tokens.encode({"sub": str(user.id)})
        return TokenResponseSchema(access_token=token)
# src/api/routers/auth.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.api.app import db
from src.core.security import passwords, tokens
from src.schemas.auth import LoginSchema, TokenResponseSchema
from src.services.auth import AuthService


router = APIRouter(prefix="/auth", tags=["auth"])


def get_auth_service(
    session: AsyncSession = Depends(db.session_dependency),
) -> AuthService:
    return AuthService(session, passwords, tokens)


@router.post("/login", response_model=TokenResponseSchema)
async def login(
    data: LoginSchema,
    service: AuthService = Depends(get_auth_service),
) -> TokenResponseSchema:
    return await service.login(data)

Protect a route — JWT dependency

Use make_jwt_user_dependency to wire the bearer scheme + JWT decode + user load in one call. The single seam is user_loader(subject), an async callable that maps the JWT subject claim to your domain UserModel.

# src/api/dependencies/auth.py
from uuid import UUID

from tempest_fastapi_sdk import make_jwt_user_dependency

from src.api.app import db
from src.core.security import tokens
from src.db.models import UserModel
from src.db.repositories import UserRepository


async def load_user(subject: str) -> UserModel:
    """Resolve the JWT subject (a UUID string) to a persisted user.

    Opens its own session so the dependency stays request-scope-agnostic
    (the loader is called once per request, and SDK exceptions raised
    inside translate to the canonical 401/404 envelope).
    """
    async with db.get_session_context() as session:
        repo = UserRepository(session)
        return await repo.get_by_id(UUID(subject))


get_current_user = make_jwt_user_dependency(tokens, load_user)
get_current_user_or_none = make_jwt_user_dependency(tokens, load_user, soft=True)
# Use in any route
@router.get("/me", response_model=UserResponseSchema)
async def me(current: UserModel = Depends(get_current_user)) -> UserResponseSchema:
    return UserResponseSchema.model_validate(current)

Soft auth (optional user)

get_current_user_or_none above already uses soft=True — it returns None instead of raising on a missing or invalid token, so endpoints can work both authenticated and anonymous:

@router.get("/feed")
async def feed(
    current: UserModel | None = Depends(get_current_user_or_none),
) -> FeedResponseSchema:
    return await feed_service.list(viewer=current)

Under the hood soft=True calls tokens.decode_or_none (no exception on expired/invalid tokens) and skips the loader when the subject is missing.


File uploads recipe

Avatar endpoint with validation + cleanup. Requires the [upload] extra.

# src/core/storage.py
from tempest_fastapi_sdk import UploadUtils

from src.core.settings import settings


avatar_storage = UploadUtils(
    upload_dir=f"{settings.UPLOAD_DIR}/avatars",
    max_size_bytes=5 * 1024 * 1024,            # 5 MiB
    allowed_extensions={"png", "jpg", "jpeg", "webp"},
    allowed_mimetypes={"image/png", "image/jpeg", "image/webp"},
)
# src/api/routers/users.py (extension)
from fastapi import UploadFile

from tempest_fastapi_sdk import ForbiddenException

from src.api.dependencies import get_user_controller
from src.api.dependencies.auth import get_current_user
from src.controllers.user import UserController
from src.core.storage import avatar_storage
from src.db.models import UserModel


@router.post("/{user_id}/avatar", response_model=UserResponseSchema)
async def upload_avatar(
    user_id: UUID,
    file: UploadFile,
    current: UserModel = Depends(get_current_user),
    controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
    # Users may only replace their own avatar.
    if current.id != user_id:
        raise ForbiddenException(message="You can only edit your own avatar")
    path = await avatar_storage.save(file, subdir=str(user_id))
    return await controller.set_avatar(user_id, str(path))

Add set_avatar to both the service and the controller (the controller stays a thin pass-through unless orchestration is needed — e.g. firing an "avatar updated" event):

# src/services/user.py
class UserService:
    async def set_avatar(self, user_id: UUID, path: str) -> UserResponseSchema:
        user = await self.repo.get_by_id(user_id)
        # Delete previous file when replacing.
        if user.avatar_path and user.avatar_path != path:
            avatar_storage.delete(user.avatar_path)
        user.avatar_path = path
        user = await self.repo.update(user)
        return self.repo.map_to_response(user)


# src/controllers/user.py
class UserController:
    async def set_avatar(self, user_id: UUID, path: str) -> UserResponseSchema:
        return await self.service.set_avatar(user_id, path)

UploadUtils.save() raises FileTooLargeException (413) or InvalidFileTypeException (415) on rejection — the SDK's exception handler already returns the right status code with a code field on the response.
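
As a rough illustration of the kind of checks that lead to those two rejections, here is a standard-library sketch. The function `check_upload` and both exception classes are hypothetical stand-ins, not the SDK's implementation; only the limits mirror the `avatar_storage` configuration above.

```python
from pathlib import PurePath

MAX_SIZE_BYTES = 5 * 1024 * 1024  # same 5 MiB limit as avatar_storage
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "webp"}
ALLOWED_MIMETYPES = {"image/png", "image/jpeg", "image/webp"}


class TooLarge(ValueError):
    """Hypothetical stand-in for a 413-style rejection."""


class InvalidType(ValueError):
    """Hypothetical stand-in for a 415-style rejection."""


def check_upload(filename: str, content_type: str, size: int) -> None:
    """Raise if the upload violates the size or type limits."""
    if size > MAX_SIZE_BYTES:
        raise TooLarge(f"{size} bytes exceeds {MAX_SIZE_BYTES}")
    extension = PurePath(filename).suffix.lstrip(".").lower()
    if extension not in ALLOWED_EXTENSIONS:
        raise InvalidType(f"extension {extension!r} is not allowed")
    if content_type.lower() not in ALLOWED_MIMETYPES:
        raise InvalidType(f"content type {content_type!r} is not allowed")
```

Checking both the extension and the declared content type is a cheap first filter. Neither proves what the bytes really are, so treat this as a sketch of the policy rather than a complete defence.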

Serving the file back

Local-disk uploads are best served by a reverse proxy (nginx / Caddy) so FastAPI doesn't stream bytes. For dev:

from fastapi.staticfiles import StaticFiles

app.mount(
    "/static/uploads",
    StaticFiles(directory=settings.UPLOAD_DIR),
    name="uploads",
)

Construct the public URL in the response schema:

class UserResponseSchema(BaseResponseSchema):
    name: str
    email: EmailStr
    avatar_url: str | None = None

    @field_validator("avatar_url", mode="before")
    @classmethod
    def _absolute_url(cls, value: str | None) -> str | None:
        if value is None:
            return None
        # avatar_path stored as relative path → public URL
        return f"/static/uploads/{value}"

Transactional email recipe

Password reset flow using EmailUtils + a short-lived JWT. Requires the [email] extra.

# src/core/mailer.py
from tempest_fastapi_sdk import EmailUtils

from src.core.settings import settings


mailer = EmailUtils(
    host=settings.SMTP_HOST,
    port=settings.SMTP_PORT,
    from_addr=settings.SMTP_FROM_ADDR,
    username=settings.SMTP_USERNAME,
    password=settings.SMTP_PASSWORD,
    use_starttls=True,
)

# src/services/password_reset.py
from datetime import timedelta
from uuid import UUID

from tempest_fastapi_sdk import (
    EmailUtils,
    InvalidTokenException,
    JWTUtils,
    PasswordUtils,
)

from src.db.repositories import UserRepository


class PasswordResetService:
    def __init__(
        self,
        repo: UserRepository,
        tokens: JWTUtils,
        mailer: EmailUtils,
    ) -> None:
        self.repo = repo
        self.tokens = tokens
        self.mailer = mailer

    async def request_reset(self, email: str) -> None:
        """Send a password-reset link to `email`.

        Always returns silently — don't reveal whether the email
        is registered or not (avoids account enumeration).
        """
        user = await self.repo.get_or_none({"email": email})
        if user is None:
            return
        token = self.tokens.encode(
            {"sub": str(user.id), "purpose": "password_reset"},
            ttl=timedelta(minutes=15),
        )
        reset_url = f"https://my-app.com/reset-password?token={token}"
        await self.mailer.send(
            to=user.email,
            subject="Reset your password",
            body=f"Click here to reset your password: {reset_url}",
            html=f'<p>Click <a href="{reset_url}">here</a> to reset.</p>',
        )

    async def consume_reset(
        self,
        token: str,
        new_password: str,
        passwords: PasswordUtils,
    ) -> None:
        # `decode` raises InvalidTokenException / ExpiredTokenException
        # (both 401). Caught by the SDK handler.
        payload = self.tokens.decode(token)
        if payload.get("purpose") != "password_reset":
            raise InvalidTokenException()
        user = await self.repo.get_by_id(UUID(payload["sub"]))
        user.password_hash = passwords.hash(new_password)
        await self.repo.update(user)

Alembic migrations recipe

Full workflow: bootstrap → first migration → apply → CI gate.

Bootstrap once per project

# scripts/alembic_init.py
from tempest_fastapi_sdk import AlembicHelper

from src.core.settings import settings

helper = AlembicHelper(config_path="alembic.ini", db_url=settings.DATABASE_URL)
helper.init(
    directory="alembic",
    metadata_module="src.db",        # exposes BaseModel
    metadata_attr="BaseModel",
    db_url=settings.DATABASE_URL,
)

Run once: uv run python scripts/alembic_init.py.

This creates:

alembic.ini                 # SDK-curated config (UTC timezone, date-prefixed file template)
alembic/
├── env.py                  # SDK template (already wires target_metadata, compare_type, batch mode)
├── script.py.mako
└── versions/

Author migrations

# scripts/make_migration.py
import sys

from tempest_fastapi_sdk import AlembicHelper

from src.core.settings import settings

helper = AlembicHelper("alembic.ini", db_url=settings.DATABASE_URL)
helper.revision(
    message=sys.argv[1],
    autogenerate=True,
)

uv run python scripts/make_migration.py "add users table"

Generated file lands at alembic/versions/2026_05_16_1432-ae12cd34_add_users_table.py — the date prefix means files sort chronologically and merge conflicts are obvious.
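
The exact file template lives in the SDK-generated alembic.ini. The sketch below only illustrates how a date-prefixed name like the one above can be composed and why such names sort chronologically. The function name, the slug rule and the format string are assumptions made for illustration.

```python
import re
from datetime import datetime, timezone


def migration_filename(created: datetime, revision: str, message: str) -> str:
    """Compose a date-prefixed migration file name (illustrative only)."""
    slug = re.sub(r"[^a-z0-9]+", "_", message.lower()).strip("_")
    stamp = created.strftime("%Y_%m_%d_%H%M")  # zero-padded, most significant first
    return f"{stamp}-{revision}_{slug}.py"


later = migration_filename(
    datetime(2026, 5, 16, 14, 32, tzinfo=timezone.utc), "ae12cd34", "add users table"
)
earlier = migration_filename(
    datetime(2026, 1, 3, 9, 5, tzinfo=timezone.utc), "0f9e8d7c", "create roles"
)

# Plain string sorting matches creation order because the date comes first.
ordered = sorted([later, earlier])
```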

Apply on startup

# src/api/app.py — extend lifespan
import asyncio

from tempest_fastapi_sdk import AlembicHelper


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Run pending migrations before serving traffic.
    helper = AlembicHelper("alembic.ini", db_url=settings.DATABASE_URL)
    await asyncio.to_thread(helper.upgrade)

    await db.connect()
    yield
    await db.disconnect()

CI gate — schema must match models

# scripts/check_migrations.py
import sys

from tempest_fastapi_sdk import AlembicHelper

from src.core.settings import settings

helper = AlembicHelper("alembic.ini", db_url=settings.DATABASE_URL)
if not helper.check():
    print("Schema drift detected — run make_migration.py and commit.")
    sys.exit(1)
print("Schema is in sync.")

# .github/workflows/ci.yml
- name: Check migrations are in sync
  run: uv run python scripts/check_migrations.py

Utility helpers recipe

Small stateless helpers from tempest_fastapi_sdk.utils that the SDK itself relies on and that show up across every service. Available without any extra.

| Helper | Signature | Purpose |
| --- | --- | --- |
| utcnow() | () -> datetime | Current time as a timezone-aware UTC datetime; the SDK uses this for created_at / updated_at defaults. |
| to_utc(value) | (datetime) -> datetime | Coerce naive datetimes to UTC (assumed UTC) and aware datetimes to UTC via astimezone. Used by BaseResponseSchema field validators. |
| modify_dict(data, exclude=None, include=None) | (dict, list[str] \| None, dict \| None) -> dict | Single-pass filter + merge. Drop sensitive keys before logging or merge computed fields when mapping payloads to ORM models. |

Timestamps the same way everywhere

utcnow is the canonical "now" for the SDK. Use it for soft-delete timestamps, JWT iat / exp, audit trails — anything where mixing naive and aware datetimes would burn you later.

from datetime import datetime, timedelta

from tempest_fastapi_sdk import to_utc, utcnow


now = utcnow()                      # timezone-aware UTC
expires_at = now + timedelta(hours=1)

# Normalize whatever the caller gave you
incoming = request.json()["scheduled_for"]              # naive or aware
scheduled_for = to_utc(datetime.fromisoformat(incoming))

A naive datetime is tagged with UTC (not converted from local time) so it's predictable in headless workers and Docker containers where time.timezone is anyone's guess.
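
The difference between tagging and converting can be shown with the standard library alone. `to_utc_sketch` below is a hypothetical re-implementation of the behaviour described above, not the SDK's source.

```python
from datetime import datetime, timedelta, timezone


def to_utc_sketch(value: datetime) -> datetime:
    """Tag naive datetimes as UTC; convert aware ones to UTC."""
    if value.tzinfo is None or value.utcoffset() is None:
        # Naive: keep the wall-clock time and attach UTC.
        return value.replace(tzinfo=timezone.utc)
    # Aware: shift to the equivalent instant in UTC.
    return value.astimezone(timezone.utc)


naive = datetime(2026, 5, 16, 12, 0)
aware = datetime(2026, 5, 16, 12, 0, tzinfo=timezone(timedelta(hours=-3)))

tagged = to_utc_sketch(naive)      # still 12:00, now carrying tzinfo=UTC
converted = to_utc_sketch(aware)   # 12:00 at UTC-3 becomes 15:00 UTC
```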

Drop sensitive keys before logging / mapping

modify_dict is the small utility that powers BaseSchema.to_dict(exclude=..., include=...) and BaseModel.update_from_dict(...). Call it directly when you want the same filtering without a Pydantic round-trip:

from tempest_fastapi_sdk import LogUtils, modify_dict

log = LogUtils("app.users")

payload = {"email": "ana@example.com", "password": "s3cr3t", "name": "Ana"}

# Strip password before logging
log.info("user_signup", **modify_dict(payload, exclude=["password"]))

# Merge a computed hash before persisting
user_row = modify_dict(
    payload,
    exclude=["password"],
    include={"password_hash": passwords.hash(payload["password"])},
)

include wins over data, so it doubles as a "set or override" helper without mutating the source dict.

Where every other helper is documented

Every helper has its own recipe — this section is the quick map:

| Helper | Recipe |
| --- | --- |
| PasswordUtils, JWTUtils | Authentication recipe |
| EmailUtils | Transactional email recipe |
| UploadUtils | File uploads recipe |
| LogUtils + configure_logging | Structured logging & request IDs recipe |
| MetricsUtils (CPU/memory/disk/GPU) | System metrics recipe |
| CPF, CNPJ, CPFOrCNPJ, PhoneBR, is_valid_*, normalize_*, only_digits | BR document & phone validation recipe |

BR document & phone validation recipe

tempest_fastapi_sdk.utils.regex ships ready-to-use regex patterns, validators, normalizers and Pydantic types for the identity/contact fields that show up in almost every Brazilian API. No extra required — pure stdlib + Pydantic (already a core dependency).

| Symbol | Kind | Purpose |
| --- | --- | --- |
| CPF_PATTERN, CNPJ_PATTERN, CPF_CNPJ_PATTERN, PHONE_BR_PATTERN | re.Pattern[str] | Compiled regex (masked or raw input). |
| is_valid_cpf, is_valid_cnpj, is_valid_cpf_cnpj | (str) -> bool | Format match + check-digit math. All-same-digit sequences rejected. |
| is_valid_phone_br | (str) -> bool | BR phone shape: optional +55, optional DDD, optional 9th digit. |
| normalize_cpf, normalize_cnpj, normalize_cpf_cnpj, normalize_phone_br | (str) -> str | Strip mask to digits-only; raise ValueError if invalid. |
| only_digits | (str) -> str | Strip every non-digit character. |
| CPF, CNPJ, CPFOrCNPJ, PhoneBR | Annotated[str, AfterValidator(...)] | Drop-in Pydantic field types that validate and normalize automatically. |
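
For readers unfamiliar with the check-digit math, the following sketch applies the standard mod-11 rule for CPF numbers. `cpf_check_digits_ok` is a hypothetical helper written for this illustration; the SDK's `is_valid_cpf` may differ in details such as the format check.

```python
def cpf_check_digits_ok(value: str) -> bool:
    """Validate a CPF using the standard mod-11 check-digit rule."""
    digits = [int(ch) for ch in value if "0" <= ch <= "9"]
    if len(digits) != 11 or len(set(digits)) == 1:
        return False  # wrong length, or an all-same-digit sequence
    for position in (9, 10):
        # Weights run from position + 1 down to 2 over the leading digits.
        weights = range(position + 1, 1, -1)
        total = sum(d * w for d, w in zip(digits[:position], weights))
        expected = (total * 10) % 11 % 10  # a remainder of 10 maps to 0
        if digits[position] != expected:
            return False
    return True
```

The first check digit is computed over the nine base digits, the second over those nine plus the first check digit, which is why the loop runs twice with a growing prefix.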

Schema usage

from pydantic import EmailStr, Field

from tempest_fastapi_sdk import BaseSchema
from tempest_fastapi_sdk.utils import CPF, CPFOrCNPJ, PhoneBR


class CustomerCreateSchema(BaseSchema):
    """Payload for POST /customers.

    `document` accepts CPF or CNPJ in masked or raw form and is
    stored digits-only after validation. `phone` is normalized the
    same way. Invalid values surface as a Pydantic `ValidationError`
    (HTTP 422 via the SDK exception handler).
    """

    name: str = Field(min_length=1, max_length=128)
    email: EmailStr
    document: CPFOrCNPJ
    phone: PhoneBR

Valid input:

{
    "name": "Ana",
    "email": "ana@example.com",
    "document": "529.982.247-25",
    "phone": "+55 (11) 98888-7777"
}

After validation:

CustomerCreateSchema(...).document  # "52998224725"
CustomerCreateSchema(...).phone     # "5511988887777"

Manual validation (services, controllers, queue handlers)

from tempest_fastapi_sdk.utils import (
    is_valid_cpf_cnpj,
    normalize_cpf_cnpj,
    only_digits,
)

if not is_valid_cpf_cnpj(raw_document):
    raise ValidationException(message="Invalid document")

document_digits = normalize_cpf_cnpj(raw_document)

Filtering by stored digits

The normalizers strip masks before saving, so repository filters and unique constraints all work on the canonical digits-only form:

await repo.get({"document": normalize_cpf_cnpj(query)})

Testing recipe

pytest + pytest-asyncio + in-memory SQLite + FastAPI TestClient.

Shared fixtures

# tests/conftest.py
from collections.abc import AsyncGenerator

import pytest_asyncio
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession

from tempest_fastapi_sdk import AsyncDatabaseManager

from src.api.app import create_app
from src.db import BaseModel       # importing BaseModel ensures models are registered


@pytest_asyncio.fixture
async def db() -> AsyncGenerator[AsyncDatabaseManager, None]:
    """Fresh in-memory DB per test."""
    manager = AsyncDatabaseManager("sqlite+aiosqlite:///:memory:")
    await manager.connect()
    await manager.create_tables()
    try:
        yield manager
    finally:
        await manager.drop_tables()
        await manager.disconnect()


@pytest_asyncio.fixture
async def session(db: AsyncDatabaseManager) -> AsyncGenerator[AsyncSession, None]:
    """Managed session bound to the in-memory DB."""
    async with db.get_session_context() as session:
        yield session


@pytest_asyncio.fixture
async def client(db: AsyncDatabaseManager) -> AsyncGenerator[TestClient, None]:
    """FastAPI TestClient with the SDK manager overridden to use SQLite."""
    app = create_app()
    # Override the session dependency to use the test DB.
    from src.api.app import db as production_db

    app.dependency_overrides[production_db.session_dependency] = db.session_dependency

    with TestClient(app) as client:
        yield client

Repository test

# tests/repositories/test_user.py
import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.exceptions import UserNotFoundError
from src.db.models import UserModel
from src.db.repositories import UserRepository


class TestUserRepository:
    async def test_get_by_email_raises_when_missing(
        self, session: AsyncSession
    ) -> None:
        repo = UserRepository(session)
        with pytest.raises(UserNotFoundError):
            await repo.get({"email": "ghost@example.com"})

    async def test_add_and_get(self, session: AsyncSession) -> None:
        repo = UserRepository(session)
        user = await repo.add(
            UserModel(
                name="Ana", email="ana@example.com", password_hash="x"
            )
        )
        loaded = await repo.get_by_id(user.id)
        assert loaded.name == "Ana"

Endpoint test

# tests/api/test_users.py
from fastapi.testclient import TestClient


class TestUsersAPI:
    def test_create_user(self, client: TestClient) -> None:
        response = client.post(
            "/api/users",
            json={
                "name": "Ana",
                "email": "ana@example.com",
                "password": "hunter22",
            },
        )
        assert response.status_code == 201
        body = response.json()
        assert body["email"] == "ana@example.com"
        assert "password" not in body
        assert "password_hash" not in body

    def test_get_user_not_found(self, client: TestClient) -> None:
        response = client.get("/api/users/00000000-0000-0000-0000-000000000000")
        assert response.status_code == 404
        body = response.json()
        # SDK envelope is always {detail, code, details}
        assert body["code"] == "USER_NOT_FOUND"

tempest_fastapi_sdk.testing helpers

tempest_fastapi_sdk.testing ships framework-agnostic helpers that don't require pytest to be importable — wrap them in @pytest.fixture (or any other harness) inside the consuming project's conftest.py. Useful when a test doesn't need a full AsyncDatabaseManager (no lifespan, no health-check probes).

| Helper | Purpose |
| --- | --- |
| create_test_engine(url="sqlite+aiosqlite:///:memory:", **engine_kwargs) | Build a throw-away AsyncEngine. |
| create_test_session_factory(engine) | Build a sessionmaker bound to the engine. |
| init_test_metadata(engine, metadata=None) | Create every SQLAlchemy table on the engine (defaults to BaseModel.metadata). |
| drop_test_metadata(engine, metadata=None) | Drop every table. |
| test_database(url="sqlite+aiosqlite:///:memory:", metadata=None) | Async context manager: yields an engine with metadata pre-created, drops everything and disposes on exit. |
| test_session(url="sqlite+aiosqlite:///:memory:", metadata=None) | Async context manager: yields an AsyncSession on top of a fresh test_database. |

# tests/conftest.py
from collections.abc import AsyncGenerator

import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession

from tempest_fastapi_sdk.testing import test_database, test_session


@pytest_asyncio.fixture
async def engine() -> AsyncGenerator[AsyncEngine, None]:
    """Yield a fresh in-memory SQLite engine for each test."""
    async with test_database() as e:
        yield e


@pytest_asyncio.fixture
async def session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a managed AsyncSession bound to the in-memory engine."""
    async with test_session() as s:
        yield s

Use the one-shot test_session() context manager for ad-hoc tests that don't need cross-fixture sharing:

from tempest_fastapi_sdk.testing import test_session

from src.db.models import UserModel
from src.db.repositories import UserRepository


async def test_repo_directly() -> None:
    async with test_session() as session:
        repo = UserRepository(session)
        await repo.add(UserModel(name="Ana", email="ana@example.com", password_hash="x"))
        assert await repo.count() == 1

Pass metadata= when the project mixes the SDK BaseModel.metadata with a second, isolated metadata (rare — keep one BaseModel per service whenever possible).

Application bootstrap recipe

Section 2 of the tutorial shows the minimal create_app(). This recipe is the extended version, wiring everything tempest_fastapi_sdk.api ships — exception handlers, CORS, request-ID middleware, the health router with extra checks, a shared-secret token dependency and an extra Redis manager — all from the same canonical src/api/app.py location. The bootstrapping pattern stays identical; only the contents of create_app() grow.

# src/api/app.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI

from tempest_fastapi_sdk import (
    AsyncDatabaseManager,
    RequestIDMiddleware,
    apply_cors,
    configure_logging,
    make_health_router,
    make_token_dependency,
    register_exception_handlers,
)
from tempest_fastapi_sdk.cache import AsyncRedisManager

from src.core.settings import settings


configure_logging(level=settings.LOG_LEVEL, json_output=settings.LOG_JSON)

db = AsyncDatabaseManager(
    settings.DATABASE_URL,
    echo=settings.DATABASE_ECHO,
    pool_size=settings.DATABASE_POOL_SIZE,
    max_overflow=settings.DATABASE_MAX_OVERFLOW,
    pool_recycle=settings.DATABASE_POOL_RECYCLE,
)
redis = AsyncRedisManager(settings.REDIS_URL)
require_token = make_token_dependency(settings.TOKEN_SECRET)


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    await db.connect()
    await redis.connect()
    try:
        yield
    finally:
        await redis.disconnect()
        await db.disconnect()


def create_app() -> FastAPI:
    """Build and configure the FastAPI app."""
    app = FastAPI(
        title="my-service",
        version=settings.VERSION,
        lifespan=lifespan,
    )

    app.add_middleware(RequestIDMiddleware)
    apply_cors(app, settings)
    register_exception_handlers(app)

    # Meta endpoints at the root prefix.
    app.include_router(
        make_health_router(
            db=db,
            checks={"redis": redis.health_check},
            version=settings.VERSION,
        ),
    )

    # Business endpoints under /api/<domain>, guarded by the shared secret.
    from src.api.routers import users

    app.include_router(
        users.router,
        prefix="/api",
        dependencies=[Depends(require_token)],
    )
    return app


app = create_app()

Key points:

  • src/server.py and main.py (one-liner) stay exactly as in Section 2 of the tutorial — only create_app() changes when you add primitives. Never start uvicorn via subprocess.run(["uvicorn", ...]); always import app from src.api.app or call uvicorn.run("src.api.app:app", ...) programmatically from src/server.py.
  • RequestIDMiddleware reads/writes X-Request-ID and seeds request_id_ctx so every log line emitted during the request carries the correlation ID.
  • apply_cors(app, settings) reads CORSSettings defaults; pass keyword overrides for one-off changes.
  • register_exception_handlers(app) wires every AppException subclass to the canonical {detail, code, details} envelope.
  • make_health_router(db=db, checks={"redis": redis.health_check}, version=...) mounts GET /health/liveness and GET /health/readiness (returns 503 when any check fails) at the root prefix.
  • make_token_dependency(secret) returns an async dependency that validates X-Token via hmac.compare_digest; pass an empty string to disable in dev. The dependency lives next to the rest of the auth glue in src/api/dependencies/auth.py once it grows beyond the one-liner above.

Structured logging & request IDs recipe

configure_logging installs a JSON handler on the root logger that emits one-line JSON records carrying the active request ID. LogUtils is a thin facade that adds level methods accepting structured **fields.

from tempest_fastapi_sdk import LogUtils, configure_logging
from tempest_fastapi_sdk.core import get_request_id

# Imperative — call once during bootstrap.
configure_logging(level="INFO", json_output=True)

# Facade — handy for service-wide singletons.
log = LogUtils("app.users", level="INFO")
log.info("user_created", user_id=str(user.id), email=user.email)
log.warning("login_throttled", ip="1.2.3.4", attempts=5)

try:
    risky()
except RuntimeError:
    log.exception("risky_failed", op="reconcile")  # appends traceback

# Surface the correlation ID outside the log line if needed.
request_id = get_request_id()

JSON output (single line — formatted here for readability):

{
  "timestamp": "2026-05-16T20:14:33.412+00:00",
  "level": "INFO",
  "logger": "app.users",
  "message": "user_created",
  "request_id": "d83e4b0c-7c2f-4bd6-aaa1-7d4f6cf5e5e9",
  "user_id": "9c1a5b2d-...",
  "email": "ana@example.com"
}

The middleware accepts a custom header name (app.add_middleware(RequestIDMiddleware, header_name="X-Correlation-ID")); the same header is echoed back on every response.

Settings mixins composition recipe

BaseAppSettings is the configured pydantic-settings base. The SDK also exposes composable mixins for the most common dependencies; pick the ones the service needs and put BaseAppSettings at the end of the MRO so its model_config wins.

# src/core/settings.py
from pydantic import Field

from tempest_fastapi_sdk import (
    BaseAppSettings,
    CORSSettings,
    DatabaseSettings,
    EmailSettings,
    JWTSettings,
    LogSettings,
    RabbitMQSettings,
    RedisSettings,
    ServerSettings,
    TaskIQSettings,
    TokenSettings,
    UploadSettings,
    WebPushSettings,
)


class Settings(
    ServerSettings,
    LogSettings,
    DatabaseSettings,
    RedisSettings,
    RabbitMQSettings,
    TaskIQSettings,
    JWTSettings,
    CORSSettings,
    EmailSettings,
    UploadSettings,
    TokenSettings,
    WebPushSettings,
    BaseAppSettings,
):
    """Service-wide settings."""

    VERSION: str = Field(default="0.0.0")


settings = Settings()

Each mixin owns its own env-var prefix — pick only the ones the service needs:

| Mixin | Env vars |
| --- | --- |
| ServerSettings | SERVER_HOST, SERVER_PORT, SERVER_RELOAD, SERVER_DEBUG |
| LogSettings | LOG_LEVEL, LOG_JSON |
| DatabaseSettings | DATABASE_URL, DATABASE_ECHO, DATABASE_POOL_SIZE, DATABASE_MAX_OVERFLOW, DATABASE_POOL_RECYCLE |
| RedisSettings | REDIS_URL, REDIS_DECODE_RESPONSES |
| RabbitMQSettings | RABBITMQ_URL, RABBITMQ_PREFETCH_COUNT |
| TaskIQSettings | TASKIQ_BROKER_URL, TASKIQ_RESULT_BACKEND_URL |
| JWTSettings | JWT_SECRET, JWT_ALGORITHM, JWT_ACCESS_TTL_SECONDS, JWT_REFRESH_TTL_SECONDS, JWT_ISSUER |
| CORSSettings | CORS_ORIGINS, CORS_ALLOW_CREDENTIALS, CORS_ALLOW_METHODS, CORS_ALLOW_HEADERS, CORS_EXPOSE_HEADERS, CORS_MAX_AGE |
| EmailSettings | SMTP_HOST, SMTP_PORT, SMTP_USERNAME, SMTP_PASSWORD, SMTP_FROM_ADDR, SMTP_USE_TLS, SMTP_USE_SSL, SMTP_TIMEOUT_SECONDS |
| UploadSettings | UPLOAD_DIR, UPLOAD_MAX_SIZE_BYTES, UPLOAD_ALLOWED_EXTENSIONS, UPLOAD_ALLOWED_MIMETYPES |
| TokenSettings | TOKEN_SECRET |
| WebPushSettings | VAPID_PUBLIC_KEY, VAPID_PRIVATE_KEY, VAPID_SUBJECT, WEBPUSH_DEFAULT_TTL_SECONDS |

Breaking change in 0.8.0: ServerSettings previously exposed bare HOST / PORT / DEBUG / LOG_LEVEL / LOG_JSON fields. They were renamed to SERVER_HOST / SERVER_PORT / SERVER_RELOAD / SERVER_DEBUG, and LOG_LEVEL / LOG_JSON moved to the new LogSettings mixin. Update both your .env file (env var names) and any code reading settings.HOST etc.

Controllers & services layering recipe

BaseService[RepositoryT, ResponseT] and BaseController[ServiceT, ResponseT] are generic skeletons matching the SDK layering (router → controller → service → repository). They expose pass-through CRUD methods so simple endpoints can subclass them without overriding anything; you override only methods that need orchestration.

# src/services/user_service.py
from uuid import UUID

from tempest_fastapi_sdk import BaseService

from src.db.repositories import UserRepository
from src.schemas.user import UserCreate, UserResponse, UserUpdate
from src.utils.security import password_utils


class UserService(BaseService[UserRepository, UserResponse]):
    """Business logic for the user feature."""

    async def signup(self, data: UserCreate) -> UserResponse:
        # Business logic — hash the password, then delegate to the repo.
        instance = self.repository.map_to_model(
            {
                "name": data.name,
                "email": data.email,
                "password_hash": password_utils.hash(data.password),
            },
        )
        created = await self.repository.add(instance)
        return self.repository.map_to_response(created)


# src/controllers/user_controller.py
from tempest_fastapi_sdk import BaseController

from src.schemas.user import UserCreate, UserResponse
from src.services.user_service import UserService


class UserController(BaseController[UserService, UserResponse]):
    """Thin orchestration over UserService."""

    async def signup(self, data: UserCreate) -> UserResponse:
        # Pass-through today; the controller is the seam to add
        # cross-service coordination later (audit log, outbox event,
        # downstream notification, etc.) without touching the router.
        return await self.service.signup(data)


# src/api/dependencies/controllers.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.api.app import db
from src.controllers.user_controller import UserController
from src.db.repositories import UserRepository
from src.services.user_service import UserService


def get_user_controller(
    session: AsyncSession = Depends(db.session_dependency),
) -> UserController:
    return UserController(UserService(UserRepository(session)))


# src/api/routers/users.py
from fastapi import APIRouter, Depends, status

from src.api.dependencies.controllers import get_user_controller
from src.controllers.user_controller import UserController
from src.schemas.user import UserCreate, UserResponse

router = APIRouter(prefix="/users", tags=["users"])


@router.post(
    "/",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
)
async def create_user(
    data: UserCreate,
    controller: UserController = Depends(get_user_controller),
) -> UserResponse:
    return await controller.signup(data)

Keep controllers present even when they only pass through — the import graph stays uniform across services, so adding cross-cutting policy later doesn't change the router signature.

Audit & soft-delete mixins recipe

SoftDeleteMixin adds a deleted_at timestamp column with mark_deleted() / mark_restored() / is_deleted helpers. AuditMixin adds created_by / updated_by UUID columns with stamp_created_by(user_id) / stamp_updated_by(user_id) helpers. Mix them in alongside BaseModel:

# src/db/models/user.py
from sqlalchemy.orm import Mapped, mapped_column

from tempest_fastapi_sdk import AuditMixin, BaseModel, SoftDeleteMixin


class UserModel(BaseModel, SoftDeleteMixin, AuditMixin):
    """Users — soft-deletable and audited."""

    name: Mapped[str] = mapped_column()
    email: Mapped[str] = mapped_column(unique=True)
    password_hash: Mapped[str] = mapped_column()

Filtering is the caller's responsibility — the mixin doesn't install a global filter. Hide soft-deleted rows from list endpoints by passing deleted_at=None (or filtering in your repository subclass):

async def list_alive(self) -> list[UserResponse]:
    instances = await self.repository.list(filters={"deleted_at": None})
    return [self.repository.map_to_response(i) for i in instances]

Stamping audit columns belongs to the service layer where the current user is in scope:

async def update(self, user_id: UUID, data: UserUpdate, *, actor_id: UUID) -> UserResponse:
    instance = await self.repository.get_by_id(user_id)
    instance.update_from_dict(data.model_dump(exclude_unset=True))
    instance.stamp_updated_by(actor_id)
    updated = await self.repository.update(instance)
    return self.repository.map_to_response(updated)

Use the mixin's helpers (mark_deleted / mark_restored) when you want the deleted_at semantics; use BaseRepository.soft_delete(id) when the existing is_active flag is enough.

Cursor pagination recipe

Cursor pagination scales better than offset pagination on big tables (no COUNT(*), stable under concurrent inserts) at the cost of losing random access. The SDK provides CursorPaginationFilterSchema, CursorPaginationSchema[T] and the opaque encode_cursor / decode_cursor helpers.

# src/schemas/user.py
from tempest_fastapi_sdk import CursorPaginationFilterSchema, CursorPaginationSchema

# UserResponse is defined earlier in this same module, so no import is needed.


class UserCursorFilter(CursorPaginationFilterSchema):
    name: str | None = None  # ILIKE %value% via repository convention


UserCursorPage = CursorPaginationSchema[UserResponse]

Repository helper (cursor over created_at + id tie-break):

# src/db/repositories/user.py
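from datetime import datetime
from typing import Any
from uuid import UUID

from sqlalchemy import asc, desc, select, tuple_

from tempest_fastapi_sdk import BaseRepository, decode_cursor, encode_cursor

from src.db.models.user import UserModel
from src.schemas.user import UserCursorPage


class UserRepository(BaseRepository[UserModel]):
    model = UserModel

    async def cursor_page(
        self,
        *,
        cursor: str | None,
        limit: int,
        ascending: bool,
        filters: dict[str, Any] | None = None,
    ) -> UserCursorPage:
        query = select(UserModel)
        if filters:
            query = self._apply_filters(query, filters)

        order = asc if ascending else desc
        query = query.order_by(order(UserModel.created_at), order(UserModel.id))

        if cursor is not None:
            state = decode_cursor(cursor)
            # Build a SQL row-value comparison; comparing plain Python tuples
            # of columns would not produce a SQL expression.
            keyset = tuple_(UserModel.created_at, UserModel.id)
            last_seen = (datetime.fromisoformat(state["value"]), UUID(state["id"]))
            # Strictly after the last row when ascending, strictly before it
            # when descending, so the cursor row itself is never repeated.
            query = query.where(keyset > last_seen if ascending else keyset < last_seen)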

        query = query.limit(limit + 1)  # peek one ahead to set has_more
        result = await self.session.execute(query)
        rows = list(result.unique().scalars().all())
        has_more = len(rows) > limit
        rows = rows[:limit]
        next_cursor = (
            encode_cursor(
                {"id": str(rows[-1].id), "value": rows[-1].created_at.isoformat()},
            )
            if has_more and rows
            else None
        )
        return UserCursorPage(
            items=[self.map_to_response(r) for r in rows],
            next_cursor=next_cursor,
            has_more=has_more,
            limit=limit,
        )

Router:

@router.get("/", response_model=UserCursorPage)
async def list_users(
    f: UserCursorFilter = Depends(),
    controller: UserController = Depends(get_user_controller),
) -> UserCursorPage:
    return await controller.service.repository.cursor_page(
        cursor=f.cursor,
        limit=f.limit,
        ascending=f.ascending,
        filters=f.get_conditions(),
    )

The cursor is opaque base64-url-safe JSON — clients never inspect it; they pass back the value verbatim until next_cursor becomes null.
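
What "opaque base64-url-safe JSON" means in practice can be sketched with the standard library. The two functions below are hypothetical equivalents of `encode_cursor` / `decode_cursor`; details such as stripping the `=` padding and sorting keys are assumptions, not taken from the SDK.

```python
import base64
import json
from typing import Any


def encode_cursor_sketch(state: dict[str, Any]) -> str:
    """Serialize the pagination state to a URL-safe token."""
    raw = json.dumps(state, separators=(",", ":"), sort_keys=True).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def decode_cursor_sketch(cursor: str) -> dict[str, Any]:
    """Restore the padding and parse the token back into a dict."""
    padded = cursor + "=" * (-len(cursor) % 4)
    return json.loads(base64.urlsafe_b64decode(padded.encode("ascii")))


state = {"id": "9c1a5b2d", "value": "2026-05-16T20:14:33+00:00"}
cursor = encode_cursor_sketch(state)
restored = decode_cursor_sketch(cursor)
```

Base64 is an encoding, not a signature: a client can decode or forge such a token, so the server should still validate whatever comes back.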

Redis cache recipe

AsyncRedisManager wraps redis.asyncio with the same connect/disconnect/health-check surface as AsyncDatabaseManager. Install with [cache].

from tempest_fastapi_sdk.cache import AsyncRedisManager

cache = AsyncRedisManager(settings.REDIS_URL, decode_responses=True)

# Lifespan
await cache.connect()
...
await cache.disconnect()

# Direct use
await cache.client.set("user:123:name", "Ana", ex=300)
name = await cache.client.get("user:123:name")

# FastAPI dependency — yields the live client.
from fastapi import Depends
from redis.asyncio import Redis


@router.get("/cached")
async def cached_endpoint(
    redis: Redis = Depends(cache.client_dependency),
) -> dict[str, str]:
    value = await redis.get("greeting") or "hello"
    return {"value": value}

Wire the health check on the canonical router with make_health_router(checks={"redis": cache.health_check}) so readiness probes fail when Redis is down.

Server-Sent Events recipe

EventStream is an in-memory async queue feeding one SSE HTTP connection. ServerSentEvent encodes one frame; sse_response wraps the byte stream in a Starlette StreamingResponse with SSE-friendly headers.

# src/api/routers/events.py
import asyncio

from fastapi import APIRouter
from starlette.responses import StreamingResponse

from tempest_fastapi_sdk import EventStream, sse_response

router = APIRouter()


@router.get("/events")
async def events() -> StreamingResponse:
    stream = EventStream(heartbeat_seconds=15.0)

    async def producer() -> None:
        for n in range(1, 4):
            await stream.publish({"n": n}, event="counter", id=str(n))
            await asyncio.sleep(1)
        await stream.close()

    asyncio.create_task(producer())
    return sse_response(stream.stream())

Browser side:

const es = new EventSource("/events");
es.addEventListener("counter", (e) => console.log("got", JSON.parse(e.data)));

heartbeat_seconds emits a : keepalive SSE comment when idle so load-balancers don't close long-lived connections. ServerSentEvent.data accepts strings, bytes or any JSON-serializable Python object — non-strings are JSON-encoded automatically. Pass retry= to hint the browser at the reconnect delay (milliseconds).

Web Push notifications recipe

WebPushDispatcher wraps the synchronous pywebpush library in asyncio.to_thread and surfaces the two errors the application cares about: WebPushGoneError (HTTP 404/410 — delete the subscription) and WebPushError (everything else). Install with [webpush].

# src/services/notifications.py
from tempest_fastapi_sdk import (
    WebPushDispatcher,
    WebPushGoneError,
    WebPushPayloadSchema,
    WebPushSubscriptionSchema,
)


dispatcher = WebPushDispatcher(
    settings.VAPID_PRIVATE_KEY,
    vapid_subject="mailto:ops@example.com",
    ttl_seconds=60,
)


async def notify_order_paid(
    subscription: WebPushSubscriptionSchema,
    order_id: str,
) -> None:
    payload = WebPushPayloadSchema(
        title="Pagamento confirmado",
        body=f"Pedido {order_id} aprovado.",
        icon="/static/icons/order.png",
        data={"orderId": order_id, "url": f"/orders/{order_id}"},
    )
    try:
        await dispatcher.send(subscription, payload)
    except WebPushGoneError:
        # Prune the subscription from your store.
        await subscriptions_repo.delete_by_endpoint(subscription.endpoint)


async def broadcast(subs: list[WebPushSubscriptionSchema], payload: WebPushPayloadSchema) -> None:
    gone = await dispatcher.send_many(subs, payload)
    if gone:
        await subscriptions_repo.delete_by_endpoints(gone)

WebPushSubscriptionSchema round-trips the exact JSON that PushSubscription.toJSON() emits in the browser (it aliases expiration_time to expirationTime), so you can store inbound subscriptions verbatim and replay them on dispatch.

Message queues — FastStream recipe

AsyncBrokerManager wraps any FastStream broker (RabbitMQ, Kafka, NATS, Redis Streams) with a uniform connect/disconnect/health-check surface. The broker instance is injected so the SDK doesn't pin a single transport.

Install with [queue] (pulls faststream[rabbit]). Pick the matching FastStream extra for other transports.

# src/queue/__init__.py
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel

from tempest_fastapi_sdk.queue import AsyncBrokerManager

from src.core.settings import settings


broker = RabbitBroker(settings.RABBITMQ_URL)
queue = AsyncBrokerManager(broker)


class OrderMessage(BaseModel):
    order_id: str
    user_id: str


@broker.subscriber("orders.paid")
async def handle_order_paid(msg: OrderMessage) -> None:
    await mark_order_paid(msg.order_id, msg.user_id)


# src/api/app.py lifespan
await queue.connect()
...
await queue.disconnect()


# Publish from anywhere in the application
await queue.publish(OrderMessage(order_id="abc", user_id="x"), queue="orders.paid")

The manager exposes:

  • connect() / disconnect() — idempotent; safe to call from FastAPI lifespan.
  • publish(message, *args, **kwargs) — passthrough to broker.publish with a RuntimeError guard when the broker isn't started.
  • lifespan() — async context manager handling start/stop, handy for short scripts.
  • broker_dependency — FastAPI Depends that yields the live broker.
  • health_check() / is_connected — true while the broker is started.

Wire it on the health router with make_health_router(checks={"queue": queue.health_check}).
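The idempotent connect/disconnect and the RuntimeError guard described above can be sketched as follows. FakeBroker and ManagerSketch are hypothetical stand-ins written for this illustration, not the SDK's classes; a real FastStream broker has a richer interface.

```python
import asyncio
from typing import Any


class FakeBroker:
    """Stand-in for a real broker; records what was published."""

    def __init__(self) -> None:
        self.start_calls = 0
        self.sent: list[tuple[Any, str]] = []

    async def start(self) -> None:
        self.start_calls += 1

    async def stop(self) -> None:
        pass

    async def publish(self, message: Any, queue: str) -> None:
        self.sent.append((message, queue))


class ManagerSketch:
    """Idempotent connect/disconnect plus a guard on publish."""

    def __init__(self, broker: FakeBroker) -> None:
        self.broker = broker
        self.is_connected = False

    async def connect(self) -> None:
        if self.is_connected:  # a second call is a no-op
            return
        await self.broker.start()
        self.is_connected = True

    async def disconnect(self) -> None:
        if not self.is_connected:
            return
        await self.broker.stop()
        self.is_connected = False

    async def publish(self, message: Any, *args: Any, **kwargs: Any) -> None:
        if not self.is_connected:
            raise RuntimeError("broker is not started; call connect() first")
        await self.broker.publish(message, *args, **kwargs)

    async def health_check(self) -> bool:
        return self.is_connected


async def demo() -> tuple[int, bool, bool]:
    manager = ManagerSketch(FakeBroker())
    try:
        await manager.publish({"order_id": "abc"}, queue="orders.paid")
        guarded = False
    except RuntimeError:
        guarded = True  # publishing before connect() is rejected
    await manager.connect()
    await manager.connect()  # does not start the broker twice
    await manager.publish({"order_id": "abc"}, queue="orders.paid")
    healthy = await manager.health_check()
    await manager.disconnect()
    return manager.broker.start_calls, guarded, healthy


result = asyncio.run(demo())
```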

Background tasks — TaskIQ recipe

AsyncTaskBrokerManager wraps any TaskIQ broker (AioPika for RabbitMQ, Redis, in-memory for tests). Install with [tasks] (pulls taskiq + taskiq-aio-pika).

# src/tasks/__init__.py
from taskiq_aio_pika import AioPikaBroker

from tempest_fastapi_sdk.tasks import AsyncTaskBrokerManager

from src.core.settings import settings


tasks = AsyncTaskBrokerManager(AioPikaBroker(settings.TASKIQ_BROKER_URL))


@tasks.task
async def send_welcome_email(to: str, name: str) -> None:
    await email_utils.send(
        to=to,
        subject="Bem-vindo!",
        body=f"Olá, {name} — sua conta foi criada.",
    )


# src/api/app.py lifespan
await tasks.connect()
...
await tasks.disconnect()


# Enqueue from a request handler
await send_welcome_email.kiq(to=user.email, name=user.name)

register_task(callable, task_name=..., **kwargs) registers a function without decorator syntax, which is useful when wiring third-party callables that you can't decorate at definition time. For tests, swap the broker for taskiq.InMemoryBroker() so kicked tasks execute in-process, without a separate worker.

The same lifespan guard rails as the queue manager apply: connect()/disconnect()/lifespan()/broker_dependency/health_check()/is_connected.

Periodic tasks scheduler recipe

AsyncTaskScheduler wraps taskiq.TaskiqScheduler + LabelScheduleSource so periodic tasks are declared with decorators alongside regular tasks and the scheduler is driven from the FastAPI lifespan. It does not execute task bodies — it kicks them into the same broker AsyncTaskBrokerManager wraps, so a worker process must be running to consume them. Requires the [tasks] extra.

# src/tasks/__init__.py
from datetime import timedelta

from taskiq_aio_pika import AioPikaBroker

from tempest_fastapi_sdk.tasks import AsyncTaskBrokerManager, AsyncTaskScheduler

from src.core.settings import settings


# Use TASKIQ_BROKER_URL (from TaskIQSettings) when the scheduler /
# task broker is a different broker than the FastStream queue
# (RABBITMQ_URL). Reuse the same RabbitMQ URL when they share the
# broker — both env vars can point to the same value.
broker = AioPikaBroker(settings.TASKIQ_BROKER_URL)
tasks = AsyncTaskBrokerManager(broker)
scheduler = AsyncTaskScheduler(broker)


@tasks.task
async def reconcile_invoices(batch_size: int = 100) -> None:
    """Background task — kicked by handlers or the scheduler."""
    ...


@scheduler.cron("*/5 * * * *")          # every five minutes
async def heartbeat() -> None:
    """Liveness ping written to the audit log."""
    ...


@scheduler.cron("0 9 * * MON-FRI", cron_offset="-03:00")  # 09:00 BRT, weekdays
async def daily_digest() -> None:
    ...


@scheduler.interval(seconds=30)         # every 30s
async def poll_remote_queue() -> None:
    ...


@scheduler.interval(timedelta(minutes=15))
async def warm_cache() -> None:
    ...

Wire it into the app lifespan next to the broker manager:

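# src/api/app.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI

from src.tasks import scheduler, tasks


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]: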
    await tasks.connect()
    await scheduler.connect()
    await scheduler.run_in_background()   # dev / single-process services
    try:
        yield
    finally:
        await scheduler.disconnect()
        await tasks.disconnect()

Decorator surface:

Method When to use
@scheduler.cron("*/5 * * * *", cron_offset=None) Cron expression; pass cron_offset (string like "-03:00" or timedelta) to anchor to a timezone other than UTC.
@scheduler.interval(seconds=30) / @scheduler.interval(timedelta(...)) Fixed-interval recurrence.
@scheduler.schedule([{...}, {...}]) Raw TaskIQ schedule list — combine triggers, use one-shot time, etc.
scheduler.register(func, schedule=[...], task_name=...) Register without decorator syntax (third-party callables).

Production deployments with multiple workers should run the standalone scheduler CLI instead of run_in_background(), so only one scheduler is active across the cluster:

taskiq scheduler src.tasks:scheduler.scheduler

(scheduler.scheduler is the inner TaskiqScheduler instance exposed on AsyncTaskScheduler.) The worker process stays the same:

taskiq worker src.tasks:tasks.broker

Lifecycle controls mirror the broker manager: connect() / disconnect() / lifespan() / run_in_background() / health_check() / is_connected.

System metrics recipe

MetricsUtils collects CPU, memory, disk and NVIDIA GPU usage via psutil + pynvml. Every method has a sync and an async variant (the async wrapper runs the same code via asyncio.to_thread). GPU sampling gracefully degrades to [] when pynvml or NVIDIA drivers are missing.

Install with [metrics].

from typing import Any

from tempest_fastapi_sdk import MetricsUtils

# Synchronous, blocking call
snapshot = MetricsUtils.snapshot(disk_paths=["/", "/data"], cpu_interval=0.1)
print(snapshot.cpu.percent, snapshot.memory.percent)
for disk in snapshot.disks:
    print(disk.path, disk.percent)
for gpu in snapshot.gpus:
    print(gpu.name, gpu.utilization_percent, gpu.memory_used_bytes)

# Async — runs every collector concurrently via asyncio.gather
snapshot = await MetricsUtils.snapshot_async(disk_paths=["/"])


@router.get("/metrics")
async def metrics() -> dict[str, Any]:
    snap = await MetricsUtils.snapshot_async()
    return snap.to_dict()

Individual collectors are also available: MetricsUtils.cpu(interval=...), MetricsUtils.memory(), MetricsUtils.disk(path), MetricsUtils.disks(paths), MetricsUtils.gpus() — and their *_async variants. Each returns a typed dataclass (CPUMetrics, MemoryMetrics, DiskMetrics, GPUMetrics, SystemMetrics) with a to_dict() helper for JSON serialization.
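The sync/async pairing can be illustrated with a stdlib-only sketch. It uses shutil.disk_usage instead of psutil, and the names DiskSketch, disk, disk_async and disks_async are hypothetical; the point is only the pattern of a blocking collector wrapped by asyncio.to_thread and fanned out with asyncio.gather.

```python
import asyncio
import shutil
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class DiskSketch:
    path: str
    total_bytes: int
    used_bytes: int
    percent: float

    def to_dict(self) -> dict[str, object]:
        return asdict(self)


def disk(path: str = ".") -> DiskSketch:
    """Blocking collector (shutil.disk_usage performs a filesystem call)."""
    usage = shutil.disk_usage(path)
    percent = round(usage.used / usage.total * 100, 1) if usage.total else 0.0
    return DiskSketch(path, usage.total, usage.used, percent)


async def disk_async(path: str = ".") -> DiskSketch:
    """Async variant: the same code, moved off the event loop thread."""
    return await asyncio.to_thread(disk, path)


async def disks_async(paths: list[str]) -> list[DiskSketch]:
    """Sample several paths concurrently."""
    return list(await asyncio.gather(*(disk_async(p) for p in paths)))


samples = asyncio.run(disks_async(["."]))
```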

Programmatic server entry point recipe

run_server is the canonical helper that src/server.py imports. It centralizes the host / port / reload defaults, pulling values from a ServerSettings-flavoured settings object when present, and keeps the entry point a single line.

# src/server.py
from tempest_fastapi_sdk import run_server

from src.api.app import app  # noqa: F401 — re-exported for external runners
from src.core.settings import settings


def run() -> None:
    """Start the API server programmatically."""
    run_server("src.api.app:app", settings=settings)


__all__: list[str] = ["app", "run"]

# main.py
from src.server import run

if __name__ == "__main__":
    run()

Resolution order for each kwarg is explicit argument → settings.SERVER_* → SDK default ("127.0.0.1" / 8000 / False). Extra uvicorn kwargs (workers=, log_config=, ssl_*=) are forwarded verbatim.
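The resolution order can be sketched as a small lookup chain. resolve_option, _UNSET and SDK_DEFAULTS are hypothetical names for this illustration and say nothing about how run_server is written internally; only the three-step precedence comes from the text above.

```python
from types import SimpleNamespace
from typing import Any

_UNSET: Any = object()  # sentinel: distinguishes "not passed" from None/False
SDK_DEFAULTS = {"host": "127.0.0.1", "port": 8000, "reload": False}


def resolve_option(name: str, explicit: Any = _UNSET, settings: Any = None) -> Any:
    """Explicit argument, then settings.SERVER_<NAME>, then the default."""
    if explicit is not _UNSET:
        return explicit
    from_settings = getattr(settings, f"SERVER_{name.upper()}", _UNSET)
    if from_settings is not _UNSET:
        return from_settings
    return SDK_DEFAULTS[name]


# Settings object that defines host and port but not reload.
settings = SimpleNamespace(SERVER_HOST="0.0.0.0", SERVER_PORT=9000)
```

A sentinel is used rather than None so that an explicit reload=False still wins over a settings value of True.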

JWT bearer / current-user / role dependencies recipe

Four dependency factories live in tempest_fastapi_sdk.api.dependencies.auth — pick the level of abstraction you need.

Factory What you get
make_token_dependency(secret) Validate the X-Token shared-secret header (constant time).
make_bearer_token_dependency(tokens, soft=False) Decode Authorization: Bearer <jwt> and return the claims dict.
make_jwt_user_dependency(tokens, user_loader, soft=False, subject_claim="sub") Decode the bearer JWT, await user_loader(subject), return the loaded user.
make_role_dependency(tokens, ["admin"], require_all=False, roles_claim="roles") / make_permission_dependency(tokens, ["users:write"], require_all=True, permissions_claim="permissions") Decode the bearer JWT and gate the route on roles / permissions.

# src/api/dependencies/auth.py
from uuid import UUID

from tempest_fastapi_sdk import (
    JWTUtils,
    make_bearer_token_dependency,
    make_jwt_user_dependency,
    make_permission_dependency,
    make_role_dependency,
)

from src.api.app import db
from src.core.settings import settings
from src.db.models import UserModel
from src.db.repositories import UserRepository


tokens = JWTUtils(
    secret=settings.JWT_SECRET,
    algorithm=settings.JWT_ALGORITHM,
)


async def load_user(subject: str) -> UserModel:
    """Resolve the JWT subject (a UUID string) to a persisted user."""
    async with db.get_session_context() as session:
        repo = UserRepository(session)
        return await repo.get_by_id(UUID(subject))


require_bearer = make_bearer_token_dependency(tokens)
get_current_user = make_jwt_user_dependency(tokens, load_user)
get_current_user_or_none = make_jwt_user_dependency(tokens, load_user, soft=True)

require_admin = make_role_dependency(tokens, ["admin"])
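require_users_write = make_permission_dependency(tokens, ["users:write"])

# src/api/routers/users.py
from uuid import UUID

from fastapi import APIRouter, Depends

from src.api.dependencies.auth import (
    get_current_user,
    require_admin,
    require_users_write,
)
from src.db.models import UserModel
# UserResponseSchema comes from the service's own schemas package.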

router = APIRouter(prefix="/users", tags=["users"])


@router.get("/me")
async def me(current: UserModel = Depends(get_current_user)) -> UserResponseSchema:
    return UserResponseSchema.model_validate(current)


@router.delete("/{user_id}", dependencies=[Depends(require_admin)])
async def delete_user(user_id: UUID) -> None:
    ...


@router.patch(
    "/{user_id}/permissions",
    dependencies=[Depends(require_users_write)],
)
async def update_perms(user_id: UUID) -> None:
    ...

soft=True returns None instead of raising on missing/invalid tokens, which is useful for endpoints that work both authenticated and anonymous. subject_claim defaults to "sub" but can be any custom claim ("user_id", "uid", ...). Role and permission dependencies accept either a string or a list of strings on the JWT claim. require_all=True (the default for permissions) requires every listed role/permission, while require_all=False (the default for roles) requires at least one.
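The any/all gate on a claim that may be a string or a list can be sketched in a few lines. claim_values and is_allowed are hypothetical helpers for illustration, not the SDK's dependency code, and they operate on an already-decoded claims dict.

```python
from collections.abc import Iterable, Mapping
from typing import Any


def claim_values(claims: Mapping[str, Any], claim: str) -> set[str]:
    """Normalize a claim that may be a single string or a list of strings."""
    raw = claims.get(claim) or []
    if isinstance(raw, str):
        return {raw}
    return {str(item) for item in raw}


def is_allowed(
    claims: Mapping[str, Any],
    required: Iterable[str],
    *,
    claim: str = "roles",
    require_all: bool = False,
) -> bool:
    """Return True when the token satisfies the any/all requirement."""
    granted = claim_values(claims, claim)
    needed = set(required)
    # require_all: every needed value must be granted; otherwise one is enough.
    return needed <= granted if require_all else bool(needed & granted)


claims = {"sub": "u1", "roles": "admin", "permissions": ["users:read", "users:write"]}
```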

CEP (Brazilian zipcode) recipe

CEP is an Annotated[str, AfterValidator(normalize_cep)] type — drop it into a Pydantic schema and inbound values are accepted as "01310-100" or "01310100", normalized to 8 digits, and rejected (ValidationError → HTTP 422 envelope) when they don't match the shape. CEPs have no check digits, so validation is format-only.

from tempest_fastapi_sdk import BaseSchema
from tempest_fastapi_sdk.utils import CEP


class AddressCreateSchema(BaseSchema):
    cep: CEP
    street: str
    number: str

Imperative variants: is_valid_cep(value), normalize_cep(value), plus CEP_PATTERN for raw regex use. Use them inside services / queue handlers where you don't want a Pydantic round-trip.
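As a rough picture of format-only validation, the sketch below accepts the two shapes named above and normalizes to 8 digits. The pattern and function names are assumptions made for this example; the SDK's CEP_PATTERN and normalize_cep may differ in detail (for instance in whitespace handling).

```python
import re

# Hypothetical pattern: five digits, an optional hyphen, three digits.
CEP_SKETCH_PATTERN = re.compile(r"([0-9]{5})-?([0-9]{3})")


def normalize_cep_sketch(value: str) -> str:
    """Return the 8-digit form or raise ValueError on a bad shape."""
    match = CEP_SKETCH_PATTERN.fullmatch(value)
    if match is None:
        raise ValueError(f"invalid CEP: {value!r}")
    return match.group(1) + match.group(2)


def is_valid_cep_sketch(value: str) -> bool:
    """Boolean wrapper around normalize_cep_sketch."""
    try:
        normalize_cep_sketch(value)
    except ValueError:
        return False
    return True
```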

Cache decorator recipe

@cached(redis, ttl=..., key_prefix=...) memoizes the result of an async function in Redis. Cache keys are derived from the function's __qualname__ plus a SHA-256 of args/kwargs; pass key_prefix= to namespace entries so invalidation works by prefix scan.

from tempest_fastapi_sdk.cache import AsyncRedisManager, cached

from src.core.settings import settings


redis = AsyncRedisManager(settings.REDIS_URL)


@cached(redis, ttl=300, key_prefix="users:")
async def get_user_profile(user_id: str) -> dict[str, str]:
    """Hits Redis on warm cache; runs the body once every 5 minutes."""
    return await load_from_db(user_id)


# Selectively bypass the cache (read AND write) for some calls
@cached(
    redis,
    ttl=60,
    skip_cache=lambda args, kwargs: kwargs.get("fresh") is True,
)
async def list_orders(user_id: str, *, fresh: bool = False) -> list[dict]:
    ...

Defaults: ttl=300 seconds (0 disables expiry), serializer=json.dumps / deserializer=json.loads. Override serializer / deserializer for non-JSON payloads (Pydantic models — pass model_dump_json / MyModel.model_validate_json, or use pickle.dumps / pickle.loads for arbitrary objects). Corrupt cached values fall back to running the wrapped function and warn on the SDK logger.
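One way to derive a key of the shape described above (prefix, qualified name, SHA-256 of the arguments) is sketched here. build_cache_key is a hypothetical helper and the exact serialization the SDK hashes is not documented in this section, so treat the JSON encoding step as an assumption.

```python
import hashlib
import json
from typing import Any, Callable


def build_cache_key(
    func: Callable[..., Any],
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    key_prefix: str = "",
) -> str:
    """Prefix + qualified name + SHA-256 digest of the call arguments."""
    # sort_keys makes the digest independent of keyword order;
    # default=repr is a fallback for values json cannot encode.
    payload = json.dumps([list(args), kwargs], sort_keys=True, default=repr)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{key_prefix}{func.__qualname__}:{digest}"


async def get_user_profile(user_id: str) -> dict[str, str]:
    return {"id": user_id}


key_a = build_cache_key(get_user_profile, ("123",), {}, key_prefix="users:")
key_b = build_cache_key(get_user_profile, ("456",), {}, key_prefix="users:")
```

With keys shaped like this, invalidating everything under "users:" is a prefix scan followed by a delete of the matched keys.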

Tool-spec router recipe

make_tool_spec_router(spec) mounts a GET /tool-spec endpoint exposing a machine-readable manifest at the root prefix — meant to sit alongside /health/liveness so external callers can discover capabilities without parsing the full OpenAPI document.

# src/api/app.py
from tempest_fastapi_sdk import (
    make_health_router,
    make_tool_spec_router,
)


def _tool_spec() -> dict[str, object]:
    """Computed per request — keeps version + counts in sync with state."""
    return {
        "service": "my-service",
        "version": settings.VERSION,
        "tools": [
            {"path": "/api/users", "method": "GET", "summary": "List users"},
            {"path": "/api/orders", "method": "POST", "summary": "Place order"},
        ],
    }


def create_app() -> FastAPI:
    app = FastAPI(...)
    app.include_router(make_health_router(db=db))
    app.include_router(make_tool_spec_router(_tool_spec))
    ...
    return app

Pass a dict (served verbatim), a sync callable (called every request) or an async callable (awaited). Override path= to expose the manifest at a different URL or tag= to group it under a different OpenAPI tag.

Webhook signature verification recipe

WebhookSignatureVerifier validates HMAC-signed inbound webhooks (Stripe / GitHub style) and exposes a FastAPI dependency that reads the raw body, checks the signature with hmac.compare_digest, and yields the body bytes so the route handler can re-parse it without re-reading the stream.

# src/api/dependencies/webhooks.py
from tempest_fastapi_sdk import WebhookSignatureVerifier

from src.core.settings import settings


github = WebhookSignatureVerifier(
    secret=settings.GITHUB_WEBHOOK_SECRET,
    algorithm="sha256",
    header_name="X-Hub-Signature-256",
    prefix="sha256=",
)
stripe = WebhookSignatureVerifier(
    secret=settings.STRIPE_WEBHOOK_SECRET,
    algorithm="sha256",
    header_name="Stripe-Signature",
    encoding="hex",
)

# src/api/routers/webhooks.py
import json

from fastapi import APIRouter, Depends

from src.api.dependencies.webhooks import github

router = APIRouter(prefix="/webhooks", tags=["webhooks"])


@router.post("/github")
async def github_event(body: bytes = Depends(github.dependency())) -> None:
    payload = json.loads(body)
    ...

Supports hex (default) and base64 encodings, any hashlib algorithm that is guaranteed on every platform (hashlib.algorithms_guaranteed), and an optional prefix (e.g. "sha256=") stripped before comparison. Use the imperative verifier.verify(body, signature) from queue handlers when validation happens outside the FastAPI pipeline.
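The core check is small enough to sketch with the standard library. verify_signature is a hypothetical free function covering only the hex case, not the SDK's verifier; it shows the prefix stripping and the constant-time comparison on the raw body bytes.

```python
import hashlib
import hmac


def verify_signature(
    body: bytes,
    signature: str,
    *,
    secret: str,
    algorithm: str = "sha256",
    prefix: str = "",
) -> bool:
    """Recompute the HMAC of the raw body and compare in constant time."""
    if prefix:
        if not signature.startswith(prefix):
            return False
        signature = signature[len(prefix):]
    expected = hmac.new(secret.encode("utf-8"), body, digestmod=algorithm).hexdigest()
    # Compare as bytes so non-ASCII input cannot raise inside compare_digest.
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))


body = b'{"action": "opened"}'
good = "sha256=" + hmac.new(b"s3cret", body, hashlib.sha256).hexdigest()
```

The signature must be computed over the exact bytes received; re-serializing parsed JSON before hashing will usually change the digest.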

Pagination Link headers recipe

build_pagination_link_header emits an RFC 8288 Link header with first / prev / next / last rels — pair it with (or use instead of) the BasePaginationSchema body wrapper for REST clients that expect GitHub-style headers. Existing query parameters on the base URL are preserved.

from fastapi import Depends, Request, Response

from tempest_fastapi_sdk import (
    BasePaginationSchema,
    build_pagination_link_header,
)


@router.get("", response_model=list[UserResponseSchema])
async def list_users(
    request: Request,
    response: Response,
    filters: UserFilterSchema = Depends(),
    controller: UserController = Depends(get_user_controller),
) -> list[UserResponseSchema]:
    page: BasePaginationSchema[UserResponseSchema] = await controller.list_paginated(filters)
    response.headers["Link"] = build_pagination_link_header(
        str(request.url),
        page=page.page,
        size=page.size,
        pages=page.pages,
    )
    response.headers["X-Total-Count"] = str(page.total)
    return page.items

Tweak page_param= / size_param= when your service uses non-standard query parameter names (e.g. offset / limit). Pass extra_params={"sort": "name"} to bake the current sort/filter state into every link.
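For readers unfamiliar with the header shape, here is a stdlib-only sketch of how such a value can be assembled. link_header_sketch is hypothetical and not the SDK's build_pagination_link_header; the example URL is made up.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def link_header_sketch(
    url: str,
    *,
    page: int,
    size: int,
    pages: int,
    page_param: str = "page",
    size_param: str = "size",
) -> str:
    """Build first/prev/next/last links, keeping unrelated query params."""
    parts = urlsplit(url)
    kept = [
        (k, v)
        for k, v in parse_qsl(parts.query, keep_blank_values=True)
        if k not in (page_param, size_param)
    ]

    def page_url(number: int) -> str:
        query = urlencode([*kept, (page_param, number), (size_param, size)])
        return urlunsplit(parts._replace(query=query))

    rels = [("first", 1)]
    if page > 1:
        rels.append(("prev", page - 1))
    if page < pages:
        rels.append(("next", page + 1))
    rels.append(("last", max(pages, 1)))
    return ", ".join(f'<{page_url(n)}>; rel="{rel}"' for rel, n in rels)


header = link_header_sketch(
    "https://api.example.com/users?sort=name&page=2", page=2, size=10, pages=3
)
```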

Rate limit middleware recipe

RateLimitMiddleware is a lightweight in-process sliding-window limiter — each unique key (client IP by default) is allowed at most max_requests requests inside every window_seconds window. Exceeded requests get a 429 Too Many Requests with a Retry-After header.

# src/api/app.py
from tempest_fastapi_sdk import RateLimitMiddleware


def create_app() -> FastAPI:
    app = FastAPI(...)
    app.add_middleware(
        RateLimitMiddleware,
        max_requests=120,
        window_seconds=60.0,
        exempt_paths=("/health/liveness", "/health/readiness"),
    )
    ...

Pass key_func= to partition state by tenant header, authenticated user, or any request attribute:

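def by_tenant(request: Request) -> str:
    # request.client can be None (e.g. some test clients), so guard the lookup.
    client_host = request.client.host if request.client else "anon"
    return request.headers.get("X-Tenant", client_host)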


app.add_middleware(
    RateLimitMiddleware,
    max_requests=600,
    window_seconds=60.0,
    key_func=by_tenant,
)

The state is held in-process — for multi-worker deployments either run a single uvicorn worker behind a single reverse-proxy node, or push rate limiting to the edge (nginx / Cloudflare / AWS WAF). The middleware is intentionally simple; a Redis-backed sliding-window limiter is one issue away if it shows up as a real need.
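The sliding-window idea itself fits in a short class. SlidingWindowLimiter below is a hypothetical sketch, not the middleware's source; it keeps one deque of timestamps per key and takes an injectable clock so the behaviour can be checked without sleeping.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most max_requests per key inside any window_seconds span."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self.hits: dict[str, deque[float]] = defaultdict(deque)

    def check(self, key: str) -> tuple[bool, float]:
        """Return (allowed, retry_after_seconds)."""
        now = self.clock()
        window = self.hits[key]
        # Drop timestamps that have slid out of the window.
        while window and now - window[0] >= self.window_seconds:
            window.popleft()
        if len(window) >= self.max_requests:
            # The oldest hit decides when the next slot frees up.
            return False, self.window_seconds - (now - window[0])
        window.append(now)
        return True, 0.0


fake_now = [0.0]
limiter = SlidingWindowLimiter(2, 60.0, clock=lambda: fake_now[0])
first = limiter.check("10.0.0.1")
second = limiter.check("10.0.0.1")
third = limiter.check("10.0.0.1")   # over the limit for this key
other = limiter.check("10.0.0.2")   # separate key, separate budget
fake_now[0] = 61.0
after_window = limiter.check("10.0.0.1")
```

Because the dict lives in one process, two uvicorn workers would each hold their own copy, which is why the text above recommends a single worker or edge-level limiting.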

Outbox dispatcher pattern recipe

The transactional outbox pattern keeps a "to publish" table in the same database as the domain rows, so writing the row and recording the side-effect happen in a single transaction. A worker reads the outbox in order and publishes to RabbitMQ (FastStream) / TaskIQ, marking each row as dispatched only after the broker ACKs. Crashes between commit and publish replay safely on the next poll.

The SDK does not ship a dedicated OutboxDispatcher primitive — the implementation is short, opinionated, and benefits from staying in the service's db/models/ + tasks/ boundary. Use the recipe below.

# src/db/models/outbox.py
from sqlalchemy import JSON, String
from sqlalchemy.orm import Mapped, mapped_column

from tempest_fastapi_sdk import BaseModel


class OutboxEventModel(BaseModel):
    """One row per domain event waiting to be published."""

    topic: Mapped[str] = mapped_column(String(128), nullable=False, index=True)
    payload: Mapped[dict] = mapped_column(JSON, nullable=False)
    status: Mapped[str] = mapped_column(
        String(16),
        nullable=False,
        default="pending",
        index=True,
    )
    # is_active / created_at / updated_at come from BaseModel.

# src/db/repositories/outbox.py
from sqlalchemy import select, update

from tempest_fastapi_sdk import BaseRepository

from src.db.models import OutboxEventModel


class OutboxRepository(BaseRepository[OutboxEventModel]):
    model: type[OutboxEventModel] = OutboxEventModel

    async def claim_pending(self, *, limit: int = 100) -> list[OutboxEventModel]:
        """Lock-free claim — fine for single-worker dispatcher."""
        stmt = (
            select(OutboxEventModel)
            .where(OutboxEventModel.status == "pending")
            .order_by(OutboxEventModel.created_at)
            .limit(limit)
        )
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def mark_dispatched(self, ids: list[str]) -> None:
        await self.session.execute(
            update(OutboxEventModel)
            .where(OutboxEventModel.id.in_(ids))
            .values(status="dispatched"),
        )
        await self.session.commit()

# src/services/orders.py (produce side)
from src.db.models import OrderModel, OutboxEventModel


class OrderService:
    async def place_order(self, data: OrderCreateSchema) -> OrderResponseSchema:
        order = OrderModel(**data.to_dict())
        self.repo.session.add(order)
        # Flush first so order.id is populated even when the key is assigned at flush time.
        await self.repo.session.flush()
        # Same transaction as the order row.
        self.repo.session.add(
            OutboxEventModel(
                topic="orders.placed",
                payload={"order_id": str(order.id), "amount": order.amount},
            ),
        )
        await self.repo.session.commit()
        return self.repo.map_to_response(order)

# src/tasks/__init__.py (dispatcher side)
from tempest_fastapi_sdk.tasks import AsyncTaskScheduler

from src.api.app import broker as queue_broker  # FastStream AsyncBrokerManager
from src.api.app import db
from src.db.repositories import OutboxRepository

# `broker` is the TaskIQ broker defined earlier in this module (see the scheduler recipe).
scheduler = AsyncTaskScheduler(broker)


@scheduler.interval(seconds=5)
async def dispatch_outbox() -> None:
    """Poll the outbox and publish each pending event."""
    async with db.get_session_context() as session:
        repo = OutboxRepository(session)
        events = await repo.claim_pending(limit=100)
        if not events:
            return
        dispatched: list[str] = []
        for event in events:
            try:
                await queue_broker.publish(event.payload, event.topic)
                dispatched.append(str(event.id))
            except Exception:  # noqa: BLE001 — retry on next tick
                continue
        if dispatched:
            await repo.mark_dispatched(dispatched)

Trade-offs to keep in mind:

  • Order is best-effort. When one publish in a batch fails, the loop continues, so later events in that batch go out first and the failed event is retried on the next poll, after them. If strict ordering matters, break on the first failure.
  • Single dispatcher. The naive claim_pending does not lock rows; running multiple dispatcher workers will double-publish. Use SELECT ... FOR UPDATE SKIP LOCKED on PostgreSQL when you need to scale out.
  • Retention. Add a periodic cleanup job that deletes dispatched rows older than N days (a DELETE with a cutoff, not a TRUNCATE, which would also drop pending rows); otherwise the outbox table grows unbounded.
  • At-least-once. Consumers must be idempotent — the dispatcher can crash after publishing but before mark_dispatched.
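The strict-ordering variant mentioned in the first bullet can be sketched without a database. Event, dispatch_in_order and the failing publish function are hypothetical and exist only to show that stopping at the first failure keeps events in order across polls.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass


@dataclass
class Event:
    id: str
    topic: str
    status: str = "pending"


async def dispatch_in_order(
    events: list[Event],
    publish: Callable[[Event], Awaitable[None]],
) -> list[str]:
    """Publish pending events, stopping at the first failure to keep order."""
    dispatched: list[str] = []
    for event in events:
        try:
            await publish(event)
        except Exception:
            break  # later events wait for the next poll
        # Marked only after the publish succeeded (at-least-once delivery).
        event.status = "dispatched"
        dispatched.append(event.id)
    return dispatched


async def demo() -> tuple[list[str], list[str]]:
    events = [Event(str(n), "orders.placed") for n in (1, 2, 3)]
    failing = {"2"}  # simulate a broker error on the second event

    async def publish(event: Event) -> None:
        if event.id in failing:
            raise ConnectionError("broker unavailable")

    first_tick = await dispatch_in_order(events, publish)
    failing.clear()  # broker recovers before the next poll
    pending = [e for e in events if e.status == "pending"]
    second_tick = await dispatch_in_order(pending, publish)
    return first_tick, second_tick


ticks = asyncio.run(demo())
```

The cost of this variant is head-of-line blocking: one event that can never be published stalls everything behind it until it is fixed or skipped by hand.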

Migration guide 0.7 → 0.8

0.8.0 renames every field on ServerSettings, extracts log fields to a new LogSettings mixin, and adds eleven other primitives. The renames are the only breaking changes — every new primitive is opt-in.

1. Rename env vars

Old | New | Mixin
HOST | SERVER_HOST | ServerSettings
PORT | SERVER_PORT | ServerSettings
DEBUG | SERVER_DEBUG | ServerSettings
(new) | SERVER_RELOAD | ServerSettings
LOG_LEVEL | LOG_LEVEL | moved to LogSettings
LOG_JSON | LOG_JSON | moved to LogSettings

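Mechanical sed on every .env file; apply the same renames by hand in docker-compose.yml and deployment manifests, where keys are usually indented and the ^ anchor will not match: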

sed -i \
  -e 's/^HOST=/SERVER_HOST=/' \
  -e 's/^PORT=/SERVER_PORT=/' \
  -e 's/^DEBUG=/SERVER_DEBUG=/' \
  .env .env.example .env.test

LOG_LEVEL and LOG_JSON keep their names — only the mixin moves.
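Where sed -i is not available (it behaves differently on macOS, for instance), the same line-anchored rename can be sketched in Python. rename_env_keys is a hypothetical helper that works on text; reading and writing the files is left to the caller.

```python
import re

RENAMES = {"HOST": "SERVER_HOST", "PORT": "SERVER_PORT", "DEBUG": "SERVER_DEBUG"}
_KEY = re.compile(r"^(HOST|PORT|DEBUG)=", flags=re.MULTILINE)


def rename_env_keys(text: str) -> str:
    """Rewrite only keys that start a line, mirroring the sed anchors."""
    return _KEY.sub(lambda m: f"{RENAMES[m.group(1)]}=", text)


before = "HOST=0.0.0.0\nPORT=8000\nDB_HOST=db\nLOG_LEVEL=INFO\nDEBUG=false\n"
after = rename_env_keys(before)
```

The line anchor keeps keys such as DB_HOST untouched, and running the function twice is harmless because the renamed keys no longer match.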

2. Rename code references

# `settings.HOST` → `settings.SERVER_HOST`, same for PORT/DEBUG
grep -rn "settings\.\(HOST\|PORT\|DEBUG\)\b" src/ tests/

Replace each match with the SERVER_* form. If a service was using the old settings.DEBUG flag for application-level debug behavior, switch to settings.SERVER_DEBUG; if it was only being read for uvicorn auto-reload, switch to settings.SERVER_RELOAD.

3. Mix LogSettings into the project Settings

 from tempest_fastapi_sdk import (
     BaseAppSettings,
     CORSSettings,
     DatabaseSettings,
     JWTSettings,
+    LogSettings,
     RabbitMQSettings,
     RedisSettings,
     ServerSettings,
 )


 class Settings(
     ServerSettings,
+    LogSettings,
     DatabaseSettings,
     RedisSettings,
     RabbitMQSettings,
     JWTSettings,
     CORSSettings,
     BaseAppSettings,
 ):
     ...

Skip this step if the service never read settings.LOG_LEVEL / settings.LOG_JSON: configure_logging accepts the values as keyword arguments directly.

4. (Optional) Adopt the new primitives

Pick what fits. None of these are required.

5. Verify

uv sync                      # picks up new pyproject deps
uv run pytest -q             # full suite
uv run ruff check src tests  # lint; rerun the grep from step 2 to confirm no `HOST`/`PORT`/`DEBUG` references remain

If pytest fails with a Pydantic ValidationError referencing HOST / PORT / DEBUG, an env var was not renamed (look at the process environment or .env).


Reference

BaseRepository methods

Method | Purpose | Raises on miss
get(filters, for_update=False) | Single record matching filters | not_found_exception
get_or_none(filters, for_update=False) | Single record or None | -
get_by_id(id, for_update=False) | Shortcut for get({"id": id}) | not_found_exception
exists(filters) | bool without loading the row | -
first(filters, order_by, ascending) | First match ordered | -
list(filters, order_by, ascending) | All rows; [] when empty | -
paginate(filters, order_by, page, page_size, ascending, query=) | dict with items, total, page, size, pages | -
count(filters) | Row count | -
add(model) | Insert single | ConflictException on integrity
add_all(models) | Insert batch | ConflictException on integrity
update(model) | Commit mutated instance | ConflictException on integrity
update_many(models) | Commit batch | ConflictException on integrity
bulk_update(filters, values) | UPDATE ... WHERE mass mutation; rejects empty filters | ConflictException on integrity
delete(id) | Hard delete by PK | not_found_exception
delete_many(filters) | Hard delete by filter | -
delete_batch(ids) | Hard delete several PKs | -
soft_delete(id) | Flip is_active=False; returns row | not_found_exception
restore(id) | Flip is_active=True; returns row | not_found_exception
map_to_schema(instance) | Override in subclass | NotImplementedError
map_to_response(instance) | Override in subclass | NotImplementedError
map_to_model(data) | Default: self.model(**data) | -

Filter dict conventions

The dict passed to get / list / paginate / count / exists / delete_many / bulk_update understands these patterns out of the box:

Filter shape | Generated SQL
{"col": value} | col = value
{"col": [a, b]} | col IN (a, b)
{"col": True} / {"col": False} | col IS TRUE / col IS FALSE
{"name": "ana"} (string field literally named name) | name ILIKE '%ana%'
{"col": date(2024, 1, 1)} (date value) | date(col) = '2024-01-01'
{"start_in": date(...)} | date(date_col_or_created_at) >= ...
{"end_in": date(...)} | date(date_col_or_created_at) <= ...
{"col": None} | filter is skipped (omit-when-None semantics)

Pass the dict from BasePaginationFilterSchema.get_conditions() for query-string-driven filters.

Lifecycle managers

Every SDK-shipped manager follows the same core shape: connect() to start, disconnect() to stop, and health_check() to plug into make_health_router(checks=...). Brokers and the scheduler additionally expose lifespan() (async context manager) and is_connected (read-only state). The tables below list the manager-specific surface.

AsyncDatabaseManager

Method / Property Purpose
__init__(url, *, echo=False, pool_size=10, max_overflow=20, pool_recycle=3600, **engine_kwargs) Build the manager (engine is lazy).
await connect() Create the engine + sessionmaker.
await disconnect() Dispose the engine.
is_connected (property) True while the engine is alive.
db_url_safe (property) DSN with password redacted (safe to log).
await get_session() Return a single AsyncSession.
async with get_session_context() Yield an AsyncSession; commits on success, rolls back on raise.
async session_dependency() FastAPI Depends-compatible generator.
await create_tables() Issue CREATE TABLE for every model on BaseModel.metadata (tests / local dev — production schemas go through Alembic).
await drop_tables() Issue DROP TABLE for every model on BaseModel.metadata.
await health_check() Ping with SELECT 1; returns bool.

AsyncRedisManager (extra: [cache])

Method / Property Purpose
__init__(url, *, decode_responses=True, **client_kwargs) Wrap a redis.asyncio.Redis client.
await connect() Build the client.
await disconnect() Close the client + release the pool.
client (property) Live Redis instance; raises RuntimeError before connect.
async with get_client_context() Yield the same client inside an async with.
async client_dependency() FastAPI Depends-compatible generator.
await health_check() PING returns bool.

Pair with @cached(redis, ttl=..., key_prefix=...) for function-level memoization — see the Cache decorator recipe.

AsyncBrokerManager (extra: [queue])

Method / Property Purpose
__init__(broker) Wrap any FastStream broker (RabbitBroker, KafkaBroker, ...).
await connect() Start the broker; idempotent.
await disconnect() Stop the broker.
broker (attribute) The wrapped FastStream broker — use broker.publisher(...) / broker.subscriber(...) directly.
await publish(message, *args, **kwargs) Forward to broker.publish; raises before connect.
async with lifespan() Connect on enter, disconnect on exit.
async broker_dependency() FastAPI Depends-compatible generator yielding the broker.
await health_check() True while the broker is started.
is_connected (property) Read-only state.

AsyncTaskBrokerManager (extra: [tasks])

Method / Property Purpose
__init__(broker) Wrap any TaskIQ broker (AioPikaBroker, RedisBroker, InMemoryBroker, ...).
await connect() broker.startup(); idempotent.
await disconnect() broker.shutdown().
broker (attribute) The wrapped TaskIQ broker.
task(*args, **kwargs) Decorator forwarding to broker.task.
register_task(func, *, task_name=None, **kwargs) Register without decorator syntax.
async with lifespan() Connect on enter, disconnect on exit.
async broker_dependency() FastAPI Depends-compatible generator.
await health_check() True while the broker is started.
is_connected (property) Read-only state.

AsyncTaskScheduler (extra: [tasks])

Method / Property Purpose
__init__(broker, sources=None) Wrap TaskiqScheduler + LabelScheduleSource (default).
broker (attribute) The broker that scheduled tasks are kicked into.
sources (attribute) List of ScheduleSource instances.
scheduler (attribute) Underlying taskiq.TaskiqScheduler.
@cron(expr, *, cron_offset=None, task_name=None, **labels) Register a cron-scheduled task.
@interval(seconds_or_timedelta, *, task_name=None, **labels) Register a fixed-interval task.
@schedule(spec, *, task_name=None, **labels) Register with raw TaskIQ schedule list.
register(func, *, schedule, task_name=None, **labels) Decorator-free registration.
await connect() scheduler.startup() + every source.startup().
await disconnect() Cancel the background loop (if any) and shut down.
await run_in_background() Spawn an in-process SchedulerLoop task (dev / single-process).
async with lifespan() Connect on enter, disconnect on exit.
await health_check() True while started and (when applicable) the loop is alive.
is_connected (property) Read-only state.

Production deployments should run the standalone CLI instead of run_in_background():

taskiq worker src.tasks:tasks.broker
taskiq scheduler src.tasks:scheduler.scheduler

Error envelope

Every AppException (and any subclass) is serialized by register_exception_handlers into:

{
    "detail": "Human-readable message",
    "code": "MACHINE_READABLE_CODE",
    "details": {"any": "structured context"}
}

Exception                 Default status_code  Default code
AppException              500                  INTERNAL_SERVER_ERROR
NotFoundException         404                  NOT_FOUND
ConflictException         409                  CONFLICT
ValidationException       422                  VALIDATION_ERROR
UnauthorizedException     401                  UNAUTHORIZED
InvalidTokenException     401                  INVALID_TOKEN
ExpiredTokenException     401                  TOKEN_EXPIRED
ForbiddenException        403                  FORBIDDEN
FileTooLargeException     413                  FILE_TOO_LARGE
InvalidFileTypeException  415                  INVALID_FILE_TYPE

Subclasses set message/code/status_code as class attributes; instances can override message and attach a details dict at construction time.


Conventions

  • Layered architecture: routers → controllers → services → repositories. Never skip layers.
  • Async-first: every I/O method is async. Use SQLAlchemy 2.0 patterns (select, not session.query()).
  • Collections return []: never raise on empty results. Only single-resource lookups raise NotFoundException.
  • Soft delete by default: is_active=False instead of physical delete when applicable.
  • UTC everywhere: timestamps are normalized via to_utc; BaseResponseSchema enforces this through field validators.
  • Double quotes: enforced by ruff format.
  • Full typing: every parameter, return value and class attribute is typed. Any is explicit, never implicit.
  • Docstrings in English: Google-style covering description / args / returns / raises.
  • Error codes over messages: the frontend branches on code, not on the (translatable) message.
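
Three of these conventions (async-first, collections return [], soft delete) can be sketched together with an in-memory repository. Everything here is hypothetical and independent of the SDK's real repository class: NotFoundError stands in for NotFoundException, and treating a soft-deleted row as not found on single lookups is an assumption of this example.

```python
import asyncio
from dataclasses import dataclass


class NotFoundError(LookupError):
    """Hypothetical stand-in for the SDK's NotFoundException."""


@dataclass
class Item:
    id: int
    name: str
    is_active: bool = True


class InMemoryItemRepository:
    """Illustrative repository backed by a dict instead of a database."""

    def __init__(self) -> None:
        self._rows: dict[int, Item] = {}

    async def add(self, item: Item) -> None:
        self._rows[item.id] = item

    async def list_active(self) -> list[Item]:
        # Collections never raise: no matches simply yields [].
        return [row for row in self._rows.values() if row.is_active]

    async def get(self, item_id: int) -> Item:
        row = self._rows.get(item_id)
        # Assumption: soft-deleted rows behave like missing rows.
        if row is None or not row.is_active:
            raise NotFoundError(f"Item {item_id} not found")
        return row

    async def soft_delete(self, item_id: int) -> None:
        # Flip the flag instead of removing the row.
        (await self.get(item_id)).is_active = False


async def demo_repository() -> tuple[list[Item], list[str], bool]:
    repo = InMemoryItemRepository()
    empty = await repo.list_active()
    await repo.add(Item(id=1, name="widget"))
    await repo.soft_delete(1)
    names_after_delete = [item.name for item in await repo.list_active()]
    try:
        await repo.get(1)
        raised = False
    except NotFoundError:
        raised = True
    return empty, names_after_delete, raised
```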

Development

make install     # uv sync --all-extras
make test        # pytest with coverage
make lint        # ruff check .
make fmt         # ruff format .
make type        # mypy --strict
make check       # lint + fmt-check + type + test (every gate)
make ci          # check + build + smoke install in a clean venv (mirrors GitHub Actions)
make build       # uv build → dist/
make clean       # remove caches and build artifacts

Run make (or make help) to list every target. The Makefile targets are thin wrappers around uv, so direct invocations work too:

uv sync --all-extras
uv run pytest
uv run ruff check .
uv run mypy tempest_fastapi_sdk
uv build

The CI gate (.github/workflows/ci.yml) runs the equivalent of make ci on every push and pull request against main. The wheel-build smoke step installs the freshly built artifact into a clean Python 3.13 virtualenv and imports the top-level surface to guard against the empty-wheel / missing-package-data class of bugs.


Release

Releases are published to PyPI automatically when a v*.*.* tag is pushed.

The pipeline (.github/workflows/release-pypi.yml) does three things:

  1. Version sanity check. The tag (for example v0.1.0), the version field in pyproject.toml and tempest_fastapi_sdk.__version__ must all match. A mismatch aborts the release before any artifact is built.
  2. Build + verify. Run the full validation suite (tests + lint + mypy), build sdist/wheel with uv build, check metadata with twine check, and verify the wheel actually contains the package files + the bundled env.py.template Alembic template.
  3. Publish via Trusted Publishing. Uses OIDC against PyPI — no long-lived API tokens stored in repo secrets.

Cutting a release

make release VERSION=0.2.0

This single target:

  1. Refuses to run if the working tree is dirty.
  2. Bumps pyproject.toml and tempest_fastapi_sdk/__init__.py to the requested version.
  3. Runs make check (lint + format + mypy + tests) so a broken commit never gets tagged.
  4. Commits the bump as chore: release v0.2.0 and creates the v0.2.0 tag locally.
  5. Prints the two git push commands you still need to run. Pushing is left manual on purpose so you can review the commit one last time.

# Review then push
git show v0.2.0
git push origin main
git push origin v0.2.0

GitHub Actions picks up the tag, runs the release pipeline and publishes the artifacts to PyPI.

Manual flow (no Makefile) — same result:

$EDITOR pyproject.toml                              # version = "0.2.0"
$EDITOR tempest_fastapi_sdk/__init__.py             # __version__ = "0.2.0"
git commit -am "chore: release v0.2.0"
git tag v0.2.0
git push origin main v0.2.0

One-time PyPI Trusted Publishing setup

PyPI needs to know which workflow is allowed to publish on the project's behalf. Done once per project:

  1. Create the project on PyPI (name: tempest-fastapi-sdk). For brand-new projects, register a placeholder release manually first or use pending publishers so the first OIDC-driven upload can create it.
  2. On the project's PyPI settings page, add a Trusted Publisher pointing to:
    • Owner: mauriciobenjamin700
    • Repository: tempest-fastapi-sdk
    • Workflow filename: release-pypi.yml
    • Environment: pypi (must match release-pypi.yml's environment.name)
  3. In the GitHub repository, create an environment named pypi (Settings → Environments → New environment). Optionally restrict deployments to tags matching v*.*.* for an extra safety net.

After this, every v*.*.* tag triggers a publish. No PyPI tokens are stored anywhere.


License

MIT
