pyspark-spy

Collect and aggregate Spark events for profit. In a 🐍 way!

Installation

pip install pyspark-spy

How to

You register a listener:

import pyspark_spy
listener = pyspark_spy.PersistingSparkListener()
pyspark_spy.register_listener(spark_context, listener)

Execute your Spark job as usual:

spark_context.range(1, 100).count()

And you have all metrics collected!

print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)

Look Ma, no actions!

Tested on Python 3.5 - 3.7 and PySpark 2.3 - 3.0.

Available listeners

  • pyspark_spy.interface.SparkListener - Base listener class. It defines an on_spark_event(event_name, java_event) method that you can implement yourself for custom logic when any event is received (see the sketch after this list).

  • LoggingSparkListener - just logs received event names to a supplied or automatically created logger.

  • StdoutSparkListener - writes event names to stdout.

  • PersistingSparkListener - saves Spark events into an internal buffer.

  • ContextSparkListener - same as PersistingSparkListener, but also lets you record only the events that occurred within a Python context manager scope. More on that below.
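
For custom logic, a minimal sketch of implementing on_spark_event on the base class might look like this (MyListener and its print body are purely illustrative; the sketch assumes the subclass needs no constructor arguments):

import pyspark_spy
from pyspark_spy.interface import SparkListener

class MyListener(SparkListener):
    def on_spark_event(self, event_name, java_event):
        # Called for every event; event_name is a string such as 'jobEnd',
        # java_event is the raw py4j object for that event.
        print('got event:', event_name)

pyspark_spy.register_listener(spark_context, MyListener())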

PersistingSparkListener

Spark events collected (as Java objects):

  • applicationEnd
  • applicationStart
  • blockManagerRemoved
  • blockUpdated
  • environmentUpdate
  • executorAdded
  • executorMetricsUpdate
  • executorRemoved
  • jobEnd
  • jobStart
  • otherEvent
  • stageCompleted
  • stageSubmitted
  • taskEnd
  • taskGettingResult
  • taskStart
  • unpersistRDD

listener.java_events['executorMetricsUpdate']  # -> List of py4j Java objects

View all possible Spark events and their fields at https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/scheduler/SparkListener.html
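
As a rough sketch of drilling into one of the raw events (this assumes at least one job has already finished; jobId() is the Java getter of SparkListenerJobEnd, called through py4j):

job_end_events = listener.java_events['jobEnd']  # list of raw py4j objects
if job_end_events:
    # Java getters are called as methods on the py4j proxy
    print(job_end_events[0].jobId())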

Events converted to Python objects:

  • jobEnd
  • stageCompleted
listener.python_events['jobEnd']  # -> List of java events converted to typed namedtuples.
listener.jobEnd  # same
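
To see which attributes a converted event actually carries, you can inspect the namedtuple's standard _fields attribute (a small sketch, assuming at least one jobEnd event was collected):

for job_end in listener.jobEnd:
    print(job_end._fields)  # attribute names of the typed namedtuple
    print(job_end)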

Available aggregations

Available only in PersistingSparkListener and ContextSparkListener.

  • stage_input_metrics_aggregate - sums up the inputMetrics of all stageCompleted events into one

print(listener.stage_input_metrics_aggregate())
InputMetrics(bytesRead=21574, recordsRead=584)

  • stage_output_metrics_aggregate - sums up the outputMetrics of all stageCompleted events into one

print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)

ContextSparkListener

To collect events from different actions and build separate aggregations for each, use ContextSparkListener.

listener = ContextSparkListener()
register_listener(sc, listener)

with listener as events: # events is basically another listener
    run_spark_job()
events.stage_output_metrics_aggregate()  # events collected only within context manager

with listener as events_2:
    run_other_spark_job()
events_2.stage_output_metrics_aggregate()  # metrics collected during second job

listener.stage_output_metrics_aggregate() # metrics collected for all jobs
