Project description

observe-sdk

An Airflow and Python SDK for working with the Astro Observe platform

Installation

Add the following to requirements.txt:

astro-observe-sdk>=0.0.8
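
Alternatively, for local experimentation outside of an Airflow deployment, the package can be installed directly with pip (standard pip usage, shown here for convenience):

pip install 'astro-observe-sdk>=0.0.8'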

Usage

Metrics

The Observe SDK allows users to emit metrics during Airflow task execution that are then tracked in the Observe backend. To do so, use the astro_observe_sdk.log_metric function:

import astro_observe_sdk as observe
from airflow.decorators import task

# then, in the task
@task
def my_task():
    observe.log_metric('my_metric', 42)
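
A metric value can also be computed inside the task at runtime. A minimal sketch (the metric name and the inline stand-in data are illustrative):

import astro_observe_sdk as observe
from airflow.decorators import task

@task
def load_users():
    rows = [{"id": 1}, {"id": 2}]  # stand-in for real data loading
    observe.log_metric('users_rows_loaded', len(rows))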

Capture Lineage from SQL Queries

The Observe SDK provides functionality for logging query executions during Airflow task runs, enabling the tracking of table/dataset lineage in the Observe backend.

The primary entry point is the astro_observe_sdk.log_query function. It captures lineage information, including inputs, outputs, and run metadata, by parsing a SQL query that is either provided explicitly by the user or retrieved via the supplied database connector. From the parsed query and a query identifier, the function constructs the appropriate OpenLineage job and run facets, then emits a pair of OpenLineage events (START and COMPLETE/FAIL) directly associated with the corresponding Airflow task run. This gives detailed visibility into which queries were executed as part of each DAG and task execution. The function can be invoked multiple times within a single task (see the third example below).

Users can call log_query in several ways, depending on how much metadata is available.

Please refer to the astro_observe_sdk.log_query function's docstring for a detailed guide and additional examples.

  1. Log using query metadata:
import astro_observe_sdk as observe
from airflow.decorators import task

@task
def my_task():
    observe.log_query(
        query_job_namespace="bigquery",
        query_text="SELECT a, b, c FROM users",
        default_database="my_project",
        default_schema="my_dataset",
        query_id="bquxjob_69ed4f1_169ba1f5665",
    )
  2. Log using just a supported DB connector (Snowflake or Databricks cursor):
import astro_observe_sdk as observe
from airflow.decorators import task
from databricks import sql

@task
def my_task():
    conn = sql.connect(
        server_hostname="adb-123.10.azuredatabricks.net",
        access_token="secret-token",
        http_path="/sql/1.0/warehouses/abc123",
    )
    cs = conn.cursor()
    cs.execute("SELECT * FROM `test`.some_schema.table1;")
    
    observe.log_query(
        db_connector=cs,
        # The SDK tries to retrieve the values below from the cursor object if they are not provided, but we highly recommend passing them explicitly
        db_connector_extra_args={"host": "adb-123.10.azuredatabricks.net", "token": "secret-token"}
    )
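  3. Log multiple queries from a single task. As noted above, log_query can be called repeatedly within one task run; this is a minimal sketch, and the query texts and query IDs are illustrative:
import astro_observe_sdk as observe
from airflow.decorators import task

@task
def my_task():
    # Each call emits its own START and COMPLETE/FAIL event pair.
    observe.log_query(
        query_job_namespace="bigquery",
        query_text="SELECT id FROM my_dataset.users",
        default_database="my_project",
        default_schema="my_dataset",
        query_id="bquxjob_example_1",
    )
    observe.log_query(
        query_job_namespace="bigquery",
        query_text="SELECT id FROM my_dataset.orders",
        default_database="my_project",
        default_schema="my_dataset",
        query_id="bquxjob_example_2",
    )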

Capture Lineage from Datasets

The Observe SDK also provides functionality for logging dataset lineage directly, without requiring SQL query parsing. The primary entry point is the astro_observe_sdk.log_lineage function. This function accepts OpenLineage Dataset objects representing input and output datasets, and emits a RUNNING event directly associated with the corresponding Airflow task run.

Please refer to the astro_observe_sdk.log_lineage function's docstring for a detailed guide and additional examples.

  1. Simple lineage logging:
import astro_observe_sdk as observe
from airflow.decorators import task
from openlineage.client.event_v2 import Dataset

@task
def my_task():
    observe.log_lineage(
        inputs=[
            Dataset(namespace="snowflake://account", name="analytics.public.users", facets={}),
        ],
        outputs=[
            Dataset(namespace="snowflake://account", name="analytics.public.users_processed", facets={}),
        ],
    )
  2. Lineage logging with facets (metadata):
import astro_observe_sdk as observe
from airflow.decorators import task
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import schema_dataset

@task
def my_task():
    schema_facet = schema_dataset.SchemaDatasetFacet(
        fields=[
            schema_dataset.SchemaDatasetFacetFields(name="id", type="INTEGER"),
            schema_dataset.SchemaDatasetFacetFields(name="name", type="VARCHAR"),
        ]
    )
    observe.log_lineage(
        inputs=[
            Dataset(
                namespace="snowflake://account",
                name="analytics.public.users",
                facets={"schema": schema_facet},
            ),
        ],
        outputs=[
            Dataset(
                namespace="snowflake://account",
                name="analytics.public.users_processed",
                facets={"schema": schema_facet},
            ),
        ],
    )

Choosing Between log_query and log_lineage

Both log_query and log_lineage enable lineage tracking, but they serve different use cases:

Use log_query when:

  • You have SQL queries and want automatic parsing to extract dataset lineage.
  • You're working with supported database connectors (Snowflake, Databricks) and want automatic metadata retrieval.
  • You want to track query execution metadata (query ID, query text, execution time, status).

Use log_lineage when:

  • You already have dataset lineage information (e.g., from file operations, APIs, or custom logic).
  • You're working with non-SQL data transformations or operations that don't involve SQL queries.
  • You want full control over the exact datasets and metadata included in the lineage event.

In summary: log_query is ideal for SQL-based workflows where you want automatic parsing and complete lifecycle tracking, while log_lineage is better for operations where you already have explicit dataset information.
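
For example, a task that transforms files without any SQL can still report lineage directly. A minimal sketch (the file namespace and paths are illustrative, not prescribed by the SDK):

import astro_observe_sdk as observe
from airflow.decorators import task
from openlineage.client.event_v2 import Dataset

@task
def convert_users_file():
    # ... transform data/users.csv into data/users.parquet ...
    observe.log_lineage(
        inputs=[Dataset(namespace="file", name="data/users.csv", facets={})],
        outputs=[Dataset(namespace="file", name="data/users.parquet", facets={})],
    )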

Download files

Download the file for your platform.

Source Distribution

astro_observe_sdk-0.0.10.tar.gz (278.8 kB)

Built Distribution

astro_observe_sdk-0.0.10-py3-none-any.whl (27.8 kB)

File details

Details for the file astro_observe_sdk-0.0.10.tar.gz.

File metadata

  • Download URL: astro_observe_sdk-0.0.10.tar.gz
  • Size: 278.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for astro_observe_sdk-0.0.10.tar.gz:

  • SHA256: 3d2ad3e617f53649f332516244f67ee4d667de1211f92fe19ed1543e2947a1f8
  • MD5: 08c2ce65fa7b45e1f9d8c54854aa097a
  • BLAKE2b-256: 50726a9444ca7d82babd0c2aef7ccad047acd1542f3d522ca3b96a8fa72e71fe
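
To verify a downloaded file against these values, any standard checksum tool works; for example:

sha256sum astro_observe_sdk-0.0.10.tar.gz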

Provenance

The following attestation bundles were made for astro_observe_sdk-0.0.10.tar.gz:

Publisher: release.yml on astronomer/observe-sdk

File details

Details for the file astro_observe_sdk-0.0.10-py3-none-any.whl.

File hashes

Hashes for astro_observe_sdk-0.0.10-py3-none-any.whl:

  • SHA256: fabb1987e14c8fef5f9aa0f4a79ec78969b9d84a89cfa8b9156cfa33c983bad5
  • MD5: b00a10880b98252336ab758e481e74e5
  • BLAKE2b-256: 8332ff5dd81e2dd6e027da39bf17811a26181826181764cd48215d860662d249

Provenance

The following attestation bundles were made for astro_observe_sdk-0.0.10-py3-none-any.whl:

Publisher: release.yml on astronomer/observe-sdk
