pyspark-spy
Collect and aggregate on spark events for profitz. In 🐍 way!
How to
You register a listener:
import pyspark_spy
listener = pyspark_spy.PersistingSparkListener()
pyspark_spy.register_listener(spark_context, listener)
Execute your Spark job as usual:
spark_context.range(1, 100).count()
And you have all metrics collected!
print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)
Look Ma, no actions!
Tested on Python 3.5 - 3.7 and PySpark 2.3 - 3.0.
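For reference, here is a complete end-to-end sketch. The listener calls mirror the snippet above; the local SparkContext setup is an assumption for illustration:

from pyspark import SparkContext

import pyspark_spy

sc = SparkContext(master='local[2]', appName='pyspark-spy-demo')  # assumed local setup
listener = pyspark_spy.PersistingSparkListener()
pyspark_spy.register_listener(sc, listener)

sc.range(1, 100).count()  # any job works; this mirrors the example above

print(listener.stage_output_metrics_aggregate())
sc.stop()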
Available listeners
- pyspark_spy.interface.SparkListener - base listener class. It defines an on_spark_event(event_name, java_event) method that you can implement yourself for custom logic when any event is received (see the sketch after this list).
- LoggingSparkListener - just logs received event names into a supplied or automatically created logger.
- StdoutSparkListener - writes event names to stdout.
- PersistingSparkListener - saves Spark events into an internal buffer.
- ContextSparkListener - same as PersistingSparkListener but also allows you to record only events that occurred within a Python context manager scope. More on that later.
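A sketch of the base-class hook described above. The on_spark_event signature comes from this list; the class name and counting logic are illustrative, not part of the library:

from collections import Counter

from pyspark_spy.interface import SparkListener

class CountingSparkListener(SparkListener):
    # Illustrative listener: counts how many times each event name arrives.
    def __init__(self):
        super().__init__()  # assumes a no-argument base constructor
        self.counts = Counter()

    def on_spark_event(self, event_name, java_event):
        self.counts[event_name] += 1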
PersistingSparkListener
Spark events collected (as Java objects):
- applicationEnd
- applicationStart
- blockManagerRemoved
- blockUpdated
- environmentUpdate
- executorAdded
- executorMetricsUpdate
- executorRemoved
- jobEnd
- jobStart
- otherEvent
- stageCompleted
- stageSubmitted
- taskEnd
- taskGettingResult
- taskStart
- unpersistRDD
listener.java_events['executorMetricsUpdate'] # -> List of py4j java objects
See all possible Spark events and their fields at https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/scheduler/SparkListener.html
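Raw events are py4j proxies, so Java getters are called as methods. A hedged example for stageCompleted (stageInfo and its accessors come from the linked Javadoc):

for event in listener.java_events['stageCompleted']:
    info = event.stageInfo()  # org.apache.spark.scheduler.StageInfo
    print(info.stageId(), info.name(), info.numTasks())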
Events converted to Python objects:
- jobEnd
- stageCompleted
listener.python_events['jobEnd'] # -> List of java events converted to typed namedtuples.
listener.jobEnd # same
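Both access styles return the same list, and because the items are typed namedtuples, standard namedtuple introspection applies (exact field names depend on the library's conversion):

jobs = listener.python_events['jobEnd']
assert jobs == listener.jobEnd  # the two access styles are equivalent
if jobs:
    print(jobs[0]._fields)  # inspect the available field names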
Available aggregations
Only in PersistingSparkListener and ContextSparkListener.
stage_input_metrics_aggregate - sums up the inputMetrics of all stageCompleted events into one:
print(listener.stage_input_metrics_aggregate())
InputMetrics(bytesRead=21574, recordsRead=584)
stage_output_metrics_aggregate - sums up the outputMetrics of all stageCompleted events into one:
print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)
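A sketch of a derived report built on the two aggregates (throughput_report is a made-up helper; the field names match the namedtuples printed above):

def throughput_report(listener):
    # Combine both aggregates into a single plain dict.
    inp = listener.stage_input_metrics_aggregate()
    out = listener.stage_output_metrics_aggregate()
    return {
        'bytes_read': inp.bytesRead,
        'records_read': inp.recordsRead,
        'bytes_written': out.bytesWritten,
        'records_written': out.recordsWritten,
    }

print(throughput_report(listener))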
ContextSparkListener
To collect events from different actions and build separate aggregations, use ContextSparkListener.
listener = ContextSparkListener()
register_listener(sc, listener)

with listener as events:  # events is basically another listener
    run_spark_job()
events.stage_output_metrics_aggregate()  # events collected only within the context manager

with listener as events_2:
    run_other_spark_job()
events_2.stage_output_metrics_aggregate()  # metrics collected during the second job

listener.stage_output_metrics_aggregate()  # metrics collected for all jobs
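As an illustrative follow-up to the snippet above, the per-scope aggregates should add up to the listener-wide one, assuming no other jobs ran outside the two scopes:

first = events.stage_output_metrics_aggregate()
second = events_2.stage_output_metrics_aggregate()
total = listener.stage_output_metrics_aggregate()
assert total.recordsWritten == first.recordsWritten + second.recordsWritten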