Skip to main content

Generic audit library for applications

Project description

🔐 audit-recorder

Une librairie complète et flexible pour auditer les actions dans vos applications Python, avec support complet async/sync et extensibilité.

Python Version SQLAlchemy License

✨ Features

  • 📋 Audit complet: CREATE, UPDATE, DELETE, et actions custom
  • 🔄 Async/Sync: Support natif de async et sync
  • 🎯 Décorateurs simples: @audit, @transactional, @audit_entity
  • ⚙️ Configuration centralisée: AuditConfig pour simplifier le setup
  • 🧩 Extensible: Registry de sérializers pour vos types custom
  • Flexible: Support multi-formats de tokens et contextes utilisateur
  • 💾 Champs dynamiques: extra_fields pour enrichir vos logs
  • 🔑 Clés composées: Support natif des tables à index multiples

🚀 Installation

pip install audit-recorder

📖 Quick Start

1️⃣ Configuration simple

from audit_recorder import AuditConfig
from sqlalchemy import create_engine

# Initialiser la config
config = AuditConfig(
    user_id_extractor=lambda token: token.sub,
    user_email_extractor=lambda token: token.email,
)
config.init()

2️⃣ Enregistrer vos entités

from audit_recorder import audit_entity
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()

@audit_entity
class Article(Base):
    __tablename__ = "articles"
    id = Column(Integer, primary_key=True)
    title = Column(String)

3️⃣ Décorer vos services

from audit_recorder import audit, transactional

@transactional  # Commit/rollback auto
@audit(action='UPDATE', resource_type='Article', id_param='article_id')
def update_article(session, article_id: int, title: str, id_token):
    article = session.query(Article).get(article_id)
    article.title = title
    # ✅ Pas besoin de commit, @transactional s'en charge
    return article

Paramètres obligatoires dans la signature de la fonction décorée :

  • session : doit être nommé session (instance Session SQLAlchemy)
  • id_token : doit correspondre au user_param configuré dans AuditConfig (défaut : 'id_token')

id_param — capture des snapshots avant/après

id_param indique à @audit comment récupérer l'ID de la ressource avant l'appel métier, ce qui permet de charger le snapshot old_values.

Cas id_param old_values new_values
CREATE — retourne l'entité avec id omis ✅ résolu depuis le retour
CREATE — retourne un DTO différent 'payload.id'
UPDATE / DELETE / action custom 'article_id'
Clé composite ['user_id', 'role_id']
# Chemin simple vers un argument
@audit(action='DELETE', resource_type='Article', id_param='article_id')
def delete_article(session, article_id: int, id_token): ...

# Chemin imbriqué (argument objet)
@audit(action='APPROVE', resource_type='DemandeAcces', id_param='db_demande.id')
def approve(session, db_demande: DemandeAcces, id_token): ...

# Liste de chemins (clé composite)
@audit(action='UPDATE', resource_type='UserRole', id_param=['user_id', 'role_id'])
def update_user_role(session, user_id: int, role_id: int, id_token): ...
# → resource_id stocké : {"user_id": 1, "role_id": 42}

# Callable pour une logique custom
@audit(action='UPDATE', resource_type='Article', id_param=lambda args: {'id': args['payload'].article_id})
def update_article(session, payload: UpdatePayload, id_token): ...

4️⃣ Consulter les logs

from audit_recorder import service

# Récupérer les logs d'un article (clé simple)
logs = service.query_logs(
    db,
    resource_type='Article',
    resource_id={'id': '42'},
    page=1,
    limit=20
)

# Clé composite
logs = service.query_logs(
    db,
    resource_type='UserRole',
    resource_id={'user_id': '1', 'role_id': '42'},
)

for log in logs.results:
    print(f"{log.action} on {log.resource_type} by {log.user_email}")
    print(f"  Old: {log.old_values}")
    print(f"  New: {log.new_values}")
    print(f"  Changes: {log.changes}")

🔧 Configuration centralisée (AuditConfig)

AuditConfig centralise tous les paramétrages de la librairie dans une seule classe, éliminant le boilerplate.

Attributs

Paramètre Type Description
user_id_extractor Callable Fonction pour extraire l'ID utilisateur
user_email_extractor Callable Fonction pour extraire l'email utilisateur
extra_fields_resolver Callable Fonction pour extraire les champs dynamiques
default_skip_actions list Actions à ignorer par défaut
custom_serializers dict Sérializers pour vos types custom

Exemples

Avec champs dynamiques:

def extract_audit_fields(action, resource_type, resource_id, result):
    return {
        "ip_address": request.remote_addr,
        "user_agent": request.headers.get("user-agent"),
        "timestamp": datetime.now().isoformat(),
    }

config = AuditConfig(
    user_id_extractor=lambda token: token.sub,
    user_email_extractor=lambda token: token.email,
    extra_fields_resolver=extract_audit_fields,
)
config.init()

Avec sérializers custom:

from decimal import Decimal
from datetime import date

config = AuditConfig(
    user_id_extractor=lambda token: token.sub,
    custom_serializers={
        date: lambda d: d.isoformat(),
        Decimal: lambda dec: float(dec),
    }
)
config.init()

🏷️ Enregistrement automatique d'entités (@audit_entity)

Le décorateur @audit_entity enregistre automatiquement le resolver pour une entité, sans code répétitif.

Simple (auto-détection)

Sans loader explicite, @audit_entity cherche dans l'ordre :

  1. Une classmethod load(cls, session, resource_id) sur la classe
  2. Sinon, fallback sur session.query(cls).get(resource_id)
@audit_entity
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    # Pas de load() → utilise session.query(User).get(resource_id)

Avec load() défini :

@audit_entity
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)

    @classmethod
    def load(cls, session, resource_id):
        # Chargé automatiquement pour les snapshots avant/après
        return session.query(cls).options(joinedload(cls.role)).get(resource_id)

Avec loader personnalisé

def load_user(session, user_id):
    return session.query(User).filter(User.uuid == user_id).first()

@audit_entity(loader=load_user)
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    uuid = Column(String, unique=True)

Avec snapshot de relations (snapshot=)

snapshot= génère automatiquement un loader qui charge l'entité avec une session fraîche (évite l'identity map) et applique les options SQLAlchemy fournies. noload('*') est ajouté automatiquement pour exclure toutes les relations non listées.

C'est l'approche recommandée quand vous souhaitez inclure des relations dans le snapshot sans écrire de loader manuellement.

from sqlalchemy.orm import selectinload
from audit_recorder import audit_entity

@audit_entity(snapshot=lambda cls: [
    selectinload(cls.service).load_only(Service.nom),
    selectinload(cls.compte).load_only(Compte.prenom, Compte.nom, Compte.email),
    selectinload(cls.tags),   # sans load_only = toutes les colonnes
])
class Demande(Base):
    __tablename__ = "demandes"
    id = Column(Integer, primary_key=True)

Le lambda cls: [...] (au lieu d'une liste directe) est recommandé pour éviter les problèmes de forward references : les attributs de la classe ne sont résolus qu'au premier appel du loader, après que tous les mappers SQLAlchemy sont initialisés.

snapshot= est ignoré si loader= est fourni explicitement.

Avec resource_type custom

@audit_entity(resource_type="CustomUser")
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)

⚠️ Sans resource_type explicite, la clé enregistrée est le nom de la classe (cls.__name__), sans underscore. DemandeAcces → clé "DEMANDEACCES", ce qui ne correspond pas à "DEMANDE_ACCES". Si vous utilisez des enums ou des noms avec underscores, passez resource_type explicitement aux deux décorateurs.

Avec un identifiant autre que id

Par défaut, @audit_entity utilise l'attribut id comme identifiant de la ressource. C'est cet attribut qui est utilisé :

  • pour extraire l'ID depuis l'objet retourné par une action CREATE (quand id_param n'est pas renseigné dans @audit)
  • pour charger les snapshots avant/après via le loader

Si ta clé primaire ne s'appelle pas id, précise-le avec identifier :

@audit_entity(identifier='uuid')
class Article(Base):
    __tablename__ = "articles"
    uuid = Column(String, primary_key=True)
    title = Column(String)

Avec clé composite (identifier liste)

Pour les tables avec plusieurs colonnes de clé primaire (index composite), passez une liste de noms d'attributs :

@audit_entity(identifier=['user_id', 'role_id'])
class UserRole(Base):
    __tablename__ = "user_roles"
    user_id = Column(Integer, primary_key=True)
    role_id = Column(Integer, primary_key=True)

Le resource_id sera automatiquement stocké comme dict : {"user_id": 1, "role_id": 42}.

Compatible avec @audit(id_param=['user_id', 'role_id']) :

@audit(action='UPDATE', resource_type='UserRole', id_param=['user_id', 'role_id'])
def update_user_role(session, user_id: int, role_id: int, id_token): ...

Ou délègue entièrement la résolution au loader :

@audit_entity(loader=lambda session, rid: session.query(Article).filter_by(uuid=rid).first())
class Article(Base):
    __tablename__ = "articles"
    uuid = Column(String, primary_key=True)

🔄 Transaction automatique (@transactional)

Le décorateur @transactional gère automatiquement le commit et le rollback, éliminant les bugs "changement sans audit" ou "audit orphelin".

@transactional
@audit(action='UPDATE', resource_type='Article')
def update_article(session, article_id: int, title: str, id_token):
    article = session.query(Article).get(article_id)
    article.title = title
    # ✅ Commit auto, rollback auto en cas d'erreur
    # ❌ Pas de session.commit() nécessaire
    return article

# Utilisation
try:
    article = update_article(db_session, 1, "New Title", token)
    # Déjà committée !
except ValueError:
    # Déjà rollbackée !
    pass

Support Async/Sync automatique

# Async - il suffit de passer async
@transactional
@audit(action='UPDATE', resource_type='Article')
async def update_article(session, article_id: int, id_token):
    # Marche aussi en async !
    pass

# Sync - marche aussi normalement
@transactional
@audit(action='UPDATE', resource_type='Article')
def update_article(session, article_id: int, id_token):
    # Marche en sync
    pass

🎨 Sérializers extensibles

Enregistrez des sérializers custom pour vos types propriétaires.

Fonction simple

from audit_recorder import register_serializer
from datetime import date

# Enregistrer pour un type simple
register_serializer(date, lambda d: d.isoformat())

# Ensuite, les dates sont auto-sérialisées :
article.published_on = date(2024, 1, 15)
# Audit stocke : {"published_on": "2024-01-15"} ✅

Classe custom

from audit_recorder import Serializer, register_serializer

class DateRangeSerializer(Serializer):
    def serialize(self, value):
        return {
            'start': value.start.isoformat(),
            'end': value.end.isoformat(),
        }

register_serializer(DateRange, DateRangeSerializer())

Récupérer un sérializer

from audit_recorder import get_serializer

serializer = get_serializer(date)
if serializer:
    result = serializer.serialize(date(2024, 1, 15))
    # result: "2024-01-15"

🗂️ Champs dynamiques (extra_fields)

Enrichissez vos logs avec des données contextuelles dynamiques.

Définir un resolver

from audit_recorder import AuditConfig
from datetime import datetime

def extract_audit_fields(action, resource_type, resource_id, result):
    return {
        "timestamp": datetime.now().isoformat(),
        "user_role": current_user.role,
        "ip_address": request.remote_addr,
    }

config = AuditConfig(
    extra_fields_resolver=extract_audit_fields,
)
config.init()

Accéder aux champs

logs = service.query_logs(db, resource_type="Article")

for log in logs.results:
    # Accès direct (via model_validator flatten_extra_fields)
    print(f"IP: {log.ip_address}")
    print(f"Role: {log.user_role}")
    print(f"Timestamp: {log.timestamp}")
    
    # Ou via le dict
    print(f"All extra: {log.extra_fields}")

⏭️ Skip audit conditionnellement

Désactiver l'audit dans un contexte spécifique.

Action unique

from audit_recorder import skip_audit

with skip_audit('EXPORT'):
    export_data(session)  # ← Pas auditée
    
# Les autres actions sont auditées normalement
update_article(session)  # ← Auditée

Plusieurs actions

with skip_audit('EXPORT', 'SYNC', 'CLEANUP'):
    do_batch_operation(session)  # ← Aucune de ces 3 actions auditée

Tout désactiver

with skip_audit():  # Pas d'argument = tout désactiver
    do_sensitive_operation(session)  # ← Aucune action auditée

Imbriqué (fusion)

with skip_audit('CREATE'):
    create_user(session)  # ← CREATE non auditée
    
    with skip_audit('UPDATE'):
        update_user(session)  # ← Ni CREATE ni UPDATE auditées
    
    update_user(session)  # ← CREATE non auditée, UPDATE auditée

delete_user(session)  # ← DELETE auditée

🗄️ Requêtes sur les logs

Filtres disponibles

Paramètre Type Description
resource_type str | list[str] Type(s) de ressource
resource_id dict Identifiant exact (clé simple : {"id": "42"}, clé composite : {"user_id": "1", "role_id": "42"})
action str | list[str] Action(s) à filtrer
user_id str ID utilisateur (insensible à la casse)
user_email str Email (recherche partielle, insensible à la casse)
date_from datetime Borne inférieure sur created_at
date_to datetime Borne supérieure sur created_at
page int Numéro de page (défaut : 1)
limit int Résultats par page (défaut : 20)
populate Callable Enrichissement des DTOs après chargement
from audit_recorder import service

# Filtre simple
logs = service.query_logs(db, resource_type='Article', action='UPDATE')

# resource_type et action acceptent aussi une liste
logs = service.query_logs(
    db,
    resource_type=['Article', 'Comment'],
    action=['CREATE', 'UPDATE', 'DELETE'],
    user_id='user_123',
    date_from=datetime(2024, 1, 1),
    date_to=datetime(2024, 12, 31),
    page=1,
    limit=50,
)

print(f"Total: {logs.total} — Pages: {logs.pages}")
for log in logs.results:
    print(log)

Query par ID

log = service.query_log_by_id(db, 'audit_id_123')
print(log)

Enrichir les logs

Les logs stockent des IDs bruts (resource_id, user_id). Pour afficher des labels lisibles sur une interface, passez un populate : une fonction appelée sur chaque DTO après chargement, qui peut y ajouter des champs arbitraires.

def populate(log: AuditLogDTO, db) -> AuditLogDTO:
    if log.resource_type == 'Article':
        article = db.get(Article, log.resource_id)
        log.resource_label = article.title if article else log.resource_id
    if log.user_id:
        user = db.get(User, log.user_id)
        log.user_display_name = user.full_name if user else log.user_email
    return log

logs = service.query_logs(db, resource_type='Article', populate=populate)

for log in logs.results:
    print(log.resource_label)    # "Mon article" au lieu de "42"
    print(log.user_display_name) # "Jean Dupont" au lieu de "user_123"

Fonctionne aussi sur query_log_by_id :

log = service.query_log_by_id(db, 'audit_id_123', populate=populate)
print(log.resource_label)

Les champs ajoutés par populate sont directement accessibles sur le DTO (log.resource_label) grâce au extra='allow' de Pydantic.

Filtrer sur les extra_fields ou new_values (filtres JSON)

extra_fields_filter et new_values_filter permettent de filtrer en SQL sur les colonnes JSON extra_fields et new_values. La pagination reste entièrement côté base.

Le mode de comparaison est configurable indépendamment pour chaque filtre via extra_fields_filter_mode et new_values_filter_mode :

Mode Comportement
'ilike' (défaut) Recherche partielle, insensible à la casse (ILIKE '%value%')
'exact' Égalité stricte, insensible à la casse
# Recherche partielle sur extra_fields (défaut)
logs = service.query_logs(
    db,
    extra_fields_filter={'role': 'adm'},              # correspond à "admin", "superadmin"…
)

# Égalité stricte sur new_values
logs = service.query_logs(
    db,
    resource_type='DemandeAcces',
    new_values_filter={'status': 'approved'},
    new_values_filter_mode='exact',
)

# Modes différents sur chaque filtre
logs = service.query_logs(
    db,
    extra_fields_filter={'role': 'adm'},
    extra_fields_filter_mode='ilike',
    new_values_filter={'status': 'approved'},
    new_values_filter_mode='exact',
)

Structure d'un log

print(log.id)              # str UUID
print(log.action)          # str "CREATE", "UPDATE", "DELETE", etc.
print(log.resource_type)   # str "Article", "User", etc.
print(log.resource_id)     # dict | None  ex: {"id": "42"} ou {"user_id": "1", "role_id": "42"}
print(log.user_id)         # str | None
print(log.user_email)      # str | None
print(log.old_values)      # dict | None Snapshot avant
print(log.new_values)      # dict | None Snapshot après
print(log.changes)         # dict | None Diff précis
print(log.extra_fields)    # dict | None Champs dynamiques
print(log.created_at)      # datetime

📦 Schema & Migrations

Sans Alembic (dev / nouveau projet)

Crée la table directement via l'engine SQLAlchemy :

from sqlalchemy import create_engine
from audit_recorder.models import Base, AuditLog

engine = create_engine('postgresql://...')
Base.metadata.create_all(engine, tables=[AuditLog.__table__])

Avec Alembic (production)

Expose le Base d'audit_recorder dans env.py pour que l'autogenerate détecte la table audit_logs :

# alembic/env.py
from audit_recorder.models import Base as AuditBase
from myapp.models import Base as AppBase

target_metadata = [AppBase.metadata, AuditBase.metadata]

Ensuite :

alembic revision --autogenerate -m "add audit_logs table"
alembic upgrade head

Schema

CREATE TABLE audit_logs (
  -- Identification
  id VARCHAR(36) PRIMARY KEY,
  
  -- QUI (utilisateur)
  user_id VARCHAR(255),
  user_email VARCHAR(255),
  
  -- QUOI (action)
  action VARCHAR(50) NOT NULL,
  resource_type VARCHAR(100) NOT NULL,
  resource_id JSON,        -- {"id": "42"} ou {"user_id": "1", "role_id": "42"}
  
  -- SNAPSHOTS (changements)
  old_values JSON,
  new_values JSON,
  changes JSON,
  
  -- QUAND
  created_at DATETIME NOT NULL,
  
  -- EXTRA
  extra_fields JSON
);

🧪 Testing

import pytest
from audit_recorder import skip_audit, AuditResult

def test_audit_creation():
    @audit(action='CREATE', resource_type='TestModel')
    def create_model(session, id_token):
        return {'id': 1}
    
    # Tester normalement, l'audit se fera
    result = create_model(session=mock_session, id_token=mock_token)
    assert result['id'] == 1

def test_skip_audit():
    @audit(action='UPDATE', resource_type='TestModel')
    def update_model(session, id_token):
        return {'id': 1}
    
    # Désactiver l'audit dans un test
    with skip_audit('UPDATE'):
        result = update_model(session=mock_session, id_token=mock_token)
        # L'audit n'a pas été enregistré

def test_audit_result_skip():
    @audit(action='EXPORT', resource_type='Data')
    def export_data(session, id_token):
        return AuditResult.skip(data=[1, 2, 3], reason="Export is non-auditable")
    
    result = export_data(session=mock_session, id_token=mock_token)
    # L'audit n'a pas été enregistré

🧹 Linting & Formatting

# Format avec ruff
ruff check . --fix
ruff format .

# Tester
pytest

# Builder la librairie
python -m pip install --upgrade build
python -m build

📚 Documentation complète

Modules


📄 Licence

MIT License - voir LICENSE pour les détails.


Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

audit_recorder-0.4.1.tar.gz (40.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

audit_recorder-0.4.1-py3-none-any.whl (26.6 kB view details)

Uploaded Python 3

File details

Details for the file audit_recorder-0.4.1.tar.gz.

File metadata

  • Download URL: audit_recorder-0.4.1.tar.gz
  • Upload date:
  • Size: 40.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for audit_recorder-0.4.1.tar.gz
Algorithm Hash digest
SHA256 a33e638cd51004a24f7ebe194621a5420c003bef12ecf8ff333f74226d767d65
MD5 a86ee5f503ba353d33f0c0be730c917d
BLAKE2b-256 6eb217029b9783e528c26f899840505185f4d36c1fe7d303cc7a6c1e51c62199

See more details on using hashes here.

File details

Details for the file audit_recorder-0.4.1-py3-none-any.whl.

File metadata

  • Download URL: audit_recorder-0.4.1-py3-none-any.whl
  • Upload date:
  • Size: 26.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for audit_recorder-0.4.1-py3-none-any.whl
Algorithm Hash digest
SHA256 75685a554acb0aaacbb4254ded8c51f432e9eba6b370acaa9979f6f8c94e00d9
MD5 efc303e802e86f15e5b6db0c33e34f64
BLAKE2b-256 6c00ead4eff190069bcee3c989f51443726dbeee480999b024bd9b5aef547b22

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page