
Indexima Airflow integration


airflow-indexima


Versions follow Semantic Versioning.

Overview

Indexima Airflow integration based on pyhive.

This project is used successfully in our production environment. As it is a young project, be prepared for changes; any help is welcome :)

Setup

Requirements

  • Python 3.6+

Installation

Install this library directly into an activated virtual environment:

$ pip install airflow-indexima

or add it to your Poetry project:

$ poetry add airflow-indexima

Usage

After installation, the package can be imported:

$ python
>>> import airflow_indexima
>>> airflow_indexima.__version__

See the API documentation.

a simple query

from airflow_indexima.operators import IndeximaQueryRunnerOperator

...

with dag:
    ...
    op = IndeximaQueryRunnerOperator(
        task_id='my-task-id',
        sql_query='DELETE FROM Client WHERE GRPD = 1',
        indexima_conn_id='my-indexima-connection',
    )
    ...

a load into indexima

from airflow_indexima.operators.indexima import IndeximaLoadDataOperator

...

with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        load_path_uri='jdbc:redshift://my-private-instance.com:5439/db_client?ssl=true&user=airflow-user&password=XXXXXXXX',
    )
    ...
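The `load_path_uri` above is a plain JDBC URL with the credentials embedded in its query string. As a minimal sketch, assuming the Redshift URL shape shown in the example (`jdbc:redshift://host:port/database?ssl=true&user=...&password=...`), the same string can be assembled from its parts. The helper name `build_redshift_load_path_uri` is hypothetical and is not part of airflow-indexima.

```python
def build_redshift_load_path_uri(
    host: str, port: int, database: str, user: str, password: str
) -> str:
    """Assemble a Redshift JDBC URI in the shape used by the example above.

    Hypothetical helper for illustration only; not part of airflow-indexima.
    """
    return (
        f"jdbc:redshift://{host}:{port}/{database}"
        f"?ssl=true&user={user}&password={password}"
    )


uri = build_redshift_load_path_uri(
    host="my-private-instance.com",
    port=5439,
    database="db_client",
    user="airflow-user",
    password="XXXXXXXX",
)
print(uri)
```

Hardcoding such a URI puts the password in the DAG file, which is the problem the credential customization described in this README is meant to avoid.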

customize credential access

If you store your passwords in another backend (such as AWS SSM), you can define a connection decorator and use it, through a macro, in your DAG.

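import boto3
from airflow import DAG
from airflow.models import Connection

from airflow_indexima.operators.indexima import IndeximaLoadDataOperator
from airflow_indexima.uri import define_load_path_factory, get_redshift_load_path_uri


def get_ssm_parameter(param_name: str) -> str:
    # Example helper (your own code, not part of airflow-indexima):
    # read a SecureString parameter from AWS SSM Parameter Store.
    response = boto3.client('ssm').get_parameter(Name=param_name, WithDecryption=True)
    return response['Parameter']['Value']


def my_decorator(conn: Connection) -> Connection:
    # Replace the stored password with the one kept in SSM,
    # looked up under "<conn_id>.<login>".
    conn.password = get_ssm_parameter(param_name=f'{conn.conn_id}.{conn.login}')
    return conn


dag = DAG(
    dag_id='my_dag',
    user_defined_macros={
        # define a macro named get_load_path_uri
        'get_load_path_uri': define_load_path_factory(
            conn_id='my-redshift-connection',
            decorator=my_decorator,
            factory=get_redshift_load_path_uri,
        ),
    },
    # ... other DAG arguments
)

with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        # rendered by Jinja, which calls the macro defined above
        load_path_uri='{{ get_load_path_uri() }}',
    )
    ...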

A connection decorator must match this type: ConnectionDecorator = Callable[[Connection], Connection]
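To illustrate that type without AWS, here is a minimal sketch of another decorator that reads the password from an environment variable. Assumptions: `Connection` is a small stand-in dataclass for `airflow.models.Connection` so the snippet runs without Airflow, and `env_password_decorator` and the `<CONN_ID>_PASSWORD` naming scheme are hypothetical, not part of airflow-indexima.

```python
import os
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Connection:
    """Minimal stand-in for airflow.models.Connection (illustration only)."""
    conn_id: str
    login: Optional[str] = None
    password: Optional[str] = None


# Same shape as the documented alias.
ConnectionDecorator = Callable[[Connection], Connection]


def env_password_decorator(conn: Connection) -> Connection:
    """Hypothetical decorator: take the password from an environment variable
    derived from the connection id, e.g. MY_REDSHIFT_CONNECTION_PASSWORD.
    Falls back to the stored password if the variable is not set."""
    var_name = conn.conn_id.upper().replace("-", "_") + "_PASSWORD"
    conn.password = os.environ.get(var_name, conn.password)
    return conn


decorator: ConnectionDecorator = env_password_decorator

os.environ["MY_REDSHIFT_CONNECTION_PASSWORD"] = "s3cret"
conn = decorator(Connection(conn_id="my-redshift-connection", login="airflow-user"))
print(conn.password)  # s3cret
```

Like the SSM example, the decorator mutates and returns the connection it receives; any callable with that signature fits the alias.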

define_load_path_factory is a function that takes:

  • a connection identifier
  • a decorator (a ConnectionDecorator)
  • a URI factory, UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]

and returns a function with no arguments, which can be called as a macro in a DAG's operators.
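The contract described above can be illustrated with a pure-Python sketch that runs without Airflow. It is not the library's implementation: `Connection` is a minimal stand-in for `airflow.models.Connection`, `CONNECTIONS` is a hypothetical in-memory registry replacing Airflow's connection lookup, and `redshift_uri_factory` and `define_load_path_factory_sketch` are hypothetical names. What it shows is why the returned function takes no arguments: the connection id, decorator and factory are bound up front, so Jinja can call it as `{{ get_load_path_uri() }}`. In this sketch the decorator (for example, a secrets lookup) runs only when the macro is called.

```python
from dataclasses import dataclass, replace
from typing import Callable, Dict, Optional


@dataclass
class Connection:
    """Minimal stand-in for airflow.models.Connection (illustration only)."""
    conn_id: str
    host: str
    port: int
    schema: str
    login: str
    password: Optional[str] = None


ConnectionDecorator = Callable[[Connection], Connection]
UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]

# Hypothetical in-memory registry standing in for Airflow's connection store.
CONNECTIONS: Dict[str, Connection] = {
    "my-redshift-connection": Connection(
        conn_id="my-redshift-connection",
        host="my-private-instance.com",
        port=5439,
        schema="db_client",
        login="airflow-user",
    ),
}


def redshift_uri_factory(
    conn_id: str, decorator: Optional[ConnectionDecorator] = None
) -> str:
    """Look up a connection, optionally decorate it, and build a JDBC URI."""
    # Work on a copy so the decorator never mutates the registry entry.
    conn = replace(CONNECTIONS[conn_id])
    if decorator is not None:
        conn = decorator(conn)
    return (
        f"jdbc:redshift://{conn.host}:{conn.port}/{conn.schema}"
        f"?ssl=true&user={conn.login}&password={conn.password}"
    )


def define_load_path_factory_sketch(
    conn_id: str,
    decorator: Optional[ConnectionDecorator],
    factory: UriGeneratorFactory,
) -> Callable[[], str]:
    """Bind the arguments now; build the URI only when the macro is called."""
    def get_load_path_uri() -> str:
        return factory(conn_id, decorator)
    return get_load_path_uri


def demo_decorator(conn: Connection) -> Connection:
    # Hypothetical: a real DAG would query a secrets backend here.
    conn.password = "s3cret"
    return conn


get_load_path_uri = define_load_path_factory_sketch(
    conn_id="my-redshift-connection",
    decorator=demo_decorator,
    factory=redshift_uri_factory,
)
print(get_load_path_uri())
```

Deferring the lookup to call time means a rotated secret is picked up at the next render instead of being frozen when the DAG file is parsed.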

License

The MIT License (MIT)

Contributing

See Contributing


Download files

Source Distribution

airflow_indexima-2.0.6.tar.gz (8.4 kB)

Built Distribution

airflow_indexima-2.0.6-py3-none-any.whl (8.9 kB)
