Skip to main content

A shared tools for other services

Project description

Qena shared lib

A shared tools for other services. It includes.

  • FastAPI app builder
  • A wrapper around fastapi to make it class based.
  • RabbitMQ utility class to listen , respond , publish and make rpc request.
  • Remote logging
    • Logstash utility class to log message in ecs ( elastic common schema ).
  • A simple task scheduler , to schedule task to run in specific time.
  • Background task runner.
  • Security tools ( password hasher , jwt , acl ).
  • IOC container to manager dependencies used across fastapi , rabbitmq manager and schedule manager.
  • Kafka producer and consumer wrapper.
  • Mongodb client wrapper , with repository and index manager.
  • Redis wrapper with cache and distributed lock manager.

Installation

It is prefered to use astral.sh / uv as a package manager.

$ uv add qena-shared-lib[all]

to install all extras , or specific extras kafka , rabbitmq , scheduler , security , redis or mongodb.

Usage

Environment variables

  • QENA_SHARED_LIB_LOGGING_LOGGER_NAME root logger name.
  • QENA_SHARED_LIB_SECURITY_UNAUTHORIZED_RESPONSE_CODE an integer response on an authorized access of resource.
  • QENA_SHARED_LIB_SECURITY_TOKEN_HEADER to header key for jwt token.

Http

To create fastapi app.

from qena_shared_lib.application import Builder, Environment


def main() -> FastAPI:
    builder = (
        Builder()
        .with_title("Qena shared lib")
        .with_description("A shared tools for other services.")
        .with_version("0.1.0")
        .with_environment(Environment.PRODUCTION)
        .with_default_exception_handlers()
    )

    app = builder.build()

    return app

To run app

$ uvicorn --factory main:main

Lifespan

from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
def lifespan(app: FastAPI):
    ...

    yield

    ...


def main() -> FastAPI:
    ...

    builder.with_lifespan(lifespan)

    ...

Dependencies

class EmailService:
    def __init__(self):
        ...


class Database:
    def __init__(self):
        ...


def main() -> FastAPI:
    ...

    builder.with_singleton(EmailService)
    builder.with_transient(Database)

    ...

Controllers

from qena_shared_lib.http import ControllerBase, api_controller, post


@api_controller("/users")
class UserController(ControllerBase):

    def __init__(self, email_service: EmailService):
        self._email_service = email_service

    @post()
    async def send_email(self, message: str):
        await self._email_service.send(message)


def main() -> FastAPI:
    ...

    builder.with_controllers(UserController)

    ...

Routers

from fastapi import APIRouter

from qena_shared_lib.dependencies.http import DependsOn


router = APIRouter(prefix="/auth")


@router.post("")
async def login(
    db: Annotated[Database, DependsOn(Database)],
    username: str,
    password: str
):
    ...


def main() -> FastAPI:
    ...

    builder.with_routers(router)

    ...

To enable metrics.

def main() -> FastAPI:
    ...

    builder.with_metrics()

    ...

Remote logging

Logstash

from qena_shared_lib.remotelogging import BaseRemoteLogSender
from qena_shared_lib.remotelogging.logstash import HTTPSender, # TCPSender


@asynccontextmanager
async def lifespan(app: FastAPI):
    remote_logger = get_service(BaseRemoteLogSender)

    await remote_logger.start()

    yield

    await remote_logger.stop()


def main() -> FastAPI:
    ...

    remote_logger = HTTPSender(
        service_name="qena-shared-lib",
        url="http://127.0.0.1:18080",
        user="logstash",
        password="logstash",
    )
    # or
    # remote_logger = TCPSender(
    #   service_name="qena-shared-lib",
    #   host="127.0.0.1",
    #   port=18090
    # )
    builder.with_singleton(
        service=BaseRemoteLogSender,
        instance=remote_logger,
    )

    ...


@router.get("")
def log_message(
    remote_logger: Annotated[
        BaseRemoteLogSender,
        DependsOn(BaseRemoteLogSender),
    ],
    message: str,
):
    remote_logger.info(message)

Rabbitmq

To create rabbitmq connection manager.

from qena_shared_lib.rabbitmq import ListenerBase, consume, consumer


@asynccontextmanager
async def lifespan(app: FastAPI):
    rabbitmq = get_service(RabbitMqManager)

    await rabbitmq.connect()

    yield

    rabbitmq.disconnect()


@consumer("UserQueue")
class UserConsumer(ListenerBase):

    def __init__(self, db: Database):
        self._db = db

    @consume()
    async def store_user(self, user: User):
        await self._db.save(user)


def main() -> FastAPI:
    ...

    rabbitmq = RabbitMqManager(
        remote_logger=remote_logger,
        container=builder.container,
    )

    rabbitmq.init_default_exception_handlers()
    rabbitmq.include_listener(UserConsumer)
    builder.add_singleton(
        service=RabbitMqManager,
        instance=rabbitmq,
    )

    ...

Publisher

@router.post("")
async def store_user(
    rabbitmq: Annotated[
        RabbitMqManager,
        DependsOn(RabbitMqManager)
    ],
    user: User,
)
    publisher = rabbitmq.publisher("UserQueue")

    await publisher.publish(user)
    # await publisher.publish_as_arguments(user)

RPC client

@router.get("")
async def get_user(
    rabbitmq: Annotated[
        RabbitMqManager,
        DependsOn(RabbitMqManager)
    ],
    user_id: str,
)
    rpc_client = rabbitmq.rpc_client("UserQueue")

    user = await rpc_client.call(user_id)
    # user = await rpc_client.call_with_arguments(user_id)

    return user

Flow control

from qena_shared_lib.rabbitmq import ... , ListenerContext


@consumer("UserQueue")
class UserConsumer(ListenerBase):

    @consume()
    async def store_user(self, ctx: ListenerContext, user: User):
        ...

        await ctx.flow_control.request(10)

        ...

Rpc reply

Optionally it is possible to reply to rpc calls, through.

from qena_shared_lib.rabbitmq import ... , rpc_worker


@rpc_worker("UserQueue")
class UserWorker(ListenerBase):

    @execute()
    async def store_user(self, ctx: ListenerContext, user: User):
        ...

        await ctx.rpc_reply.reply("Done")

        ...

Retry consumer

Consumer can retry to consumer a message in an event of failure.

from qena_shared_lib.rabbitmq import (
    BackoffRetryDelay,
    FixedRetryDelay,
    RabbitMqManager,
    RetryDelayJitter,
    RetryPolicy,
)


@consumer(
    queue="UserQueue",
    # can be defined for consumer of specific queue
    retry_policy=RetryPolicy(
        exceptions=(AMQPError,),
        max_retry=5,
        retry_delay_strategy=FixedRetryDelay(
            retry_delay=2
        ),
        retry_delay_jitter=RetryDelayJitter(min=0.5, max=5.0),
    )
)
class UserConsumer(ListenerBase):

    @consume(
        # for specific target
        retry_policy=RetryPolicy(
            exceptions=(AMQPError,),
            max_retry=5,
            retry_delay_strategy=FixedRetryDelay(
                retry_delay=2
            ),
            retry_delay_jitter=RetryDelayJitter(min=0.5, max=5.0),
        )
    )
    async def store_user(self, ctx: ListenerContext, user: User):
        ...

        await ctx.flow_control.request(10)

        ...


def main() -> FastAPI:
    ...

    rabbitmq = RabbitMqManager(
        remote_logger=remote_logger,
        container=builder.container,
        # or globally for all consumers
        listener_global_retry_policy=RetryPolicy(
            exceptions=(AMQPError,),
            max_retry=10,
            retry_delay_strategy=BackoffRetryDelay(
                multiplier=1.5, min=2, max=10
            ),
            retry_delay_jitter=RetryDelayJitter(min=0.5, max=5.0),
            match_by_cause=True,
        ),
    )

    rabbitmq.include_listener(UserConsumer)
    builder.add_singleton(
        service=RabbitMqManager,
        instance=rabbitmq,
    )

Scheduler

from qena_shared_lib.scheduler import (
    ScheduleManager,
    # Scheduler,
    SchedulerBase,
    schedule,
    scheduler,
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    schedule_manager = get_service(ScheduleManager)

    rabbitmq.start()

    yield

    schedule_manager.stop()


@scheduler()
class TaskScheduler(SchedulerBase):

    def __init__(self, db: Database)

    @schedule("* * * * *")
    def do_task(
        self,

    ):
        ...
# or
# scheduler = Scheduler()

# @scheduler.schedule("* * * * *")
# def do_task(
#     db: Annotated[Database, DependsOn(Database)]
# ):
#     ...


def main() -> FastAPI:
    ...

    schedule_manager = ScheduleManager(
        remote_logger=remote_logger,
        container=builder.container
    )

    schedule_manager.include_scheduler(TaskScheduler)
    builder.with_singleton(
        service=ScheduleManager,
        instance=schedule_manager,
    )

    ...

Background

from qena_shared_lib.background import Background


@asynccontextmanager
async def lifespan(app: FastAPI):
    background = get_service(Background)

    background.start()

    yield

    background.stop()


def main() -> FastAPI:
    ...

    builder.with_singleton(
        service=BaseRemoteLogSender,
        instance=remote_logger,
    )
    builder.with_singleton(Background)

    ...


async def data_processor(data: Data):
    ...


@router.get("")
async def process_data(
    background: Annotated[
        Background,
        DependsOne(Background)
    ],
    data: Data
)
    background.add_task(BackgroundTask(data_processor, data))

Security

Password hasher

from qena_shared_lib.security import PasswordHasher


@api_controller("/users")
class UserController(ControllerBase):

    def __init__(self, password_hasher: PasswordHasher):
        self._password_hasher = password_hasher

    @post()
    async def signup(self, user: User):
        await self._password_hasher.hash(user.password)

    @post()
    async def login(self, user: User):
        await self._password_hasher.verify(user.password)


def main() -> FastAPI:
    ...

    builder.with_singleton(PasswordHasher)
    builder.with_controllers([
        UserController
    ])

    ...

JWT

from qena_shared_lib.security import JwtAdapter


@ApiController("/users")
class UserController(ControllerBase):

    def __init__(
        self,

        ...

        jwt: JwtAdapter,
    ):
        ...

        self._jwt = jwt

    @post()
    async def login(self, user: User):
        payload = { ... }

        await self._jwt.encode(payload)

    @post
    async def verifiy(self, token: str):
        await self._jwt.decode(token)


def main() -> FastAPI:
    ...

    builder.with_singleton(JwtAdapter)
    builder.with_controllers([
        UserController
    ])

    ...

ACL

from qena_shared_lib.security import Authorization


@api_controller("/users")
class UserController(ControllerBase):

    @post()
    async def get_user(
        self,
        user: Annotated[
            UserInfo,
            Authorization(
                user_type="ADMIN",
                persmissions=[
                    "READ"
                ],
            )
        ]
    ):
        ...


@router.get("")
async def get_users(
    user: Annotated[
        UserInfo,
        Authorization("ADMIN")
    ]
)
    ...

Kafka

from qena_shared_lib.kafka import KafkaManager


@asynccontextmanager
async def lifespan(app: FastAPI):
    kafka = get_service(KafkaManager)

    await kafka.connect()

    yield

    await kafka.disconnect()


def main() -> FastAPI:
    ...

    kafka = KafkaManager(
        remote_logger=...,
        bootstrap_servers="127.0.0.1:9092",
    )

    builder.with_singleton(
        service=KafkaManager,
        instance=kafka,
    )

    ...

Producer

class UserService:
    def __init__(self, kafka: KafkaManager):
        self._kafka = kafka

    async def create_user(self):
        ...

        async with await self._kafka.producer("user") as producer:
            await producer.send(key="some_key", value=user)

        ...

Consumer

from qena_shared_lib.kafka import (
    ConsumerBase,
    consume,
    consumer,
)


@consumer(["user"])
class USerConsumer(ConsumerBase):
    def __init__(self, user_service: UserService):
        self._user_service = user_service

    @consume()
    async def user_created(self, key: Any | None, value: Any | None):
        ...

        await self._user_service.create_user(...)

        ...

Mongodb

from qena_shared_lib.mongodb import MongoDBManager


@asynccontextmanager
async def lifespan(app: FastAPI):
    db = get_service(MongoDBManager)

    await db.connect()

    yield

    await db.disconnect()


def main() -> FastAPI:
    ...

    db = MongoDBManager(
        connection_string="mongodb://127.0.0.1:27017",
        db="userDb"
    )

    builder.with_singleton(
        service=MongoDBManager,
        instance=db
    )

    ...

Crud

from qena_shared_lib.mongodb import (
    Document,
    MongoDBManager,
    ProjectedDocument,
    RepositoryBase,
)


class User(Document):
    __collection_name__ = "users"

    full_name: str
    phone: str


class FullNameProjectedUser(ProjectedDocument):
    full_name: str


class UserRepository(RepositoryBase[User]):
    pass


class UserService:
    def __init__(self, user_repository: UserRepository):
        self._user_repository = user_repository

    async def add_user(self):
        await self._user_repository.insert(
            User(
                full_name="user one",
                phone="+251900000000"
            )
        )

    async def get_user(self):
        user = await self._user_repository.find_by_filter(
            filter={"phone": "+251900000000"}
        )

    async def get_user_fullname(self):
        user = await self._user_repository.find_by_filter(
            filter={"phone": "+251900000000"}, projection=FullNameProjectedUser
        )

    async def update_user(self):
        user = await self._user_repository.find_by_filter(
            filter={"phone": "+251900000000"}, projection
        )
        user.phone = "+251900000001"

        await user_repository.replace(user)

Aggregation

class AggregatedUser(AggregatedDocument):
    __pipeline__ = [
        {"$match": {"phone": {"$in": ["+251900000000", "+251900000001"]}}},
        {"$project": {"fullName": True}},
    ]

    full_name: str

class UserService:
    ...

    async def get_user_fullnames(self):
        users = user_repository.aggregate(aggregation=AggregatedUser)

    ...

Index

from qena_shared_lib.mongodb import Document, IndexManager, IndexModel


class User(Document):
    __collection_name__ = "users"
    __indexes__ = [IndexModel("phone")]

    full_name: str
    phone: str


async def manage_indexes():
    ...

    index_manager = IndexManager(db=db, documents=[User])

    await index_manager.create_indexes

    ...

    await index_manager.drop_indexes()

    ...

Redis

from qena_shared_lib.redis import RedisManager


@asynccontextmanager
async def lifespan(app: FastAPI):
    redis_manager = get_service(RedisManager)

    await dredis_managerb.connect()

    yield

    await redis_manager.disconnect()


def main() -> FastAPI:
    ...

    redis_manager = RedisManager("redis://127.0.0.1:6379")

    builder.with_singleton(
        service=RedisManager,
        instance=redis_manager
    )

    ...

Cache

from qena_shared_lib.cache import CachedObject, CacheManager


def main() -> FastAPI:
    ...

    cache_manager = CacheManager()

    redis_manager.add(cache_manager)
    builder.with_singleton(
        service=CacheManager,
        instance=cache_manager
    )

    ...


class UserCache(CachedObject):
    full_name: str


class UserService:
    def __init__(self, cache_manager: CacheManager):
        self._cache_manager = cache_manager

    async def cache_user(self):
        await self._cache_manager.set(
            UserCache(full_name="user one")
        )

    async def get_cached_user(self):
        user_cache = await self._cache_manager.get(UserCache)

    async def unset_cached_user(self):
        await self._cache_manager.unset(UserCache)

Distribute lock

from qena_shared_lib.sync import DistributedLockManager


def main() -> FastAPI:
    ...

    distributed_lock_manager = DistributedLockManager()

    redis_manager.add(distributed_lock_manager)
    builder.with_singleton(
        service=DistributedLockManager,
        instance=distributed_lock_manager
    )

    ...


class UserService:
    def __init__(self, distributed_lock_manager: DistributedLockManager):
        self._distributed_lock_manager = distributed_lock_manager

    async def create_user(self):
        async with self._distributed_lock_manager("user_one_create") as _:
            ...

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

qena_shared_lib-0.1.23.tar.gz (55.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

qena_shared_lib-0.1.23-py3-none-any.whl (74.9 kB view details)

Uploaded Python 3

File details

Details for the file qena_shared_lib-0.1.23.tar.gz.

File metadata

  • Download URL: qena_shared_lib-0.1.23.tar.gz
  • Upload date:
  • Size: 55.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.4

File hashes

Hashes for qena_shared_lib-0.1.23.tar.gz
Algorithm Hash digest
SHA256 9ba177cbf05e1d8599c27fb954fd566b80ee7b246fbba821364ea4e445147eab
MD5 5bcef878ccd553ad3ca8d49d81791c50
BLAKE2b-256 7337840bfade30df9c5424a6394a99149a67a786e284bed494a175c65a39f10d

See more details on using hashes here.

File details

Details for the file qena_shared_lib-0.1.23-py3-none-any.whl.

File metadata

File hashes

Hashes for qena_shared_lib-0.1.23-py3-none-any.whl
Algorithm Hash digest
SHA256 54da79f410bb9646ece6e5073f6e715a70c8ffbedacfcaacf72871a4f1c4357c
MD5 877bbf0ba9f20447ca28a504d380f264
BLAKE2b-256 1171b67353a585065a365aac5202172bdfdf0b78e7193bfe58e89c4da82e89dd

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page