Skip to main content

Extends quix-streams functionality

Project description

QuixStreams Extensions

Holds chainable serializers and utils for railway-oriented programming.

QuixStreams Extension Package

This Python package is an extension for the popular QuixStreams package, providing enhanced functionality with chainable serializers. These serializers allow you to chain different types to each other seamlessly.

Key Features

  • Chainable Serializers: Easily chain different types of serializers to each other.
    • Pydantic Serializers: Chain serializers for Pydantic models.
    • AVRO Serializers: Integrate Confluent Schema Registry AVRO serializers into your chains.
    • Rail-Well-Oriented Programming Serializes: Use serializers designed for railway-oriented Programming. Based on returns

Installation

To install this package, you can use pip:

pip install quixstreams-extension[avro,pydantic]

Usage

Here's an example of how to use the chainable serializers with QuixStreams:

First let’s define our serializers:

from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams.models import (
    BytesDeserializer,
)
from quixstreams_extensions.models.serializers.confluent_schema_registry.avro import (
    AVROSerializer,
)
from quixstreams_extensions.models.chains import pydantic
from quixstreams_extensions.models.chains.confluent_schema_registry import avro



class AVROPydanticDeserializer(avro.ToDict, pydantic.FromDict, BytesDeserializer):
    """
    Takes AVRO payload form input topic and returns a pydantic model (may fail during pydantic validation)
    """


class PydanticAVROSerializer(pydantic.ToDict, AVROSerializer):
    """
    Takes Pydantic model and convert into AVRO, to be ready for publishing
    """

    def __init__(self, schema_registry_client: SchemaRegistryClient, model_class: Type[BaseModel]):
        super().__init__(schema_registry_client, json.dumps(model_class.avro_schema()), model_class)

Then we can use them in the app:

from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams import Application

# Create an Application - the main configuration entry point
app = Application(...)

# Configure the Schema Registry client
schema_registry_client = SchemaRegistryClient(...)

class User(AvroBase):
  age: int

class EnhancedUser(AvroBase):
  age: int
  prefer: Literal["quix-streaming", "sleeping", "hiking"]

# Define the input topic
input = app.topic(
    "input",
    value_deserializer=AVROPydanticDeserializer(schema_registry_client, User),
)

# Define the output topics
output = app.topic(
    "output",
    value_serializer=PydanticAVROSerializer(schema_registry_client, EnhancedUser),
)


def adults_only(user: User):
    return user.age > 18


def enhance(user: User):
    return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")


sdf = app.dataframe(input)
sdf = sdf.filter(adults_only).print()
sdf = sdf.apply(enhance)
sdf = sdf.to_topic(output).print()

if __name__ == "__main__":
    app.run(sdf)

Please discover examples/ folder for more information.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

quixstreams_extensions-0.1.3.tar.gz (15.0 kB view details)

Uploaded Source

Built Distribution

quixstreams_extensions-0.1.3-py3-none-any.whl (14.9 kB view details)

Uploaded Python 3

File details

Details for the file quixstreams_extensions-0.1.3.tar.gz.

File metadata

  • Download URL: quixstreams_extensions-0.1.3.tar.gz
  • Upload date:
  • Size: 15.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.11.6 Darwin/23.4.0

File hashes

Hashes for quixstreams_extensions-0.1.3.tar.gz
Algorithm Hash digest
SHA256 edded3034dde2791722e98171a5ac963db87d7ee58481b46b59cbc19061c485f
MD5 dcce5f9019ad4aea052522c6a1a6eb41
BLAKE2b-256 3ebf36a464fe74f1f6aec4dc3e785316688d3e18283e7437cf5c8bb0d45b7ce2

See more details on using hashes here.

File details

Details for the file quixstreams_extensions-0.1.3-py3-none-any.whl.

File metadata

File hashes

Hashes for quixstreams_extensions-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 a94f4e18dfe9788ca1cda2c6c4c90e24ebac99254a0210608e8569140e93eaa7
MD5 6ced3f9a2ad80df919734b69db8e33cc
BLAKE2b-256 864c47ef0a57d73026eb7c36bb6b425530a59b9fb58a1be60f76d4e1561c3ccc

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page