A collection of utilities to speed up Python backend development with reusable, productive patterns
🐍 SnakeStack
📦 Overview
snakestack is a modular package that provides a solid foundation for building backend services, with a focus on:
- Observability with OpenTelemetry
- Asynchronous caching with Redis
- Google Pub/Sub integration
- Asynchronous MongoDB access
- HTTPX client with tracing support
- Base models built on pydantic and pydantic-settings
- A structured, configurable logging stack
⚙️ Installation
Base installation:
Via pip
pip install snakestack
Via Poetry
poetry add snakestack
Available extras:
| Extra | Install command |
|---|---|
| Redis | pip install snakestack[redis] |
| MongoDB | pip install snakestack[mongodb] |
| Pub/Sub | pip install snakestack[pubsub] |
| Telemetry | pip install snakestack[telemetry] |
| All | pip install snakestack[all] |
🧪 Tests
The project has full unit test coverage and is organized by domain.
Run all tests:
make test
Run the tests with coverage:
make test-ci
Run the tests for a specific domain:
pytest -m cache
pytest -m pubsub
pytest -m telemetry
🛠️ Local Development
1. Clone the repository:
git clone https://github.com/BrunoSegato/snakestack.git
cd snakestack
2. Install the dependencies:
make install
3. Activate the virtual environment:
source .venv/bin/activate
🧾 Useful Commands
| Command | Description |
|---|---|
| make install | Installs the dependencies with Poetry |
| make check | Runs the linters and mypy |
| make lint | Runs ruff with auto-fix |
| make test | Runs the unit tests |
| make cov | Generates the coverage report |
| make changelog | Generates the changelog with Towncrier |
| make bump | Bumps the version with Commitizen |
| make release | Generates the changelog, bumps the version, and creates the release/tag |
📚 Available Modules
- snakestack.logging: Structured logging configuration with filters and formatters.
- snakestack.cache: Asynchronous Redis client with a cache decorator.
- snakestack.pubsub: Publisher and subscriber with support for presets, tracing, and decorators.
- snakestack.telemetry: OpenTelemetry integration (metrics, traces, and logging).
- snakestack.mongodb: Asynchronous MongoDB client with built-in tracing.
- snakestack.healthz: Health check for the status of the application and its dependencies.
- snakestack.httpx: Instrumented HTTPX client.
- snakestack.model: Base pydantic models for internal use.
- snakestack.config: Settings management with pydantic-settings.
- snakestack.task: Class for creating and processing an asynchronous queue.
🧪 Usage Examples
🚧 Under construction: practical usage examples for each module will be added soon.
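Cache Module
Code
import asyncio
from datetime import datetime
from decimal import Decimal
from snakestack.cache import AsyncRedisService, create_async_redis_client
async def sample():
    # Build the Redis client and wrap it in the cache service (1-hour default TTL).
    client = create_async_redis_client()
    redis = AsyncRedisService(client, default_ttl=3600)
    values = [
        "foo",
        1,
        (1, 2, 3),
        {1, 2, 3},
        datetime.now(),
        Decimal("10.50")
    ]
    # Store each value under the same key and read it back.
    for value in values:
        await redis.set("foo", value)
        print("Resultado", await redis.get("foo"))
if __name__ == "__main__":
    asyncio.run(sample())
Output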
Resultado foo
Resultado 1
Resultado [1, 2, 3]
Resultado ['1', '2', '3']
Resultado 2025-08-07T18:37:02.149923
Resultado 10.50
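The output above shows that values do not always come back with the type they went in with: the tuple returns as a list, and the datetime and Decimal return as text. The sketch below illustrates one way this can happen, assuming a JSON-style encoding with a string fallback. It uses only the standard library and is not snakestack's actual serializer; `encode`, `decode`, and `_fallback` are hypothetical names, and the set case from the example is not reproduced here because it depends on the library's own encoder.

```python
import json
from datetime import datetime
from decimal import Decimal
from typing import Any


def _fallback(value: Any) -> str:
    # Hypothetical fallback: types JSON cannot represent are stored as text.
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def encode(value: Any) -> str:
    """Serialize a value to the string that would be stored in the cache."""
    return json.dumps(value, default=_fallback)


def decode(raw: str) -> Any:
    """Deserialize a cached string; the original Python type is not restored."""
    return json.loads(raw)


if __name__ == "__main__":
    samples = ["foo", 1, (1, 2, 3), datetime(2025, 8, 7, 18, 37, 2), Decimal("10.50")]
    for value in samples:
        print(type(value).__name__, "->", repr(decode(encode(value))))
```

If the caller needs the original type back, it has to convert explicitly after reading, for example with `Decimal(value)` or `datetime.fromisoformat(value)`.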
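Healthz Module
1. Sample
Code
import asyncio
import time
import orjson
from snakestack.healthz import SnakeHealthCheck
async def check_async():
    # Simulates an asynchronous dependency check that takes about 1 second.
    await asyncio.sleep(1)
    return True
def check_sync():
    # Simulates a blocking dependency check that takes about 1 second.
    time.sleep(1)
    return True
def main():
    health_check = SnakeHealthCheck(
        service_name="Teste",
        service_version="0.0.1",
        service_environment="test"
    )
    health_check.add_check(name="check_async", func=check_async)
    health_check.add_check(name="check_sync", func=check_sync)
    result, check = asyncio.run(health_check.is_healthy())
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())
if __name__ == "__main__":
    main()
Output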
{
  "service_name": "Teste",
  "version": "0.0.1",
  "host": "hostname",
  "uptime": "9h 48m 24s",
  "timestamp": "2025-08-07T21:41:38.071402+00:00",
  "environment": "test",
  "status": true,
  "latency_ms": 2001.57,
  "details": {
    "check_async": {
      "ok": true,
      "latency_ms": 1001.3
    },
    "check_sync": {
      "ok": true,
      "latency_ms": 1000.27
    }
  }
}
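In the output above, the top-level `latency_ms` (2001.57) is roughly the sum of the two per-check latencies, which suggests the checks ran one after the other. The sketch below shows that aggregation pattern with the standard library only. It is an illustration under that assumption, not the implementation of `SnakeHealthCheck`; `run_checks` is a hypothetical helper.

```python
import asyncio
import inspect
import time
from typing import Any, Callable, Dict, Tuple


async def run_checks(checks: Dict[str, Callable[[], Any]]) -> Tuple[Dict[str, Any], bool]:
    """Run each check in order, time it, and aggregate the results."""
    details: Dict[str, Any] = {}
    started = time.perf_counter()
    for name, func in checks.items():
        check_started = time.perf_counter()
        try:
            if inspect.iscoroutinefunction(func):
                ok = bool(await func())
            else:
                # Run blocking checks in a thread so the event loop stays free.
                ok = bool(await asyncio.to_thread(func))
        except Exception:
            # A check that raises is reported as failed instead of crashing.
            ok = False
        latency = round((time.perf_counter() - check_started) * 1000, 2)
        details[name] = {"ok": ok, "latency_ms": latency}
    status = all(item["ok"] for item in details.values())
    report = {
        "status": status,
        "latency_ms": round((time.perf_counter() - started) * 1000, 2),
        "details": details,
    }
    return report, status


if __name__ == "__main__":
    async def ping() -> bool:
        return True

    print(asyncio.run(run_checks({"ping": ping})))
```

Running the checks sequentially keeps the per-check latencies easy to read; running them concurrently would shorten the total at the cost of checks competing for resources.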
2. With dependencies
Code
import asyncio
import orjson
from snakestack.cache import create_async_redis_client, AsyncRedisService
from snakestack.config import settings
from snakestack.healthz import SnakeHealthCheck
from snakestack.logging import setup_logging
from snakestack.mongodb import close_mongodb_connection, ping, open_mongodb_connection
async def check_cache():
    # Healthy when Redis answers the ping.
    client = create_async_redis_client()
    redis = AsyncRedisService(client, default_ttl=3600)
    return await redis.ping()
async def check_mongo():
    # Open a connection, ping MongoDB, and always close the connection afterwards.
    try:
        await open_mongodb_connection(host=settings.mongo.host)
        return await ping()
    finally:
        await close_mongodb_connection()
def main():
    health_check = SnakeHealthCheck(
        service_name="Teste",
        service_version="0.0.1",
        service_environment="test"
    )
    health_check.add_check(name="check_cache", func=check_cache)
    health_check.add_check(name="check_mongo", func=check_mongo)
    result, check = asyncio.run(health_check.is_healthy())
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())
if __name__ == "__main__":
    setup_logging()
    main()
Output
{
  "service_name": "Teste",
  "version": "0.0.1",
  "host": "bruno-Aspire",
  "uptime": "4h 1m 12s",
  "timestamp": "2025-08-29T15:42:16.262049+00:00",
  "environment": "test",
  "status": true,
  "latency_ms": 11.78,
  "details": {
    "check_cache": {
      "ok": true,
      "latency_ms": 2.69
    },
    "check_mongo": {
      "ok": true,
      "latency_ms": 9.09
    }
  }
}
Httpx Module
1. Simple usage example
Code
import asyncio
import orjson
from snakestack.httpx import SnakeHttpClient
class MyAPI(SnakeHttpClient):
    async def get_user(
        self,
    ):
        # The URL is relative to the base_url passed to the client.
        response = await self.handle(
            method="GET",
            url="/get"
        )
        response.raise_for_status()
        return response.json()
async def main():
    api = MyAPI(base_url="https://httpbin.org")
    result = await api.get_user()
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())
if __name__ == "__main__":
    asyncio.run(main())
Output
{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "User-Agent": "python-httpx/0.28.1",
    "X-Amzn-Trace-Id": "Root=1-68a65355-68dcca804d655ab922707790"
  },
  "origin": "201.95.220.207",
  "url": "https://httpbin.org/get"
}
2. Example using OpenTelemetry
Code
import asyncio
import logging
import orjson
from snakestack.httpx import SnakeHttpClient
from snakestack.logging import setup_logging
from snakestack.telemetry import instrument_app, instrumented_span
instrument_app(service_name="foobar", enable_httpx=True, test_mode=True)
setup_logging()
logger = logging.getLogger(__name__)
class MyAPI(SnakeHttpClient):
    @instrumented_span(span_name="principal")
    async def get_user(
        self,
    ):
        response = await self.handle(
            method="GET",
            url="/get"
        )
        response.raise_for_status()
        # Parse the body once and reuse it for both the log and the return value.
        result = response.json()
        logger.info(result)
        return result
async def main():
    api = MyAPI(base_url="https://httpbin.org")
    result = await api.get_user()
    print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())
if __name__ == "__main__":
    asyncio.run(main())
Output
JSON Response
{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "Traceparent": "00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01",
    "User-Agent": "python-httpx/0.28.1",
    "X-Amzn-Trace-Id": "Root=1-68a65450-5e17d033064521816e1e7b58"
  },
  "origin": "201.95.220.207",
  "url": "https://httpbin.org/get"
}
LOG Response
2025-08-20 20:03:44,233 [INFO] httpx: HTTP Request: GET https://httpbin.org/get "HTTP/1.1 200 OK"
2025-08-20 20:03:44,236 [INFO] __main__: {'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'Traceparent': '00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01', 'User-Agent': 'python-httpx/0.28.1', 'X-Amzn-Trace-Id': 'Root=1-68a65450-5e17d033064521816e1e7b58'}, 'origin': '201.95.220.207', 'url': 'https://httpbin.org/get'}
OTEL Response
{
  "name": "GET",
  "context": {
    "trace_id": "0xee28e45169aa5404fa751c0c51567847",
    "span_id": "0xa91c6aa2988520b2",
    "trace_state": "[]"
  },
  "kind": "SpanKind.CLIENT",
  "parent_id": "0x729c5a2340e0f49f",
  "start_time": "2025-08-20T23:03:43.283362Z",
  "end_time": "2025-08-20T23:03:44.230366Z",
  "status": {
    "status_code": "UNSET"
  },
  "attributes": {
    "http.method": "GET",
    "http.url": "https://httpbin.org/get",
    "http.status_code": 200
  },
  "events": [],
  "links": [],
  "resource": {
    "attributes": {
      "telemetry.sdk.language": "python",
      "telemetry.sdk.name": "opentelemetry",
      "telemetry.sdk.version": "1.36.0",
      "service.name": "foobar",
      "service.instance.id": "Aspire"
    },
    "schema_url": ""
  }
}
{
  "name": "principal",
  "context": {
    "trace_id": "0xee28e45169aa5404fa751c0c51567847",
    "span_id": "0x729c5a2340e0f49f",
    "trace_state": "[]"
  },
  "kind": "SpanKind.INTERNAL",
  "parent_id": null,
  "start_time": "2025-08-20T23:03:43.187189Z",
  "end_time": "2025-08-20T23:03:44.236666Z",
  "status": {
    "status_code": "UNSET"
  },
  "attributes": {},
  "events": [],
  "links": [],
  "resource": {
    "attributes": {
      "telemetry.sdk.language": "python",
      "telemetry.sdk.name": "opentelemetry",
      "telemetry.sdk.version": "1.36.0",
      "service.name": "foobar",
      "service.instance.id": "Aspire"
    },
    "schema_url": ""
  }
}
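The `Traceparent` header echoed in the JSON response carries the same trace ID and span ID as the `SpanKind.CLIENT` span in the OTEL output, which is how the outgoing request is tied to the trace. The sketch below splits that header into its fields following the W3C Trace Context layout (`version-traceid-parentid-flags`). `parse_traceparent` and `TraceParent` are hypothetical helpers written for illustration; they are not part of snakestack or OpenTelemetry.

```python
from typing import NamedTuple


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_id: str
    sampled: bool


def parse_traceparent(header: str) -> TraceParent:
    """Split a W3C traceparent header into its four dash-separated fields."""
    version, trace_id, parent_id, flags = header.strip().split("-")
    if len(trace_id) != 32 or len(parent_id) != 16 or len(flags) != 2:
        raise ValueError(f"Malformed traceparent: {header!r}")
    # Bit 0 of the flags byte is the "sampled" flag.
    sampled = bool(int(flags, 16) & 0x01)
    return TraceParent(version, trace_id, parent_id, sampled)


if __name__ == "__main__":
    header = "00-ee28e45169aa5404fa751c0c51567847-a91c6aa2988520b2-01"
    parsed = parse_traceparent(header)
    print("trace_id:", "0x" + parsed.trace_id)
    print("span_id: ", "0x" + parsed.parent_id)
    print("sampled: ", parsed.sampled)
```

The `parent_id` field of the header is the ID of the span that sent the request, so it matches the `span_id` of the `GET` span rather than that of the `principal` span.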
Logging Module
1. Example with the default formatter
Code
import logging
from snakestack.logging import setup_logging
def main():
    setup_logging()
    logger = logging.getLogger(__name__)
    logger.info("Logging simples funcionando.")
if __name__ == "__main__":
    main()
Output
2025-08-07 18:50:14,385 [INFO] __main__: Logging simples funcionando.
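2. Example with the with_request_id formatter
Environment variable
SNAKESTACK_LOG_DEFAULT_FORMATTER=with_request_id
Code
import logging
from snakestack.logging import setup_logging
# set_request_id is provided by snakestack; the original example does not show its import path.
def main():
    set_request_id("12345678")
    logger.info("Logging with_request_id funcionando.")
if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)
    main()
Output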
2025-08-08 16:44:43,236 [INFO] [req_id=12345678] __main__: Logging with_request_id funcionando.
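The `[req_id=12345678]` segment in the output above implies that the request ID is attached to each log record before formatting. The sketch below shows one common way to do that with the standard library: a `contextvars.ContextVar` holds the ID and a `logging.Filter` copies it onto every record. It is an assumption about the general technique, not snakestack's code; the `set_request_id` defined here is a local stand-in, and `RequestIdFilter` and `build_logger` are hypothetical names.

```python
import contextvars
import io
import logging

# Hypothetical storage for the current request ID; "-" means "not set".
_request_id = contextvars.ContextVar("request_id", default="-")


def set_request_id(value: str) -> None:
    """Bind a request ID to the current context."""
    _request_id.set(value)


class RequestIdFilter(logging.Filter):
    """Copy the current request ID onto every record so formatters can use it."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = _request_id.get()
        return True  # never drops a record, only enriches it


def build_logger(stream: io.StringIO) -> logging.Logger:
    """Create a logger that writes request-aware lines to the given stream."""
    handler = logging.StreamHandler(stream)
    handler.addFilter(RequestIdFilter())
    handler.setFormatter(
        logging.Formatter("[%(levelname)s] [req_id=%(request_id)s] %(name)s: %(message)s")
    )
    logger = logging.getLogger("request_id_sketch")
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    buffer = io.StringIO()
    logger = build_logger(buffer)
    set_request_id("12345678")
    logger.info("hello")
    print(buffer.getvalue(), end="")
```

A context variable is used instead of a global so that concurrent asyncio tasks or threads each see their own request ID.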
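3. Example with the custom_json formatter
Environment variable
SNAKESTACK_LOG_DEFAULT_FORMATTER=custom_json
Code
import logging
from snakestack.logging import setup_logging
# set_request_id is provided by snakestack; the original example does not show its import path.
def main():
    set_request_id("12345678")
    logger.info("Logging custom_json funcionando.")
if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)
    main()
Output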
{"time":"2025-08-08T19:47:53.572825+00:00","level":"INFO","pid":175425,"name":"__main__:8","msg":"Logging custom_json funcionando.","request":{"id":"12345678"}}
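The JSON line above can be approximated with a custom `logging.Formatter`. The sketch below is a guess at the shape, using only the standard library: it assumes the `name` field is the logger name followed by the line number, and that the `request` block is filled from a `request_id` attribute placed on the record by a filter. `JsonFormatter` is a hypothetical class, not the formatter shipped with snakestack.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each record as one compact JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "pid": record.process,
            "name": f"{record.name}:{record.lineno}",
            "msg": record.getMessage(),
        }
        # Only include the request block when a filter attached an ID.
        request_id = getattr(record, "request_id", None)
        if request_id is not None:
            payload["request"] = {"id": request_id}
        return json.dumps(payload, separators=(",", ":"))


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger("json_sketch")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info("structured message", extra={"request_id": "12345678"})
```

Emitting one JSON object per line keeps the log easy to ingest by collectors that parse line-delimited JSON.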
4. Example with the excluded_name filter
Environment variables
SNAKESTACK_LOG_DEFAULT_FILTERS=excluded_name,request_id
SNAKESTACK_LOG_DEFAULT_FORMATTER=with_request_id
SNAKESTACK_LOG_EXCLUDED_NAME=exclude.me
Code
import logging
from snakestack.logging import setup_logging
# set_request_id is provided by snakestack; the original example does not show its import path.
def main():
    set_request_id("12345678")
    logger.info("Logging com filtro excluded_name funcionando.")
    # This record comes from the excluded logger name, so the filter discards it.
    logger_exclude.info("Logging será descartado pelo filtro.")
if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)
    logger_exclude = logging.getLogger("exclude.me")
    main()
Output
2025-08-08 16:58:29,570 [INFO] [req_id=12345678] __main__: Logging com filtro excluded_name funcionando.
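Only one line appears in the output above because the second record is dropped by the filter before it reaches the handler. The sketch below shows how such a name-based filter can be written with the standard library. It assumes that child loggers of the excluded name are dropped as well, which the source does not confirm; `ExcludedNameFilter` and `emit` are hypothetical names.

```python
import io
import logging


class ExcludedNameFilter(logging.Filter):
    """Drop records from a given logger name or any of its children."""

    def __init__(self, excluded: str) -> None:
        super().__init__()
        self.excluded = excluded

    def filter(self, record: logging.LogRecord) -> bool:
        is_excluded = (
            record.name == self.excluded
            or record.name.startswith(self.excluded + ".")
        )
        return not is_excluded


def emit(logger_name: str, message: str, excluded: str) -> str:
    """Log one message through a filtered handler and return what was written."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.addFilter(ExcludedNameFilter(excluded))
    handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))
    logger = logging.getLogger(logger_name)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    try:
        logger.info(message)
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()


if __name__ == "__main__":
    print(repr(emit("app.main", "kept", excluded="exclude.me")))
    print(repr(emit("exclude.me", "dropped", excluded="exclude.me")))
```

Attaching the filter to the handler rather than to a single logger means it applies to every record that reaches that handler, whichever logger produced it.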
Task Module
1. Example without a sentinel
Code
import asyncio
import logging
from snakestack.logging import setup_logging
from snakestack.task import SnakeTaskProcess
logger = logging.getLogger(__name__)
async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(queue_max_size=10)
    tasks = []
    # Start the workers, giving each one a moment to begin waiting on the queue.
    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)
    for x in range(5):
        await process.enqueue(message=f"sample {x}")
    # Wait until every enqueued item has been processed.
    await process.wait()
    await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
    setup_logging()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        logger.info("Processo finalizado.")
Output
2025-08-21 17:29:01,653 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:29:01,653 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:29:01,653 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:29:01,653 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:01,753 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:29:01,754 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:29:01,754 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:01,855 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:29:01,855 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:29:01,956 [DEBUG] snakestack.task.process: Worker [0] process item sample 0
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] process item sample 1
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:01,957 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] process item sample 2
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:29:02,059 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:29:02,060 [DEBUG] snakestack.task.process: Worker [1] process item sample 3
2025-08-21 17:29:02,060 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:29:02,161 [DEBUG] snakestack.task.process: Worker [0] process item sample 4
2025-08-21 17:29:02,161 [DEBUG] snakestack.task.process: Worker [0] waiting items...
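The log above ends with both workers back at "waiting items...": with no sentinel, nothing tells them to stop once the queue is empty. The sketch below reproduces that situation with a plain `asyncio.Queue` and shows one way to end it, by waiting on `queue.join()` and then cancelling the workers. It illustrates the general pattern only and is not how `SnakeTaskProcess` is implemented; `worker` and `run` are hypothetical functions.

```python
import asyncio
from typing import List


async def worker(num: int, queue: "asyncio.Queue[str]", done: List[str]) -> None:
    while True:
        item = await queue.get()  # waits here until an item is available
        try:
            await asyncio.sleep(0.01)  # stand-in for real work
            done.append(f"{num}:{item}")
        finally:
            queue.task_done()  # lets queue.join() account for this item


async def run(num_workers: int = 2, num_items: int = 5) -> List[str]:
    queue: "asyncio.Queue[str]" = asyncio.Queue(maxsize=10)
    done: List[str] = []
    workers = [asyncio.create_task(worker(i, queue, done)) for i in range(num_workers)]
    for x in range(num_items):
        await queue.put(f"sample {x}")
    await queue.join()  # returns once every item has been marked done
    # The workers are still blocked in queue.get(), so cancel them explicitly.
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done


if __name__ == "__main__":
    print(asyncio.run(run()))
```

Passing `return_exceptions=True` to `gather` collects the `CancelledError` of each worker instead of re-raising it in the caller.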
2. Example with a sentinel
Code
import asyncio
import logging
import time
from snakestack.logging import setup_logging
from snakestack.task import SnakeTaskProcess
logger = logging.getLogger(__name__)
async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(queue_max_size=10, sentinel=None)
    tasks = []
    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)
    for x in range(10):
        await process.enqueue(message=f"sample {x}")
    await process.wait()
    # Publish one sentinel per worker so that each of them stops.
    for i in range(num_workers):
        logger.info(f"Publishing sentinels worker [{i}]")
        await process.enqueue(message=None)
    await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
    setup_logging()
    start = time.perf_counter()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        duration = time.perf_counter() - start
        logger.info(f"Processo finalizado. took {duration:.4f}s")
Output
2025-08-21 17:30:26,946 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:30:26,946 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:30:26,947 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:30:26,947 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,047 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:30:27,048 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:30:27,048 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,149 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:30:27,149 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] process item sample 0
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,250 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] process item sample 1
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,251 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:30:27,351 [DEBUG] snakestack.task.process: Worker [0] process item sample 2
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [1] process item sample 3
2025-08-21 17:30:27,352 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,353 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 5
2025-08-21 17:30:27,453 [DEBUG] snakestack.task.process: Worker [0] process item sample 4
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 6
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] process item sample 5
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,454 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 7
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] process item sample 6
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,555 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 8
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] process item sample 7
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,556 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 9
2025-08-21 17:30:27,656 [DEBUG] snakestack.task.process: Worker [0] process item sample 8
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] process item sample 9
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:30:27,657 [INFO] __main__: Publishing sentinels worker [0]
2025-08-21 17:30:27,657 [INFO] __main__: Publishing sentinels worker [1]
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [0] was finished after get a sentinel
2025-08-21 17:30:27,657 [DEBUG] snakestack.task.process: Worker [1] was finished after get a sentinel
2025-08-21 17:30:27,658 [INFO] __main__: Processo finalizado. took 0.7119s
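In the log above each worker exits right after pulling a sentinel, which is why one sentinel is published per worker. The sketch below shows the same shutdown pattern with a plain `asyncio.Queue`, using `None` as the sentinel as in the example. It is a minimal illustration of the technique, not the internals of `SnakeTaskProcess`; `SENTINEL`, `worker`, and `run` are hypothetical names.

```python
import asyncio
from typing import List, Optional, Tuple

SENTINEL = None  # hypothetical marker meaning "no more work"


async def worker(num: int, queue: "asyncio.Queue[Optional[str]]", done: List[str]) -> str:
    while True:
        item = await queue.get()
        try:
            if item is SENTINEL:
                return f"worker {num} stopped"
            done.append(item)
        finally:
            queue.task_done()


async def run(num_workers: int = 2, num_items: int = 10) -> Tuple[List[str], List[str]]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue(maxsize=10)
    done: List[str] = []
    workers = [asyncio.create_task(worker(i, queue, done)) for i in range(num_workers)]
    for x in range(num_items):
        await queue.put(f"sample {x}")
    await queue.join()  # all real items have been processed
    # One sentinel per worker: each worker consumes exactly one and exits.
    for _ in range(num_workers):
        await queue.put(SENTINEL)
    results = await asyncio.gather(*workers)
    return done, list(results)


if __name__ == "__main__":
    processed, stopped = asyncio.run(run())
    print(len(processed), stopped)
```

Compared with cancelling the tasks, a sentinel lets each worker finish its current item and leave its loop normally, so `gather` returns ordinary results.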
3. Example with a custom callback
Code
import asyncio
import logging
from typing import Any
from snakestack.logging import setup_logging
from snakestack.task import SnakeTaskProcess
logger = logging.getLogger(__name__)
async def callback(num: int, message: Any):
    # Called for each item pulled from the queue, with the worker number and the message.
    await asyncio.sleep(0.01)
    logger.debug(f"Worker [{num}] process item {message} on custom callback")
async def main(num_workers: int = 2):
    loop = asyncio.get_running_loop()
    process = SnakeTaskProcess(
        queue_max_size=10,
        func_callback=callback
    )
    tasks = []
    for i in range(num_workers):
        logger.info(f"Scheduling worker [{i}]")
        task = loop.create_task(process.worker(num=i))
        tasks.append(task)
        await asyncio.sleep(0.1)
    for x in range(10):
        await process.enqueue(message=f"sample {x}")
    await process.wait()
    await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
    setup_logging()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Workers finalizado.")
    finally:
        logger.info("Processo finalizado.")
Output
2025-08-21 17:31:00,998 [DEBUG] asyncio: Using selector: EpollSelector
2025-08-21 17:31:00,999 [INFO] __main__: Scheduling worker [0]
2025-08-21 17:31:00,999 [INFO] snakestack.task.process: Starting worker [0]
2025-08-21 17:31:00,999 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,099 [INFO] __main__: Scheduling worker [1]
2025-08-21 17:31:01,100 [INFO] snakestack.task.process: Starting worker [1]
2025-08-21 17:31:01,100 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,201 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 0
2025-08-21 17:31:01,202 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 1
2025-08-21 17:31:01,212 [DEBUG] __main__: Worker [0] process item sample 0 on custom callback
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 2
2025-08-21 17:31:01,213 [DEBUG] __main__: Worker [1] process item sample 1 on custom callback
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,213 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 3
2025-08-21 17:31:01,224 [DEBUG] __main__: Worker [0] process item sample 2 on custom callback
2025-08-21 17:31:01,224 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,225 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 4
2025-08-21 17:31:01,225 [DEBUG] __main__: Worker [1] process item sample 3 on custom callback
2025-08-21 17:31:01,225 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,226 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 5
2025-08-21 17:31:01,236 [DEBUG] __main__: Worker [0] process item sample 4 on custom callback
2025-08-21 17:31:01,237 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,237 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 6
2025-08-21 17:31:01,238 [DEBUG] __main__: Worker [1] process item sample 5 on custom callback
2025-08-21 17:31:01,238 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,239 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 7
2025-08-21 17:31:01,249 [DEBUG] __main__: Worker [0] process item sample 6 on custom callback
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [0] pulled item sample 8
2025-08-21 17:31:01,249 [DEBUG] __main__: Worker [1] process item sample 7 on custom callback
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [1] waiting items...
2025-08-21 17:31:01,249 [DEBUG] snakestack.task.process: Worker [1] pulled item sample 9
2025-08-21 17:31:01,260 [DEBUG] __main__: Worker [0] process item sample 8 on custom callback
2025-08-21 17:31:01,260 [DEBUG] snakestack.task.process: Worker [0] waiting items...
2025-08-21 17:31:01,260 [DEBUG] __main__: Worker [1] process item sample 9 on custom callback
2025-08-21 17:31:01,260 [DEBUG] snakestack.task.process: Worker [1] waiting items...
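In the log above the "process item" lines come from `__main__` instead of `snakestack.task.process`, which shows the user-supplied callback replacing the default processing step. The sketch below illustrates that kind of pluggable callback with the standard library. It assumes the callback receives the worker number and the message, as in the example; `MiniProcess` and `run` are hypothetical and do not reflect how `SnakeTaskProcess` is written.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional

Callback = Callable[[int, Any], Awaitable[None]]


class MiniProcess:
    """Hypothetical stand-in for a queue processor with a pluggable callback."""

    def __init__(self, func_callback: Optional[Callback] = None) -> None:
        self.queue: "asyncio.Queue[Any]" = asyncio.Queue()
        self.default_seen: List[Any] = []
        # Fall back to the built-in behaviour when no callback is supplied.
        self.callback: Callback = func_callback or self._default_callback

    async def _default_callback(self, num: int, message: Any) -> None:
        self.default_seen.append(message)

    async def worker(self, num: int) -> None:
        while True:
            message = await self.queue.get()
            try:
                await self.callback(num, message)
            finally:
                self.queue.task_done()


async def run(messages: List[Any], func_callback: Optional[Callback] = None) -> MiniProcess:
    process = MiniProcess(func_callback)  # created inside the running loop
    tasks = [asyncio.create_task(process.worker(i)) for i in range(2)]
    for message in messages:
        await process.queue.put(message)
    await process.queue.join()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return process


if __name__ == "__main__":
    async def shout(num: int, message: Any) -> None:
        print(f"worker {num} got {message!r}")

    asyncio.run(run(["a", "b", "c"], shout))
```

Keeping the callback signature to `(num, message)` means the same function can be reused by any number of workers without sharing state.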
MongoDB Module
1. Read and write example
Code
import asyncio
from snakestack.config import settings
from snakestack.mongodb.async_client import (
    close_mongodb_connection,
    get_collection,
    open_mongodb_connection
)
async def write(db):
    # Insert a single document into the collection.
    result = await db.insert_one({
        "foo": "bar",
        "bar": "baz"
    })
    print(result)
async def read(db):
    # Fetch the first document that matches the filter.
    result = await db.find_one(filter={"foo": "bar"})
    print(result)
async def main():
    await open_mongodb_connection(settings=settings)
    db = get_collection(settings=settings, collection="teste")
    await write(db)
    await read(db)
    await close_mongodb_connection()
if __name__ == "__main__":
    asyncio.run(main())
Output
InsertOneResult(ObjectId('69ce830ccb1b6edc4674d323'), acknowledged=True)
{'_id': ObjectId('69ce830ccb1b6edc4674d323'), 'foo': 'bar', 'bar': 'baz'}
🧭 Roadmap
- Modularization by domain
- Full unit test coverage
- Support for extras on PyPI
- Online documentation (mkdocs)
- Observability dashboard with Tempo + Prometheus + Grafana
- CI/CD with automatic deployment to PyPI
- CLI for environment validation and local tests
- Changelog generation via towncrier
👨‍💻 Author
Developed by Bruno Segato. Contributions, suggestions, and feedback are always welcome!
📝 License
This project is licensed under the terms of the MIT License.