
Indexima Airflow integration


airflow-indexima


Versions follow Semantic Versioning.

Overview

Indexima Airflow integration based on pyhive.

This project is used successfully in our production environment. As it is a young project, expect changes; any help is welcome :)

Setup

Requirements

  • Python 3.6+

Installation

Install this library directly into an activated virtual environment:

$ pip install airflow-indexima

or add it to your Poetry project:

$ poetry add airflow-indexima

or use it as an Airflow plugin.

Usage

After installation, the package can be imported:

$ python
>>> import airflow_indexima
>>> airflow_indexima.__version__

See the API documentation.

a simple query

from airflow_indexima.operators import IndeximaQueryRunnerOperator

...

with dag:
    ...
    op = IndeximaQueryRunnerOperator(
        task_id='my-task-id',
        sql_query='DELETE FROM Client WHERE GRPD = 1',
        indexima_conn_id='my-indexima-connection'
    )
    ...

a load into Indexima

from airflow_indexima.operators.indexima import IndeximaLoadDataOperator

...

with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        load_path_uri='jdbc:redshift://my-private-instance.com:5439/db_client?ssl=true&user=airflow-user&password=XXXXXXXX'
    )
    ...
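The `load_path_uri` above is a Redshift JDBC URL that carries the user and password as query parameters. The following is a minimal sketch of assembling such a URL from its parts, assuming host, port, database and credentials are available as plain values; `build_redshift_jdbc_uri` is a hypothetical helper, not part of airflow-indexima. It percent-encodes the query values so that characters such as `&` or `=` in a password do not split the query string; whether the JDBC driver decodes them is an assumption worth checking.

```python
from urllib.parse import urlencode


def build_redshift_jdbc_uri(
    host: str, port: int, database: str, user: str, password: str, ssl: bool = True
) -> str:
    """Hypothetical helper: build a Redshift JDBC URL with credentials in the query string."""
    params = {"ssl": "true" if ssl else "false", "user": user, "password": password}
    # urlencode percent-escapes reserved characters (e.g. '&', '=') in the values;
    # this assumes the consumer of the URL decodes them.
    return f"jdbc:redshift://{host}:{port}/{database}?{urlencode(params)}"


uri = build_redshift_jdbc_uri(
    host="my-private-instance.com",
    port=5439,
    database="db_client",
    user="airflow-user",
    password="XXXXXXXX",
)
print(uri)
```

With the example values this reproduces the URI passed to `IndeximaLoadDataOperator` above. Hard-coding the password is still the weak point, which the next section addresses.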

customize credential access

If you store your password in another backend (such as AWS SSM), you can define a connection decorator that fills it in, and expose it to your DAG as a macro.

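from airflow import DAG
from airflow.models import Connection

from airflow_indexima.operators.indexima import IndeximaLoadDataOperator
from airflow_indexima.uri import define_load_path_factory, get_redshift_load_path_uri


def my_decorator(conn: Connection) -> Connection:
    # get_ssm_parameter is your own helper that reads the secret from AWS SSM
    conn.password = get_ssm_parameter(param_name=f'{conn.conn_id}.{conn.login}')
    return conn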


dag = DAG(
    dag_id='my_dag',
    user_defined_macros={
        # define a macro named get_load_path_uri
        'get_load_path_uri': define_load_path_factory(
            conn_id='my-redshift-connection',
            decorator=my_decorator,
            factory=get_redshift_load_path_uri,
        ),
    },
    ...
)

with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        load_path_uri='{{ get_load_path_uri() }}'
    )
    ...
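The `get_ssm_parameter` helper used in `my_decorator` is not provided by airflow-indexima; you supply it. A minimal sketch, assuming boto3 is installed on the workers and the secret is stored in AWS Systems Manager Parameter Store under the name `<conn_id>.<login>`. `get_parameter(Name=..., WithDecryption=True)` is boto3's SSM call; the optional `client` argument is an addition of this sketch so a fake client can be injected in tests.

```python
from typing import Any, Optional


def get_ssm_parameter(param_name: str, client: Optional[Any] = None) -> str:
    """Sketch: read a (possibly SecureString) parameter from AWS SSM Parameter Store."""
    if client is None:
        # Imported lazily so this module can be loaded without boto3 installed.
        import boto3

        client = boto3.client("ssm")
    # WithDecryption=True returns SecureString values in clear text.
    response = client.get_parameter(Name=param_name, WithDecryption=True)
    return response["Parameter"]["Value"]
```

With the connection from the example, `my_decorator` would look up a parameter named `my-redshift-connection.airflow-user` (assuming that connection's login is `airflow-user`).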

A connection decorator must have this type: ConnectionDecorator = Callable[[Connection], Connection]

define_load_path_factory is a function which takes:

  • a connection identifier
  • a decorator of type ConnectionDecorator
  • a URI factory of type UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]

and returns a function with no arguments, which can be called as a macro in a DAG's operator templates.
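The contract above can be illustrated with a small, self-contained sketch. It is not the library's implementation: `Connection` is a minimal stand-in for `airflow.models.Connection`, connections come from a hypothetical in-memory `CONNECTIONS` registry instead of Airflow's connection storage, and `redshift_uri_factory` is a hypothetical factory written in the spirit of `get_redshift_load_path_uri`. Following the `UriGeneratorFactory` signature, the factory receives the connection id and the decorator, and `define_load_path_factory` only binds those arguments into a zero-argument callable, which is what makes `{{ get_load_path_uri() }}` usable in a templated field.

```python
from dataclasses import dataclass, replace
from typing import Callable, Dict, Optional


@dataclass
class Connection:
    """Minimal stand-in for airflow.models.Connection (assumption for this sketch)."""
    conn_id: str
    host: str
    port: int
    schema: str
    login: str
    password: Optional[str] = None


# Type aliases as described in the README.
ConnectionDecorator = Callable[[Connection], Connection]
UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]

# Hypothetical registry standing in for Airflow's connection storage.
CONNECTIONS: Dict[str, Connection] = {
    "my-redshift-connection": Connection(
        conn_id="my-redshift-connection",
        host="my-private-instance.com",
        port=5439,
        schema="db_client",
        login="airflow-user",
    )
}


def redshift_uri_factory(conn_id: str, decorator: Optional[ConnectionDecorator] = None) -> str:
    """Hypothetical UriGeneratorFactory: look up, decorate, then build a JDBC URI."""
    # Copy the stored connection, as a fresh lookup would return a new object.
    conn = replace(CONNECTIONS[conn_id])
    if decorator is not None:
        conn = decorator(conn)
    return (
        f"jdbc:redshift://{conn.host}:{conn.port}/{conn.schema}"
        f"?ssl=true&user={conn.login}&password={conn.password}"
    )


def define_load_path_factory(
    conn_id: str,
    decorator: Optional[ConnectionDecorator],
    factory: UriGeneratorFactory,
) -> Callable[[], str]:
    """Sketch: bind the arguments now; resolve the URI only when the macro is called."""
    def get_load_path_uri() -> str:
        return factory(conn_id, decorator)
    return get_load_path_uri


def my_decorator(conn: Connection) -> Connection:
    # Stand-in for a secret lookup such as get_ssm_parameter(...).
    conn.password = f"secret-for-{conn.conn_id}.{conn.login}"
    return conn


get_load_path_uri = define_load_path_factory(
    conn_id="my-redshift-connection",
    decorator=my_decorator,
    factory=redshift_uri_factory,
)
# No lookup has happened yet; it runs each time the macro is called.
print(get_load_path_uri())
```

Deferring the lookup to call time means the secret is fetched when the task's templates are rendered rather than when the DAG file is parsed.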

License

The MIT License (MIT)

Contributing

See Contributing



Download files


Files for airflow-indexima, version 2.1.0:

  • airflow_indexima-2.1.0-py3-none-any.whl (wheel, Python py3, 9.9 kB)
  • airflow_indexima-2.1.0.tar.gz (source distribution, 9.0 kB)
