
Extends quix-streams functionality

Project description

QuixStreams Extensions

Composable Serializers and unofficial Sinks.

It is an extension for the popular QuixStreams package, providing functionality that doesn't fit into the main upstream project.

Key Features

Chainable Serializers

Easily chain different types of serializers together, including:

  • Pydantic Serializers: convert Pydantic models and dataclasses back and forth, helping you write type-safe code.
  • Confluent AVRO Serializers: integrate with the Confluent Schema Registry.

Installation

To install this package, you can use pip:

pip install quixstreams-extensions[avro,pydantic]

Quick start

Here's an example of using composable serializers with QuixStreams:

Imagine you want to write type-safe code and stop asking yourself "what was the input schema again? Let me check the examples or logs...". You also want to process a topic that contains AVRO messages serialized with the help of the Confluent Schema Registry.

So first, let's define our input topic schema as a Pydantic model:

from pydantic import BaseModel

class User(BaseModel):
    age: int

Now, let's define an input topic with its deserializer:

from confluent_kafka.schema_registry import SchemaRegistryClient
from quixstreams import Application
from quixstreams.models import Deserializer, Serializer
from quixstreams_extensions.serializers.composer import composed
from quixstreams_extensions.serializers.compositions import pydantic, confluent

# Create the application (the broker address here is just an example)
app = Application(broker_address="localhost:9092")

# Configure the Schema Registry client
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})

composed_deserializer = composed(
    Deserializer, confluent.to_dict(schema_registry_client), pydantic.to_instance_of(User)
)  # Takes an AVRO payload and returns a pydantic model (may fail during pydantic validation)

input = app.topic("input", value_deserializer=composed_deserializer)

Take a closer look at composed_deserializer. The main entry point is the composed(SerialiserClass, *functions) function, which accepts either the base quixstreams.models.Deserializer class or the quixstreams.models.Serializer class (subclasses are allowed), followed by a series of callables that are invoked sequentially to produce the final result. As we can see in the example above, it:

  • creates a composable Deserializer, which first
  • takes the AVRO payload and converts it to a Python dictionary with the help of the SchemaRegistryClient, then
  • takes the Python dict and converts it to a User instance, which can now be used in the pipeline (a minimal sketch of this chaining idea follows the list).
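
Under the hood this is plain sequential function composition: each callable receives the previous one's output. As a minimal, hypothetical sketch of that chaining idea (not the library's actual implementation), it boils down to something like:

from functools import reduce
from typing import Any, Callable


def chain(*functions: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Pipe a value through each function in order, left to right."""
    return lambda value: reduce(lambda acc, func: func(acc), functions, value)


# For example: chain(bytes.decode, str.upper)(b"hello") == "HELLO"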

Now we can use it in the app that defines the business logic:

from typing import Literal

from pydantic_avro import AvroBase


class EnhancedUser(AvroBase):  # output data model
    age: int
    prefer: Literal["quix-streaming", "sleeping", "hiking"]


def adults_only(user: User):
    return user.age > 18


def enhance(user: User) -> EnhancedUser:
    return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")


sdf = app.dataframe(input)
sdf = sdf.filter(adults_only).print()
sdf = sdf.apply(enhance)

The pipeline has two processing functions, both type-safe with the help of Pydantic (we could achieve the same with dataclasses, of course). EnhancedUser is our output data model; it inherits from AvroBase (a pydantic.BaseModel subclass) provided by pydantic-avro, which will be useful later.
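
pydantic-avro can generate an Avro schema straight from the model, and that generated schema is what we hand to the Confluent serializer below. If you are curious what it produces (the exact output depends on your pydantic-avro version), you can simply print it:

# Print the Avro schema generated from the output model (illustrative only;
# the exact structure depends on the pydantic-avro version).
print(EnhancedUser.avro_schema())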

Finally, let's push our enhanced data into another AVRO topic:

output = app.topic(
    "output",
    value_serializer=composed(
        Serializer,
        pydantic.to_dict,
        confluent.to_avro(
            schema_registry_client, EnhancedUser.avro_schema()
        ),  # Takes a Pydantic model and converts it into AVRO, ready for publishing
    ),
)

sdf = sdf.to_topic(output).print()

if __name__ == "__main__":
    app.run(sdf)

So we've got a composed serializer that:

  • pydantic.to_dict takes a Pydantic model and converts it to a Python dict
  • confluent.to_avro takes the dict and converts it to AVRO with the help of the Confluent SchemaRegistryClient and the AVRO schema generated by EnhancedUser.avro_schema()
    • by default, the Schema Registry client will try to register the AVRO schema in its registry; as the schema evolves over time, this may fail due to the subject's compatibility policy (see the snippet below)
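
If registration fails for that reason, you can inspect (or deliberately change) the subject's compatibility level through the same Schema Registry client. The subject name "output-value" below assumes the default topic-name subject naming strategy:

# Check the compatibility level configured for the output topic's value subject
# ("output-value" assumes the default TopicNameStrategy naming).
print(schema_registry_client.get_compatibility("output-value"))

# Relaxing it is an operational decision; only do so if your consumers can cope
# with the schema change:
# schema_registry_client.set_compatibility("output-value", "BACKWARD")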

Please explore the examples/ folder for more information.



Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

quixstreams_extensions-0.2.1.tar.gz (12.4 kB)

Uploaded Source

Built Distribution

quixstreams_extensions-0.2.1-py3-none-any.whl (12.9 kB)

Uploaded Python 3

File details

Details for the file quixstreams_extensions-0.2.1.tar.gz.

File metadata

  • Download URL: quixstreams_extensions-0.2.1.tar.gz
  • Upload date:
  • Size: 12.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.11.6 Darwin/23.4.0

File hashes

Hashes for quixstreams_extensions-0.2.1.tar.gz

  • SHA256: 28fcc1c90fdd7f3cd74dae1bd8721f277c018370e76abbaa7fdb6c63b0d65cc8
  • MD5: e38ff46828716bceb6ded2bf48454b8e
  • BLAKE2b-256: d1af916571bd49682c959fc6b16c0c51ddf9afb27a4b1cfd84e573c759f5e797


File details

Details for the file quixstreams_extensions-0.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for quixstreams_extensions-0.2.1-py3-none-any.whl

  • SHA256: 71159f13c35f754e92b87f52d883a78ceabfcd01510204132abc1426daba7181
  • MD5: 0156237c4a30dc2758cf6a8ad37092b3
  • BLAKE2b-256: ab5cad1ce9f19a17fd665fd4ca1d06668ad070e8ad34de51ced4a4a3e705f4d6
