Shared tools for other services
Project description
Qena shared lib
A set of shared tools for other services. It includes:
- FastAPI app builder
- A wrapper around FastAPI to make it class based.
- RabbitMQ utility class to listen, respond, publish and make RPC requests.
- Remote logging
- Logstash utility class to log messages in ECS (Elastic Common Schema).
- A simple task scheduler, to schedule tasks to run at a specific time.
- Background task runner.
- Security tools (password hasher, JWT, ACL).
- IoC container to manage dependencies used across FastAPI, the RabbitMQ manager and the schedule manager.
- Kafka producer and consumer wrapper.
- MongoDB client wrapper, with repository and index manager.
- Redis wrapper with cache and distributed lock manager.
Installation
It is preferred to use astral.sh / uv as the package manager.
$ uv add qena-shared-lib[all]
This installs all extras. Alternatively, install only specific extras: kafka, rabbitmq, scheduler, security, redis or mongodb.
Usage
- Environment variables
- Http
- Lifespan
- Dependencies
- Controllers
- Routers
- Remote logging
- Rabbitmq
- Scheduler
- Background
- Security
- Kafka
- Mongodb
- Redis
Environment variables
- QENA_SHARED_LIB_LOGGING_LOGGER_NAME: root logger name.
- QENA_SHARED_LIB_SECURITY_UNAUTHORIZED_RESPONSE_CODE: an integer response code returned on unauthorized access to a resource.
- QENA_SHARED_LIB_SECURITY_TOKEN_HEADER: the header key for the JWT token.
Http
To create fastapi app.
from qena_shared_lib.application import Builder, Environment
def main() -> FastAPI:
builder = (
Builder()
.with_title("Qena shared lib")
.with_description("A shared tools for other services.")
.with_version("0.1.0")
.with_environment(Environment.PRODUCTION)
.with_default_exception_handlers()
)
app = builder.build()
return app
To run app
$ uvicorn --factory main:main
Lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook passed to ``builder.with_lifespan``.

    ``asynccontextmanager`` requires an async generator, so this must be
    declared with ``async def``. Code before ``yield`` runs when the context
    is entered, code after it runs when the context is exited.
    """
    ...
    yield
    ...
def main() -> FastAPI:
...
builder.with_lifespan(lifespan)
...
Dependencies
class EmailService:
def __init__(self):
...
class Database:
def __init__(self):
...
def main() -> FastAPI:
...
builder.with_singleton(EmailService)
builder.with_transient(Database)
...
Controllers
from qena_shared_lib.http import ControllerBase, api_controller, post
@api_controller("/users")
class UserController(ControllerBase):
def __init__(self, email_service: EmailService):
self._email_service = email_service
@post()
async def send_email(self, message: str):
await self._email_service.send(message)
def main() -> FastAPI:
...
builder.with_controllers(UserController)
...
Routers
from fastapi import APIRouter
from qena_shared_lib.dependencies.http import DependsOn
router = APIRouter(prefix="/auth")
@router.post("")
async def login(
db: Annotated[Database, DependsOn(Database)],
username: str,
password: str
):
...
def main() -> FastAPI:
...
builder.with_routers(router)
...
To enable metrics.
def main() -> FastAPI:
...
builder.with_metrics()
...
Remote logging
Logstash
from qena_shared_lib.remotelogging import BaseRemoteLogSender
# Import TCPSender instead of HTTPSender to ship logs over TCP.
from qena_shared_lib.remotelogging.logstash import HTTPSender  # or TCPSender
@asynccontextmanager
async def lifespan(app: FastAPI):
remote_logger = get_service(BaseRemoteLogSender)
await remote_logger.start()
yield
await remote_logger.stop()
def main() -> FastAPI:
...
remote_logger = HTTPSender(
service_name="qena-shared-lib",
url="http://127.0.0.1:18080",
user="logstash",
password="logstash",
)
# or
# remote_logger = TCPSender(
# service_name="qena-shared-lib",
# host="127.0.0.1",
# port=18090
# )
builder.with_singleton(
service=BaseRemoteLogSender,
instance=remote_logger,
)
...
@router.get("")
def log_message(
remote_logger: Annotated[
BaseRemoteLogSender,
DependsOn(BaseRemoteLogSender),
],
message: str,
):
remote_logger.info(message)
Rabbitmq
To create rabbitmq connection manager.
from qena_shared_lib.rabbitmq import ListenerBase, consume, consumer
@asynccontextmanager
async def lifespan(app: FastAPI):
rabbitmq = get_service(RabbitMqManager)
await rabbitmq.connect()
yield
rabbitmq.disconnect()
@consumer("UserQueue")
class UserConsumer(ListenerBase):
def __init__(self, db: Database):
self._db = db
@consume()
async def store_user(self, user: User):
await self._db.save(user)
def main() -> FastAPI:
    """Create the RabbitMQ manager and register it as a singleton service."""
    ...
    # Share the builder's IoC container with the manager.
    rabbitmq = RabbitMqManager(
        remote_logger=remote_logger,
        container=builder.container,
    )
    rabbitmq.init_default_exception_handlers()
    rabbitmq.include_listener(UserConsumer)
    # Registered under RabbitMqManager so it can be fetched with
    # get_service(RabbitMqManager) or DependsOn(RabbitMqManager).
    builder.with_singleton(
        service=RabbitMqManager,
        instance=rabbitmq,
    )
    ...
Publisher
@router.post("")
async def store_user(
    rabbitmq: Annotated[
        RabbitMqManager,
        DependsOn(RabbitMqManager),
    ],
    user: User,
):
    """Publish the received user to the ``UserQueue`` queue."""
    publisher = rabbitmq.publisher("UserQueue")
    await publisher.publish(user)
    # or
    # await publisher.publish_as_arguments(user)
RPC client
@router.get("")
async def get_user(
    rabbitmq: Annotated[
        RabbitMqManager,
        DependsOn(RabbitMqManager),
    ],
    user_id: str,
):
    """Request a user over RPC on ``UserQueue`` and return the reply."""
    rpc_client = rabbitmq.rpc_client("UserQueue")
    user = await rpc_client.call(user_id)
    # or
    # user = await rpc_client.call_with_arguments(user_id)
    return user
Flow control
from qena_shared_lib.rabbitmq import ... , ListenerContext
@consumer("UserQueue")
class UserConsumer(ListenerBase):
@consume()
async def store_user(self, ctx: ListenerContext, user: User):
...
await ctx.flow_control.request(10)
...
Rpc reply
Optionally, it is possible to reply to RPC calls through ctx.rpc_reply.
from qena_shared_lib.rabbitmq import ... , rpc_worker
@rpc_worker("UserQueue")
class UserWorker(ListenerBase):
@execute()
async def store_user(self, ctx: ListenerContext, user: User):
...
await ctx.rpc_reply.reply("Done")
...
Retry consumer
A consumer can retry consuming a message in the event of a failure.
from qena_shared_lib.rabbitmq import (
BackoffRetryDelay,
FixedRetryDelay,
RabbitMqManager,
RetryDelayJitter,
RetryPolicy,
)
@consumer(
queue="UserQueue",
# can be defined for consumer of specific queue
retry_policy=RetryPolicy(
exceptions=(AMQPError,),
max_retry=5,
retry_delay_strategy=FixedRetryDelay(
retry_delay=2
),
retry_delay_jitter=RetryDelayJitter(min=0.5, max=5.0),
)
)
class UserConsumer(ListenerBase):
@consume(
# for specific target
retry_policy=RetryPolicy(
exceptions=(AMQPError,),
max_retry=5,
retry_delay_strategy=FixedRetryDelay(
retry_delay=2
),
retry_delay_jitter=RetryDelayJitter(min=0.5, max=5.0),
)
)
async def store_user(self, ctx: ListenerContext, user: User):
...
await ctx.flow_control.request(10)
...
def main() -> FastAPI:
    """Create a RabbitMQ manager with a global retry policy and register it."""
    ...
    rabbitmq = RabbitMqManager(
        remote_logger=remote_logger,
        container=builder.container,
        # or globally for all consumers
        listener_global_retry_policy=RetryPolicy(
            exceptions=(AMQPError,),
            max_retry=10,
            retry_delay_strategy=BackoffRetryDelay(
                multiplier=1.5, min=2, max=10
            ),
            retry_delay_jitter=RetryDelayJitter(min=0.5, max=5.0),
            match_by_cause=True,
        ),
    )
    rabbitmq.include_listener(UserConsumer)
    # Registered under RabbitMqManager so it can be fetched with
    # get_service(RabbitMqManager) or DependsOn(RabbitMqManager).
    builder.with_singleton(
        service=RabbitMqManager,
        instance=rabbitmq,
    )
Scheduler
from qena_shared_lib.scheduler import (
ScheduleManager,
# Scheduler,
SchedulerBase,
schedule,
scheduler,
)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the schedule manager on entry and stop it on exit."""
    schedule_manager = get_service(ScheduleManager)
    # Start the manager that was just resolved; no RabbitMQ manager is
    # defined in this scope.
    schedule_manager.start()
    yield
    schedule_manager.stop()
@scheduler()
class TaskScheduler(SchedulerBase):
    """Class-based scheduler with one task scheduled as ``* * * * *``."""

    def __init__(self, db: Database):
        # Keep the injected dependency so scheduled tasks can use it.
        self._db = db

    @schedule("* * * * *")
    def do_task(
        self,
    ):
        ...
# or
# scheduler = Scheduler()
# @scheduler.schedule("* * * * *")
# def do_task(
# db: Annotated[Database, DependsOn(Database)]
# ):
# ...
def main() -> FastAPI:
...
schedule_manager = ScheduleManager(
remote_logger=remote_logger,
container=builder.container
)
schedule_manager.include_scheduler(TaskScheduler)
builder.with_singleton(
service=ScheduleManager,
instance=schedule_manager,
)
...
Background
from qena_shared_lib.background import Background
@asynccontextmanager
async def lifespan(app: FastAPI):
background = get_service(Background)
background.start()
yield
background.stop()
def main() -> FastAPI:
...
builder.with_singleton(
service=BaseRemoteLogSender,
instance=remote_logger,
)
builder.with_singleton(Background)
...
async def data_processor(data: Data):
...
@router.get("")
async def process_data(
    background: Annotated[
        Background,
        DependsOn(Background),
    ],
    data: Data,
):
    """Queue ``data_processor(data)`` on the background task runner."""
    background.add_task(BackgroundTask(data_processor, data))
Security
Password hasher
from qena_shared_lib.security import PasswordHasher
@api_controller("/users")
class UserController(ControllerBase):
def __init__(self, password_hasher: PasswordHasher):
self._password_hasher = password_hasher
@post()
async def signup(self, user: User):
await self._password_hasher.hash(user.password)
@post()
async def login(self, user: User):
await self._password_hasher.verify(user.password)
def main() -> FastAPI:
...
builder.with_singleton(PasswordHasher)
builder.with_controllers([
UserController
])
...
JWT
from qena_shared_lib.security import JwtAdapter
@ApiController("/users")
class UserController(ControllerBase):
def __init__(
self,
...
jwt: JwtAdapter,
):
...
self._jwt = jwt
@post()
async def login(self, user: User):
payload = { ... }
await self._jwt.encode(payload)
@post
async def verifiy(self, token: str):
await self._jwt.decode(token)
def main() -> FastAPI:
...
builder.with_singleton(JwtAdapter)
builder.with_controllers([
UserController
])
...
ACL
from qena_shared_lib.security import Authorization
@api_controller("/users")
class UserController(ControllerBase):
    """Controller whose endpoint takes its user from ``Authorization``."""

    @post()
    async def get_user(
        self,
        user: Annotated[
            UserInfo,
            Authorization(
                user_type="ADMIN",
                permissions=[
                    "READ"
                ],
            ),
        ],
    ):
        ...
@router.get("")
async def get_users(
    user: Annotated[
        UserInfo,
        Authorization("ADMIN"),
    ],
):
    """Router-based endpoint whose user is supplied by ``Authorization("ADMIN")``."""
    ...
Kafka
from qena_shared_lib.kafka import KafkaManager
@asynccontextmanager
async def lifespan(app: FastAPI):
kafka = get_service(KafkaManager)
await kafka.connect()
yield
await kafka.disconnect()
def main() -> FastAPI:
...
kafka = KafkaManager(
remote_logger=...,
bootstrap_servers="127.0.0.1:9092",
)
builder.with_singleton(
service=KafkaManager,
instance=kafka,
)
...
Producer
class UserService:
def __init__(self, kafka: KafkaManager):
self._kafka = kafka
async def create_user(self):
...
async with await self._kafka.producer("user") as producer:
await producer.send(key="some_key", value=user)
...
Consumer
from qena_shared_lib.kafka import (
ConsumerBase,
consume,
consumer,
)
@consumer(["user"])
class USerConsumer(ConsumerBase):
def __init__(self, user_service: UserService):
self._user_service = user_service
@consume()
async def user_created(self, key: Any | None, value: Any | None):
...
await self._user_service.create_user(...)
...
Mongodb
from qena_shared_lib.mongodb import MongoDBManager
@asynccontextmanager
async def lifespan(app: FastAPI):
db = get_service(MongoDBManager)
await db.connect()
yield
await db.disconnect()
def main() -> FastAPI:
...
db = MongoDBManager(
connection_string="mongodb://127.0.0.1:27017",
db="userDb"
)
builder.with_singleton(
service=MongoDBManager,
instance=db
)
...
Crud
from qena_shared_lib.mongodb import (
Document,
MongoDBManager,
ProjectedDocument,
RepositoryBase,
)
class User(Document):
__collection_name__ = "users"
full_name: str
phone: str
class FullNameProjectedUser(ProjectedDocument):
full_name: str
class UserRepository(RepositoryBase[User]):
pass
class UserService:
    """Example service performing CRUD operations through ``UserRepository``."""

    def __init__(self, user_repository: UserRepository):
        self._user_repository = user_repository

    async def add_user(self):
        """Insert a new user document."""
        await self._user_repository.insert(
            User(
                full_name="user one",
                phone="+251900000000",
            )
        )

    async def get_user(self):
        """Look a user up by phone number."""
        user = await self._user_repository.find_by_filter(
            filter={"phone": "+251900000000"}
        )

    async def get_user_fullname(self):
        """Look a user up by phone number, projected to ``FullNameProjectedUser``."""
        user = await self._user_repository.find_by_filter(
            filter={"phone": "+251900000000"},
            projection=FullNameProjectedUser,
        )

    async def update_user(self):
        """Fetch a user, change the phone number and replace the stored document."""
        # No projection here: the full document is replaced below.
        user = await self._user_repository.find_by_filter(
            filter={"phone": "+251900000000"}
        )
        user.phone = "+251900000001"
        await self._user_repository.replace(user)
Aggregation
class AggregatedUser(AggregatedDocument):
__pipeline__ = [
{"$match": {"phone": {"$in": ["+251900000000", "+251900000001"]}}},
{"$project": {"fullName": True}},
]
full_name: str
class UserService:
    ...

    async def get_user_fullnames(self):
        """Run the ``AggregatedUser`` aggregation through the repository."""
        # NOTE(review): whether the result must be awaited or iterated with
        # ``async for`` is not visible here; confirm against the repository API.
        users = self._user_repository.aggregate(aggregation=AggregatedUser)
        ...
Index
from qena_shared_lib.mongodb import Document, IndexManager, IndexModel
class User(Document):
__collection_name__ = "users"
__indexes__ = [IndexModel("phone")]
full_name: str
phone: str
async def manage_indexes():
    """Create, and later drop, the indexes declared on the ``User`` document."""
    ...
    index_manager = IndexManager(db=db, documents=[User])
    # The method must be called; awaiting the bare attribute does nothing useful.
    await index_manager.create_indexes()
    ...
    await index_manager.drop_indexes()
    ...
Redis
from qena_shared_lib.redis import RedisManager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Connect the Redis manager on entry and disconnect it on exit."""
    redis_manager = get_service(RedisManager)
    await redis_manager.connect()
    yield
    await redis_manager.disconnect()
def main() -> FastAPI:
...
redis_manager = RedisManager("redis://127.0.0.1:6379")
builder.with_singleton(
service=RedisManager,
instance=redis_manager
)
...
Cache
from qena_shared_lib.cache import CachedObject, CacheManager
def main() -> FastAPI:
...
cache_manager = CacheManager()
redis_manager.add(cache_manager)
builder.with_singleton(
service=CacheManager,
instance=cache_manager
)
...
class UserCache(CachedObject):
full_name: str
class UserService:
def __init__(self, cache_manager: CacheManager):
self._cache_manager = cache_manager
async def cache_user(self):
await self._cache_manager.set(
UserCache(full_name="user one")
)
async def get_cached_user(self):
user_cache = await self._cache_manager.get(UserCache)
async def unset_cached_user(self):
await self._cache_manager.unset(UserCache)
Distributed lock
from qena_shared_lib.sync import DistributedLockManager
def main() -> FastAPI:
...
distributed_lock_manager = DistributedLockManager()
redis_manager.add(distributed_lock_manager)
builder.with_singleton(
service=DistributedLockManager,
instance=distributed_lock_manager
)
...
class UserService:
def __init__(self, distributed_lock_manager: DistributedLockManager):
self._distributed_lock_manager = distributed_lock_manager
async def create_user(self):
async with self._distributed_lock_manager("user_one_create") as _:
...
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters