Extends quix-streams functionality

QuixStreams Extensions

Holds chainable serializers and utils for railway-oriented programming.

QuixStreams Extension Package

This Python package extends QuixStreams with chainable serializers. Each serializer handles a single conversion step, and steps can be chained so that the output of one feeds the next, for example AVRO bytes to a dict to a Pydantic model.

Key Features

  • Chainable Serializers: Easily chain different types of serializers to each other.
    • Pydantic Serializers: Chain serializers for Pydantic models.
    • AVRO Serializers: Integrate Confluent Schema Registry AVRO serializers into your chains.
    • Railway-Oriented Programming Serializers: Use serializers designed for railway-oriented programming, based on the returns library.
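
To make the railway-oriented idea concrete, here is a minimal, hand-rolled sketch using only the standard library. It is not the API of the returns library or of this package: `Success`, `Failure`, `bind`, `parse_age` and `require_adult` are hypothetical names chosen for illustration. Each step returns either a success or a failure, and once a failure occurs the remaining steps are skipped.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class Success(Generic[T]):
    """Hypothetical container for a value on the 'happy' track."""
    value: T


@dataclass(frozen=True)
class Failure:
    """Hypothetical container for an error on the 'failure' track."""
    error: Exception


Result = Union[Success[T], Failure]


def bind(result, step: Callable):
    """Run `step` only if `result` is a Success; pass a Failure through untouched."""
    if isinstance(result, Failure):
        return result
    return step(result.value)


def parse_age(raw: str) -> Result[int]:
    # Converts an exception into a Failure instead of raising it.
    try:
        return Success(int(raw))
    except ValueError as exc:
        return Failure(exc)


def require_adult(age: int) -> Result[int]:
    if age > 18:
        return Success(age)
    return Failure(ValueError(f"age {age} is not above 18"))


ok = bind(parse_age("42"), require_adult)          # stays on the success track
bad = bind(parse_age("forty-two"), require_adult)  # require_adult is never called
```

In a streaming context, the appeal of this style is that a record that fails deserialization or validation becomes a value that can be routed or logged, rather than an exception that interrupts processing.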

Installation

To install this package, you can use pip:

pip install "quixstreams-extensions[avro,pydantic]"

Usage

Here's an example of how to use the chainable serializers with QuixStreams:

First let’s define our serializers:

import json
from typing import Type

from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams.models import (
    BytesDeserializer,
)
from quixstreams_extensions.models.serializers.confluent_schema_registry.avro import (
    AVROSerializer,
)
from quixstreams_extensions.models.chains import pydantic
from quixstreams_extensions.models.chains.confluent_schema_registry import avro


class AVROPydanticDeserializer(avro.ToDict, pydantic.FromDict, BytesDeserializer):
    """
    Takes an AVRO payload from the input topic and returns a Pydantic model (may fail during Pydantic validation).
    """


class PydanticAVROSerializer(pydantic.ToDict, AVROSerializer):
    """
    Takes a Pydantic model and converts it into AVRO, ready for publishing.
    """

    def __init__(self, schema_registry_client: SchemaRegistryClient, model_class: Type[AvroBase]):
        # AvroBase.avro_schema() returns the schema as a dict, so it is dumped to a JSON string here.
        super().__init__(schema_registry_client, json.dumps(model_class.avro_schema()), model_class)
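
The classes above are composed purely through their base-class order. One way such chaining can work in Python is cooperative multiple inheritance, where each mixin transforms the value and then hands it to the next class in the method resolution order. The sketch below illustrates that pattern with the standard library only. It is an assumption about the general technique, not this package's actual implementation, and `PassThrough`, `JsonToDict`, `DictToUser` and `JsonUserDeserializer` are hypothetical names.

```python
import json
from dataclasses import dataclass


@dataclass
class User:
    age: int


class PassThrough:
    """Hypothetical end of the chain: returns whatever it receives."""

    def __call__(self, value):
        return value


class JsonToDict:
    """Hypothetical mixin: parses JSON, then delegates to the next class in the MRO."""

    def __call__(self, value):
        return super().__call__(json.loads(value))


class DictToUser:
    """Hypothetical mixin: builds a User from a dict, then delegates onward."""

    def __call__(self, value):
        return super().__call__(User(**value))


class JsonUserDeserializer(JsonToDict, DictToUser, PassThrough):
    """Reads left to right: bytes -> dict -> User."""


user = JsonUserDeserializer()(b'{"age": 42}')
```

With this layout, the order of the base classes is the order of the conversion steps, which matches how `AVROPydanticDeserializer` lists `avro.ToDict` before `pydantic.FromDict`.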

Then we can use them in the app:

from typing import Literal

from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams import Application

# Create an Application - the main configuration entry point
app = Application(...)

# Configure the Schema Registry client
schema_registry_client = SchemaRegistryClient(...)

class User(AvroBase):
    age: int


class EnhancedUser(AvroBase):
    age: int
    prefer: Literal["quix-streaming", "sleeping", "hiking"]

# Define the input topic
input = app.topic(
    "input",
    value_deserializer=AVROPydanticDeserializer(schema_registry_client, User),
)

# Define the output topics
output = app.topic(
    "output",
    value_serializer=PydanticAVROSerializer(schema_registry_client, EnhancedUser),
)


def adults_only(user: User):
    return user.age > 18


def enhance(user: User):
    return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")


sdf = app.dataframe(input)
sdf = sdf.filter(adults_only).print()
sdf = sdf.apply(enhance)
sdf = sdf.to_topic(output).print()

if __name__ == "__main__":
    app.run(sdf)
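
To see what the filter and apply steps do to individual records without a Kafka broker or Schema Registry, the same two functions can be exercised on an in-memory list. This is only a sketch: plain dataclasses stand in for the `AvroBase` models as an assumption, and the list comprehension stands in for the streaming dataframe.

```python
from dataclasses import dataclass
from typing import Literal


# Plain dataclasses used here as stand-ins for the AvroBase models (assumption).
@dataclass
class User:
    age: int


@dataclass
class EnhancedUser:
    age: int
    prefer: Literal["quix-streaming", "sleeping", "hiking"]


def adults_only(user: User) -> bool:
    return user.age > 18


def enhance(user: User) -> EnhancedUser:
    return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")


incoming = [User(age=12), User(age=30), User(age=101)]

# Equivalent of sdf.filter(adults_only) followed by sdf.apply(enhance).
outgoing = [enhance(user) for user in incoming if adults_only(user)]
```

Here the 12-year-old is dropped by the filter, and the remaining two users are mapped to `EnhancedUser` records that would then be serialized to the output topic.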

See the examples/ folder for more information.
