Skip to main content

A Yodo1 Python Toolbox

Project description

Yodo1 Python Toolkit

PyPI - Python Version PyPI PyPI - Downloads

Install

pip install yodo1-toolkit

SSO

Setup

Create a base instance in the app/base.py file.

from typing import Dict

import elasticapm
from fastapi import Depends
from yodo1.sso import JWTHelper, JWTPayload

auth = JWTHelper()

# Dependency helper: resolves the JWT payload, then runs a custom operation
# (here, setting the APM user context) before returning the user info.
# https://www.elastic.co/guide/en/apm/agent/python/master/api.html#api-set-user-context
def get_current_user_dict(payload: JWTPayload = Depends(auth.current_payload)) -> Dict:
    """Return the current user's ``sub`` and ``email`` as a plain dict.

    Before returning, the payload's name, email and sub are passed to
    ``elasticapm.set_user_context``.
    """
    elasticapm.set_user_context(
        username=payload.name,
        email=payload.email,
        user_id=payload.sub,
    )
    user_info = dict(sub=payload.sub, email=payload.email)
    return user_info

Set up the public key when the API starts up

from fastapi import FastAPI
from app.base import auth

app = FastAPI()


@app.on_event("startup")
async def startup_event() -> None:
  """Configure the JWT helper's public key on application startup."""
  # Set up the public key via the SSO server URL; this is the path to the public_key file.
  auth.setup_with_sso_server("<public_key_url>")

  # Or set it up directly with a public_key string:
  # >>> public_key = "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BA ..."
  # >>> auth.setup_keys(public_key=public_key)

Use with FastAPI

from app.base import auth, get_current_user_dict
from yodo1.sso import JWTPayload
...

# Use the JWT payload directly, without a custom user model.
@router.get("/get_payload")
async def get_payload(payload: JWTPayload = Depends(auth.current_payload)):
    # Greet the caller by the `sub` field of the resolved payload.
    greeting = f"Hello, {payload.sub}"
    return greeting


# Use the custom user model (the dict returned by get_current_user_dict).
@router.get("/user")
async def get_payload(payload: Dict = Depends(get_current_user_dict)):
    # Single quotes inside the f-string: reusing the outer double quotes is a
    # SyntaxError on Python versions before 3.12.
    return f"Hello, {payload['sub']}"

sqlalchemy

Create base instances in the app/base.py file.

from sqlalchemy import create_engine
from yodo1.sqlalchemy import DBManager

# "<db_uri>" is a placeholder: replace it with the database connection URI.
engine = create_engine("<db_uri>",
                       pool_size=0,
                       pool_recycle=600,
                       max_overflow=-1)

# Manager built on top of the engine; other modules import it as `app.base.db`.
db = DBManager(engine=engine)

Use with FastAPI

from app.base import db

...

@router.post("/update")
async def hello_world(
    payload=Depends(auth.current_payload),
    session=Depends(db.get_session),
) -> ItemModel:
    # `payload` is not read here; it is still resolved through auth.current_payload.
    # `session` is supplied by the db.get_session dependency.
    return session.query(ItemModel).first()

...

Use without FastAPI

from app.base import db

session = db.SessionLocal()
try:
    session.query(Model).all()
    session.commit()
finally:
    # Always close the session, even if the query or commit raises.
    session.close()

Define Model

from sqlalchemy import (
  INTEGER,
  Column,
  TEXT
)
from yodo1.sqlalchemy import BaseDBModel


class ItemModel(BaseDBModel):
  """ORM model for the ``item_list`` table."""
  __tablename__ = "item_list"
  __table_args__ = {"extend_existing": True}

  # Auto-incrementing, non-null integer primary key.
  id = Column(INTEGER, primary_key=True, autoincrement=True, nullable=False)
  # Required text column; its column comment is "notification title".
  title = Column(TEXT, nullable=False, comment="notification title")

Define Schema

from yodo1.pydantic import BaseSchema, BaseDateSchema


# Output schema with an integer `id` and a string `title`, built on BaseSchema.
class OutputModelSchema(BaseSchema):
  id: int
  title: str


# Same `id`/`title` fields, built on BaseDateSchema instead of BaseSchema.
# NOTE(review): presumably BaseDateSchema adds date fields — confirm in yodo1.pydantic.
class OutputModelWithDateSchema(BaseDateSchema):
  id: int
  title: str

Rabbit MQ

How to use Consumer

import logging
import random
import time
import pika
from yodo1.rabbitmq.multi_thread import MultiThreadConsumer, MQAction, CallbackResult

# Configure logging at DEBUG level ...
logging.basicConfig(level="DEBUG")
# ... and set the pika logger to INFO to reduce its log output.
logging.getLogger("pika").setLevel(logging.INFO)


def demo_callback(method_frame: pika.spec.Basic.Deliver,
                  header_frame: pika.spec.BasicProperties,
                  message_body: bytes) -> CallbackResult:
    """
    Demo callback function: simulates slow processing with a random outcome.

    :param method_frame: method_frame from MQ Message
    :param header_frame: header_frame from MQ Message (not used by this demo)
    :param message_body: MQ Message body (not used by this demo)
    :return: CallbackResult with MQAction.ack on success, MQAction.nack on failure
    """
    logging.info(f"Received message in Queue: {method_frame.routing_key}, delivery_tag: {method_frame.delivery_tag}")
    # Simulate a long-running job.
    time.sleep(30)
    if random.random() > 0.5:
        # Failed to process, should nack.
        # NOTE(review): the original comment asks for `requeue=False`; confirm
        # how CallbackResult expresses that.
        return CallbackResult(MQAction.nack)
    # Process success, should ack.
    return CallbackResult(MQAction.ack)

consumer = MultiThreadConsumer(uri="amqps://xxxx",
                               verbose=True)
# Bind the handler to the queue on the target exchange.
consumer.setup_queue_consumer(queue_name="test.consumer.a.debug",
                              exchange_name="target-exchange",
                              handler_function=demo_callback)

try:
    consumer.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop consuming before the consumer is closed.
    consumer.stop_consuming()
finally:
    # Close the consumer however start_consuming() exits, not only after a
    # clean return or Ctrl+C.
    consumer.close()

Consume MQ with apm enabled

import elasticapm
# Build the Elastic APM client.
apm_client = elasticapm.Client(
    service_name="awesome-api",
    server_url="https://apm-host",
    secret_token="token",
    environment="test",
    service_version="2.0.0",
)

# Pass the APM client to the consumer.
consumer = MultiThreadConsumer(
    uri="amqps://xxxx",
    apm_client=apm_client,
    verbose=True,
)

AsyncRabbit is deprecated due to stability issues and will be removed from version 0.3.0. Please use yodo1.rabbitmq.MultiThreadConsumer instead.

How to use Sender

RabbitHttpSender is a thread-safe sender that sends MQ messages directly using the HTTP client.

import json
import aio_pika
from yodo1.rabbitmq import RabbitHttpSender

# Recommendation: share one sender client per app worker.

# Initialize the sender with a URI.
uri = "https://username:password@rabbit-host/virtualhost"
rabbit_sender = RabbitHttpSender(uri=uri)

# Make sure the target queue/exchange relation is defined at startup.
@app.on_event("startup")
async def startup_event() -> None:
    """Declare the ``only-queue`` exchange when the application starts."""
    # Register the exchange.
    # The relation has to be defined in code.
    rabbit_sender.declare_exchange(exchange_name="only-queue")


def do_some_magic_and_publish_to_exchange():
    """Run ``do_magic()``, then publish a message to the ``exchange-1`` exchange."""
    do_magic()
    done_message = {"magic": "done"}
    rabbit_sender.publish(exchange_name="exchange-1", message_body=done_message)

def do_some_magic_and_publish_to_queue_withou_exchange():
    """Run ``do_magic()``, then publish a message straight to a queue."""
    do_magic()
    done_message = {"magic": "done"}
    # The special exchange "" lets us publish directly to the queue named by
    # routing_key.
    rabbit_sender.publish(
        exchange_name="",
        routing_key="target-queue-name",
        message_body=done_message,
    )

Send MQ with apm enabled

import elasticapm
# APM client handed to the sender below.
apm_client = elasticapm.Client(
    service_name="awesome-api-consumer",
    server_url="https://apm-host",
    secret_token="token",
    environment="test",
    service_version="2.0.0",
)

# Init the sender with the APM client; without `apm_client=` the client
# created above is never used by the sender.
uri = "https://username:password@rabbit-host/virtualhost"
rabbit_sender = RabbitHttpSender(uri=uri, apm_client=apm_client)

# Publish directly to a queue through the special exchange "".
rabbit_sender.publish(
    event_name="alian-found",  # An event name is required when using the APM client
    exchange_name="",
    routing_key="target-queue-name",
    message_body={"magic": "done"},
)

# Publish to a named exchange.
rabbit_sender.publish(
    event_name="alian-found",  # An event name is required when using the APM client
    exchange_name="exchange-1",
    message_body={"magic": "done"},
)

Send MQ with FastAPI apm enabled

from elasticapm.contrib.starlette import ElasticAPM, make_apm_client
from fastapi import FastAPI

# Build the APM client from a config dict.
apm_client = make_apm_client({
    "SERVICE_NAME": "demo-api",
    "SECRET_TOKEN": "xxxx",
    "SERVER_URL": "https://apm-host",
    "ENVIRONMENT": "test",
    "SERVICE_VERSION": "1.0.0",
})

# Register the ElasticAPM middleware on the app with that client.
app = FastAPI()
app.add_middleware(ElasticAPM, client=apm_client)

# Init the sender with the same APM client.
uri = "https://username:password@rabbit-host/virtualhost"
rabbit_sender = RabbitHttpSender(uri=uri, apm_client=apm_client)

rabbit_sender.publish(
    event_name="alian-found",  # An event name is required when using the APM client
    exchange_name="exchange-1",
    message_body={"magic": "done"},
)

Progress Bar

A simple progress bar that displays properly on k8s and Grafana.

import logging
from yodo1.progress import ProgressBar

# The sample output shows the bar logging at INFO under "yodo1.progress",
# so logging must be configured to see it.
logging.basicConfig(level="DEBUG")

p = ProgressBar(
    total=100,
    desc="Hacking ...",
    step=5,
)

# Call update() once per unit of work, 100 times in total.
for _ in range(100):
    p.update()

This is the output

INFO:yodo1.progress: 12.0%  |>>>>>>                                            | 12/100 Hacking ...
INFO:yodo1.progress: 24.0%  |>>>>>>>>>>>>                                      | 24/100 Hacking ...
INFO:yodo1.progress: 36.0%  |>>>>>>>>>>>>>>>>>>                                | 36/100 Hacking ...
INFO:yodo1.progress: 48.0%  |>>>>>>>>>>>>>>>>>>>>>>>>                          | 48/100 Hacking ...
INFO:yodo1.progress: 60.0%  |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                    | 60/100 Hacking ...
INFO:yodo1.progress: 72.0%  |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              | 72/100 Hacking ...
INFO:yodo1.progress: 84.0%  |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>        | 84/100 Hacking ...
INFO:yodo1.progress: 96.0%  |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>  | 96/100 Hacking ...
INFO:yodo1.progress: 100.0% |>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>| 100/100 Hacking ...

Use it with ThreadPoolExecutor.

from concurrent.futures import ThreadPoolExecutor

progress = ProgressBar(total=1000, desc="Processing... ")

with ThreadPoolExecutor(max_workers=20) as executor:
    # The row index is not needed, only the row itself.
    for _, row in df.iterrows():
        # The comma after the callable was missing, which made this a SyntaxError.
        future = executor.submit(do_something,
                                 id=row.id)
        # Update the progress bar when the job is done; the finished future
        # passed to the callback is not needed.
        future.add_done_callback(lambda _done: progress.update())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

yodo1-toolkit-0.3.9.tar.gz (18.7 kB view details)

Uploaded Source

Built Distribution

yodo1_toolkit-0.3.9-py3-none-any.whl (17.1 kB view details)

Uploaded Python 3

File details

Details for the file yodo1-toolkit-0.3.9.tar.gz.

File metadata

  • Download URL: yodo1-toolkit-0.3.9.tar.gz
  • Upload date:
  • Size: 18.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.16

File hashes

Hashes for yodo1-toolkit-0.3.9.tar.gz
Algorithm Hash digest
SHA256 b6cf789fb81e8af9823aaa31793a140a156f5d0fa77a41527bfe5f22f7f588bb
MD5 df4cea56f5fad1512afca9c2a64a6dc5
BLAKE2b-256 d72094c46eb1a8046850fdaf0e34fab08454e0b6020633ac732dafb9782f5fdf

See more details on using hashes here.

File details

Details for the file yodo1_toolkit-0.3.9-py3-none-any.whl.

File metadata

File hashes

Hashes for yodo1_toolkit-0.3.9-py3-none-any.whl
Algorithm Hash digest
SHA256 0d915ebd19f6553b219f01e940686d7ba6766c721220887c4113676e0a3663a3
MD5 684cb6f16144b5a07b94aa89299cb286
BLAKE2b-256 7d7fb897bd279270c073a5cf785728cd4bf756dace70b73b9da21f58003d7b71

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page