
Provider for using Tecton with Airflow.

[preview] airflow-tecton

Contains an Apache Airflow provider that allows you to author Tecton workflows inside Airflow.

Two basic capabilities are supported:

  1. Submitting materialization jobs
  2. Waiting for Feature View/Feature Service data to materialize

Changelog

  • 0.1.1 Improved Tecton connection stability by adding retries for requests

  • 0.1.0 Added 2 new operators to support triggering Feature Table ingestion jobs

  • 0.0.3 Added support for allow_overwrite setting in the operators

  • 0.0.2 Removed type annotations that caused compatibility issues with Airflow versions below 2.4.

  • 0.0.1 Initial release

Installation and Configuration

Installation

You can install this package via pip install airflow-tecton. Note that this requires apache-airflow>=2.0.

For a deployed environment, add airflow-tecton to your requirements.txt or wherever you configure installed packages.

You can confirm a successful installation by running airflow providers list, which should show airflow-tecton in the list.
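
If you prefer to check from Python rather than the Airflow CLI, a minimal sketch along these lines reports whether the distribution is installed. The helper name installed_version is hypothetical, and the sketch assumes Python 3.8+ (for importlib.metadata); it only checks that the package is present, not that Airflow has registered the provider.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# Prints a version string if airflow-tecton is installed, otherwise None.
print(installed_version("airflow-tecton"))
```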

Configuration

The operators in this provider call Tecton's API, so you must set up an Airflow connection to Tecton.

To add a connection, go to Connections under the Admin tab in the Airflow UI, hit the + button, and select Tecton in the connection type dropdown. Then configure the connection name, Tecton URL, and Tecton API key. The default connection name is tecton_default, so we recommend using that name to minimize configuration.
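
For deployments where clicking through the UI is impractical, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID>, and accepts JSON values there from Airflow 2.3 onward. The sketch below builds such a variable. Note the assumptions: this README does not document how the Tecton URL and API key map onto Airflow's connection fields, so the conn_type value "tecton" and the use of host and password are guesses to verify against the provider, and the helper name, URL, and key are placeholders.

```python
import json


def tecton_connection_env(conn_id, url, api_key):
    """Return (env var name, JSON value) describing an Airflow connection."""
    name = "AIRFLOW_CONN_" + conn_id.upper()
    value = json.dumps(
        {
            "conn_type": "tecton",  # ASSUMPTION: connection type identifier
            "host": url,            # ASSUMPTION: field holding the Tecton URL
            "password": api_key,    # ASSUMPTION: field holding the API key
        }
    )
    return name, value


name, value = tecton_connection_env(
    "tecton_default", "https://example.tecton.ai", "<your-api-key>"
)
# Emit a shell line you could place in your deployment's environment setup.
print(f"export {name}='{value}'")
```

Using the default connection name tecton_default keeps the operators working without any extra connection argument, as recommended above.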

Usage

Configuring a Feature View for manual triggering

A BatchFeatureView or a StreamFeatureView can be configured for manual triggering only. To do so, set batch_trigger=BatchTriggerType.MANUAL. With this setting, Tecton does not automatically create any batch materialization jobs for the Feature View. As of Tecton 0.6, any Feature View can be manually triggered, but doing so is recommended mostly for ad hoc, manual use.

For a StreamFeatureView, only batch materialization job scheduling will be impacted by the batch_trigger setting. Streaming materialization job scheduling will still be managed by Tecton.

Here’s an example of a BatchFeatureView configured for manual triggering.

from tecton import batch_feature_view, FilteredSource, Aggregation, BatchTriggerType
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode='spark_sql',
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column='transaction', function='count', time_window=timedelta(days=1)),
        Aggregation(column='transaction', function='count', time_window=timedelta(days=30)),
        Aggregation(column='transaction', function='count', time_window=timedelta(days=90))
    ],
    online=False,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    tags={'release': 'production'},
    owner='matt@tecton.ai',
    description='User transaction totals over a series of time windows, updated daily.',
    batch_trigger=BatchTriggerType.MANUAL # Use manual triggers
)
def user_transaction_counts(transactions):
    return f'''
        SELECT
            user_id,
            1 as transaction,
            timestamp
        FROM
            {transactions}
        '''

If a Data Source input to the Feature View has data_delay set, that delay is still factored into constructing training data sets, but it does not affect when the job can be triggered with the materialization API.

Materialization Job Submission

There are two methods available to submit materialization jobs:

  1. TectonTriggerOperator: Triggers a materialization job for a Feature View. Tecton retries any failing jobs automatically. Completion of this operator only means that submission succeeded; to wait for the job itself to complete, combine it with TectonSensor.
  2. TectonJobOperator: Triggers a materialization job for a Feature View without Tecton's automatic retries, so you can use standard Airflow keyword arguments to configure retry behavior. This operator is synchronous: when it succeeds, the underlying job has succeeded. When the operator is terminated, it makes a best effort to clean up the running materialization job.

Both of these require the following arguments:

  1. workspace - the workspace name of the Feature View you intend to materialize
  2. feature_view - the name of the Feature View you intend to materialize
  3. online - whether the job should materialize to the online store. This requires that your FeatureView also has online materialization enabled.
  4. offline - whether the job should materialize to the offline store. This requires that your FeatureView also has offline materialization enabled.

The time interval of the materialization job is configured automatically using Airflow templates. By default, it runs from the data_interval_start to the data_interval_end of your DAG run. These can be overridden if necessary.
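
To make the default concrete: assuming a DAG on a daily schedule, Airflow's data_interval_start and data_interval_end bracket one calendar day, so each run would materialize one day of data. The sketch below only illustrates that arithmetic; daily_data_interval is a hypothetical helper, not part of Airflow or this provider.

```python
from datetime import datetime, timedelta, timezone


def daily_data_interval(interval_start):
    """Return the (start, end) window covered by one run of a daily schedule."""
    return interval_start, interval_start + timedelta(days=1)


start, end = daily_data_interval(datetime(2023, 1, 1, tzinfo=timezone.utc))
# The job for this run would cover data from start (inclusive) up to end.
print(start.isoformat(), "->", end.isoformat())
```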

Example Usage

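from airflow_tecton import TectonJobOperator, TectonTriggerOperator

# Synchronous: succeeds only once the materialization job has succeeded.
# Retries are handled by Airflow through the standard retries argument.
TectonJobOperator(
    task_id="tecton_job",
    workspace="my_workspace",
    feature_view="my_fv",
    online=False,
    offline=True,
    retries=3,
)

# Asynchronous: succeeds as soon as the job has been submitted.
# Combine with TectonSensor to wait for the job to complete.
TectonTriggerOperator(
    task_id="trigger_tecton",
    workspace="my_workspace",
    feature_view="my_fv",
    online=True,
    offline=True,
)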

Waiting For Materialization

TectonSensor

TectonSensor waits for materialization to complete for either a Feature View or a Feature Service. Common uses include monitoring and kicking off a training job after daily materialization completes.

Example Usage

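from airflow_tecton import TectonSensor

# Wait for a Feature Service to finish materializing to the online store.
TectonSensor(
    task_id="wait_for_fs_online",
    workspace="my_workspace",
    feature_service="my_fs",
    online=True,
    offline=False,
)

# Wait for a Feature View to finish materializing to both stores.
TectonSensor(
    task_id="wait_for_fv",
    workspace="my_workspace",
    feature_view="my_fv",
    online=True,
    offline=True,
)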
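
Because TectonTriggerOperator only confirms that a job was submitted, a common pattern is to chain it to a TectonSensor in the same DAG. The following is a minimal sketch of that combination, reusing the operator arguments from the examples in this README. The DAG id, start date, schedule, and the factory function name are illustrative assumptions; schedule_interval is the Airflow 2.x argument name, and the imports are deferred into the function so the module can be loaded without Airflow installed.

```python
from datetime import datetime


def build_materialization_dag():
    """Build a daily DAG that triggers a Tecton job, then waits for it."""
    # Deferred imports: require apache-airflow and airflow-tecton at call time.
    from airflow import DAG
    from airflow_tecton import TectonSensor, TectonTriggerOperator

    with DAG(
        dag_id="tecton_trigger_and_wait",  # hypothetical DAG name
        start_date=datetime(2023, 1, 1),   # illustrative start date
        schedule_interval="@daily",        # Airflow 2.x argument name
        catchup=False,
    ) as dag:
        trigger = TectonTriggerOperator(
            task_id="trigger_tecton",
            workspace="my_workspace",
            feature_view="my_fv",
            online=True,
            offline=True,
        )
        wait = TectonSensor(
            task_id="wait_for_fv",
            workspace="my_workspace",
            feature_view="my_fv",
            online=True,
            offline=True,
        )
        # The trigger returns after submission; the sensor waits for the data.
        trigger >> wait
    return dag
```

In a DAG file, you would call the factory at module level (for example, dag = build_materialization_dag()) so that the scheduler can discover the DAG. Downstream tasks such as a training job can then be chained after the sensor.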

Examples

See example dags here.

Development

Pre-commit

This repo uses pre-commit. Run pre-commit install in the repo root to configure pre-commit hooks. Pre-commit hooks take care of running unit tests as well as linting files.

Run unit tests manually

Run python -m pytest tests/ in the repo root.

License

This project is licensed under the Apache 2.0 License.
