A WK Register library for Python

WK Register

The WK Register is a library for logging data to a Kafka topic.

Getting started

Configure your Kafka environment variables first:

  • BOOTSTRAP_SERVER (in server:port format)
  • SECURITY_PROTOCOL
  • SASL_MECHANISM
  • SASL_USERNAME
  • SAS_PASSWORD
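
Before starting the application, it can help to confirm that all five variables are set. The following is a minimal sketch using only the standard library. The helper name missing_kafka_vars is hypothetical and not part of wkregister; the variable names are copied from the list above.

```python
import os

# Variable names copied verbatim from the list above.
REQUIRED_VARS = (
    "BOOTSTRAP_SERVER",
    "SECURITY_PROTOCOL",
    "SASL_MECHANISM",
    "SASL_USERNAME",
    "SAS_PASSWORD",
)


def missing_kafka_vars(env=None):
    """Return the required variable names that are unset or empty in env.

    Hypothetical helper for illustration; defaults to the process environment.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling missing_kafka_vars() at startup and stopping when the returned list is non-empty would surface a configuration problem before any record is sent.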

Usage Example

To use the library, add the @record decorator to any function whose output you wish to send to Kafka, and include a record key in the function's return value as demonstrated below:

from wkregister.decorator import record, Records

@record()
def add(a: float, b: float):
    # The "record" key must hold a Records instance
    return {"result": a + b, "record": Records(key="logs", status="success")}

The library processes the record key to ensure that a Records object is sent. The structure of Records is defined as follows:

from dataclasses import dataclass, asdict
from uuid import uuid4
from datetime import datetime

@dataclass
class Records:
    org: str = ""
    key: str = ""
    userId: str = ""
    actionType: str = ""
    status: str = ""
    errorMessage: str = ""
    service: str = ""
    # Default id: a random UUID stored as a string
    id: str = str(uuid4())
    # Default timestamp: the current local time stored as a string
    timestamp: str = str(datetime.today())

    def dict(self):
        # Convert every field value to a string
        return {k: str(v) for k, v in asdict(self).items()}
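
In a dataclass, a default written as a plain expression is computed once, when the class is created, so instances that rely on such defaults for id and timestamp share the same values. If a fresh value per instance is wanted, dataclasses.field with default_factory is the standard way to get it. The sketch below uses a hypothetical, trimmed-down class named RecordSketch; it is not the library's model.

```python
from dataclasses import asdict, dataclass, field
from datetime import datetime
from uuid import uuid4


@dataclass
class RecordSketch:
    """Hypothetical stand-in with a subset of the fields shown above."""

    org: str = ""
    key: str = ""
    # default_factory runs on every instantiation, so each instance
    # gets its own id and timestamp.
    id: str = field(default_factory=lambda: str(uuid4()))
    timestamp: str = field(default_factory=lambda: str(datetime.today()))

    def dict(self):
        # Same string conversion as the dict() method shown above.
        return {k: str(v) for k, v in asdict(self).items()}


first = RecordSketch(org="testOrg", key="logs")
second = RecordSketch(org="testOrg", key="logs")
```

Here first.id and second.id differ, because the factory is called once per instance rather than once per class.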

Import and use Records like this:

from wkregister.decorator import record, Records
# or
from wkregister.models import Records

# Named log_record so the imported record decorator is not shadowed
log_record = Records(
    org="testOrg",
    key="logs",
    userId="1",
    actionType="insert",
    status="success",
    errorMessage=None,
    service="pay-service",
)

Complete Example

Here’s a comprehensive example:

from wkregister.decorator import record, Records
import asyncio

@record()
def add(a: float, b: float):

    record = Records(
        org="testorg",
        key="logs",
        userId="1",
        actionType="insert",
        status="success",
        errorMessage=None,
        service="pay-service",
    )
    return {"result": a + b, "record": record}


# Call the decorated function at module level, outside the function body
result = asyncio.run(add(12, 12))
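
To make the flow above more concrete, here is a rough sketch of how a decorator of this kind could pick the record out of the return value. It is an illustration only, not wkregister's implementation: record_sketch, FakeRecord, and the sent list are hypothetical stand-ins, and a real decorator would publish to Kafka rather than append to a list. The wrapper is a coroutine function, which would explain why a plain function can be run through asyncio.run as in the example.

```python
import asyncio
import functools
import inspect
from dataclasses import dataclass

sent = []  # stand-in for the Kafka topic


@dataclass
class FakeRecord:
    """Hypothetical stand-in for the library's Records model."""

    key: str = ""
    status: str = ""


def record_sketch():
    """Decorator factory that forwards the "record" entry of the return value."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            if inspect.isawaitable(result):
                result = await result  # also supports async functions
            rec = result.get("record") if isinstance(result, dict) else None
            if isinstance(rec, FakeRecord):
                sent.append(rec)  # a real implementation would send to Kafka
            return result  # assumption: the return value is passed through unchanged

        return wrapper

    return decorator


@record_sketch()
def add(a: float, b: float):
    return {"result": a + b, "record": FakeRecord(key="logs", status="success")}


result = asyncio.run(add(12, 12))
```

After this runs, sent holds the one FakeRecord returned by add, and result is the dictionary that add produced.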

FastAPI integration

Define Environment Variables

Start by defining the environment variables using Pydantic. This setup allows your FastAPI application to read configuration variables from environment files or the environment itself, ensuring sensitive credentials are not hard-coded into the application.

# Pydantic v1 import; in Pydantic v2, BaseSettings lives in the separate pydantic-settings package
from pydantic import BaseSettings

class Settings(BaseSettings):
    BOOTSTRAP_SERVER: str
    SECURITY_PROTOCOL: str
    SASL_MECHANISM: str
    SASL_USERNAME: str
    SAS_PASSWORD: str

    class Config:
        env_file = ".env"  # Assumes you have a .env file in your project root

# Load settings from the environment
settings = Settings()

Add a Decorator for Logging

Import the library and add the decorator:

from fastapi import FastAPI
from wkregister.decorator import record, Records

app = FastAPI()

@app.get("/")
@record()
def root():
    # Create a record object with relevant logging details
    record = Records(
        org="testorg",
        key="logs",
        userId="1",
        actionType="insert",
        status="success",
        errorMessage=None,
        service="pay-service",
    )
    # The record is returned for demonstration purposes
    return {"result": "ok", "record": record}

Utilize the Built-In Send Method

The send method forwards records to a Kafka topic without requiring any decorator. The example below uses the Records model from the wkregister.models module and sends a record asynchronously by awaiting record.save():

from wkregister.models import Records
import asyncio

async def sendRecord():
    """
    This asynchronous function creates and sends a record to a Kafka topic.
    It constructs an instance of the Records class with predefined attributes and
    sends the instance using its built-in asynchronous send method.
    """
    # Creating an instance of the Records class
    record = Records(
        org="testorg",           # Organization identifier
        key="your-key",          # Unique key for the record
        userId="1",              # User ID associated with the record
        actionType="update",     # Type of action performed
        status="success",        # Status of the action
        errorMessage=None,       # Error message, if any (None if no errors)
        service="pay-service",   # Service associated with the record
        payload={"hola": "mundo"}  # Payload of the record (example data)
    )
    # Sending the record asynchronously to a Kafka topic
    await record.save()

# Running the asynchronous function using asyncio's event loop
asyncio.run(sendRecord())
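
Because save() is awaited, several records could in principle be sent concurrently with asyncio.gather. The sketch below shows the pattern with a hypothetical stand-in class, FakeRecords, whose save() appends to a list instead of contacting Kafka. Whether the real client handles concurrent sends well is an assumption that would need checking against the library.

```python
import asyncio

delivered = []  # stand-in for the Kafka topic


class FakeRecords:
    """Hypothetical stand-in exposing an awaitable save(), as in the example above."""

    def __init__(self, key: str, actionType: str):
        self.key = key
        self.actionType = actionType

    async def save(self):
        await asyncio.sleep(0)  # yield control, as real network I/O would
        delivered.append(self.key)


async def send_many(records):
    # gather() schedules every save() concurrently and waits for all of them.
    await asyncio.gather(*(r.save() for r in records))


batch = [FakeRecords(key=f"key-{i}", actionType="update") for i in range(3)]
asyncio.run(send_many(batch))
```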
