Skip to main content

Uma coleção de utilitários para acelerar o desenvolvimento backend em Python com padrões reutilizáveis e produtivos

Project description

🐍 SnakeStack

Python built with uv Pipeline PyPI version License: MIT codecov gitleaks badge


📦 Visão Geral

O snakestack é um pacote modular que oferece uma base robusta para construção de serviços backend com foco em:

  • Observabilidade com OpenTelemetry
  • Cache assíncrono com Redis
  • Integração com Google Pub/Sub
  • Acesso assíncrono ao MongoDB
  • Client HTTPX com suporte a tracing
  • Modelos base com pydantic e pydantic-settings
  • Stack de logging estruturado e configurável

⚙️ Instalação

Instalação base:

Via PIP

pip install snakestack

Via Poetry

poetry add snakestack

Extras disponíveis:

Extra Comando de instalação
Redis pip install snakestack[redis]
MongoDB pip install snakestack[mongodb]
Pub/Sub pip install snakestack[pubsub]
Telemetry pip install snakestack[telemetry]
Todos pip install snakestack[all]

🧪 Testes

O projeto possui cobertura completa de testes unitários e está organizado por domínios.

Executar todos os testes:

make test

Rodar testes com cobertura:

make test-ci

Rodar testes de um domínio específico:

pytest -m cache
pytest -m pubsub
pytest -m telemetry

🛠️ Desenvolvimento Local

1. Clone o repositório:

git clone https://github.com/BrunoSegato/snakestack.git
cd snakestack

2. Instale as dependências:

make install

3. Ative o ambiente virtual:

source .venv/bin/activate

🧾 Comandos Úteis

Comando Descrição
make install Instala dependências com Poetry
make check Executa linters e mypy
make lint Roda ruff com auto-fix
make test Executa os testes unitários
make cov Gera relatório de cobertura
make changelog Gera changelog com Towncrier
make bump Realiza bump de versão com Commitizen
make release Gera changelog, bump e cria release/tag

📚 Módulos disponíveis

  • snakestack.logging: Configuração de log estruturado com filtros e formatadores.

  • snakestack.cache: Cliente Redis assíncrono com decorator de cache.

  • snakestack.pubsub: Publisher e subscriber com suporte a presets, tracing e decorators.

  • snakestack.telemetry: Integração com OpenTelemetry (métricas, traces e logging).

  • snakestack.mongodb: Client assíncrono para MongoDB com tracing integrado.

  • snakestack.healthz: Health check para status da aplicação e dependências.

  • snakestack.httpx: Client HTTPX instrumentado.

  • snakestack.model: Base de modelos pydantic para uso interno.

  • snakestack.config: Gerenciamento de settings com pydantic-settings.

  • snakestack.task: Classe para criar e processar fila assincrona


🧪 Exemplos de Uso

🚧 Em construção — em breve serão adicionados exemplos práticos de uso para cada módulo.

Módulo Cache

Código

async def sample():
    client = create_async_redis_client()
    redis = AsyncRedisService(client, default_ttl=3600)

    values = [
        "foo",
        1,
        (1, 2, 3),
        {1, 2, 3},
        datetime.now(),
        Decimal("10.50")
    ]

    for value in values:
        await redis.set("foo", value)
        print("Resultado", await redis.get("foo"))

Saída

Resultado foo
Resultado 1
Resultado [1, 2, 3]
Resultado ['1', '2', '3']
Resultado 2025-08-07T18:37:02.149923
Resultado 10.50

Módulo Healthz

1. Sample

Código
async def check_async():
    await asyncio.sleep(1)
    return True

def check_sync():
    time.sleep(1)
    return True

def main():
    health_check = SnakeHealthCheck(
        service_name="Teste",
        service_version="0.0.1",
        service_environment="test"
    )
    health_check.add_check(name="check_async", func=check_async)
    health_check.add_check(name="check_sync", func=check_sync)

    result, check = asyncio.run(health_check.is_healthy())
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())
Saída
{
  "service_name": "Teste",
  "version": "0.0.1",
  "host": "hostname",
  "uptime": "9h 48m 24s",
  "timestamp": "2025-08-07T21:41:38.071402+00:00",
  "environment": "test",
  "status": true,
  "latency_ms": 2001.57,
  "details": {
    "check_async": {
      "ok": true,
      "latency_ms": 1001.3
    },
    "check_sync": {
      "ok": true,
      "latency_ms": 1000.27
    }
  }
}

2. With deps

Código
import asyncio

import orjson

from snakestack.cache import create_async_redis_client, AsyncRedisService
from snakestack.config import settings
from snakestack.healthz import SnakeHealthCheck
from snakestack.logging import setup_logging
from snakestack.mongodb import close_mongodb_connection, ping, open_mongodb_connection


async def check_cache():
    client = create_async_redis_client()
    redis = AsyncRedisService(client, default_ttl=3600)
    return await redis.ping()

async def check_mongo():
    try:
        await open_mongodb_connection(host=settings.mongo.host)
        return await ping()
    finally:
        await close_mongodb_connection()


def main():
    health_check = SnakeHealthCheck(
        service_name="Teste",
        service_version="0.0.1",
        service_environment="test"
    )
    health_check.add_check(name="check_cache", func=check_cache)
    health_check.add_check(name="check_mongo", func=check_mongo)

    result, check = asyncio.run(health_check.is_healthy())
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())


if __name__ == "__main__":
    setup_logging()
    main()
Saída
{
  "service_name": "Teste",
  "version": "0.0.1",
  "host": "bruno-Aspire",
  "uptime": "4h 1m 12s",
  "timestamp": "2025-08-29T15:42:16.262049+00:00",
  "environment": "test",
  "status": true,
  "latency_ms": 11.78,
  "details": {
    "check_cache": {
      "ok": true,
      "latency_ms": 2.69
    },
    "check_mongo": {
      "ok": true,
      "latency_ms": 9.09
    }
  }
}

Módulo Httpx

1. Exemplo simples de uso

Código
import asyncio
import orjson

from snakestack.httpx import SnakeHttpClient


class MyAPI(SnakeHttpClient):

    async def get_user(
        self,
    ):
        response = await self.handle(
            method="GET",
            url="/get"
        )
        response.raise_for_status()
        return response.json()


async def main():
    api = MyAPI(base_url="https://httpbin.org")
    result = await api.get_user()
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())


if __name__ == "__main__":
    asyncio.run(main())
Saída
{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "User-Agent": "python-httpx/0.28.1",
    "X-Amzn-Trace-Id": "Root=1-68a65355-68dcca804d655ab922707790"
  },
  "origin": "201.95.220.207",
  "url": "https://httpbin.org/get"
}

2. Exemplo utilizando open telemetry

Código
import asyncio
import logging
import orjson

from snakestack.httpx import SnakeHttpClient
from snakestack.logging import setup_logging
from snakestack.telemetry import instrument_app, instrumented_span


instrument_app(service_name="foobar", enable_httpx=True, test_mode=True)

setup_logging()
logger = logging.getLogger(__name__)


class MyAPI(SnakeHttpClient):

    @instrumented_span(span_name="principal")
    async def get_user(
        self,
    ):
        response = await self.handle(
            method="GET",
            url="/get"
        )
        response.raise_for_status()
        result = response.json()
        logger.info(result)
        return response.json()


async def main():
    api = MyAPI(base_url="https://httpbin.org")
    result = await api.get_user()
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())

if __name__ == "__main__":
    asyncio.run(main())
Saída
JSON Response
{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "Traceparent": "00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01",
    "User-Agent": "python-httpx/0.28.1",
    "X-Amzn-Trace-Id": "Root=1-68a65450-5e17d033064521816e1e7b58"
  },
  "origin": "201.95.220.207",
  "url": "https://httpbin.org/get"
}
LOG Response
2025-08-20 20:03:44,233 [INFO] httpx: HTTP Request: GET https://httpbin.org/get "HTTP/1.1 200 OK"
2025-08-20 20:03:44,236 [INFO] __main__: {'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'Traceparent': '00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01', 'User-Agent': 'python-httpx/0.28.1', 'X-Amzn-Trace-Id': 'Root=1-68a65450-5e17d033064521816e1e7b58'}, 'origin': '201.95.220.207', 'url': 'https://httpbin.org/get'}
OTEL Response
{
    "name": "GET",
    "context": {
        "trace_id": "0xee28e45169aa5404fa751c0c51567847",
        "span_id": "0xa91c6aa2988520b2",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CLIENT",
    "parent_id": "0x729c5a2340e0f49f",
    "start_time": "2025-08-20T23:03:43.283362Z",
    "end_time": "2025-08-20T23:03:44.230366Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "http.method": "GET",
        "http.url": "https://httpbin.org/get",
        "http.status_code": 200
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "telemetry.sdk.language": "python",
            "telemetry.sdk.name": "opentelemetry",
            "telemetry.sdk.version": "1.36.0",
            "service.name": "foobar",
            "service.instance.id": "Aspire"
        },
        "schema_url": ""
    }
}
{
    "name": "principal",
    "context": {
        "trace_id": "0xee28e45169aa5404fa751c0c51567847",
        "span_id": "0x729c5a2340e0f49f",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": null,
    "start_time": "2025-08-20T23:03:43.187189Z",
    "end_time": "2025-08-20T23:03:44.236666Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {},
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "telemetry.sdk.language": "python",
            "telemetry.sdk.name": "opentelemetry",
            "telemetry.sdk.version": "1.36.0",
            "service.name": "foobar",
            "service.instance.id": "Aspire"
        },
        "schema_url": ""
    }
}

Módulo Logging

1. Exemplo com formatter default

Código
def main():
    setup_logging()
    logger = logging.getLogger(__name__)
    logger.info("Logging simples funcionando.")
Saída
2025-08-07 18:50:14,385 [INFO] __main__: Logging simples funcionando.

2. Exemplo com formatter with_request_id

Variável de ambiente
SNAKESTACK_LOG_DEFAULT_FORMATTER=with_request_id
Código
def main():
    set_request_id("12345678")
    logger.info("Logging with_request_id funcionando.")


if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)
    main()
Saída
2025-08-08 16:44:43,236 [INFO] [req_id=12345678] __main__: Logging with_request_id funcionando.

3. Exemplo com formatter custom_json

Variável de ambiente
SNAKESTACK_LOG_DEFAULT_FORMATTER=custom_json
Código
def main():
    set_request_id("12345678")
    logger.info("Logging custom_json funcionando.")


if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)
    main()
Saída
{"time":"2025-08-08T19:47:53.572825+00:00","level":"INFO","pid":175425,"name":"__main__:8","msg":"Logging with_request_id funcionando.","request":{"id":"12345678"}}

4. Exemplo com filter excluded_name

Variável de ambiente
SNAKESTACK_LOG_DEFAULT_FILTERS=excluded_name,request_id
SNAKESTACK_LOG_DEFAULT_FORMATTER=with_request_id
SNAKESTACK_LOG_EXCLUDED_NAME=ignore.me
Código
def main():
    set_request_id("12345678")
    logger.info("Logging com filtro excluded_name funcionando.")
    logger_exclude.info("Logging será descartado pelo filtro.")


if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)
    logger_exclude = logging.getLogger("exclude.me")
    main()
Saída
2025-08-08 16:58:29,570 [INFO] [req_id=12345678] __main__: Logging com filtro excluded_name funcionando.

Módulo Task

1. Exemplo sem sentinela

Código
import asyncio
import logging

from snakestack.logging import setup_logging
from snakestack.task import SnakeTaskProcess

logger = logging.getLogger(__name__)


async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(queue_max_size=10)
    tasks = []

    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)

    for x in range(5):
        await process.enqueue(message=f"sample {x}")

    await process.wait()

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    setup_logging()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        logger.info("Processo finalizado.")
Saída
2025-08-21 17:29:01,653 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:29:01,653 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:29:01,653 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:29:01,653 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:01,753 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:29:01,754 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:29:01,754 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:01,855 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:29:01,855 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:29:01,956 [DEBUG] snakestack.task.process: Worker [0] process item sample 0
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] process item sample 1
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] process item sample 2
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:29:02,060 [DEBUG] snakestack.task.process: Worker [1] process item sample 3
2025-08-21 17:29:02,060 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:02,161 [DEBUG] snakestack.task.process: Worker [0] process item sample 4
2025-08-21 17:29:02,161 [DEBUG] snakestack.task.process: Worker [0] waiting items...

1. Exemplo com sentinela

Código
import asyncio
import logging
import time

from snakestack.logging import setup_logging
from snakestack.task import SnakeTaskProcess

logger = logging.getLogger(__name__)


async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(queue_max_size=10, sentinel=None)
    tasks = []

    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)

    for x in range(10):
        await process.enqueue(message=f"sample {x}")

    await process.wait()

    for i in range(num_workers):
        logger.info(f"Publishing sentinels worker [{i}]")
        await process.enqueue(message=None)

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    setup_logging()
    start = time.perf_counter()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        duration = time.perf_counter() - start
        logger.info(f"Processo finalizado. took {duration:.4f}s")
Saída
2025-08-21 17:30:26,946 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:30:26,946 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:30:26,947 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:30:26,947 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,047 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:30:27,048 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:30:27,048 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,149 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:30:27,149 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] process item sample 0
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] process item sample 1
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:30:27,351 [DEBUG] snakestack.task.process: Worker [0] process item sample 2
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [1] process item sample 3
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,353 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 5
2025-08-21 17:30:27,453 [DEBUG] snakestack.task.process: Worker [0] process item sample 4
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 6
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] process item sample 5
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 7
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] process item sample 6
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 8
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] process item sample 7
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 9
2025-08-21 17:30:27,656 [DEBUG] snakestack.task.process: Worker [0] process item sample 8
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] process item sample 9
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,657 [INFO] __main__: Publishing sentinels worker [0]
2025-08-21 17:30:27,657 [INFO] __main__: Publishing sentinels worker [1]
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [0] was finished after get a sentinel
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] was finished after get a sentinel
2025-08-21 17:30:27,658 [INFO] __main__: Processo finalizado. took 0.7119s

1. Exemplo com callback customizado

Código
import asyncio
import logging
from typing import Any

from snakestack.logging import setup_logging
from snakestack.task import SnakeTaskProcess

logger = logging.getLogger(__name__)


async def callback(num: int, message: Any):
    await asyncio.sleep(0.01)
    logger.debug(f"Worker [{num}] process item {message} on custom callback")



async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(
        queue_max_size=10,
        func_callback=callback
    )
    tasks = []

    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)

    for x in range(10):
        await process.enqueue(message=f"sample {x}")

    await process.wait()

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    setup_logging()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        logger.info("Processo finalizado.")
Saída
2025-08-21 17:31:00,998 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:31:00,999 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:31:00,999 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:31:00,999 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,099 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:31:01,100 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:31:01,100 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,201 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:31:01,202 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:31:01,212 [DEBUG] __main__: Worker [0] process item sample 0 on custom callback
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:31:01,213 [DEBUG] __main__: Worker [1] process item sample 1 on custom callback
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:31:01,224 [DEBUG] __main__: Worker [0] process item sample 2 on custom callback
2025-08-21 17:31:01,224 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,225 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:31:01,225 [DEBUG] __main__: Worker [1] process item sample 3 on custom callback
2025-08-21 17:31:01,225 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,226 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 5
2025-08-21 17:31:01,236 [DEBUG] __main__: Worker [0] process item sample 4 on custom callback
2025-08-21 17:31:01,237 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,237 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 6
2025-08-21 17:31:01,238 [DEBUG] __main__: Worker [1] process item sample 5 on custom callback
2025-08-21 17:31:01,238 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,239 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 7
2025-08-21 17:31:01,249 [DEBUG] __main__: Worker [0] process item sample 6 on custom callback
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 8
2025-08-21 17:31:01,249 [DEBUG] __main__: Worker [1] process item sample 7 on custom callback
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 9
2025-08-21 17:31:01,260 [DEBUG] __main__: Worker [0] process item sample 8 on custom callback
2025-08-21 17:31:01,260 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,260 [DEBUG] __main__: Worker [1] process item sample 9 on custom callback
2025-08-21 17:31:01,260 [DEBUG] snakestack.task.process: Worker [1] waiting items...

🧭 Roadmap

  • Modularização por domínio

  • Cobertura completa de testes unitários

  • Suporte a extras no PyPI

  • Documentação online (mkdocs)

  • Dashboard de observabilidade com Tempo + Prometheus + Grafana

  • CI/CD com deploy automático no PyPI

  • CLI para validação de ambientes e testes locais

  • Criação de CHANGELOG via towncrier


👨‍💻 Autor

Desenvolvido por Bruno Segato — contribuições, sugestões e feedbacks são sempre bem-vindos!


📝 Licença

Este projeto está licenciado sob os termos da MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

snakestack-0.21.0.tar.gz (33.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

snakestack-0.21.0-py3-none-any.whl (40.9 kB view details)

Uploaded Python 3

File details

Details for the file snakestack-0.21.0.tar.gz.

File metadata

  • Download URL: snakestack-0.21.0.tar.gz
  • Upload date:
  • Size: 33.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.6

File hashes

Hashes for snakestack-0.21.0.tar.gz
Algorithm Hash digest
SHA256 cab3b02340f6d05e458f26d342fdee8f7a91074a322520f50441159b19c97e6a
MD5 5e8b3c0154ed01c5c3e1f60da75c217f
BLAKE2b-256 a9cf3c88d143b42530118c2ca6c4339469cefccb68366b1e3dec97805e700700

See more details on using hashes here.

File details

Details for the file snakestack-0.21.0-py3-none-any.whl.

File metadata

File hashes

Hashes for snakestack-0.21.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a95239075b62afad10dd449cef4ea515ee5ce94a84359780414a5bd86de70abd
MD5 3e7f039ffc1cae3f5a50d0029bb47b05
BLAKE2b-256 ef8a0cc9183fc58b9754c1024f83576e9554c74866a384e8ab2f3572a68dec11

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page