Pulsar AVRO interface for reading and writing AVRO messages from and to Python dicts

puavro

puavro is a small convenience library that lets you use the Apache Pulsar Python client with pre-defined AVRO schemas and Python dictionaries, instead of AVRO schemas declared as records.

In other words, the library provides an interface to the standard Apache Pulsar client for reading and writing AVRO messages from and to Python dictionaries using an AVRO schema.

The puavro library consists of just two classes:

  • DictAVRO, derived from Python dict and intended to be used in place of the pulsar.schema.Record class;
  • DictAvroSchema, derived from pulsar.schema.AvroSchema and intended to be used in its place.

Motivation

The library exists to enable use of the Python Pulsar client with AVRO messages that are generated or received by modules written in other languages and that use external AVRO schemas (e.g. stored in .avsc files).

Installing

puavro is available on PyPI:

pip install puavro

Dependencies

The library depends on the following modules:

fastavro>=1.4.4
pulsar-client>=2.7.0

Compatibility

The library has been run and tested against Pulsar Python client versions 2.7.0 and 2.8.0 and fastavro version 1.4.4, and is expected to be compatible with later versions as well.
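To check an existing environment against the minimum versions listed under Dependencies, a minimal sketch using only the standard library might look like the following. The helper names `version_tuple` and `check_minimums` are hypothetical and not part of puavro, and the comparison looks only at the leading numeric components of each version string.

```python
from importlib import metadata
import re

# Minimum versions taken from the Dependencies list above.
MINIMUMS = {"fastavro": "1.4.4", "pulsar-client": "2.7.0"}


def version_tuple(version):
    """Return the leading numeric components of a version string as a tuple."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        return ()
    return tuple(int(part) for part in match.group(0).split("."))


def check_minimums(minimums=MINIMUMS):
    """Map each package name to True/False, or None if it is not installed."""
    result = {}
    for name, minimum in minimums.items():
        try:
            installed = metadata.version(name)
        except metadata.PackageNotFoundError:
            result[name] = None
            continue
        result[name] = version_tuple(installed) >= version_tuple(minimum)
    return result


if __name__ == "__main__":
    labels = {True: "OK", False: "too old", None: "not installed"}
    for name, ok in check_minimums().items():
        print(name, labels[ok])
```

Comparing tuples of integers rather than raw strings avoids ordering "2.10.0" before "2.7.0".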

License

The library is provided under terms of the MIT license.

How to use

The samples in this section assume the following imports:

import pulsar
import fastavro
import puavro

import json
import datetime
from pprint import pp

Defining dictionary for AVRO schema

class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.load_schema("Segment.avsc")

or

class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema(json.loads("""{
  "type" : "record",
  "name" : "Segment",
  "namespace" : "try",
  "fields" : [ {
    "name" : "id",
    "type" : "long"
  }, {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "when",
    "type" : {
      "type" : "long",
      "logicalType" : "timestamp-millis"
    }
  }, {
    "name" : "direction",
    "type" : {
      "type" : "enum",
      "name" : "CardinalDirection",
      "symbols" : [ "north", "south", "east", "west" ]
    }
  }, {
    "name" : "length",
    "type" : [ "null", "long" ]
  } ]
}
"""))

or

class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema({
        "type" : "record",
        "name" : "Segment",
        "namespace" : "try",
        "fields" : [ {
            "name" : "id",
            "type" : "long"
        }, {
            "name" : "name",
            "type" : "string"
        }, {
            "name" : "when",
            "type" : {
                "type" : "long",
                "logicalType" : "timestamp-millis"
            }
        }, {
            "name" : "direction",
            "type" : {
                "type" : "enum",
                "name" : "CardinalDirection",
                "symbols" : [ "north", "south", "east", "west" ]
            }
        }, {
            "name" : "length",
            "type" : [ "null", "long" ]
        } ]
    })

or

class Segment(puavro.DictAVRO):
    pass

Segment.set_schema(fastavro.schema.load_schema("segment.avsc"))
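An .avsc file is plain JSON, so the file-based and inline variants above can share one schema definition. As a minimal sketch using only the standard library, the schema dictionary can be written to a file like this. The helper name `write_avsc`, the abbreviated two-field schema, and the temporary directory are illustrative assumptions, not part of puavro.

```python
import json
import tempfile
from pathlib import Path

# Same record as in the inline variants above, abbreviated to two fields.
SEGMENT_SCHEMA = {
    "type": "record",
    "name": "Segment",
    "namespace": "try",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "length", "type": ["null", "long"]},
    ],
}


def write_avsc(schema, directory):
    """Write the schema as <name>.avsc in the directory and return the path."""
    path = Path(directory) / f"{schema['name']}.avsc"
    path.write_text(json.dumps(schema, indent=2), encoding="utf-8")
    return path


avsc_path = write_avsc(SEGMENT_SCHEMA, tempfile.mkdtemp())
print(avsc_path.name)
```

The resulting path could then be passed as a string to fastavro.schema.load_schema, as in the first variant.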

Producer

The producer uses the Segment class (derived from puavro.DictAVRO above) together with puavro.DictAvroSchema in place of pulsar.schema.AvroSchema:

PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
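producer = pulsar_client.create_producer(topic=TOPIC,
                                         schema=puavro.DictAvroSchema(Segment))
try:
    # Segment is a dict subclass, so fields are passed as keyword arguments.
    segment = Segment(
        id=99,
        name="some name",
        # Timezone-aware UTC timestamp for the timestamp-millis field.
        when=datetime.datetime.now(datetime.timezone.utc),
        direction="north",
        length=12345,
    )
    producer.send(segment)
finally:
    # Release the producer and the client connection even if sending fails.
    producer.close()
    pulsar_client.close()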

Consumer

The consumer uses the Segment class (derived from puavro.DictAVRO above) together with puavro.DictAvroSchema in place of pulsar.schema.AvroSchema:

PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"
WAIT_SECONDS = 3

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
consumer = pulsar_client.subscribe(TOPIC,
                                   subscription_name="sample",
                                   consumer_type=pulsar.ConsumerType.Shared,
                                   schema=puavro.DictAvroSchema(Segment))
try:
    while True:
        # Wait up to WAIT_SECONDS for the next message; receive() raises on timeout.
        msg = consumer.receive(WAIT_SECONDS * 1000)
        segment = msg.value()

        pp(segment)

        consumer.acknowledge(msg)
except Exception as e:
    # A receive timeout is treated as the end of the available data.
    if str(e) == 'Pulsar error: TimeOut':
        print("END OF DATA")
    else:
        raise
finally:
    consumer.close()
    pulsar_client.close()
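Because the received value is dictionary-based, it can be passed on to other dict-oriented tools such as json. The following is a minimal sketch that assumes the `when` field arrives as a `datetime` object (fastavro's usual handling of `timestamp-millis`, not verified here for puavro). The helper `to_json` is hypothetical, and a plain dict stands in for a received Segment.

```python
import datetime
import json


def _encode_extra(value):
    """Fallback for json.dumps: render datetimes as ISO 8601 strings."""
    if isinstance(value, datetime.datetime):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def to_json(record):
    """Serialize a dict-like record to a JSON string with sorted keys."""
    return json.dumps(record, default=_encode_extra, sort_keys=True)


# Stand-in for the value returned by msg.value() in the consumer above.
segment = {
    "id": 99,
    "name": "some name",
    "when": datetime.datetime(2021, 9, 1, 12, 0, tzinfo=datetime.timezone.utc),
    "direction": "north",
    "length": 12345,
}
print(to_json(segment))
```

The `default` hook is only called for values that json cannot encode on its own, so the remaining fields are serialized unchanged.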

Samples

The complete samples can be found in the project's samples directory.
