Indexima Airflow integration
airflow-indexima
Versions following Semantic Versioning
Overview
Indexima Airflow integration based on pyhive.
This project is used successfully in our production environment. As it is a young project, expect changes between releases; any help is welcome :)
Setup
Requirements
- Python 3.6+
Installation
Install this library directly into an activated virtual environment:
$ pip install airflow-indexima
or add it to your Poetry project:
$ poetry add airflow-indexima
or you could use it as an Airflow plugin
Usage
After installation, the package can be imported:
$ python
>>> import airflow_indexima
>>> airflow_indexima.__version__
a simple query
from airflow_indexima.operators import IndeximaQueryRunnerOperator

...

with dag:
    ...
    op = IndeximaQueryRunnerOperator(
        task_id='my-task-id',
        sql_query='DELETE FROM Client WHERE GRPD = 1',
        indexima_conn_id='my-indexima-connection'
    )
    ...
a load into indexima
from airflow_indexima.operators.indexima import IndeximaLoadDataOperator

...

with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        load_path_uri='jdbc:redshift://my-private-instance.com:5439/db_client?ssl=true&user=airflow-user&password=XXXXXXXX'
    )
    ...
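The load_path_uri above is written out by hand with the credentials inline. As a minimal sketch, assuming you hold the host, database, user and password separately, a small helper can assemble a URI of the same shape. build_redshift_jdbc_uri is a hypothetical name and is not part of airflow-indexima; the percent-encoding of the user and password is also an assumption, so check what your JDBC driver expects before relying on it.

```python
from urllib.parse import quote


def build_redshift_jdbc_uri(host: str, port: int, database: str,
                            user: str, password: str) -> str:
    """Assemble a URI shaped like the load_path_uri in the example above.

    Hypothetical helper for illustration; not part of airflow-indexima.
    """
    # Assumption: the driver accepts percent-encoded values, so characters
    # such as '&' or '=' in a password cannot break the query string.
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return (
        f"jdbc:redshift://{host}:{port}/{database}"
        f"?ssl=true&user={safe_user}&password={safe_password}"
    )


uri = build_redshift_jdbc_uri(
    "my-private-instance.com", 5439, "db_client", "airflow-user", "XXXXXXXX"
)
print(uri)
```

The resulting string can then be passed as load_path_uri to IndeximaLoadDataOperator.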
customize credential access
If you store your password in another backend (such as AWS SSM), you can define a connection decorator and pass it to define_load_path_factory in your DAG.
from airflow.models import Connection
from airflow import DAG
from airflow_indexima.operators.indexima import IndeximaLoadDataOperator
from airflow_indexima.uri import define_load_path_factory, get_redshift_load_path_uri


def my_decorator(conn: Connection) -> Connection:
    # get_ssm_parameter is your own helper that reads the secret from your backend
    conn.password = get_ssm_parameter(param_name=f'{conn.conn_id}.{conn.login}')
    return conn


dag = DAG(
    dag_id='my_dag',
    user_defined_macros={
        # we define a macro get_load_path_uri
        'get_load_path_uri': define_load_path_factory(
            conn_id='my-redshift-connection',
            decorator=my_decorator,
            factory=get_redshift_load_path_uri)
    },
    ...
)

with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        # the macro is rendered when the task runs
        load_path_uri='{{ get_load_path_uri() }}'
    )
    ...
A connection decorator must have the following type: ConnectionDecorator = Callable[[Connection], Connection]
define_load_path_factory is a function which takes:
- a connection identifier
- a decorator of type ConnectionDecorator
- a URI factory of type UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]
and returns a function with no arguments which can be called as a macro in a DAG's operator.
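To make the shape of these types concrete, here is a minimal sketch that runs without Airflow. It is not the library's implementation: Connection is a stand-in dataclass, and _CONNECTIONS, add_password, fake_uri_factory and make_load_path_macro are hypothetical names that only mirror the behaviour described above (a connection identifier, a decorator and a URI factory go in; a function with no arguments comes out).

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Connection:
    """Stand-in for airflow.models.Connection so the sketch runs without Airflow."""
    conn_id: str
    host: str
    port: int
    schema: str
    login: str
    password: Optional[str] = None


ConnectionDecorator = Callable[[Connection], Connection]
UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]

# Hypothetical in-memory store standing in for Airflow's connection lookup.
_CONNECTIONS: Dict[str, Connection] = {
    "my-redshift-connection": Connection(
        conn_id="my-redshift-connection",
        host="my-private-instance.com",
        port=5439,
        schema="db_client",
        login="airflow-user",
    )
}


def add_password(conn: Connection) -> Connection:
    """Example ConnectionDecorator: fills in a password from another backend."""
    conn.password = f"secret-for-{conn.conn_id}.{conn.login}"  # placeholder value
    return conn


def fake_uri_factory(conn_id: str, decorator: Optional[ConnectionDecorator] = None) -> str:
    """Example UriGeneratorFactory: looks up the connection, decorates it, builds a URI."""
    conn = _CONNECTIONS[conn_id]
    if decorator is not None:
        conn = decorator(conn)
    return (
        f"jdbc:redshift://{conn.host}:{conn.port}/{conn.schema}"
        f"?ssl=true&user={conn.login}&password={conn.password}"
    )


def make_load_path_macro(conn_id: str,
                         decorator: Optional[ConnectionDecorator],
                         factory: UriGeneratorFactory) -> Callable[[], str]:
    """Bind the three arguments and return a function that takes no arguments."""
    def macro() -> str:
        return factory(conn_id, decorator)
    return macro


get_load_path_uri = make_load_path_macro("my-redshift-connection", add_password, fake_uri_factory)
print(get_load_path_uri())
```

Because the returned function closes over its arguments, the connection is only looked up and decorated when the macro is called, not when the DAG file is parsed.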
License
Contributing
See Contributing