Convert between protobuf messages and pyspark dataframes
pbspark
This package provides a way to convert protobuf messages into pyspark dataframes and vice versa using a pyspark udf.
Installation
To install:

```bash
pip install pbspark
```
Usage
Suppose we have a pyspark DataFrame which contains a column `value` which has protobuf encoded messages of our `SimpleMessage`:
```protobuf
syntax = "proto3";

package example;

message SimpleMessage {
  string name = 1;
  int64 quantity = 2;
  float measure = 3;
}
```
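The Python examples below import `example.example_pb2`; presumably this is the module generated from this definition by the protobuf compiler (e.g. `protoc --python_out=...`).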
Using pbspark we can decode the messages into a spark `StructType` and then flatten them.
```python
from pyspark.sql.session import SparkSession
from pbspark import MessageConverter
from example.example_pb2 import SimpleMessage

spark = SparkSession.builder.getOrCreate()

example = SimpleMessage(name="hello", quantity=5, measure=12.3)
data = [{"value": example.SerializeToString()}]
df = spark.createDataFrame(data)

mc = MessageConverter()
df_decoded = df.select(mc.from_protobuf(df.value, SimpleMessage).alias("value"))
df_flattened = df_decoded.select("value.*")
df_flattened.show()

# +-----+--------+-------+
# | name|quantity|measure|
# +-----+--------+-------+
# |hello|       5|   12.3|
# +-----+--------+-------+

df_flattened.schema
# StructType(List(StructField(name,StringType,true),StructField(quantity,IntegerType,true),StructField(measure,FloatType,true)))
```
We can also re-encode them into protobuf strings.
```python
df_reencoded = df_decoded.select(mc.to_protobuf(df_decoded.value, SimpleMessage).alias("value"))
```
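As a quick sanity check (a sketch, not part of the package docs), the re-encoded column can be collected and parsed back into `SimpleMessage`, assuming `to_protobuf` produces a binary column:

```python
# hypothetical round-trip check on the re-encoded column
row = df_reencoded.first()
roundtripped = SimpleMessage.FromString(bytes(row.value))
print(roundtripped.name, roundtripped.quantity)  # hello 5
```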
pbspark uses protobuf's `MessageToDict`, which deserializes everything into JSON compatible objects by default. The exception is the bytes type, which `MessageToDict` would decode to a base64-encoded string; pbspark will decode any bytes fields directly to a spark `ByteType`.
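For illustration (a sketch using protobuf's well-known `BytesValue` wrapper rather than the example messages above), this is the base64 behavior that pbspark avoids:

```python
from google.protobuf.json_format import MessageToDict
from google.protobuf.wrappers_pb2 import BytesValue

# plain MessageToDict base64-encodes bytes fields...
print(MessageToDict(BytesValue(value=b"hello")))  # {'value': 'aGVsbG8='}
# ...whereas pbspark decodes bytes fields to raw bytes for a spark ByteType column
```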
Conversion between `google.protobuf.Timestamp` and spark `TimestampType` can be enabled using:
```python
from pbspark import MessageConverter

mc = MessageConverter()
mc.register_timestamp_serializer()
mc.register_timestamp_deserializer()
```
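For illustration only, a minimal sketch assuming a hypothetical message with a `google.protobuf.Timestamp created_at = 1;` field (none of the messages above define one): with the serializers registered, that field would decode to a spark `TimestampType` column rather than a nested struct.

```python
from datetime import datetime, timezone

from google.protobuf.timestamp_pb2 import Timestamp

# build a Timestamp value using protobuf's own helper
ts = Timestamp()
ts.FromDatetime(datetime(2021, 1, 1, tzinfo=timezone.utc))

# hypothetical message type with a Timestamp field:
# event = EventMessage(created_at=ts)
# df.select(mc.from_protobuf(df.value, EventMessage).alias("value"))
# -> the `created_at` field appears as TimestampType in the decoded schema
```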
Custom serde is also supported. Suppose we have a message in which we want to combine fields when we serialize. Create and register a custom serializer with the `MessageConverter`.
```python
from pbspark import MessageConverter
from example.example_pb2 import ExampleMessage
from example.example_pb2 import NestedMessage
from pyspark.sql.types import StringType

mc = MessageConverter()
# built-in to serialize Timestamp messages to datetime objects
mc.register_timestamp_serializer()
# register a custom serializer
# this will serialize the NestedMessages into a string rather than a
# struct with `key` and `value` fields
combine_key_value = lambda message: message.key + ":" + message.value
mc.register_serializer(NestedMessage, combine_key_value, StringType)

...

from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.serializers import CloudPickleSerializer

sc = SparkContext(serializer=CloudPickleSerializer())
spark = SparkSession(sc).builder.getOrCreate()

message = ExampleMessage(nested=NestedMessage(key="hello", value="world"))
data = [{"value": message.SerializeToString()}]
df = spark.createDataFrame(data)

df_decoded = df.select(mc.from_protobuf(df.value, ExampleMessage).alias("value"))
# rather than a struct the value of `nested` is a string
df_decoded.select("value.nested").show()

# +-----------+
# |     nested|
# +-----------+
# |hello:world|
# +-----------+
```
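Note that the `CloudPickleSerializer` is used here presumably so that the lambda-based custom serializer can be pickled and shipped to executors; standard pickling cannot serialize lambdas, while cloudpickle can.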