pyspark-spy
Collect and aggregate Spark events for profitz. In a 🐍 way!
Installation
pip install pyspark-spy
How to
You register a listener:
import pyspark_spy
listener = pyspark_spy.PersistingSparkListener()
pyspark_spy.register_listener(spark_context, listener)
Execute your Spark job as usual:
spark_context.range(1, 100).count()
And you have all metrics collected!
print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)
Look Ma, no actions!
Tested on Python 3.5 - 3.7 and PySpark 2.3 - 3.0.
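Putting it together, a minimal end-to-end script might look like this (a sketch; the local SparkContext and app name are just placeholders for however you normally create your context):

from pyspark import SparkContext
import pyspark_spy

# Placeholder context; in a real job you would reuse your existing
# SparkContext (e.g. spark.sparkContext from a SparkSession).
sc = SparkContext(master='local[2]', appName='pyspark-spy-demo')

listener = pyspark_spy.PersistingSparkListener()
pyspark_spy.register_listener(sc, listener)

# Run any Spark job as usual; events are collected as a side effect.
sc.range(1, 100).count()

print(listener.stage_output_metrics_aggregate())
sc.stop()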
Available listeners
- pyspark_spy.interface.SparkListener - base listener class. It defines an on_spark_event(event_name, java_event) method that you can override for custom logic when any event is received (see the sketch after this list).
- LoggingSparkListener - just logs received event names into a supplied or automatically created logger.
- StdoutSparkListener - writes event names to stdout.
- PersistingSparkListener - saves Spark events into an internal buffer.
- ContextSparkListener - same as PersistingSparkListener, but also lets you record only the events that occurred within a Python context manager scope. More on that later.
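For custom logic, you can subclass the base listener and override the callback. A minimal sketch (the subclass name and the logging behaviour are just an illustration; only the on_spark_event signature comes from the interface described above):

import logging

from pyspark_spy.interface import SparkListener

class JobEndLoggingListener(SparkListener):
    # Hypothetical subclass: logs every jobEnd event it receives.
    def on_spark_event(self, event_name, java_event):
        if event_name == 'jobEnd':
            logging.getLogger(__name__).info('Job finished: %s', java_event)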
PersistingSparkListener
Spark events collected (as Java objects):
- applicationEnd
- applicationStart
- blockManagerRemoved
- blockUpdated
- environmentUpdate
- executorAdded
- executorMetricsUpdate
- executorRemoved
- jobEnd
- jobStart
- otherEvent
- stageCompleted
- stageSubmitted
- taskEnd
- taskGettingResult
- taskStart
- unpersistRDD
listener.java_events['executorMetricsUpdate'] # -> List of py4j java objects
See all possible Spark events and their fields at https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/scheduler/SparkListener.html
Events converted to Python objects:
- jobEnd
- stageCompleted
listener.python_events['jobEnd'] # -> List of java events converted to typed namedtuples.
listener.jobEnd # same
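A converted event is an ordinary namedtuple, so you can inspect it directly. A small sketch (the exact field names are not documented here, so check ._fields on a real event; the loop is just an illustration):

for job in listener.jobEnd:
    # jobEnd entries are typed namedtuples; _fields shows what they expose.
    print(job._fields)
    print(job)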
Available aggregations
Only in PersistingSparkListener and ContextSparkListener.
stage_input_metrics_aggregate - sums up the inputMetrics of all stageCompleted events into one:
print(listener.stage_input_metrics_aggregate())
InputMetrics(bytesRead=21574, recordsRead=584)
stage_output_metrics_aggregate - sums up the outputMetrics of all stageCompleted events into one:
print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)
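A typical use of the aggregates is a sanity check after a job. A sketch (run_spark_job and the expected_records threshold are made-up placeholders):

expected_records = 2000  # made-up threshold for illustration

run_spark_job()  # placeholder for your actual pipeline

out = listener.stage_output_metrics_aggregate()
if out.recordsWritten < expected_records:
    raise RuntimeError(
        'Job wrote %d records, expected at least %d'
        % (out.recordsWritten, expected_records)
    )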
ContextSparkListener
To collect events from different actions and to build separate aggregations, use ContextSparkListener.
listener = ContextSparkListener()
register_listener(sc, listener)
with listener as events:  # events is basically another listener
    run_spark_job()
events.stage_output_metrics_aggregate()  # events collected only within the context manager

with listener as events_2:
    run_other_spark_job()
events_2.stage_output_metrics_aggregate()  # metrics collected during the second job

listener.stage_output_metrics_aggregate()  # metrics collected for all jobs