Extends quix-streams functionality
Project description
QuixStreams Extensions
Holds chainable serializers and utils for railway-oriented programming.
QuixStreams Extension Package
This Python package is an extension for the popular QuixStreams package, providing enhanced functionality with chainable serializers. These serializers allow you to chain different types to each other seamlessly.
Key Features
- Chainable Serializers: Easily chain different types of serializers to each other.
- Pydantic Serializers: Chain serializers for Pydantic models.
- AVRO Serializers: Integrate Confluent Schema Registry AVRO serializers into your chains.
- Rail-Well-Oriented Programming Serializes: Use serializers designed for railway-oriented Programming. Based on returns
Installation
To install this package, you can use pip:
pip install quixstreams-extension[avro,pydantic]
Usage
Here's an example of how to use the chainable serializers with QuixStreams:
First let’s define our serializers:
from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams.models import (
BytesDeserializer,
)
from quixstreams_extensions.models.serializers.confluent_schema_registry.avro import (
AVROSerializer,
)
from quixstreams_extensions.models.chains import pydantic
from quixstreams_extensions.models.chains.confluent_schema_registry import avro
class AVROPydanticDeserializer(avro.ToDict, pydantic.FromDict, BytesDeserializer):
"""
Takes AVRO payload form input topic and returns a pydantic model (may fail during pydantic validation)
"""
class PydanticAVROSerializer(pydantic.ToDict, AVROSerializer):
"""
Takes Pydantic model and convert into AVRO, to be ready for publishing
"""
def __init__(self, schema_registry_client: SchemaRegistryClient, model_class: Type[BaseModel]):
super().__init__(schema_registry_client, json.dumps(model_class.avro_schema()), model_class)
Then we can use them in the app:
from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams import Application
# Create an Application - the main configuration entry point
app = Application(...)
# Configure the Schema Registry client
schema_registry_client = SchemaRegistryClient(...)
class User(AvroBase):
age: int
class EnhancedUser(AvroBase):
age: int
prefer: Literal["quix-streaming", "sleeping", "hiking"]
# Define the input topic
input = app.topic(
"input",
value_deserializer=AVROPydanticDeserializer(schema_registry_client, User),
)
# Define the output topics
output = app.topic(
"output",
value_serializer=PydanticAVROSerializer(schema_registry_client, EnhancedUser),
)
def adults_only(user: User):
return user.age > 18
def enhance(user: User):
return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")
sdf = app.dataframe(input)
sdf = sdf.filter(adults_only).print()
sdf = sdf.apply(enhance)
sdf = sdf.to_topic(output).print()
if __name__ == "__main__":
app.run(sdf)
Please discover examples/
folder for more information.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file quixstreams_extensions-0.1.1.tar.gz
.
File metadata
- Download URL: quixstreams_extensions-0.1.1.tar.gz
- Upload date:
- Size: 14.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.2 CPython/3.11.6 Darwin/23.4.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | f133ec549653e8246137a9325b510df9fc98a93b50bf306fac4da43b6948a41a |
|
MD5 | 591f99ce7937ab6b5436ee6b80bb6b9b |
|
BLAKE2b-256 | 16e079bb6f2c121818765065f3db575b4d47593eacf5010f08d6f9a2c6d7f0ce |
File details
Details for the file quixstreams_extensions-0.1.1-py3-none-any.whl
.
File metadata
- Download URL: quixstreams_extensions-0.1.1-py3-none-any.whl
- Upload date:
- Size: 14.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.2 CPython/3.11.6 Darwin/23.4.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 32a24f14901a5e6ebab27425a290d6dcec9b9262d1695a4a132020f7e662b09b |
|
MD5 | 199ed193daf37a28ebee59850b2ebb9a |
|
BLAKE2b-256 | 70664aca5bc6a0645512073577d6c91f889269b27b2cbd66a51bf760e3abe71c |