Skip to main content

Uma coleção de utilitários para acelerar o desenvolvimento backend em Python com padrões reutilizáveis e produtivos

Project description

🐍 SnakeStack

Python built with uv Pipeline PyPI version License: MIT codecov gitleaks badge


📦 Visão Geral

O snakestack é um pacote modular que oferece uma base robusta para construção de serviços backend com foco em:

  • Observabilidade com OpenTelemetry
  • Cache assíncrono com Redis
  • Integração com Google Pub/Sub
  • Acesso assíncrono ao MongoDB
  • Client HTTPX com suporte a tracing
  • Modelos base com pydantic e pydantic-settings
  • Stack de logging estruturado e configurável

⚙️ Instalação

Instalação base:

Via PIP

pip install snakestack

Via Poetry

poetry add snakestack

Extras disponíveis:

Extra Comando de instalação
Redis pip install snakestack[redis]
MongoDB pip install snakestack[mongodb]
Pub/Sub pip install snakestack[pubsub]
Telemetry pip install snakestack[telemetry]
Todos pip install snakestack[all]

🧪 Testes

O projeto possui cobertura completa de testes unitários e está organizado por domínios.

Executar todos os testes:

make test

Rodar testes com cobertura:

make test-ci

Rodar testes de um domínio específico:

pytest -m cache
pytest -m pubsub
pytest -m telemetry

🛠️ Desenvolvimento Local

1. Clone o repositório:

git clone https://github.com/BrunoSegato/snakestack.git
cd snakestack

2. Instale as dependências:

make install

3. Ative o ambiente virtual:

source .venv/bin/activate

🧾 Comandos Úteis

Comando Descrição
make install Instala dependências com Poetry
make check Executa linters e mypy
make lint Roda ruff com auto-fix
make test Executa os testes unitários
make cov Gera relatório de cobertura
make changelog Gera changelog com Towncrier
make bump Realiza bump de versão com Commitizen
make release Gera changelog, bump e cria release/tag

📚 Módulos disponíveis

  • snakestack.logging: Configuração de log estruturado com filtros e formatadores.

  • snakestack.cache: Cliente Redis assíncrono com decorator de cache.

  • snakestack.pubsub: Publisher e subscriber com suporte a presets, tracing e decorators.

  • snakestack.telemetry: Integração com OpenTelemetry (métricas, traces e logging).

  • snakestack.mongodb: Client assíncrono para MongoDB com tracing integrado.

  • snakestack.healthz: Health check para status da aplicação e dependências.

  • snakestack.httpx: Client HTTPX instrumentado.

  • snakestack.model: Base de modelos pydantic para uso interno.

  • snakestack.config: Gerenciamento de settings com pydantic-settings.

  • snakestack.task: Classe para criar e processar fila assincrona


🧪 Exemplos de Uso

🚧 Em construção — em breve serão adicionados exemplos práticos de uso para cada módulo.

Módulo Cache

Código

async def sample():
    client = create_async_redis_client()
    redis = AsyncRedisService(client, default_ttl=3600)

    values = [
        "foo",
        1,
        (1, 2, 3),
        {1, 2, 3},
        datetime.now(),
        Decimal("10.50")
    ]

    for value in values:
        await redis.set("foo", value)
        print("Resultado", await redis.get("foo"))

Saída

Resultado foo
Resultado 1
Resultado [1, 2, 3]
Resultado ['1', '2', '3']
Resultado 2025-08-07T18:37:02.149923
Resultado 10.50

Módulo Healthz

1. Sample

Código
async def check_async():
    await asyncio.sleep(1)
    return True

def check_sync():
    time.sleep(1)
    return True

def main():
    health_check = SnakeHealthCheck(
        service_name="Teste",
        service_version="0.0.1",
        service_environment="test"
    )
    health_check.add_check(name="check_async", func=check_async)
    health_check.add_check(name="check_sync", func=check_sync)

    result, check = asyncio.run(health_check.is_healthy())
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())
Saída
{
  "service_name": "Teste",
  "version": "0.0.1",
  "host": "hostname",
  "uptime": "9h 48m 24s",
  "timestamp": "2025-08-07T21:41:38.071402+00:00",
  "environment": "test",
  "status": true,
  "latency_ms": 2001.57,
  "details": {
    "check_async": {
      "ok": true,
      "latency_ms": 1001.3
    },
    "check_sync": {
      "ok": true,
      "latency_ms": 1000.27
    }
  }
}

2. With deps

Código
import asyncio

import orjson

from snakestack.cache import create_async_redis_client, AsyncRedisService
from snakestack.healthz import SnakeHealthCheck
from snakestack.logging import setup_logging
from snakestack.mongodb import close_mongodb_connection, ping, open_mongodb_connection


async def check_cache():
    client = create_async_redis_client()
    redis = AsyncRedisService(client, default_ttl=3600)
    return await redis.ping()

async def check_mongo():
    try:
        await open_mongodb_connection()
        return await ping()
    finally:
        await close_mongodb_connection()


def main():
    health_check = SnakeHealthCheck(
        service_name="Teste",
        service_version="0.0.1",
        service_environment="test"
    )
    health_check.add_check(name="check_cache", func=check_cache)
    health_check.add_check(name="check_mongo", func=check_mongo)

    result, check = asyncio.run(health_check.is_healthy())
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())


if __name__ == "__main__":
    setup_logging()
    main()
Saída
{
  "service_name": "Teste",
  "version": "0.0.1",
  "host": "bruno-Aspire",
  "uptime": "4h 1m 12s",
  "timestamp": "2025-08-29T15:42:16.262049+00:00",
  "environment": "test",
  "status": true,
  "latency_ms": 11.78,
  "details": {
    "check_cache": {
      "ok": true,
      "latency_ms": 2.69
    },
    "check_mongo": {
      "ok": true,
      "latency_ms": 9.09
    }
  }
}

Módulo Httpx

1. Exemplo simples de uso

Código
import asyncio
import orjson

from snakestack.httpx import SnakeHttpClient


class MyAPI(SnakeHttpClient):

    async def get_user(
        self,
    ):
        response = await self.handle(
            method="GET",
            url="/get"
        )
        response.raise_for_status()
        return response.json()


async def main():
    api = MyAPI(base_url="https://httpbin.org")
    result = await api.get_user()
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())


if __name__ == "__main__":
    asyncio.run(main())
Saída
{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "User-Agent": "python-httpx/0.28.1",
    "X-Amzn-Trace-Id": "Root=1-68a65355-68dcca804d655ab922707790"
  },
  "origin": "201.95.220.207",
  "url": "https://httpbin.org/get"
}

2. Exemplo utilizando open telemetry

Código
import asyncio
import logging
import orjson

from snakestack.httpx import SnakeHttpClient
from snakestack.logging import setup_logging
from snakestack.telemetry import instrument_app, instrumented_span


instrument_app(service_name="foobar", enable_httpx=True, test_mode=True)

setup_logging()
logger = logging.getLogger(__name__)


class MyAPI(SnakeHttpClient):

    @instrumented_span(span_name="principal")
    async def get_user(
        self,
    ):
        response = await self.handle(
            method="GET",
            url="/get"
        )
        response.raise_for_status()
        result = response.json()
        logger.info(result)
        return response.json()


async def main():
    api = MyAPI(base_url="https://httpbin.org")
    result = await api.get_user()
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())

if __name__ == "__main__":
    asyncio.run(main())
Saída
JSON Response
{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "Traceparent": "00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01",
    "User-Agent": "python-httpx/0.28.1",
    "X-Amzn-Trace-Id": "Root=1-68a65450-5e17d033064521816e1e7b58"
  },
  "origin": "201.95.220.207",
  "url": "https://httpbin.org/get"
}
LOG Response
2025-08-20 20:03:44,233 [INFO] httpx: HTTP Request: GET https://httpbin.org/get "HTTP/1.1 200 OK"
2025-08-20 20:03:44,236 [INFO] __main__: {'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'Traceparent': '00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01', 'User-Agent': 'python-httpx/0.28.1', 'X-Amzn-Trace-Id': 'Root=1-68a65450-5e17d033064521816e1e7b58'}, 'origin': '201.95.220.207', 'url': 'https://httpbin.org/get'}
OTEL Response
{
    "name": "GET",
    "context": {
        "trace_id": "0xee28e45169aa5404fa751c0c51567847",
        "span_id": "0xa91c6aa2988520b2",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CLIENT",
    "parent_id": "0x729c5a2340e0f49f",
    "start_time": "2025-08-20T23:03:43.283362Z",
    "end_time": "2025-08-20T23:03:44.230366Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "http.method": "GET",
        "http.url": "https://httpbin.org/get",
        "http.status_code": 200
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "telemetry.sdk.language": "python",
            "telemetry.sdk.name": "opentelemetry",
            "telemetry.sdk.version": "1.36.0",
            "service.name": "foobar",
            "service.instance.id": "Aspire"
        },
        "schema_url": ""
    }
}
{
    "name": "principal",
    "context": {
        "trace_id": "0xee28e45169aa5404fa751c0c51567847",
        "span_id": "0x729c5a2340e0f49f",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": null,
    "start_time": "2025-08-20T23:03:43.187189Z",
    "end_time": "2025-08-20T23:03:44.236666Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {},
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "telemetry.sdk.language": "python",
            "telemetry.sdk.name": "opentelemetry",
            "telemetry.sdk.version": "1.36.0",
            "service.name": "foobar",
            "service.instance.id": "Aspire"
        },
        "schema_url": ""
    }
}

Módulo Logging

1. Exemplo com formatter default

Código
def main():
    setup_logging()
    logger = logging.getLogger(__name__)
    logger.info("Logging simples funcionando.")
Saída
2025-08-07 18:50:14,385 [INFO] __main__: Logging simples funcionando.

2. Exemplo com formatter with_request_id

Variável de ambiente
SNAKESTACK_LOG_DEFAULT_FORMATTER=with_request_id
Código
def main():
    set_request_id("12345678")
    logger.info("Logging with_request_id funcionando.")


if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)
    main()
Saída
2025-08-08 16:44:43,236 [INFO] [req_id=12345678] __main__: Logging with_request_id funcionando.

3. Exemplo com formatter custom_json

Variável de ambiente
SNAKESTACK_LOG_DEFAULT_FORMATTER=custom_json
Código
def main():
    set_request_id("12345678")
    logger.info("Logging custom_json funcionando.")


if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)
    main()
Saída
{"time":"2025-08-08T19:47:53.572825+00:00","level":"INFO","pid":175425,"name":"__main__:8","msg":"Logging with_request_id funcionando.","request":{"id":"12345678"}}

4. Exemplo com filter excluded_name

Variável de ambiente
SNAKESTACK_LOG_DEFAULT_FILTERS=excluded_name,request_id
SNAKESTACK_LOG_DEFAULT_FORMATTER=with_request_id
SNAKESTACK_LOG_EXCLUDED_NAME=ignore.me
Código
def main():
    set_request_id("12345678")
    logger.info("Logging com filtro excluded_name funcionando.")
    logger_exclude.info("Logging será descartado pelo filtro.")


if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)
    logger_exclude = logging.getLogger("exclude.me")
    main()
Saída
2025-08-08 16:58:29,570 [INFO] [req_id=12345678] __main__: Logging com filtro excluded_name funcionando.

Módulo Task

1. Exemplo sem sentinela

Código
import asyncio
import logging

from snakestack.logging import setup_logging
from snakestack.task import SnakeTaskProcess

logger = logging.getLogger(__name__)


async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(queue_max_size=10)
    tasks = []

    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)

    for x in range(5):
        await process.enqueue(message=f"sample {x}")

    await process.wait()

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    setup_logging()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        logger.info("Processo finalizado.")
Saída
2025-08-21 17:29:01,653 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:29:01,653 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:29:01,653 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:29:01,653 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:01,753 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:29:01,754 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:29:01,754 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:01,855 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:29:01,855 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:29:01,956 [DEBUG] snakestack.task.process: Worker [0] process item sample 0
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] process item sample 1
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] process item sample 2
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:29:02,060 [DEBUG] snakestack.task.process: Worker [1] process item sample 3
2025-08-21 17:29:02,060 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:02,161 [DEBUG] snakestack.task.process: Worker [0] process item sample 4
2025-08-21 17:29:02,161 [DEBUG] snakestack.task.process: Worker [0] waiting items...

1. Exemplo com sentinela

Código
import asyncio
import logging
import time

from snakestack.logging import setup_logging
from snakestack.task import SnakeTaskProcess

logger = logging.getLogger(__name__)


async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(queue_max_size=10, sentinel=None)
    tasks = []

    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)

    for x in range(10):
        await process.enqueue(message=f"sample {x}")

    await process.wait()

    for i in range(num_workers):
        logger.info(f"Publishing sentinels worker [{i}]")
        await process.enqueue(message=None)

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    setup_logging()
    start = time.perf_counter()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        duration = time.perf_counter() - start
        logger.info(f"Processo finalizado. took {duration:.4f}s")
Saída
2025-08-21 17:30:26,946 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:30:26,946 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:30:26,947 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:30:26,947 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,047 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:30:27,048 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:30:27,048 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,149 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:30:27,149 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] process item sample 0
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] process item sample 1
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:30:27,351 [DEBUG] snakestack.task.process: Worker [0] process item sample 2
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [1] process item sample 3
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,353 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 5
2025-08-21 17:30:27,453 [DEBUG] snakestack.task.process: Worker [0] process item sample 4
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 6
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] process item sample 5
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 7
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] process item sample 6
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 8
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] process item sample 7
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 9
2025-08-21 17:30:27,656 [DEBUG] snakestack.task.process: Worker [0] process item sample 8
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] process item sample 9
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,657 [INFO] __main__: Publishing sentinels worker [0]
2025-08-21 17:30:27,657 [INFO] __main__: Publishing sentinels worker [1]
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [0] was finished after get a sentinel
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] was finished after get a sentinel
2025-08-21 17:30:27,658 [INFO] __main__: Processo finalizado. took 0.7119s

1. Exemplo com callback customizado

Código
import asyncio
import logging
from typing import Any

from snakestack.logging import setup_logging
from snakestack.task import SnakeTaskProcess

logger = logging.getLogger(__name__)


async def callback(num: int, message: Any):
    await asyncio.sleep(0.01)
    logger.debug(f"Worker [{num}] process item {message} on custom callback")



async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(
        queue_max_size=10,
        func_callback=callback
    )
    tasks = []

    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)

    for x in range(10):
        await process.enqueue(message=f"sample {x}")

    await process.wait()

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    setup_logging()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        logger.info("Processo finalizado.")
Saída
2025-08-21 17:31:00,998 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:31:00,999 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:31:00,999 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:31:00,999 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,099 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:31:01,100 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:31:01,100 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,201 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:31:01,202 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:31:01,212 [DEBUG] __main__: Worker [0] process item sample 0 on custom callback
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:31:01,213 [DEBUG] __main__: Worker [1] process item sample 1 on custom callback
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:31:01,224 [DEBUG] __main__: Worker [0] process item sample 2 on custom callback
2025-08-21 17:31:01,224 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,225 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:31:01,225 [DEBUG] __main__: Worker [1] process item sample 3 on custom callback
2025-08-21 17:31:01,225 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,226 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 5
2025-08-21 17:31:01,236 [DEBUG] __main__: Worker [0] process item sample 4 on custom callback
2025-08-21 17:31:01,237 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,237 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 6
2025-08-21 17:31:01,238 [DEBUG] __main__: Worker [1] process item sample 5 on custom callback
2025-08-21 17:31:01,238 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,239 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 7
2025-08-21 17:31:01,249 [DEBUG] __main__: Worker [0] process item sample 6 on custom callback
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 8
2025-08-21 17:31:01,249 [DEBUG] __main__: Worker [1] process item sample 7 on custom callback
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 9
2025-08-21 17:31:01,260 [DEBUG] __main__: Worker [0] process item sample 8 on custom callback
2025-08-21 17:31:01,260 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,260 [DEBUG] __main__: Worker [1] process item sample 9 on custom callback
2025-08-21 17:31:01,260 [DEBUG] snakestack.task.process: Worker [1] waiting items...

🧭 Roadmap

  • Modularização por domínio

  • Cobertura completa de testes unitários

  • Suporte a extras no PyPI

  • Documentação online (mkdocs)

  • Dashboard de observabilidade com Tempo + Prometheus + Grafana

  • CI/CD com deploy automático no PyPI

  • CLI para validação de ambientes e testes locais

  • Criação de CHANGELOG via towncrier


👨‍💻 Autor

Desenvolvido por Bruno Segato — contribuições, sugestões e feedbacks são sempre bem-vindos!


📝 Licença

Este projeto está licenciado sob os termos da MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

snakestack-0.19.0.tar.gz (34.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

snakestack-0.19.0-py3-none-any.whl (40.8 kB view details)

Uploaded Python 3

File details

Details for the file snakestack-0.19.0.tar.gz.

File metadata

  • Download URL: snakestack-0.19.0.tar.gz
  • Upload date:
  • Size: 34.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.6

File hashes

Hashes for snakestack-0.19.0.tar.gz
Algorithm Hash digest
SHA256 8f9618cf90d1a8f475f0b286b4d39260f8f241992465d3757148c11be9a48cab
MD5 190382f29b2c13b43e33f7fd00f726de
BLAKE2b-256 c3a7364a08ba8e4b15cb151786383fce87e285464b80effb1426c91aa1352652

See more details on using hashes here.

File details

Details for the file snakestack-0.19.0-py3-none-any.whl.

File metadata

File hashes

Hashes for snakestack-0.19.0-py3-none-any.whl
Algorithm Hash digest
SHA256 557826a9052a4a149d727faa57b26b1bbd4e0748d45e6492683441e6d0e88de4
MD5 81286a3e7ec48948b272dd2c64a2c01c
BLAKE2b-256 ddd3981e2749711147f2e2795f65a6b37e0b20a2a14271ea5b24b7f37ab46637

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page