A collection of utilities to speed up Python backend development with reusable, productive patterns

🐍 SnakeStack

📦 Overview

snakestack is a modular package that provides a solid foundation for building backend services, with a focus on:

  • Observability with OpenTelemetry
  • Asynchronous caching with Redis
  • Google Pub/Sub integration
  • Asynchronous MongoDB access
  • HTTPX client with tracing support
  • Base models built on pydantic and pydantic-settings
  • Structured, configurable logging stack

⚙️ Installation

Base installation:

With pip

pip install snakestack

With Poetry

poetry add snakestack

Available extras:

Extra      Install command
Redis      pip install snakestack[redis]
MongoDB    pip install snakestack[mongodb]
Pub/Sub    pip install snakestack[pubsub]
Telemetry  pip install snakestack[telemetry]
All        pip install snakestack[all]

🧪 Tests

The project has full unit test coverage, with tests organized by domain.

Run all tests:

make test

Run tests with coverage:

make test-ci

Run the tests for a specific domain:

pytest -m cache
pytest -m pubsub
pytest -m telemetry

🛠️ Local Development

1. Clone the repository:

git clone https://github.com/BrunoSegato/snakestack.git
cd snakestack

2. Install the dependencies:

make install

3. Activate the virtual environment:

source .venv/bin/activate

🧾 Useful Commands

Command         Description
make install    Installs dependencies with Poetry
make check      Runs linters and mypy
make lint       Runs ruff with auto-fix
make test       Runs the unit tests
make cov        Generates the coverage report
make changelog  Generates the changelog with Towncrier
make bump       Bumps the version with Commitizen
make release    Generates the changelog, bumps the version, and creates the release/tag

📚 Available Modules

  • snakestack.logging: Structured logging configuration with filters and formatters.

  • snakestack.cache: Asynchronous Redis client with a cache decorator.

  • snakestack.pubsub: Publisher and subscriber with support for presets, tracing, and decorators.

  • snakestack.telemetry: OpenTelemetry integration (metrics, traces, and logging).

  • snakestack.mongodb: Asynchronous MongoDB client with built-in tracing.

  • snakestack.healthz: Health check for the status of the application and its dependencies.

  • snakestack.httpx: Instrumented HTTPX client.

  • snakestack.model: Base pydantic models for internal use.

  • snakestack.config: Settings management with pydantic-settings.

  • snakestack.task: Class for creating and processing an asynchronous queue.


🧪 Usage Examples

🚧 Under construction: practical usage examples for each module will be added soon.

Cache module

Code

import asyncio
from datetime import datetime
from decimal import Decimal

from snakestack.cache.async_client import create_async_redis_client
from snakestack.cache.async_service import AsyncRedisService


async def sample():
    client = create_async_redis_client()
    redis = AsyncRedisService(client, default_ttl=3600)

    values = [
        "foo",
        1,
        (1, 2, 3),
        {1, 2, 3},
        datetime.now(),
        Decimal("10.50")
    ]

    for value in values:
        await redis.set("foo", value)
        print("Resultado", await redis.get("foo"))

if __name__ == "__main__":
    asyncio.run(sample())

Output

Resultado foo
Resultado 1
Resultado [1, 2, 3]
Resultado ['1', '2', '3']
Resultado 2025-08-07T18:37:02.149923
Resultado 10.50
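
The output above shows that values do not always come back with their original type: the tuple returns as a list, the set as a list of strings, and the datetime and Decimal as strings. The source does not show how AsyncRedisService serializes values, so the following is only a minimal sketch, using the standard json module and a hypothetical to_jsonable fallback, of how a JSON round trip can produce this kind of result.

```python
import json
from datetime import datetime
from decimal import Decimal
from typing import Any


def to_jsonable(value: Any) -> Any:
    """Fallback for types json cannot encode natively (hypothetical helper)."""
    if isinstance(value, (set, frozenset)):
        # Sets have no JSON equivalent; this sketch stores each member as a string.
        return sorted(str(item) for item in value)
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def round_trip(value: Any) -> Any:
    """Encode to a JSON string (what a cache would store) and decode it again."""
    return json.loads(json.dumps(value, default=to_jsonable))


if __name__ == "__main__":
    for sample in ["foo", 1, (1, 2, 3), {1, 2, 3}, Decimal("10.50")]:
        print(type(sample).__name__, "->", round_trip(sample))
```

If the original type matters to the caller, it has to be restored explicitly after reading, for example with Decimal(value) or datetime.fromisoformat(value).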

Healthz module

1. Sample

Code
import asyncio
import time

import orjson

from snakestack.healthz.healthz import SnakeHealthCheck

async def check_async():
    await asyncio.sleep(1)
    return True

def check_sync():
    time.sleep(1)
    return True

def main():
    health_check = SnakeHealthCheck(
        service_name="Teste",
        service_version="0.0.1",
        service_environment="test"
    )
    health_check.add_check(name="check_async", func=check_async)
    health_check.add_check(name="check_sync", func=check_sync)

    result, check = asyncio.run(health_check.is_healthy())
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())


if __name__ == "__main__":
    main()
Output
{
  "service_name": "Teste",
  "version": "0.0.1",
  "host": "hostname",
  "uptime": "9h 48m 24s",
  "timestamp": "2025-08-07T21:41:38.071402+00:00",
  "environment": "test",
  "status": true,
  "latency_ms": 2001.57,
  "details": {
    "check_async": {
      "ok": true,
      "latency_ms": 1001.3
    },
    "check_sync": {
      "ok": true,
      "latency_ms": 1000.27
    }
  }
}
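
In the output above, the overall latency_ms is roughly the sum of the two checks, which is consistent with the checks running one after another. How SnakeHealthCheck does this internally is not shown here. The sketch below is only an illustration, using the standard library, of how a runner can accept both sync and async checks and time each one; run_checks and its return shape are hypothetical.

```python
import asyncio
import inspect
import time
from typing import Any, Awaitable, Callable, Union

Check = Callable[[], Union[bool, Awaitable[bool]]]


async def run_checks(checks: dict[str, Check]) -> dict[str, Any]:
    """Run each check in order, recording its result and latency (illustrative only)."""
    details: dict[str, Any] = {}
    started = time.perf_counter()
    for name, func in checks.items():
        check_started = time.perf_counter()
        try:
            outcome = func()
            if inspect.isawaitable(outcome):
                # Async checks return an awaitable that still has to be awaited.
                outcome = await outcome
            ok = bool(outcome)
        except Exception:
            # A check that raises is reported as failed instead of crashing the probe.
            ok = False
        latency = (time.perf_counter() - check_started) * 1000
        details[name] = {"ok": ok, "latency_ms": round(latency, 2)}
    total = (time.perf_counter() - started) * 1000
    return {
        "status": all(item["ok"] for item in details.values()),
        "latency_ms": round(total, 2),
        "details": details,
    }
```

Note that in this sketch a synchronous check blocks the event loop while it runs; offloading it with asyncio.to_thread would be one way to avoid that.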

2. With deps

Code
import asyncio

import orjson

from snakestack.cache.async_client import create_async_redis_client
from snakestack.cache.async_service import AsyncRedisService
from snakestack.config import settings
from snakestack.healthz.healthz import SnakeHealthCheck
from snakestack.logging.config import setup_logging
from snakestack.mongodb.async_client import close_mongodb_connection, ping, open_mongodb_connection


async def check_cache():
    client = create_async_redis_client()
    redis = AsyncRedisService(client, default_ttl=3600)
    return await redis.ping()

async def check_mongo():
    try:
        await open_mongodb_connection(settings=settings)
        return await ping()
    finally:
        await close_mongodb_connection()


def main():
    health_check = SnakeHealthCheck(
        service_name="Teste",
        service_version="0.0.1",
        service_environment="test"
    )
    health_check.add_check(name="check_cache", func=check_cache)
    health_check.add_check(name="check_mongo", func=check_mongo)

    result, check = asyncio.run(health_check.is_healthy())
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())


if __name__ == "__main__":
    setup_logging()
    main()
Output
{
  "service_name": "Teste",
  "version": "0.0.1",
  "host": "bruno-Aspire",
  "uptime": "4h 1m 12s",
  "timestamp": "2025-08-29T15:42:16.262049+00:00",
  "environment": "test",
  "status": true,
  "latency_ms": 11.78,
  "details": {
    "check_cache": {
      "ok": true,
      "latency_ms": 2.69
    },
    "check_mongo": {
      "ok": true,
      "latency_ms": 9.09
    }
  }
}

Httpx module

1. Simple usage example

Code
import asyncio
import orjson

from snakestack.httpx.client import SnakeHttpClient


class MyAPI(SnakeHttpClient):

    async def get_user(
        self,
    ):
        response = await self.handle(
            method="GET",
            url="/get"
        )
        response.raise_for_status()
        return response.json()


async def main():
    api = MyAPI(base_url="https://httpbin.org")
    result = await api.get_user()
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())


if __name__ == "__main__":
    asyncio.run(main())
Output
{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "User-Agent": "python-httpx/0.28.1",
    "X-Amzn-Trace-Id": "Root=1-68a65355-68dcca804d655ab922707790"
  },
  "origin": "201.95.220.207",
  "url": "https://httpbin.org/get"
}

2. Example using OpenTelemetry

Code
import asyncio
import logging
import orjson

from snakestack.httpx.client import SnakeHttpClient
from snakestack.logging.config import setup_logging
from snakestack.telemetry.instrumentor import instrument_app
from snakestack.telemetry.helpers import instrumented_span


instrument_app(service_name="foobar", enable_httpx=True, test_mode=True)

setup_logging()
logger = logging.getLogger(__name__)


class MyAPI(SnakeHttpClient):

    @instrumented_span(span_name="principal")
    async def get_user(
        self,
    ):
        response = await self.handle(
            method="GET",
            url="/get"
        )
        response.raise_for_status()
        result = response.json()
        logger.info(result)
        return result


async def main():
    api = MyAPI(base_url="https://httpbin.org")
    result = await api.get_user()
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())

if __name__ == "__main__":
    asyncio.run(main())
Output
JSON Response
{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "Traceparent": "00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01",
    "User-Agent": "python-httpx/0.28.1",
    "X-Amzn-Trace-Id": "Root=1-68a65450-5e17d033064521816e1e7b58"
  },
  "origin": "201.95.220.207",
  "url": "https://httpbin.org/get"
}
LOG Response
2025-08-20 20:03:44,233 [INFO] httpx: HTTP Request: GET https://httpbin.org/get "HTTP/1.1 200 OK"
2025-08-20 20:03:44,236 [INFO] __main__: {'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'Traceparent': '00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01', 'User-Agent': 'python-httpx/0.28.1', 'X-Amzn-Trace-Id': 'Root=1-68a65450-5e17d033064521816e1e7b58'}, 'origin': '201.95.220.207', 'url': 'https://httpbin.org/get'}
OTEL Response
{
    "name": "GET",
    "context": {
        "trace_id": "0xee28e45169aa5404fa751c0c51567847",
        "span_id": "0xa91c6aa2988520b2",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CLIENT",
    "parent_id": "0x729c5a2340e0f49f",
    "start_time": "2025-08-20T23:03:43.283362Z",
    "end_time": "2025-08-20T23:03:44.230366Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "http.method": "GET",
        "http.url": "https://httpbin.org/get",
        "http.status_code": 200
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "telemetry.sdk.language": "python",
            "telemetry.sdk.name": "opentelemetry",
            "telemetry.sdk.version": "1.36.0",
            "service.name": "foobar",
            "service.instance.id": "Aspire"
        },
        "schema_url": ""
    }
}
{
    "name": "principal",
    "context": {
        "trace_id": "0xee28e45169aa5404fa751c0c51567847",
        "span_id": "0x729c5a2340e0f49f",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": null,
    "start_time": "2025-08-20T23:03:43.187189Z",
    "end_time": "2025-08-20T23:03:44.236666Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {},
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "telemetry.sdk.language": "python",
            "telemetry.sdk.name": "opentelemetry",
            "telemetry.sdk.version": "1.36.0",
            "service.name": "foobar",
            "service.instance.id": "Aspire"
        },
        "schema_url": ""
    }
}
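
The Traceparent header echoed by httpbin carries the same trace ID and span ID that appear in the exported CLIENT span, which is how the remote service can join the trace. As a small illustration, the sketch below splits a header in the W3C Trace Context format (version, trace ID, parent ID, flags) using only the standard library; parse_traceparent and TraceParent are hypothetical names, not part of snakestack.

```python
from typing import NamedTuple


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_id: str
    sampled: bool


def parse_traceparent(header: str) -> TraceParent:
    """Split a W3C traceparent header into its four dash-separated fields."""
    version, trace_id, parent_id, flags = header.strip().split("-")
    if len(trace_id) != 32 or len(parent_id) != 16 or len(flags) != 2:
        raise ValueError(f"Malformed traceparent: {header!r}")
    # Bit 0 of the flags byte is the "sampled" flag.
    return TraceParent(version, trace_id, parent_id, bool(int(flags, 16) & 0x01))


if __name__ == "__main__":
    header = "00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01"
    print(parse_traceparent(header))
```

Applied to the header from the output above, trace_id matches the span's trace_id and parent_id matches the CLIENT span's span_id (both without the 0x prefix).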

Logging module

1. Example with the default formatter

Code
import logging

from snakestack.config import settings
from snakestack.logging.config import setup_logging

def main():
    logger.info("Logging simples funcionando.")

if __name__ == "__main__":
    setup_logging(settings)
    logger = logging.getLogger(__name__)
    main()
Output
2026-04-02 19:12:31,737 [INFO] __main__: Logging simples funcionando.

2. Example with the with_request_id formatter

Environment variable
SNAKESTACK_LOGGING_FORMATTER=with_request_id
Code
import logging

from snakestack.config import settings
from snakestack.logging.config import setup_logging
from snakestack.logging.contexts import set_request_id


def main():
    set_request_id("12345678")
    logger.info("Logging with_request_id funcionando.")


if __name__ == "__main__":
    setup_logging(settings)
    logger = logging.getLogger(__name__)
    main()
Output
2026-04-02 19:12:46,559 [INFO] [req_id=12345678] __main__: Logging with_request_id funcionando.

3. Example with the json formatter

Environment variable
SNAKESTACK_LOGGING_FORMATTER=json
Code
import logging

from snakestack.config import settings
from snakestack.logging.config import setup_logging
from snakestack.logging.contexts import set_request_id


def main():
    set_request_id("12345678")
    logger.info("Logging custom_json funcionando.")


if __name__ == "__main__":
    setup_logging(settings)
    logger = logging.getLogger(__name__)
    main()
Output
{"time":"2026-04-02T22:13:24.064384+00:00","level":"INFO","pid":21961,"name":"__main__:10","message":"Logging custom_json funcionando.","request":{"id":"12345678"}}
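
The JSON line above can be approximated with a custom logging.Formatter from the standard library. The sketch below is an illustration of that idea, not the library's formatter: the field names simply mirror the output shown, and it assumes a request ID has already been attached to the record as a request_id attribute (an assumption of this sketch).

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each record as one compact JSON object (illustrative field names)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "pid": record.process,
            "name": f"{record.name}:{record.lineno}",
            "message": record.getMessage(),
        }
        # Only emit the "request" object when an ID is present on the record.
        request_id = getattr(record, "request_id", None)
        if request_id is not None:
            payload["request"] = {"id": request_id}
        return json.dumps(payload, separators=(",", ":"))


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    demo = logging.getLogger("json_demo")
    demo.addHandler(handler)
    demo.setLevel(logging.INFO)
    demo.info("JSON logging works.", extra={"request_id": "12345678"})
```

One object per line keeps the output easy to ship to log collectors that parse newline-delimited JSON.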

4. Example with the excluded_name filter

Environment variables
SNAKESTACK_LOGGING_FILTERS=excluded_name,request_id
SNAKESTACK_LOGGING_FORMATTER=with_request_id
SNAKESTACK_LOGGING_EXCLUDED_NAME=ignore.me
Code
import logging

from snakestack.config import settings
from snakestack.logging.config import setup_logging
from snakestack.logging.contexts import set_request_id


def main():
    set_request_id("12345678")
    logger.info("Logging com filtro excluded_name funcionando.")
    logger_exclude.warning("Logging será descartado pelo filtro.")


if __name__ == "__main__":
    setup_logging(settings)
    logger = logging.getLogger(__name__)
    logger_exclude = logging.getLogger("ignore.me")
    main()
Output
2026-04-02 19:14:32,736 [INFO] [req_id=12345678] __main__: Logging com filtro excluded_name funcionando.
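
Only the first message appears because the record from the ignore.me logger is dropped. A name-based filter like this can be sketched with the standard library as follows; ExcludedNameFilter and demo are hypothetical names, and treating child loggers (ignore.me.child) as excluded too is an assumption of this sketch rather than documented library behavior.

```python
import io
import logging


class ExcludedNameFilter(logging.Filter):
    """Drop records emitted by a given logger name or any of its children."""

    def __init__(self, excluded: str) -> None:
        super().__init__()
        self.excluded = excluded

    def filter(self, record: logging.LogRecord) -> bool:
        is_excluded = (
            record.name == self.excluded
            or record.name.startswith(self.excluded + ".")
        )
        return not is_excluded  # False tells logging to discard the record


def demo() -> str:
    """Attach the filter to a handler and return what was actually written."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("[%(levelname)s] %(name)s: %(message)s"))
    handler.addFilter(ExcludedNameFilter("ignore.me"))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)
    try:
        logging.getLogger("app").info("kept")
        logging.getLogger("ignore.me").warning("dropped by the filter")
    finally:
        root.removeHandler(handler)
    return stream.getvalue()


if __name__ == "__main__":
    print(demo(), end="")
```

The filter is attached to the handler rather than to a logger because logger-level filters are not applied to records that propagate up from child loggers, while handler-level filters see every record the handler receives.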

5. Example with the filter_paths filter

Environment variables
SNAKESTACK_LOGGING_FILTER_PATHS=proibido
SNAKESTACK_LOGGING_FILTERS=path_filter,request_id
SNAKESTACK_LOGGING_FORMATTER=with_request_id
Code
import logging

from snakestack.config import settings
from snakestack.logging.config import setup_logging
from snakestack.logging.contexts import set_request_id


def main():
    set_request_id("12345678")
    logger.info("Logging com filtro por texto funcionando.")
    logger.warning("Logging com filtro proibido por texto não mostrar.", extra={"path_url": "proibido"})
    logger.info("Logging com filtro proibido texto funcionando.", extra={"path_url": "foooo"})


if __name__ == "__main__":
    setup_logging(settings)
    logger = logging.getLogger(__name__)
    main()
Output
2026-04-03 12:45:54,812 [INFO] [req_id=12345678] __main__: Logging com filtro por texto funcionando.
2026-04-03 12:45:54,812 [INFO] [req_id=12345678] __main__: Logging com filtro proibido texto funcionando.

Task module

1. Example without a sentinel

Code
import asyncio
import logging

from snakestack.logging.config import setup_logging
from snakestack.task.process import SnakeTaskProcess

logger = logging.getLogger(__name__)


async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(queue_max_size=10)
    tasks = []

    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)

    for x in range(5):
        await process.enqueue(message=f"sample {x}")

    await process.wait()

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    setup_logging()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        logger.info("Processo finalizado.")
Output
2025-08-21 17:29:01,653 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:29:01,653 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:29:01,653 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:29:01,653 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:01,753 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:29:01,754 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:29:01,754 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:01,855 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:29:01,855 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:29:01,956 [DEBUG] snakestack.task.process: Worker [0] process item sample 0
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] process item sample 1
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] process item sample 2
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:29:02,060 [DEBUG] snakestack.task.process: Worker [1] process item sample 3
2025-08-21 17:29:02,060 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:02,161 [DEBUG] snakestack.task.process: Worker [0] process item sample 4
2025-08-21 17:29:02,161 [DEBUG] snakestack.task.process: Worker [0] waiting items...

2. Example with a sentinel

Code
import asyncio
import logging
import time

from snakestack.logging.config import setup_logging
from snakestack.task.process import SnakeTaskProcess

logger = logging.getLogger(__name__)


async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(queue_max_size=10, sentinel=None)
    tasks = []

    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)

    for x in range(10):
        await process.enqueue(message=f"sample {x}")

    await process.wait()

    for i in range(num_workers):
        logger.info(f"Publishing sentinels worker [{i}]")
        await process.enqueue(message=None)

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    setup_logging()
    start = time.perf_counter()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        duration = time.perf_counter() - start
        logger.info(f"Processo finalizado. took {duration:.4f}s")
Output
2025-08-21 17:30:26,946 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:30:26,946 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:30:26,947 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:30:26,947 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,047 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:30:27,048 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:30:27,048 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,149 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:30:27,149 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] process item sample 0
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] process item sample 1
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:30:27,351 [DEBUG] snakestack.task.process: Worker [0] process item sample 2
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [1] process item sample 3
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,353 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 5
2025-08-21 17:30:27,453 [DEBUG] snakestack.task.process: Worker [0] process item sample 4
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 6
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] process item sample 5
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 7
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] process item sample 6
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 8
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] process item sample 7
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 9
2025-08-21 17:30:27,656 [DEBUG] snakestack.task.process: Worker [0] process item sample 8
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] process item sample 9
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,657 [INFO] __main__: Publishing sentinels worker [0]
2025-08-21 17:30:27,657 [INFO] __main__: Publishing sentinels worker [1]
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [0] was finished after get a sentinel
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] was finished after get a sentinel
2025-08-21 17:30:27,658 [INFO] __main__: Processo finalizado. took 0.7119s
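
The log above ends with each worker finishing after it receives a sentinel. The same shutdown pattern can be sketched with a plain asyncio.Queue, independent of SnakeTaskProcess: process all real items, wait for the queue to drain, then enqueue one sentinel per worker so every worker loop exits. The names worker, run, and SENTINEL are hypothetical, and the sleep is a stand-in for real work.

```python
import asyncio
from typing import Any

SENTINEL = None  # hypothetical shutdown marker, mirroring sentinel=None above


async def worker(num: int, queue: asyncio.Queue, done: list[Any]) -> None:
    while True:
        item = await queue.get()
        try:
            if item is SENTINEL:
                return  # stop this worker; other workers need their own sentinel
            await asyncio.sleep(0.01)  # stand-in for real processing
            done.append((num, item))
        finally:
            # Runs for real items and sentinels alike, so queue.join() can complete.
            queue.task_done()


async def run(num_workers: int = 2, num_items: int = 10) -> list[Any]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    done: list[Any] = []
    tasks = [asyncio.create_task(worker(i, queue, done)) for i in range(num_workers)]
    for x in range(num_items):
        await queue.put(f"sample {x}")
    await queue.join()  # every real item has been processed
    for _ in range(num_workers):
        await queue.put(SENTINEL)  # one sentinel per worker
    await asyncio.gather(*tasks)  # returns because each worker has exited
    return done


if __name__ == "__main__":
    results = asyncio.run(run())
    print(f"processed {len(results)} items")
```

Without the sentinels, the workers in this sketch would block on queue.get() forever and the final gather would never return, which is why one sentinel is published per worker.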

3. Example with a custom callback

Code
import asyncio
import logging
from typing import Any

from snakestack.logging.config import setup_logging
from snakestack.task.process import SnakeTaskProcess

logger = logging.getLogger(__name__)


async def callback(num: int, message: Any):
    await asyncio.sleep(0.01)
    logger.debug(f"Worker [{num}] process item {message} on custom callback")



async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(
        queue_max_size=10,
        func_callback=callback
    )
    tasks = []

    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)

    for x in range(10):
        await process.enqueue(message=f"sample {x}")

    await process.wait()

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    setup_logging()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        logger.info("Processo finalizado.")
Output
2025-08-21 17:31:00,998 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:31:00,999 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:31:00,999 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:31:00,999 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,099 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:31:01,100 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:31:01,100 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,201 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:31:01,202 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:31:01,212 [DEBUG] __main__: Worker [0] process item sample 0 on custom callback
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:31:01,213 [DEBUG] __main__: Worker [1] process item sample 1 on custom callback
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:31:01,224 [DEBUG] __main__: Worker [0] process item sample 2 on custom callback
2025-08-21 17:31:01,224 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,225 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:31:01,225 [DEBUG] __main__: Worker [1] process item sample 3 on custom callback
2025-08-21 17:31:01,225 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,226 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 5
2025-08-21 17:31:01,236 [DEBUG] __main__: Worker [0] process item sample 4 on custom callback
2025-08-21 17:31:01,237 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,237 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 6
2025-08-21 17:31:01,238 [DEBUG] __main__: Worker [1] process item sample 5 on custom callback
2025-08-21 17:31:01,238 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,239 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 7
2025-08-21 17:31:01,249 [DEBUG] __main__: Worker [0] process item sample 6 on custom callback
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 8
2025-08-21 17:31:01,249 [DEBUG] __main__: Worker [1] process item sample 7 on custom callback
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 9
2025-08-21 17:31:01,260 [DEBUG] __main__: Worker [0] process item sample 8 on custom callback
2025-08-21 17:31:01,260 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,260 [DEBUG] __main__: Worker [1] process item sample 9 on custom callback
2025-08-21 17:31:01,260 [DEBUG] snakestack.task.process: Worker [1] waiting items...

MongoDB module

1. Read and write example

Code
import asyncio

from snakestack.config import settings
from snakestack.mongodb.async_client import (
    close_mongodb_connection,
    get_database,
    open_mongodb_connection
)

async def write(db):
    result = await db.insert_one({
        "foo": "bar",
        "bar": "baz"
    })
    print(result)

async def read(db):
    result = await db.find_one(filter={"foo": "bar"})
    print(result)


async def main():
    await open_mongodb_connection(settings=settings)
    db = get_database(settings=settings)["teste"]
    await write(db)
    await read(db)
    await close_mongodb_connection()

if __name__ == "__main__":
    asyncio.run(main())
Output
InsertOneResult(ObjectId('69ce830ccb1b6edc4674d323'), acknowledged=True)
{'_id': ObjectId('69ce830ccb1b6edc4674d323'), 'foo': 'bar', 'bar': 'baz'}

🧭 Roadmap

  • Domain-based modularization

  • Full unit test coverage

  • Support for extras on PyPI

  • Online documentation (mkdocs)

  • Observability dashboard with Tempo + Prometheus + Grafana

  • CI/CD with automatic deployment to PyPI

  • CLI for environment validation and local testing

  • CHANGELOG generation via towncrier


👨‍💻 Author

Developed by Bruno Segato. Contributions, suggestions, and feedback are always welcome!


📝 License

This project is licensed under the terms of the MIT License.
