A Yodo1 Python Toolbox
Project description
Yodo1 Python Toolkit
Install
pip install yodo1-toolkit
SSO
Setup
Create a base instance in the app/base.py
file.
from typing import Dict
from fastapi import Depends
from yodo1.sso import JWTHelper, JWTPayload
auth = JWTHelper()
# Define helper class
# This adds a custom operation after getting the user info, such as setting up the APM user context
# https://www.elastic.co/guide/en/apm/agent/python/master/api.html#api-set-user-context
def get_current_user_dict(payload: JWTPayload = Depends(auth.current_payload)) -> Dict:
elasticapm.set_user_context(username=payload.name,
email=payload.email,
user_id=payload.sub)
return {
"sub": payload.sub,
"email": payload.email,
}
Setup public key when api startup
from fastapi import FastAPI
from app.base import auth
app = FastAPI()
@app.on_event("startup")
async def startup_event() -> None:
# Set up the public key via the SSO server URL; this is the path to the public_key file.
auth.setup_with_sso_server("<public_key_url>")
# Or just set it up with a public_key string
# >>> public_key = "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BA ..."
# >>> auth.setup_keys(public_key=public_key)
Use with FastAPI
from app.base import auth, get_current_user_dict
from yodo1.sso import JWTPayload
...
# use payload without custom model
@router.get("/get_payload")
async def get_payload(
payload: JWTPayload=Depends(auth.current_payload)):
return f"Hello, {payload.sub}"
# use custom user model
@router.get("/user")
async def get_payload(
payload: Dict=Depends(get_current_user_dict)):
return f"Hello, {payload['sub']}"
sqlalchemy
Create base instances in the app/base.py
file.
from sqlalchemy import create_engine
from yodo1.sqlalchemy import DBManager
engine = create_engine("<db_uri>",
pool_size=0,
pool_recycle=600,
max_overflow=-1)
db = DBManager(engine=engine)
Use with FastAPI
from app.base import db
...
@router.post("/update")
async def hello_world(
payload=Depends(auth.current_payload),
session=Depends(db.get_session)) -> ItemModel:
item: ItemModel = session.query(ItemModel).first()
return item
...
Use without FastAPI
from app.base import db
session = db.SessionLocal()
session.query(Model).all()
session.commit()
session.close()
Define Model
from sqlalchemy import (
INTEGER,
Column,
TEXT
)
from yodo1.sqlalchemy import BaseDBModel
class ItemModel(BaseDBModel):
__tablename__ = "item_list"
__table_args__ = {"extend_existing": True}
id = Column(INTEGER, primary_key=True, autoincrement=True, nullable=False)
title = Column(TEXT, nullable=False, comment="notification title")
Define Schema
from yodo1.pydantic import BaseSchema, BaseDateSchema
class OutputModelSchema(BaseSchema):
id: int
title: str
class OutputModelWithDateSchema(BaseDateSchema):
id: int
title: str
Rabbit MQ
How to use Consumer
import logging
import random
import time
import pika
from yodo1.rabbitmq.multi_thread import MultiThreadConsumer, MQAction, CallbackResult
# We can change pika log level to reduce logs.
logging.getLogger("pika").setLevel(logging.INFO)
logging.basicConfig(level="DEBUG")
def demo_callback(method_frame: pika.spec.Basic.Deliver,
header_frame: pika.spec.BasicProperties,
message_body: bytes) -> CallbackResult:
"""
Demo callback function
:param method_frame: method_frame from MQ Message
:param header_frame: header_frame from MQ Message
:param message_body: MQ Message body
:return: whether should ack
"""
logging.info(f"Received message in Queue: {method_frame.routing_key}, delivery_tag: {method_frame.delivery_tag}")
time.sleep(30)
if random.random() > 0.5:
# Process success, should ack
return CallbackResult(MQAction.ack)
else:
# Failed to process, should nack with `requeue=False`
return CallbackResult(MQAction.nack)
consumer = MultiThreadConsumer(uri="amqps://xxxx",
verbose=True)
consumer.setup_queue_consumer(queue_name="test.consumer.a.debug",
exchange_name="target-exchange",
handler_function=demo_callback)
try:
consumer.start_consuming()
except KeyboardInterrupt:
consumer.stop_consuming()
consumer.close()
Consume MQ with apm enabled
import elasticapm
apm_client = elasticapm.Client(
service_name="awesome-api",
server_url="https://apm-host",
secret_token="token",
environment="test",
service_version="2.0.0")
consumer = MultiThreadConsumer(uri="amqps://xxxx",
apm_client=apm_client,
verbose=True)
AsyncRabbit
is deprecated due to stability issues and will be removed from version 0.3.0. Please use yodo1.rabbitmq.MultiThreadConsumer instead.
How to use Sender
RabbitHttpSender
is a thread-safe sender that sends MQ messages directly using the HTTP client.
import json
import aio_pika
from yodo1.rabbitmq import RabbitHttpSender
# It is recommended to share one sender client per app worker
# Init with URI
uri = "https://username:password@rabbit-host/virtualhost"
rabbit_sender = RabbitHttpSender(uri=uri)
# Make sure we have defined target queue and exchange relation on the startup
@app.on_event("startup")
async def startup_event() -> None:
# Register the exchange.
# We need to define the relation in the code
rabbit_sender.declare_exchange(exchange_name="only-queue")
def do_some_magic_and_publish_to_exchange():
do_magic()
rabbit_sender.publish(
exchange_name="exchange-1",
message_body={"magic": "done"}
)
def do_some_magic_and_publish_to_queue_without_exchange():
do_magic()
# We can publish message directly to a queue using special exchange ""
rabbit_sender.publish(
exchange_name="",
routing_key="target-queue-name",
message_body={"magic": "done"}
)
Send MQ with apm enabled
import elasticapm
apm_client = elasticapm.Client(
service_name="awesome-api-consumer",
server_url="https://apm-host",
secret_token="token",
environment="test",
service_version="2.0.0")
# init sender with apm client
uri = "https://username:password@rabbit-host/virtualhost"
rabbit_sender = RabbitHttpSender(uri=uri, apm_client=apm_client)
rabbit_sender.publish(
event_name="alian-found", # Must have an event name when using apm client
exchange_name="",
routing_key="target-queue-name",
message_body={"magic": "done"}
)
rabbit_sender.publish(
event_name="alian-found", # Must have an event name when using apm client
exchange_name="exchange-1",
message_body={"magic": "done"}
)
Send MQ with FastAPI apm enabled
from elasticapm.contrib.starlette import ElasticAPM, make_apm_client
from fastapi import FastAPI
apm_client = make_apm_client(
{
"SERVICE_NAME": "demo-api",
"SECRET_TOKEN": "xxxx",
"SERVER_URL": "https://apm-host",
"ENVIRONMENT": "test",
"SERVICE_VERSION": "1.0.0",
}
)
app = FastAPI()
app.add_middleware(ElasticAPM, client=apm_client)
# init sender with apm client
uri = "https://username:password@rabbit-host/virtualhost"
rabbit_sender = RabbitHttpSender(uri=uri, apm_client=apm_client)
rabbit_sender.publish(
event_name="alian-found", # Must have an event name when using apm client
exchange_name="exchange-1",
message_body={"magic": "done"}
)
Progress Bar
A simple progress bar that displays properly on k8s and Grafana.
import logging
from yodo1.progress import ProgressBar
logging.basicConfig(level="DEBUG")
p = ProgressBar(total=100, desc="Hacking ...", step=5)
for i in range(100):
p.update()
This is the output
INFO:yodo1.progress: 12.0% |>>>>>> | 12/100 Hacking ...
INFO:yodo1.progress: 24.0% |>>>>>>>>>>>> | 24/100 Hacking ...
INFO:yodo1.progress: 36.0% |>>>>>>>>>>>>>>>>>> | 36/100 Hacking ...
INFO:yodo1.progress: 48.0% |>>>>>>>>>>>>>>>>>>>>>>>> | 48/100 Hacking ...
INFO:yodo1.progress: 60.0% |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> | 60/100 Hacking ...
INFO:yodo1.progress: 72.0% |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> | 72/100 Hacking ...
INFO:yodo1.progress: 84.0% |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> | 84/100 Hacking ...
INFO:yodo1.progress: 96.0% |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> | 96/100 Hacking ...
INFO:yodo1.progress: 100.0% |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>| 100/100 Hacking ...
Use it with ThreadPoolExecutor.
progress = ProgressBar(total=1000, desc="Processing... ")
with ThreadPoolExecutor(max_workers=20) as executor:
for index, row in df.iterrows():
future = executor.submit(do_something,
id=row.id)
# Update progress bar when the job done
future.add_done_callback(lambda x: progress.update())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
yodo1-toolkit-0.3.9.tar.gz
(18.7 kB
view details)
Built Distribution
File details
Details for the file yodo1-toolkit-0.3.9.tar.gz
.
File metadata
- Download URL: yodo1-toolkit-0.3.9.tar.gz
- Upload date:
- Size: 18.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | b6cf789fb81e8af9823aaa31793a140a156f5d0fa77a41527bfe5f22f7f588bb |
|
MD5 | df4cea56f5fad1512afca9c2a64a6dc5 |
|
BLAKE2b-256 | d72094c46eb1a8046850fdaf0e34fab08454e0b6020633ac732dafb9782f5fdf |
File details
Details for the file yodo1_toolkit-0.3.9-py3-none-any.whl
.
File metadata
- Download URL: yodo1_toolkit-0.3.9-py3-none-any.whl
- Upload date:
- Size: 17.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0d915ebd19f6553b219f01e940686d7ba6766c721220887c4113676e0a3663a3 |
|
MD5 | 684cb6f16144b5a07b94aa89299cb286 |
|
BLAKE2b-256 | 7d7fb897bd279270c073a5cf785728cd4bf756dace70b73b9da21f58003d7b71 |