Python Rest Client to interact against Schema Registry confluent server

Python Rest Client Schema Registry

Python REST client for the Confluent Schema Registry server, used to manage Avro and JSON schema resources.

Requirements

python 3.8+

Installation

pip install python-schema-registry-client

If you want the Faust functionality:

pip install python-schema-registry-client[faust]

Note that this will automatically add a dependency on the faust-streaming fork of faust. If you want to use the old faust version, install it manually and then install python-schema-registry-client without the faust extra; the functionality will be the same.

Client API, Serializer, Faust Integration and Schema Server description

Documentation: https://marcosschroh.github.io/python-schema-registry-client.io

Avro Schema Usage

from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)

schema_id = client.register("test-deployment", avro_schema)

Or, with the async client:

import asyncio

from schema_registry.client import AsyncSchemaRegistryClient, schema

async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)


async def main() -> int:
    # await is only valid inside a coroutine, so wrap the call
    return await async_client.register("test-deployment", avro_schema)


schema_id = asyncio.run(main())

JSON Schema Usage

from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "definitions" : {
        "JsonDeployment" : {
            "type" : "object",
            "required" : ["image", "replicas", "port"],
            "properties" : {
                "image" :       {"type" : "string"},
                "replicas" :    {"type" : "integer"},
                "port" :        {"type" : "integer"}
            }
        }
    },
    "$ref" : "#/definitions/JsonDeployment"
}

json_schema = schema.JsonSchema(deployment_schema)

schema_id = client.register("test-deployment", json_schema)

Or, with the async client:

import asyncio

from schema_registry.client import AsyncSchemaRegistryClient, schema

async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "definitions" : {
        "JsonDeployment" : {
            "type" : "object",
            "required" : ["image", "replicas", "port"],
            "properties" : {
                "image" :       {"type" : "string"},
                "replicas" :    {"type" : "integer"},
                "port" :        {"type" : "integer"}
            }
        }
    },
    "$ref" : "#/definitions/JsonDeployment"
}

json_schema = schema.JsonSchema(deployment_schema)


async def main() -> int:
    # await is only valid inside a coroutine, so wrap the call
    return await async_client.register("test-deployment", json_schema)


schema_id = asyncio.run(main())
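The JSON schema above keeps the object definition under "definitions" and points to it with a top-level "$ref". As a rough illustration of how that pointer resolves, the sketch below follows a same-document reference using only the standard library. The helper resolve_local_ref is hypothetical and not part of this package; it assumes a local "#/..." reference made of plain path segments.

```python
from typing import Any, Dict

deployment_schema: Dict[str, Any] = {
    "definitions": {
        "JsonDeployment": {
            "type": "object",
            "required": ["image", "replicas", "port"],
            "properties": {
                "image": {"type": "string"},
                "replicas": {"type": "integer"},
                "port": {"type": "integer"},
            },
        }
    },
    "$ref": "#/definitions/JsonDeployment",
}


def resolve_local_ref(document: Dict[str, Any], ref: str) -> Any:
    """Hypothetical helper: follow a reference such as '#/definitions/Name'."""
    if not ref.startswith("#/"):
        raise ValueError(f"only local references are supported, got {ref!r}")
    node: Any = document
    # Walk the document one path segment at a time
    for segment in ref[2:].split("/"):
        node = node[segment]
    return node


target = resolve_local_ref(deployment_schema, deployment_schema["$ref"])
print(target["required"])
# >>> ['image', 'replicas', 'port']
```

The resolved object is what a record such as {"image": "nginx", "replicas": 2, "port": 80} would be validated against.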

Usage with dataclasses-avroschema for Avro schemas

You can generate the Avro schema directly from a Python class using dataclasses-avroschema and use it in the API to register schemas, check versions and test compatibility:

import dataclasses
import typing

from dataclasses_avroschema import AvroModel, types

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")


@dataclasses.dataclass
class UserAdvance(AvroModel):
    name: str
    age: int
    pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: types.Enum = types.Enum(["BLUE", "YELLOW", "GREEN"], default="BLUE")
    country: str = "Argentina"
    address: typing.Optional[str] = None

# subject under which the schema is registered
subject = "dataclasses-avroschema-subject-2"

# register the schema
schema_id = client.register(subject, UserAdvance.avro_schema())

print(schema_id)
# >>> 12

result = client.check_version(subject, UserAdvance.avro_schema())
print(result)
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type":"record" ...')

compatibility = client.test_compatibility(subject, UserAdvance.avro_schema())
print(compatibility)

# >>> True

Usage with pydantic for JSON schemas

You can generate the JSON schema directly from a Python class using pydantic and use it in the API to register schemas, check versions and test compatibility:

import typing

from enum import Enum

from pydantic import BaseModel

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

class ColorEnum(str, Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


class UserAdvance(BaseModel):
    name: str
    age: int
    pets: typing.List[str] = ["dog", "cat"]
    accounts: typing.Dict[str, int] = {"key": 1}
    has_car: bool = False
    favorite_colors: ColorEnum = ColorEnum.BLUE
    country: str = "Argentina"
    address: typing.Optional[str] = None

# subject under which the schema is registered
subject = "pydantic-jsonschema-subject"

# register the schema
schema_id = client.register(subject, UserAdvance.model_json_schema(), schema_type="JSON")

print(schema_id)
# >>> 12

result = client.check_version(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(result)
# >>> SchemaVersion(subject='pydantic-jsonschema-subject', schema_id=12, schema=1, version=<schema_registry.client.schema.JsonSchema object at 0x7f40354550a0>)

compatibility = client.test_compatibility(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(compatibility)

# >>> True

Serializers

You can use AvroMessageSerializer to encode and decode messages in Avro:

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import AvroMessageSerializer


client = SchemaRegistryClient("http://127.0.0.1:8081")
avro_message_serializer = AvroMessageSerializer(client)

avro_user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "age", "type": "int"},

    ],
})

# We want to encode the user_record with avro_user_schema
user_record = {
    "first_name": "my_first_name",
    "last_name": "my_last_name",
    "age": 20,
}

# Encode the record
message_encoded = avro_message_serializer.encode_record_with_schema(
    "user", avro_user_schema, user_record)

print(message_encoded)
# >>> b'\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name('
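To make the printed bytes less opaque, here is a minimal sketch that takes them apart by hand. It assumes the Confluent wire format (one magic byte, then a 4-byte big-endian schema id) followed by the Avro binary encoding, where ints and string lengths are zigzag varints. The helpers read_long and read_string are illustrative only and are not how this package decodes messages; in practice you would let the serializer do the decoding.

```python
import struct
from typing import Tuple

message_encoded = b"\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name("


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read one Avro int/long (zigzag varint); return (value, next position)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos  # undo the zigzag mapping


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    """Read a length-prefixed UTF-8 string; return (value, next position)."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


# 5-byte header: magic byte (0) and the schema id assigned by the registry
magic_byte, schema_id = struct.unpack(">bI", message_encoded[:5])

# The payload holds the fields in schema order: first_name, last_name, age
pos = 5
first_name, pos = read_string(message_encoded, pos)
last_name, pos = read_string(message_encoded, pos)
age, pos = read_long(message_encoded, pos)

print(schema_id, first_name, last_name, age)
# >>> 1 my_first_name my_last_name 20
```

The payload carries no field names or types, which is why a consumer needs the schema id in the header to look up the schema before it can decode anything.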

Or, with JSON schemas:

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import JsonMessageSerializer


client = SchemaRegistryClient("http://127.0.0.1:8081")
json_message_serializer = JsonMessageSerializer(client)

json_schema = schema.JsonSchema({
  "definitions" : {
    "record:python.test.basic.basic" : {
      "description" : "basic schema for tests",
      "type" : "object",
      "required" : [ "number", "name" ],
      "properties" : {
        "number" : {
          "oneOf" : [ {
            "type" : "integer"
          }, {
            "type" : "null"
          } ]
        },
        "name" : {
          "oneOf" : [ {
            "type" : "string"
          } ]
        }
      }
    }
  },
  "$ref" : "#/definitions/record:python.test.basic.basic"
})

# Encode the record
basic_record = {
    "number": 10,
    "name": "a_name",
}

message_encoded = json_message_serializer.encode_record_with_schema(
    "basic", json_schema, basic_record)

print(message_encoded)
# >>> b'\x00\x00\x00\x00\x02{"number": 10, "name": "a_name"}'

When to use this library

Usually, we have a situation like this:

[Figure: Confluent Architecture]

Producers and consumers therefore have to serialize and deserialize messages every time they send to or receive from Kafka topics. In this picture, imagine a Faust application that receives messages encoded with an Avro schema and needs to deserialize them: the schema server supplies the schema required to do that. In this scenario, the MessageSerializer is a good fit.
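The round trip between producer, schema server and consumer can be sketched without any running services. Everything below is a simplified stand-in, not this package's API: InMemoryRegistry, produce and consume are hypothetical names, the payload is plain JSON rather than Avro, and the 5-byte header (magic byte plus big-endian schema id) is assumed to follow the Confluent wire format.

```python
import json
import struct
from typing import Any, Dict, Tuple


class InMemoryRegistry:
    """Hypothetical stand-in for the schema server; keeps schemas in a dict."""

    def __init__(self) -> None:
        self._schemas: Dict[int, Dict[str, Any]] = {}

    def register(self, schema: Dict[str, Any]) -> int:
        # Registering a schema that is already known returns its existing id
        for schema_id, known in self._schemas.items():
            if known == schema:
                return schema_id
        schema_id = len(self._schemas) + 1
        self._schemas[schema_id] = schema
        return schema_id

    def get_by_id(self, schema_id: int) -> Dict[str, Any]:
        return self._schemas[schema_id]


def produce(registry: InMemoryRegistry, schema: Dict[str, Any], record: Dict[str, Any]) -> bytes:
    """Producer side: register the schema, then prefix the payload with its id."""
    schema_id = registry.register(schema)
    return struct.pack(">bI", 0, schema_id) + json.dumps(record).encode("utf-8")


def consume(registry: InMemoryRegistry, message: bytes) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Consumer side: read the id from the header and look the schema up."""
    magic_byte, schema_id = struct.unpack(">bI", message[:5])
    if magic_byte != 0:
        raise ValueError("unexpected magic byte")
    return registry.get_by_id(schema_id), json.loads(message[5:])


registry = InMemoryRegistry()
user_schema = {"type": "object", "required": ["name"]}
message = produce(registry, user_schema, {"name": "a_name"})
schema_used, record = consume(registry, message)
```

The consumer never receives the schema inside the message itself; it only receives an id, which keeps messages small and lets the registry enforce compatibility rules centrally.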

Another use case is an application dedicated to administering Avro schemas (registering them, updating compatibility levels, deleting old schemas, etc.); for that, the SchemaRegistryClient is a good fit.

Development

Poetry is needed to install the dependencies and develop locally.

  1. Install dependencies: poetry install --all-extras
  2. Code linting: ./scripts/format
  3. Run tests: ./scripts/test

We use commitizen to keep commit messages in a standard format.

Note: The tests are run against the Schema Server using docker compose, so you will need Docker and Docker Compose installed.

In a terminal run docker-compose up. Then in a different terminal run the tests:

./scripts/test

All additional args will be passed to pytest, for example:

./scripts/test ./tests/client/

Tests using the Python shell

To perform tests using the python shell you can run the project using docker-compose.

  1. Execute docker-compose up. The schema registry server will then run on http://127.0.0.1:8081, and you can interact with it using the SchemaRegistryClient.
  2. Open a Python interpreter (type python in your command line to get a Python shell).
  3. Play with the schema server:
from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

# do some operations with the client...
deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)
client.register("test-deployment", avro_schema)
# >>> 1

Then, you can check the schema in your browser at http://127.0.0.1:8081/schemas/ids/1
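The same check can be done from the Python shell instead of the browser. The sketch below uses only the standard library and assumes the registry answers GET /schemas/ids/{id} with a JSON object whose "schema" key holds the schema as a JSON-encoded string. The function names are hypothetical helpers, not part of this package.

```python
import json
import urllib.request
from typing import Any


def schema_url(base_url: str, schema_id: int) -> str:
    """Build the lookup URL for a schema id."""
    return f"{base_url.rstrip('/')}/schemas/ids/{schema_id}"


def parse_schema_response(body: bytes) -> Any:
    """Unwrap the response: the schema itself is a JSON string under "schema"."""
    return json.loads(json.loads(body)["schema"])


def fetch_schema(base_url: str, schema_id: int, timeout: float = 5.0) -> Any:
    """Fetch and decode a schema; needs the docker-compose stack to be running."""
    with urllib.request.urlopen(schema_url(base_url, schema_id), timeout=timeout) as response:
        return parse_schema_response(response.read())


# With the schema registry running locally:
# fetch_schema("http://127.0.0.1:8081", 1)
```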
