Biblioteca de normalizacao de enderecos brasileiros
Project description
br-address-normalize
Biblioteca Python para normalização de endereços brasileiros com foco em match rate para cruzamento de bases de dados.
Desenvolvido com base em análise profunda de 76.9M registros de 6 estados (AM, AP, GO, MG, MT, RS), identificando e resolvendo anomalias críticas que impedem matching.
Índice
- Visão Geral
- Arquitetura
- Setup Local com Docker + LocalStack
- Deploy em AWS
- Uso da Biblioteca
- Pipeline de Normalização
- Resultados
- Roadmap
- Referências de Análise
Visão Geral
O objetivo é padronizar endereços de entrada para maximizar a taxa de correspondência com bases de referência. O pipeline é determinístico, idempotente e sem dependências externas no core.
Problemas Identificados
Análise de 76.9M registros revelou anomalias críticas que impedem matching:
| Anomalia | Volume | Impacto |
|---|---|---|
| Encoding corrompido | 2.8M | Impossível fazer match com bases limpas |
| Campos trocados | 558K | Dados no campo errado |
| Abreviações não expandidas | 51.6M | RUA BRASIL vs R BRASIL não fazem match |
| Estrutura desorganizada | 182K | Complemento embutido no número |
| Placeholders genéricos | 3.3M | Valores como NI, ND, NINF |
| CEP divergente da UF | 68K | Prefixo não corresponde à UF declarada |
Princípios de Design
- Conservadorismo — Só transforma o que tem certeza. Ambiguidade → flag, não transformação.
- Idempotência —
normalize(normalize(x)) == normalize(x) - Sem dependências externas no core — Consultas a Correios/IBGE são escopo de v3.0.
- Rastreabilidade — Toda transformação registrada em
metadata.transformacoes.
Arquitetura
O serviço roda como um worker containerizado que consome mensagens de uma fila SQS. Localmente, o SQS é simulado com LocalStack. Em produção, aponta para o SQS real da AWS sem mudança de código.
Por que Worker e não Lambda
O tempo de processamento de grandes volumes (ex: 1M+ registros) é incompatível com o timeout máximo de 15 minutos do Lambda. O worker roda indefinidamente, processa em batch e escala horizontalmente via réplicas de container.
Fluxo de Dados
┌─────────────────────────────────────────────────────────────────┐
│ Produtor (qualquer sistema) │
│ Envia mensagens JSON para a fila SQS │
└──────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SQS (LocalStack local / AWS em produção) │
│ Fila entrada: address-normalization-queue │
│ Fila saída: address-normalization-results │
│ DLQ: address-normalization-dlq │
└──────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Worker Container (normalizer-worker) │
│ ├── queue/base.py — interface QueueBackend (agnóstica de broker)│
│ ├── queue/sqs.py — implementação SQS/LocalStack │
│ ├── consumer.py — polling via QueueBackend (batch até 10) │
│ ├── processor.py — asyncio.gather para paralelismo │
│ └── publisher.py — publicação e confirmação via QueueBackend │
└─────────────────────────────────────────────────────────────────┘
Camadas da Aplicação
┌─────────────────────────────────────┐
│ Worker (consumer/processor/pub) │ ← Transporte via QueueBackend
├─────────────────────────────────────┤
│ QueueBackend (queue/base.py) │ ← Interface agnóstica de broker
├─────────────────────────────────────┤
│ Fachada (NormalizerFacade) │ ← Ponto de entrada público
├─────────────────────────────────────┤
│ Pipeline (orquestração) │ ← Executa steps sequencialmente
├─────────────────────────────────────┤
│ Steps (adaptadores) │ ← Adaptam módulos para o pipeline
├─────────────────────────────────────┤
│ Módulos (lógica de negócio) │ ← Lógica pura, testável isoladamente
├─────────────────────────────────────┤
│ Data (JSONs de configuração) │ ← Dicionários, regras, whitelist
└─────────────────────────────────────┘
Estrutura de Diretórios
br-address-normalize/
├── src/address_normalizer/
│ ├── core/ # Interface PipelineStep (ABC)
│ ├── data/ # JSONs de configuração e dicionários
│ ├── facade.py # NormalizerFacade — ponto de entrada público
│ ├── modules/
│ │ ├── abbreviation/ # Providers e camadas de abreviação (L1+L2)
│ │ ├── field_swap/ # Detecção e correção de campos trocados
│ │ └── *.py # Módulos por campo (bairro, cep, numero...)
│ ├── pipeline/
│ │ ├── orchestrator.py # Executa steps sequencialmente
│ │ └── steps/ # 13 steps do pipeline
│ ├── worker/
│ │ ├── queue/
│ │ │ ├── base.py # QueueBackend — interface ABC (receber/publicar/confirmar)
│ │ │ └── sqs.py # SQSBackend — implementação boto3/LocalStack
│ │ ├── consumer.py # Loop de polling (usa QueueBackend)
│ │ ├── processor.py # Normalização de registros (sem lógica de fila)
│ │ ├── publisher.py # Publicação e confirmação (usa QueueBackend)
│ │ ├── schemas.py # Contratos de mensagem (Pydantic)
│ │ └── security.py # Validação e sanitização de payloads
│ └── schemas/ # NormalizationResult, EnderecoInput
├── docker/
│ ├── Dockerfile # Imagem do worker (python:3.11-slim)
│ └── localstack/
│ └── init-aws.sh # Cria filas SQS no LocalStack na inicialização
├── docker-compose.yml # Orquestração local (worker + localstack)
├── .env.example # Variáveis de ambiente documentadas
├── pyproject.toml
└── README.md
Setup Local com Docker + LocalStack
Pré-requisitos
- Docker Desktop (ou Docker Engine + Compose plugin)
awscli-localpara interagir com o LocalStack via CLI
pip install awscli-local
1. Subir o ambiente local
docker compose up --build
Isso sobe dois serviços:
localstack— SQS simulado na porta4566normalizer-worker— Worker que consome a fila
O LocalStack cria automaticamente as filas na inicialização via docker/localstack/init-aws.sh.
2. Enviar mensagens para a fila
Endereço único (formato raw):
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}'
Batch com metadados (formato estruturado):
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"meta": {
"message_id": "msg-001",
"idempotency_key": "BATCH-2024-001",
"sent_at": "2024-01-15T10:30:00Z",
"priority": "normal"
},
"origem": {
"orgao": "DATAPREV",
"sistema": "BENEFICIOS-RURAL"
},
"modo": "batch",
"registros": [
{
"ref": "NIS-12345678901",
"endereco": {
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}
}
]
}'
3. Verificar resultados
awslocal sqs receive-message \
--queue-url http://localhost:4566/000000000000/address-normalization-results \
--max-number-of-messages 10
4. Monitorar o worker
docker compose logs -f normalizer-worker
Variáveis de Ambiente
| Variável | Default | Descrição |
|---|---|---|
SQS_ENDPOINT_URL |
http://localstack:4566 |
Endpoint SQS (vazio = AWS real) |
SQS_INPUT_QUEUE_URL |
— | URL da fila de entrada (obrigatório) |
SQS_OUTPUT_QUEUE_URL |
— | URL da fila de saída (obrigatório) |
AWS_REGION |
us-east-1 |
Região AWS |
AWS_ACCESS_KEY_ID |
test |
Credencial (LocalStack aceita qualquer valor) |
AWS_SECRET_ACCESS_KEY |
test |
Credencial (LocalStack aceita qualquer valor) |
WORKER_BATCH_SIZE |
10 |
Mensagens por poll (máx 10) |
WORKER_POLL_INTERVAL |
1 |
Segundos entre polls quando fila vazia |
WORKER_CONCURRENCY |
4 |
Corrotinas paralelas por batch |
ENABLE_V2 |
true |
Ativa steps v2.0 (campos trocados, CEP×UF) |
LOG_LEVEL |
INFO |
Nível de log |
Deploy em AWS
Pré-requisitos
- Conta AWS com permissões para SQS, ECS, ECR, CloudWatch
- AWS CLI configurado com credenciais
- Docker instalado localmente
1. Criar filas SQS em produção
# Criar DLQ
aws sqs create-queue \
--queue-name address-normalization-dlq \
--region us-east-1 \
--attributes MessageRetentionPeriod=86400
# Obter ARN da DLQ
DLQ_ARN=$(aws sqs get-queue-attributes \
--queue-url https://sqs.us-east-1.amazonaws.com/123456789/address-normalization-dlq \
--attribute-names QueueArn \
--query Attributes.QueueArn --output text \
--region us-east-1)
# Criar fila de entrada com redrive policy
aws sqs create-queue \
--queue-name address-normalization-queue \
--region us-east-1 \
--attributes \
VisibilityTimeout=60,\
MessageRetentionPeriod=86400,\
RedrivePolicy="{\"deadLetterTargetArn\":\"${DLQ_ARN}\",\"maxReceiveCount\":\"3\"}"
# Criar fila de saída
aws sqs create-queue \
--queue-name address-normalization-results \
--region us-east-1 \
--attributes MessageRetentionPeriod=86400
2. Criar repositório ECR
aws ecr create-repository \
--repository-name address-normalizer \
--region us-east-1
3. Build e push da imagem Docker
# Fazer login no ECR
aws ecr get-login-password --region us-east-1 | \
docker login --username AWS --password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com
# Build da imagem
docker build -f docker/Dockerfile -t address-normalizer:latest .
# Tag para ECR
docker tag address-normalizer:latest \
123456789.dkr.ecr.us-east-1.amazonaws.com/address-normalizer:latest
# Push
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/address-normalizer:latest
4. Criar cluster ECS
# Criar cluster
aws ecs create-cluster \
--cluster-name address-normalizer-cluster \
--region us-east-1
# Criar log group
aws logs create-log-group \
--log-group-name /ecs/address-normalizer \
--region us-east-1
5. Criar IAM roles
Execution Role (permissões para ECS):
cat > trust-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ecs-tasks.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
aws iam create-role \
--role-name ecsTaskExecutionRole \
--assume-role-policy-document file://trust-policy.json
aws iam attach-role-policy \
--role-name ecsTaskExecutionRole \
--policy-arn arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy
# Adicionar permissão para CloudWatch Logs
aws iam put-role-policy \
--role-name ecsTaskExecutionRole \
--policy-name CloudWatchLogs \
--policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:us-east-1:123456789:log-group:/ecs/address-normalizer:*"
}
]
}'
Task Role (permissões para a aplicação):
aws iam create-role \
--role-name ecsTaskRole \
--assume-role-policy-document file://trust-policy.json
# Adicionar permissão para SQS
aws iam put-role-policy \
--role-name ecsTaskRole \
--policy-name SQSAccess \
--policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:SendMessage",
"sqs:GetQueueAttributes"
],
"Resource": [
"arn:aws:sqs:us-east-1:123456789:address-normalization-queue",
"arn:aws:sqs:us-east-1:123456789:address-normalization-results",
"arn:aws:sqs:us-east-1:123456789:address-normalization-dlq"
]
}
]
}'
6. Registrar task definition
cat > task-definition.json << 'EOF'
{
"family": "address-normalizer-worker",
"networkMode": "awsvpc",
"requiresCompatibilities": ["FARGATE"],
"cpu": "256",
"memory": "512",
"containerDefinitions": [
{
"name": "normalizer-worker",
"image": "123456789.dkr.ecr.us-east-1.amazonaws.com/address-normalizer:latest",
"essential": true,
"environment": [
{
"name": "SQS_INPUT_QUEUE_URL",
"value": "https://sqs.us-east-1.amazonaws.com/123456789/address-normalization-queue"
},
{
"name": "SQS_OUTPUT_QUEUE_URL",
"value": "https://sqs.us-east-1.amazonaws.com/123456789/address-normalization-results"
},
{
"name": "AWS_REGION",
"value": "us-east-1"
},
{
"name": "WORKER_BATCH_SIZE",
"value": "10"
},
{
"name": "WORKER_CONCURRENCY",
"value": "4"
},
{
"name": "ENABLE_V2",
"value": "true"
},
{
"name": "LOG_LEVEL",
"value": "INFO"
}
],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "/ecs/address-normalizer",
"awslogs-region": "us-east-1",
"awslogs-stream-prefix": "ecs"
}
}
}
],
"executionRoleArn": "arn:aws:iam::123456789:role/ecsTaskExecutionRole",
"taskRoleArn": "arn:aws:iam::123456789:role/ecsTaskRole"
}
EOF
aws ecs register-task-definition \
--cli-input-json file://task-definition.json \
--region us-east-1
7. Criar VPC e subnets (se necessário)
# Criar VPC
VPC_ID=$(aws ec2 create-vpc --cidr-block 10.0.0.0/16 --region us-east-1 --query Vpc.VpcId --output text)
# Criar subnet
SUBNET_ID=$(aws ec2 create-subnet \
--vpc-id $VPC_ID \
--cidr-block 10.0.1.0/24 \
--region us-east-1 \
--query Subnet.SubnetId --output text)
# Criar security group
SG_ID=$(aws ec2 create-security-group \
--group-name address-normalizer-sg \
--description "Security group for address normalizer" \
--vpc-id $VPC_ID \
--region us-east-1 \
--query GroupId --output text)
8. Criar serviço ECS
aws ecs create-service \
--cluster address-normalizer-cluster \
--service-name address-normalizer-service \
--task-definition address-normalizer-worker:1 \
--desired-count 2 \
--launch-type FARGATE \
--network-configuration "awsvpcConfiguration={subnets=[subnet-xxxxx],securityGroups=[sg-xxxxx],assignPublicIp=ENABLED}" \
--region us-east-1
9. Configurar auto-scaling
# Registrar target para auto-scaling
aws application-autoscaling register-scalable-target \
--service-namespace ecs \
--resource-id service/address-normalizer-cluster/address-normalizer-service \
--scalable-dimension ecs:service:DesiredCount \
--min-capacity 1 \
--max-capacity 10 \
--region us-east-1
# Criar política de scaling baseada em fila SQS
aws application-autoscaling put-scaling-policy \
--policy-name address-normalizer-scaling \
--service-namespace ecs \
--resource-id service/address-normalizer-cluster/address-normalizer-service \
--scalable-dimension ecs:service:DesiredCount \
--policy-type TargetTrackingScaling \
--target-tracking-scaling-policy-configuration '{
"TargetValue": 100.0,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "SQSQueueAverageAgeOfMessage"
},
"ScaleOutCooldown": 60,
"ScaleInCooldown": 300
}' \
--region us-east-1
10. Monitorar em produção
# Ver logs
aws logs tail /ecs/address-normalizer --follow --region us-east-1
# Ver status do serviço
aws ecs describe-services \
--cluster address-normalizer-cluster \
--services address-normalizer-service \
--region us-east-1
# Ver tarefas em execução
aws ecs list-tasks \
--cluster address-normalizer-cluster \
--region us-east-1
Troubleshooting em Produção
Tarefas não iniciando:
aws ecs describe-task-definition \
--task-definition address-normalizer-worker:1 \
--region us-east-1
Verificar permissões SQS:
aws sqs get-queue-attributes \
--queue-url https://sqs.us-east-1.amazonaws.com/123456789/address-normalization-queue \
--attribute-names All \
--region us-east-1
Ver erros de execução:
aws logs get-log-events \
--log-group-name /ecs/address-normalizer \
--log-stream-name ecs/address-normalizer-worker/xxxxx \
--region us-east-1
Uso da Biblioteca
Instalação
pip install -e .
# ou com uv
uv pip install -e .
Normalização direta (sem SQS)
import asyncio
from address_normalizer.facade import NormalizerFacade
from address_normalizer.modules.abbreviation.local_provider import LocalAbbreviationProvider
async def main():
normalizer = NormalizerFacade(
abbreviation_provider=LocalAbbreviationProvider(),
enable_v2=True,
)
endereco = {
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567",
}
result = await normalizer.normalize(endereco)
print(result.normalizado["logradouro"]) # "RUA DOUTOR JOAO SILVA"
print(result.normalizado["numero"]) # "123"
print(result.normalizado["complemento"]) # "APARTAMENTO 45"
print(result.normalizado["bairro"]) # "JARDIM AMERICA"
print(result.metadata.flags) # ["ABREV_EXPANDIDA", ...]
print(result.metadata.confianca) # {"logradouro": 1.0, ...}
asyncio.run(main())
Batch
results = await normalizer.normalize_batch([endereco1, endereco2, endereco3])
Contrato de Mensagens SQS
O worker aceita dois formatos de mensagem: raw (endereço direto, sem envelope) e estruturado (com metadados de rastreabilidade para uso em produção).
Detecção automática de formato
O worker detecta o formato pela presença ou ausência do campo meta:
- Sem
meta→ modo raw, processa o objeto diretamente como endereço - Com
meta→ modo estruturado, extrairegistros[]do envelope
Em ambos os casos o pipeline completo é executado, incluindo a etapa de recategorização (FieldSwapStep) que detecta e corrige campos trocados antes de normalizar.
Formato Raw — endereço único sem envelope
Útil para integrações simples ou testes. O worker recebe o endereço semi-estruturado diretamente.
Entrada:
{
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}
O pipeline executa normalmente, incluindo recategorização. Por exemplo, se logradouro contiver apenas dígitos, o FieldSwapStep detecta e move para numero antes de prosseguir.
Saída:
{
"ref": null,
"status": "ok",
"endereco": {
"original": {
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
},
"normalizado": {
"logradouro": "RUA DOUTOR JOAO SILVA",
"numero": "123",
"complemento": "APARTAMENTO 45",
"bairro": "JARDIM AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}
},
"qualidade": {
"score_geral": 0.99,
"grupo_poluicao": "G1",
"confianca": {
"logradouro": 1.0,
"numero": 1.0,
"complemento": 1.0,
"bairro": 1.0,
"municipio": 1.0,
"uf": 1.0,
"cep": 0.9
},
"flags": ["ABREV_EXPANDIDA"]
},
"transformacoes": [
{
"step": "abbreviation",
"campo": "logradouro",
"tipo": "abreviacao",
"de": "R",
"para": "RUA",
"regra": "TIPO_LOGRADOURO"
},
{
"step": "numero",
"campo": "numero",
"tipo": "extracao_complemento",
"de": "123 APTO 45",
"para": "123",
"regra": "COMPLEMENTO_EMBUTIDO"
}
],
"error": null
}
Formato Estruturado — batch com metadados
Para rastreabilidade completa em produção: idempotência por idempotency_key, identificação de origem, prioridade e opções de pipeline por mensagem.
Entrada:
{
"meta": {
"message_id": "uuid-v4",
"idempotency_key": "DATAPREV-BATCH-2024-001-chunk-003",
"sent_at": "2024-01-15T10:30:00Z",
"priority": "normal"
},
"origem": {
"orgao": "DATAPREV",
"sistema": "BENEFICIOS-RURAL",
"responsavel": "equipe-dados@dataprev.gov.br",
"ambiente": "producao"
},
"modo": "batch",
"registros": [
{
"ref": "NIS-12345678901",
"endereco": {
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}
},
{
"ref": "NIS-98765432100",
"endereco": {
"logradouro": "AV BRASIL",
"numero": "SN",
"complemento": "",
"bairro": "CENTRO",
"municipio": "MANAUS",
"uf": "AM",
"cep": "69000000"
}
}
],
"pipeline": {
"preset": "full",
"options": {
"enable_v2": true,
"confidence_threshold": 0.7
}
}
}
Campos do envelope de entrada:
| Campo | Tipo | Obrigatório | Descrição |
|---|---|---|---|
meta.message_id |
string (uuid) | sim | ID único da mensagem |
meta.idempotency_key |
string | sim | Chave de deduplicação — mesma chave retorna o mesmo resultado sem reprocessar |
meta.sent_at |
ISO 8601 | sim | Timestamp de envio |
meta.priority |
normal | high |
não | Prioridade de processamento |
origem.orgao |
string | sim | Órgão remetente |
origem.sistema |
string | sim | Sistema de origem |
origem.responsavel |
string | não | Email ou identificador do responsável |
origem.ambiente |
string | não | producao, homologacao, desenvolvimento |
modo |
batch | single |
sim | Modo de processamento |
registros[].ref |
string | sim | Identificador do registro no sistema de origem |
registros[].endereco |
object | sim | Endereço semi-estruturado (7 campos abaixo) |
pipeline.preset |
full | v1 | v2 |
não | Preset do pipeline (default: full) |
pipeline.options.enable_v2 |
bool | não | Ativa steps v2.0 — campos trocados, CEP×UF (default: true) |
pipeline.options.confidence_threshold |
float 0-1 | não | Score mínimo para status ok (default: 0.7) |
Campos do endereço (registros[].endereco):
| Campo | Tipo | Descrição |
|---|---|---|
logradouro |
string | Rua, avenida, travessa, etc. |
numero |
string | Número do imóvel (pode conter complemento embutido) |
complemento |
string | Apartamento, sala, bloco, etc. |
bairro |
string | Bairro |
municipio |
string | Município |
uf |
string | Sigla do estado (2 letras) |
cep |
string | CEP (8 dígitos, com ou sem hífen) |
Saída:
{
"meta": {
"message_id": "uuid-v4-novo",
"idempotency_key": "DATAPREV-BATCH-2024-001-chunk-003",
"sent_at": "2024-01-15T10:30:00Z",
"processed_at": "2024-01-15T10:30:00.412Z",
"processing_ms": 312,
"worker_id": "normalizer-worker-2",
"pipeline_version": "2.0.0"
},
"origem": {
"orgao": "DATAPREV",
"sistema": "BENEFICIOS-RURAL"
},
"summary": {
"total": 2,
"ok": 1,
"erro": 0,
"alertas": 1,
"status": "partial"
},
"registros": [
{
"ref": "NIS-12345678901",
"status": "ok",
"endereco": {
"original": {
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
},
"normalizado": {
"logradouro": "RUA DOUTOR JOAO SILVA",
"numero": "123",
"complemento": "APARTAMENTO 45",
"bairro": "JARDIM AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}
},
"qualidade": {
"score_geral": 0.99,
"grupo_poluicao": "G1",
"confianca": {
"logradouro": 1.0,
"numero": 1.0,
"complemento": 1.0,
"bairro": 1.0,
"municipio": 1.0,
"uf": 1.0,
"cep": 0.9
},
"flags": ["ABREV_EXPANDIDA"]
},
"transformacoes": [
{
"step": "abbreviation",
"campo": "logradouro",
"tipo": "abreviacao",
"de": "R",
"para": "RUA",
"regra": "TIPO_LOGRADOURO"
},
{
"step": "abbreviation",
"campo": "logradouro",
"tipo": "abreviacao",
"de": "DR",
"para": "DOUTOR",
"regra": "TITULO_PROPRIO"
},
{
"step": "numero",
"campo": "numero",
"tipo": "extracao_complemento",
"de": "123 APTO 45",
"para": "123",
"regra": "COMPLEMENTO_EMBUTIDO"
}
],
"error": null
},
{
"ref": "NIS-98765432100",
"status": "alerta",
"endereco": {
"original": {
"logradouro": "AV BRASIL",
"numero": "SN",
"complemento": "",
"bairro": "CENTRO",
"municipio": "MANAUS",
"uf": "AM",
"cep": "69000000"
},
"normalizado": {
"logradouro": "AVENIDA BRASIL",
"numero": "SN",
"complemento": "",
"bairro": "CENTRO",
"municipio": "MANAUS",
"uf": "AM",
"cep": "69000000"
}
},
"qualidade": {
"score_geral": 0.71,
"grupo_poluicao": "G2",
"confianca": {
"logradouro": 1.0,
"numero": 0.5,
"complemento": 1.0,
"bairro": 0.8,
"municipio": 1.0,
"uf": 1.0,
"cep": 0.3
},
"flags": ["ABREV_EXPANDIDA", "VARIANTE_SN", "CEP_GENERICO"]
},
"transformacoes": [
{
"step": "abbreviation",
"campo": "logradouro",
"tipo": "abreviacao",
"de": "AV",
"para": "AVENIDA",
"regra": "TIPO_LOGRADOURO"
},
{
"step": "numero",
"campo": "numero",
"tipo": "variante_sn",
"de": "SN",
"para": "SN",
"regra": "VARIANTE_SN_CANONICA"
}
],
"error": null
}
]
}
Campos da saída:
| Campo | Descrição |
|---|---|
meta.processed_at |
Timestamp de conclusão do processamento |
meta.processing_ms |
Tempo total em milissegundos |
meta.worker_id |
Instância do worker que processou |
meta.pipeline_version |
Versão do pipeline utilizada |
summary.status |
ok — todos ok | partial — algum com alerta/erro | error — todos falharam |
registros[].status |
ok | alerta | erro |
registros[].qualidade.score_geral |
Média ponderada das confianças por campo (0-1) |
registros[].qualidade.grupo_poluicao |
G1 limpo | G2 médio | G3 poluído |
registros[].qualidade.flags |
Flags de rastreabilidade geradas pelo pipeline |
registros[].transformacoes |
Lista detalhada de cada transformação aplicada |
registros[].error |
Mensagem de erro se status = erro, caso contrário null |
Regras de status por registro:
| Status | Condição |
|---|---|
ok |
score_geral ≥ confidence_threshold e sem flags de alerta |
alerta |
score_geral ≥ confidence_threshold mas com CEP_GENERICO, VARIANTE_SN ou CAMPO_TROCADO_AMBIGUO |
erro |
Exceção no processamento ou score_geral < confidence_threshold |
Pipeline de Normalização
O pipeline executa 13 steps sequenciais. A ordem é crítica — não altere sem validação.
Entrada (dict raw)
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ v1.0 — Limpeza Mecânica │
│ [1] UppercaseStep → [2] RemoveAccentsStep → [3] EncodingStep │
├─────────────────────────────────────────────────────────────────┤
│ v1.1 — Expansão de Abreviações │
│ [4] AbbreviationStep → [5] PreprocessingStep │
├─────────────────────────────────────────────────────────────────┤
│ v1.2 — Normalização Estrutural │
│ [6] NumeroStep → [7] ComplementoStep → [8] BairroStep │
│ [9] MunicipioStep → [10] CepStep │
├─────────────────────────────────────────────────────────────────┤
│ v2.0 — Resolução Contextual (enable_v2=True) │
│ [11] FieldSwapStep → [12] RegionalAbbreviationStep │
│ [13] CepUfValidationStep │
└─────────────────────────────────────────────────────────────────┘
│
▼
NormalizationResult
Dependências de Ordem
- Uppercase deve vir primeiro (para identificar tokens)
- RemoveAccents antes de Encoding (encoding opera sobre texto já em ASCII)
- Abbreviation antes de Preprocessing (expandir antes de deduplicar)
- Steps v1.2 após Preprocessing (dados já limpos)
- Steps v2.0 por último (contexto completo disponível)
Flags de Rastreabilidade
| Flag | Versão | Significado |
|---|---|---|
ENCODING_CORRIGIDO |
v1.0 | Encoding corrompido corrigido |
PLACEHOLDER_REMOVIDO |
v1.0 | Placeholder removido |
ABREV_EXPANDIDA |
v1.1 | Abreviação expandida |
DUPLICADO_REMOVIDO |
v1.1 | Tipo de via duplicado removido |
VARIANTE_SN |
v1.2 | Variante de S/N normalizada |
CEP_GENERICO |
v1.2 | CEP genérico (00000000, etc.) |
CAMPO_TROCADO_DETECTADO |
v2.0 | Campo trocado detectado |
CAMPO_TROCADO_CORRIGIDO |
v2.0 | Campo trocado corrigido |
CAMPO_TROCADO_AMBIGUO |
v2.0 | Detectado mas não corrigido (para v2.1) |
ABREV_REGIONAL |
v2.0 | Abreviação expandida por regra regional |
CEP_DIVERGENTE_UF |
v2.0 | CEP diverge da UF declarada |
Desenvolvimento Local
Setup sem Docker
# Clone
git clone <repo-url>
cd br-address-normalize
# Ambiente virtual
python -m venv .venv
source .venv/bin/activate # Linux/macOS
.venv\Scripts\activate # Windows
# Instalar
pip install -e ".[dev]"
Testes Automatizados
# Todos os testes
pytest tests/ -v
# Com cobertura
pytest tests/ --cov=src/address_normalizer --cov-report=html
# Teste específico
pytest tests/test_v2_0.py -v
Testes Manuais com Docker + LocalStack
Este guia permite testar o pipeline de normalização localmente com Docker, simulando o ambiente de produção com SQS.
Pré-requisitos
# Instalar awscli-local para interagir com LocalStack
pip install awscli-local
# Verificar Docker
docker --version
docker compose --version
1. Iniciar o Ambiente Local
# Subir LocalStack + Worker
# Docker moderno (v20.10+):
docker compose up --build
# Docker antigo / WSL2 sem plugin:
docker-compose up --build
# Em outro terminal, verificar se os serviços estão rodando
docker compose ps
# ou
docker-compose ps
Você verá:
localstack— SQS simulado na porta 4566normalizer-worker— Worker consumindo a fila
2. Enviar Testes Manuais
Teste 1: Endereço Simples (Formato Raw)
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}'
Teste 2: Endereço com Anomalias (Encoding + Abreviações)
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "AV BRASIL",
"numero": "SN",
"complemento": "",
"bairro": "CENTRO",
"municipio": "MANAUS",
"uf": "AM",
"cep": "69000000"
}'
Teste 3: Endereço com Complemento Embutido
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "RUA BRASIL",
"numero": "456 SALA 10",
"complemento": "",
"bairro": "CENTRO",
"municipio": "RIO DE JANEIRO",
"uf": "RJ",
"cep": "20000000"
}'
Teste 4: Batch com Metadados (Formato Estruturado)
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"meta": {
"message_id": "msg-001",
"idempotency_key": "BATCH-2024-001",
"sent_at": "2024-01-15T10:30:00Z",
"priority": "normal"
},
"origem": {
"orgao": "DATAPREV",
"sistema": "BENEFICIOS-RURAL"
},
"modo": "batch",
"registros": [
{
"ref": "NIS-12345678901",
"endereco": {
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}
},
{
"ref": "NIS-98765432100",
"endereco": {
"logradouro": "AV BRASIL",
"numero": "SN",
"complemento": "",
"bairro": "CENTRO",
"municipio": "MANAUS",
"uf": "AM",
"cep": "69000000"
}
}
]
}'
Teste 5: Campos Trocados (v2.0)
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "123",
"numero": "RUA BRASIL",
"complemento": "",
"bairro": "CENTRO",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}'
Teste 6: CEP Divergente da UF (v2.0)
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "RUA BRASIL",
"numero": "100",
"complemento": "",
"bairro": "CENTRO",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "69000000"
}'
Teste 7: Placeholder Genérico
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "RUA BRASIL",
"numero": "NI",
"complemento": "",
"bairro": "ND",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "00000000"
}'
3. Verificar Resultados
# Receber mensagens da fila de resultados
awslocal sqs receive-message \
--queue-url http://localhost:4566/000000000000/address-normalization-results \
--max-number-of-messages 10
Você verá a resposta com:
endereco.normalizado— Endereço após normalizaçãoqualidade.score_geral— Score de confiança (0-1)qualidade.flags— Flags de transformação aplicadastransformacoes— Detalhes de cada transformação
4. Monitorar o Worker
Em outro terminal:
# Ver logs em tempo real
docker compose logs -f normalizer-worker
# Ver logs de um serviço específico
docker compose logs -f localstack
5. Testar Idempotência
Envie a mesma mensagem duas vezes:
# Primeira vez
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}'
# Aguarde processamento (ver logs)
# Depois envie novamente
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}'
# Receba ambos os resultados
awslocal sqs receive-message \
--queue-url http://localhost:4566/000000000000/address-normalization-results \
--max-number-of-messages 10
Esperado: Ambos os resultados devem ser idênticos (mesmo normalizado, mesmas transformacoes).
6. Testar Batch vs Individual
Envie um batch com 2 registros:
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"meta": {
"message_id": "batch-test",
"idempotency_key": "BATCH-EQUIV-001",
"sent_at": "2024-01-15T10:30:00Z"
},
"modo": "batch",
"registros": [
{
"ref": "REF-001",
"endereco": {
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}
},
{
"ref": "REF-002",
"endereco": {
"logradouro": "AV BRASIL",
"numero": "SN",
"complemento": "",
"bairro": "CENTRO",
"municipio": "MANAUS",
"uf": "AM",
"cep": "69000000"
}
}
]
}'
Depois envie os mesmos endereços individualmente:
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "R DR JOAO SILVA",
"numero": "123 APTO 45",
"complemento": "",
"bairro": "JD AMERICA",
"municipio": "SAO PAULO",
"uf": "SP",
"cep": "01234567"
}'
awslocal sqs send-message \
--queue-url http://localhost:4566/000000000000/address-normalization-queue \
--message-body '{
"logradouro": "AV BRASIL",
"numero": "SN",
"complemento": "",
"bairro": "CENTRO",
"municipio": "MANAUS",
"uf": "AM",
"cep": "69000000"
}'
Esperado: Os resultados normalizados devem ser idênticos entre batch e individual.
7. Limpar Ambiente
# Parar containers
docker compose down
# Remover volumes (limpar dados)
docker compose down -v
Checklist de Testes Manuais
- Teste 1: Endereço simples normaliza corretamente
- Teste 2: Abreviações são expandidas (AV → AVENIDA)
- Teste 3: Complemento embutido é extraído (123 APTO 45 → 123 + APARTAMENTO 45)
- Teste 4: Batch com metadados processa todos os registros
- Teste 5: Campos trocados são detectados e corrigidos
- Teste 6: CEP divergente da UF gera flag de alerta
- Teste 7: Placeholders genéricos são removidos
- Idempotência: Mesma entrada produz mesma saída
- Batch Equivalência: Batch e individual produzem mesmos resultados
- Logs: Worker registra todas as transformações
- Fila de Saída: Resultados aparecem na fila de resultados
- DLQ: Mensagens inválidas vão para Dead Letter Queue
Resultados
Benchmark Consolidado (24.37M registros, 6 UFs)
| Cenário | Taxa | Ganho |
|---|---|---|
| C1 — Raw vs Raw (baseline) | 64.53% | — |
| C4 — Norm vs Norm | 68.11% | +3.58pp |
| Registros resgatados | — | 872.625 |
| Regressões | — | 0 |
Por Grupo de Poluição
| Grupo | Volume | Baseline | Normalizado | Ganho |
|---|---|---|---|---|
| G1 (limpo) | 18.17M | 65.57% | 66.55% | +0.98pp |
| G2 (médio) | 5.15M | 68.93% | 75.86% | +6.94pp |
| G3 (poluído) | 1.04M | 27.68% | 59.95% | +32.27pp |
Roadmap
| Versão | Status | Escopo |
|---|---|---|
| v1.0 | ✅ | Limpeza mecânica: encoding, case, whitespace, placeholders |
| v1.1 | ✅ | Expansão de 115 abreviações sem ambiguidade |
| v1.2 | ✅ | Normalização estrutural: número, complemento, CEP, município, bairro |
| v2.0 | ✅ | Campos trocados, abreviações regionais, validação CEP×UF |
| Containerização | 🔄 | Worker SQS + Docker + LocalStack (em andamento) |
| v2.1 | ⬜ | Fallback IA (Maritaca AI) para ambiguidades não resolvidas |
| v3.0 | ⬜ | Validação canônica (Correios/IBGE) |
| v4.0 | ⬜ | ML supervisionado / NER |
Planejamento da Containerização (em andamento)
O que será implementado para rodar o serviço localmente com Docker e LocalStack, e subir para AWS sem mudança de código.
Arquivos a criar:
docker/
├── Dockerfile # Imagem do worker (python:3.11-slim)
└── localstack/
└── init-aws.sh # Cria filas SQS no LocalStack na inicialização
docker-compose.yml # Orquestra localstack + normalizer-worker
.env.example # Variáveis de ambiente documentadas
src/address_normalizer/
└── worker/
├── __init__.py
├── consumer.py # Loop de polling SQS com boto3
├── processor.py # Processa batch com NormalizerFacade + asyncio.gather
└── publisher.py # Publica resultados na fila de saída + deleta mensagem
Fluxo interno do worker:
consumer.py
└── poll_messages() # boto3 receive_message (batch de até 10)
└── processor.py
└── process_batch() # asyncio.gather para paralelismo
└── NormalizerFacade.normalize()
└── publisher.py
└── publish_result() # send_message para fila de saída
└── delete_message() # confirma processamento
Decisões de design:
- Sem Lambda: worker roda como processo contínuo, sem limite de timeout
- Paralelismo via
asyncio.gatherdentro de cada batch (não threads) - Idempotência garantida pelo pipeline (mensagem reprocessada = mesmo resultado)
- Dead Letter Queue (DLQ) para mensagens que falham após 3 tentativas
- Graceful shutdown: captura SIGTERM e termina o batch atual antes de parar
Referências de Análise
Todas as decisões de implementação são baseadas em análises quantitativas de 76.9M registros:
| Análise | Arquivo | Conteúdo |
|---|---|---|
| A-10 | planning/relatorios_tasks/A10_caracteres_anomalos.json |
Encoding corrompido por campo e UF |
| A-20/A-21 | planning/relatorios_tasks/A20_A21_via_duplicada_pontos.json |
Vias duplicadas, abreviações |
| A-30 | planning/relatorios_tasks/A30_numero_patologias.json |
Complemento embutido, variantes S/N |
| A-50 | planning/relatorios_tasks/A50_bairro_anomalias.json |
Anomalias em bairro |
| A-70 | planning/relatorios_tasks/A70_cep_formatos.json |
CEPs inválidos, genéricos, divergentes |
| A-80 | planning/relatorios_tasks/A80_placeholders_globais.json |
Placeholders por campo |
| A-90 | planning/relatorios_tasks/A90_campos_trocados.json |
Campos trocados |
| A-99 | planning/relatorios_tasks/A99_sintese_decisoes.json |
45 decisões consolidadas com prioridade |
| B-06 | planning/relatorios_tasks/B06_score_qualidade_baseline.json |
Score de qualidade baseline |
Roadmap detalhado: planning/PLANEJAMENTO_VERSOES.md
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file br_address_normalize-0.3.0.dev20260416142724.tar.gz.
File metadata
- Download URL: br_address_normalize-0.3.0.dev20260416142724.tar.gz
- Upload date:
- Size: 107.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
53c95e609b25ddc427c5f25a46eec6c5a88e7c63357e926b66079da4dbca5aab
|
|
| MD5 |
47cf1d89c0e3a1090f4188ab03f9cdef
|
|
| BLAKE2b-256 |
064b21ddc461f4265edd02e048b60b580772ced5931398030a2d946cd18c2857
|
Provenance
The following attestation bundles were made for br_address_normalize-0.3.0.dev20260416142724.tar.gz:
Publisher:
publish.yml on lipiw/br-address-normalize
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
br_address_normalize-0.3.0.dev20260416142724.tar.gz -
Subject digest:
53c95e609b25ddc427c5f25a46eec6c5a88e7c63357e926b66079da4dbca5aab - Sigstore transparency entry: 1317781217
- Sigstore integration time:
-
Permalink:
lipiw/br-address-normalize@c1cfb5a7ed52217ea859f9890896d45473d6d66d -
Branch / Tag:
refs/heads/main - Owner: https://github.com/lipiw
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@c1cfb5a7ed52217ea859f9890896d45473d6d66d -
Trigger Event:
push
-
Statement type:
File details
Details for the file br_address_normalize-0.3.0.dev20260416142724-py3-none-any.whl.
File metadata
- Download URL: br_address_normalize-0.3.0.dev20260416142724-py3-none-any.whl
- Upload date:
- Size: 91.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
23b84cbfe501984235fcf51b6f1fec19cf8ef6732cf2588d04bcf2f56bb4875f
|
|
| MD5 |
375d802b90cd7fbcadda29f4ec76643b
|
|
| BLAKE2b-256 |
969883fe1b03dc16454e934d3c740a15574afc099c1c75cf46c0812734617646
|
Provenance
The following attestation bundles were made for br_address_normalize-0.3.0.dev20260416142724-py3-none-any.whl:
Publisher:
publish.yml on lipiw/br-address-normalize
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
br_address_normalize-0.3.0.dev20260416142724-py3-none-any.whl -
Subject digest:
23b84cbfe501984235fcf51b6f1fec19cf8ef6732cf2588d04bcf2f56bb4875f - Sigstore transparency entry: 1317781448
- Sigstore integration time:
-
Permalink:
lipiw/br-address-normalize@c1cfb5a7ed52217ea859f9890896d45473d6d66d -
Branch / Tag:
refs/heads/main - Owner: https://github.com/lipiw
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@c1cfb5a7ed52217ea859f9890896d45473d6d66d -
Trigger Event:
push
-
Statement type: