Airflow Livy Operators
Lets Airflow DAGs run Spark jobs via Livy:
- Sessions,
- Batches. This mode supports additional verification via the Spark/YARN REST API.
See this blog post for more information and a detailed comparison of the ways to run Spark jobs from Airflow.
Directories and files of interest
- airflow_home/plugins: Airflow Livy operators' code.
- airflow_home/dags: example DAGs for Airflow.
- batches: Spark job code, to be used in Livy batches.
- sessions: (optionally) templated Spark code for Livy sessions.
- helper.sh: helper shell script. Can be used to run sample DAGs, prepare the development environment and more. Run it to find out which other commands are available.
How do I...
...run the examples?
- Python 3. Make sure it's installed and in $PATH
- Spark cluster with Livy. Where to get that? You can "mock" it on your machine with my Spark cluster on Docker Compose.
- Optional - this step can be skipped if you're mocking a cluster on your machine. Open helper.sh. Inside the init_airflow() function you'll see Airflow Connections for Livy, Spark and YARN. Redefine them as appropriate.
- ./helper.sh up to bring up the whole infrastructure. The Airflow UI will be available at localhost:8888.
- Ctrl+C to stop Airflow. Then ./helper.sh down to dispose of remaining Airflow processes (shouldn't be required if everything goes well; run this if you can't start Airflow again due to non-informative errors).
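Under the hood, batch mode boils down to calls against Livy's REST API (POST /batches to submit a job). The stdlib-only sketch below shows the shape of such a submission; the helper names are hypothetical and the Livy URL is an assumption (the Docker Compose cluster exposes it on port 8998 by default):

```python
import json
from urllib import request


def build_batch_payload(file_path, args=None):
    """Build the JSON body for Livy's POST /batches endpoint."""
    payload = {"file": file_path}  # path to the job file, as seen by the cluster
    if args:
        payload["args"] = args  # command-line arguments passed to the job
    return payload


def submit_livy_batch(livy_url, file_path, args=None):
    """Submit a Spark job as a Livy batch; returns the created batch record."""
    body = json.dumps(build_batch_payload(file_path, args)).encode("utf-8")
    req = request.Request(
        f"{livy_url}/batches",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with request.urlopen(req) as resp:
        return json.loads(resp.read())


# Example (assumes a Livy server at localhost:8998):
# batch = submit_livy_batch(
#     "http://localhost:8998", "file:///data/batches/join_2_files.py"
# )
```

The operators add retry/polling and (for batches) extra verification on top of this basic request.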
... use it in my project?
pip install airflow-livy-operators
This is how you import them:
from airflow_livy.session import LivySessionOperator
from airflow_livy.batch import LivyBatchOperator
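A minimal sketch of wiring LivyBatchOperator into a DAG. The parameter names below (file, arguments) mirror Livy's batch API but are illustrative assumptions - check the operator's signature (or the example DAGs in airflow_home/dags) for the exact arguments it accepts:

```python
from datetime import datetime

from airflow import DAG
from airflow_livy.batch import LivyBatchOperator

with DAG(
    "livy_batch_example",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,  # trigger manually
) as dag:
    join_files = LivyBatchOperator(
        task_id="join_2_files",
        file="file:///data/batches/join_2_files.py",  # hypothetical path
        arguments=["-file1_sep=,", "-file1_header=true"],
    )
```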
... set up the development environment?
Alright, you want to contribute and need to be able to run the stuff on your machine, as well as the usual niceness that comes with IDEs (debugging, syntax highlighting).
- ./helper.sh dev to install all dev dependencies.
- ./helper.sh updev runs Airflow with the local operators' code (as opposed to pulling it from PyPI). Useful for development.
- (PyCharm-specific) Point PyCharm to your newly-created virtual environment: go to "Preferences" -> "Project: airflow-livy-operators" -> "Project interpreter", select "Existing environment" and pick the python3 executable from the venv folder (venv/bin/python3).
- ./helper.sh cov to run tests with a coverage report (saved to htmlcov/).
- ./helper.sh lint to highlight code style errors.
- ./helper.sh format to reformat all code (Black + isort).
- (PyCharm-specific) Step-by-step debugging with airflow test, as well as running PySpark batch jobs locally (with debugging too), is supported via run configurations under .idea/runConfigurations. You shouldn't have to do anything to use them - just open the folder in PyCharm as a project.
- An example of how a batch can be run on local Spark:

python ./batches/join_2_files.py \
  "file:///Users/vpanov/data/vpanov/bigdata-docker-compose/data/grades.csv" \
  "file:///Users/vpanov/data/vpanov/bigdata-docker-compose/data/ssn-address.tsv" \
  -file1_sep=, -file1_header=true \
  -file1_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING" \
  -file1_join_column=SSN -file2_header=false \
  -file2_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Address1 STRING, Address2 STRING" \
  -file2_join_column=SSN -output_header=true \
  -output_columns="file1.\`Last name\` AS LastName, file1.\`First name\` AS FirstName, file1.SSN, file2.Address1, file2.Address2"
  # Optionally append to save the result to a file:
  # -output_path="file:///Users/vpanov/livy_batch_example"
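A batch script like the one above typically parses its flags before handing control to Spark. The sketch below shows one way to parse the flags used in that command; the flag names come from the command itself, but the parser is illustrative rather than the project's actual implementation:

```python
import argparse


def parse_batch_args(argv):
    """Parse join_2_files.py-style flags (illustrative, not the project's code)."""
    parser = argparse.ArgumentParser(description="Join two files on a column.")
    parser.add_argument("file1_path")  # positional: first input file URI
    parser.add_argument("file2_path")  # positional: second input file URI
    # Single-dash long options, matching the command-line style above.
    parser.add_argument("-file1_sep", default=",")
    parser.add_argument("-file1_header", default="true")
    parser.add_argument("-file1_schema")
    parser.add_argument("-file1_join_column")
    parser.add_argument("-file2_header", default="true")
    parser.add_argument("-file2_schema")
    parser.add_argument("-file2_join_column")
    parser.add_argument("-output_header", default="false")
    parser.add_argument("-output_columns", default="*")
    parser.add_argument("-output_path", default=None)  # omit to skip saving
    return parser.parse_args(argv)
```

argparse accepts the `-flag=value` form used in the example, so the same script runs unchanged both locally and as Livy batch `args`.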
TODO
- helper.sh - replace with modern tools (e.g. pipenv + a Docker image)
- Disable some flake8 flags for cleaner code