A wk register library for python
Project description
WK Register
The WK Register is a library for logging data to a Kafka topic.
Getting started
Configure your Kafka environment variables first:
- BOOTSTRAP_SERVER (in server:port format)
- SECURITY_PROTOCOL
- SASL_MECHANISM
- SASL_USERNAME
- SAS_PASSWORD
Usage Example
To use the library, add the @record decorator to any function whose output you wish to send to Kafka, and include a record key in the function's return value as demonstrated below:
from wkregister.decorator import record
@record()
def add(a: float, b: float):
return {"result": a + b, "record": Record}
The library processes the record key to ensure that a Record object is sent. The structure of the Record is defined as follows:
from dataclasses import dataclass, asdict
from uuid import uuid4
from datetime import datetime
@dataclass
class Records:
org: str = ""
key: str = ""
userId: str = ""
actionType: str = ""
status: str = ""
errorMessage: str = ""
service: str = ""
id: str = str(uuid.uuid4())
timestamp: str = str(datetime.today())
def dict(self):
return {k: str(v) for k, v in asdict(self).items()}
Import and use Record like this:
from wkregister.decorator import record, Records
# or
from wkregister.models import Records
record = Records(
org="testOrg",
key="logs",
userId="1",
actionType="insert",
status="success",
errorMessage=None,
service="pay-service",
)
Complete Example
Here’s a comprehensive example:
from wkregister.decorator import record, Records
import asyncio
@record()
def add(a: float, b: float):
record = Records(
org="testorg",
key="logs",
userId="1",
actionType="insert",
status="success",
errorMessage=None,
service="pay-service",
)
return {"result": a + b, "record": record}
result = asyncio.run(add(12, 12))
Fast API integration
Define Environment Variables
Start by defining the environment variables using Pydantic. This setup allows your FastAPI application to read configuration variables from environment files or the environment itself, ensuring sensitive credentials are not hard-coded into the application.
from pydantic import BaseSettings
class Settings(BaseSettings):
BOOTSTRAP_SERVER: str
SECURITY_PROTOCOL: str
SASL_MECHANISM: str
SASL_USERNAME: str
SAS_PASSWORD: str
class Config:
env_file = ".env" # Assumes you have a .env file in your project root
# Load settings from the environment
settings = Settings()
Add a Decorator for Logging
Import library and add decorator
from fastapi import FastAPI
from wkregister.decorator import record, Records
app = FastAPI()
@app.get("/")
@record()
def root():
# Create a record object with relevant logging details
record = Records(
org="testorg",
key="logs",
userId="1",
actionType="insert",
status="success",
errorMessage=None,
service="pay-service",
)
# The record is returned for demonstration purposes
return {"result": "ok", "record": record}
Utilize the Built-In Send Method
he send method is designed to forward records to a Kafka topic without requiring additional decorators. Below is an example that demonstrates how to use the send method with the Records model from the wkregister.models module. This example involves sending a record asynchronously:
from wkregister.models import Records
import asyncio
async def sendRecord():
"""
This asynchronous function creates and sends a record to a Kafka topic.
It constructs an instance of the Records class with predefined attributes and
sends the instance using its built-in asynchronous send method.
"""
# Creating an instance of the Records class
record = Records(
org="testorg", # Organization identifier
key="your-key", # Unique key for the record
userId="1", # User ID associated with the record
actionType="update", # Type of action performed
status="success", # Status of the action
errorMessage=None, # Error message, if any (None if no errors)
service="pay-service", # Service associated with the record
payload={"hola": "mundo"} # Payload of the record (example data)
)
# Sending the record asynchronously to a Kafka topic
await record.save()
# Running the asynchronous function using asyncio's event loop
asyncio.run(sendRecord())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file wkregister-0.0.8.tar.gz
.
File metadata
- Download URL: wkregister-0.0.8.tar.gz
- Upload date:
- Size: 7.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | da1065ba8d182f26569c6e3b150b9e68489aac0304ac140511dcabd36c9cfad1 |
|
MD5 | fe4cb612aca2915a77fcc60c7d6be7b7 |
|
BLAKE2b-256 | 9bcc68d219d8d9fdfed52c7c23a097b0ec31b9283813bb399f6da93a17371a2b |
File details
Details for the file wkregister-0.0.8-py3-none-any.whl
.
File metadata
- Download URL: wkregister-0.0.8-py3-none-any.whl
- Upload date:
- Size: 7.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8de30e5fa204a482f6b41e116bec50892b3da5501603c256a0601d2ee212164c |
|
MD5 | f4baca1c369ee99704b693213bf41794 |
|
BLAKE2b-256 | 1321cf001aaacb46127307caaef9fa0c580bf9c6afaea9952e260b4eef224472 |