Skip to main content

Accel-data Torch SDK. Acceldata Torch is a complete solution to observe the quality of the data present in your data lake and warehouse. Using Torch, you can ensure that high-quality data backs your business decisions.

Reason this release was yanked:

Currently the alpha versions are being updated

Project description

Pipeline APIs

Acceldata Torch is a complete solution to observe the quality of the data present in your data lake and warehouse. Using Torch, you can ensure that high-quality data backs your business decisions. Torch provides you with tools to measure the quality of data in a data catalog and to never miss significant data sources. All users including analysts, data scientists, and developers, can rely on Torch to observe the data flowing in the warehouse or data lake and can rest assured that there is no loss of data.
Torch SDK is used to trigger torch catalog and pipeline APIs. By creating a Torch client, all the torch apis can be accessed.

Install torch-sdk pypi package in a python environment.

pip install torch-sdk

Read more about torch-sdk from here

Create Torch Client

Torch client is used to send data to the torch servers. It consists of various methods to communicate with the torch server. Torch client have access to catalog and pipeline APIs. To create a torch client, torch url and API keys are required. To create torch API keys, go to torch ui’s settings and generate keys for the client.

from torch_sdk.torch_client import TorchClient

torch_client = TorchClient(url='https://acceldata.host.dev:9999', access_key='******',
                         secret_key='*****************')

Pipeline API

There are various pipeline APIs are supported through torch sdk. Pipeline APIs like create pipeline, add jobs and spans, initiate pipeline run et cetera. Torch sdk is able to send various event during span life cycle. Hence, torch sdk has full control over the pipelines.

Create Pipeline And Job

Pipeline represents the ETL pipeline in its entirety and will contain Asset nodes and Jobs associated. The complete pipeline definition forms the Lineage graph for all the data assets.
Job Node or Process Node represents an entity that does some job in the ETL workflow. From this representation, Job’s input is some assets, and output is few other assets. Torch will use the set of Jobs definition in the workflow to create the Lineage, and the will also track version changes for the Pipeline.

To create pipeline and jobs, first create creation object with required parameter. And with use of supported methods by sdk, can do corresponding operation on torch server side.

from torch_sdk.models.job import CreateJob, JobMetadata, Dataset
from torch_sdk.models.pipeline import CreatePipeline, PipelineMetadata, PipelineRunResult, PipelineRunStatus

# Create pipeline
pipeline = CreatePipeline(
    uid='monthly_reporting_pipeline',
    name='Monthly reporting Pipeline',
    description='Pipeline to create monthly reporting tables',
    meta=PipelineMetadata('Vaishvik', 'torch_sdk_code', '...'),
    context={'key1': 'value1'}
)
pipeline_response = torch_client.create_pipeline(pipeline=pipeline)

# Create Job
job = CreateJob(
    uid='monthly_sales_aggregate',
    name='Monthly Sales Aggregate',
    description='Generates the monthly sales aggregate tables for the complete year',
    inputs=[Dataset('datasource-name', 'database.schema.table_1')],
    outputs=[Dataset('datasource-name', 'database.schema.table_2')],
    meta=JobMetadata('vaishvik', 'backend', 'https://github.com/'),
    context={'key21': 'value21'}
)
job_response = pipeline_response.create_job(job)
Create Pipeline Run And Generate Spans And Send Span Events

Pipeline run indicates the execution of the pipeline. The same pipeline can be executed multiple times and each execution (run) has new snapshot version. Each pipeline run has hierarchical span's group. A Span is a way to group a bunch of metrics, and they are hierarchical. It can be as granular as possible. The APIs will support creating a span object from a pipeline object, and then hierarchical spans are started from parent spans. A Span typically encompasses a process or a task and can be granular. This hierarchical system is powerful enough to model extremely complex pipeline observability flows. Optionally, a span can also be associated with a Job. This way, we can track starting and completion of Job, including the failure tracking. Start and stop are implicitly tracked for a span.

Torch sdk has also support for create new pipeline run, add spans in it. During the span life cycle, sdk is able to send some customs and standard span events to collect pipeline run metrics for observability.

from torch_sdk.events.generic_event import GenericEvent
from datetime import datetime

# create a pipeline run of the pipeline
pipeline_run = pipeline_response.create_pipeline_run()

# create span in the pipeline run
span_context = pipeline_run.create_span(uid='monthly.generate.data.span')

# check current span is root or not
span_context.is_root()

# end the span 
span_context.end()

# check if the current span has children or not
span_context.has_children()

# create a child span
child_span_context = span_context.create_child_span('monthly.generate.customer.span')

# send custom event
child_span_context.send_event(
    GenericEvent(context_data={'client_time': str(datetime.now()), 'row_count': 100}, 
                 event_uid="order.customer.join.result")
)


# abort span
child_span_context.abort()

# failed span
child_span_context.failed()

# update a pipeline run of the pipeline
updatePipelineRunRes = pipeline_run.update_pipeline_run(context_data={'key1': 'value2', 'name': 'backend'},
                                                               result=PipelineRunResult.SUCCESS,
                                                               status=PipelineRunStatus.COMPLETED)
Get Latest Pipeline Run

Torch sdk can get the latest pipeline run of the pipeline. with use of the latest pipeline run instance, user can continue ETL pipeline and add spans, jobs, events too. Hence, torch sdk has complete access on the torch pipeline service.

pipeline = torch_client.get_pipeline('monthly.reporting.pipeline')

#Datasource APIs

Torch SDK has full access on catalog APIs as well. Using torch sdk, one can create datasource and version it with associated assets, relations.

Create Datasource

Torch sdk has access to create or update existing datasource. Torch has support for virtual datasource as well for ML purpose and some non-virtual/real as well for example relational databases, file based databases et cetera. To create datasource, source type details are required. To get all source types supported in torch, use get_all_source_types() method.

from torch_sdk.models.datasource import CreateDataSource, SourceType

datasource = CreateDataSource(
    name='Feature_bag_datasource_sdk',
    sourceType=SourceType(21, 'FEATURE_BAG'),
    description='feature bag assembly creation using python sdk',
    isVirtual=True
)
datasource_response = torch_client.create_datasource(datasource)
Create New Version Of Datasource

Torch sdk can version the datasource as well. Torch sdk can initiate new version the datasource and return latest instance of it. It has also method to get current latest snapshot version.

# get data source
datasource_response = torch_client.get_datasource('Feature_bag_datasource')

# create new version of the datasource
new_snapshot_version = datasource_response.initialise_snapshot(uid='Habcfc38-9daa-4842-b008-f7fb3dd8439a')

# get current snapshot data
current_snapshot_version = datasource_response.get_current_snapshot()
Create Asset And Relations B/w Them

You can create/update assets and relations between them. With use of the torch sdk, user can create assets in datasource and can also define relations between assets. To get asset types supported in torch, use get_asset_types() method. Torch sdk has methods to get existing relations and assets in the given datasource.

from torch_sdk.models.create_asset import AssetMetadata

# get asset by id/uid
datasource_response = torch_client.get_datasource('Feature_bag_datasource')

# create assets
asset_1 = datasource_response.create_asset(uid='Feature_bag_datasource.feature_1',
                                            metadata=[AssetMetadata('STRING', 'abcd', 'pqr', 'sds')],
                                            asset_type_id=22,
                                            description='feature 1 asset.',
                                            name='car feature'
                                                )
asset_2 = datasource_response.create_asset(uid='Feature_bag_datasource.feature_2',
                                            metadata=[AssetMetadata('STRING', 'abcd', 'pqr', 'sds')],
                                            asset_type_id=22,
                                            description='feature asset 2',
                                            name='bike feature'
                                                )

# create asset relation
toAssetUUID = 'postgres-assembly-5450.ad_catalog.ad_catalog.qrtz_simple_triggers'
relationType = RelationType.SIBLING
asset_relation_1_to_2 = asset_1.create_asset_relation(relation_type=relationType, to_asset_uuid=toAssetUUID)

# get asset by id/uid
asset = datasource_response.get_asset(id=1)
asset = datasource_response.get_asset(uid='Feature_bag_datasource.feature_1')

Version Log

0.1.0 (14/06/2021)

  • Torch python sdk
  • Support for flow APIs and catalog APIs of the torch

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

torch_sdk-0.1.2.tar.gz (41.9 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page