Pulsar AVRO interface for reading/writing AVRO messages from/to dict
puavro
puavro is a small convenience library that lets you use the Apache Pulsar Python client with predefined AVRO schemas and Python dictionaries, instead of AVRO schemas declared as records (`pulsar.schema.Record` classes).
In other words, the library provides an interface to the standard Apache Pulsar client for reading/writing AVRO messages from/to Python dictionaries using an AVRO schema that is either:
- declared as a Python dictionary (using `fastavro.schema.parse_schema()`), or
- loaded from an .avsc file (using `fastavro.schema.load_schema()`), or
- parsed from a JSON string (using `fastavro.schema.parse_schema(json.loads())`).
The puavro library consists of just two classes:
- `DictAVRO`, derived from the Python `dict` and intended to be used instead of the `pulsar.schema.Record` class;
- `DictAvroSchema`, derived from `pulsar.schema.AvroSchema` and intended to be used instead of `pulsar.schema.AvroSchema`.
Motivation
To enable the Python Pulsar client to work with AVRO messages that are produced or consumed by modules written in other languages and that use external AVRO schemas (e.g. stored in .avsc files).
Installing
puavro is available on PyPI:
```shell
pip install puavro
```
Dependencies
The library depends on the following modules:
- fastavro>=1.4.4
- pulsar-client>=2.7.0
Compatibility
The library has been run and tested against Pulsar Python client versions 2.7.0 and 2.8.0 and fastavro 1.4.4, and is expected to be compatible with later versions as well.
License
The library is provided under terms of the MIT license.
How to use
The samples in this section assume the following imports:
```python
import pulsar
import fastavro
import puavro
import json
import datetime
from pprint import pp  # pprint.pp requires Python 3.8+
```
Defining a dictionary class for an AVRO schema
```python
class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.load_schema("Segment.avsc")
```
or
```python
class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema(json.loads("""{
        "type" : "record",
        "name" : "Segment",
        "namespace" : "try",
        "fields" : [ {
            "name" : "id",
            "type" : "long"
        }, {
            "name" : "name",
            "type" : "string"
        }, {
            "name" : "when",
            "type" : {
                "type" : "long",
                "logicalType" : "timestamp-millis"
            }
        }, {
            "name" : "direction",
            "type" : {
                "type" : "enum",
                "name" : "CardinalDirection",
                "symbols" : [ "north", "south", "east", "west" ]
            }
        }, {
            "name" : "length",
            "type" : [ "null", "long" ]
        } ]
    }
    """))
```
or
```python
class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema({
        "type" : "record",
        "name" : "Segment",
        "namespace" : "try",
        "fields" : [ {
            "name" : "id",
            "type" : "long"
        }, {
            "name" : "name",
            "type" : "string"
        }, {
            "name" : "when",
            "type" : {
                "type" : "long",
                "logicalType" : "timestamp-millis"
            }
        }, {
            "name" : "direction",
            "type" : {
                "type" : "enum",
                "name" : "CardinalDirection",
                "symbols" : [ "north", "south", "east", "west" ]
            }
        }, {
            "name" : "length",
            "type" : [ "null", "long" ]
        } ]
    })
```
or
```python
class Segment(puavro.DictAVRO):
    pass

# File name matches the sample schema file (case matters on most filesystems)
Segment.set_schema(fastavro.schema.load_schema("Segment.avsc"))
```
Producer
Using the `Segment` class (derived from `puavro.DictAVRO` above) and the `puavro.DictAvroSchema` class (instead of `pulsar.schema.AvroSchema`):
```python
PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
producer = pulsar_client.create_producer(topic=TOPIC,
                                         schema=puavro.DictAvroSchema(Segment))
try:
    segment = Segment(
        id=99,
        name="some name",
        # timestamp-millis field: pass a timezone-aware datetime
        when=datetime.datetime.now(datetime.timezone.utc),
        direction="north",
        length=12345,
    )
    producer.send(segment)
finally:
    producer.close()
    pulsar_client.close()
```
Consumer
Using the `Segment` class (derived from `puavro.DictAVRO` above) and the `puavro.DictAvroSchema` class (instead of `pulsar.schema.AvroSchema`):
```python
PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"
WAIT_SECONDS = 3

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
consumer = pulsar_client.subscribe(TOPIC,
                                   subscription_name="sample",
                                   consumer_type=pulsar.ConsumerType.Shared,
                                   schema=puavro.DictAvroSchema(Segment))
try:
    while True:
        # Wait up to WAIT_SECONDS for the next message
        msg = consumer.receive(WAIT_SECONDS * 1000)
        segment = msg.value()  # the decoded, dict-based record
        pp(segment)
        consumer.acknowledge(msg)
except Exception as e:
    # A receive timeout surfaces as an exception with this message
    if str(e) == 'Pulsar error: TimeOut':
        print("END OF DATA")
    else:
        raise
finally:
    consumer.close()
    pulsar_client.close()
```
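In the consumer above, a single `try` block covers both `receive()` and the message handling, so a failure while handling one message ends the whole loop. A sketch of an alternative structure, assuming you want to keep the two apart, follows. `drain()`, `FakeConsumer` and `FakeMessage` are hypothetical names; the fakes stand in for a real `pulsar` consumer so the loop can be exercised without a broker, and `negative_acknowledge()` is assumed to be available in your client version. Newer client versions may also report timeouts with a dedicated exception type rather than this message; check the client documentation for your version.

```python
def is_receive_timeout(exc: Exception) -> bool:
    # Same check as in the sample consumer above.
    return str(exc) == "Pulsar error: TimeOut"


def drain(consumer, handle, wait_seconds=3):
    """Hypothetical helper: receive until a timeout; ack handled messages,
    negatively acknowledge messages whose handler raised."""
    handled = 0
    while True:
        try:
            msg = consumer.receive(wait_seconds * 1000)
        except Exception as e:
            if is_receive_timeout(e):
                return handled
            raise  # any other receive error is not a normal end of data
        try:
            handle(msg.value())
        except Exception:
            consumer.negative_acknowledge(msg)  # ask for later redelivery
        else:
            consumer.acknowledge(msg)
            handled += 1


class FakeMessage:
    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


class FakeConsumer:
    """Stub mimicking receive/acknowledge/negative_acknowledge."""

    def __init__(self, values):
        self._pending = [FakeMessage(v) for v in values]
        self.acked, self.nacked = [], []

    def receive(self, timeout_millis=None):
        if not self._pending:
            raise Exception("Pulsar error: TimeOut")
        return self._pending.pop(0)

    def acknowledge(self, msg):
        self.acked.append(msg.value())

    def negative_acknowledge(self, msg):
        self.nacked.append(msg.value())


seen = []


def handle(segment):
    if segment["id"] < 0:
        raise ValueError("invalid id")
    seen.append(segment["id"])


consumer = FakeConsumer([{"id": 1}, {"id": -1}, {"id": 2}])
n = drain(consumer, handle, wait_seconds=0)
```

Whether to nack and continue or to stop on a handler error is an application choice; negatively acknowledged messages are redelivered by the broker later.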
Samples
The complete samples can be found in the samples directory:
- Producer: sender.py
- Consumer: receiver.py
- AVRO schema: Segment.avsc
- AVRO IDL: try.avdl
Download files
File details
Details for the file puavro-1.0.3.tar.gz.
File metadata
- Download URL: puavro-1.0.3.tar.gz
- Upload date:
- Size: 5.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.2 CPython/3.9.6
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | a338dcff960daeaaf9c3df0948d91975d421634dade3896fb460c21861ab8362 |
| MD5 | dc0bf09abea6039c39ef2bab6b07094a |
| BLAKE2b-256 | ddf08390b8ae7d7d9fcbfbf70f83b4446cf1e6d45aca0c5d28eb02e0753efa42 |
File details
Details for the file puavro-1.0.3-py3-none-any.whl.
File metadata
- Download URL: puavro-1.0.3-py3-none-any.whl
- Upload date:
- Size: 5.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.2 CPython/3.9.6
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 79a73d46452d30d022ba5c160e558d94c8e4641c78121993be957186d3fba724 |
| MD5 | 81585ed851660bba5b3bb4c29a0272b3 |
| BLAKE2b-256 | 01fef276635b8c58bb01bc3f246b449e86acc646bca28a92134d8ab8d1a0424a |