
observe-sdk

An Airflow and Python SDK for working with the Astro Observe platform

Installation

Add the following to requirements.txt:

astro-observe-sdk>=0.0.8

Usage

Metrics

The Observe SDK allows users to emit metrics during Airflow task execution that are tracked in the Observe backend. To do so, use the astro_observe_sdk.log_metric function like so:

from airflow.decorators import task

import astro_observe_sdk as observe

# then, in the task
@task
def my_task():
    observe.log_metric('my_metric', 42)

Capture Lineage from SQL Queries

The Observe SDK provides functionality for logging query executions during Airflow task runs, enabling the tracking of table/dataset lineage in the Observe backend.

The primary entry point is the astro_observe_sdk.log_query function. This function captures lineage information, including inputs, outputs, and run metadata, by parsing the SQL query (provided explicitly by the user or retrieved via the provided database connector). By parsing the SQL query and leveraging a query identifier, this function constructs the appropriate OpenLineage job and run facets. It then emits a pair of OpenLineage events (START and COMPLETE/FAIL) directly associated with the corresponding Airflow task run, allowing for detailed visibility into which queries were executed as part of each DAG and task execution. The function can be invoked multiple times within a single task.
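The START/COMPLETE pairing described above can be illustrated with plain dictionaries. This is only a conceptual sketch of the event sequence; the real SDK emits full OpenLineage RunEvent objects with job and run facets, and the function and field names below are illustrative, not part of astro_observe_sdk:

```python
# Conceptual sketch only: each log_query call corresponds to a START event
# followed by a COMPLETE (or FAIL) event for the same query run. The real
# SDK builds complete OpenLineage RunEvents, not bare dicts like these.
def query_event_pair(query_id: str, succeeded: bool) -> list:
    """Return the (START, COMPLETE/FAIL) event types for one query run."""
    start = {"eventType": "START", "run": {"queryId": query_id}}
    end_type = "COMPLETE" if succeeded else "FAIL"
    end = {"eventType": end_type, "run": {"queryId": query_id}}
    return [start, end]

events = query_event_pair("bquxjob_69ed4f1_169ba1f5665", succeeded=True)
print([e["eventType"] for e in events])  # ['START', 'COMPLETE']
```

Because both events carry the same query identifier, the backend can associate the query's full lifecycle with a single Airflow task run.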

Users can call log_query in several ways, depending on how much metadata is available.

Please refer to the astro_observe_sdk.log_query function's docstring for a detailed guide and additional examples.

  1. Log using query metadata:
from airflow.decorators import task

import astro_observe_sdk as observe

@task
def my_task():
    observe.log_query(
        query_job_namespace="bigquery",
        query_text="SELECT a, b, c FROM users",
        default_database="my_project",
        default_schema="my_dataset",
        query_id="bquxjob_69ed4f1_169ba1f5665",
    )
  2. Log using just a supported DB connector (a Snowflake or Databricks cursor):
from airflow.decorators import task

import astro_observe_sdk as observe
from databricks import sql

@task
def my_task():
    conn = sql.connect(
        server_hostname="adb-123.10.azuredatabricks.net",
        access_token="secret-token",
        http_path="/sql/1.0/warehouses/abc123",
    )
    cs = conn.cursor()
    cs.execute("SELECT * FROM `test`.some_schema.table1;")

    observe.log_query(
        db_connector=cs,
        # We try to retrieve the values below from the cursor object if they
        # are not provided, but we highly recommend passing them explicitly.
        db_connector_extra_args={"host": "adb-123.10.azuredatabricks.net", "token": "secret-token"},
    )

Capture Lineage from Datasets

The Observe SDK also provides functionality for logging dataset lineage directly, without requiring SQL query parsing. The primary entry point is the astro_observe_sdk.log_lineage function. This function accepts OpenLineage Dataset objects representing input and output datasets, and emits a RUNNING event directly associated with the corresponding Airflow task run.

Please refer to the astro_observe_sdk.log_lineage function's docstring for a detailed guide and additional examples.

  1. Simple lineage logging:
from airflow.decorators import task

import astro_observe_sdk as observe
from openlineage.client.event_v2 import Dataset

@task
def my_task():
    observe.log_lineage(
        inputs=[
            Dataset(namespace="snowflake://account", name="analytics.public.users", facets={}),
        ],
        outputs=[
            Dataset(namespace="snowflake://account", name="analytics.public.users_processed", facets={}),
        ],
    )
  2. Lineage logging with facets (metadata):
from airflow.decorators import task

import astro_observe_sdk as observe
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import schema_dataset

@task
def my_task():
    schema_facet = schema_dataset.SchemaDatasetFacet(
        fields=[
            schema_dataset.SchemaDatasetFacetFields(name="id", type="INTEGER"),
            schema_dataset.SchemaDatasetFacetFields(name="name", type="VARCHAR"),
        ]
    )
    observe.log_lineage(
        inputs=[
            Dataset(
                namespace="snowflake://account",
                name="analytics.public.users",
                facets={"schema": schema_facet},
            ),
        ],
        outputs=[
            Dataset(
                namespace="snowflake://account",
                name="analytics.public.users_processed",
                facets={"schema": schema_facet},
            ),
        ],
    )
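In the examples above, dataset names follow the OpenLineage naming convention: a source-specific namespace (e.g. snowflake://account) plus a dotted database.schema.table name. A small helper like the hypothetical one below can keep these identifiers consistent across tasks; the function name and shape are illustrative assumptions, not part of astro_observe_sdk:

```python
# Illustrative helper (not part of astro_observe_sdk): builds the
# (namespace, name) pair used when constructing OpenLineage Datasets,
# e.g. namespace="snowflake://account", name="analytics.public.users".
def dataset_identity(namespace: str, database: str, schema: str, table: str):
    """Return the OpenLineage-style (namespace, dotted name) pair."""
    name = ".".join([database, schema, table])
    return namespace, name

ns, name = dataset_identity("snowflake://account", "analytics", "public", "users")
print(ns, name)  # snowflake://account analytics.public.users
```

Centralizing name construction this way helps inputs and outputs from different tasks resolve to the same dataset nodes in the lineage graph.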

Choosing Between log_query and log_lineage

Both log_query and log_lineage enable lineage tracking, but they serve different use cases:

Use log_query when:

  • You have SQL queries and want automatic parsing to extract dataset lineage.
  • You're working with supported database connectors (Snowflake, Databricks) and want automatic metadata retrieval.
  • You want to track query execution metadata (query ID, query text, execution time, status).

Use log_lineage when:

  • You already have dataset lineage information (e.g., from file operations, APIs, or custom logic).
  • You're working with non-SQL data transformations or operations that don't involve SQL queries.
  • You want full control over the exact datasets and metadata included in the lineage event.

In summary: log_query is ideal for SQL-based workflows where you want automatic parsing and complete lifecycle tracking, while log_lineage is better for operations where you already have explicit dataset information.
