Skip to main content

No project description provided

Project description

faststream-schema-registry

GitHub Actions Workflow Status codecov Python Version PyPI - Version

Middleware to integrate with the confluent schema registry for managing avro/json schemas.

Schemas are generated from python classes using dataclasses-avroschema and registered using python-schema-registry-client.

Requirements

python 3.9+

Installation

pip install faststream-schema-registry

Examples

Serialize objects using either the AvroSchemaRegistry or the JsonSchemaRegistry

AvroSchemaRegistry

from datetime import datetime

from dataclasses_avroschema.pydantic import AvroBaseModel
from faststream import FastStream
from faststream.confluent import KafkaBroker
from faststream_schema_registry.middleware import SchemaRegistryMiddleware
from faststream_schema_registry.registries import AvroSchemaRegistry

schema_registry = AvroSchemaRegistry(url="http://localhost:8081")
broker = KafkaBroker(
    "localhost:29092",
    middlewares=[
        SchemaRegistryMiddleware.make_middleware(schema_registry=schema_registry)
    ],
)

app = FastStream(broker)

# class for message object
class Message(AvroBaseModel):
    timestamp: datetime
    message_id: int
    payload: str

    class Meta:
        namespace = "com.test"

@app.after_startup
async def test():
    await broker.publish(Message.fake(), topic="messages")


@broker.subscriber("messages")
async def on_messages(msg: Message):
    print(msg)

JsonSchemaRegistry

from datetime import datetime

from dataclasses_avroschema.pydantic import AvroBaseModel
from faststream import FastStream
from faststream.confluent import KafkaBroker
from faststream_schema_registry.middleware import SchemaRegistryMiddleware
from faststream_schema_registry.registries import JsonSchemaRegistry

schema_registry = JsonSchemaRegistry(url="http://localhost:8081")
broker = KafkaBroker(
    "localhost:29092",
    middlewares=[
        SchemaRegistryMiddleware.make_middleware(schema_registry=schema_registry)
    ],
)
...

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

faststream_schema_registry-0.1.3a1.tar.gz (7.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

faststream_schema_registry-0.1.3a1-py3-none-any.whl (8.6 kB view details)

Uploaded Python 3

File details

Details for the file faststream_schema_registry-0.1.3a1.tar.gz.

File metadata

File hashes

Hashes for faststream_schema_registry-0.1.3a1.tar.gz
Algorithm Hash digest
SHA256 3fd8fa338db8cbe9c7043256ae628d182264b0d6d1601073887d6989495b7a65
MD5 e4f433fe93094803877f1e24c0f7001a
BLAKE2b-256 4e27a629447fd9920c5f60b58b66c8dcbe4b418cbc34310d5cfaef3214d4b0e0

See more details on using hashes here.

Provenance

The following attestation bundles were made for faststream_schema_registry-0.1.3a1.tar.gz:

Publisher: publish-package.yml on mlovretovich/faststream-schema-registry

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file faststream_schema_registry-0.1.3a1-py3-none-any.whl.

File metadata

File hashes

Hashes for faststream_schema_registry-0.1.3a1-py3-none-any.whl
Algorithm Hash digest
SHA256 54e572c2dc2908a121df89acac464436cb05b84de727c909c1d8eecb26dc10fc
MD5 14da113c289b03fc7aadf0cd64d7bcd9
BLAKE2b-256 59cc34337d06938ebca7159ec6f47e06709626938d03412b4b83bf89ed2edf99

See more details on using hashes here.

Provenance

The following attestation bundles were made for faststream_schema_registry-0.1.3a1-py3-none-any.whl:

Publisher: publish-package.yml on mlovretovich/faststream-schema-registry

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page