SQL database management and code generation tool
Project description
🚀 TAI-SQL Framework
TAI-SQL es un framework declarativo para Python que simplifica el trabajo con bases de datos relacionales usando SQLAlchemy. Permite definir esquemas de forma intuitiva y generar automáticamente modelos, CRUDs y diagramas ER.
📦 Instalación
Usando Poetry (Recomendado)
poetry add tai-sql
Usando pip
pip install tai-sql
Dependencias del sistema
Para generar diagramas ER, necesitas instalar Graphviz:
# Ubuntu/Debian
sudo apt install graphviz
# macOS
brew install graphviz
# Windows
# Descargar desde: https://graphviz.org/download/
🗂️ Schema
Un schema es un archivo Python que define la estructura completa de tu base de datos. Es el punto central donde configuras la conexión, defines tus modelos y especificas qué recursos se generarán automáticamente.
📁 Estructura típica de un schema
# schemas/mi_proyecto.py
from __future__ import annotations
from tai_sql import *
from tai_sql.generators import *
# 1️⃣ Configurar conexión a la base de datos
datasource(provider=env('DATABASE_URL'))
# 2️⃣ Configurar generadores
generate(
ModelsGenerator(output_dir='mi_proyecto'),
CRUDGenerator(output_dir='mi_proyecto'),
ERDiagramGenerator(output_dir='mi_proyecto/diagrams')
)
# 3️⃣ Definir modelos (Tablas y Vistas)
class Usuario(Table):
'''Tabla que almacena información de los usuarios del sistema'''
__tablename__ = "usuario"
id: int = column(primary_key=True, autoincrement=True)
nombre: str
pwd: str = column(encrypt=True)
email: str = column(unique=True)
posts: List[Post] # Relación implícita
class Post(Table):
'''Tabla que almacena los posts de los usuarios'''
__tablename__ = "post"
id: int = column(primary_key=True, autoincrement=True)
titulo: str = 'Post title'
contenido: str
timestamp: datetime = column(default=datetime.now)
usuario_id: int
# Relación explícita
usuario: Usuario = relation(
fields=['usuario_id'],
references=['id'],
backref='posts'
)
class UserStats(View):
'''Vista que muestra estadísticas de los usuarios'''
__tablename__ = "user_stats"
__query__ = query('user_stats.sql')
usuario_id: int
nombre_usuario: str
post_count: int
🎯 Concepto clave
El schema actúa como el "blueprint" de tu aplicación:
- Define la estructura de base de datos (tablas, vistas, tipos, etc...)
- Configura la conexión y parámetros
- Especifica qué código se generará automáticamente
- Centraliza toda la configuración en un solo lugar
Una vez definido, el CLI de TAI-SQL usa este schema para:
- Sincronizar la base de datos (
tai-sql push) - Generar modelos SQLAlchemy, CRUDs y diagramas (
tai-sql generate) - Poblar la base de datos con datos iniciales (
tai-sql feed)
🏗️ Elementos del Schema
El esquema es el corazón de TAI-SQL. Define la estructura de tu base de datos y los recursos que se generarán automáticamente.
📊 datasource() - Configuración de la Base de Datos
La función datasource() configura la conexión a tu base de datos:
from tai_sql import datasource, env, connection_string, params
# ✅ Opción 1: Variables de entorno (Recomendado para producción)
datasource(
provider=env('DATABASE_URL') # postgres://user:pass@host:port/dbname
)
# ✅ Opción 2: String de conexión directo (Para desarrollo/testing)
datasource(
provider=connection_string('postgresql://user:password@localhost/mydb')
)
# ✅ Opción 3: Parámetros individuales (Para desarrollo/testing)
datasource(
provider=params(
drivername='postgresql',
username='user',
password='password',
host='localhost',
port=5432,
database='mydb'
)
)
Opciones avanzadas:
datasource(
provider=env('DATABASE_URL'),
secret_key_name='SECRET_KEY', # Variable de entorno para encriptación
pool_size=20, # Tamaño del pool de conexiones
max_overflow=30, # Conexiones adicionales permitidas
pool_timeout=30, # Timeout para obtener conexión
pool_recycle=3600, # Reciclar conexiones cada hora
echo=True # Mostrar consultas SQL en desarrollo
)
🔧 generate() - Configuración de Generadores
La función generate() define qué recursos se generarán automáticamente:
from tai_sql import generate
from tai_sql.generators import ModelsGenerator, CRUDGenerator, ERDiagramGenerator
generate(
# Generar modelos SQLAlchemy
ModelsGenerator(
output_dir='database/database'
),
# Generar CRUDs sincronos
CRUDGenerator(
output_dir='database/database',
mode='sync' # 'sync', 'async', o 'both'
),
# Generar diagramas ER
ERDiagramGenerator(
output_dir='database/diagrams'
)
)
📋 Table - Definición de Tablas
Las tablas son la base de tu modelo de datos:
from __future__ import annotations
from tai_sql import Table, column, relation
from typing import List, Optional
from datetime import date
class Usuario(Table):
'''Tabla que almacena información de los usuarios'''
__tablename__ = "usuario"
# Columnas básicas
id: int = column(primary_key=True, autoincrement=True)
name: str
email: str = column(unique=True)
fecha_alta: date
# Relaciones
posts: List[Post] # Implícita
class Post(Table):
'''Tabla que almacena la información de los posts de los usuarios'''
__tablename__ = "post"
id: int = column(primary_key=True, autoincrement=True)
title: str = 'Post title'
content: str
author_id: int
published: Optional[bool]
# Relación explícita
author: Usuario = relation(
fields=['author_id'],
references=['id'],
backref='posts'
)
📝 Documentación de Tablas
TAI-SQL permite documentar las tablas de dos formas equivalentes para proporcionar contexto y descripción de cada modelo:
# Opción 1: Usando docstring de la clase
class Usuario(Table):
'''Tabla que almacena información de los usuarios del sistema'''
__tablename__ = "usuario"
id: int = column(primary_key=True, autoincrement=True)
name: str
email: str
# Opción 2: Usando el metaparámetro __description__
class Post(Table):
__tablename__ = "post"
__description__ = "Tabla que almacena los posts de los usuarios"
id: int = column(primary_key=True, autoincrement=True)
title: str
content: str
Prioridad
- El uso del metaparámetro description tiene preferencia sobre el docstring de la clase. De esta forma si concurren ambos en una tabla, description tendrá prioridad.
Usos de la documentación:
- 📊 Diagramas ER: Aparece en los diagramas generados por
ERDiagramGenerator
Ambas formas son equivalentes y permiten que los generadores accedan a la descripción de la tabla para crear documentación automática, comentarios en los modelos generados y descripciones en los diagramas ER.
🛠️ Función column() - Configuración de Columnas
La función column() permite configurar las propiedades específicas de las columnas:
def column(
primary_key=False, # Si es clave primaria
unique=False, # Si debe ser único
default=None, # Valor por defecto
server_now=False, # Para usar NOW() del servidor
index=False, # Si debe tener índice
autoincrement=False, # Si es autoincremental
encrypt=False, # Si queremos que se encripte
description='' # Descripción de la columna (para diagramas)
):
Ejemplos de uso:
class Producto(Table):
__tablename__ = "producto"
# Clave primaria autoincremental
id: int = column(primary_key=True, autoincrement=True)
# Campo único
sku: str = column(unique=True)
# Campo con valor por defecto
estado: str = "activo"
# Equivalente a
estado: str = column(default="activo")
# Campo con índice para búsquedas rápidas
categoria: str = column(index=True)
# Campo opcional (nullable automático por tipo Optional)
descripcion: Optional[str]
# Campo obligatorio (nullable=False automático)
nombre: str
# Campo encriptado (necesita una SECRET_KEY)
password: str = column(encrypt=True)
Parámetros detallados:
| Parámetro | Tipo | Descripción | Ejemplo |
|---|---|---|---|
primary_key |
bool |
Define si la columna es clave primaria | column(primary_key=True) |
unique |
bool |
Garantiza valores únicos en la columna | column(unique=True) |
default |
Any |
Valor por defecto para nuevos registros | column(default="activo") |
server_now |
bool |
Usa la función NOW() del servidor de BD | column(server_now=True) |
index |
bool |
Crea un índice en la columna para búsquedas rápidas | column(index=True) |
autoincrement |
bool |
Incrementa automáticamente el valor (solo integers) | column(autoincrement=True) |
encrypt |
bool |
Encripta automáticamente el contenido de la columna | column(encrypt=True) |
description |
str |
Descripción de la columna, usada en diagramas ER | column(description='Nombre del usuario') |
🔗 Función relation() - Definición de Relaciones
La función relation() define relaciones explícitas entre tablas:
def relation(
fields: List[str], # Campos en la tabla actual (foreign keys)
references: List[str], # Campos referenciados en la tabla destino
backref: str, # Nombre de la relación inversa
onDelete='cascade', # Comportamiento al eliminar
onUpdate='cascade' # Comportamiento al actualizar
):
Conceptos importantes:
-
Relaciones Explícitas vs Implícitas:
- Explícita: Se define usando
relation()en la tabla que CONTIENE la foreign key - Implícita: Se declara solo con el tipo en la tabla que NO contiene la foreign key
- Explícita: Se define usando
-
Dónde usar
relation():- SOLO en la tabla que tiene la columna foreign key
- La tabla "origen" muestra la relación como
List[...](implícita)
Ejemplo completo:
class Usuario(Table):
__tablename__ = "usuario"
id: int = column(primary_key=True, autoincrement=True)
nombre: str
email: str = column(unique=True)
# Relación IMPLÍCITA - Usuario NO tiene foreign key hacia Post
# Se muestra automáticamente como List por la relación inversa
posts: List[Post] # ← No necesita relation()
class Post(Table):
__tablename__ = "post"
id: int = column(primary_key=True, autoincrement=True)
titulo: str
contenido: str
autor_id: int # ← Esta ES la foreign key
# Relación EXPLÍCITA - Post SÍ tiene foreign key hacia Usuario
autor: Usuario = relation(
fields=['autor_id'], # Campo FK en esta tabla
references=['id'], # Campo PK en tabla destino
backref='posts' # Nombre de relación inversa en Usuario
)
Parámetros de relation():
| Parámetro | Descripción | Ejemplo |
|---|---|---|
fields |
Lista de columnas FK en la tabla actual | ['autor_id'] |
references |
Lista de columnas PK en la tabla destino | ['id'] |
backref |
Nombre de la relación inversa | 'posts' |
onDelete |
Acción al eliminar: 'cascade', 'restrict', 'set null' |
'cascade' |
onUpdate |
Acción al actualizar: 'cascade', 'restrict', 'set null' |
'cascade' |
Regla fundamental:
- ✅ Usa
relation()SOLO en la tabla que tiene la foreign key - ✅ La tabla "origen" automáticamente muestra
List[...]por la relación inversa - ❌ NO uses
relation()en ambos lados de la relación
🔐 Encriptación de Columnas
TAI-SQL soporta encriptación automática de columnas para proteger datos sensibles:
from tai_sql import Table, column, datasource
# Configurar datasource con clave de encriptación
datasource(
provider=env('DATABASE_URL'),
secret_key_name='SECRET_KEY' # Variable de entorno con la clave secreta
)
class Usuario(Table):
__tablename__ = "usuarios"
id: int = column(primary_key=True, autoincrement=True)
email: str = column(unique=True)
nombre: str
# Columnas encriptadas - Los datos se encriptan automáticamente
password: str = column(encrypt=True)
telefono: Optional[str] = column(encrypt=True)
datos_bancarios: Optional[str] = column(encrypt=True)
Configuración requerida:
-
Variable de entorno: Define una clave secreta segura
export SECRET_KEY="tu_clave_secreta_de_al_menos_32_caracteres"
-
Configuración en datasource: Especifica el nombre de la variable
datasource( provider=env('DATABASE_URL'), secret_key_name='SECRET_KEY' # Por defecto es 'SECRET_KEY' )
Características de la encriptación:
- ✅ Automática: Los datos se encriptan al escribir y desencriptan al leer
- ✅ Transparente: El código funciona igual que columnas normales
- ✅ Segura: Usa
cryptography.fernetcon claves de 256 bits - ✅ Validación: Verifica la existencia de la clave secreta antes de generar
Ejemplo de uso:
# El ModelGenerator crea propiedades híbridas automáticamente
user = Usuario(
email="juan@example.com",
nombre="Juan",
password="mi_password_secreto", # Se encripta automáticamente
telefono="123-456-7890" # Se encripta automáticamente
)
# Al leer, se desencripta automáticamente
print(user.password) # "mi_password_secreto" (desencriptado)
print(user.telefono) # "123-456-7890" (desencriptado)
# En la BD se almacena encriptado
print(user._password) # "gAAAAABh..." (encriptado)
Validaciones de seguridad:
- ❗ Clave requerida: Si hay columnas con
encrypt=True, la clave secreta debe existir - ❗ Longitud mínima: La clave debe tener al menos 32 caracteres
- ❗ Solo strings: Solo columnas de tipo string pueden encriptarse
👁️ View - Definición de Vistas
Las vistas permiten crear consultas complejas reutilizables:
from tai_sql import View, query
class UserStats(View):
'''Estadísticas de usuarios y sus posts'''
__tablename__ = "user_stats"
__query__ = query('user_stats.sql') # Archivo SQL en .../views/
# Definir las columnas que retorna la vista
user_id: int
user_name: str
post_count: int
last_post_date: datetime
Archivo SQL correspondiente (.../views/user_stats.sql):
SELECT
u.id AS user_id,
u.name AS user_name,
COUNT(p.id) AS post_count,
MAX(p.created_at) AS last_post_date
FROM usuarios u
LEFT JOIN posts p ON u.id = p.author_id
WHERE u.active = true
GROUP BY u.id, u.name
🔢 Enum - Definición de Enumeraciones
Los enums permiten definir listas de valores predefinidos para ciertas columnas, garantizando integridad de datos:
from tai_sql import Table, column
from enum import Enum
# Definir enum como clase Python estándar
class ContentType(Enum):
TEXT = "text"
IMAGE = "image"
VIDEO = "video"
class Post(Table):
'''Tabla de posts con tipo de contenido controlado'''
__tablename__ = "post"
id: int = column(primary_key=True, autoincrement=True)
title: str
content: str
content_type: ContentType # ← Usar enum como tipo de columna
timestamp: datetime = column(server_now=True)
Características de los Enums:
- ✅ Auto-registro: Los enums se registran automáticamente al definirlos
- ✅ Validación automática: Solo acepta valores definidos en el enum
- ✅ Integración CRUD: El CRUD generado expone los valores disponibles
- ✅ Soporte en DTOs: Los Pydantic DTOs incluyen validación de enum
- ✅ Type hints: Autocompletado completo en tu IDE
Ejemplo con múltiples enums:
class Status(Enum):
DRAFT = "draft"
PUBLISHED = "published"
ARCHIVED = "archived"
class Priority(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class Task(Table):
__tablename__ = "tasks"
id: int = column(primary_key=True, autoincrement=True)
title: str
status: Status = Status.DRAFT # ← Con valor por defecto
priority: Priority
created_at: datetime = column(server_now=True)
Ventajas de usar Enums:
- 🛡️ Integridad de datos: Previene valores inválidos en la BD
- 📝 Documentación clara: Los valores posibles están definidos en el código
- 🔄 Refactoring seguro: Cambios de enum se propagan automáticamente
- 🚀 Performance: Validación rápida sin consultas a BD
- 🎯 Type safety: Detección de errores en tiempo de desarrollo
🔔 Triggers - Lógica de Negocio en Modelos
Los triggers permiten definir lógica de negocio que se ejecuta automáticamente cuando ocurren operaciones CRUD en una tabla. Se definen directamente en el modelo usando decoradores y se inyectan como código inline en los DAOs generados — no hay overhead de runtime, no hay clases intermedias.
TAI-SQL analiza el código fuente del trigger mediante AST (Abstract Syntax Tree) y lo transforma en código Python explícito que se integra directamente en los métodos create, update y delete del DAO generado.
Decoradores disponibles
| Decorador | Evento | Parámetros |
|---|---|---|
@on_create |
Inserción de registro | timing, priority, when |
@on_update |
Actualización de registro | timing, priority, fields, when |
@on_delete |
Eliminación de registro | timing, priority, when |
Parámetros comunes
| Parámetro | Tipo | Descripción | Default |
|---|---|---|---|
timing |
str |
'before' o 'after' — cuándo se ejecuta respecto a la operación |
'before' |
priority |
int |
Orden de ejecución cuando hay múltiples triggers (menor = primero) | 1 |
fields |
List[str] |
Solo en @on_update: lista de campos que activan el trigger |
None (todos) |
when |
lambda |
Condición opcional: si retorna False, el trigger no se ejecuta |
None |
Referencia de self y TriggerAPI
Dentro de un trigger, self se refiere al modelo actual (la tabla donde se define el trigger). El parámetro t: TriggerAPI proporciona métodos para interactuar con otros modelos y utilidades.
Acceso a datos del modelo actual con self:
class Post(Table):
__tablename__ = "post"
id: bigint = column(primary_key=True, autoincrement=True)
content: str
author_id: int
timestamp: datetime = column(default=datetime.now)
@on_create(timing='after')
def update_author(self, t: TriggerAPI):
# self.author_id → accede al campo author_id del Post que se está creando
# self.timestamp → accede al campo timestamp del Post que se está creando
t.update(Usuario, self.author_id, last_post_date=self.timestamp)
Mapeo interno de self según evento:
| Evento | self.campo se mapea a |
Contexto |
|---|---|---|
create |
instance.campo |
instance es el objeto SQLAlchemy recién creado |
update |
record.campo |
record es el registro cargado de la BD con los nuevos valores aplicados |
delete |
record.campo |
record es el registro antes de ser eliminado |
📗 Ejemplo 1: Trigger BEFORE CREATE — Normalización de datos
Modifica los datos antes de que sean insertados en la base de datos.
class Usuario(Table):
__tablename__ = "usuario"
id: int = column(primary_key=True, autoincrement=True)
name: str
email: str
@on_create(timing='before')
def normalize_email(self, t: TriggerAPI):
"""Normaliza el email a minúsculas antes de insertar"""
t.data['email'] = t.data['email'].lower().strip()
Código generado en el DAO:
def create(self, usuario: UsuarioCreate, session=None) -> UsuarioRead:
instance = usuario.to_instance()
def _execute_create(session):
# 🔔 Trigger: normalize_email — Normaliza el email a minúsculas antes de insertar
instance.email = instance.email.lower().strip()
session.add(instance)
session.flush()
...
Nota:
t.data['email']se transforma automáticamente eninstance.emailporque en el contexto decreate,t.datase mapea a la instancia del modelo SQLAlchemy.
📗 Ejemplo 2: Trigger AFTER CREATE — Operación cross-table
Ejecuta lógica después de insertar un registro. Ideal para actualizar tablas relacionadas.
class Post(Table):
__tablename__ = "post"
id: bigint = column(primary_key=True, autoincrement=True)
content: str
timestamp: datetime = column(default=datetime.now)
author_id: int
author: Usuario = relation(fields=['author_id'], references=['id'], backref='posts')
@on_create(timing='after')
def modify_user_last_post_date(self, t: TriggerAPI):
"""Actualiza last_post_date del usuario al crear un nuevo post"""
t.update(Usuario, self.author_id, last_post_date=self.timestamp)
Código generado en el DAO:
def create(self, post: PostCreate, session=None) -> PostRead:
instance = post.to_instance()
def _execute_create(session):
session.add(instance)
session.flush()
# 🔔 Trigger: modify_user_last_post_date — Actualiza last_post_date del usuario al crear un nuevo post
UsuarioSyncDAO(self.session_manager).update(
instance.author_id,
updated_values=UsuarioUpdateValues(last_post_date=instance.timestamp),
session=session
)
...
Nota:
t.update(Model, pk, **values)genera una llamada completa al DAO del modelo destino, reutilizando la misma sesión para mantener la transaccionalidad.
📗 Ejemplo 3: Trigger BEFORE UPDATE — Validación y campos específicos
Ejecuta lógica antes de actualizar. El parámetro fields limita la activación a ciertos campos.
class Producto(Table):
__tablename__ = "producto"
id: int = column(primary_key=True, autoincrement=True)
nombre: str
precio: float
email_contacto: str
@on_update(timing='before', fields=['email_contacto'])
def normalize_email_on_update(self, t: TriggerAPI):
"""Normaliza el email de contacto al actualizarlo"""
t.data['email_contacto'] = t.data['email_contacto'].lower().strip()
@on_update(timing='before', fields=['precio'])
def validate_precio(self, t: TriggerAPI):
"""No permite precios negativos"""
if t.data['precio'] < 0:
t.abort("El precio no puede ser negativo")
Código generado en el DAO:
def update(self, id, updated_values, session=None) -> int:
update_data = updated_values.to_dict()
...
def _execute_update(session):
record = ... # Carga el registro de la BD
for key, value in update_data.items():
setattr(record, key, value)
# 🔔 Trigger: normalize_email_on_update
if any(field in update_data for field in ['email_contacto']):
record.email_contacto = record.email_contacto.lower().strip()
# 🔔 Trigger: validate_precio
if any(field in update_data for field in ['precio']):
if record.precio < 0:
raise ValueError('Trigger abortado: El precio no puede ser negativo')
session.flush()
...
Nota: Cuando se usa
fields, el trigger solo se activa si alguno de esos campos está presente enupdate_data. Esto evita ejecuciones innecesarias.
📗 Ejemplo 4: Trigger con condición when
La condición when es una lambda que recibe el TriggerAPI y retorna un booleano. Si retorna False, el trigger no se ejecuta.
class Pedido(Table):
__tablename__ = "pedido"
id: int = column(primary_key=True, autoincrement=True)
estado: str
total: float
@on_update(
timing='after',
fields=['estado'],
when=lambda t: t.old['estado'] != t.new['estado']
)
def log_cambio_estado(self, t: TriggerAPI):
"""Registra cambios de estado en el log"""
t.log(f"Estado cambiado de {t.old['estado']} a {t.new['estado']}")
Código generado en el DAO:
# 🔔 Trigger: log_cambio_estado — Registra cambios de estado en el log
if any(field in update_data for field in ['estado']):
if old_values['estado'] != record.estado:
logger.info("🔔 Trigger: " + f"Estado cambiado de {old_values['estado']} a {record.estado}")
Nota:
t.oldse mapea aold_values(dict con valores pre-update) yt.newse mapea arecord(instancia con los nuevos valores ya aplicados).
📗 Ejemplo 5: Trigger BEFORE DELETE — Protección y limpieza
Ejecuta lógica antes de eliminar un registro.
class Usuario(Table):
__tablename__ = "usuario"
id: int = column(primary_key=True, autoincrement=True)
name: str
is_admin: bool
@on_delete(timing='before')
def prevent_admin_deletion(self, t: TriggerAPI):
"""No permite eliminar administradores"""
if self.is_admin:
t.abort(f"No se puede eliminar al administrador {self.name}")
@on_delete(timing='before', priority=2)
def log_deletion(self, t: TriggerAPI):
"""Registra la eliminación en el log"""
t.log(f"Eliminando usuario: {self.name} (id={self.id})")
Código generado en el DAO:
def delete(self, id, session=None) -> int:
def _execute_delete(session):
record = ... # Carga el registro
old_values = {col.name: getattr(record, col.name) for col in Usuario.__table__.columns}
# 🔔 Trigger: prevent_admin_deletion (priority=1)
if record.is_admin:
raise ValueError('Trigger abortado: No se puede eliminar al administrador ' + str(record.name))
# 🔔 Trigger: log_deletion (priority=2)
logger.info("🔔 Trigger: " + f"Eliminando usuario: {record.name} (id={record.id})")
# Ejecutar DELETE
del_query = delete(Usuario).where(Usuario.id == id)
result = session.execute(del_query)
...
📗 Ejemplo 6: Trigger cross-table CREATE — Crear registros en otra tabla
class Usuario(Table):
__tablename__ = "usuario"
id: int = column(primary_key=True, autoincrement=True)
name: str
email: str
@on_create(timing='after')
def create_default_profile(self, t: TriggerAPI):
"""Crea un perfil por defecto para el nuevo usuario"""
t.create(Profile, user_id=self.id, bio=f"Hola, soy {self.name}")
Código generado:
# 🔔 Trigger: create_default_profile
ProfileSyncDAO(self.session_manager).create(
ProfileCreate(user_id=instance.id, bio='Hola, soy ' + str(instance.name)),
session=session
)
📗 Ejemplo 7: Trigger cross-table DELETE — Eliminar registros en otra tabla
class Usuario(Table):
__tablename__ = "usuario"
id: int = column(primary_key=True, autoincrement=True)
name: str
@on_delete(timing='before')
def cleanup_sessions(self, t: TriggerAPI):
"""Elimina todas las sesiones activas del usuario antes de borrarlo"""
t.delete_many(Session, user_id=self.id)
Código generado:
# 🔔 Trigger: cleanup_sessions
SessionSyncDAO(self.session_manager).delete_many(
filters_list=[{'user_id': record.id}],
session=session
)
📗 Ejemplo 8: Trigger cross-table UPDATE_MANY — Actualización masiva
class Categoria(Table):
__tablename__ = "categoria"
id: int = column(primary_key=True, autoincrement=True)
nombre: str
activa: bool
@on_update(timing='after', fields=['activa'])
def cascade_disable_products(self, t: TriggerAPI):
"""Desactiva todos los productos cuando se desactiva la categoría"""
t.update_many(Producto, filters={'categoria_id': self.id}, activa=self.activa)
Código generado:
# 🔔 Trigger: cascade_disable_products
if any(field in update_data for field in ['activa']):
ProductoSyncDAO(self.session_manager).update_many(
payload=ProductoUpdate(
filter=ProductoUpdateFilter(categoria_id=record.id),
values=ProductoUpdateValues(activa=record.activa)
),
session=session
)
📗 Ejemplo 9: Múltiples triggers con prioridad
Cuando una tabla tiene varios triggers para el mismo evento, priority controla el orden de ejecución.
class Pedido(Table):
__tablename__ = "pedido"
id: int = column(primary_key=True, autoincrement=True)
total: float
estado: str
cliente_id: int
@on_create(timing='before', priority=1)
def validate_total(self, t: TriggerAPI):
"""Valida que el total sea positivo"""
if t.data['total'] <= 0:
t.abort("El total del pedido debe ser mayor a 0")
@on_create(timing='before', priority=2)
def set_initial_status(self, t: TriggerAPI):
"""Establece el estado inicial"""
t.data['estado'] = 'pendiente'
@on_create(timing='after', priority=1)
def update_client_stats(self, t: TriggerAPI):
"""Actualiza las estadísticas del cliente"""
t.update(Cliente, self.cliente_id, ultimo_pedido=self.timestamp)
@on_create(timing='after', priority=2)
def notify_warehouse(self, t: TriggerAPI):
"""Registra la notificación al almacén"""
t.log(f"Nuevo pedido #{self.id} para preparación")
Orden de ejecución:
validate_total(before, priority=1)set_initial_status(before, priority=2)- INSERT en BD
update_client_stats(after, priority=1)notify_warehouse(after, priority=2)
📗 Ejemplo 10: Utilidades t.log() y t.abort()
class Transferencia(Table):
__tablename__ = "transferencia"
id: int = column(primary_key=True, autoincrement=True)
monto: float
cuenta_origen_id: int
cuenta_destino_id: int
@on_create(timing='before')
def validate_transfer(self, t: TriggerAPI):
"""Valida la transferencia antes de crearla"""
if t.data['monto'] <= 0:
t.abort("El monto debe ser positivo")
if t.data['cuenta_origen_id'] == t.data['cuenta_destino_id']:
t.abort("No se puede transferir a la misma cuenta")
t.log(f"Transferencia validada: ${t.data['monto']}")
t.log("Procesando transferencia bancaria", level='warning')
Transformación de utilidades:
| Código del trigger | Código generado |
|---|---|
t.log("mensaje") |
logger.info("🔔 Trigger: mensaje") |
t.log("mensaje", level='warning') |
logger.warning("🔔 Trigger: mensaje") |
t.log("mensaje", level='error') |
logger.error("🔔 Trigger: mensaje") |
t.abort("razón") |
raise ValueError("Trigger abortado: razón") |
Comportamiento en operaciones masivas (*_many)
Cuando un modelo tiene triggers, los métodos create_many, update_many y delete_many delegan automáticamente a su método unitario (create, update, delete) para que cada registro ejecute sus triggers individualmente. Todo ocurre en una misma sesión transaccional.
Cuando un modelo no tiene triggers para ese evento, los métodos masivos mantienen su implementación bulk optimizada (ej: session.add_all(), UPDATE ... WHERE ...).
# Post tiene un trigger @on_create → create_many delega a create
posts = [PostCreate(content="Post 1", author_id=1), PostCreate(content="Post 2", author_id=1)]
public_api.post.create_many(posts) # Cada post ejecuta modify_user_last_post_date
# Comment NO tiene triggers → create_many usa bulk insert
comments = [CommentCreate(content="A", post_id=1), CommentCreate(content="B", post_id=1)]
public_api.comment.create_many(comments) # Bulk insert sin overhead
Esta decisión se toma en tiempo de generación, no en runtime — no hay ningún overhead de comprobación para modelos sin triggers.
Referencia completa de la API de operaciones cross-table
| Método del trigger | Parámetros | Código generado |
|---|---|---|
t.update(Model, pk, **values) |
Modelo, primary key, valores | {Model}SyncDAO(...).update(pk, updated_values={Model}UpdateValues(**values), session=session) |
t.create(Model, **values) |
Modelo, valores | {Model}SyncDAO(...).create({Model}Create(**values), session=session) |
t.delete(Model, pk) |
Modelo, primary key | {Model}SyncDAO(...).delete(pk, session=session) |
t.update_many(Model, filters={...}, **values) |
Modelo, filtros, valores | {Model}SyncDAO(...).update_many(payload={Model}Update(filter=..., values=...), session=session) |
t.delete_many(Model, **filters) |
Modelo, filtros | {Model}SyncDAO(...).delete_many(filters_list=[{...}], session=session) |
Todas las operaciones cross-table comparten la misma sesión de la transacción principal, garantizando atomicidad. Si cualquier trigger falla o ejecuta
t.abort(), toda la transacción se revierte.
Resumen de características
- ✅ Cero overhead: Los triggers se inyectan como código inline en los DAOs — no hay introspección ni dispatch en runtime
- ✅ Transaccionalidad: Todas las operaciones (incluyendo cross-table) comparten la misma sesión
- ✅ Type-safe:
TriggerAPIproporciona autocompletado e intellisense en tu IDE - ✅ Condicional: Soporte para
when(lambda) yfields(lista de campos) para activación selectiva - ✅ Cross-table: Operaciones CRUD completas sobre otros modelos desde dentro de un trigger
- ✅ Priorizable: Múltiples triggers en el mismo evento se ordenan por
priority - ✅ Sync/Async: Los triggers generan código correcto para ambos modos (
SyncDAO/AsyncDAO) - ✅ Bulk-aware:
create_many,update_manyydelete_manydelegan automáticamente cuando hay triggers
🌱 feed() - Datos Iniciales (Seeding)
El método feed() permite definir datos iniciales directamente en los modelos del schema. TAI-SQL se encarga de resolver dependencias entre tablas, inyectar foreign keys y ejecutar upserts idempotentes.
Definición básica
Define un método feed() en cualquier Table que retorne una lista de instancias de esa misma tabla:
class Usuario(Table):
__tablename__ = "usuario"
id: int = column(primary_key=True, autoincrement=True)
name: str = column(unique=True)
email: str
posts: List[Post]
def feed(self) -> List[Usuario]:
"""Datos iniciales para poblar la tabla"""
return [
Usuario(name="John Doe", email="john@example.com"),
Usuario(name="Jane Smith", email="jane@example.com"),
]
Reglas:
feed()debe retornarList[MismaClase]— se valida en tiempo de ejecución- Los campos
autoincrementse omiten automáticamente (los genera la BD) - Los valores
defaultdel schema se aplican a columnas NOT NULL faltantes - Si la tabla ya tiene datos, se ejecuta un upsert (INSERT ... ON CONFLICT DO UPDATE)
Datos anidados en cascada
Puedes anidar instancias de tablas relacionadas. TAI-SQL resuelve automáticamente las foreign keys tras insertar el registro padre:
class Usuario(Table):
__tablename__ = "usuario"
id: int = column(primary_key=True, autoincrement=True)
name: str = column(unique=True)
pwd: str = column(encrypt=True)
email: Optional[str]
posts: List[Post]
def feed(self) -> List[Usuario]:
return [
Usuario(
name="John Doe",
pwd="hashed_password",
email="john@example.com",
posts=[
Post(title="First Post", content="Hello World!", comments=[
Comment(content="Great post!"),
Comment(content="Thanks for sharing"),
]),
Post(title="Second Post", content="Another day"),
],
),
Usuario(
name="Jane Smith",
pwd="hashed_password",
email="jane@example.com",
posts=[
Post(title="Jane's Post", content="My first post"),
],
),
]
Cómo funciona internamente:
- Orden topológico: Las tablas se procesan por dependencias FK (padres antes que hijos)
- Extracción: Separa columnas directas de relaciones anidadas
- FK placeholders: Los hijos reciben marcadores
__fk_ref__que apuntan al índice del padre - Resolución: Al ejecutar, los placeholders se sustituyen por los PKs reales generados por la BD
- Upsert: Cada registro se inserta con
ON CONFLICT DO UPDATE, garantizando idempotencia
Soporte multi-motor
| Motor | Estrategia de upsert |
|---|---|
| PostgreSQL | INSERT ... ON CONFLICT DO UPDATE con RETURNING |
| MySQL | INSERT ... ON DUPLICATE KEY UPDATE |
| SQL Server | MERGE ... WHEN MATCHED / NOT MATCHED |
| Otros | INSERT simple (fallback) |
Características
- ✅ Idempotente: Ejecutar
feedmúltiples veces produce el mismo resultado (upsert) - ✅ Cascada: Relaciones anidadas a cualquier nivel de profundidad
- ✅ FK automáticas: No necesitas conocer los IDs — se resuelven tras insertar el padre
- ✅ Defaults: Columnas NOT NULL con
defaulten el schema se rellenan automáticamente - ✅ Drift check: Antes de ejecutar, verifica que no haya cambios pendientes de
push - ✅ Orden topológico: Nunca intenta insertar un hijo antes que su padre
- ✅ Validación: Comprueba que
feed()retorne el tipo correcto (List[MismaClase])
🎯 Generadores Incluidos
📝 ModelsGenerator
Genera modelos SQLAlchemy estándar desde tus definiciones de Table y View.
ModelsGenerator(
output_dir='...' # Directorio donde se generarán los modelos
)
🔄 CRUDGenerator
Genera clases CRUD completas con operaciones Create, Read, Update, Delete optimizadas.
CRUDGenerator(
output_dir='...',
models_import_path='...',
mode='sync' # 'sync', 'async', o 'both'
)
Estructura generada:
.../<schema_name>/crud/
├── syn/ # Si mode='sync' o 'both'
│ ├── __init__.py # API unificada (public_api)
│ ├── session_manager.py # Gestor de sesiones síncronas
│ ├── daos.py # Data Access Objects por tabla
│ ├── dtos.py # Data Transfer Objects (Pydantic)
│ └── utils.py # Utilidades y decoradores
└── asyn/ # Si mode='async' o 'both'
├── __init__.py # API unificada (public_api)
├── session_manager.py # Gestor de sesiones asíncronas
├── daos.py # Data Access Objects por tabla
├── dtos.py # Data Transfer Objects (Pydantic)
└── utils.py # Utilidades y decoradores
Arquitectura del CRUD generado:
El sistema genera una arquitectura por capas completa:
- 📋 DTOs (Data Transfer Objects): Objetos Pydantic para validación y serialización
- 🗃️ DAOs (Data Access Objects): Clases especializadas por tabla con métodos CRUD
- 🔗 API Unificada: Objeto
public_apique centraliza el acceso a todos los DAOs - ⚙️ Gestión de sesiones: SessionManager para manejo automático de transacciones
Ejemplo de uso del CRUD generado:
from database.public.crud.syn import public_api
# ===== 🔍 OPERACIONES DE LECTURA =====
# Buscar un usuario por ID
user = public_api.usuario.find(id=1)
# Retorna: UsuarioRead | None
# Buscar múltiples usuarios con filtros
users = public_api.usuario.find_many(
limit=10,
offset=0,
name="Juan",
email="juan@example.com"
)
# Retorna: List[UsuarioRead]
# Buscar con relaciones incluidas
user = public_api.usuario.find(
id=1,
includes=['posts', 'posts.comments'] # ← Carga optimizada
)
# user.posts estará poblado automáticamente
# Contar registros con filtros
total_users = public_api.usuario.count(name="Juan")
# Retorna: int
# Verificar existencia
exists = public_api.usuario.exists(email="juan@example.com")
# Retorna: bool
# ===== 🆕 OPERACIONES DE CREACIÓN =====
# Crear usuario usando DTO
from database.public.crud.syn import UsuarioCreate
user_data = UsuarioCreate(
name="Ana García",
pwd="password123",
email="ana@example.com"
)
user = public_api.usuario.create(user_data)
# Retorna: UsuarioRead
# Crear múltiples usuarios
users_data = [
UsuarioCreate(name="Pedro", pwd="pass1", email="pedro@example.com"),
UsuarioCreate(name="María", pwd="pass2", email="maria@example.com"),
]
count = public_api.usuario.create_many(users_data)
# Retorna: int (número de registros creados)
# ===== 🔄 OPERACIONES DE ACTUALIZACIÓN =====
# Actualizar usuario específico
from database.public.crud.syn import UsuarioUpdateValues
updated_count = public_api.usuario.update(
id=1, # Filtro por ID
updated_values=UsuarioUpdateValues(
name="Juan Carlos",
email="juancarlos@example.com"
)
)
# Retorna: int (número de registros actualizados)
# Actualización masiva con filtros
from database.public.crud.syn import UsuarioUpdate, UsuarioFilter
result = public_api.usuario.update_many(
payload=UsuarioUpdate(
filter=UsuarioFilter(name="Juan"),
values=UsuarioUpdateValues(name="Juan Actualizado")
)
)
# Retorna: int
# Upsert (crear o actualizar)
user = public_api.usuario.upsert(
usuario=UsuarioCreate(
name="Carlos",
pwd="password",
email="carlos@example.com"
),
match_fields=['email'] # Campo para verificar existencia
)
# Retorna: UsuarioRead
# ===== 🗑️ OPERACIONES DE ELIMINACIÓN =====
# Eliminar por ID
deleted_count = public_api.usuario.delete(id=1)
# Retorna: int
# Eliminación masiva con filtros
filters_list = [
{"name": "Usuario1"},
{"email": "obsoleto@example.com"}
]
deleted_count = public_api.usuario.delete_many(filters_list)
# Retorna: int
# ===== 📊 INTEGRACIÓN CON PANDAS =====
# Exportar a DataFrame
users_df = public_api.usuario.as_dataframe(
limit=1000,
name="Juan" # ← Con filtros opcionales
)
# Retorna: pandas.DataFrame con optimización automática de tipos
# Importar desde DataFrame
import pandas as pd
new_users_df = pd.DataFrame({
'name': ['Luis', 'Carmen', 'Roberto'],
'pwd': ['pass1', 'pass2', 'pass3'],
'email': ['luis@test.com', 'carmen@test.com', 'roberto@test.com']
})
inserted_count = public_api.usuario.from_dataframe(
df=new_users_df,
validate_types=True, # Validar tipos automáticamente
ignore_extra_columns=True, # Ignorar columnas no definidas
fill_missing_nullable=True # Llenar campos opcionales con None
)
# Retorna: int (registros insertados)
# ===== 🔢 TRABAJAR CON ENUMS =====
# Obtener valores disponibles del enum
content_types = public_api.content_type.find_many()
# Retorna: ['text', 'image', 'video']
# Usar en creación con validación
post = public_api.post.create(PostCreate(
title="Mi post",
content="Contenido",
content_type="text" # ← Validado automáticamente
))
# ===== 🏗️ GESTIÓN DE SESIONES TRANSACCIONALES =====
# Operaciones transaccionales (múltiples operaciones en una transacción)
with public_api.session_manager.get_session() as session:
# Crear usuario
user = public_api.usuario.create(
UsuarioCreate(name="Transaccional", pwd="test", email="trans@test.com"),
session=session
)
# Crear post asociado
post = public_api.post.create(
PostCreate(title="Post", content="Contenido", author_id=user.id),
session=session
)
# Si cualquier operación falla, toda la transacción se revierte
🎯 Características avanzadas del CRUD:
- ✅ Type Safety completo: Todos los métodos tienen type hints precisos
- 🔄 Carga optimizada de relaciones: Soporte para
includesconjoinedload/selectinload - 📊 Integración nativa con Pandas: Exportación/importación optimizada
- 🛡️ Validación automática: DTOs Pydantic validan datos antes de BD
- ⚡ Gestión de sesiones: Automática o manual según necesidad
- 🔍 Logging integrado: Todas las operaciones quedan registradas
- 🎭 Manejo de errores: Decorador
@error_handlercon rollback automático
📋 DTOs Generados por tabla:
Para cada tabla se generan los siguientes DTOs Pydantic:
# Lectura (datos que vienen de la BD)
UsuarioRead: BaseModel # Con relaciones opcionales
# Creación (datos para nuevos registros)
UsuarioCreate: BaseModel # Sin campos autogenerados
# Filtros (para operaciones de búsqueda)
UsuarioFilter: BaseModel # Todos los campos opcionales
# Actualización de valores
UsuarioUpdateValues: BaseModel # Campos a modificar
# Actualización completa (filtros + valores)
UsuarioUpdate: BaseModel # Combina filter + values
# Validador de DataFrame
UsuarioDataFrameValidator # Para operaciones con Pandas
🏗️ DAOs Generados por tabla:
Cada tabla genera una clase DAO especializada:
class UsuarioSyncDAO:
"""DAO con documentación completa de todos los métodos"""
def __init__(self, session_manager: SyncSessionManager)
# Métodos de lectura
def find(self, id: int, includes: Optional[List[str]] = None, session: Optional[Session] = None) -> Optional[UsuarioRead]
def find_many(self, limit: Optional[int] = None, offset: Optional[int] = None, **filters, session: Optional[Session] = None) -> List[UsuarioRead]
# Métodos de escritura
def create(self, usuario: UsuarioCreate, session: Optional[Session] = None) -> UsuarioRead
def create_many(self, records: List[UsuarioCreate], session: Optional[Session] = None) -> int
def update(self, id: int, updated_values: UsuarioUpdateValues, session: Optional[Session] = None) -> int
def update_many(self, payload: UsuarioUpdate, session: Optional[Session] = None) -> int
def upsert(self, usuario: UsuarioCreate, match_fields: List[str], session: Optional[Session] = None) -> UsuarioRead
def upsert_many(self, records: List[UsuarioCreate], match_fields: List[str], session: Optional[Session] = None) -> int
def delete(self, id: int, session: Optional[Session] = None) -> int
def delete_many(self, filters_list: List[Dict[str, Any]], session: Optional[Session] = None) -> int
# Métodos de utilidad
def count(self, **filters, session: Optional[Session] = None) -> int
def exists(self, **filters, session: Optional[Session] = None) -> bool
# Integración Pandas
def as_dataframe(self, **filters) -> DataFrame
def from_dataframe(self, df: DataFrame, validate_types: bool = False, ignore_extra_columns: bool = False, fill_missing_nullable: bool = True) -> int
👁️ DAOs para Vistas (Solo lectura):
Las vistas generan DAOs con operaciones de solo lectura:
# Acceso a vista UserStats
stats = public_api.user_stats.find_many(
limit=10,
user_id=1, # Filtros específicos de la vista
min_post_count=5,
max_post_count=100
)
# Retorna: List[UserStatsRead]
# Exportar vista a DataFrame
stats_df = public_api.user_stats.as_dataframe(
min_post_count=10 # Con filtros opcionales
)
# Retorna: pandas.DataFrame optimizado
# Las vistas NO tienen métodos de escritura (create, update, delete)
# Solo: find_many, count, exists, as_dataframe
🔗 Acceso unificado con public_api:
El objeto public_api es un singleton que centraliza el acceso:
from database.public.crud.syn import public_api
# ✅ Acceso a tablas (CRUD completo)
public_api.usuario # UsuarioSyncDAO - Operaciones completas
public_api.post # PostSyncDAO - Operaciones completas
public_api.comment # CommentSyncDAO - Operaciones completas
# 👁️ Acceso a vistas (Solo lectura)
public_api.user_stats # UserStatsSyncDAO - Solo lectura
# 🔢 Acceso a enums (Solo valores)
public_api.content_type # EnumModel - Lista de valores
# ⚙️ Gestor de sesiones compartido
public_api.session_manager # SyncSessionManager para transacciones
📊 ERDiagramGenerator
Genera diagramas Entity-Relationship profesionales usando Graphviz.
ERDiagramGenerator(
output_dir='docs/diagrams',
format='png', # 'png', 'svg', 'pdf', 'dot'
include_views=True, # Incluir vistas en el diagrama
include_columns=True, # Mostrar detalles de columnas
include_relationships=True, # Mostrar relaciones
dpi=300 # Resolución para formatos bitmap
)
Características del diagrama:
- 🔑 Primary Keys: Marcadas con icono de llave
- 🔗 Foreign Keys: Marcadas con icono de enlace
- ⭐ Unique: Columnas únicas marcadas
- ❗ Not Null: Columnas obligatorias marcadas
- ⬆️ Auto Increment: Columnas auto-incrementales marcadas
- 👁️ Views: Diferenciadas visualmente de las tablas
🖥️ Comandos CLI
tai-sql init - Inicializar Proyecto
Crea un nuevo proyecto TAI-SQL con la estructura completa:
# Crear proyecto básico
tai-sql init
# Crear proyecto con nombre personalizado
tai-sql init --name mi-proyecto --schema-name mi-esquema
# Estructura generada:
mi-proyecto/
├── pyproject.toml
├── README.md
├── mi_proyecto/ # CRUD/Models Folder
├── schemas/
│ └── mi_esquema.py # Schema principal
├── views/
│ └── mi_esquema/
│ └── user_stats.sql # Vista de ejemplo
└── diagrams/
└── mi_esquema.png # ERD Diagram
Opciones:
--name, -n: Nombre del proyecto (default:database)--schema, -s: Nombre del primer schema (default:public)
tai-sql new-schema - Crear Nuevo Schema
Agrega un nuevo schema a un proyecto existente:
# Crear nuevo schema en proyecto existente
tai-sql new-schema productos
# Con proyecto personalizado
tai-sql new-schema --project mi-empresa productos
Características:
- ✅ Detecta automáticamente el proyecto TAI-SQL actual
- ✅ Crea archivo de schema con plantilla completa
- ✅ Crea directorio de vistas correspondiente
- ✅ Actualiza configuración del proyecto si es necesario
tai-sql set-default-schema - Establecer Schema por Defecto
Configura qué schema se usará por defecto en los comandos:
# Establecer schema por defecto
tai-sql set-default-schema productos
# Si el schema no existe, muestra opciones disponibles:
# ❌ El schema 'nonexistent' no existe en el proyecto
#
# 📄 Schemas disponibles:
# ✅ public (actual por defecto)
# productos
# ventas
tai-sql info - Información del Proyecto
Muestra información completa del proyecto actual:
tai-sql info
Información mostrada:
📁 Información del proyecto:
Nombre: mi-proyecto
Directorio: /path/to/mi-proyecto
Schema por defecto: productos
📄 Schemas disponibles:
• public
• productos (✅ default, 📌 current)
• ventas
└─ Estado: Cargado
🔧 Comandos disponibles:
tai-sql generate # Usa schema por defecto
tai-sql push # Usa schema por defecto
tai-sql feed # Poblar BD con datos iniciales
tai-sql set-default-schema <nombre> # Cambiar default
tai-sql generate - Generar Recursos
Ejecuta todos los generadores configurados en el schema:
# Generar usando schema por defecto
tai-sql generate
# Generar usando schema específico
tai-sql generate --schema productos
# Generar para todos los schemas del proyecto
tai-sql generate --all
Opciones:
--schema, -s: Schema específico a procesar--all: Procesar todos los schemas del proyecto
Proceso de generación:
- ✅ Carga y valida el schema
- 🔍 Descubre modelos (tablas y vistas)
- 🏗️ Ejecuta generadores configurados
- 📊 Muestra resumen de archivos generados
tai-sql push - Sincronizar con Base de Datos
Aplica los cambios del schema a la base de datos:
# Push básico
tai-sql push
# Con opciones avanzadas
tai-sql push --schema public --force --verbose
# Dry run (mostrar cambios sin aplicar)
tai-sql push --dry-run
Opciones disponibles:
--schema, -s: Schema específico a procesar--force, -f: Aplicar cambios sin confirmación--dry-run, -d: Mostrar DDL sin ejecutar--verbose, -v: Mostrar información detallada--no-generate: No ejecutar generadores tras el push
Proceso de push:
- � Verifica conectividad con el servidor
- 🗂️ Crea base de datos y schema si no existen
- 🔍 Detecta drift (diferencias entre schema declarado y BD actual)
- 📋 Genera sentencias DDL necesarias
- ⚠️ Valida seguridad — bloquea operaciones peligrosas (ej: DROP TABLE con datos)
- ✅ Aplica cambios tras confirmación del usuario
- 🚀 Ejecuta generadores automáticamente (salvo
--no-generate)
Ejemplo de salida:
🚀 Push schema: database/schemas/main.py
📋 Resumen de cambios:
🆕 2 tabla(s) nueva(s): usuarios, posts
➕ 3 columna(s) a añadir en 1 tabla(s)
🆕 1 vista(s) nueva(s): user_stats
¿Deseas ejecutar estas sentencias en la base de datos? [y/N]: y
✅ Esquema sincronizado exitosamente
🚀 Ejecutando generadores...
✅ ModelsGenerator completado
✅ CRUDGenerator completado
✅ ERDiagramGenerator completado
tai-sql ping - Verificar Conectividad
Verifica la conectividad con el servidor de base de datos:
# Verificación básica (ping al host)
tai-sql ping
# Verificación con schema específico
tai-sql ping --schema productos
# Verificación completa (incluye ping ICMP, TCP y BD)
tai-sql ping --full
# Verificar también existencia de la base de datos
tai-sql ping --check-db
# Modo silencioso (solo resultado final)
tai-sql ping --quiet
Opciones:
--schema, -s: Schema específico para conectividad--timeout, -t: Timeout en segundos (default: 5)--check-db, -d: Verificar si la base de datos específica existe--full, -f: Verificación completa (ICMP + TCP + BD)--quiet, -q: Modo silencioso, solo resultado final
Tipos de verificación:
- Básica (default): Solo ping al host
- Full (
--full): Ping ICMP + conectividad TCP + conexión BD - Con BD (
--check-db): Incluye verificación de existencia de BD
Ejemplo de salida:
🔧 Información de conexión:
Motor: postgresql
Host: localhost
Puerto: 5432
Base de datos: mi_proyecto
Usuario: postgres
🏓 Verificación BASIC
✅ Host accesible
🗄️ Verificando existencia de la base de datos...
✅ La base de datos existe
🎉 Verificación de conectividad completada exitosamente
Gestión Automática de Schemas
Resolución automática del schema:
- Si no especificas
--schema, los comandos usan automáticamente el schema por defecto - Si no hay schema por defecto configurado, el comando te guía para establecer uno
- Todos los comandos muestran qué schema están usando
Mensajes de ayuda inteligentes:
# Si no hay schema por defecto:
❌ No existe ningún esquema por defecto
Puedes definir uno con: tai-sql set-default-schema <nombre>
O usar la opción: --schema <nombre_esquema>
# Si especificas un schema que no existe:
❌ El schema 'inexistente' no existe en el proyecto
📄 Schemas disponibles:
✅ public
productos
ventas
Workflow Típico
# 1. Crear nuevo proyecto
tai-sql init --name mi-empresa --schema productos
# 2. Entrar al proyecto
cd mi-empresa
# 3. Configurar base de datos
export DATABASE_URL="postgresql://user:pass@localhost/mi_empresa"
# 4. Editar el schema
# Editar schemas/productos.py
# 5. Sincronizar con BD
tai-sql push
# 6. Poblar con datos iniciales
tai-sql feed
# 7. Verificar conectividad
tai-sql ping --full
# 8. Crear schema adicional
tai-sql new-schema ventas
# 9. Cambiar schema por defecto
tai-sql set-default-schema ventas
# 10. Ver información del proyecto
tai-sql info
# 11. Generar recursos para todos los schemas
tai-sql generate --all
Gestión de Proyectos Multi-Schema
TAI-SQL soporta múltiples schemas en un mismo proyecto:
# Crear schemas adicionales
tai-sql new-schema productos
tai-sql new-schema ventas
tai-sql new-schema usuarios
# Trabajar con schemas específicos
tai-sql push --schema productos
tai-sql generate --schema ventas
# O procesar todos a la vez
tai-sql generate --all
# Cambiar entre schemas por defecto
tai-sql set-default-schema productos
tai-sql push # Usa 'productos' automáticamente
tai-sql set-default-schema ventas
tai-sql generate # Usa 'ventas' automáticamente
Ventajas del multi-schema:
- ✅ Modularidad: Separar lógicamente diferentes dominios
- ✅ Escalabilidad: Cada schema puede tener su propia configuración
- ✅ Flexibilidad: Procesar schemas individualmente o en conjunto
- ✅ Organización: Mejor estructura para proyectos complejos
🛠️ Crear tu Propio Generador
Puedes crear generadores personalizados heredando de BaseGenerator:
from tai_sql.generators.base import BaseGenerator
from tai_sql import db
import os
class APIDocsGenerator(BaseGenerator):
"""Generador de documentación API desde los modelos"""
def __init__(self, output_dir=None, format='markdown'):
super().__init__(output_dir or 'docs/api')
self.format = format
def generate(self) -> str:
"""Genera la documentación API"""
docs_content = self._create_header()
# Procesar cada modelo
for model in self.models:
if hasattr(model, '__tablename__'): # Es una tabla
docs_content += self._generate_table_docs(model)
else: # Es una vista
docs_content += self._generate_view_docs(model)
# Guardar archivo
output_path = os.path.join(self.config.output_dir, f'api.{self.format}')
with open(output_path, 'w', encoding='utf-8') as f:
f.write(docs_content)
return output_path
def _create_header(self) -> str:
"""Crea el header de la documentación"""
return f"""# API Documentation
Database: {db.provider.database}
Schema: {db.schema_name}
Generated: {datetime.now().isoformat()}
## Models
"""
def _generate_table_docs(self, model) -> str:
"""Genera documentación para una tabla"""
docs = f"### {model.__name__} (Table)\n\n"
docs += f"**Table name:** `{model.__tablename__}`\n\n"
if hasattr(model, '__description__'):
docs += f"**Description:** {model.__description__}\n\n"
docs += "**Columns:**\n\n"
docs += "| Column | Type | Constraints |\n"
docs += "|--------|------|-------------|\n"
for name, column in model.columns.items():
constraints = []
if column.primary_key:
constraints.append("PRIMARY KEY")
if not column.nullable:
constraints.append("NOT NULL")
if column.unique:
constraints.append("UNIQUE")
if column.autoincrement:
constraints.append("AUTO INCREMENT")
docs += f"| {name} | {column.type} | {', '.join(constraints)} |\n"
docs += "\n"
return docs
def _generate_view_docs(self, model) -> str:
"""Genera documentación para una vista"""
docs = f"### {model.__name__} (View)\n\n"
docs += f"**View name:** `{model.__tablename__}`\n\n"
if hasattr(model, '__description__'):
docs += f"**Description:** {model.__description__}\n\n"
# Agregar información de la vista...
return docs
# Uso del generador personalizado
generate(
...,
APIDocsGenerator(output_dir='docs/api', format='markdown')
)
Métodos requeridos:
generate(): Método principal que debe retornar la ruta del archivo generado
Métodos/propiedades útiles heredados:
self.models: Propiedad que contiene todos los modelos (tablas y vistas)self.config.output_dir: Directorio de salida configuradoself.register_model(model): Registra un modelo manualmenteself.clear_models(): Limpia la lista de modelos
Este framework te permite construir aplicaciones robustas con una definición declarativa simple, generación automática de código y herramientas CLI potentes para el desarrollo ágil.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file tai_sql-0.4.12.tar.gz.
File metadata
- Download URL: tai_sql-0.4.12.tar.gz
- Upload date:
- Size: 236.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.11.0 Linux/6.14.0-1017-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
270f888450e550536a9fc20b482aa3c840d5ef03a495d76a319e8caf2541fc90
|
|
| MD5 |
fbfece36607b40eca6c666c23766f3e4
|
|
| BLAKE2b-256 |
7ff57c460cc5b6e7978fd054e34e7816912528eaebee455436f92fa4cb4a15ae
|
File details
Details for the file tai_sql-0.4.12-py3-none-any.whl.
File metadata
- Download URL: tai_sql-0.4.12-py3-none-any.whl
- Upload date:
- Size: 271.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.11.0 Linux/6.14.0-1017-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f83ae9646ef0f78d37e3fa7d11dda479ab587a61c2f1c88293a8c7e97db75fb7
|
|
| MD5 |
00829a8b953f7444076c6097d053a865
|
|
| BLAKE2b-256 |
dabfd75321dc836193008987bbcc703bad25654aa26d27f7e5e0bc8137a6a40e
|