
Airflow Livy Plugins

Build Status Code coverage

Plugins for Airflow to run Spark jobs via Livy:

  • Sessions
  • Batches. This mode supports additional verification via the Spark/YARN REST APIs.

See this blog post for more information and a detailed comparison of the ways to run Spark jobs from Airflow.
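For context: both modes are driven through Livy's REST API. Here's a minimal sketch of what a batch submission looks like against a plain Livy endpoint, independent of these plugins (the host, port and file path below are placeholders):

import time

import requests

LIVY_URL = "http://localhost:8998"  # placeholder: your Livy endpoint

# Submit a batch: POST /batches with the job file and its arguments.
response = requests.post(
    f"{LIVY_URL}/batches",
    json={"file": "local:///path/to/join_2_files.py", "args": ["-file1_header=true"]},
)
batch_id = response.json()["id"]

# Poll GET /batches/{id}/state until the batch reaches a terminal state.
while True:
    state = requests.get(f"{LIVY_URL}/batches/{batch_id}/state").json()["state"]
    if state in ("success", "dead", "killed"):
        break
    time.sleep(5)

print(f"Batch {batch_id} finished in state: {state}")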

Directories and files of interest

  • airflow_home: example DAGs and plugins for Airflow. Can be used as the Airflow home path.
  • batches: Spark jobs code, to be used in Livy batches.
  • sessions: (optionally templated) Spark code for Livy sessions.
  • airflow.sh: helper shell script. Can be used to run sample DAGs, prep the development environment and more. Run it to find out what other commands are available.

How do I...

...run the examples?

Prerequisites:

  • Python 3. Make sure it's installed and on $PATH.

Now,

  1. Do you have a Spark cluster with Livy running somewhere?
    1. No. Either get one, or "mock" it with my Spark cluster on Docker Compose.
    2. Yes. You're golden!
  2. Optional (this step can be skipped if you're mocking a cluster on your machine): open airflow.sh. Inside the init_airflow() function you'll see Airflow Connections for Livy, Spark and YARN. Redefine them as appropriate; if you prefer to do it programmatically, see the sketch after this list.
  3. Run ./airflow.sh up to bring up the whole infrastructure. The Airflow UI will be available at localhost:8080.
  4. Press Ctrl+C to stop Airflow, then run ./airflow.sh down to dispose of any remaining Airflow processes (this shouldn't be needed if everything shut down cleanly).
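Speaking of step 2: if you'd rather define those connections in code than edit the shell script, here's a hedged sketch using Airflow's ORM. The conn_id, host and port values are placeholders; match them to whatever init_airflow() expects:

from airflow import settings
from airflow.models import Connection

# Placeholder values - align conn_id/host/port with init_airflow().
livy_conn = Connection(conn_id="livy", conn_type="HTTP", host="localhost", port=8998)

session = settings.Session()
session.add(livy_conn)
session.commit()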

... use it in my project?

pip install airflow-livy-plugins

Then link or copy the plugin files into $AIRFLOW_HOME/plugins (see how I do that in ./airflow.sh). They'll get loaded into Airflow by the plugin manager automatically. This is how you import the plugins:

from airflow.operators import LivySessionOperator
from airflow.operators import LivyBatchOperator

Plugins are loaded at run time, so the imports above will look broken in your IDE but will work fine in Airflow. Take a look at the sample DAGs to see my workaround :)
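To make that concrete, here's a sketch of wiring LivyBatchOperator into a DAG. The task_id and dag arguments are standard Airflow; the file and arguments parameter names are illustrative assumptions rather than the verified operator signature, so check the sample DAGs in airflow_home for the real one:

from datetime import datetime

from airflow import DAG
from airflow.operators import LivyBatchOperator  # resolved at run time by the plugin manager

dag = DAG(
    dag_id="livy_batch_example",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
)

# NOTE: "file" and "arguments" are assumed names for illustration only -
# consult the sample DAGs for the operator's actual parameters.
join_files = LivyBatchOperator(
    task_id="join_2_files",
    file="file:///path/to/batches/join_2_files.py",
    arguments=["file:///data/grades.csv", "file:///data/ssn-address.tsv"],
    dag=dag,
)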

... set up the development environment?

Alright, you want to contribute and need to be able to run everything on your machine, along with the usual IDE niceties (debugging, syntax highlighting). Here's how:

  • Run ./airflow.sh dev to install all dev dependencies.
  • Run ./airflow.sh updev to run local Airflow with local plugins (as opposed to pulling them from PyPI).
  • (PyCharm-specific) Point PyCharm at your newly created virtual environment: go to "Preferences" -> "Project: airflow-livy-plugins" -> "Project Interpreter", select "Existing environment" and pick the python3 executable from the venv folder (venv/bin/python3).
  • Run ./airflow.sh cov to run tests with a coverage report (saved to htmlcov/).
  • Run ./airflow.sh lint to highlight code style errors.
  • Run ./airflow.sh format to reformat all code (Black + isort).

... debug?

  • (PyCharm-specific) Step-by-step debugging with airflow test, as well as running PySpark batch jobs locally (also with debugging), is supported via run configurations under .idea/runConfigurations. You shouldn't have to do anything to use them - just open the folder in PyCharm as a project.
  • An example of how a batch can be run on local Spark:
python ./batches/join_2_files.py \
"file:////Users/vpanov/data/vpanov/bigdata-docker-compose/data/grades.csv" \
"file:///Users/vpanov/data/vpanov/bigdata-docker-compose/data/ssn-address.tsv" \
-file1_sep=, -file1_header=true \
-file1_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING" \
-file1_join_column=SSN -file2_header=false \
-file2_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Address1 STRING, Address2 STRING" \
-file2_join_column=SSN -output_header=true \
-output_columns="file1.\`Last name\` AS LastName, file1.\`First name\` AS FirstName, file1.SSN, file2.Address1, file2.Address2" 

# Optionally append to save result to file
#-output_path="file:///Users/vpanov/livy_batch_example" 
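For reference, here's a minimal PySpark sketch of the kind of job join_2_files.py implements, reconstructed from the arguments above. Argument handling is trimmed and the real script in batches/ may differ:

import argparse

from pyspark.sql import SparkSession

# Trimmed-down argument handling; the real script accepts more options
# (separators, header flags, output path, etc.).
parser = argparse.ArgumentParser()
parser.add_argument("file1_path")
parser.add_argument("file2_path")
parser.add_argument("-file1_schema", required=True)
parser.add_argument("-file2_schema", required=True)
parser.add_argument("-file1_join_column", required=True)
parser.add_argument("-file2_join_column", required=True)
parser.add_argument("-output_columns", required=True)
args = parser.parse_args()

spark = SparkSession.builder.appName("join_2_files").getOrCreate()

# Read both inputs with their declared DDL schemas and expose them to Spark SQL.
spark.read.csv(args.file1_path, schema=args.file1_schema, header=True, sep=",") \
    .createOrReplaceTempView("file1")
spark.read.csv(args.file2_path, schema=args.file2_schema, header=False, sep="\t") \
    .createOrReplaceTempView("file2")

# Join on the requested columns and project the requested output columns.
result = spark.sql(
    f"SELECT {args.output_columns} FROM file1 "
    f"JOIN file2 ON file1.`{args.file1_join_column}` = file2.`{args.file2_join_column}`"
)
result.show()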

TODO

  • Replace airflow.sh with modern tooling (e.g. pipenv and a Docker image)
  • Disable some flake8 flags for cleaner code

Download files

Download the file for your platform.

Source Distribution

airflow-livy-plugins-0.2.tar.gz (10.9 kB)


Built Distribution

airflow_livy_plugins-0.2-py3-none-any.whl (11.8 kB)


File details

Details for the file airflow-livy-plugins-0.2.tar.gz.

File metadata

  • Download URL: airflow-livy-plugins-0.2.tar.gz
  • Upload date:
  • Size: 10.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/45.2.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.7.6

File hashes

Hashes for airflow-livy-plugins-0.2.tar.gz:

  • SHA256: e0e6c5da41bc0e782ab4026343f59e8dafddd9dceb76a78bb540e2ec27a7d35a
  • MD5: 096835b15795bbcdd92bf21f9d1b8354
  • BLAKE2b-256: ada384a1e876d01d8d09b2bb45426d19b34117978ab016961ceda61b981496fe
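To check a downloaded file against these digests with Python's standard hashlib (assuming the sdist was saved to the current directory):

import hashlib

EXPECTED_SHA256 = "e0e6c5da41bc0e782ab4026343f59e8dafddd9dceb76a78bb540e2ec27a7d35a"

with open("airflow-livy-plugins-0.2.tar.gz", "rb") as f:
    actual = hashlib.sha256(f.read()).hexdigest()

assert actual == EXPECTED_SHA256, f"hash mismatch: {actual}"
print("sha256 OK")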


File details

Details for the file airflow_livy_plugins-0.2-py3-none-any.whl.

File metadata

  • Download URL: airflow_livy_plugins-0.2-py3-none-any.whl
  • Upload date:
  • Size: 11.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/45.2.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.7.6

File hashes

Hashes for airflow_livy_plugins-0.2-py3-none-any.whl:

  • SHA256: d9c36ae1055cdea3cd3f3623fc23f59a7b4f4e4b3b2548f5a2e81d7d1a4849ef
  • MD5: 965dfce910ddd8200bed130a9352b089
  • BLAKE2b-256: 76e023fec6b7aa99becb182bcc4e7b668a9cdb75413c6560870b747d745165f1

