Skip to main content

A wk register library for python

Project description

WK Register

The WK Register is a library for logging data to a Kafka topic.

Getting started

Configure your Kafka environment variables first:

  • BOOTSTRAP_SERVER (in server:port format)
  • SECURITY_PROTOCOL
  • SASL_MECHANISM
  • SASL_USERNAME
  • SAS_PASSWORD

Usage Example

To use the library, add the @record decorator to any function whose output you wish to send to Kafka, and include a record key in the function's return value as demonstrated below:

from wkregister.decorator import record

@record()
def add(a: float, b: float):

    return {"result": a + b, "record": Record}

The library processes the record key to ensure that a Record object is sent. The structure of the Record is defined as follows:

from dataclasses import dataclass, asdict
from uuid import uuid4
from datetime import datetime

@dataclass
class Records:
    org: str = ""
    key: str = ""
    userId: str = ""
    actionType: str = ""
    status: str = ""
    errorMessage: str = ""
    service: str = ""
    id: str = str(uuid.uuid4())
    timestamp: str = str(datetime.today())

    def dict(self):
        return {k: str(v) for k, v in asdict(self).items()}

Import and use Record like this:

from wkregister.decorator import record, Records
# or
from wkregister.models import Records

record = Records(
        org="testOrg",
        key="logs",
        userId="1",
        actionType="insert",
        status="success",
        errorMessage=None,
        service="pay-service",
    )

Complete Example

Here’s a comprehensive example:

from wkregister.decorator import record, Records
import asyncio

@record()
def add(a: float, b: float):

    record = Records(
        org="testorg",
        key="logs",
        userId="1",
        actionType="insert",
        status="success",
        errorMessage=None,
        service="pay-service",
    )
    return {"result": a + b, "record": record}


    result = asyncio.run(add(12, 12))

Fast API integration

Define Environment Variables

Start by defining the environment variables using Pydantic. This setup allows your FastAPI application to read configuration variables from environment files or the environment itself, ensuring sensitive credentials are not hard-coded into the application.

from pydantic import BaseSettings

class Settings(BaseSettings):
    BOOTSTRAP_SERVER: str
    SECURITY_PROTOCOL: str
    SASL_MECHANISM: str
    SASL_USERNAME: str
    SAS_PASSWORD: str

    class Config:
        env_file = ".env"  # Assumes you have a .env file in your project root

# Load settings from the environment
settings = Settings()

Add a Decorator for Logging

Import library and add decorator

from fastapi import FastAPI
from wkregister.decorator import record, Records

app = FastAPI()

@app.get("/")
@record()
def root():
    # Create a record object with relevant logging details
    record = Records(
        org="testorg",
        key="logs",
        userId="1",
        actionType="insert",
        status="success",
        errorMessage=None,
        service="pay-service",
    )
    # The record is returned for demonstration purposes
    return {"result": "ok", "record": record}

Utilize the Built-In Send Method

he send method is designed to forward records to a Kafka topic without requiring additional decorators. Below is an example that demonstrates how to use the send method with the Records model from the wkregister.models module. This example involves sending a record asynchronously:

from wkregister.models import Records
import asyncio

async def sendRecord():
    """
    This asynchronous function creates and sends a record to a Kafka topic.
    It constructs an instance of the Records class with predefined attributes and
    sends the instance using its built-in asynchronous send method.
    """
    # Creating an instance of the Records class
    record = Records(
        org="testorg",           # Organization identifier
        key="your-key",          # Unique key for the record
        userId="1",              # User ID associated with the record
        actionType="update",     # Type of action performed
        status="success",        # Status of the action
        errorMessage=None,       # Error message, if any (None if no errors)
        service="pay-service",   # Service associated with the record
        payload={"hola": "mundo"}  # Payload of the record (example data)
    )
    # Sending the record asynchronously to a Kafka topic
    await record.send()

# Running the asynchronous function using asyncio's event loop
asyncio.run(sendRecord())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wkregister-0.0.7.tar.gz (7.4 kB view details)

Uploaded Source

Built Distribution

wkregister-0.0.7-py3-none-any.whl (7.9 kB view details)

Uploaded Python 3

File details

Details for the file wkregister-0.0.7.tar.gz.

File metadata

  • Download URL: wkregister-0.0.7.tar.gz
  • Upload date:
  • Size: 7.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.5

File hashes

Hashes for wkregister-0.0.7.tar.gz
Algorithm Hash digest
SHA256 e6460e57cbfb052cd2e260d80c2945bb17d31b980b018ec8ae8ae3d4a78be2d8
MD5 7b7a4cdfc7f3acc722299aa003212cdd
BLAKE2b-256 a0b1bddb3de3258269f02464de7ed03d370933ecf0e9121f10fc90339aa2201a

See more details on using hashes here.

File details

Details for the file wkregister-0.0.7-py3-none-any.whl.

File metadata

  • Download URL: wkregister-0.0.7-py3-none-any.whl
  • Upload date:
  • Size: 7.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.5

File hashes

Hashes for wkregister-0.0.7-py3-none-any.whl
Algorithm Hash digest
SHA256 ea8833cc0b961a7e5c8ba4a9f11413fb8770b3dd53bdc8f411f9e45567781a79
MD5 7f59e980b9468a11a2ef033f00c2ffec
BLAKE2b-256 cbddf3d58921fc11f0ea9f213985cba6e01d80168fec1b59c5dd65fae8a186f4

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page