
Pulsar AVRO interface for reading/writing AVRO messages from/to Python dicts


puavro

puavro is a small convenience library that lets the Apache Pulsar Python client work with predefined AVRO schemas and plain Python dictionaries, instead of AVRO schemas declared as pulsar.schema.Record classes.

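In other words, the library provides an interface to the standard Apache Pulsar client for reading and writing AVRO messages from and to Python dictionaries using an AVRO schema defined outside Python classes, for example loaded from an .avsc file or parsed from JSON with fastavro.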

The puavro library consists of just two classes:

  • DictAVRO, derived from Python dict and intended to be used instead of the pulsar.schema.Record class;
  • DictAvroSchema, derived from pulsar.schema.AvroSchema and intended to be used in place of pulsar.schema.AvroSchema.
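To make the idea behind DictAVRO concrete, here is a stdlib-only sketch of a dict subclass that carries a class-level schema. It is not puavro's code: the names SchemaDict, missing_fields and Point are hypothetical, and only the SCHEMA attribute and the class-level set_schema call mirror the usage shown in the examples on this page.

```python
from typing import Any, ClassVar, Dict, List, Optional


class SchemaDict(dict):
    """Hypothetical illustration: a dict that carries a class-level AVRO schema.

    This is NOT puavro's implementation; it only mirrors the idea of
    attaching a parsed schema (SCHEMA) to a dict subclass.
    """

    SCHEMA: ClassVar[Optional[Dict[str, Any]]] = None

    @classmethod
    def set_schema(cls, schema: Dict[str, Any]) -> None:
        # Attach the schema to the subclass the method is called on.
        cls.SCHEMA = schema

    def missing_fields(self) -> List[str]:
        # Report record fields declared in the schema but absent from the dict.
        if self.SCHEMA is None:
            raise ValueError(f"{type(self).__name__} has no schema")
        return [f["name"] for f in self.SCHEMA["fields"] if f["name"] not in self]


class Point(SchemaDict):
    pass


Point.set_schema({
    "type": "record",
    "name": "Point",
    "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "long"}],
})

p = Point(x=1)
print(p.missing_fields())  # ['y']
print(isinstance(p, dict))  # True
```

Because the record is still a dict, it can be built with keyword arguments and passed anywhere a dictionary is expected; the schema travels with the class rather than with each instance.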


Motivation

The library was created to enable use of the Python Pulsar client with AVRO messages produced or consumed by modules written in other languages that rely on external AVRO schemas (e.g. stored in .avsc files).

Installing

puavro is available on PyPI:

pip install puavro

Dependencies

The library depends on the following modules:

fastavro>=1.4.4
pulsar-client>=2.7.0
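Before running the samples, it can help to confirm that the installed versions meet these minimums. Below is a rough stdlib sketch using importlib.metadata (Python 3.8+); the helper names are hypothetical, and pre-release suffixes such as rc1 are simply ignored, so treat the result as a quick check rather than a full PEP 440 comparison.

```python
import re
from importlib import metadata
from typing import Dict, Optional, Tuple

# Minimum versions from the Dependencies list, keyed by PyPI distribution name.
REQUIREMENTS: Dict[str, str] = {"fastavro": "1.4.4", "pulsar-client": "2.7.0"}


def version_tuple(version: str) -> Tuple[int, ...]:
    # Keep the leading dotted numeric part: "2.8.0" -> (2, 8, 0), "3.0.0rc1" -> (3, 0, 0).
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def installed_version(dist: str) -> Optional[str]:
    # Try the name as written and with "-" replaced by "_", since the
    # installed metadata may use either spelling.
    for candidate in (dist, dist.replace("-", "_")):
        try:
            return metadata.version(candidate)
        except metadata.PackageNotFoundError:
            continue
    return None


def check_requirements(requirements: Dict[str, str] = REQUIREMENTS) -> Dict[str, str]:
    # Map each distribution to a short human-readable status string.
    report: Dict[str, str] = {}
    for dist, minimum in requirements.items():
        installed = installed_version(dist)
        if installed is None:
            report[dist] = "not installed"
        elif version_tuple(installed) >= version_tuple(minimum):
            report[dist] = f"{installed} (ok)"
        else:
            report[dist] = f"{installed} (older than {minimum})"
    return report


if __name__ == "__main__":
    for dist, status in check_requirements().items():
        print(f"{dist}: {status}")
```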

Compatibility

The library has been tested against Pulsar Python client versions 2.7.0 and 2.8.0 and fastavro 1.4.4, and is expected to be compatible with later versions as well.

License

The library is provided under terms of the MIT license.

How to use

The samples in this section assume the following imports:

import pulsar
import fastavro
import puavro

import json
import datetime
from pprint import pp

Defining a dictionary class for an AVRO schema

class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.load_schema("Segment.avsc")

or

class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema(json.loads("""{
  "type" : "record",
  "name" : "Segment",
  "namespace" : "try",
  "fields" : [ {
    "name" : "id",
    "type" : "long"
  }, {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "when",
    "type" : {
      "type" : "long",
      "logicalType" : "timestamp-millis"
    }
  }, {
    "name" : "direction",
    "type" : {
      "type" : "enum",
      "name" : "CardinalDirection",
      "symbols" : [ "north", "south", "east", "west" ]
    }
  }, {
    "name" : "length",
    "type" : [ "null", "long" ]
  } ]
}
"""))

or

class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema({
        "type": "record",
        "name": "Segment",
        "namespace": "try",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"},
            {
                "name": "when",
                "type": {"type": "long", "logicalType": "timestamp-millis"},
            },
            {
                "name": "direction",
                "type": {
                    "type": "enum",
                    "name": "CardinalDirection",
                    "symbols": ["north", "south", "east", "west"],
                },
            },
            {"name": "length", "type": ["null", "long"]},
        ],
    })

or

class Segment(puavro.DictAVRO):
    pass

Segment.set_schema(fastavro.schema.load_schema("Segment.avsc"))
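The file-based variants expect a Segment.avsc file to exist. A minimal stdlib sketch for producing one from the dict schema used in the other variants follows; SEGMENT_SCHEMA and write_avsc are hypothetical names, and the file name must match the one passed to load_schema, including letter case on case-sensitive file systems.

```python
import json
from pathlib import Path
from typing import Any, Dict

# The same Segment schema as in the dict-based variant, as plain JSON-compatible data.
SEGMENT_SCHEMA: Dict[str, Any] = {
    "type": "record",
    "name": "Segment",
    "namespace": "try",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "when", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {
            "name": "direction",
            "type": {
                "type": "enum",
                "name": "CardinalDirection",
                "symbols": ["north", "south", "east", "west"],
            },
        },
        {"name": "length", "type": ["null", "long"]},
    ],
}


def write_avsc(schema: Dict[str, Any], path: Path) -> Path:
    # An .avsc file is simply the schema serialized as JSON text.
    path.write_text(json.dumps(schema, indent=2), encoding="utf-8")
    return path
```

Calling write_avsc(SEGMENT_SCHEMA, Path("Segment.avsc")) once in the working directory should give load_schema a file to read; in a real project the .avsc file would more likely be shared with the modules written in other languages.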

Producer

Using the Segment class defined above (derived from puavro.DictAVRO) together with the puavro.DictAvroSchema class (in place of pulsar.schema.AvroSchema):

PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
try:
    producer = pulsar_client.create_producer(
        topic=TOPIC,
        schema=puavro.DictAvroSchema(Segment),
    )
    try:
        segment = Segment(
            id=99,
            name="some name",
            # timestamp-millis field: pass a timezone-aware UTC datetime
            when=datetime.datetime.now(datetime.timezone.utc),
            direction="north",
            length=12345,
        )
        producer.send(segment)
    finally:
        producer.close()
finally:
    # Close the client even if creating the producer fails.
    pulsar_client.close()
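The when field is declared as a long with the timestamp-millis logical type, which the AVRO specification defines as milliseconds since the Unix epoch (UTC). The producer hands a timezone-aware datetime to the serializer and lets it do the conversion; the stdlib sketch below (the helper names are hypothetical) shows the arithmetic involved, which may help when checking values written by producers in other languages.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def to_timestamp_millis(value: datetime.datetime) -> int:
    # timestamp-millis stores whole milliseconds since the Unix epoch (UTC).
    if value.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    delta = value - EPOCH
    # Integer arithmetic on the timedelta avoids float rounding; sub-millisecond
    # precision is truncated.
    return (delta.days * 86_400 + delta.seconds) * 1000 + delta.microseconds // 1000


def from_timestamp_millis(millis: int) -> datetime.datetime:
    # Inverse conversion: returns an aware datetime in UTC.
    return EPOCH + datetime.timedelta(milliseconds=millis)


print(to_timestamp_millis(datetime.datetime(2021, 9, 1, tzinfo=datetime.timezone.utc)))  # 1630454400000
```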

Consumer

Using the Segment class defined above (derived from puavro.DictAVRO) together with the puavro.DictAvroSchema class (in place of pulsar.schema.AvroSchema):

PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"
WAIT_SECONDS = 3

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
consumer = pulsar_client.subscribe(TOPIC, 
                                   subscription_name="sample", 
                                   consumer_type=pulsar.ConsumerType.Shared,
                                   schema=puavro.DictAvroSchema(Segment))
try:
    while True:
        msg = consumer.receive(WAIT_SECONDS * 1000)
        segment = msg.value()

        pp(segment)

        consumer.acknowledge(msg)
except Exception as e:
    if str(e) == 'Pulsar error: TimeOut':
        print("END OF DATA")
    else:
        raise
finally:
    consumer.close()
    pulsar_client.close()
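Because DictAVRO derives from dict, a received segment can be handed to code that expects a plain dictionary, such as json.dumps. fastavro decodes timestamp-millis fields into datetime objects, which json cannot serialize on its own, so a default hook is needed. A minimal sketch, using a plain dict with made-up values in place of msg.value() and a hypothetical avro_json_default helper:

```python
import datetime
import json
from typing import Any


def avro_json_default(value: Any) -> Any:
    # json cannot serialize datetime objects (produced for timestamp-millis
    # fields), so emit them as ISO 8601 strings instead.
    if isinstance(value, datetime.datetime):
        return value.isoformat()
    raise TypeError(f"{type(value).__name__} is not JSON serializable")


# A plain dict standing in for the value returned by msg.value() (hypothetical data).
received = {
    "id": 99,
    "name": "some name",
    "when": datetime.datetime(2021, 9, 1, 12, 0, tzinfo=datetime.timezone.utc),
    "direction": "north",
    "length": None,
}

print(json.dumps(received, default=avro_json_default, sort_keys=True))
# {"direction": "north", "id": 99, "length": null, "name": "some name", "when": "2021-09-01T12:00:00+00:00"}
```

The nullable length field maps to None in Python and to null in JSON, and the enum value arrives as a plain string, so no extra handling is needed for them here.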

Samples

The complete samples can be found in the samples directory of the project.

Download files


Source Distribution

puavro-1.0.2.tar.gz (6.0 kB view details)

Uploaded Source

Built Distribution


puavro-1.0.2-py3-none-any.whl (5.9 kB view details)

Uploaded Python 3

File details

Details for the file puavro-1.0.2.tar.gz.

File metadata

  • Download URL: puavro-1.0.2.tar.gz
  • Size: 6.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.2 CPython/3.9.6

File hashes

Hashes for puavro-1.0.2.tar.gz
Algorithm Hash digest
SHA256 66121d4a7778d6a8e50ff2b155b3e426bbd24ba7ee5c10e4f77b8563ae253abd
MD5 0b177b4071d4991d32b8ca1768446c1b
BLAKE2b-256 8637ce93c67d962b61a40764c80ea7a5b6a5499760961fb77ea69295fb7dee8c


File details

Details for the file puavro-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: puavro-1.0.2-py3-none-any.whl
  • Size: 5.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.2 CPython/3.9.6

File hashes

Hashes for puavro-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 a84635400fb5eda81c4b3efa115223f3ab36259310e7d507c61e6a0c27d05ceb
MD5 884800fcca284287ffc85cf1bf7b7356
BLAKE2b-256 726f3c28a86e8f3b2ee6db0a06b60c15e0c32afa1ec2c744d59a4fcbaaa5c5a4

