Convert between protobuf messages and pyspark dataframes


This package provides a way to convert protobuf messages into pyspark dataframes and vice versa using a pyspark udf.


To install:

pip install pbspark


Suppose we have a pyspark DataFrame with a column named value that holds protobuf-encoded SimpleMessage messages:

syntax = "proto3";

package example;

message SimpleMessage {
  string name = 1;
  int64 quantity = 2;
  float measure = 3;
}

Using pbspark we can decode the messages into spark StructType and then flatten them.

from pyspark.sql.session import SparkSession
from pbspark import MessageConverter
from example.example_pb2 import SimpleMessage

spark = SparkSession.builder.getOrCreate()

example = SimpleMessage(name="hello", quantity=5, measure=12.3)
data = [{"value": example.SerializeToString()}]
df = spark.createDataFrame(data)

mc = MessageConverter()
df_decoded = df.select(mc.from_protobuf(df.value, SimpleMessage).alias("value"))
df_flattened = df_decoded.select("value.*")
df_flattened.show()

# +-----+--------+-------+
# | name|quantity|measure|
# +-----+--------+-------+
# |hello|       5|   12.3|
# +-----+--------+-------+

df_flattened.schema
# StructType(List(StructField(name,StringType,true),StructField(quantity,IntegerType,true),StructField(measure,FloatType,true)))
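
To make the flattening step concrete, here is a minimal plain-Python sketch of what selecting "value.*" does to a single row: the fields of the struct column become top-level columns. The row dictionary and the helper name flatten_struct are assumptions made for illustration; they are not part of pbspark or pyspark.

```python
def flatten_struct(row, column):
    """Promote the fields of one struct-like column to top-level keys.

    Hypothetical helper for illustration only. It mirrors the effect of
    select("<column>.*"), which keeps just the fields of the struct.
    """
    return dict(row[column])


# A decoded row modelled as a nested dict (an assumption for this sketch).
decoded_row = {"value": {"name": "hello", "quantity": 5, "measure": 12.3}}
flat_row = flatten_struct(decoded_row, "value")
print(flat_row)
```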

We can also re-encode them into protobuf strings.

df_reencoded = df_decoded.select(mc.to_protobuf(df_decoded.value, SimpleMessage).alias("value"))

pbspark uses protobuf's MessageToDict, which by default deserializes every field into a JSON-compatible object. The exception is the bytes type: MessageToDict would decode it to a base64-encoded string, whereas pbspark decodes bytes fields directly to a Spark BinaryType.
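
The difference between the two treatments of bytes can be sketched with the standard library alone. The helper to_json_compatible below is a hypothetical stand-in for the JSON-style handling described above; it is not taken from protobuf or pbspark.

```python
import base64
import json


def to_json_compatible(value):
    """Mimic the JSON-style treatment of bytes: base64-encode them to text.

    Hypothetical helper for illustration; other values pass through unchanged.
    """
    if isinstance(value, bytes):
        return base64.b64encode(value).decode("ascii")
    return value


raw = b"hello"
as_json_value = to_json_compatible(raw)

# The JSON-compatible form is text, so it can be embedded in a JSON document...
document = json.dumps({"payload": as_json_value})

# ...but a consumer has to base64-decode it again to recover the original bytes.
recovered = base64.b64decode(json.loads(document)["payload"])
```

Keeping the field as raw bytes, as pbspark does, avoids that extra decode step on the Spark side.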

Conversion between google.protobuf.Timestamp and spark TimestampType can be enabled using:

from pbspark import MessageConverter

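mc = MessageConverter()
mc.register_timestamp_serializer()  # decode Timestamp messages to datetime objects
mc.register_timestamp_deserializer()  # encode datetime objects to Timestamp messages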
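
As a rough illustration of what such a conversion involves: a google.protobuf.Timestamp stores seconds and nanos since the Unix epoch, while Python datetime objects and the Spark TimestampType have microsecond resolution. The sketch below uses only the standard library; the function names are assumptions for illustration and are not pbspark's implementation.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_to_datetime(seconds, nanos):
    """Convert Timestamp-style (seconds, nanos) to an aware UTC datetime.

    Sub-microsecond digits are dropped because datetime has
    microsecond resolution.
    """
    return EPOCH + timedelta(seconds=seconds, microseconds=nanos // 1000)


def datetime_to_timestamp(dt):
    """Convert an aware datetime back to a (seconds, nanos) pair."""
    delta = dt - EPOCH
    seconds = delta.days * 86400 + delta.seconds
    nanos = delta.microseconds * 1000
    return seconds, nanos


dt = timestamp_to_datetime(1_600_000_000, 123_456_789)
roundtrip = datetime_to_timestamp(dt)
```

Note that the round trip is lossy below one microsecond: the trailing 789 nanoseconds do not survive.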

Custom serde is also supported. Suppose ExampleMessage has a field nested of type NestedMessage, and we want to combine the key and value fields of NestedMessage into a single string when we serialize.

Create and register a custom serializer with the MessageConverter.

from pbspark import MessageConverter
from example.example_pb2 import ExampleMessage
from example.example_pb2 import NestedMessage
from pyspark.sql.types import StringType

mc = MessageConverter()
# built-in to serialize Timestamp messages to datetime objects
mc.register_timestamp_serializer()

# register a custom serializer
# this will serialize the NestedMessages into a string rather than a
# struct with `key` and `value` fields
combine_key_value = lambda message: message.key + ":" + message.value

mc.register_serializer(NestedMessage, combine_key_value, StringType)


from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.serializers import CloudPickleSerializer

sc = SparkContext(serializer=CloudPickleSerializer())
spark = SparkSession(sc).builder.getOrCreate()

message = ExampleMessage(nested=NestedMessage(key="hello", value="world"))
data = [{"value": message.SerializeToString()}]
df = spark.createDataFrame(data)

df_decoded = df.select(mc.from_protobuf(df.value, ExampleMessage).alias("value"))
# rather than a struct the value of `nested` is a string
df_decoded.select("value.nested").show()

# +-----------+
# |     nested|
# +-----------+
# |hello:world|
# +-----------+
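
The idea behind a registered serializer can be sketched without Spark or protobuf: a converter keeps a mapping from message type to a custom function and falls back to field-by-field conversion when nothing is registered. Everything below (TinyConverter, the Nested and Example dataclasses) is a hypothetical stand-in written for illustration; it does not reproduce pbspark's internals.

```python
from dataclasses import dataclass, is_dataclass


@dataclass
class Nested:  # hypothetical stand-in for NestedMessage
    key: str
    value: str


@dataclass
class Example:  # hypothetical stand-in for ExampleMessage
    nested: Nested


class TinyConverter:
    """Illustrative registry mapping a message type to a custom serializer."""

    def __init__(self):
        self._serializers = {}

    def register_serializer(self, message_type, serializer):
        self._serializers[message_type] = serializer

    def to_python(self, message):
        serializer = self._serializers.get(type(message))
        if serializer is not None:
            # A registered serializer replaces the default struct conversion.
            return serializer(message)
        # Default: convert field by field, recursing into nested messages.
        return {
            name: self.to_python(value) if is_dataclass(value) else value
            for name, value in vars(message).items()
        }


converter = TinyConverter()
message = Example(nested=Nested(key="hello", value="world"))

default_result = converter.to_python(message)
converter.register_serializer(Nested, lambda m: m.key + ":" + m.value)
custom_result = converter.to_python(message)
```

Before registration the nested message converts to a struct-like dict; afterwards it converts to the combined string, which is why the registered Spark return type in the example above is StringType.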


