
Astro SDK allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.


astro

workflows made easy


The Astro Python SDK enables rapid development of extract, transform, and load workflows in Apache Airflow. It allows you to express your workflows as a set of data dependencies, without having to worry about task ordering. The Astro Python SDK is maintained by Astronomer.

Prerequisites

  • Apache Airflow >= 2.1.0.

Install

The Astro Python SDK is available on PyPI. Use the standard Python installation tools.

To install a cloud-agnostic version of the SDK, run:

pip install astro-sdk-python

You can also install dependencies for using the SDK with popular cloud providers:

pip install astro-sdk-python[amazon,google,snowflake,postgres]
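
To confirm the installation, you can query the installed version with the standard library (this uses importlib.metadata rather than any SDK-specific API):

import importlib.metadata

# Prints the installed version of the SDK, e.g. "1.1.0"
print(importlib.metadata.version("astro-sdk-python"))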

Quickstart

  1. Ensure that your Airflow environment is set up correctly by running the following commands:

    export AIRFLOW_HOME=`pwd`
    export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True
    airflow db init
    

    Note: AIRFLOW__CORE__ENABLE_XCOM_PICKLING needs to be enabled for astro-sdk-python.

    Currently, custom XCom backends are limited to data types that are JSON-serializable. Since pandas DataFrames are not JSON-serializable, XCom pickling must be enabled so that DataFrames can be passed between tasks.

    The data format used by pickle is Python-specific. This has the advantage that there are no restrictions imposed by external standards such as JSON or XDR (which can’t represent pointer sharing); however it means that non-Python programs may not be able to reconstruct pickled Python objects.

    Read more: enable_xcom_pickling and pickle.
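
    To see why pickling is needed, the snippet below (a standalone illustration, not part of the SDK) shows a pandas DataFrame failing JSON serialization but round-tripping through pickle:

    import json
    import pickle

    import pandas as pd

    df = pd.DataFrame({"title": ["Toy Story 3 (2010)"], "rating": [8.3]})

    try:
        json.dumps(df)  # DataFrames are not JSON-serializable
    except TypeError as exc:
        print(f"JSON serialization failed: {exc}")

    blob = pickle.dumps(df)               # pickle handles arbitrary Python objects
    print(pickle.loads(blob).equals(df))  # True: the DataFrame round-trips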

  2. Create a SQLite database for the example to run with:

    # The sqlite_default connection has a different host on macOS vs. Linux
    export SQL_TABLE_NAME=`airflow connections get sqlite_default -o yaml | grep host | awk '{print $2}'`
    sqlite3 "$SQL_TABLE_NAME" "VACUUM;"
    
  3. Copy the following workflow into a file named calculate_popular_movies.py and add it to the dags directory of your Airflow project:

    from datetime import datetime
    from airflow import DAG
    from astro import sql as aql
    from astro.files import File
    from astro.sql.table import Table
    
    @aql.transform()
    def top_five_animations(input_table: Table):
        # The SDK templates the returned SQL: {{input_table}} resolves to the
        # table passed in at call time.
        return """
            SELECT title, rating
            FROM {{input_table}}
            WHERE genre1 = 'Animation'
            ORDER BY rating DESC
            LIMIT 5;
        """
    
    with DAG(
        "calculate_popular_movies",
        schedule_interval=None,
        start_date=datetime(2000, 1, 1),
        catchup=False,
    ) as dag:
        # Load the example CSV into a SQLite table named imdb_movies.
        imdb_movies = aql.load_file(
            input_file=File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv"),
            output_table=Table(name="imdb_movies", conn_id="sqlite_default"),
        )

        # Run the transform; its result is materialized in the top_animation table.
        top_animations = top_five_animations(
            input_table=imdb_movies,
            output_table=Table(name="top_animation"),
        )
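
    Because top_five_animations returns a Table, its output can feed further transforms directly, and Airflow infers the task ordering from these data dependencies. A hypothetical follow-up transform (not part of the quickstart) could look like:

    @aql.transform()
    def best_animation(input_table: Table):
        # Chains onto the previous transform's output table.
        return "SELECT title, rating FROM {{input_table}} ORDER BY rating DESC LIMIT 1;"

    # Inside the same DAG context:
    # best_of = best_animation(input_table=top_animations)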
    
  4. Run the example DAG:

    airflow dags test calculate_popular_movies `date -Iseconds`
    
  5. Check the result of your DAG by running:

    sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
    

    You should see the following output:

    $ sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
    Toy Story 3 (2010)|8.3
    Inside Out (2015)|8.2
    How to Train Your Dragon (2010)|8.1
    Zootopia (2016)|8.1
    How to Train Your Dragon 2 (2014)|7.9
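
    If you prefer to check from Python, the standard library's sqlite3 module works as well (the path below is a placeholder; use the value $SQL_TABLE_NAME resolved to in step 2):

    import sqlite3

    con = sqlite3.connect("/path/to/sqlite_default.db")
    for title, rating in con.execute("SELECT * FROM top_animation;"):
        print(title, rating)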
    

Supported technologies

Databases:

  • Google BigQuery
  • Postgres
  • Snowflake
  • SQLite

File types:

  • CSV
  • JSON
  • NDJSON
  • Parquet

File stores:

  • Amazon S3
  • Filesystem
  • Google GCS

Available operations

The following are some key functions available in the SDK:

  • load_file: Load a given file into a SQL table
  • transform: Apply a SQL SELECT statement to a source table and save the result to a destination table
  • drop_table: Drop a SQL table
  • run_raw_sql: Run any SQL statement without handling its output
  • append: Insert rows from the source SQL table into the destination SQL table, if there are no conflicts
  • merge: Insert rows from the source SQL table into the destination SQL table, depending on conflicts:
    • ignore: Do not add rows that already exist
    • update: Replace existing rows with new ones
  • export_file: Export SQL table rows into a destination file
  • dataframe: Export a given SQL table into an in-memory pandas DataFrame
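
For example, the dataframe decorator hands your function a pandas DataFrame built from the table you pass in. A hedged sketch, reusing the quickstart's top_animation table (decorator options may vary between versions):

import pandas as pd

from astro import sql as aql
from astro.sql.table import Table

@aql.dataframe
def summarize(df: pd.DataFrame):
    # df contains the rows of the SQL table supplied at call time.
    print(df.describe())

# Inside a DAG context:
# summarize(df=Table(name="top_animation", conn_id="sqlite_default"))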

For a full list of available operators, see the SDK reference documentation.

Documentation

The documentation is a work in progress; we aim to follow the Diátaxis system:

  • Getting Started: A hands-on introduction to the Astro Python SDK
  • How-to guides: Simple step-by-step user guides to accomplish specific tasks
  • Reference guide: Commands, modules, classes and methods
  • Explanation: Clarification and discussion of key decisions when designing the project

Changelog

The Astro Python SDK follows semantic versioning for releases. Check the changelog for the latest changes.

Release management

To learn more about our release philosophy and steps, see Managing Releases.

Contribution guidelines

All contributions, bug reports, bug fixes, documentation improvements, enhancements, and ideas are welcome.

Read the Contribution Guideline for a detailed overview of how to contribute.

Contributors and maintainers should abide by the Contributor Code of Conduct.

License

Apache License 2.0
