Skip to main content

Uma abstração moderna e inteligente para zen-engine com otimizações avançadas para Spark/Databricks

Project description

🧠 Wise Decision Engine

Motor de decisão que separa a definição das regras de negócio do seu local de processamento

Python 3.8+ License: MIT Databricks Ready Automated Release codecov PyPI version GitHub Release Code style: black


💼 Problema de Negócio

Em ambientes corporativos, as regras de negócio mudam constantemente mas estão frequentemente acopladas ao código de processamento. Isso gera:

  • Demora para mudanças: Alterações simples requerem deploy completo
  • 🔄 Dependência técnica: Regras de negócio presas no pipeline de dados
  • 🚫 Falta de governança: Regras espalhadas e sem controle centralizado
  • 💸 Alto custo de manutenção: Equipe técnica para mudanças de negócio

🎯 Solução: Separação de Responsabilidades

O Wise Decision Engine permite:

🏛️ DEFINIÇÃO DA REGRA ⚙️ PROCESSAMENTO DOS DADOS
Armazenada em tabelas/arquivos Executado em Spark/Databricks
Modificável por analistas Gerenciado por engenheiros
Governança centralizada Performance otimizada
Versionamento de regras Processamento distribuído

🔧 Como Funciona

1. Defina a Regra (Uma vez)

// Salva em tabela Databricks ou arquivo JSON
{
  "name": "aprovacao-credito",
  "rules": {
    "if": [{"var": "renda"}, ">", 5000],
    "then": {"aprovado": true, "limite": 10000},
    "else": {"aprovado": false, "limite": 0}
  }
}

2. Processe os Dados (Sempre que necessário)

from wise_decision_engine import DatabricksHelper

# Uma linha aplica a regra para milhões de registros
resultado = DatabricksHelper.quick_decision_apply(
    catalog="regras_catalog",
    schema="public", 
    table="decisoes",
    decision_name="aprovacao-credito",
    input_df=clientes_df  # DataFrame com milhões de clientes
)

# ✅ Resultado: DataFrame com decisões aplicadas automaticamente
resultado.show()

3. Mude a Regra (Sem redeploy)

-- Analista de negócio altera diretamente na tabela
UPDATE regras_catalog.public.decisoes 
SET content = '{"rules": {"if": [{"var": "renda"}, ">", 8000], ...}}'
WHERE name = 'aprovacao-credito';

-- ✅ Próxima execução já usa a nova regra (cache automático)

📦 Instalação

pip install wise-decision-engine

🚀 Releases Automatizados

Este projeto utiliza um sistema completamente automatizado de releases. Para contribuir:

Para Desenvolvedores

# Para nova funcionalidade (minor bump)
git commit -m "feat: implementa cache inteligente"

# Para correção de bug (patch bump)
git commit -m "fix: resolve erro de parsing"

# Para breaking changes (major bump)
git commit -m "feat!: remove API deprecated"

git push origin main

✨ Resultado: Release automático no GitHub + PyPI em ~1-2 minutos!

📖 Veja o guia completo de releases para detalhes sobre:

  • Conventional commits
  • Fluxo automático completo
  • Troubleshooting
  • Configurações avançadas

🚀 Casos de Uso Reais

Aprovação de Crédito

# Regra armazenada em tabela Databricks
# Processamento em Spark para milhões de clientes
resultado = DatabricksHelper.quick_decision_apply(
    "financeiro_catalog", "regras", "decisoes",
    "aprovacao-pf", clientes_df
)

Detecção de Fraude

# Mesma interface, regra diferente
# Analista atualiza regra sem código
resultado = DatabricksHelper.quick_decision_apply(
    "risco_catalog", "regras", "decisoes", 
    "deteccao-fraude", transacoes_df
)

Precificação Dinâmica

# Regras de pricing atualizadas em tempo real
resultado = DatabricksHelper.quick_decision_apply(
    "comercial_catalog", "regras", "decisoes",
    "precificacao-produto", vendas_df
)

🎯 Benefícios de Negócio

Para Analistas de Negócio

  • Autonomia total: Alteram regras sem depender de TI
  • Versionamento: Histórico completo de mudanças
  • Teste A/B: Diferentes versões de regras facilmente
  • Governança: Controle centralizado de todas as regras

Para Engenheiros de Dados

  • Menos deploy: Mudanças de regra não requerem código
  • Performance: Processamento otimizado para Spark
  • Manutenibilidade: Código limpo e desacoplado
  • Escalabilidade: Engine preparada para big data

Para Organização

  • 💰 Redução de custos: 80% menos tempo para mudanças
  • Time-to-market: Novas regras em minutos, não semanas
  • 🔒 Compliance: Auditoria completa de regras aplicadas
  • 📈 Agilidade: Resposta rápida a mudanças de mercado

🏗️ Arquitetura da Solução

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   📋 REGRAS     │    │  🔄 PROCESSING   │    │  📊 RESULTADO   │
│                 │    │                  │    │                 │
│ • Tabela Delta  │───▶│ • Spark Engine   │───▶│ • DataFrame     │
│ • Arquivos JSON │    │ • Cache Auto     │    │ • Schema Auto   │
│ • Versionadas   │    │ • UDFs Otimizas  │    │ • Performance   │
└─────────────────┘    └──────────────────┘    └─────────────────┘

Separação Clara de Responsabilidades

  • 📋 Camada de Regras: Definição e governança (Analistas)
  • 🔄 Camada de Processamento: Performance e escala (Engenheiros)
  • 📊 Camada de Resultado: Consumo e análise (Usuários finais)

🔍 Auto-Schema: Expansão Automática de Colunas

Problema: Schema Manual Complexo

Antes, criar colunas a partir dos resultados das decisões requeria ~20 linhas de código manual:

# ❌ Schema manual (complexo e propenso a erros)
exemplo_json = resultado_df.select("wd_result").limit(1).collect()[0]["wd_result"]
exemplo_result = json.loads(exemplo_json).get("result", {})

# Define tipos manualmente para cada campo
result_fields = []
for k, v in exemplo_result.items():
    field_type = type_map.get(type(v), StringType())
    result_fields.append(StructField(k, field_type, True))

result_schema = StructType(result_fields)
# ... mais 10+ linhas para aplicar schema ...

Solução: Auto-Schema Inteligente

Com auto-schema, isso vira 1 parâmetro:

# ✅ Auto-schema (automático e inteligente)
resultado = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "minha-regra", df,
    auto_schema=True,              # 🔥 Detecção automática de tipos
    schema_strategy="aggressive",   # Estratégia inteligente
    show_schema_info=True          # Mostra o que foi detectado
)

# ✅ Resultado: Todas as colunas expandidas automaticamente com tipos corretos!

Como Funciona o Auto-Schema

  1. 🔍 Análise Automática: Coleta amostras dos resultados JSON
  2. 🧠 Detecção Inteligente: Identifica tipos automaticamente (int, bool, string, arrays, structs)
  3. 📊 Schema Otimizado: Cria schema Spark com tipos corretos
  4. ⚡ Expansão Automática: Converte para colunas individuais com prefixo decision_*

Estratégias Disponíveis

Estratégia Descrição Uso Recomendado
conservative Tipos básicos apenas Produção estável
aggressive Inclui arrays e structs aninhados Estruturas complexas
flat_only Apenas campos de primeiro nível Performance máxima

Exemplo de Transformação

Input JSON:

{
  "result": {
    "score": 750,
    "approved": true,
    "limit": 10000.50,
    "reasons": ["good_credit", "stable_income"]
  }
}

Schema Detectado Automaticamente:

  • scoreIntegerType()
  • approvedBooleanType()
  • limitDoubleType()
  • reasonsArrayType(StringType()) (modo aggressive)

DataFrame Final:

Original columns + decision_score + decision_approved + decision_limit + decision_reasons

Benefícios do Auto-Schema

Aspecto Schema Manual Auto-Schema Melhoria
Linhas de código ~20 linhas 1 parâmetro 20x menos
Detecção de tipos Manual Automática 100% automático
Estruturas complexas Muito complexo Simples 10x mais simples
Manutenção Alta Zero Eliminada
Erros de schema Frequentes Raros 95% redução
Tempo de desenvolvimento Horas Segundos 1000x mais rápido

Casos de Uso Ideais para Auto-Schema

  • ✅ Decisões com muitos campos de saída
  • ✅ Estruturas que mudam frequentemente
  • ✅ Múltiplas decisões com schemas diferentes
  • ✅ Prototipagem rápida
  • ✅ Ambientes de produção com governança

🎯 Controle Granular de Campos: Máxima Flexibilidade

Problema: DataFrame "Poluído" ou Falta de Chaves para Join

Três cenários comuns:

  1. DataFrame "poluído": 50+ colunas originais + resultados da decisão
  2. Apenas resultados: Quer só campos da decisão, mas perde as chaves de join
  3. Join controlado: Quer resultados + algumas chaves específicas
# ❌ Problema 1: DataFrame "poluído"
resultado_completo = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "credito-pj", clientes_df
)
# Resultado: 50 colunas originais + 3 da decisão = 53 total (📊 pesado!)

# ❌ Problema 2: Perde chaves de join
resultado_so_decisao = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "credito-pj", clientes_df,
    return_only_new_fields=True  # Deprecated
)
# Resultado: decision_score + decision_limit + decision_approved
# 🚫 NÃO TEM customer_id para fazer join!

✨ Solução: Controle Granular com original_fields + decision_fields

Controle total sobre quais campos retornar:

# 🎯 Cenário 1: Apenas campos da decisão
apenas_decisao = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "credito-pj", clientes_df,
    auto_schema=True,
    original_fields=False,    # ✖️ Não incluir campos originais
    decision_fields=True      # ✅ Incluir todos campos da decisão
)
# Resultado: decision_score + decision_approved + decision_limit (3 colunas)

# 🔥 Cenário 2: Join otimizado (chaves + decisão)
join_otimizado = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "credito-pj", clientes_df,
    auto_schema=True,
    original_fields=['customer_id', 'account_id'],  # 🔑 Chaves para join
    decision_fields=True                           # ✅ Toda a decisão
)
# Resultado: customer_id + account_id + decision_score + decision_approved + decision_limit (5 colunas)

# 🎨 Cenário 3: Seleção específica de ambos
analise_customizada = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "credito-pj", clientes_df,
    auto_schema=True,
    original_fields=['customer_id', 'renda', 'idade'],       # 📈 Campos de análise
    decision_fields=['decision_score', 'decision_approved']   # 🎯 Campos específicos
)
# Resultado: customer_id + renda + idade + decision_score + decision_approved (5 colunas)

Sinergia Perfeita: Auto-Schema + Only New Fields

A combinação é extremamente poderosa:

# 🚀 Máxima produtividade: Auto-detecção + Resultados limpos
resultados = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "minha-regra",
    dados_df,
    auto_schema=True,              # ✨ Detecta campos automaticamente
    schema_strategy="aggressive",   # 💪 Máxima detecção
    return_only_new_fields=True,   # 🎯 Apenas campos da decisão
    show_schema_info=True          # 🔍 Debug: mostra o que foi detectado
)

# ✅ Resultado: DataFrame limpo com campos perfeitamente tipados!

🚀 Casos de Uso Reais

1. 📊 Tabelas de Resultados Puros

# Salva apenas resultados das decisões (sem poluição)
apenas_decisao.write.mode("overwrite").saveAsTable("decisoes_credito_resultados")

# Perfeito para: Data Lakes, reports, dashboards

2. 🔗 Joins Controlados e Otimizados

# Join super limpo com chaves específicas
clientes_finais = (
    clientes_df
    .join(join_otimizado, on=['customer_id', 'account_id'])  # 🔥 Join direto!
)

# Vantagem: Join rápido, sem colunas duplicadas

3. 🤖 Machine Learning Features

# Dataset limpo para ML - apenas features relevantes
ml_features = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "risk-features", df,
    original_fields=['customer_id'],  # ID para tracking
    decision_fields=['risk_score', 'fraud_probability', 'credit_limit']
)

# Uso em pipelines ML
ml_pipeline.fit(ml_features.select('risk_score', 'fraud_probability'))

4. 🌐 APIs de Alta Performance

# Payload ultra-otimizado para APIs
api_response = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "api-response", df,
    original_fields=['transaction_id'],  # ID para client
    decision_fields=['status', 'approved_amount', 'next_steps']
).toPandas().to_dict("records")

# Resultado: JSON limpo, 90% menor que full DataFrame

5. 📈 Análises Específicas

# Dataset para análise de risco focada
risk_analysis = DatabricksHelper.quick_decision_apply(
    "catalog", "schema", "decisoes", "risk-analysis", df,
    original_fields=['customer_id', 'segment', 'region'],    # Dimensões
    decision_fields=['risk_score', 'probability_default']    # Métricas
)

# Perfeito para: BI tools, relatórios executivos, análises ad-hoc

📋 Matriz de Configurações

Configuração Colunas Resultado Uso Ideal
original_fields=True, decision_fields=True Originais (50) + Decisão (5) = 55 Análise exploratória
original_fields=False, decision_fields=True Apenas Decisão (5) = 5 Tabelas de resultado
original_fields=['id'], decision_fields=True Chave (1) + Decisão (5) = 6 Join otimizado 🔥
original_fields=['id', 'name'], decision_fields=['score'] Seleção (2) + Específico (1) = 3 Análise focada
original_fields=True, decision_fields=False Apenas Originais (50) = 50 Debug/validação

Benefícios

Aspecto Sem Filtro Com return_only_new_fields Melhoria
Volume de dados 100% ~10% 90% redução
Clareza Confuso Cristalino 100% melhor
Performance Padrão Otimizada Significativa
Join posterior Difícil Simples 10x mais fácil
API payload Pesado Limpo 90% menor

⚙️ Configuração Avançada

Adapters Disponíveis

from wise_decision_engine import WiseDecisionEngine, DatabricksAdapter, FileAdapter

# Para tabelas Databricks
adapter = DatabricksAdapter(
    catalog="meu_catalog",
    schema="regras", 
    table="decisoes"
)

# Para arquivos JSON locais
adapter = FileAdapter(file_path="/path/to/rules.json")

# Engine configurável
engine = WiseDecisionEngine(adapter=adapter)

💡 Exemplo Completo

Notebook Databricks

# 1. Instalar
%pip install wise-decision-engine

# 2. Aplicar decisão
from wise_decision_engine import DatabricksHelper

resultado = DatabricksHelper.quick_decision_apply(
    catalog="regras_catalog",
    schema="public", 
    table="decisoes",
    decision_name="minha-regra",
    input_df=meus_dados_df
)

# 3. Visualizar resultado
resultado.display()

🤝 Contribuição e Suporte

Repositório

Como Contribuir

  1. Fork o repositório
  2. Crie sua feature branch
  3. Commit suas mudanças
  4. Abra um Pull Request

📄 Licença

MIT License - veja LICENSE para detalhes.

Construído pela Five Acts 🎆

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wise_decision_engine-0.6.0.tar.gz (33.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wise_decision_engine-0.6.0-py3-none-any.whl (31.1 kB view details)

Uploaded Python 3

File details

Details for the file wise_decision_engine-0.6.0.tar.gz.

File metadata

  • Download URL: wise_decision_engine-0.6.0.tar.gz
  • Upload date:
  • Size: 33.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for wise_decision_engine-0.6.0.tar.gz
Algorithm Hash digest
SHA256 2ce336484b60345df82c9a2057a5d3fd712cc704ec06462fce0e50cae781474b
MD5 05f6200dce6b2e00865770dec7cc540e
BLAKE2b-256 534109cdef062a88dee6c03a299467d51b5eb374adaa15e222b91c808698e339

See more details on using hashes here.

File details

Details for the file wise_decision_engine-0.6.0-py3-none-any.whl.

File metadata

File hashes

Hashes for wise_decision_engine-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a2343fc8b7bdab5d685540315839b8e9ec8a02e12bc213f08c663919c292eabe
MD5 242abb0b58b9ff4f6ab0b37e80aee36a
BLAKE2b-256 1875fc436f07b04ca7b8aa32ff5ede540766511180b17d12753df1665a51f028

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page