Indexima Airflow integration
airflow-indexima
Versions following Semantic Versioning
Overview
Indexima Airflow integration based on pyhive.
This project is used successfully in our production environment. As it is a young project, expect changes between releases. Any help is welcome :)
Setup
Requirements
- Python 3.6+
Installation
Install this library directly into an activated virtual environment:
$ pip install airflow-indexima
or add it to your Poetry project:
$ poetry add airflow-indexima
or use it as an Airflow plugin.
Usage
After installation, the package can be imported:
$ python
>>> import airflow_indexima
>>> airflow_indexima.__version__
a simple query
from airflow_indexima.operators import IndeximaQueryRunnerOperator
...
with dag:
    ...
    op = IndeximaQueryRunnerOperator(
        task_id='my-task-id',
        sql_query='DELETE FROM Client WHERE GRPD = 1',
        indexima_conn_id='my-indexima-connection'
    )
    ...
a load into indexima
from airflow_indexima.operators.indexima import IndeximaLoadDataOperator
...
with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        load_path_uri='jdbc:redshift://my-private-instance.com:5439/db_client?ssl=true&user=airflow-user&password=XXXXXXXX'
    )
    ...
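The `load_path_uri` in the example above is a plain JDBC URI string. As a minimal sketch, assuming the same Redshift URI layout as in that example, it could be assembled from its parts rather than written out by hand. The helper name `build_redshift_load_path_uri` and its parameters are hypothetical and not part of airflow-indexima.

```python
def build_redshift_load_path_uri(host: str, port: int, database: str,
                                 user: str, password: str) -> str:
    """Assemble a JDBC URI with the same layout as the example above (hypothetical helper)."""
    # Values are inserted as-is: this sketch does not escape special characters.
    return (
        f'jdbc:redshift://{host}:{port}/{database}'
        f'?ssl=true&user={user}&password={password}'
    )


uri = build_redshift_load_path_uri(
    host='my-private-instance.com',
    port=5439,
    database='db_client',
    user='airflow-user',
    password='XXXXXXXX',
)
```

The resulting string can then be passed as `load_path_uri` to `IndeximaLoadDataOperator`.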
customize credential access
If you use another backend to store your password (like AWS SSM), you can define a connection decorator and expose it in your DAG through a macro.
from airflow import DAG
from airflow.models import Connection
from airflow_indexima.operators.indexima import IndeximaLoadDataOperator
from airflow_indexima.uri import define_load_path_factory, get_redshift_load_path_uri


def my_decorator(conn: Connection) -> Connection:
    # get_ssm_parameter is not defined here: it stands for your own helper
    # that reads a secret from your backend (AWS SSM in this example)
    conn.password = get_ssm_parameter(param_name=f'{conn.conn_id}.{conn.login}')
    return conn


dag = DAG(
    dag_id='my_dag',
    user_defined_macros={
        # we define a macro get_load_path_uri
        'get_load_path_uri': define_load_path_factory(
            conn_id='my-redshift-connection',
            decorator=my_decorator,
            factory=get_redshift_load_path_uri)
    },
    ...
)

with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        load_path_uri='{{ get_load_path_uri() }}'
    )
    ...
A Connection decorator must have this type: ConnectionDecorator = Callable[[Connection], Connection]
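To illustrate the decorator contract, here is a minimal sketch that runs without Airflow. `StandInConnection` is a stand-in for `airflow.models.Connection`, and `SECRETS` is a hypothetical in-memory secret store standing in for a backend such as AWS SSM. Neither is part of airflow-indexima.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class StandInConnection:
    """Stand-in for airflow.models.Connection, so the sketch runs without Airflow."""
    conn_id: str
    login: str
    password: Optional[str] = None


# Same shape as ConnectionDecorator, applied to the stand-in class.
StandInDecorator = Callable[[StandInConnection], StandInConnection]

# Hypothetical secret store keyed by '<conn_id>.<login>'.
SECRETS: Dict[str, str] = {'my-redshift-connection.airflow-user': 's3cret'}


def password_from_store(conn: StandInConnection) -> StandInConnection:
    """Fill in the password from the secret store and return the connection."""
    conn.password = SECRETS[f'{conn.conn_id}.{conn.login}']
    return conn


decorator: StandInDecorator = password_from_store
decorated = decorator(
    StandInConnection(conn_id='my-redshift-connection', login='airflow-user')
)
```

The decorator receives a connection and returns a connection, so any function with that shape can be plugged in.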
define_load_path_factory is a function which takes:
- a connection identifier
- a decorator of type ConnectionDecorator
- a URI factory of type UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]
and returns a function with no arguments, which can be called as a macro in a DAG's operator.
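The pattern described above can be sketched as a closure. This is an illustration of the idea under stated assumptions, not the library's implementation: `StandInConnection`, `CONNECTIONS`, `sketch_redshift_uri` and `sketch_define_load_path_factory` are hypothetical names, and the connection lookup is replaced by an in-memory dictionary so the code runs without Airflow.

```python
from dataclasses import dataclass, replace
from typing import Callable, Dict, Optional


@dataclass
class StandInConnection:
    """Stand-in for airflow.models.Connection (assumption for this sketch)."""
    conn_id: str
    host: str
    port: int
    schema: str
    login: str
    password: Optional[str] = None


# These aliases mirror the types named in the text, using the stand-in class.
ConnectionDecorator = Callable[[StandInConnection], StandInConnection]
UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]

# Hypothetical in-memory registry replacing Airflow's connection lookup.
CONNECTIONS: Dict[str, StandInConnection] = {
    'my-redshift-connection': StandInConnection(
        conn_id='my-redshift-connection',
        host='my-private-instance.com',
        port=5439,
        schema='db_client',
        login='airflow-user',
    )
}


def sketch_redshift_uri(conn_id: str,
                        decorator: Optional[ConnectionDecorator] = None) -> str:
    """Look up the connection, apply the decorator if given, and build a URI."""
    conn = CONNECTIONS[conn_id]
    if decorator is not None:
        conn = decorator(conn)
    return (
        f'jdbc:redshift://{conn.host}:{conn.port}/{conn.schema}'
        f'?ssl=true&user={conn.login}&password={conn.password}'
    )


def sketch_define_load_path_factory(conn_id: str,
                                    decorator: Optional[ConnectionDecorator],
                                    factory: UriGeneratorFactory) -> Callable[[], str]:
    """Bind the three arguments and return a zero-argument callable."""
    def load_path_uri() -> str:
        # The URI is only built when the macro is actually called.
        return factory(conn_id, decorator)
    return load_path_uri


def add_password(conn: StandInConnection) -> StandInConnection:
    # Returns a copy so the registry entry itself is left untouched.
    return replace(conn, password='from-secret-store')


get_load_path_uri = sketch_define_load_path_factory(
    conn_id='my-redshift-connection',
    decorator=add_password,
    factory=sketch_redshift_uri,
)
```

Because the returned function takes no argument, it can be registered under `user_defined_macros` and rendered in a template as `{{ get_load_path_uri() }}`, which defers the credential lookup until the template is rendered.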
License
Contributing
See Contributing