Shared FastAPI building blocks: base schemas, ORM model, async repository, exceptions, pagination and settings — the conventions used across Tempest projects.
Project description
tempest-fastapi-sdk
Shared FastAPI/SQLAlchemy/Pydantic building blocks used across Tempest projects: base schemas, ORM model, async repository, pagination, settings, exceptions, Alembic helper, FastStream/TaskIQ broker managers, Redis cache, Server-Sent Events, Web Push and the utility classes (PasswordUtils, JWTUtils, EmailUtils, UploadUtils, MetricsUtils, LogUtils).
The goal is to start every new backend with the same opinionated foundation already in place — no copy-pasting BaseModel, no rewriting the same CRUD repository, no re-inventing the exception envelope.
Table of contents
- Install
- What's inside
- Architecture overview
- Tutorial — building the Users feature
- Recipes
- Authentication
- File uploads
- Transactional email
- Alembic migrations
- Utility helpers (
utcnow,to_utc,modify_dict) - BR document & phone validation
- Testing
- Application bootstrap (
create_app) - Structured logging & request IDs
- Settings mixins composition
- Controllers & services layering
- Audit & soft-delete mixins
- Cursor pagination
- Redis cache (
AsyncRedisManager) - Server-Sent Events (SSE)
- Web Push notifications
- Message queues — FastStream (
AsyncBrokerManager) - Background tasks — TaskIQ (
AsyncTaskBrokerManager) - Periodic tasks scheduler (
AsyncTaskScheduler) - System metrics (
MetricsUtils) - Programmatic server entry point (
run_server) - JWT bearer / current-user / role dependencies
- CEP (Brazilian zipcode)
- Cache decorator (
@cached) - Tool-spec router (
make_tool_spec_router) - Webhook signature verification (
WebhookSignatureVerifier) - Pagination Link headers (
build_pagination_link_header) - Rate limit middleware (
RateLimitMiddleware) - Outbox dispatcher pattern
- Migration guide 0.7 → 0.8
- Reference
- Conventions
- Development
- Release
- License
Install
pip install tempest-fastapi-sdk
Via pyproject.toml:
dependencies = [
"tempest-fastapi-sdk>=0.7.1",
]
Requires Python >=3.11.
Optional extras
Feature-rich helpers pull in third-party dependencies that you only need when you actually use the helper. Pick the extras the service consumes:
| Extra | Pulls in | Unlocks |
|---|---|---|
[auth] |
bcrypt, PyJWT |
PasswordUtils, JWTUtils |
[email] |
aiosmtplib |
EmailUtils |
[upload] |
aiofiles, python-multipart |
UploadUtils |
[cache] |
redis |
AsyncRedisManager |
[webpush] |
pywebpush, cryptography |
WebPushDispatcher |
[metrics] |
psutil, nvidia-ml-py |
MetricsUtils |
[queue] |
faststream[rabbit] |
AsyncBrokerManager (FastStream) |
[tasks] |
taskiq, taskiq-aio-pika |
AsyncTaskBrokerManager (TaskIQ) |
[all] |
everything above | every helper |
pip install "tempest-fastapi-sdk[auth,upload]" # only what the service uses
pip install "tempest-fastapi-sdk[all]" # or pull everything
Since 0.7.1 every optional dependency is imported lazily at first instantiation, so import tempest_fastapi_sdk works with any subset of extras — instantiating a helper whose extra is missing raises ImportError with a clear hint pointing at the right one.
What's inside
| Module | Exports |
|---|---|
tempest_fastapi_sdk.schemas |
BaseSchema, BaseResponseSchema, BasePaginationFilterSchema, BasePaginationSchema[T], CursorPaginationFilterSchema, CursorPaginationSchema, encode_cursor, decode_cursor, build_pagination_link_header |
tempest_fastapi_sdk.db |
BaseModel, BaseRepository[ModelType], AsyncDatabaseManager, AlembicHelper, NAMING_CONVENTION, AuditMixin, SoftDeleteMixin |
tempest_fastapi_sdk.exceptions |
AppException, NotFoundException, ConflictException, ValidationException, UnauthorizedException, ForbiddenException, InvalidTokenException, ExpiredTokenException, FileTooLargeException, InvalidFileTypeException |
tempest_fastapi_sdk.settings |
BaseAppSettings, ServerSettings, LogSettings, DatabaseSettings, RedisSettings, RabbitMQSettings, JWTSettings, CORSSettings, EmailSettings, UploadSettings, TokenSettings, WebPushSettings, TaskIQSettings |
tempest_fastapi_sdk.api |
register_exception_handlers, app_exception_handler, apply_cors, make_health_router, make_tool_spec_router, make_token_dependency, make_bearer_token_dependency, make_jwt_user_dependency, make_role_dependency, make_permission_dependency, require_x_token, run_server, RequestIDMiddleware, RateLimitMiddleware, WebhookSignatureVerifier, HealthCheck |
tempest_fastapi_sdk.controllers |
BaseController |
tempest_fastapi_sdk.services |
BaseService |
tempest_fastapi_sdk.core |
configure_logging, JSONFormatter, get_request_id/set_request_id/clear_request_id, request_id_ctx |
tempest_fastapi_sdk.sse |
EventStream, ServerSentEvent, sse_response |
tempest_fastapi_sdk.cache (extra: [cache]) |
AsyncRedisManager, cached |
tempest_fastapi_sdk.webpush (extra: [webpush]) |
WebPushDispatcher, WebPushError, WebPushGoneError, WebPushSubscriptionSchema, WebPushKeysSchema, WebPushPayloadSchema |
tempest_fastapi_sdk.queue (extra: [queue]) |
AsyncBrokerManager (FastStream lifecycle wrapper) |
tempest_fastapi_sdk.tasks (extra: [tasks]) |
AsyncTaskBrokerManager (TaskIQ lifecycle wrapper), AsyncTaskScheduler (periodic / cron tasks) |
tempest_fastapi_sdk.utils |
to_utc, utcnow, modify_dict, LogUtils, PasswordUtils (extra: [auth]), JWTUtils (extra: [auth]), EmailUtils (extra: [email]), UploadUtils (extra: [upload]), MetricsUtils/CPUMetrics/MemoryMetrics/DiskMetrics/GPUMetrics/SystemMetrics (extra: [metrics]), BR regex helpers (CPF, CNPJ, CPFOrCNPJ, PhoneBR, CEP, is_valid_*, normalize_*, only_digits, *_PATTERN) |
Core primitives are re-exported from tempest_fastapi_sdk at the top level — from tempest_fastapi_sdk import BaseModel, BaseRepository, AppException always works. The extras-gated managers in tempest_fastapi_sdk.cache, tempest_fastapi_sdk.queue and tempest_fastapi_sdk.tasks must be imported from their own submodule (from tempest_fastapi_sdk.queue import AsyncBrokerManager).
Architecture overview
The SDK assumes a layered architecture where each layer has a single, narrow responsibility:
HTTP request
│
▼
┌─────────────┐ receive HTTP, validate input via schemas,
│ Router │ call service, return response schema.
└──────┬──────┘ No business logic, no DB access.
│
▼
┌─────────────┐ orchestrate use case across one or more services;
│ Controller │ handle cross-service coordination only.
└──────┬──────┘ Optional — skip for simple CRUD.
│
▼
┌─────────────┐ business rules, validation beyond Pydantic,
│ Service │ domain decisions. Calls one or more repositories.
└──────┬──────┘ No HTTP types, no SQLAlchemy types.
│
▼
┌─────────────┐ raw data access via SQLAlchemy. CRUD, filters,
│ Repository │ pagination. Translates between ORM and schemas
└──────┬──────┘ via map_to_* methods. No business decisions.
│
▼
┌─────────────┐ SQLAlchemy AsyncSession on top of asyncpg/aiosqlite.
│ Database │
└─────────────┘
The SDK ships BaseModel, BaseRepository, BaseSchema and the exception/settings primitives. Routers, services and controllers are your code — the SDK gives you the conventions so they all look the same across projects.
Tutorial — building the Users feature
We'll build a complete Users feature from scratch, end to end. Every file below is something you write in your project; SDK primitives are imported.
1. Project layout
The canonical layout every Python service shipped against this SDK should adopt — main.py is a one-liner, src/server.py exposes both run() and the importable app (or re-exports it from src/api/app.py), api/dependencies/ is always a package (auth + factory providers), controllers/ is mandatory even when it's only a thin pass-through, and repositories/ lives under db/.
my-service/
├── main.py # one-liner: from src.server import run; run()
└── src/
├── __init__.py # re-exports `run` from src.server
├── server.py # programmatic uvicorn.run(...) + module-level `app`
├── core/
│ ├── __init__.py
│ ├── settings.py # Settings(BaseAppSettings, mixins...)
│ └── exceptions.py # domain exceptions (UserNotFoundError, ...)
├── db/
│ ├── __init__.py # re-exports BaseModel + every model
│ ├── models/
│ │ ├── __init__.py
│ │ └── user.py # UserModel(BaseModel)
│ └── repositories/
│ ├── __init__.py
│ └── user.py # UserRepository(BaseRepository[UserModel])
├── schemas/
│ ├── __init__.py
│ └── user.py # UserCreate/Update/Response/Filter
├── services/
│ ├── __init__.py
│ └── user.py # UserService — business logic
├── controllers/
│ ├── __init__.py
│ └── user.py # UserController — orchestration (thin pass-through OK)
└── api/
├── __init__.py
├── app.py # create_app() — middleware, CORS, exception handlers, routers
├── routers/
│ ├── __init__.py
│ └── users.py
└── dependencies/ # ALWAYS a package, never a flat module
├── __init__.py
├── auth.py # X-Token / current_user / require_role dependencies
└── controllers.py # get_<X>_controller / get_<X>_service factories
Each __init__.py re-exports every public symbol from its directory so consumers always do from src.schemas import UserCreateSchema (not from src.schemas.user import UserCreateSchema). This keeps refactors painless — move files around without breaking imports.
If your service has no controllers/services/repositories yet, still ship empty packages with the right names — uniformity matters more than skipping a directory. Drop db/, utils/, queue/ or tasks/ only when the service genuinely doesn't need persistence/utilities/messaging.
2. Settings, server, app factory & entry point
Four files map onto four responsibilities:
| File | Responsibility |
|---|---|
src/core/settings.py |
Settings(BaseAppSettings, ...mixins) — one source of truth for env vars. |
src/api/app.py |
create_app() factory + middleware + CORS + exception handlers + router includes + module-level app instance. |
src/server.py |
run() invoking uvicorn.run("src.api.app:app", ...) programmatically, plus re-exports app so external runners (gunicorn, uvicorn CLI) can import it. |
main.py |
Process entry point — a single line under if __name__ == "__main__": calling run(). |
# src/core/settings.py
from tempest_fastapi_sdk import BaseAppSettings, DatabaseSettings, ServerSettings
class Settings(ServerSettings, DatabaseSettings, BaseAppSettings):
"""All environment-driven configuration lives here.
BaseAppSettings ships `env_file=.env`, `extra=ignore`,
`case_sensitive=True`, `frozen=True` and `str_strip_whitespace=True`.
ServerSettings adds SERVER_HOST/PORT/RELOAD, DatabaseSettings adds
DATABASE_URL/ECHO/POOL_*.
"""
JWT_SECRET: str
JWT_ALGORITHM: str = "HS256"
JWT_TTL_HOURS: int = 1
SMTP_HOST: str = "localhost"
SMTP_PORT: int = 587
SMTP_USERNAME: str | None = None
SMTP_PASSWORD: str | None = None
SMTP_FROM_ADDR: str = "noreply@example.com"
UPLOAD_DIR: str = "./var/uploads"
settings = Settings()
# src/api/app.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from fastapi import FastAPI
from tempest_fastapi_sdk import (
AsyncDatabaseManager,
RequestIDMiddleware,
make_health_router,
register_exception_handlers,
)
from src.api.routers import users
from src.core.settings import settings
db = AsyncDatabaseManager(settings.DATABASE_URL)
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
"""Connect on startup, dispose on shutdown."""
await db.connect()
try:
yield
finally:
await db.disconnect()
def create_app() -> FastAPI:
"""Build and configure the FastAPI app."""
app = FastAPI(title="my-service", version="0.1.0", lifespan=lifespan)
app.add_middleware(RequestIDMiddleware)
register_exception_handlers(app)
# Meta endpoints sit at the root prefix.
app.include_router(make_health_router(db=db, version="0.1.0"))
# Business endpoints sit under /api/<domain>.
app.include_router(users.router, prefix="/api")
return app
app = create_app()
# src/server.py
from tempest_fastapi_sdk import run_server
from src.api.app import app # noqa: F401 — re-exported for external runners
from src.core.settings import settings
def run() -> None:
"""Start the API server programmatically."""
run_server("src.api.app:app", settings=settings)
__all__: list[str] = ["app", "run"]
run_server reads SERVER_HOST / SERVER_PORT / SERVER_RELOAD from settings (falling back to 127.0.0.1 / 8000 / False) and forwards any extra kwargs (workers=, log_config=, ssl_*=) verbatim to uvicorn.run. See the Programmatic server entry point recipe.
# src/__init__.py
from src.server import run
__all__: list[str] = ["run"]
# main.py
from src.server import run
if __name__ == "__main__":
run()
Bind defaults: 127.0.0.1 for internal services (the SDK's ServerSettings.SERVER_HOST default), 0.0.0.0 only when the service is consumed by a separate origin (e.g. a frontend dev server). Never start uvicorn via subprocess.run(["uvicorn", ...]) — always go through run_server (or uvicorn.run("src.api.app:app", ...) directly) so reload, signal handling and graceful shutdown behave correctly.
3. ORM model
# src/db/models/user.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from tempest_fastapi_sdk import BaseModel
class UserModel(BaseModel):
"""One row per registered user.
Inherits from BaseModel, so it automatically gets:
- id (UUID v4, cross-DB portable via sqlalchemy.Uuid)
- is_active (bool, soft-delete flag)
- created_at, updated_at (timezone-aware TIMESTAMP, set by Python AND
the DB so the instance attribute is populated right after flush)
- __tablename__ = "user" (auto: class name without "Model" suffix,
snake-cased; override by assigning __tablename__ explicitly)
- __eq__/__hash__ by (type, id) so the same row across sessions
compares equal
- to_dict(exclude, include, remove_none) and
update_from_dict(data, allowed_fields) helpers
"""
name: Mapped[str] = mapped_column(String(64), nullable=False)
email: Mapped[str] = mapped_column(String(128), unique=True, nullable=False)
password_hash: Mapped[str] = mapped_column(String(128), nullable=False)
Re-export it:
# src/db/models/__init__.py
from src.db.models.user import UserModel
__all__: list[str] = ["UserModel"]
# src/db/__init__.py
from src.db.models import UserModel
from tempest_fastapi_sdk import BaseModel
__all__: list[str] = ["BaseModel", "UserModel"]
Tip: Always import models in
src/db/__init__.py. SQLAlchemy needs to "see" every model beforeBaseModel.metadatais complete, so Alembic autogenerate andcreate_tables()work correctly.
4. Schemas
The recommended naming pattern: one *Create, *Update, *Response and *Filter schema per resource.
# src/schemas/user.py
from pydantic import EmailStr, Field
from tempest_fastapi_sdk import (
BasePaginationFilterSchema,
BaseResponseSchema,
BaseSchema,
)
class UserCreateSchema(BaseSchema):
"""Payload for POST /users."""
name: str = Field(min_length=1, max_length=64)
email: EmailStr
password: str = Field(min_length=8, max_length=128)
class UserUpdateSchema(BaseSchema):
"""Partial payload for PATCH /users/{id}. Every field optional."""
name: str | None = Field(default=None, min_length=1, max_length=64)
email: EmailStr | None = None
class UserResponseSchema(BaseResponseSchema):
"""Outbound representation.
Inherits id/is_active/created_at/updated_at from BaseResponseSchema
(timestamps already normalized to UTC by the field validator).
"""
name: str
email: EmailStr
class UserFilterSchema(BasePaginationFilterSchema):
"""Query-string filters for GET /users.
Inherits page/size/order_by/ascending/is_active from
BasePaginationFilterSchema. Add domain-level filters below.
"""
name: str | None = None # ILIKE %name% search
email: EmailStr | None = None # exact-match filter
# src/schemas/__init__.py
from src.schemas.user import (
UserCreateSchema,
UserFilterSchema,
UserResponseSchema,
UserUpdateSchema,
)
__all__: list[str] = [
"UserCreateSchema",
"UserFilterSchema",
"UserResponseSchema",
"UserUpdateSchema",
]
5. Domain exceptions
The SDK ships generic NotFoundException, ConflictException, etc. Subclass them per domain so error responses have a useful code field:
# src/core/exceptions.py
from typing import ClassVar
from tempest_fastapi_sdk import ConflictException, NotFoundException
class UserNotFoundError(NotFoundException):
message: str = "Usuário não encontrado"
code: ClassVar[str] = "USER_NOT_FOUND"
class UserEmailAlreadyTakenError(ConflictException):
message: str = "Já existe um usuário com esse e-mail"
code: ClassVar[str] = "USER_EMAIL_TAKEN"
The SDK's exception handler (register_exception_handlers) serializes them to:
{
"detail": "Usuário não encontrado",
"code": "USER_NOT_FOUND",
"details": {}
}
The frontend branches on code, not on the (potentially translated) message.
6. Repository
# src/db/repositories/user.py
from typing import ClassVar
from tempest_fastapi_sdk import AppException, BaseRepository
from src.core.exceptions import UserNotFoundError
from src.db.models import UserModel
from src.schemas import UserResponseSchema
class UserRepository(BaseRepository[UserModel]):
"""Data-access layer for users."""
model: type[UserModel] = UserModel
not_found_exception: ClassVar[type[AppException]] = UserNotFoundError
def map_to_schema(self, instance: UserModel) -> UserResponseSchema:
return UserResponseSchema.model_validate(instance)
def map_to_response(self, instance: UserModel) -> UserResponseSchema:
return self.map_to_schema(instance)
Per-domain error messages (optional but recommended in real apps):
class UserRepository(BaseRepository[UserModel]):
model: type[UserModel] = UserModel
not_found_exception: ClassVar[type[AppException]] = UserNotFoundError
def __init__(self, session: AsyncSession) -> None:
super().__init__(
session,
not_found_message="Usuário não encontrado",
create_conflict_message="Já existe um usuário com esse e-mail",
update_conflict_message="Conflito ao atualizar usuário",
)
The base repo gives you 17 methods for free — see the reference table below. Add custom methods on top:
async def get_by_email(self, email: str) -> UserModel:
"""Look up a user by email. Raises UserNotFoundError on miss."""
return await self.get({"email": email})
7. Service
The service is where business rules live. It calls one or more repositories and never touches HTTP or SQLAlchemy types directly.
# src/services/user.py
from uuid import UUID
from tempest_fastapi_sdk import (
BasePaginationSchema,
PasswordUtils,
)
from src.core.exceptions import UserEmailAlreadyTakenError
from src.db.repositories import UserRepository
from src.schemas import (
UserCreateSchema,
UserFilterSchema,
UserResponseSchema,
UserUpdateSchema,
)
class UserService:
"""Business logic for the user domain."""
def __init__(
self,
repository: UserRepository,
passwords: PasswordUtils,
) -> None:
"""Initialize the service.
Args:
repository (UserRepository): User-domain repository.
passwords (PasswordUtils): Shared bcrypt helper.
"""
self.repo: UserRepository = repository
self.passwords: PasswordUtils = passwords
async def create(self, data: UserCreateSchema) -> UserResponseSchema:
if await self.repo.exists({"email": data.email}):
raise UserEmailAlreadyTakenError()
user = self.repo.map_to_model(
{
**data.to_dict(exclude=["password"]),
"password_hash": self.passwords.hash(data.password),
}
)
user = await self.repo.add(user)
return self.repo.map_to_response(user)
async def get(self, user_id: UUID) -> UserResponseSchema:
user = await self.repo.get_by_id(user_id)
return self.repo.map_to_response(user)
async def update(
self,
user_id: UUID,
data: UserUpdateSchema,
) -> UserResponseSchema:
user = await self.repo.get_by_id(user_id)
user.update_from_dict(
data.to_dict(),
allowed_fields={"name", "email"}, # prevents mass-assignment
)
user = await self.repo.update(user)
return self.repo.map_to_response(user)
async def soft_delete(self, user_id: UUID) -> None:
await self.repo.soft_delete(user_id)
async def list_paginated(
self,
filters: UserFilterSchema,
) -> BasePaginationSchema[UserResponseSchema]:
page = await self.repo.paginate(
filters=filters.get_conditions(),
page=filters.page,
page_size=filters.size,
order_by=filters.order_by,
ascending=filters.ascending,
)
return BasePaginationSchema[UserResponseSchema](
items=[self.repo.map_to_response(u) for u in page["items"]],
total=page["total"],
page=page["page"],
size=page["size"],
pages=page["pages"],
)
8. Controller
Even when there's no orchestration to do, controllers/ exists as a thin pass-through so the import graph stays uniform across services. The day a use case needs to coordinate two services (or fan out to a queue), the controller is already there.
# src/controllers/user.py
from uuid import UUID
from tempest_fastapi_sdk import BasePaginationSchema
from src.schemas import (
UserCreateSchema,
UserFilterSchema,
UserResponseSchema,
UserUpdateSchema,
)
from src.services.user import UserService
class UserController:
"""Orchestrate user use cases.
Today every method is a thin pass-through to ``UserService``. As
soon as a use case needs to coordinate more than one service —
e.g. signup also sends a welcome email and enqueues a CRM sync —
the orchestration lives here, not in the router and not in the
service.
"""
def __init__(self, service: UserService) -> None:
self.service: UserService = service
async def create(self, data: UserCreateSchema) -> UserResponseSchema:
return await self.service.create(data)
async def get(self, user_id: UUID) -> UserResponseSchema:
return await self.service.get(user_id)
async def update(
self,
user_id: UUID,
data: UserUpdateSchema,
) -> UserResponseSchema:
return await self.service.update(user_id, data)
async def soft_delete(self, user_id: UUID) -> None:
await self.service.soft_delete(user_id)
async def list_paginated(
self,
filters: UserFilterSchema,
) -> BasePaginationSchema[UserResponseSchema]:
return await self.service.list_paginated(filters)
9. Dependency providers
api/dependencies/ is always a package. auth.py hosts the shared-secret / current-user dependencies; controllers.py (or services.py when there is no controller layer yet) hosts the factory providers the routers depend on. Never construct controllers or services inline inside the router file.
# src/api/dependencies/controllers.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from tempest_fastapi_sdk import PasswordUtils
from src.api.app import db
from src.controllers.user import UserController
from src.db.repositories import UserRepository
from src.services.user import UserService
# Stateless utilities — instantiate once per process.
_passwords: PasswordUtils = PasswordUtils()
def get_user_controller(
session: AsyncSession = Depends(db.session_dependency),
) -> UserController:
"""Wire repository → service → controller for a single request."""
repository = UserRepository(session)
service = UserService(repository=repository, passwords=_passwords)
return UserController(service=service)
# src/api/dependencies/__init__.py
from src.api.dependencies.controllers import get_user_controller
__all__: list[str] = ["get_user_controller"]
10. Router
Routers receive controllers via FastAPI Depends — no inline construction, no business logic, no DB calls. Business endpoints sit under /api/<domain> (the prefix is added at the include site in src/api/app.py); meta endpoints (/health, /tool-spec) stay at the root prefix.
# src/api/routers/users.py
from uuid import UUID
from fastapi import APIRouter, Depends, status
from tempest_fastapi_sdk import BasePaginationSchema
from src.api.dependencies import get_user_controller
from src.controllers.user import UserController
from src.schemas import (
UserCreateSchema,
UserFilterSchema,
UserResponseSchema,
UserUpdateSchema,
)
router = APIRouter(prefix="/users", tags=["users"])
@router.post(
"",
response_model=UserResponseSchema,
status_code=status.HTTP_201_CREATED,
)
async def create_user(
data: UserCreateSchema,
controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
return await controller.create(data)
@router.get("/{user_id}", response_model=UserResponseSchema)
async def get_user(
user_id: UUID,
controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
return await controller.get(user_id)
@router.patch("/{user_id}", response_model=UserResponseSchema)
async def update_user(
user_id: UUID,
data: UserUpdateSchema,
controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
return await controller.update(user_id, data)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: UUID,
controller: UserController = Depends(get_user_controller),
) -> None:
await controller.soft_delete(user_id)
@router.get("", response_model=BasePaginationSchema[UserResponseSchema])
async def list_users(
filters: UserFilterSchema = Depends(),
controller: UserController = Depends(get_user_controller),
) -> BasePaginationSchema[UserResponseSchema]:
return await controller.list_paginated(filters)
11. Pagination
The pagination contract is enforced end-to-end by SDK primitives:
UserFilterSchema(BasePaginationFilterSchema)parses?page=&size=&order_by=&ascending=&is_active=&name=from the query string and exposes.get_conditions()returning only the domain-level filters (without pagination keys).UserRepository.paginate(...)runs the query with the filter dict + ordering + offset/limit + count, returning{items, total, page, size, pages}.BasePaginationSchema[UserResponseSchema]wraps the result so OpenAPI documents the response shape correctly.
GET /api/users?page=2&size=20&order_by=name&ascending=true&is_active=true&name=ana
Returns:
{
"items": [
{"id": "...", "name": "Ana ...", "email": "...", ...},
...
],
"total": 142,
"page": 2,
"size": 20,
"pages": 8
}
Recipes
Authentication recipe
End-to-end signup + login + protected route using PasswordUtils and JWTUtils. Requires the [auth] extra.
Wire the utility singletons
# src/core/security.py
from datetime import timedelta
from tempest_fastapi_sdk import JWTUtils, PasswordUtils
from src.core.settings import settings
passwords = PasswordUtils(rounds=12)
tokens = JWTUtils(
secret=settings.JWT_SECRET,
algorithm=settings.JWT_ALGORITHM,
default_ttl=timedelta(hours=settings.JWT_TTL_HOURS),
issuer="my-app",
)
Signup
Reuse the UserService.create defined in the tutorial — it already hashes the password.
Login
# src/schemas/auth.py
from pydantic import EmailStr
from tempest_fastapi_sdk import BaseSchema
class LoginSchema(BaseSchema):
email: EmailStr
password: str
class TokenResponseSchema(BaseSchema):
access_token: str
token_type: str = "bearer"
# src/services/auth.py
from sqlalchemy.ext.asyncio import AsyncSession
from tempest_fastapi_sdk import JWTUtils, PasswordUtils, UnauthorizedException
from src.db.repositories import UserRepository
from src.schemas.auth import LoginSchema, TokenResponseSchema
class AuthService:
def __init__(
self,
session: AsyncSession,
passwords: PasswordUtils,
tokens: JWTUtils,
) -> None:
self.repo = UserRepository(session)
self.passwords = passwords
self.tokens = tokens
async def login(self, data: LoginSchema) -> TokenResponseSchema:
user = await self.repo.get_or_none({"email": data.email})
if user is None or not self.passwords.verify(
data.password, user.password_hash
):
# Same error for both cases — don't leak which one failed.
raise UnauthorizedException(message="E-mail ou senha inválidos")
token = self.tokens.encode({"sub": str(user.id)})
return TokenResponseSchema(access_token=token)
# src/api/routers/auth.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.api.app import db
from src.core.security import passwords, tokens
from src.schemas.auth import LoginSchema, TokenResponseSchema
from src.services.auth import AuthService
router = APIRouter(prefix="/auth", tags=["auth"])
def get_auth_service(
session: AsyncSession = Depends(db.session_dependency),
) -> AuthService:
return AuthService(session, passwords, tokens)
@router.post("/login", response_model=TokenResponseSchema)
async def login(
data: LoginSchema,
service: AuthService = Depends(get_auth_service),
) -> TokenResponseSchema:
return await service.login(data)
Protect a route — JWT dependency
Use make_jwt_user_dependency to wire the bearer scheme + JWT decode + user load in one call. The single seam is user_loader(subject), an async callable that maps the JWT subject claim to your domain UserModel.
# src/api/dependencies/auth.py
from uuid import UUID
from tempest_fastapi_sdk import make_jwt_user_dependency
from src.api.app import db
from src.core.security import tokens
from src.db.models import UserModel
from src.db.repositories import UserRepository
async def load_user(subject: str) -> UserModel:
"""Resolve the JWT subject (a UUID string) to a persisted user.
Opens its own session so the dependency stays request-scope-agnostic
(the loader is called once per request, and SDK exceptions raised
inside translate to the canonical 401/404 envelope).
"""
async with db.get_session_context() as session:
repo = UserRepository(session)
return await repo.get_by_id(UUID(subject))
get_current_user = make_jwt_user_dependency(tokens, load_user)
get_current_user_or_none = make_jwt_user_dependency(tokens, load_user, soft=True)
# Use in any route
@router.get("/me", response_model=UserResponseSchema)
async def me(current: UserModel = Depends(get_current_user)) -> UserResponseSchema:
return UserResponseSchema.model_validate(current)
Soft auth (optional user)
get_current_user_or_none above already uses soft=True — it returns None instead of raising on a missing or invalid token, so endpoints can work both authenticated and anonymous:
@router.get("/feed")
async def feed(
current: UserModel | None = Depends(get_current_user_or_none),
) -> FeedResponseSchema:
return await feed_service.list(viewer=current)
Under the hood soft=True calls tokens.decode_or_none (no exception on expired/invalid tokens) and skips the loader when the subject is missing.
File uploads recipe
Avatar endpoint with validation + cleanup. Requires the [upload] extra.
# src/core/storage.py
from tempest_fastapi_sdk import UploadUtils
from src.core.settings import settings
avatar_storage = UploadUtils(
upload_dir=f"{settings.UPLOAD_DIR}/avatars",
max_size_bytes=5 * 1024 * 1024, # 5 MiB
allowed_extensions={"png", "jpg", "jpeg", "webp"},
allowed_mimetypes={"image/png", "image/jpeg", "image/webp"},
)
# src/api/routers/users.py (extension)
from fastapi import UploadFile
from src.api.dependencies import get_user_controller
from src.controllers.user import UserController
from src.core.storage import avatar_storage
@router.post("/{user_id}/avatar", response_model=UserResponseSchema)
async def upload_avatar(
user_id: UUID,
file: UploadFile,
current: UserModel = Depends(get_current_user),
controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
if current.id != user_id:
raise ForbiddenException(message="Só pode editar o próprio avatar")
path = await avatar_storage.save(file, subdir=str(user_id))
return await controller.set_avatar(user_id, str(path))
Add set_avatar to both the service and the controller (the controller stays a thin pass-through unless orchestration is needed — e.g. firing an "avatar updated" event):
# src/services/user.py
class UserService:
async def set_avatar(self, user_id: UUID, path: str) -> UserResponseSchema:
user = await self.repo.get_by_id(user_id)
# Delete previous file when replacing.
if user.avatar_path and user.avatar_path != path:
avatar_storage.delete(user.avatar_path)
user.avatar_path = path
user = await self.repo.update(user)
return self.repo.map_to_response(user)
# src/controllers/user.py
class UserController:
async def set_avatar(self, user_id: UUID, path: str) -> UserResponseSchema:
return await self.service.set_avatar(user_id, path)
UploadUtils.save() raises FileTooLargeException (413) or InvalidFileTypeException (415) on rejection — the SDK's exception handler already returns the right status code with a code field on the response.
Serving the file back
Local-disk uploads are best served by an upstream (nginx / Caddy) so FastAPI doesn't stream bytes. For dev:
from fastapi.staticfiles import StaticFiles
app.mount(
"/static/uploads",
StaticFiles(directory=settings.UPLOAD_DIR),
name="uploads",
)
Construct the public URL in the response schema:
class UserResponseSchema(BaseResponseSchema):
name: str
email: EmailStr
avatar_url: str | None = None
@field_validator("avatar_url", mode="before")
@classmethod
def _absolute_url(cls, value: str | None) -> str | None:
if value is None:
return None
# avatar_path stored as relative path → public URL
return f"/static/uploads/{value}"
Transactional email recipe
Password reset flow using EmailUtils + a short-lived JWT. Requires the [email] extra.
# src/core/mailer.py
from tempest_fastapi_sdk import EmailUtils
from src.core.settings import settings
mailer = EmailUtils(
host=settings.SMTP_HOST,
port=settings.SMTP_PORT,
from_addr=settings.SMTP_FROM_ADDR,
username=settings.SMTP_USERNAME,
password=settings.SMTP_PASSWORD,
use_starttls=True,
)
# src/services/password_reset.py
from datetime import timedelta
from tempest_fastapi_sdk import EmailUtils, JWTUtils, NotFoundException
from src.db.repositories import UserRepository
class PasswordResetService:
def __init__(
self,
repo: UserRepository,
tokens: JWTUtils,
mailer: EmailUtils,
) -> None:
self.repo = repo
self.tokens = tokens
self.mailer = mailer
async def request_reset(self, email: str) -> None:
"""Send a password-reset link to `email`.
Always returns silently — don't reveal whether the email
is registered or not (avoids account enumeration).
"""
user = await self.repo.get_or_none({"email": email})
if user is None:
return
token = self.tokens.encode(
{"sub": str(user.id), "purpose": "password_reset"},
ttl=timedelta(minutes=15),
)
reset_url = f"https://my-app.com/reset-password?token={token}"
await self.mailer.send(
to=user.email,
subject="Reset your password",
body=f"Click here to reset your password: {reset_url}",
html=f'<p>Click <a href="{reset_url}">here</a> to reset.</p>',
)
async def consume_reset(
self,
token: str,
new_password: str,
passwords: PasswordUtils,
) -> None:
# `decode` raises InvalidTokenException / ExpiredTokenException
# (both 401). Caught by the SDK handler.
payload = self.tokens.decode(token)
if payload.get("purpose") != "password_reset":
raise InvalidTokenException()
user = await self.repo.get_by_id(UUID(payload["sub"]))
user.password_hash = passwords.hash(new_password)
await self.repo.update(user)
Alembic migrations recipe
Full workflow: bootstrap → first migration → apply → CI gate.
Bootstrap once per project
# scripts/alembic_init.py
from tempest_fastapi_sdk import AlembicHelper
from src.core.settings import settings
helper = AlembicHelper(config_path="alembic.ini", db_url=settings.DB_URL)
helper.init(
directory="alembic",
metadata_module="app.db", # exposes BaseModel
metadata_attr="BaseModel",
db_url=settings.DB_URL,
)
Run once: uv run python scripts/alembic_init.py.
This creates:
alembic.ini # SDK-curated config (UTC timezone, date-prefixed file template)
alembic/
├── env.py # SDK template (already wires target_metadata, compare_type, batch mode)
├── script.py.mako
└── versions/
Author migrations
# scripts/make_migration.py
import sys
from tempest_fastapi_sdk import AlembicHelper
from src.core.settings import settings
helper = AlembicHelper("alembic.ini", db_url=settings.DB_URL)
helper.revision(
message=sys.argv[1],
autogenerate=True,
)
uv run python scripts/make_migration.py "add users table"
Generated file lands at alembic/versions/2026_05_16_1432-ae12cd34_add_users_table.py — the date prefix means files sort chronologically and merge conflicts are obvious.
Apply on startup
# src/api/app.py — extend lifespan
import asyncio
from tempest_fastapi_sdk import AlembicHelper
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# Run pending migrations before serving traffic.
helper = AlembicHelper("alembic.ini", db_url=settings.DB_URL)
await asyncio.to_thread(helper.upgrade)
await db.connect()
yield
await db.disconnect()
CI gate — schema must match models
# scripts/check_migrations.py
import sys
from tempest_fastapi_sdk import AlembicHelper
from src.core.settings import settings
helper = AlembicHelper("alembic.ini", db_url=settings.DB_URL)
if not helper.check():
print("Schema drift detected — run make_migration.py and commit.")
sys.exit(1)
print("Schema is in sync.")
# .github/workflows/ci.yml
- name: Check migrations are in sync
run: uv run python scripts/check_migrations.py
Utility helpers recipe
Small stateless helpers from tempest_fastapi_sdk.utils that the SDK itself relies on and that show up across every service. Available without any extra.
| Helper | Signature | Purpose |
|---|---|---|
utcnow() |
() -> datetime |
Current time as a timezone-aware UTC datetime — the SDK uses this for created_at / updated_at defaults. |
to_utc(value) |
(datetime) -> datetime |
Coerce naive datetimes to UTC (assumed UTC) and aware datetimes to UTC via astimezone. Used by BaseResponseSchema field validators. |
modify_dict(data, exclude=None, include=None) |
(dict, list[str] | None, dict | None) -> dict |
Single-pass filter + merge. Drop sensitive keys before logging or merge computed fields when mapping payloads to ORM models. |
Timestamps the same way everywhere
utcnow is the canonical "now" for the SDK. Use it for soft-delete timestamps, JWT iat / exp, audit trails — anything where mixing naive and aware datetimes would burn you later.
from datetime import timedelta
from tempest_fastapi_sdk import to_utc, utcnow
now = utcnow() # timezone-aware UTC
expires_at = now + timedelta(hours=1)
# Normalize whatever the caller gave you
incoming = request.json()["scheduled_for"] # naive or aware
scheduled_for = to_utc(datetime.fromisoformat(incoming))
A naive datetime is tagged with UTC (not converted from local time) so it's predictable in headless workers and Docker containers where time.timezone is anyone's guess.
Drop sensitive keys before logging / mapping
modify_dict is the tiny utility that powers BaseSchema.to_dict(exclude=..., include=...) and BaseModel.update_from_dict(...). Use it directly when you don't want to call into Pydantic round-trips:
from tempest_fastapi_sdk import LogUtils, modify_dict
log = LogUtils("app.users")
payload = {"email": "ana@example.com", "password": "s3cr3t", "name": "Ana"}
# Strip password before logging
log.info("user_signup", **modify_dict(payload, exclude=["password"]))
# Merge a computed hash before persisting
user_row = modify_dict(
payload,
exclude=["password"],
include={"password_hash": passwords.hash(payload["password"])},
)
include wins over data, so it doubles as a "set or override" helper without mutating the source dict.
Where every other helper is documented
Every helper has its own recipe — this section is the quick map:
| Helper | Recipe |
|---|---|
PasswordUtils, JWTUtils |
Authentication recipe |
EmailUtils |
Transactional email recipe |
UploadUtils |
File uploads recipe |
LogUtils + configure_logging |
Structured logging & request IDs recipe |
MetricsUtils (CPU/memory/disk/GPU) |
System metrics recipe |
CPF, CNPJ, CPFOrCNPJ, PhoneBR, is_valid_*, normalize_*, only_digits |
BR document & phone validation recipe |
BR document & phone validation recipe
tempest_fastapi_sdk.utils.regex ships ready-to-use regex patterns, validators, normalizers and Pydantic types for the identity/contact fields that show up in almost every Brazilian API. No extra required — pure stdlib + Pydantic (already a core dependency).
| Symbol | Kind | Purpose |
|---|---|---|
CPF_PATTERN, CNPJ_PATTERN, CPF_CNPJ_PATTERN, PHONE_BR_PATTERN |
re.Pattern[str] |
Compiled regex (masked or raw input). |
is_valid_cpf, is_valid_cnpj, is_valid_cpf_cnpj |
(str) -> bool |
Format match + check-digit math. All-same-digit sequences rejected. |
is_valid_phone_br |
(str) -> bool |
BR phone shape: optional +55, optional DDD, optional 9th digit. |
normalize_cpf, normalize_cnpj, normalize_cpf_cnpj, normalize_phone_br |
(str) -> str |
Strip mask to digits-only; raise ValueError if invalid. |
only_digits |
(str) -> str |
Strip every non-digit character. |
CPF, CNPJ, CPFOrCNPJ, PhoneBR |
Annotated[str, AfterValidator(...)] |
Drop-in Pydantic field types — validate + normalize automatically. |
Schema usage
from pydantic import EmailStr, Field
from tempest_fastapi_sdk import BaseSchema
from tempest_fastapi_sdk.utils import CPF, CPFOrCNPJ, PhoneBR
class CustomerCreateSchema(BaseSchema):
"""Payload for POST /customers.
`document` accepts CPF or CNPJ in masked or raw form and is
stored digits-only after validation. `phone` is normalized the
same way. Invalid values surface as a Pydantic `ValidationError`
(HTTP 422 via the SDK exception handler).
"""
name: str = Field(min_length=1, max_length=128)
email: EmailStr
document: CPFOrCNPJ
phone: PhoneBR
Valid input:
{
"name": "Ana",
"email": "ana@example.com",
"document": "529.982.247-25",
"phone": "+55 (11) 98888-7777"
}
After validation:
CustomerCreateSchema(...).document # "52998224725"
CustomerCreateSchema(...).phone # "5511988887777"
Manual validation (services, controllers, queue handlers)
from tempest_fastapi_sdk.utils import (
is_valid_cpf_cnpj,
normalize_cpf_cnpj,
only_digits,
)
if not is_valid_cpf_cnpj(raw_document):
raise ValidationException(message="Documento inválido")
document_digits = normalize_cpf_cnpj(raw_document)
Filtering by stored digits
The normalizers strip masks before saving, so repository filters and unique constraints all work on the canonical digits-only form:
await repo.get({"document": normalize_cpf_cnpj(query)})
Testing recipe
pytest + pytest-asyncio + in-memory SQLite + FastAPI TestClient.
Shared fixtures
# tests/conftest.py
from collections.abc import AsyncGenerator
import pytest_asyncio
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession
from tempest_fastapi_sdk import AsyncDatabaseManager
from src.api.app import create_app
from src.db import BaseModel # importing BaseModel ensures models are registered
@pytest_asyncio.fixture
async def db() -> AsyncGenerator[AsyncDatabaseManager, None]:
"""Fresh in-memory DB per test."""
manager = AsyncDatabaseManager("sqlite+aiosqlite:///:memory:")
await manager.connect()
await manager.create_tables()
try:
yield manager
finally:
await manager.drop_tables()
await manager.disconnect()
@pytest_asyncio.fixture
async def session(db: AsyncDatabaseManager) -> AsyncGenerator[AsyncSession, None]:
"""Managed session bound to the in-memory DB."""
async with db.get_session_context() as session:
yield session
@pytest_asyncio.fixture
async def client(db: AsyncDatabaseManager) -> AsyncGenerator[TestClient, None]:
"""FastAPI TestClient with the SDK manager overridden to use SQLite."""
app = create_app()
# Override the session dependency to use the test DB.
from src.api.app import db as production_db
app.dependency_overrides[production_db.session_dependency] = db.session_dependency
async with TestClient(app) as client:
yield client
Repository test
# tests/repositories/test_user.py
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.exceptions import UserNotFoundError
from src.db.models import UserModel
from src.db.repositories import UserRepository
class TestUserRepository:
async def test_get_by_email_raises_when_missing(
self, session: AsyncSession
) -> None:
repo = UserRepository(session)
with pytest.raises(UserNotFoundError):
await repo.get({"email": "ghost@example.com"})
async def test_add_and_get(self, session: AsyncSession) -> None:
repo = UserRepository(session)
user = await repo.add(
UserModel(
name="Ana", email="ana@example.com", password_hash="x"
)
)
loaded = await repo.get_by_id(user.id)
assert loaded.name == "Ana"
Endpoint test
# tests/api/test_users.py
from fastapi.testclient import TestClient
class TestUsersAPI:
def test_create_user(self, client: TestClient) -> None:
response = client.post(
"/api/users",
json={
"name": "Ana",
"email": "ana@example.com",
"password": "hunter22",
},
)
assert response.status_code == 201
body = response.json()
assert body["email"] == "ana@example.com"
assert "password" not in body
assert "password_hash" not in body
def test_get_user_not_found(self, client: TestClient) -> None:
response = client.get("/api/users/00000000-0000-0000-0000-000000000000")
assert response.status_code == 404
body = response.json()
# SDK envelope is always {detail, code, details}
assert body["code"] == "USER_NOT_FOUND"
tempest_fastapi_sdk.testing helpers
tempest_fastapi_sdk.testing ships framework-agnostic helpers that don't require pytest to be importable — wrap them in @pytest.fixture (or any other harness) inside the consuming project's conftest.py. Useful when a test doesn't need a full AsyncDatabaseManager (no lifespan, no health-check probes).
| Helper | Purpose |
|---|---|
create_test_engine(url="sqlite+aiosqlite:///:memory:", **engine_kwargs) |
Build a throw-away AsyncEngine. |
create_test_session_factory(engine) |
Build a sessionmaker bound to the engine. |
init_test_metadata(engine, metadata=None) |
Create every SQLAlchemy table on the engine (defaults to BaseModel.metadata). |
drop_test_metadata(engine, metadata=None) |
Drop every table. |
test_database(url="sqlite+aiosqlite:///:memory:", metadata=None) |
Async context manager — yields an engine with metadata pre-created, drops everything and disposes on exit. |
test_session(url="sqlite+aiosqlite:///:memory:", metadata=None) |
Async context manager — yields an AsyncSession on top of a fresh test_database. |
# tests/conftest.py
from collections.abc import AsyncGenerator
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
from tempest_fastapi_sdk.testing import test_database, test_session
@pytest_asyncio.fixture
async def engine() -> AsyncGenerator[AsyncEngine, None]:
"""Yield a fresh in-memory SQLite engine for each test."""
async with test_database() as e:
yield e
@pytest_asyncio.fixture
async def session() -> AsyncGenerator[AsyncSession, None]:
"""Yield a managed AsyncSession bound to the in-memory engine."""
async with test_session() as s:
yield s
Use the one-shot test_session() context manager for ad-hoc tests that don't need cross-fixture sharing:
from tempest_fastapi_sdk.testing import test_session
from src.db.models import UserModel
from src.db.repositories import UserRepository
async def test_repo_directly() -> None:
async with test_session() as session:
repo = UserRepository(session)
await repo.add(UserModel(name="Ana", email="ana@example.com", password_hash="x"))
assert await repo.count() == 1
Pass metadata= when the project mixes the SDK BaseModel.metadata with a second, isolated metadata (rare — keep one BaseModel per service whenever possible).
Application bootstrap recipe
Section 2 of the tutorial shows the minimal create_app(). This recipe is the extended version, wiring everything tempest_fastapi_sdk.api ships — exception handlers, CORS, request-ID middleware, the health router with extra checks, a shared-secret token dependency and an extra Redis manager — all from the same canonical src/api/app.py location. The bootstrapping pattern stays identical; only the contents of create_app() grow.
# src/api/app.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI
from tempest_fastapi_sdk import (
AsyncDatabaseManager,
RequestIDMiddleware,
apply_cors,
configure_logging,
make_health_router,
make_token_dependency,
register_exception_handlers,
)
from tempest_fastapi_sdk.cache import AsyncRedisManager
from src.core.settings import settings
configure_logging(level=settings.LOG_LEVEL, json_output=settings.LOG_JSON)
db = AsyncDatabaseManager(
settings.DATABASE_URL,
echo=settings.DATABASE_ECHO,
pool_size=settings.DATABASE_POOL_SIZE,
max_overflow=settings.DATABASE_MAX_OVERFLOW,
pool_recycle=settings.DATABASE_POOL_RECYCLE,
)
redis = AsyncRedisManager(settings.REDIS_URL)
require_token = make_token_dependency(settings.TOKEN_SECRET)
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
await db.connect()
await redis.connect()
try:
yield
finally:
await redis.disconnect()
await db.disconnect()
def create_app() -> FastAPI:
"""Build and configure the FastAPI app."""
app = FastAPI(
title="my-service",
version=settings.VERSION,
lifespan=lifespan,
)
app.add_middleware(RequestIDMiddleware)
apply_cors(app, settings)
register_exception_handlers(app)
# Meta endpoints at the root prefix.
app.include_router(
make_health_router(
db=db,
checks={"redis": redis.health_check},
version=settings.VERSION,
),
)
# Business endpoints under /api/<domain>, guarded by the shared secret.
from src.api.routers import users
app.include_router(
users.router,
prefix="/api",
dependencies=[Depends(require_token)],
)
return app
app = create_app()
Key points:
src/server.pyandmain.py(one-liner) stay exactly as in Section 2 of the tutorial — onlycreate_app()changes when you add primitives. Never start uvicorn viasubprocess.run(["uvicorn", ...]); always importappfromsrc.api.appor calluvicorn.run("src.api.app:app", ...)programmatically fromsrc/server.py.RequestIDMiddlewarereads/writesX-Request-IDand seedsrequest_id_ctxso every log line emitted during the request carries the correlation ID.apply_cors(app, settings)readsCORSSettingsdefaults; pass keyword overrides for one-off changes.register_exception_handlers(app)wires everyAppExceptionsubclass to the canonical{detail, code, details}envelope.make_health_router(db=db, checks={"redis": redis.health_check}, version=...)mountsGET /health/livenessandGET /health/readiness(returns503when any check fails) at the root prefix.make_token_dependency(secret)returns an async dependency that validatesX-Tokenviahmac.compare_digest; pass an empty string to disable in dev. The dependency lives next to the rest of the auth glue insrc/api/dependencies/auth.pyonce it grows beyond the one-liner above.
Structured logging & request IDs recipe
configure_logging installs a JSON handler on the root logger that emits one-line JSON records carrying the active request ID. LogUtils is a thin facade that adds level methods accepting structured **fields.
from tempest_fastapi_sdk import LogUtils, configure_logging
from tempest_fastapi_sdk.core import get_request_id
# Imperative — call once during bootstrap.
configure_logging(level="INFO", json_output=True)
# Facade — handy for service-wide singletons.
log = LogUtils("app.users", level="INFO")
log.info("user_created", user_id=str(user.id), email=user.email)
log.warning("login_throttled", ip="1.2.3.4", attempts=5)
try:
risky()
except RuntimeError:
log.exception("risky_failed", op="reconcile") # appends traceback
# Surface the correlation ID outside the log line if needed.
request_id = get_request_id()
JSON output (single line — formatted here for readability):
{
"timestamp": "2026-05-16T20:14:33.412+00:00Z",
"level": "INFO",
"logger": "app.users",
"message": "user_created",
"request_id": "d83e4b0c-7c2f-4bd6-aaa1-7d4f6cf5e5e9",
"user_id": "9c1a5b2d-...",
"email": "ana@example.com"
}
The middleware accepts a custom header name (RequestIDMiddleware(app, header_name="X-Correlation-ID")); the same header is echoed back on every response.
Settings mixins composition recipe
BaseAppSettings is the configured pydantic-settings base. The SDK also exposes composable mixins for the most common dependencies; pick the ones the service needs and put BaseAppSettings at the end of the MRO so its model_config wins.
# src/core/settings.py
from pydantic import Field
from tempest_fastapi_sdk import (
BaseAppSettings,
CORSSettings,
DatabaseSettings,
EmailSettings,
JWTSettings,
LogSettings,
RabbitMQSettings,
RedisSettings,
ServerSettings,
TaskIQSettings,
TokenSettings,
UploadSettings,
WebPushSettings,
)
class Settings(
ServerSettings,
LogSettings,
DatabaseSettings,
RedisSettings,
RabbitMQSettings,
TaskIQSettings,
JWTSettings,
CORSSettings,
EmailSettings,
UploadSettings,
TokenSettings,
WebPushSettings,
BaseAppSettings,
):
"""Service-wide settings."""
VERSION: str = Field(default="0.0.0")
settings = Settings()
Each mixin owns its own env-var prefix — pick only the ones the service needs:
| Mixin | Env vars |
|---|---|
ServerSettings |
SERVER_HOST, SERVER_PORT, SERVER_RELOAD, SERVER_DEBUG |
LogSettings |
LOG_LEVEL, LOG_JSON |
DatabaseSettings |
DATABASE_URL, DATABASE_ECHO, DATABASE_POOL_SIZE, DATABASE_MAX_OVERFLOW, DATABASE_POOL_RECYCLE |
RedisSettings |
REDIS_URL, REDIS_DECODE_RESPONSES |
RabbitMQSettings |
RABBITMQ_URL, RABBITMQ_PREFETCH_COUNT |
TaskIQSettings |
TASKIQ_BROKER_URL, TASKIQ_RESULT_BACKEND_URL |
JWTSettings |
JWT_SECRET, JWT_ALGORITHM, JWT_ACCESS_TTL_SECONDS, JWT_REFRESH_TTL_SECONDS, JWT_ISSUER |
CORSSettings |
CORS_ORIGINS, CORS_ALLOW_CREDENTIALS, CORS_ALLOW_METHODS, CORS_ALLOW_HEADERS, CORS_EXPOSE_HEADERS, CORS_MAX_AGE |
EmailSettings |
SMTP_HOST, SMTP_PORT, SMTP_USERNAME, SMTP_PASSWORD, SMTP_FROM_ADDR, SMTP_USE_TLS, SMTP_USE_SSL, SMTP_TIMEOUT_SECONDS |
UploadSettings |
UPLOAD_DIR, UPLOAD_MAX_SIZE_BYTES, UPLOAD_ALLOWED_EXTENSIONS, UPLOAD_ALLOWED_MIMETYPES |
TokenSettings |
TOKEN_SECRET |
WebPushSettings |
VAPID_PUBLIC_KEY, VAPID_PRIVATE_KEY, VAPID_SUBJECT, WEBPUSH_DEFAULT_TTL_SECONDS |
Breaking change in 0.8.0:
ServerSettingspreviously exposed bareHOST/PORT/DEBUG/LOG_LEVEL/LOG_JSONfields. They were renamed toSERVER_HOST/SERVER_PORT/SERVER_RELOAD/SERVER_DEBUG, andLOG_LEVEL/LOG_JSONmoved to the newLogSettingsmixin. Update both your.envfile (env var names) and any code readingsettings.HOSTetc.
Controllers & services layering recipe
BaseService[RepositoryT, ResponseT] and BaseController[ServiceT, ResponseT] are generic skeletons matching the SDK layering (router → controller → service → repository). They expose pass-through CRUD methods so simple endpoints can subclass them without overriding anything; you override only methods that need orchestration.
# src/services/user_service.py
from uuid import UUID
from tempest_fastapi_sdk import BaseService
from src.db.repositories import UserRepository
from src.schemas.user import UserCreate, UserResponse, UserUpdate
from src.utils.security import password_utils
class UserService(BaseService[UserRepository, UserResponse]):
"""Business logic for the user feature."""
async def signup(self, data: UserCreate) -> UserResponse:
# Business logic — hash the password, then delegate to the repo.
instance = self.repository.map_to_model(
{
"name": data.name,
"email": data.email,
"password_hash": password_utils.hash(data.password),
},
)
created = await self.repository.add(instance)
return self.repository.map_to_response(created)
# src/controllers/user_controller.py
from tempest_fastapi_sdk import BaseController
from src.schemas.user import UserCreate, UserResponse
from src.services.user_service import UserService
class UserController(BaseController[UserService, UserResponse]):
"""Thin orchestration over UserService."""
async def signup(self, data: UserCreate) -> UserResponse:
# Pass-through today; the controller is the seam to add
# cross-service coordination later (audit log, outbox event,
# downstream notification, etc.) without touching the router.
return await self.service.signup(data)
# src/api/dependencies/controllers.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.api.app import db
from src.controllers.user_controller import UserController
from src.db.repositories import UserRepository
from src.services.user_service import UserService
def get_user_controller(
session: AsyncSession = Depends(db.session_dependency),
) -> UserController:
return UserController(UserService(UserRepository(session)))
# src/api/routers/users.py
from fastapi import APIRouter, Depends, status
from src.api.dependencies.controllers import get_user_controller
from src.controllers.user_controller import UserController
from src.schemas.user import UserCreate, UserResponse
router = APIRouter(prefix="/users", tags=["users"])
@router.post(
"/",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
)
async def create_user(
data: UserCreate,
controller: UserController = Depends(get_user_controller),
) -> UserResponse:
return await controller.signup(data)
Keep controllers present even when they only pass through — the import graph stays uniform across services, so adding cross-cutting policy later doesn't change the router signature.
Audit & soft-delete mixins recipe
SoftDeleteMixin adds a deleted_at timestamp column with mark_deleted() / mark_restored() / is_deleted helpers. AuditMixin adds created_by / updated_by UUID columns with stamp_created_by(user_id) / stamp_updated_by(user_id) helpers. Mix them in alongside BaseModel:
# src/db/models/user.py
from sqlalchemy.orm import Mapped, mapped_column
from tempest_fastapi_sdk import AuditMixin, BaseModel, SoftDeleteMixin
class UserModel(BaseModel, SoftDeleteMixin, AuditMixin):
"""Users — soft-deletable and audited."""
name: Mapped[str] = mapped_column()
email: Mapped[str] = mapped_column(unique=True)
password_hash: Mapped[str] = mapped_column()
Filtering is the caller's responsibility — the mixin doesn't install a global filter. Hide soft-deleted rows from list endpoints by passing deleted_at=None (or filtering in your repository subclass):
async def list_alive(self) -> list[UserResponse]:
instances = await self.repository.list(filters={"deleted_at": None})
return [self.repository.map_to_response(i) for i in instances]
Stamping audit columns belongs to the service layer where the current user is in scope:
async def update(self, user_id: UUID, data: UserUpdate, *, actor_id: UUID) -> UserResponse:
instance = await self.repository.get_by_id(user_id)
instance.update_from_dict(data.model_dump(exclude_unset=True))
instance.stamp_updated_by(actor_id)
updated = await self.repository.update(instance)
return self.repository.map_to_response(updated)
Use the mixin's helpers (mark_deleted / mark_restored) when you want the deleted_at semantics; use BaseRepository.soft_delete(id) when the existing is_active flag is enough.
Cursor pagination recipe
Cursor pagination scales better than offset pagination on big tables (no COUNT(*), stable under concurrent inserts) at the cost of losing random-access. The SDK provides CursorPaginationFilterSchema, CursorPaginationSchema[T] and the opaque encode_cursor / decode_cursor helpers.
# src/schemas/user.py
from tempest_fastapi_sdk import CursorPaginationFilterSchema, CursorPaginationSchema
from src.schemas.user import UserResponse
class UserCursorFilter(CursorPaginationFilterSchema):
name: str | None = None # ILIKE %value% via repository convention
UserCursorPage = CursorPaginationSchema[UserResponse]
Repository helper (cursor over created_at + id tie-break):
# src/db/repositories/user.py
from sqlalchemy import asc, desc
from tempest_fastapi_sdk import BaseRepository, decode_cursor, encode_cursor
from src.db.models.user import UserModel
from src.schemas.user import UserResponse
class UserRepository(BaseRepository[UserModel]):
model = UserModel
async def cursor_page(
self,
*,
cursor: str | None,
limit: int,
ascending: bool,
filters: dict[str, Any] | None = None,
) -> UserCursorPage:
query = select(UserModel)
if filters:
query = self._apply_filters(query, filters)
order = asc if ascending else desc
query = query.order_by(order(UserModel.created_at), order(UserModel.id))
if cursor is not None:
state = decode_cursor(cursor)
cmp = (UserModel.created_at, UserModel.id) > (state["value"], state["id"])
query = query.where(cmp if ascending else ~cmp)
query = query.limit(limit + 1) # peek one ahead to set has_more
result = await self.session.execute(query)
rows = list(result.unique().scalars().all())
has_more = len(rows) > limit
rows = rows[:limit]
next_cursor = (
encode_cursor(
{"id": str(rows[-1].id), "value": rows[-1].created_at.isoformat()},
)
if has_more and rows
else None
)
return UserCursorPage(
items=[self.map_to_response(r) for r in rows],
next_cursor=next_cursor,
has_more=has_more,
limit=limit,
)
Router:
@router.get("/", response_model=UserCursorPage)
async def list_users(
f: UserCursorFilter = Depends(),
controller: UserController = Depends(get_user_controller),
) -> UserCursorPage:
return await controller.service.repository.cursor_page(
cursor=f.cursor,
limit=f.limit,
ascending=f.ascending,
filters=f.get_conditions(),
)
The cursor is opaque base64-url-safe JSON — clients never inspect it; they pass back the value verbatim until next_cursor becomes null.
Redis cache recipe
AsyncRedisManager wraps redis.asyncio with the same connect/disconnect/health-check surface as AsyncDatabaseManager. Install with [cache].
from tempest_fastapi_sdk.cache import AsyncRedisManager
cache = AsyncRedisManager(settings.REDIS_URL, decode_responses=True)
# Lifespan
await cache.connect()
...
await cache.disconnect()
# Direct use
await cache.client.set("user:123:name", "Ana", ex=300)
name = await cache.client.get("user:123:name")
# FastAPI dependency — yields the live client.
from fastapi import Depends
from redis.asyncio import Redis
@router.get("/cached")
async def cached_endpoint(
redis: Redis = Depends(cache.client_dependency),
) -> dict[str, str]:
value = await redis.get("greeting") or "hello"
return {"value": value}
Wire the health check on the canonical router with make_health_router(checks={"redis": cache.health_check}) so readiness probes fail when Redis is down.
Server-Sent Events recipe
EventStream is an in-memory async queue feeding one SSE HTTP connection. ServerSentEvent encodes one frame; sse_response wraps the byte stream in a Starlette StreamingResponse with SSE-friendly headers.
# src/api/routers/events.py
import asyncio
from fastapi import APIRouter
from tempest_fastapi_sdk import EventStream, sse_response
router = APIRouter()
@router.get("/events")
async def events() -> "StreamingResponse": # forward-declared by Starlette
stream = EventStream(heartbeat_seconds=15.0)
async def producer() -> None:
for n in range(1, 4):
await stream.publish({"n": n}, event="counter", id=str(n))
await asyncio.sleep(1)
await stream.close()
asyncio.create_task(producer())
return sse_response(stream.stream())
Browser side:
const es = new EventSource("/events");
es.addEventListener("counter", (e) => console.log("got", JSON.parse(e.data)));
heartbeat_seconds emits a : keepalive SSE comment when idle so load-balancers don't close long-lived connections. ServerSentEvent.data accepts strings, bytes or any JSON-serializable Python object — non-strings are JSON-encoded automatically. Pass retry= to hint the browser at the reconnect delay (milliseconds).
Web Push notifications recipe
WebPushDispatcher wraps the synchronous pywebpush library in asyncio.to_thread and surfaces the two errors the application cares about: WebPushGoneError (HTTP 404/410 — delete the subscription) and WebPushError (everything else). Install with [webpush].
# src/services/notifications.py
from tempest_fastapi_sdk import (
WebPushDispatcher,
WebPushGoneError,
WebPushPayloadSchema,
WebPushSubscriptionSchema,
)
dispatcher = WebPushDispatcher(
settings.VAPID_PRIVATE_KEY,
vapid_subject="mailto:ops@example.com",
ttl_seconds=60,
)
async def notify_order_paid(
subscription: WebPushSubscriptionSchema,
order_id: str,
) -> None:
payload = WebPushPayloadSchema(
title="Pagamento confirmado",
body=f"Pedido {order_id} aprovado.",
icon="/static/icons/order.png",
data={"orderId": order_id, "url": f"/orders/{order_id}"},
)
try:
await dispatcher.send(subscription, payload)
except WebPushGoneError:
# Prune the subscription from your store.
await subscriptions_repo.delete_by_endpoint(subscription.endpoint)
async def broadcast(subs: list[WebPushSubscriptionSchema], payload: WebPushPayloadSchema) -> None:
gone = await dispatcher.send_many(subs, payload)
if gone:
await subscriptions_repo.delete_by_endpoints(gone)
WebPushSubscriptionSchema round-trips the exact JSON PushSubscription.toJSON() emits in the browser (it aliases expiration_time ↔ expirationTime), so you can store inbound subscriptions verbatim and replay them on dispatch.
Message queues — FastStream recipe
AsyncBrokerManager wraps any FastStream broker (RabbitMQ, Kafka, NATS, Redis Streams) with a uniform connect/disconnect/health-check surface. The broker instance is injected so the SDK doesn't pin a single transport.
Install with [queue] (pulls faststream[rabbit]). Pick the matching FastStream extra for other transports.
# src/queue/__init__.py
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel
from tempest_fastapi_sdk.queue import AsyncBrokerManager
from src.core.settings import settings
broker = RabbitBroker(settings.RABBITMQ_URL)
queue = AsyncBrokerManager(broker)
class OrderMessage(BaseModel):
order_id: str
user_id: str
@broker.subscriber("orders.paid")
async def handle_order_paid(msg: OrderMessage) -> None:
await mark_order_paid(msg.order_id, msg.user_id)
# src/api/app.py lifespan
await queue.connect()
...
await queue.disconnect()
# Publish from anywhere in the application
await queue.publish(OrderMessage(order_id="abc", user_id="x"), queue="orders.paid")
The manager exposes:
connect()/disconnect()— idempotent; safe to call from FastAPI lifespan.publish(message, *args, **kwargs)— passthrough tobroker.publishwith aRuntimeErrorguard when the broker isn't started.lifespan()— async context manager handling start/stop, handy for short scripts.broker_dependency— FastAPIDependsthat yields the live broker.health_check()/is_connected— true while the broker is started.
Wire it on the health router with make_health_router(checks={"queue": queue.health_check}).
Background tasks — TaskIQ recipe
AsyncTaskBrokerManager wraps any TaskIQ broker (AioPika for RabbitMQ, Redis, in-memory for tests). Install with [tasks] (pulls taskiq + taskiq-aio-pika).
# src/tasks/__init__.py
from taskiq_aio_pika import AioPikaBroker
from tempest_fastapi_sdk.tasks import AsyncTaskBrokerManager
from src.core.settings import settings
tasks = AsyncTaskBrokerManager(AioPikaBroker(settings.TASKIQ_BROKER_URL))
@tasks.task
async def send_welcome_email(to: str, name: str) -> None:
await email_utils.send(
to=to,
subject="Bem-vindo!",
body=f"Olá, {name} — sua conta foi criada.",
)
# src/api/app.py lifespan
await tasks.connect()
...
await tasks.disconnect()
# Enqueue from a request handler
await send_welcome_email.kiq(to=user.email, name=user.name)
register_task(callable, task_name=..., **kwargs) registers a function without decorator syntax — useful when wiring third-party callables that you can't decorate at definition time. For tests, swap the broker for taskiq.InMemoryBroker() so kicked tasks execute synchronously.
The same lifespan guard rails as the queue manager apply: connect()/disconnect()/lifespan()/broker_dependency/health_check()/is_connected.
Periodic tasks scheduler recipe
AsyncTaskScheduler wraps taskiq.TaskiqScheduler + LabelScheduleSource so periodic tasks are declared with decorators alongside regular tasks and the scheduler is driven from the FastAPI lifespan. It does not execute task bodies — it kicks them into the same broker AsyncTaskBrokerManager wraps, so a worker process must be running to consume them. Requires the [tasks] extra.
# src/tasks/__init__.py
from datetime import timedelta
from taskiq_aio_pika import AioPikaBroker
from tempest_fastapi_sdk.tasks import AsyncTaskBrokerManager, AsyncTaskScheduler
from src.core.settings import settings
# Use TASKIQ_BROKER_URL (from TaskIQSettings) when the scheduler /
# task broker is a different broker than the FastStream queue
# (RABBITMQ_URL). Reuse the same RabbitMQ URL when they share the
# broker — both env vars can point to the same value.
broker = AioPikaBroker(settings.TASKIQ_BROKER_URL)
tasks = AsyncTaskBrokerManager(broker)
scheduler = AsyncTaskScheduler(broker)
@tasks.task
async def reconcile_invoices(batch_size: int = 100) -> None:
"""Background task — kicked by handlers or the scheduler."""
...
@scheduler.cron("*/5 * * * *") # every five minutes
async def heartbeat() -> None:
"""Liveness ping written to the audit log."""
...
@scheduler.cron("0 9 * * MON-FRI", cron_offset="-03:00") # 09:00 BRT, weekdays
async def daily_digest() -> None:
...
@scheduler.interval(seconds=30) # every 30s
async def poll_remote_queue() -> None:
...
@scheduler.interval(timedelta(minutes=15))
async def warm_cache() -> None:
...
Wire it into the app lifespan next to the broker manager:
# src/api/app.py
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
await tasks.connect()
await scheduler.connect()
await scheduler.run_in_background() # dev / single-process services
try:
yield
finally:
await scheduler.disconnect()
await tasks.disconnect()
Decorator surface:
| Method | When to use |
|---|---|
@scheduler.cron("*/5 * * * *", cron_offset=None) |
Cron expression; pass cron_offset (string like "-03:00" or timedelta) to anchor to a timezone other than UTC. |
@scheduler.interval(seconds=30) / @scheduler.interval(timedelta(...)) |
Fixed-interval recurrence. |
@scheduler.schedule([{...}, {...}]) |
Raw TaskIQ schedule list — combine triggers, use one-shot time, etc. |
scheduler.register(func, schedule=[...], task_name=...) |
Register without decorator syntax (third-party callables). |
Production deployments with multiple workers should run the standalone scheduler CLI instead of run_in_background(), so only one scheduler is active across the cluster:
taskiq scheduler src.tasks:scheduler.scheduler
(scheduler.scheduler is the inner TaskiqScheduler instance exposed on AsyncTaskScheduler.) The worker process stays the same:
taskiq worker src.tasks:tasks.broker
Lifecycle controls mirror the broker manager: connect() / disconnect() / lifespan() / run_in_background() / health_check() / is_connected.
System metrics recipe
MetricsUtils collects CPU, memory, disk and NVIDIA GPU usage via psutil + pynvml. Every method has a sync and an async variant (the async wrapper runs the same code via asyncio.to_thread). GPU sampling gracefully degrades to [] when pynvml or NVIDIA drivers are missing.
Install with [metrics].
from tempest_fastapi_sdk import MetricsUtils
# Synchronous, blocking call
snapshot = MetricsUtils.snapshot(disk_paths=["/", "/data"], cpu_interval=0.1)
print(snapshot.cpu.percent, snapshot.memory.percent)
for disk in snapshot.disks:
print(disk.path, disk.percent)
for gpu in snapshot.gpus:
print(gpu.name, gpu.utilization_percent, gpu.memory_used_bytes)
# Async — runs every collector concurrently via asyncio.gather
snapshot = await MetricsUtils.snapshot_async(disk_paths=["/"])
@router.get("/metrics")
async def metrics() -> dict[str, Any]:
snap = await MetricsUtils.snapshot_async()
return snap.to_dict()
Individual collectors are also available: MetricsUtils.cpu(interval=...), MetricsUtils.memory(), MetricsUtils.disk(path), MetricsUtils.disks(paths), MetricsUtils.gpus() — and their *_async variants. Each returns a typed dataclass (CPUMetrics, MemoryMetrics, DiskMetrics, GPUMetrics, SystemMetrics) with a to_dict() helper for JSON serialization.
Programmatic server entry point recipe
run_server is the canonical helper imported from src/server.py. It centralizes the host / port / reload defaults — pulling values from a ServerSettings-flavoured settings object when present — and keeps the entry point a single line.
# src/server.py
from tempest_fastapi_sdk import run_server
from src.api.app import app # noqa: F401 — re-exported for external runners
from src.core.settings import settings
def run() -> None:
"""Start the API server programmatically."""
run_server("src.api.app:app", settings=settings)
__all__: list[str] = ["app", "run"]
# main.py
from src.server import run
if __name__ == "__main__":
run()
Resolution order for each kwarg is explicit argument → settings.SERVER_* → SDK default ("127.0.0.1" / 8000 / False). Extra uvicorn kwargs (workers=, log_config=, ssl_*=) are forwarded verbatim.
JWT bearer / current-user / role dependencies recipe
Four dependency factories live in tempest_fastapi_sdk.api.dependencies.auth — pick the level of abstraction you need.
| Factory | What you get |
|---|---|
make_token_dependency(secret) |
Validate the X-Token shared-secret header (constant time). |
make_bearer_token_dependency(tokens, soft=False) |
Decode Authorization: Bearer <jwt> and return the claims dict. |
make_jwt_user_dependency(tokens, user_loader, soft=False, subject_claim="sub") |
Decode the bearer JWT, await user_loader(subject), return the loaded user. |
make_role_dependency(tokens, ["admin"], require_all=False, roles_claim="roles") / make_permission_dependency(tokens, ["users:write"], require_all=True, permissions_claim="permissions") |
Decode the bearer JWT and gate the route on roles / permissions. |
# src/api/dependencies/auth.py
from uuid import UUID
from tempest_fastapi_sdk import (
JWTUtils,
make_bearer_token_dependency,
make_jwt_user_dependency,
make_permission_dependency,
make_role_dependency,
)
from src.api.app import db
from src.core.settings import settings
from src.db.models import UserModel
from src.db.repositories import UserRepository
tokens = JWTUtils(
secret=settings.JWT_SECRET,
algorithm=settings.JWT_ALGORITHM,
)
async def load_user(subject: str) -> UserModel:
"""Resolve the JWT subject (a UUID string) to a persisted user."""
async with db.get_session_context() as session:
repo = UserRepository(session)
return await repo.get_by_id(UUID(subject))
require_bearer = make_bearer_token_dependency(tokens)
get_current_user = make_jwt_user_dependency(tokens, load_user)
get_current_user_or_none = make_jwt_user_dependency(tokens, load_user, soft=True)
require_admin = make_role_dependency(tokens, ["admin"])
require_users_write = make_permission_dependency(tokens, ["users:write"])
# src/api/routers/users.py
from fastapi import APIRouter, Depends
from src.api.dependencies.auth import (
get_current_user,
require_admin,
require_users_write,
)
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/me")
async def me(current: UserModel = Depends(get_current_user)) -> UserResponseSchema:
return UserResponseSchema.model_validate(current)
@router.delete("/{user_id}", dependencies=[Depends(require_admin)])
async def delete_user(user_id: UUID) -> None:
...
@router.patch(
"/{user_id}/permissions",
dependencies=[Depends(require_users_write)],
)
async def update_perms(user_id: UUID) -> None:
...
soft=True returns None instead of raising on missing/invalid tokens — useful for endpoints that work both authenticated and anonymous. subject_claim defaults to "sub" but can be any custom claim ("user_id", "uid", ...). Role dependencies accept either a string or a list of strings on the JWT claim; require_all=True requires every listed role/permission, False (default for roles, override for permissions) requires any.
CEP (Brazilian zipcode) recipe
CEP is an Annotated[str, AfterValidator(normalize_cep)] type — drop it into a Pydantic schema and inbound values are accepted as "01310-100" or "01310100", normalized to 8 digits, and rejected (ValidationError → HTTP 422 envelope) when they don't match the shape. CEPs have no check digits, so validation is format-only.
from tempest_fastapi_sdk import BaseSchema
from tempest_fastapi_sdk.utils import CEP
class AddressCreateSchema(BaseSchema):
cep: CEP
street: str
number: str
Imperative variants: is_valid_cep(value), normalize_cep(value), plus CEP_PATTERN for raw regex use. Use them inside services / queue handlers where you don't want a Pydantic round-trip.
Cache decorator recipe
@cached(redis, ttl=..., key_prefix=...) memoizes the result of an async function in Redis. Cache keys are derived from the function's __qualname__ plus a SHA-256 of args/kwargs; pass key_prefix= to namespace entries so invalidation works by prefix scan.
from tempest_fastapi_sdk.cache import AsyncRedisManager, cached
from src.core.settings import settings
redis = AsyncRedisManager(settings.REDIS_URL)
@cached(redis, ttl=300, key_prefix="users:")
async def get_user_profile(user_id: str) -> dict[str, str]:
"""Hits Redis on warm cache; runs the body once every 5 minutes."""
return await load_from_db(user_id)
# Selectively bypass the cache (read AND write) for some calls
@cached(
redis,
ttl=60,
skip_cache=lambda args, kwargs: kwargs.get("fresh") is True,
)
async def list_orders(user_id: str, *, fresh: bool = False) -> list[dict]:
...
Defaults: ttl=300 seconds (0 disables expiry), serializer=json.dumps / deserializer=json.loads. Override serializer / deserializer for non-JSON payloads (Pydantic models — pass model_dump_json / MyModel.model_validate_json, or use pickle.dumps / pickle.loads for arbitrary objects). Corrupt cached values fall back to running the wrapped function and warn on the SDK logger.
Tool-spec router recipe
make_tool_spec_router(spec) mounts a GET /tool-spec endpoint exposing a machine-readable manifest at the root prefix — meant to sit alongside /health/liveness so external callers can discover capabilities without parsing the full OpenAPI document.
# src/api/app.py
from tempest_fastapi_sdk import (
make_health_router,
make_tool_spec_router,
)
def _tool_spec() -> dict[str, object]:
"""Computed per request — keeps version + counts in sync with state."""
return {
"service": "my-service",
"version": settings.VERSION,
"tools": [
{"path": "/api/users", "method": "GET", "summary": "List users"},
{"path": "/api/orders", "method": "POST", "summary": "Place order"},
],
}
def create_app() -> FastAPI:
app = FastAPI(...)
app.include_router(make_health_router(db=db))
app.include_router(make_tool_spec_router(_tool_spec))
...
return app
Pass a dict (served verbatim), a sync callable (called every request) or an async callable (awaited). Override path= to expose the manifest at a different URL or tag= to group it under a different OpenAPI tag.
Webhook signature verification recipe
WebhookSignatureVerifier validates HMAC-signed inbound webhooks (Stripe / GitHub style) and exposes a FastAPI dependency that reads the raw body, checks the signature with hmac.compare_digest, and yields the body bytes so the route handler can re-parse it without re-reading the stream.
# src/api/dependencies/webhooks.py
from tempest_fastapi_sdk import WebhookSignatureVerifier
from src.core.settings import settings
github = WebhookSignatureVerifier(
secret=settings.GITHUB_WEBHOOK_SECRET,
algorithm="sha256",
header_name="X-Hub-Signature-256",
prefix="sha256=",
)
stripe = WebhookSignatureVerifier(
secret=settings.STRIPE_WEBHOOK_SECRET,
algorithm="sha256",
header_name="Stripe-Signature",
encoding="hex",
)
# src/api/routers/webhooks.py
from fastapi import APIRouter, Depends
from src.api.dependencies.webhooks import github
router = APIRouter(prefix="/webhooks", tags=["webhooks"])
@router.post("/github")
async def github_event(body: bytes = Depends(github.dependency())) -> None:
payload = json.loads(body)
...
Supports hex (default) and base64 encodings, any hashlib algorithm guaranteed across platforms, and an optional prefix (e.g. "sha256=") stripped before comparison. Use the imperative verifier.verify(body, signature) from queue handlers when validation happens outside the FastAPI pipeline.
Pagination Link headers recipe
build_pagination_link_header emits an RFC 8288 Link header with first / prev / next / last rels — pair it with (or use instead of) the BasePaginationSchema body wrapper for REST clients that expect GitHub-style headers. Existing query parameters on the base URL are preserved.
from fastapi import Request, Response
from tempest_fastapi_sdk import (
BasePaginationSchema,
build_pagination_link_header,
)
@router.get("", response_model=list[UserResponseSchema])
async def list_users(
request: Request,
response: Response,
filters: UserFilterSchema = Depends(),
controller: UserController = Depends(get_user_controller),
) -> list[UserResponseSchema]:
page: BasePaginationSchema[UserResponseSchema] = await controller.list_paginated(filters)
response.headers["Link"] = build_pagination_link_header(
str(request.url),
page=page.page,
size=page.size,
pages=page.pages,
)
response.headers["X-Total-Count"] = str(page.total)
return page.items
Tweak page_param= / size_param= when your service uses non-standard query parameter names (e.g. offset / limit). Pass extra_params={"sort": "name"} to bake the current sort/filter state into every link.
Rate limit middleware recipe
RateLimitMiddleware is a lightweight in-process sliding-window limiter — each unique key (client IP by default) is allowed at most max_requests requests inside every window_seconds window. Exceeded requests get a 429 Too Many Requests with a Retry-After header.
# src/api/app.py
from tempest_fastapi_sdk import RateLimitMiddleware
def create_app() -> FastAPI:
app = FastAPI(...)
app.add_middleware(
RateLimitMiddleware,
max_requests=120,
window_seconds=60.0,
exempt_paths=("/health/liveness", "/health/readiness"),
)
...
Pass key_func= to partition state by tenant header, authenticated user, or any request attribute:
def by_tenant(request: Request) -> str:
return request.headers.get("X-Tenant", request.client.host or "anon")
app.add_middleware(
RateLimitMiddleware,
max_requests=600,
window_seconds=60.0,
key_func=by_tenant,
)
The state is held in-process — for multi-worker deployments either run a single uvicorn worker behind a single reverse-proxy node, or push rate limiting to the edge (nginx / Cloudflare / AWS WAF). The middleware is intentionally simple; a Redis-backed sliding-window limiter is one issue away if it shows up as a real need.
Outbox dispatcher pattern recipe
The transactional outbox pattern keeps a "to publish" table in the same database as the domain rows, so writing the row and recording the side-effect happen in a single transaction. A worker reads the outbox in order and publishes to RabbitMQ (FastStream) / TaskIQ, marking each row as dispatched only after the broker ACKs. Crashes between commit and publish replay safely on the next poll.
The SDK does not ship a dedicated OutboxDispatcher primitive — the implementation is short, opinionated, and benefits from staying in the service's db/models/ + tasks/ boundary. Use the recipe below.
# src/db/models/outbox.py
from sqlalchemy import JSON, String
from sqlalchemy.orm import Mapped, mapped_column
from tempest_fastapi_sdk import BaseModel
class OutboxEventModel(BaseModel):
"""One row per domain event waiting to be published."""
topic: Mapped[str] = mapped_column(String(128), nullable=False, index=True)
payload: Mapped[dict] = mapped_column(JSON, nullable=False)
status: Mapped[str] = mapped_column(
String(16),
nullable=False,
default="pending",
index=True,
)
# is_active / created_at / updated_at come from BaseModel.
# src/db/repositories/outbox.py
from sqlalchemy import select, update
from tempest_fastapi_sdk import BaseRepository
from src.db.models import OutboxEventModel
class OutboxRepository(BaseRepository[OutboxEventModel]):
model: type[OutboxEventModel] = OutboxEventModel
async def claim_pending(self, *, limit: int = 100) -> list[OutboxEventModel]:
"""Lock-free claim — fine for single-worker dispatcher."""
stmt = (
select(OutboxEventModel)
.where(OutboxEventModel.status == "pending")
.order_by(OutboxEventModel.created_at)
.limit(limit)
)
result = await self.session.execute(stmt)
return list(result.scalars().all())
async def mark_dispatched(self, ids: list[str]) -> None:
await self.session.execute(
update(OutboxEventModel)
.where(OutboxEventModel.id.in_(ids))
.values(status="dispatched"),
)
await self.session.commit()
# src/services/orders.py — produce side
from src.db.models import OrderModel, OutboxEventModel
class OrderService:
async def place_order(self, data: OrderCreateSchema) -> OrderResponseSchema:
order = OrderModel(**data.to_dict())
self.repo.session.add(order)
# Same transaction as the order row.
self.repo.session.add(
OutboxEventModel(
topic="orders.placed",
payload={"order_id": str(order.id), "amount": order.amount},
),
)
await self.repo.session.flush()
await self.repo.session.commit()
return self.repo.map_to_response(order)
# src/tasks/__init__.py — dispatcher side
from tempest_fastapi_sdk.tasks import AsyncTaskScheduler
from src.api.app import broker as queue_broker # FastStream AsyncBrokerManager
from src.api.app import db
scheduler = AsyncTaskScheduler(broker)
@scheduler.interval(seconds=5)
async def dispatch_outbox() -> None:
"""Poll the outbox and publish each pending event."""
async with db.get_session_context() as session:
repo = OutboxRepository(session)
events = await repo.claim_pending(limit=100)
if not events:
return
dispatched: list[str] = []
for event in events:
try:
await queue_broker.publish(event.payload, event.topic)
dispatched.append(str(event.id))
except Exception: # noqa: BLE001 — retry on next tick
continue
if dispatched:
await repo.mark_dispatched(dispatched)
Trade-offs to keep in mind:
- Order is best-effort. When a batch contains one failing publish, every later event in the same batch still runs — but they're still published in
created_atorder. If strict ordering matters, break on the first failure. - Single dispatcher. The naive
claim_pendingdoes not lock rows; running multiple dispatcher workers will double-publish. UseSELECT ... FOR UPDATE SKIP LOCKEDon PostgreSQL when you need to scale out. - Retention. Add a periodic
TRUNCATE-style job to deletedispatchedrows older than N days, otherwise the outbox table grows unbounded. - At-least-once. Consumers must be idempotent — the dispatcher can crash after publishing but before
mark_dispatched.
Migration guide 0.7 → 0.8
0.8.0 renames every field on ServerSettings, extracts log fields to a new LogSettings mixin, and adds eleven other primitives. The renames are the only breaking changes — every new primitive is opt-in.
1. Rename env vars
| Old | New | Mixin |
|---|---|---|
HOST |
SERVER_HOST |
ServerSettings |
PORT |
SERVER_PORT |
ServerSettings |
DEBUG |
SERVER_DEBUG |
ServerSettings |
| (new) | SERVER_RELOAD |
ServerSettings |
LOG_LEVEL |
LOG_LEVEL |
moved to LogSettings |
LOG_JSON |
LOG_JSON |
moved to LogSettings |
Mechanical sed on every .env / docker-compose.yml / deployment manifest:
sed -i \
-e 's/^HOST=/SERVER_HOST=/' \
-e 's/^PORT=/SERVER_PORT=/' \
-e 's/^DEBUG=/SERVER_DEBUG=/' \
.env .env.example .env.test
LOG_LEVEL and LOG_JSON keep their names — only the mixin moves.
2. Rename code references
# `settings.HOST` → `settings.SERVER_HOST`, same for PORT/DEBUG
grep -rn "settings\.\(HOST\|PORT\|DEBUG\)\b" src/ tests/
Replace each match with the SERVER_* form. If a service was using the
old settings.DEBUG flag for application-level debug behavior, switch
to settings.SERVER_DEBUG; if it was only being read for uvicorn
auto-reload, switch to settings.SERVER_RELOAD.
3. Mix LogSettings into the project Settings
from tempest_fastapi_sdk import (
BaseAppSettings,
CORSSettings,
DatabaseSettings,
JWTSettings,
+ LogSettings,
RabbitMQSettings,
RedisSettings,
ServerSettings,
)
class Settings(
ServerSettings,
+ LogSettings,
DatabaseSettings,
RedisSettings,
RabbitMQSettings,
JWTSettings,
CORSSettings,
BaseAppSettings,
):
...
Skip this step if the service never read settings.LOG_LEVEL /
settings.LOG_JSON — configure_logging accepts the values as
keyword arguments directly.
4. (Optional) Adopt the new primitives
Pick what fits. None of these are required.
- Replace the hand-written
src/server.pyuvicorn.run(...)withrun_server(...). - Replace the hand-written
get_current_userwithmake_jwt_user_dependency(tokens, load_user). - Move
SMTP_*/UPLOAD_*/TOKEN_SECRET/VAPID_*/TASKIQ_*fields out of the project'sSettingsand onto the matching SDK mixin (Settings mixins composition). - Adopt the
Outbox dispatcher patternif you already write side-effects from the same transaction as your domain rows.
5. Verify
uv sync # picks up new pyproject deps
uv run pytest -q # full suite
uv run ruff check src tests # confirm no `HOST`/`PORT`/`DEBUG` references slipped
If pytest fails with a Pydantic ValidationError referencing
HOST / PORT / DEBUG, an env var was not renamed (look at the
process environment or .env).
Reference
BaseRepository methods
| Method | Purpose | Raises on miss |
|---|---|---|
get(filters, for_update=False) |
Single record matching filters | ✅ not_found_exception |
get_or_none(filters, for_update=False) |
Single record or None |
— |
get_by_id(id, for_update=False) |
Shortcut for get({"id": id}) |
✅ not_found_exception |
exists(filters) |
bool without loading the row |
— |
first(filters, order_by, ascending) |
First match ordered | — |
list(filters, order_by, ascending) |
All rows; [] when empty |
— |
paginate(filters, order_by, page, page_size, ascending, query=) |
dict with items, total, page, size, pages |
— |
count(filters) |
Row count | — |
add(model) |
Insert single | ConflictException on integrity |
add_all(models) |
Insert batch | ConflictException on integrity |
update(model) |
Commit mutated instance | ConflictException on integrity |
update_many(models) |
Commit batch | ConflictException on integrity |
bulk_update(filters, values) |
UPDATE ... WHERE mass mutation; rejects empty filters |
ConflictException on integrity |
delete(id) |
Hard delete by PK | ✅ not_found_exception |
delete_many(filters) |
Hard delete by filter | — |
delete_batch(ids) |
Hard delete several PKs | — |
soft_delete(id) |
Flip is_active=False; returns row |
✅ not_found_exception |
restore(id) |
Flip is_active=True; returns row |
✅ not_found_exception |
map_to_schema(instance) |
Override in subclass | NotImplementedError |
map_to_response(instance) |
Override in subclass | NotImplementedError |
map_to_model(data) |
Default: self.model(**data) |
— |
Filter dict conventions
The dict passed to get / list / paginate / count / exists / delete_many / bulk_update understands these patterns out of the box:
| Filter shape | Generated SQL |
|---|---|
{"col": value} |
col = value |
{"col": [a, b]} |
col IN (a, b) |
{"col": True} / {"col": False} |
col IS TRUE / col IS FALSE |
{"name": "ana"} (string field literally named name) |
name ILIKE '%ana%' |
{"col": date(2024, 1, 1)} (date value) |
date(col) = '2024-01-01' |
{"start_in": date(...)} |
date(date_col_or_created_at) >= ... |
{"end_in": date(...)} |
date(date_col_or_created_at) <= ... |
{"col": None} |
filter is skipped (omit-when-None semantics) |
Pass the dict from BasePaginationFilterSchema.get_conditions() for query-string-driven filters.
Lifecycle managers
Every SDK-shipped manager follows the same core shape: connect() to start,
disconnect() to stop, and health_check() to plug into
make_health_router(checks=...). Brokers and the scheduler additionally
expose lifespan() (async context manager) and is_connected
(read-only state). The tables below list the manager-specific surface.
AsyncDatabaseManager
| Method / Property | Purpose |
|---|---|
__init__(url, *, echo=False, pool_size=10, max_overflow=20, pool_recycle=3600, **engine_kwargs) |
Build the manager (engine is lazy). |
await connect() |
Create the engine + sessionmaker. |
await disconnect() |
Dispose the engine. |
is_connected (property) |
True while the engine is alive. |
db_url_safe (property) |
DSN with password redacted (safe to log). |
await get_session() |
Return a single AsyncSession. |
async with get_session_context() |
Yield an AsyncSession; commits on success, rolls back on raise. |
async session_dependency() |
FastAPI Depends-compatible generator. |
await create_tables() |
Issue CREATE TABLE for every model on BaseModel.metadata (tests / local dev — production schemas go through Alembic). |
await drop_tables() |
Issue DROP TABLE for every model on BaseModel.metadata. |
await health_check() |
Ping with SELECT 1; returns bool. |
AsyncRedisManager (extra: [cache])
| Method / Property | Purpose |
|---|---|
__init__(url, *, decode_responses=True, **client_kwargs) |
Wrap a redis.asyncio.Redis client. |
await connect() |
Build the client. |
await disconnect() |
Close the client + release the pool. |
client (property) |
Live Redis instance; raises RuntimeError before connect. |
async with get_client_context() |
Yield the same client inside an async with. |
async client_dependency() |
FastAPI Depends-compatible generator. |
await health_check() |
PING returns bool. |
Pair with @cached(redis, ttl=..., key_prefix=...) for function-level memoization — see the Cache decorator recipe.
AsyncBrokerManager (extra: [queue])
| Method / Property | Purpose |
|---|---|
__init__(broker) |
Wrap any FastStream broker (RabbitBroker, KafkaBroker, ...). |
await connect() |
Start the broker; idempotent. |
await disconnect() |
Stop the broker. |
broker (attribute) |
The wrapped FastStream broker — use broker.publisher(...) / broker.subscriber(...) directly. |
await publish(message, *args, **kwargs) |
Forward to broker.publish; raises before connect. |
async with lifespan() |
Connect on enter, disconnect on exit. |
async broker_dependency() |
FastAPI Depends-compatible generator yielding the broker. |
await health_check() |
True while the broker is started. |
is_connected (property) |
Read-only state. |
AsyncTaskBrokerManager (extra: [tasks])
| Method / Property | Purpose |
|---|---|
__init__(broker) |
Wrap any TaskIQ broker (AioPikaBroker, RedisBroker, InMemoryBroker, ...). |
await connect() |
broker.startup(); idempotent. |
await disconnect() |
broker.shutdown(). |
broker (attribute) |
The wrapped TaskIQ broker. |
task(*args, **kwargs) |
Decorator forwarding to broker.task. |
register_task(func, *, task_name=None, **kwargs) |
Register without decorator syntax. |
async with lifespan() |
Connect on enter, disconnect on exit. |
async broker_dependency() |
FastAPI Depends-compatible generator. |
await health_check() |
True while the broker is started. |
is_connected (property) |
Read-only state. |
AsyncTaskScheduler (extra: [tasks])
| Method / Property | Purpose |
|---|---|
__init__(broker, sources=None) |
Wrap TaskiqScheduler + LabelScheduleSource (default). |
broker (attribute) |
Same broker tasks are kicked into. |
sources (attribute) |
List of ScheduleSource instances. |
scheduler (attribute) |
Underlying taskiq.TaskiqScheduler. |
@cron(expr, *, cron_offset=None, task_name=None, **labels) |
Register a cron-scheduled task. |
@interval(seconds_or_timedelta, *, task_name=None, **labels) |
Register a fixed-interval task. |
@schedule(spec, *, task_name=None, **labels) |
Register with raw TaskIQ schedule list. |
register(func, *, schedule, task_name=None, **labels) |
Decorator-free registration. |
await connect() |
scheduler.startup() + every source.startup(). |
await disconnect() |
Cancel the background loop (if any) and shut down. |
await run_in_background() |
Spawn an in-process SchedulerLoop task (dev / single-process). |
async with lifespan() |
Connect on enter, disconnect on exit. |
await health_check() |
True while started and (when applicable) the loop is alive. |
is_connected (property) |
Read-only state. |
Production deployments should run the standalone CLI instead of run_in_background():
taskiq worker src.tasks:tasks.broker
taskiq scheduler src.tasks:scheduler.scheduler
Error envelope
Every AppException (and any subclass) is serialized by register_exception_handlers into:
{
"detail": "Human-readable message",
"code": "MACHINE_READABLE_CODE",
"details": {"any": "structured context"}
}
| Exception | Default status_code |
Default code |
|---|---|---|
AppException |
500 | INTERNAL_SERVER_ERROR |
NotFoundException |
404 | NOT_FOUND |
ConflictException |
409 | CONFLICT |
ValidationException |
422 | VALIDATION_ERROR |
UnauthorizedException |
401 | UNAUTHORIZED |
InvalidTokenException |
401 | INVALID_TOKEN |
ExpiredTokenException |
401 | TOKEN_EXPIRED |
ForbiddenException |
403 | FORBIDDEN |
FileTooLargeException |
413 | FILE_TOO_LARGE |
InvalidFileTypeException |
415 | INVALID_FILE_TYPE |
Subclasses set message/code/status_code as class attributes; instances can override message and attach a details dict at construction time.
Conventions
- Layered architecture: routers → controllers → services → repositories. Never skip layers.
- Async-first: every I/O method is
async. Use SQLAlchemy 2.0 patterns (select, notsession.query()). - Collections return
[]: never raise on empty results. Only single-resource lookups raiseNotFoundException. - Soft delete by default:
is_active=Falseinstead of physical delete when applicable. - UTC everywhere: timestamps normalized via
to_utc;BaseResponseSchemaenforces this on field validators. - Double quotes: enforced by
ruff format. - Full typing: every parameter, return value and class attribute is typed.
Anyis explicit, never implicit. - Docstrings in English: Google-style covering description / args / returns / raises.
- Frontend branches on
code, not on the (translatable) message.
Development
make install # uv sync --all-extras
make test # pytest with coverage
make lint # ruff check .
make fmt # ruff format .
make type # mypy --strict
make check # lint + fmt-check + type + test (every gate)
make ci # check + build + smoke install in a clean venv (mirrors GitHub Actions)
make build # uv build → dist/
make clean # remove caches and build artifacts
Run make (or make help) to list every target. The Makefile is just thin wrappers around uv — direct invocations still work too:
uv sync --all-extras
uv run pytest
uv run ruff check .
uv run mypy tempest_fastapi_sdk
uv build
The CI gate (.github/workflows/ci.yml) runs the equivalent of make ci on every push and pull request against main. The wheel-build smoke step installs the freshly built artifact into a clean Python 3.13 virtualenv and imports the top-level surface to guard against the empty-wheel / missing-package-data class of bugs.
Release
Releases are published to PyPI automatically when a v*.*.* tag is pushed.
The pipeline (.github/workflows/release-pypi.yml) does three things:
- Version sanity check. The tag (
v0.1.0),pyproject.toml'sversionfield andtempest_fastapi_sdk.__version__must all match. Mismatched releases abort before any artifact is built. - Build + verify. Run the full validation suite (tests + lint + mypy), build sdist/wheel with
uv build, check metadata withtwine check, and verify the wheel actually contains the package files + the bundledenv.py.templateAlembic template. - Publish via Trusted Publishing. Uses OIDC against PyPI — no long-lived API tokens stored in repo secrets.
Cutting a release
make release VERSION=0.2.0
This single target:
- Refuses to run if the working tree is dirty.
- Bumps
pyproject.tomlandtempest_fastapi_sdk/__init__.pyto the requested version. - Runs
make check(lint + format + mypy + tests) so a broken commit never gets tagged. - Commits the bump as
chore: release v0.2.0and creates thev0.2.0tag locally. - Prints the two
git pushcommands you still need to run — pushing is left manual on purpose so you can review the commit one last time.
# Review then push
git show v0.2.0
git push origin main
git push origin v0.2.0
GitHub Actions picks up the tag, runs the release pipeline and publishes the artifacts to PyPI.
Manual flow (no Makefile) — same result:
$EDITOR pyproject.toml # version = "0.2.0"
$EDITOR tempest_fastapi_sdk/__init__.py # __version__ = "0.2.0"
git commit -am "chore: release v0.2.0"
git tag v0.2.0
git push origin main v0.2.0
One-time PyPI Trusted Publishing setup
PyPI needs to know which workflow is allowed to publish on the project's behalf. Done once per project:
- Create the project on PyPI (name:
tempest-fastapi-sdk). For brand-new projects, register a placeholder release manually first or usependingpublishers so the first OIDC-driven upload can create it. - On the project's PyPI settings page, add a Trusted Publisher pointing to:
- Owner:
mauriciobenjamin700 - Repository:
tempest-fastapi-sdk - Workflow filename:
release-pypi.yml - Environment:
pypi(must matchrelease-pypi.yml'senvironment.name)
- Owner:
- In the GitHub repository, create an environment named
pypi(Settings → Environments → New environment). Optionally restrict deployments to tags matchingv*.*.*for an extra safety net.
After this, every v*.*.* tag triggers a publish. No PYPI tokens are stored anywhere.
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file tempest_fastapi_sdk-0.8.0.tar.gz.
File metadata
- Download URL: tempest_fastapi_sdk-0.8.0.tar.gz
- Upload date:
- Size: 329.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c23ba2cad3bb84f4e4f396d9a7605331b6ca4a418ef9491c98ba60a4dd6c971f
|
|
| MD5 |
3be256abdf70fefa6f75d8af3306d349
|
|
| BLAKE2b-256 |
e0a524eee775f7f170e1b4254ab416491c7d731d3cc852b3a7ea21038cec6120
|
Provenance
The following attestation bundles were made for tempest_fastapi_sdk-0.8.0.tar.gz:
Publisher:
release-pypi.yml on mauriciobenjamin700/tempest-fastapi-sdk
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
tempest_fastapi_sdk-0.8.0.tar.gz -
Subject digest:
c23ba2cad3bb84f4e4f396d9a7605331b6ca4a418ef9491c98ba60a4dd6c971f - Sigstore transparency entry: 1563762688
- Sigstore integration time:
-
Permalink:
mauriciobenjamin700/tempest-fastapi-sdk@a90af867d1eb276baa08116afa2775c5a4ced095 -
Branch / Tag:
refs/tags/v0.8.0 - Owner: https://github.com/mauriciobenjamin700
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release-pypi.yml@a90af867d1eb276baa08116afa2775c5a4ced095 -
Trigger Event:
push
-
Statement type:
File details
Details for the file tempest_fastapi_sdk-0.8.0-py3-none-any.whl.
File metadata
- Download URL: tempest_fastapi_sdk-0.8.0-py3-none-any.whl
- Upload date:
- Size: 140.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c2ebce773a3cd074b4d594b4fc0658ba3a92bcd9c466e0e283e27f387046dff8
|
|
| MD5 |
1ad2785919cda3d1c086447a0e270bb5
|
|
| BLAKE2b-256 |
a83dcb24095181dff43083a9c10ec4f3e11951b2a9b4e20c415ba65e755a5897
|
Provenance
The following attestation bundles were made for tempest_fastapi_sdk-0.8.0-py3-none-any.whl:
Publisher:
release-pypi.yml on mauriciobenjamin700/tempest-fastapi-sdk
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
tempest_fastapi_sdk-0.8.0-py3-none-any.whl -
Subject digest:
c2ebce773a3cd074b4d594b4fc0658ba3a92bcd9c466e0e283e27f387046dff8 - Sigstore transparency entry: 1563762720
- Sigstore integration time:
-
Permalink:
mauriciobenjamin700/tempest-fastapi-sdk@a90af867d1eb276baa08116afa2775c5a4ced095 -
Branch / Tag:
refs/tags/v0.8.0 - Owner: https://github.com/mauriciobenjamin700
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release-pypi.yml@a90af867d1eb276baa08116afa2775c5a4ced095 -
Trigger Event:
push
-
Statement type: