
QuixStreams Extensions

Composable Serializers and non-official Sinks.

It is an extension for the popular QuixStreams package, providing enhanced functionality that doesn't fit into the upstream project.

Key Features

Chainable Serializers

Easily chain different types of serializers together, including:

  • Pydantic serializers: convert Pydantic models and dataclasses back and forth, helping you write type-safe code.
  • Confluent AVRO serializers: integrate with the Confluent Schema Registry.

Installation

To install this package, you can use pip:

pip install quixstreams-extensions[avro,pydantic]

Quick start

Here's an example of using composable serializers with QuixStreams:

Imagine you want to write type-safe code and stop asking yourself "what was the input schema again? Let me check the examples or logs...". You also want to process a topic that contains AVRO messages serialized with the help of the Confluent Schema Registry.

So first, let's define our input topic schema as a Pydantic model:

from pydantic import BaseModel

class User(BaseModel):
    age: int

Now, let's define an input topic with its deserializer:

from confluent_kafka.schema_registry import SchemaRegistryClient
from quixstreams import Application
from quixstreams.models import Deserializer, Serializer
from quixstreams_extensions.serializers.composer import composed
from quixstreams_extensions.serializers.compositions import pydantic, confluent

app = Application(broker_address="localhost:9092")

# Configure the Schema Registry client
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})

# Takes an AVRO payload and returns a Pydantic model (may fail during Pydantic validation)
composed_deserializer = composed(
    Deserializer, confluent.to_dict(schema_registry_client), pydantic.to_instance_of(User)
)

input = app.topic("input", value_deserializer=composed_deserializer)
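
The last step of the chain is plain Pydantic validation, so a payload that doesn't match the User schema raises a ValidationError instead of silently flowing down the pipeline. A minimal illustration of that failure mode (plain Pydantic, independent of QuixStreams):

from pydantic import ValidationError

try:
    User(age="not-a-number")  # the deserializer's final step performs this kind of validation
except ValidationError as exc:
    print(exc)  # reports which field failed and why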

Take a closer look at composed_deserializer. The main entry point is the composed(SerialiserClass, *functions) function, which accepts either the base quixstreams.models.Deserializer class or the quixstreams.models.Serializer class (subclasses are allowed), followed by a series of callables that are applied sequentially to produce the final result; a rough sketch of this chaining follows the list below. As we can see in the example above, it:

  • creates a composable Deserializer, which first
  • takes the AVRO payload and converts it to a Python dictionary with the help of the SchemaRegistryClient, then
  • takes the Python dict and converts it to a User instance, which can now be used in the pipeline.
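
Conceptually, composed simply threads the raw message value through each callable from left to right. A rough mental model of that chaining (an illustrative sketch, not the library's actual implementation):

from functools import reduce

def pipeline(*functions):
    # Apply each function to the running value, in the order given
    return lambda value: reduce(lambda acc, fn: fn(acc), functions, value)

# pipeline(confluent.to_dict(schema_registry_client), pydantic.to_instance_of(User))
# would take raw AVRO bytes -> dict -> User, just like composed_deserializer above.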

Now we can use it in the app that defines the business logic:

from typing import Literal

from pydantic_avro import AvroBase


class EnhancedUser(AvroBase):  # output data model
    age: int
    prefer: Literal["quix-streaming", "sleeping", "hiking"]


def adults_only(user: User):
    return user.age > 18


def enhance(user: User) -> EnhancedUser:
    return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")


sdf = app.dataframe(input)
sdf = sdf.filter(adults_only).print()
sdf = sdf.apply(enhance)

The pipeline has two processing functions, both of them type-safe with the help of Pydantic (we could achieve the same with dataclasses, of course). EnhancedUser is our output data model; it inherits from AvroBase (a pydantic.BaseModel subclass) from the pydantic-avro package, which will come in handy below.
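
What pydantic-avro adds is the ability to derive an AVRO schema straight from the model. A quick way to see what it generates (the exact output depends on your pydantic-avro version):

import json

# EnhancedUser.avro_schema() returns the AVRO schema as a Python dict,
# which we hand to the Schema Registry in the serializer below.
print(json.dumps(EnhancedUser.avro_schema(), indent=2))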

Finally, let's push our enhanced data into another AVRO topic:

output = app.topic(
    "output",
    value_serializer=composed(
        Serializer,
        pydantic.to_dict,
        confluent.to_avro(
            schema_registry_client, EnhancedUser.avro_schema()
        ),  # Takes a Pydantic model and converts it into AVRO, ready for publishing
    ),
)

sdf = sdf.to_topic(output).print()

if __name__ == "__main__":
    app.run(sdf)

Here we've got a composed serializer that:

  • pydantic.to_dict takes a Pydantic model and converts it to a Python dict
  • confluent.to_avro takes the dict and converts it to AVRO with the help of the Confluent SchemaRegistryClient and the AVRO schema generated by EnhancedUser.avro_schema()
    • by default, schema_registry_client will try to register the AVRO schema in its registry; as the schema evolves over time, this may fail because of the registry's compatibility policy
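
If registration does fail, it helps to inspect what the registry currently holds for the output topic. A small sketch, assuming the default "<topic>-value" subject naming and the topic name used above:

# Look up the schema currently registered for the output topic's value subject
registered = schema_registry_client.get_latest_version("output-value")
print(registered.schema_id, registered.version)
print(registered.schema.schema_str)  # the AVRO schema new versions are checked against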

Please see the examples/ folder for more information.
