Skip to main content

Biblioteca de normalizacao de enderecos brasileiros

Project description

br-address-normalize

Biblioteca Python para normalização de endereços brasileiros com foco em match rate para cruzamento de bases de dados.

Desenvolvido com base em análise profunda de 76.9M registros de 6 estados (AM, AP, GO, MG, MT, RS), identificando e resolvendo anomalias críticas que impedem matching.


Índice


Visão Geral

O objetivo é padronizar endereços de entrada para maximizar a taxa de correspondência com bases de referência. O pipeline é determinístico, idempotente e sem dependências externas no core.

Problemas Identificados

Análise de 76.9M registros revelou anomalias críticas que impedem matching:

Anomalia Volume Impacto
Encoding corrompido 2.8M Impossível fazer match com bases limpas
Campos trocados 558K Dados no campo errado
Abreviações não expandidas 51.6M RUA BRASIL vs R BRASIL não fazem match
Estrutura desorganizada 182K Complemento embutido no número
Placeholders genéricos 3.3M Valores como NI, ND, NINF
CEP divergente da UF 68K Prefixo não corresponde à UF declarada

Princípios de Design

  1. Conservadorismo — Só transforma o que tem certeza. Ambiguidade → flag, não transformação.
  2. Idempotêncianormalize(normalize(x)) == normalize(x)
  3. Sem dependências externas no core — Consultas a Correios/IBGE são escopo de v3.0.
  4. Rastreabilidade — Toda transformação registrada em metadata.transformacoes.

Arquitetura

O serviço roda como um worker containerizado que consome mensagens de uma fila SQS. Localmente, o SQS é simulado com LocalStack. Em produção, aponta para o SQS real da AWS sem mudança de código.

Por que Worker e não Lambda

O tempo de processamento de grandes volumes (ex: 1M+ registros) é incompatível com o timeout máximo de 15 minutos do Lambda. O worker roda indefinidamente, processa em batch e escala horizontalmente via réplicas de container.

Fluxo de Dados

┌─────────────────────────────────────────────────────────────────┐
│  Produtor (qualquer sistema)                                    │
│  Envia mensagens JSON para a fila SQS                           │
└──────────────────────────┬──────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│  SQS (LocalStack local / AWS em produção)                       │
│  Fila entrada:  address-normalization-queue                     │
│  Fila saída:    address-normalization-results                   │
│  DLQ:           address-normalization-dlq                       │
└──────────────────────────┬──────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│  Worker Container (normalizer-worker)                           │
│  ├── queue/base.py — interface QueueBackend (agnóstica de broker)│
│  ├── queue/sqs.py  — implementação SQS/LocalStack               │
│  ├── consumer.py   — polling via QueueBackend (batch até 10)    │
│  ├── processor.py  — asyncio.gather para paralelismo            │
│  └── publisher.py  — publicação e confirmação via QueueBackend  │
└─────────────────────────────────────────────────────────────────┘

Camadas da Aplicação

┌─────────────────────────────────────┐
│  Worker (consumer/processor/pub)    │  ← Transporte via QueueBackend
├─────────────────────────────────────┤
│  QueueBackend (queue/base.py)       │  ← Interface agnóstica de broker
├─────────────────────────────────────┤
│  Fachada (NormalizerFacade)         │  ← Ponto de entrada público
├─────────────────────────────────────┤
│  Pipeline (orquestração)            │  ← Executa steps sequencialmente
├─────────────────────────────────────┤
│  Steps (adaptadores)                │  ← Adaptam módulos para o pipeline
├─────────────────────────────────────┤
│  Módulos (lógica de negócio)        │  ← Lógica pura, testável isoladamente
├─────────────────────────────────────┤
│  Data (JSONs de configuração)       │  ← Dicionários, regras, whitelist
└─────────────────────────────────────┘

Estrutura de Diretórios

br-address-normalize/
├── src/address_normalizer/
│   ├── core/                        # Interface PipelineStep (ABC)
│   ├── data/                        # JSONs de configuração e dicionários
│   ├── facade.py                    # NormalizerFacade — ponto de entrada público
│   ├── modules/
│   │   ├── abbreviation/            # Providers e camadas de abreviação (L1+L2)
│   │   ├── field_swap/              # Detecção e correção de campos trocados
│   │   └── *.py                     # Módulos por campo (bairro, cep, numero...)
│   ├── pipeline/
│   │   ├── orchestrator.py          # Executa steps sequencialmente
│   │   └── steps/                   # 13 steps do pipeline
│   ├── worker/
│   │   ├── queue/
│   │   │   ├── base.py              # QueueBackend — interface ABC (receber/publicar/confirmar)
│   │   │   └── sqs.py               # SQSBackend — implementação boto3/LocalStack
│   │   ├── consumer.py              # Loop de polling (usa QueueBackend)
│   │   ├── processor.py             # Normalização de registros (sem lógica de fila)
│   │   ├── publisher.py             # Publicação e confirmação (usa QueueBackend)
│   │   ├── schemas.py               # Contratos de mensagem (Pydantic)
│   │   └── security.py              # Validação e sanitização de payloads
│   └── schemas/                     # NormalizationResult, EnderecoInput
├── docker/
│   ├── Dockerfile                   # Imagem do worker (python:3.11-slim)
│   └── localstack/
│       └── init-aws.sh              # Cria filas SQS no LocalStack na inicialização
├── docker-compose.yml               # Orquestração local (worker + localstack)
├── .env.example                     # Variáveis de ambiente documentadas
├── pyproject.toml
└── README.md

Setup Local com Docker + LocalStack

Pré-requisitos

  • Docker Desktop (ou Docker Engine + Compose plugin)
  • awscli-local para interagir com o LocalStack via CLI
pip install awscli-local

1. Subir o ambiente local

docker compose up --build

Isso sobe dois serviços:

  • localstack — SQS simulado na porta 4566
  • normalizer-worker — Worker que consome a fila

O LocalStack cria automaticamente as filas na inicialização via docker/localstack/init-aws.sh.

2. Enviar mensagens para a fila

Endereço único (formato raw):

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "R DR JOAO SILVA",
    "numero": "123 APTO 45",
    "complemento": "",
    "bairro": "JD AMERICA",
    "municipio": "SAO PAULO",
    "uf": "SP",
    "cep": "01234567"
  }'

Batch com metadados (formato estruturado):

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "meta": {
      "message_id": "msg-001",
      "idempotency_key": "BATCH-2024-001",
      "sent_at": "2024-01-15T10:30:00Z",
      "priority": "normal"
    },
    "origem": {
      "orgao": "DATAPREV",
      "sistema": "BENEFICIOS-RURAL"
    },
    "modo": "batch",
    "registros": [
      {
        "ref": "NIS-12345678901",
        "endereco": {
          "logradouro": "R DR JOAO SILVA",
          "numero": "123 APTO 45",
          "complemento": "",
          "bairro": "JD AMERICA",
          "municipio": "SAO PAULO",
          "uf": "SP",
          "cep": "01234567"
        }
      }
    ]
  }'

3. Verificar resultados

awslocal sqs receive-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-results \
  --max-number-of-messages 10

4. Monitorar o worker

docker compose logs -f normalizer-worker

Variáveis de Ambiente

Variável Default Descrição
SQS_ENDPOINT_URL http://localstack:4566 Endpoint SQS (vazio = AWS real)
SQS_INPUT_QUEUE_URL URL da fila de entrada (obrigatório)
SQS_OUTPUT_QUEUE_URL URL da fila de saída (obrigatório)
AWS_REGION us-east-1 Região AWS
AWS_ACCESS_KEY_ID test Credencial (LocalStack aceita qualquer valor)
AWS_SECRET_ACCESS_KEY test Credencial (LocalStack aceita qualquer valor)
WORKER_BATCH_SIZE 10 Mensagens por poll (máx 10)
WORKER_POLL_INTERVAL 1 Segundos entre polls quando fila vazia
WORKER_CONCURRENCY 4 Corrotinas paralelas por batch
ENABLE_V2 true Ativa steps v2.0 (campos trocados, CEP×UF)
LOG_LEVEL INFO Nível de log

Deploy em AWS

Pré-requisitos

  • Conta AWS com permissões para SQS, ECS, ECR, CloudWatch
  • AWS CLI configurado com credenciais
  • Docker instalado localmente

1. Criar filas SQS em produção

# Criar DLQ
aws sqs create-queue \
  --queue-name address-normalization-dlq \
  --region us-east-1 \
  --attributes MessageRetentionPeriod=86400

# Obter ARN da DLQ
DLQ_ARN=$(aws sqs get-queue-attributes \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789/address-normalization-dlq \
  --attribute-names QueueArn \
  --query Attributes.QueueArn --output text \
  --region us-east-1)

# Criar fila de entrada com redrive policy
aws sqs create-queue \
  --queue-name address-normalization-queue \
  --region us-east-1 \
  --attributes \
    VisibilityTimeout=60,\
    MessageRetentionPeriod=86400,\
    RedrivePolicy="{\"deadLetterTargetArn\":\"${DLQ_ARN}\",\"maxReceiveCount\":\"3\"}"

# Criar fila de saída
aws sqs create-queue \
  --queue-name address-normalization-results \
  --region us-east-1 \
  --attributes MessageRetentionPeriod=86400

2. Criar repositório ECR

aws ecr create-repository \
  --repository-name address-normalizer \
  --region us-east-1

3. Build e push da imagem Docker

# Fazer login no ECR
aws ecr get-login-password --region us-east-1 | \
  docker login --username AWS --password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com

# Build da imagem
docker build -f docker/Dockerfile -t address-normalizer:latest .

# Tag para ECR
docker tag address-normalizer:latest \
  123456789.dkr.ecr.us-east-1.amazonaws.com/address-normalizer:latest

# Push
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/address-normalizer:latest

4. Criar cluster ECS

# Criar cluster
aws ecs create-cluster \
  --cluster-name address-normalizer-cluster \
  --region us-east-1

# Criar log group
aws logs create-log-group \
  --log-group-name /ecs/address-normalizer \
  --region us-east-1

5. Criar IAM roles

Execution Role (permissões para ECS):

cat > trust-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "ecs-tasks.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

aws iam create-role \
  --role-name ecsTaskExecutionRole \
  --assume-role-policy-document file://trust-policy.json

aws iam attach-role-policy \
  --role-name ecsTaskExecutionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy

# Adicionar permissão para CloudWatch Logs
aws iam put-role-policy \
  --role-name ecsTaskExecutionRole \
  --policy-name CloudWatchLogs \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        "Resource": "arn:aws:logs:us-east-1:123456789:log-group:/ecs/address-normalizer:*"
      }
    ]
  }'

Task Role (permissões para a aplicação):

aws iam create-role \
  --role-name ecsTaskRole \
  --assume-role-policy-document file://trust-policy.json

# Adicionar permissão para SQS
aws iam put-role-policy \
  --role-name ecsTaskRole \
  --policy-name SQSAccess \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:SendMessage",
          "sqs:GetQueueAttributes"
        ],
        "Resource": [
          "arn:aws:sqs:us-east-1:123456789:address-normalization-queue",
          "arn:aws:sqs:us-east-1:123456789:address-normalization-results",
          "arn:aws:sqs:us-east-1:123456789:address-normalization-dlq"
        ]
      }
    ]
  }'

6. Registrar task definition

cat > task-definition.json << 'EOF'
{
  "family": "address-normalizer-worker",
  "networkMode": "awsvpc",
  "requiresCompatibilities": ["FARGATE"],
  "cpu": "256",
  "memory": "512",
  "containerDefinitions": [
    {
      "name": "normalizer-worker",
      "image": "123456789.dkr.ecr.us-east-1.amazonaws.com/address-normalizer:latest",
      "essential": true,
      "environment": [
        {
          "name": "SQS_INPUT_QUEUE_URL",
          "value": "https://sqs.us-east-1.amazonaws.com/123456789/address-normalization-queue"
        },
        {
          "name": "SQS_OUTPUT_QUEUE_URL",
          "value": "https://sqs.us-east-1.amazonaws.com/123456789/address-normalization-results"
        },
        {
          "name": "AWS_REGION",
          "value": "us-east-1"
        },
        {
          "name": "WORKER_BATCH_SIZE",
          "value": "10"
        },
        {
          "name": "WORKER_CONCURRENCY",
          "value": "4"
        },
        {
          "name": "ENABLE_V2",
          "value": "true"
        },
        {
          "name": "LOG_LEVEL",
          "value": "INFO"
        }
      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/ecs/address-normalizer",
          "awslogs-region": "us-east-1",
          "awslogs-stream-prefix": "ecs"
        }
      }
    }
  ],
  "executionRoleArn": "arn:aws:iam::123456789:role/ecsTaskExecutionRole",
  "taskRoleArn": "arn:aws:iam::123456789:role/ecsTaskRole"
}
EOF

aws ecs register-task-definition \
  --cli-input-json file://task-definition.json \
  --region us-east-1

7. Criar VPC e subnets (se necessário)

# Criar VPC
VPC_ID=$(aws ec2 create-vpc --cidr-block 10.0.0.0/16 --region us-east-1 --query Vpc.VpcId --output text)

# Criar subnet
SUBNET_ID=$(aws ec2 create-subnet \
  --vpc-id $VPC_ID \
  --cidr-block 10.0.1.0/24 \
  --region us-east-1 \
  --query Subnet.SubnetId --output text)

# Criar security group
SG_ID=$(aws ec2 create-security-group \
  --group-name address-normalizer-sg \
  --description "Security group for address normalizer" \
  --vpc-id $VPC_ID \
  --region us-east-1 \
  --query GroupId --output text)

8. Criar serviço ECS

aws ecs create-service \
  --cluster address-normalizer-cluster \
  --service-name address-normalizer-service \
  --task-definition address-normalizer-worker:1 \
  --desired-count 2 \
  --launch-type FARGATE \
  --network-configuration "awsvpcConfiguration={subnets=[subnet-xxxxx],securityGroups=[sg-xxxxx],assignPublicIp=ENABLED}" \
  --region us-east-1

9. Configurar auto-scaling

# Registrar target para auto-scaling
aws application-autoscaling register-scalable-target \
  --service-namespace ecs \
  --resource-id service/address-normalizer-cluster/address-normalizer-service \
  --scalable-dimension ecs:service:DesiredCount \
  --min-capacity 1 \
  --max-capacity 10 \
  --region us-east-1

# Criar política de scaling baseada em fila SQS
aws application-autoscaling put-scaling-policy \
  --policy-name address-normalizer-scaling \
  --service-namespace ecs \
  --resource-id service/address-normalizer-cluster/address-normalizer-service \
  --scalable-dimension ecs:service:DesiredCount \
  --policy-type TargetTrackingScaling \
  --target-tracking-scaling-policy-configuration '{
    "TargetValue": 100.0,
    "PredefinedMetricSpecification": {
      "PredefinedMetricType": "SQSQueueAverageAgeOfMessage"
    },
    "ScaleOutCooldown": 60,
    "ScaleInCooldown": 300
  }' \
  --region us-east-1

10. Monitorar em produção

# Ver logs
aws logs tail /ecs/address-normalizer --follow --region us-east-1

# Ver status do serviço
aws ecs describe-services \
  --cluster address-normalizer-cluster \
  --services address-normalizer-service \
  --region us-east-1

# Ver tarefas em execução
aws ecs list-tasks \
  --cluster address-normalizer-cluster \
  --region us-east-1

Troubleshooting em Produção

Tarefas não iniciando:

aws ecs describe-task-definition \
  --task-definition address-normalizer-worker:1 \
  --region us-east-1

Verificar permissões SQS:

aws sqs get-queue-attributes \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789/address-normalization-queue \
  --attribute-names All \
  --region us-east-1

Ver erros de execução:

aws logs get-log-events \
  --log-group-name /ecs/address-normalizer \
  --log-stream-name ecs/address-normalizer-worker/xxxxx \
  --region us-east-1

Uso da Biblioteca

Instalação

pip install -e .
# ou com uv
uv pip install -e .

Normalização direta (sem SQS)

import asyncio
from address_normalizer.facade import NormalizerFacade
from address_normalizer.modules.abbreviation.local_provider import LocalAbbreviationProvider

async def main():
    normalizer = NormalizerFacade(
        abbreviation_provider=LocalAbbreviationProvider(),
        enable_v2=True,
    )

    endereco = {
        "logradouro": "R DR JOAO SILVA",
        "numero": "123 APTO 45",
        "complemento": "",
        "bairro": "JD AMERICA",
        "municipio": "SAO PAULO",
        "uf": "SP",
        "cep": "01234567",
    }

    result = await normalizer.normalize(endereco)

    print(result.normalizado["logradouro"])   # "RUA DOUTOR JOAO SILVA"
    print(result.normalizado["numero"])       # "123"
    print(result.normalizado["complemento"]) # "APARTAMENTO 45"
    print(result.normalizado["bairro"])       # "JARDIM AMERICA"
    print(result.metadata.flags)             # ["ABREV_EXPANDIDA", ...]
    print(result.metadata.confianca)         # {"logradouro": 1.0, ...}

asyncio.run(main())

Batch

results = await normalizer.normalize_batch([endereco1, endereco2, endereco3])

Contrato de Mensagens SQS

O worker aceita dois formatos de mensagem: raw (endereço direto, sem envelope) e estruturado (com metadados de rastreabilidade para uso em produção).

Detecção automática de formato

O worker detecta o formato pela presença ou ausência do campo meta:

  • Sem meta → modo raw, processa o objeto diretamente como endereço
  • Com meta → modo estruturado, extrai registros[] do envelope

Em ambos os casos o pipeline completo é executado, incluindo a etapa de recategorização (FieldSwapStep) que detecta e corrige campos trocados antes de normalizar.


Formato Raw — endereço único sem envelope

Útil para integrações simples ou testes. O worker recebe o endereço semi-estruturado diretamente.

Entrada:

{
    "logradouro": "R DR JOAO SILVA",
    "numero": "123 APTO 45",
    "complemento": "",
    "bairro": "JD AMERICA",
    "municipio": "SAO PAULO",
    "uf": "SP",
    "cep": "01234567"
}

O pipeline executa normalmente, incluindo recategorização. Por exemplo, se logradouro contiver apenas dígitos, o FieldSwapStep detecta e move para numero antes de prosseguir.

Saída:

{
    "ref": null,
    "status": "ok",
    "endereco": {
        "original": {
            "logradouro": "R DR JOAO SILVA",
            "numero": "123 APTO 45",
            "complemento": "",
            "bairro": "JD AMERICA",
            "municipio": "SAO PAULO",
            "uf": "SP",
            "cep": "01234567"
        },
        "normalizado": {
            "logradouro": "RUA DOUTOR JOAO SILVA",
            "numero": "123",
            "complemento": "APARTAMENTO 45",
            "bairro": "JARDIM AMERICA",
            "municipio": "SAO PAULO",
            "uf": "SP",
            "cep": "01234567"
        }
    },
    "qualidade": {
        "score_geral": 0.99,
        "grupo_poluicao": "G1",
        "confianca": {
            "logradouro": 1.0,
            "numero": 1.0,
            "complemento": 1.0,
            "bairro": 1.0,
            "municipio": 1.0,
            "uf": 1.0,
            "cep": 0.9
        },
        "flags": ["ABREV_EXPANDIDA"]
    },
    "transformacoes": [
        {
            "step": "abbreviation",
            "campo": "logradouro",
            "tipo": "abreviacao",
            "de": "R",
            "para": "RUA",
            "regra": "TIPO_LOGRADOURO"
        },
        {
            "step": "numero",
            "campo": "numero",
            "tipo": "extracao_complemento",
            "de": "123 APTO 45",
            "para": "123",
            "regra": "COMPLEMENTO_EMBUTIDO"
        }
    ],
    "error": null
}

Formato Estruturado — batch com metadados

Para rastreabilidade completa em produção: idempotência por idempotency_key, identificação de origem, prioridade e opções de pipeline por mensagem.

Entrada:

{
    "meta": {
        "message_id": "uuid-v4",
        "idempotency_key": "DATAPREV-BATCH-2024-001-chunk-003",
        "sent_at": "2024-01-15T10:30:00Z",
        "priority": "normal"
    },
    "origem": {
        "orgao": "DATAPREV",
        "sistema": "BENEFICIOS-RURAL",
        "responsavel": "equipe-dados@dataprev.gov.br",
        "ambiente": "producao"
    },
    "modo": "batch",
    "registros": [
        {
            "ref": "NIS-12345678901",
            "endereco": {
                "logradouro": "R DR JOAO SILVA",
                "numero": "123 APTO 45",
                "complemento": "",
                "bairro": "JD AMERICA",
                "municipio": "SAO PAULO",
                "uf": "SP",
                "cep": "01234567"
            }
        },
        {
            "ref": "NIS-98765432100",
            "endereco": {
                "logradouro": "AV BRASIL",
                "numero": "SN",
                "complemento": "",
                "bairro": "CENTRO",
                "municipio": "MANAUS",
                "uf": "AM",
                "cep": "69000000"
            }
        }
    ],
    "pipeline": {
        "preset": "full",
        "options": {
            "enable_v2": true,
            "confidence_threshold": 0.7
        }
    }
}

Campos do envelope de entrada:

Campo Tipo Obrigatório Descrição
meta.message_id string (uuid) sim ID único da mensagem
meta.idempotency_key string sim Chave de deduplicação — mesma chave retorna o mesmo resultado sem reprocessar
meta.sent_at ISO 8601 sim Timestamp de envio
meta.priority normal | high não Prioridade de processamento
origem.orgao string sim Órgão remetente
origem.sistema string sim Sistema de origem
origem.responsavel string não Email ou identificador do responsável
origem.ambiente string não producao, homologacao, desenvolvimento
modo batch | single sim Modo de processamento
registros[].ref string sim Identificador do registro no sistema de origem
registros[].endereco object sim Endereço semi-estruturado (7 campos abaixo)
pipeline.preset full | v1 | v2 não Preset do pipeline (default: full)
pipeline.options.enable_v2 bool não Ativa steps v2.0 — campos trocados, CEP×UF (default: true)
pipeline.options.confidence_threshold float 0-1 não Score mínimo para status ok (default: 0.7)

Campos do endereço (registros[].endereco):

Campo Tipo Descrição
logradouro string Rua, avenida, travessa, etc.
numero string Número do imóvel (pode conter complemento embutido)
complemento string Apartamento, sala, bloco, etc.
bairro string Bairro
municipio string Município
uf string Sigla do estado (2 letras)
cep string CEP (8 dígitos, com ou sem hífen)

Saída:

{
    "meta": {
        "message_id": "uuid-v4-novo",
        "idempotency_key": "DATAPREV-BATCH-2024-001-chunk-003",
        "sent_at": "2024-01-15T10:30:00Z",
        "processed_at": "2024-01-15T10:30:00.412Z",
        "processing_ms": 312,
        "worker_id": "normalizer-worker-2",
        "pipeline_version": "2.0.0"
    },
    "origem": {
        "orgao": "DATAPREV",
        "sistema": "BENEFICIOS-RURAL"
    },
    "summary": {
        "total": 2,
        "ok": 1,
        "erro": 0,
        "alertas": 1,
        "status": "partial"
    },
    "registros": [
        {
            "ref": "NIS-12345678901",
            "status": "ok",
            "endereco": {
                "original": {
                    "logradouro": "R DR JOAO SILVA",
                    "numero": "123 APTO 45",
                    "complemento": "",
                    "bairro": "JD AMERICA",
                    "municipio": "SAO PAULO",
                    "uf": "SP",
                    "cep": "01234567"
                },
                "normalizado": {
                    "logradouro": "RUA DOUTOR JOAO SILVA",
                    "numero": "123",
                    "complemento": "APARTAMENTO 45",
                    "bairro": "JARDIM AMERICA",
                    "municipio": "SAO PAULO",
                    "uf": "SP",
                    "cep": "01234567"
                }
            },
            "qualidade": {
                "score_geral": 0.99,
                "grupo_poluicao": "G1",
                "confianca": {
                    "logradouro": 1.0,
                    "numero": 1.0,
                    "complemento": 1.0,
                    "bairro": 1.0,
                    "municipio": 1.0,
                    "uf": 1.0,
                    "cep": 0.9
                },
                "flags": ["ABREV_EXPANDIDA"]
            },
            "transformacoes": [
                {
                    "step": "abbreviation",
                    "campo": "logradouro",
                    "tipo": "abreviacao",
                    "de": "R",
                    "para": "RUA",
                    "regra": "TIPO_LOGRADOURO"
                },
                {
                    "step": "abbreviation",
                    "campo": "logradouro",
                    "tipo": "abreviacao",
                    "de": "DR",
                    "para": "DOUTOR",
                    "regra": "TITULO_PROPRIO"
                },
                {
                    "step": "numero",
                    "campo": "numero",
                    "tipo": "extracao_complemento",
                    "de": "123 APTO 45",
                    "para": "123",
                    "regra": "COMPLEMENTO_EMBUTIDO"
                }
            ],
            "error": null
        },
        {
            "ref": "NIS-98765432100",
            "status": "alerta",
            "endereco": {
                "original": {
                    "logradouro": "AV BRASIL",
                    "numero": "SN",
                    "complemento": "",
                    "bairro": "CENTRO",
                    "municipio": "MANAUS",
                    "uf": "AM",
                    "cep": "69000000"
                },
                "normalizado": {
                    "logradouro": "AVENIDA BRASIL",
                    "numero": "SN",
                    "complemento": "",
                    "bairro": "CENTRO",
                    "municipio": "MANAUS",
                    "uf": "AM",
                    "cep": "69000000"
                }
            },
            "qualidade": {
                "score_geral": 0.71,
                "grupo_poluicao": "G2",
                "confianca": {
                    "logradouro": 1.0,
                    "numero": 0.5,
                    "complemento": 1.0,
                    "bairro": 0.8,
                    "municipio": 1.0,
                    "uf": 1.0,
                    "cep": 0.3
                },
                "flags": ["ABREV_EXPANDIDA", "VARIANTE_SN", "CEP_GENERICO"]
            },
            "transformacoes": [
                {
                    "step": "abbreviation",
                    "campo": "logradouro",
                    "tipo": "abreviacao",
                    "de": "AV",
                    "para": "AVENIDA",
                    "regra": "TIPO_LOGRADOURO"
                },
                {
                    "step": "numero",
                    "campo": "numero",
                    "tipo": "variante_sn",
                    "de": "SN",
                    "para": "SN",
                    "regra": "VARIANTE_SN_CANONICA"
                }
            ],
            "error": null
        }
    ]
}

Campos da saída:

Campo Descrição
meta.processed_at Timestamp de conclusão do processamento
meta.processing_ms Tempo total em milissegundos
meta.worker_id Instância do worker que processou
meta.pipeline_version Versão do pipeline utilizada
summary.status ok — todos ok | partial — algum com alerta/erro | error — todos falharam
registros[].status ok | alerta | erro
registros[].qualidade.score_geral Média ponderada das confianças por campo (0-1)
registros[].qualidade.grupo_poluicao G1 limpo | G2 médio | G3 poluído
registros[].qualidade.flags Flags de rastreabilidade geradas pelo pipeline
registros[].transformacoes Lista detalhada de cada transformação aplicada
registros[].error Mensagem de erro se status = erro, caso contrário null

Regras de status por registro:

Status Condição
ok score_geral ≥ confidence_threshold e sem flags de alerta
alerta score_geral ≥ confidence_threshold mas com CEP_GENERICO, VARIANTE_SN ou CAMPO_TROCADO_AMBIGUO
erro Exceção no processamento ou score_geral < confidence_threshold

Pipeline de Normalização

O pipeline executa 13 steps sequenciais. A ordem é crítica — não altere sem validação.

Entrada (dict raw)
      │
      ▼
┌─────────────────────────────────────────────────────────────────┐
│  v1.0 — Limpeza Mecânica                                        │
│  [1] UppercaseStep → [2] RemoveAccentsStep → [3] EncodingStep   │
├─────────────────────────────────────────────────────────────────┤
│  v1.1 — Expansão de Abreviações                                 │
│  [4] AbbreviationStep → [5] PreprocessingStep                   │
├─────────────────────────────────────────────────────────────────┤
│  v1.2 — Normalização Estrutural                                 │
│  [6] NumeroStep → [7] ComplementoStep → [8] BairroStep          │
│  [9] MunicipioStep → [10] CepStep                               │
├─────────────────────────────────────────────────────────────────┤
│  v2.0 — Resolução Contextual (enable_v2=True)                   │
│  [11] FieldSwapStep → [12] RegionalAbbreviationStep             │
│  [13] CepUfValidationStep                                       │
└─────────────────────────────────────────────────────────────────┘
      │
      ▼
NormalizationResult

Dependências de Ordem

  • Uppercase deve vir primeiro (para identificar tokens)
  • RemoveAccents antes de Encoding (encoding opera sobre texto já em ASCII)
  • Abbreviation antes de Preprocessing (expandir antes de deduplicar)
  • Steps v1.2 após Preprocessing (dados já limpos)
  • Steps v2.0 por último (contexto completo disponível)

Flags de Rastreabilidade

Flag Versão Significado
ENCODING_CORRIGIDO v1.0 Encoding corrompido corrigido
PLACEHOLDER_REMOVIDO v1.0 Placeholder removido
ABREV_EXPANDIDA v1.1 Abreviação expandida
DUPLICADO_REMOVIDO v1.1 Tipo de via duplicado removido
VARIANTE_SN v1.2 Variante de S/N normalizada
CEP_GENERICO v1.2 CEP genérico (00000000, etc.)
CAMPO_TROCADO_DETECTADO v2.0 Campo trocado detectado
CAMPO_TROCADO_CORRIGIDO v2.0 Campo trocado corrigido
CAMPO_TROCADO_AMBIGUO v2.0 Detectado mas não corrigido (para v2.1)
ABREV_REGIONAL v2.0 Abreviação expandida por regra regional
CEP_DIVERGENTE_UF v2.0 CEP diverge da UF declarada

Desenvolvimento Local

Setup sem Docker

# Clone
git clone <repo-url>
cd br-address-normalize

# Ambiente virtual
python -m venv .venv
source .venv/bin/activate  # Linux/macOS
.venv\Scripts\activate     # Windows

# Instalar
pip install -e ".[dev]"

Testes Automatizados

# Todos os testes
pytest tests/ -v

# Com cobertura
pytest tests/ --cov=src/address_normalizer --cov-report=html

# Teste específico
pytest tests/test_v2_0.py -v

Testes Manuais com Docker + LocalStack

Este guia permite testar o pipeline de normalização localmente com Docker, simulando o ambiente de produção com SQS.

Pré-requisitos

# Instalar awscli-local para interagir com LocalStack
pip install awscli-local

# Verificar Docker
docker --version
docker compose --version

1. Iniciar o Ambiente Local

# Subir LocalStack + Worker
# Docker moderno (v20.10+):
docker compose up --build

# Docker antigo / WSL2 sem plugin:
docker-compose up --build

# Em outro terminal, verificar se os serviços estão rodando
docker compose ps
# ou
docker-compose ps

Você verá:

  • localstack — SQS simulado na porta 4566
  • normalizer-worker — Worker consumindo a fila

2. Enviar Testes Manuais

Teste 1: Endereço Simples (Formato Raw)

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "R DR JOAO SILVA",
    "numero": "123 APTO 45",
    "complemento": "",
    "bairro": "JD AMERICA",
    "municipio": "SAO PAULO",
    "uf": "SP",
    "cep": "01234567"
  }'

Teste 2: Endereço com Anomalias (Encoding + Abreviações)

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "AV BRASIL",
    "numero": "SN",
    "complemento": "",
    "bairro": "CENTRO",
    "municipio": "MANAUS",
    "uf": "AM",
    "cep": "69000000"
  }'

Teste 3: Endereço com Complemento Embutido

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "RUA BRASIL",
    "numero": "456 SALA 10",
    "complemento": "",
    "bairro": "CENTRO",
    "municipio": "RIO DE JANEIRO",
    "uf": "RJ",
    "cep": "20000000"
  }'

Teste 4: Batch com Metadados (Formato Estruturado)

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "meta": {
      "message_id": "msg-001",
      "idempotency_key": "BATCH-2024-001",
      "sent_at": "2024-01-15T10:30:00Z",
      "priority": "normal"
    },
    "origem": {
      "orgao": "DATAPREV",
      "sistema": "BENEFICIOS-RURAL"
    },
    "modo": "batch",
    "registros": [
      {
        "ref": "NIS-12345678901",
        "endereco": {
          "logradouro": "R DR JOAO SILVA",
          "numero": "123 APTO 45",
          "complemento": "",
          "bairro": "JD AMERICA",
          "municipio": "SAO PAULO",
          "uf": "SP",
          "cep": "01234567"
        }
      },
      {
        "ref": "NIS-98765432100",
        "endereco": {
          "logradouro": "AV BRASIL",
          "numero": "SN",
          "complemento": "",
          "bairro": "CENTRO",
          "municipio": "MANAUS",
          "uf": "AM",
          "cep": "69000000"
        }
      }
    ]
  }'

Teste 5: Campos Trocados (v2.0)

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "123",
    "numero": "RUA BRASIL",
    "complemento": "",
    "bairro": "CENTRO",
    "municipio": "SAO PAULO",
    "uf": "SP",
    "cep": "01234567"
  }'

Teste 6: CEP Divergente da UF (v2.0)

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "RUA BRASIL",
    "numero": "100",
    "complemento": "",
    "bairro": "CENTRO",
    "municipio": "SAO PAULO",
    "uf": "SP",
    "cep": "69000000"
  }'

Teste 7: Placeholder Genérico

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "RUA BRASIL",
    "numero": "NI",
    "complemento": "",
    "bairro": "ND",
    "municipio": "SAO PAULO",
    "uf": "SP",
    "cep": "00000000"
  }'

3. Verificar Resultados

# Receber mensagens da fila de resultados
awslocal sqs receive-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-results \
  --max-number-of-messages 10

Você verá a resposta com:

  • endereco.normalizado — Endereço após normalização
  • qualidade.score_geral — Score de confiança (0-1)
  • qualidade.flags — Flags de transformação aplicadas
  • transformacoes — Detalhes de cada transformação

4. Monitorar o Worker

Em outro terminal:

# Ver logs em tempo real
docker compose logs -f normalizer-worker

# Ver logs de um serviço específico
docker compose logs -f localstack

5. Testar Idempotência

Envie a mesma mensagem duas vezes:

# Primeira vez
awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "R DR JOAO SILVA",
    "numero": "123 APTO 45",
    "complemento": "",
    "bairro": "JD AMERICA",
    "municipio": "SAO PAULO",
    "uf": "SP",
    "cep": "01234567"
  }'

# Aguarde processamento (ver logs)
# Depois envie novamente

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "R DR JOAO SILVA",
    "numero": "123 APTO 45",
    "complemento": "",
    "bairro": "JD AMERICA",
    "municipio": "SAO PAULO",
    "uf": "SP",
    "cep": "01234567"
  }'

# Receba ambos os resultados
awslocal sqs receive-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-results \
  --max-number-of-messages 10

Esperado: Ambos os resultados devem ser idênticos (mesmo normalizado, mesmas transformacoes).

6. Testar Batch vs Individual

Envie um batch com 2 registros:

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "meta": {
      "message_id": "batch-test",
      "idempotency_key": "BATCH-EQUIV-001",
      "sent_at": "2024-01-15T10:30:00Z"
    },
    "modo": "batch",
    "registros": [
      {
        "ref": "REF-001",
        "endereco": {
          "logradouro": "R DR JOAO SILVA",
          "numero": "123 APTO 45",
          "complemento": "",
          "bairro": "JD AMERICA",
          "municipio": "SAO PAULO",
          "uf": "SP",
          "cep": "01234567"
        }
      },
      {
        "ref": "REF-002",
        "endereco": {
          "logradouro": "AV BRASIL",
          "numero": "SN",
          "complemento": "",
          "bairro": "CENTRO",
          "municipio": "MANAUS",
          "uf": "AM",
          "cep": "69000000"
        }
      }
    ]
  }'

Depois envie os mesmos endereços individualmente:

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "R DR JOAO SILVA",
    "numero": "123 APTO 45",
    "complemento": "",
    "bairro": "JD AMERICA",
    "municipio": "SAO PAULO",
    "uf": "SP",
    "cep": "01234567"
  }'

awslocal sqs send-message \
  --queue-url http://localhost:4566/000000000000/address-normalization-queue \
  --message-body '{
    "logradouro": "AV BRASIL",
    "numero": "SN",
    "complemento": "",
    "bairro": "CENTRO",
    "municipio": "MANAUS",
    "uf": "AM",
    "cep": "69000000"
  }'

Esperado: Os resultados normalizados devem ser idênticos entre batch e individual.

7. Limpar Ambiente

# Parar containers
docker compose down

# Remover volumes (limpar dados)
docker compose down -v

Checklist de Testes Manuais

  • Teste 1: Endereço simples normaliza corretamente
  • Teste 2: Abreviações são expandidas (AV → AVENIDA)
  • Teste 3: Complemento embutido é extraído (123 APTO 45 → 123 + APARTAMENTO 45)
  • Teste 4: Batch com metadados processa todos os registros
  • Teste 5: Campos trocados são detectados e corrigidos
  • Teste 6: CEP divergente da UF gera flag de alerta
  • Teste 7: Placeholders genéricos são removidos
  • Idempotência: Mesma entrada produz mesma saída
  • Batch Equivalência: Batch e individual produzem mesmos resultados
  • Logs: Worker registra todas as transformações
  • Fila de Saída: Resultados aparecem na fila de resultados
  • DLQ: Mensagens inválidas vão para Dead Letter Queue

Resultados

Benchmark Consolidado (24.37M registros, 6 UFs)

Cenário Taxa Ganho
C1 — Raw vs Raw (baseline) 64.53%
C4 — Norm vs Norm 68.11% +3.58pp
Registros resgatados 872.625
Regressões 0

Por Grupo de Poluição

Grupo Volume Baseline Normalizado Ganho
G1 (limpo) 18.17M 65.57% 66.55% +0.98pp
G2 (médio) 5.15M 68.93% 75.86% +6.94pp
G3 (poluído) 1.04M 27.68% 59.95% +32.27pp

Roadmap

Versão Status Escopo
v1.0 Limpeza mecânica: encoding, case, whitespace, placeholders
v1.1 Expansão de 115 abreviações sem ambiguidade
v1.2 Normalização estrutural: número, complemento, CEP, município, bairro
v2.0 Campos trocados, abreviações regionais, validação CEP×UF
Containerização 🔄 Worker SQS + Docker + LocalStack (em andamento)
v2.1 Fallback IA (Maritaca AI) para ambiguidades não resolvidas
v3.0 Validação canônica (Correios/IBGE)
v4.0 ML supervisionado / NER

Planejamento da Containerização (em andamento)

O que será implementado para rodar o serviço localmente com Docker e LocalStack, e subir para AWS sem mudança de código.

Arquivos a criar:

docker/
├── Dockerfile                    # Imagem do worker (python:3.11-slim)
└── localstack/
    └── init-aws.sh               # Cria filas SQS no LocalStack na inicialização

docker-compose.yml                # Orquestra localstack + normalizer-worker
.env.example                      # Variáveis de ambiente documentadas

src/address_normalizer/
└── worker/
    ├── __init__.py
    ├── consumer.py               # Loop de polling SQS com boto3
    ├── processor.py              # Processa batch com NormalizerFacade + asyncio.gather
    └── publisher.py              # Publica resultados na fila de saída + deleta mensagem

Fluxo interno do worker:

consumer.py
  └── poll_messages()             # boto3 receive_message (batch de até 10)
        └── processor.py
              └── process_batch() # asyncio.gather para paralelismo
                    └── NormalizerFacade.normalize()
                          └── publisher.py
                                └── publish_result()  # send_message para fila de saída
                                      └── delete_message()  # confirma processamento

Decisões de design:

  • Sem Lambda: worker roda como processo contínuo, sem limite de timeout
  • Paralelismo via asyncio.gather dentro de cada batch (não threads)
  • Idempotência garantida pelo pipeline (mensagem reprocessada = mesmo resultado)
  • Dead Letter Queue (DLQ) para mensagens que falham após 3 tentativas
  • Graceful shutdown: captura SIGTERM e termina o batch atual antes de parar

Referências de Análise

Todas as decisões de implementação são baseadas em análises quantitativas de 76.9M registros:

Análise Arquivo Conteúdo
A-10 planning/relatorios_tasks/A10_caracteres_anomalos.json Encoding corrompido por campo e UF
A-20/A-21 planning/relatorios_tasks/A20_A21_via_duplicada_pontos.json Vias duplicadas, abreviações
A-30 planning/relatorios_tasks/A30_numero_patologias.json Complemento embutido, variantes S/N
A-50 planning/relatorios_tasks/A50_bairro_anomalias.json Anomalias em bairro
A-70 planning/relatorios_tasks/A70_cep_formatos.json CEPs inválidos, genéricos, divergentes
A-80 planning/relatorios_tasks/A80_placeholders_globais.json Placeholders por campo
A-90 planning/relatorios_tasks/A90_campos_trocados.json Campos trocados
A-99 planning/relatorios_tasks/A99_sintese_decisoes.json 45 decisões consolidadas com prioridade
B-06 planning/relatorios_tasks/B06_score_qualidade_baseline.json Score de qualidade baseline

Roadmap detalhado: planning/PLANEJAMENTO_VERSOES.md

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

br_address_normalize-0.3.0.dev20260416142724.tar.gz (107.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file br_address_normalize-0.3.0.dev20260416142724.tar.gz.

File metadata

File hashes

Hashes for br_address_normalize-0.3.0.dev20260416142724.tar.gz
Algorithm Hash digest
SHA256 53c95e609b25ddc427c5f25a46eec6c5a88e7c63357e926b66079da4dbca5aab
MD5 47cf1d89c0e3a1090f4188ab03f9cdef
BLAKE2b-256 064b21ddc461f4265edd02e048b60b580772ced5931398030a2d946cd18c2857

See more details on using hashes here.

Provenance

The following attestation bundles were made for br_address_normalize-0.3.0.dev20260416142724.tar.gz:

Publisher: publish.yml on lipiw/br-address-normalize

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file br_address_normalize-0.3.0.dev20260416142724-py3-none-any.whl.

File metadata

File hashes

Hashes for br_address_normalize-0.3.0.dev20260416142724-py3-none-any.whl
Algorithm Hash digest
SHA256 23b84cbfe501984235fcf51b6f1fec19cf8ef6732cf2588d04bcf2f56bb4875f
MD5 375d802b90cd7fbcadda29f4ec76643b
BLAKE2b-256 969883fe1b03dc16454e934d3c740a15574afc099c1c75cf46c0812734617646

See more details on using hashes here.

Provenance

The following attestation bundles were made for br_address_normalize-0.3.0.dev20260416142724-py3-none-any.whl:

Publisher: publish.yml on lipiw/br-address-normalize

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page