Skip to main content

Extends quix-streams functionality

Project description

QuixStreams Extensions

Holds chainable serializers and utils for railway-oriented programming.

QuixStreams Extension Package

This Python package is an extension for the popular QuixStreams package, providing enhanced functionality with chainable serializers. These serializers allow you to chain different types to each other seamlessly.

Key Features

  • Chainable Serializers: Easily chain different types of serializers to each other.
    • Pydantic Serializers: Chain serializers for Pydantic models.
    • AVRO Serializers: Integrate Confluent Schema Registry AVRO serializers into your chains.
    • Rail-Well-Oriented Programming Serializes: Use serializers designed for railway-oriented Programming. Based on returns

Installation

To install this package, you can use pip:

pip install quixstreams-extension[avro,pydantic]

Usage

Here's an example of how to use the chainable serializers with QuixStreams:

First let’s define our serializers:

from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams.models import (
    BytesDeserializer,
)
from quixstreams_extensions.models.serializers.confluent_schema_registry.avro import (
    AVROSerializer,
)
from quixstreams_extensions.models.chains import pydantic
from quixstreams_extensions.models.chains.confluent_schema_registry import avro



class AVROPydanticDeserializer(avro.ToDict, pydantic.FromDict, BytesDeserializer):
    """
    Takes AVRO payload form input topic and returns a pydantic model (may fail during pydantic validation)
    """


class PydanticAVROSerializer(pydantic.ToDict, AVROSerializer):
    """
    Takes Pydantic model and convert into AVRO, to be ready for publishing
    """

    def __init__(self, schema_registry_client: SchemaRegistryClient, model_class: Type[BaseModel]):
        super().__init__(schema_registry_client, json.dumps(model_class.avro_schema()), model_class)

Then we can use them in the app:

from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams import Application

# Create an Application - the main configuration entry point
app = Application(...)

# Configure the Schema Registry client
schema_registry_client = SchemaRegistryClient(...)

class User(AvroBase):
  age: int

class EnhancedUser(AvroBase):
  age: int
  prefer: Literal["quix-streaming", "sleeping", "hiking"]

# Define the input topic
input = app.topic(
    "input",
    value_deserializer=AVROPydanticDeserializer(schema_registry_client, User),
)

# Define the output topics
output = app.topic(
    "output",
    value_serializer=PydanticAVROSerializer(schema_registry_client, EnhancedUser),
)


def adults_only(user: User):
    return user.age > 18


def enhance(user: User):
    return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")


sdf = app.dataframe(input)
sdf = sdf.filter(adults_only).print()
sdf = sdf.apply(enhance)
sdf = sdf.to_topic(output).print()

if __name__ == "__main__":
    app.run(sdf)

Please discover examples/ folder for more information.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

quixstreams_extensions-0.1.2.tar.gz (14.3 kB view details)

Uploaded Source

Built Distribution

quixstreams_extensions-0.1.2-py3-none-any.whl (14.3 kB view details)

Uploaded Python 3

File details

Details for the file quixstreams_extensions-0.1.2.tar.gz.

File metadata

  • Download URL: quixstreams_extensions-0.1.2.tar.gz
  • Upload date:
  • Size: 14.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.11.6 Darwin/23.4.0

File hashes

Hashes for quixstreams_extensions-0.1.2.tar.gz
Algorithm Hash digest
SHA256 ab610b939998849aba5ffce534c307bc9480f9dc445e34005c22185c4df4a0ff
MD5 47ed9c1ff563bc169efb37db0b0298f0
BLAKE2b-256 92b5ef3c7595afdfbf93204eea17583b59d5036cd490571d41c740aaeee611d9

See more details on using hashes here.

File details

Details for the file quixstreams_extensions-0.1.2-py3-none-any.whl.

File metadata

File hashes

Hashes for quixstreams_extensions-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 28261acc6d5fe337b55c26b9b8ff3174b4713a740e4f900177541f8ccdb12d1c
MD5 35ab5439e1112dfe8115a1062ac06488
BLAKE2b-256 83e47f4679d5a1f033ad301d117314acb8188ae40bea0528671ce03dd477bdff

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page