
AccelData Torch Airflow SDK.


TORCH-AIRFLOW-SDK

The Torch Airflow SDK provides support for observability of Airflow DAGs in the Torch catalog. With the Torch Airflow SDK, users get end-to-end observability of their Airflow DAG runs in the Torch UI. Every DAG is associated with a pipeline in Torch.
While configuring Airflow, make sure the following three environment variables are set in the Airflow environment (for example, in the Airflow Docker container); a minimal configuration sketch follows the list below.

  • TORCH_CATALOG_URL - URL of the torch catalog
  • TORCH_ACCESS_KEY - API access key generated from torch UI
  • TORCH_SECRET_KEY - API secret key generated from torch UI
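
As a minimal sketch (assuming the SDK reads these variables from the process environment), the same three variables can be exported in the Airflow container, or set programmatically before any Torch client is created:

import os

# Hypothetical values for illustration; use your own catalog URL and keys.
os.environ['TORCH_CATALOG_URL'] = 'https://torch.acceldata.local:5443'
os.environ['TORCH_ACCESS_KEY'] = '<API access key generated from torch UI>'
os.environ['TORCH_SECRET_KEY'] = '<API secret key generated from torch UI>'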

First, install the two PyPI packages mentioned below to expose the ETL in Torch.

pip install torch-sdk

Read more about torch-sdk here.

pip install torch-airflow-sdk

Read more about torch-airflow-sdk here.

Create Pipeline

To begin, create a pipeline in Torch using torch-sdk. To know more about pipelines, check the torch-sdk documentation, which contains detailed information about Torch pipeline usage.

from torch_sdk.models.pipeline import CreatePipeline, PipelineMetadata
from torch_sdk.torch_client import TorchClient

torchClient = TorchClient(url="https://torch.acceldata.local:5443",
                          access_key="OY2VVIN2N6LJ",
                          secret_key="da6bDBimQfXSMsyyhlPVJJfk7Zc2gs")

pipeline = CreatePipeline(
    uid='customer.orders.monthly.agg',
    name='Customer Orders Monthly aggregate',
    description='Pipeline to Aggregate the customer orders over 1 year',
    meta=PipelineMetadata(
        owner='vaishvik', team='torch', codeLocation='...'),
    context={
        'associated_tables': 'pipeline.customer, pipeline.orders, pipeline.customer_orders, pipeline.customer_orders_monthly_agg'}
)

pipeline_res = torchClient.create_pipeline(pipeline=pipeline)

Create DAG

This changed in version 0.0.30
In the Airflow DAG code, import the Torch DAG instead of the Airflow DAG. All parameters are the same as for a standard Apache Airflow DAG, but there are two additional parameters: override_success_callback and override_failure_callback. Set override_success_callback to True if you do not want the pipeline run to be ended when the DAG run finishes successfully. Similarly, set override_failure_callback to True if you do not want the pipeline run to be ended when the DAG run fails. These are useful when some steps of the pipeline are executed outside of the Airflow DAG.

from datetime import datetime

from torch_airflow_sdk.dag import DAG

dag = DAG(
   dag_id='pipeline_demo_final',
   schedule_interval='@daily',
   default_args=default_args,               # standard Airflow default_args dict defined elsewhere
   start_date=datetime(2020, 2, 2),
   catchup=False,
   on_failure_callback=failure_callback,    # user-defined callback functions
   on_success_callback=success_callback,
   override_success_callback=False,
   override_failure_callback=False,
)

Create Job and Span using decorator

This was added in version 0.0.34
To create a job and span in the pipeline, decorate the Python function with the job decorator as shown in the example below. The decorator accepts the following parameters:

  • job_uid - job uid of the pipeline. If job_uid is not passed, a uid is created from the dag id, task id and function name.
  • inputs - array of input Dataset objects of the task.
  • outputs - array of output Dataset objects of the job.
  • metadata - metadata of the job (a JobMetadata object).
  • span_uid - optional uid of the span to create. If span_uid is not passed, a span corresponding to the job is created using the job_uid.
  • xcom_to_event_mapper_ids - optional list of xcom keys used to send xcom variables in span events.
  • bounded_by_span - set to True if you want a span to be created for the current task.

Each Dataset object takes source (data source name inside the torch catalog) and asset_uid (asset path from its root) as parameters.

from torch_airflow_sdk.decorators.job import job
from torch_sdk.models.job import JobMetadata, Dataset
@job(job_uid='monthly.order.aggregate.job',
   inputs=[Dataset('POSTGRES_LOCAL_DS', 'pipeline.pipeline.customer_orders')],
   outputs=[Dataset('POSTGRES_LOCAL_DS', 'pipeline.pipeline.customer_orders_monthly_agg')],
   metadata=JobMetadata(name = 'Vaishvik_brahmbhatt', team = 'backend', code_location ='https://github.com/acme/reporting/report.scala'),
   span_uid='customer.orders.datagen.span',
   xcom_to_event_mapper_ids = ['run_id', 'event_id'],
   bounded_by_span=True
   )
def monthly_order_aggregate(**context):
    pass
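
The decorated function is still an ordinary Python callable, so it is typically wired into the DAG like any other Python task. A minimal sketch, assuming Airflow 2.x and a hypothetical task id:

from airflow.operators.python import PythonOperator

# Hypothetical wiring; task_id and the dag object come from your own DAG file.
monthly_order_aggregate_task = PythonOperator(
    task_id='monthly_order_aggregate',
    python_callable=monthly_order_aggregate,
    dag=dag
)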

Create Span Using Decorator

This changed in version 0.0.30
To create a span for a Python function, decorate the function with the span decorator, which takes the span uid as a parameter. Make sure the decorated function accepts **context in its arguments; this gives access to the context of the task, and using that context, various span events can be sent from inside the function. To get the parent span context, pull the key span_context_parent from the task instance's xcom. Its value is a span context instance that can be used to create child spans and send custom events, as shown in the example below.

from datetime import datetime

from torch_airflow_sdk.decorators.span import span
from torch_sdk.events.generic_event import GenericEvent

@span(span_uid='customer.orders.datagen.span',
      associated_job_uids=['monthly.order.aggregate.transfer'],
      xcom_to_event_mapper_ids=['run_id', 'event_id'])
def data_gen(**context):
   datagen_span_context = context['span_context_parent']
   # create a child span under the parent span of the task
   customer_datagen_span = datagen_span_context.create_child_span(
       uid="customer.data.gen",
       context_data={'client_time': str(datetime.now())}
   )
   # ... generate the data; `rows` and `customer_ids` below are produced by that step
   customer_datagen_span.send_event(
      GenericEvent(
         context_data={
            'client_time': str(datetime.now()),
            'row_count': len(rows)
         },
         event_uid="order.customer.join.result"
      )
   )
   customer_datagen_span.end(
       context_data={'client_time': str(datetime.now()), 'customers_count': len(customer_ids)}
   )

Custom Operators

The Torch Airflow SDK contains four custom operators.

TorchInitializer Operator :

This changed in version 0.0.34
Add a task with this operator at the root of your DAG. The operator creates a new pipeline, and it also creates a new pipeline run and a root span for that run of the Airflow DAG. These are not created if create_pipeline is set to False, which is useful when the pipeline or pipeline run has been created outside of the Airflow DAG. In that case, pipeline_run_id can be passed if a particular run needs to be continued; otherwise, the latest pipeline run is continued in this DAG. A root span name for the pipeline can be passed using the span_name parameter, and pipeline metadata (PipelineMetadata) can be passed using the meta parameter.

from torch_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer
from torch_sdk.models.pipeline import PipelineMetadata

torch_initializer_task = TorchInitializer(
   task_id='torch_pipeline_initializer',
   pipeline_uid='customer.orders.monthly.agg.demo',
   pipeline_name='CUSTOMERS ORDERS MONTHLY AGG',
   create_pipeline=True,
   span_name='customer.orders.monthly.agg.demo.span',
   meta=PipelineMetadata(owner='test', team='testing', codeLocation='...'),
   dag=dag
)
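
Since the initializer has to run before any instrumented task, set it as upstream of the first task(s) of the DAG. A minimal sketch, assuming a downstream task named get_order_agg_for_q4:

# Make the initializer the root of the DAG so the pipeline run and root span
# exist before any other task reports jobs or spans.
torch_initializer_task >> get_order_agg_for_q4
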
SpanOperator Operator :

This changed in version 0.0.30
SpanOperator executes any standard operator passed as the operator parameter and sends span start and end events for it. Simply wrap the standard operator with a SpanOperator, and make sure the wrapped operator is not added to the DAG; the SpanOperator takes care of executing that operator's task inside its own execution. It has the following required parameters: span_uid (uid of the span), operator (standard operator task to be wrapped with the span) and xcom_to_event_mapper_ids (list of xcom keys used to send xcom variables in span events). Other parameters are the same as for the Airflow standard base operator.

WARNING: Do not specify the dag parameter in the standard Airflow operator being passed as an argument to SpanOperator, as the execution of the operator task is taken care of by SpanOperator.
from airflow.providers.postgres.operators.postgres import PostgresOperator  # Airflow 2.x provider import
from torch_airflow_sdk.operators.span_operator import SpanOperator

get_order_agg_for_q4 = PostgresOperator(
   task_id="get_monthly_order_aggregate_last_quarter",
   postgres_conn_id='example_db',
   sql="select * from information_schema.attributes",
)

get_order_agg_for_q4 = SpanOperator(
   task_id="get_monthly_order_aggregate_last_quarter",
   span_uid='monthly.order.agg.q4.span',
   operator=get_order_agg_for_q4,
   associated_job_uids=['monthly.order.aggregate.transfer'],
   xcom_to_event_mapper_ids=['run_id', 'event_id'],
   dag=dag
)
JobOperator Operator :

This changed in version 0.0.34
JobOperator executes any standard operator passed as the operator parameter, creates a job, and sends span start and end events. Simply wrap the standard operator with a JobOperator, and make sure the wrapped operator is not added to the DAG; the JobOperator takes care of executing that operator's task inside its own execution. It accepts the following parameters:

  • job_uid - job uid of the pipeline. If job_uid is not passed, a uid is created from the dag id, task id and operator name.
  • operator - standard operator task for which a job needs to be created and which has to be wrapped with the span.
  • inputs - array of input Dataset objects of the task.
  • outputs - array of output Dataset objects of the job.
  • metadata - metadata of the job.
  • span_uid - uid of the span. If span_uid is not passed, a span corresponding to the job is created using the job_uid.
  • xcom_to_event_mapper_ids - list of xcom keys used to send xcom variables in span events.
  • bounded_by_span - True if you want a span to be created for the current task.

Other parameters are the same as for the Airflow standard base operator.

WARNING: Do not specify the dag parameter in the standard Airflow operator being passed as an argument to JobOperator, as the execution of the operator task is taken care of by JobOperator.
from airflow.providers.postgres.operators.postgres import PostgresOperator  # Airflow 2.x provider import
from torch_airflow_sdk.operators.job_operator import JobOperator
from torch_sdk.models.job import JobMetadata, Dataset

get_order_agg_for_q4 = PostgresOperator(
   task_id="get_monthly_order_aggregate_last_quarter",
   postgres_conn_id='example_db',
   sql="select * from information_schema.attributes",
)

get_order_agg_for_q4 = JobOperator(
   task_id="get_monthly_order_aggregate_last_quarter",
   job_uid='customer.order.join.job',
   inputs=[Dataset('POSTGRES_LOCAL_DS', 'pipeline.pipeline.orders'), Dataset('POSTGRES_LOCAL_DS', 'pipeline.pipeline.customers')],
   outputs=[Dataset('POSTGRES_LOCAL_DS', 'pipeline.pipeline.customer_orders')],
   metadata=JobMetadata('name', 'team', 'code_location'),
   span_uid='monthly.order.agg.q4.span',
   operator=get_order_agg_for_q4,
   xcom_to_event_mapper_ids=['run_id', 'event_id'],
   bounded_by_span=True,
   dag=dag
)

ExecutePolicyOperator Operator :

This changed in version 0.0.34
ExecutePolicyOperator is used to execute a policy by passing rule_type and rule_id. If the sync parameter is set to True, it returns only after the execution ends; if sync is set to False, it returns immediately after starting the execution. Allowed values for rule_type and failure_strategy are available in torch_sdk.constants.

from torch_airflow_sdk.operators.execute_policy_operator import ExecutePolicyOperator
from torch_sdk.constants import FailureStrategy, DATA_QUALITY
operator_task = ExecutePolicyOperator(
    task_id='torch_pipeline_operator_test',
    rule_type=DATA_QUALITY,
    rule_id=46,
    sync=True,
    failure_strategy=FailureStrategy.DoNotFail,
    dag=dag
)

If you need to query the result in another task, pull the execution id from xcom using the key {rule_type}_{rule_id}_execution_id. In this example the rule type is const.DATA_QUALITY and the rule_id is 46. After getting the execution_id, call get_rule_result on the torch client, passing rule_type, execution_id and, optionally, failure_strategy. By default, failure_strategy is set to not fail and just return the error message.

from torch_sdk.torch_client import TorchClient
from torch_airflow_sdk.initialiser import torch_credentials
from torch_sdk.constants import FailureStrategy, DATA_QUALITY, RuleExecutionStatus

def ruleoperator_result(**context):
    xcom_key = f'{DATA_QUALITY}_46_execution_id'
    task_instance = context['ti']
    # pull the execution id pushed to xcom by ExecutePolicyOperator
    execution_id = task_instance.xcom_pull(key=xcom_key)
    if execution_id is not None:
        torch_client = TorchClient(**torch_credentials)
        result = torch_client.get_rule_result(rule_type=DATA_QUALITY, execution_id=execution_id,
                                              failure_strategy=FailureStrategy.DoNotFail)
        if result.execution.resultStatus == RuleExecutionStatus.ERRORED:
            print(result.execution.executionError)

If you need to query the current status in another task, pull the execution id from xcom using the key {rule_type}_{rule_id}_execution_id. In this example the rule type is const.DATA_QUALITY and the rule_id is 46.

Then call the get_rule_status function, which returns the resultStatus of the execution.

from torch_sdk.torch_client import TorchClient
from torch_airflow_sdk.initialiser import torch_credentials
import torch_sdk.constants as const
def ruleoperator_status(**context):
    xcom_key = f'{const.DATA_QUALITY}_46_execution_id'
    task_instance = context['ti']
    # pull the execution id pushed to xcom by ExecutePolicyOperator
    execution_id = task_instance.xcom_pull(key=xcom_key)
    if execution_id is not None:
        torch_client = TorchClient(**torch_credentials)
        result = torch_client.get_rule_status(rule_type=const.DATA_QUALITY, execution_id=execution_id)
        if result==const.RuleExecutionStatus.ERRORED:
            print("Rule execution encountered an error.")
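
These callables only see the execution id if they run after the policy execution task, so a typical arrangement is to schedule them as downstream PythonOperator tasks. A minimal sketch, assuming Airflow 2.x and the operator_task defined above (the task ids here are hypothetical):

from airflow.operators.python import PythonOperator

check_result_task = PythonOperator(
    task_id='torch_policy_result_check',
    python_callable=ruleoperator_result,
    dag=dag
)

check_status_task = PythonOperator(
    task_id='torch_policy_status_check',
    python_callable=ruleoperator_status,
    dag=dag
)

# Run the checks only after the policy execution task has pushed the execution id.
operator_task >> [check_result_task, check_status_task]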

Version Log

0.1.0 (17/06/2021)

  • Torch Airflow SDK - wrapper on Apache Airflow.
  • Provides support for observability of Airflow DAGs in the Torch catalog; users get end-to-end observability of Airflow DAG runs in the Torch UI.
  • Support for Airflow 1.0 and 2.0.

0.1.3 (2/02/2022)

  • Added support for pipeline creation by passing uid to the initializer operator
  • Job and span decorator's additional parameters
  • Minor bug fixes
