# observe-sdk

An Airflow and Python SDK for working with the Astro Observe platform.
## Installation

Add the following to `requirements.txt`:

```
astro-observe-sdk>=0.0.8
```
## Usage

### Metrics
The Observe SDK allows users to emit metrics during Airflow task execution, which are then tracked in the Observe backend. To do so, use the `astro_observe_sdk.log_metric` function like so:
```python
import astro_observe_sdk as observe

# then, in the task
@task
def my_task():
    observe.log_metric('my_metric', 42)
```
### Capture Lineage from SQL Queries
The Observe SDK provides functionality for logging query executions during Airflow task runs, enabling the tracking of table/dataset lineage in the Observe backend.

The primary entry point is the `astro_observe_sdk.log_query` function. This function captures lineage information, including inputs, outputs, and run metadata, by parsing a SQL query (provided explicitly by the user or retrieved via the provided database connector). By parsing the SQL query and leveraging a query identifier, it constructs the appropriate OpenLineage job and run facets. It then emits a pair of OpenLineage events (START and COMPLETE/FAIL) directly associated with the corresponding Airflow task run, allowing detailed visibility into which queries were executed as part of each DAG and task execution. The function can be invoked multiple times within a single task.

You can call `log_query` in several ways, depending on how much metadata is available. Refer to the `astro_observe_sdk.log_query` function's docstring for a detailed guide and additional examples.
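To get an intuition for where the input datasets come from, consider how table references can be pulled out of SQL text. The toy function below (`naive_table_refs` is a hypothetical name, and this regex is *not* the SDK's actual parser, which handles real SQL dialects, subqueries, CTEs, and quoting) sketches the idea of extracting source tables from `FROM`/`JOIN` clauses:

```python
import re


def naive_table_refs(query_text: str) -> list[str]:
    """Toy illustration only: collect identifiers that follow FROM or JOIN.

    A real SQL parser is needed for production lineage; this regex ignores
    subqueries, CTEs, quoted identifiers, and many other constructs.
    """
    pattern = re.compile(r"\b(?:FROM|JOIN)\s+([\w.]+)", re.IGNORECASE)
    return pattern.findall(query_text)


inputs = naive_table_refs(
    "SELECT u.a, o.b FROM my_project.my_dataset.users u "
    "JOIN my_project.my_dataset.orders o ON u.id = o.user_id"
)
print(inputs)  # ['my_project.my_dataset.users', 'my_project.my_dataset.orders']
```

Combined with defaults like `default_database` and `default_schema`, unqualified names such as `users` can then be resolved to fully qualified dataset names for the lineage event.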
- Log using query metadata:

```python
import astro_observe_sdk as observe

@task
def my_task():
    observe.log_query(
        query_job_namespace="bigquery",
        query_text="SELECT a, b, c FROM users",
        default_database="my_project",
        default_schema="my_dataset",
        query_id="bquxjob_69ed4f1_169ba1f5665",
    )
```
- Log using just a supported DB connector (Snowflake or Databricks cursor):

```python
import astro_observe_sdk as observe
from databricks import sql

@task
def my_task():
    conn = sql.connect(
        server_hostname="adb-123.10.azuredatabricks.net",
        access_token="secret-token",
        http_path="/sql/1.0/warehouses/abc123",
    )
    cs = conn.cursor()
    cs.execute("SELECT * FROM `test`.some_schema.table1;")
    observe.log_query(
        db_connector=cs,
        # If not provided, we try to retrieve the values below from the cursor
        # object, but we highly recommend passing them explicitly.
        db_connector_extra_args={"host": "adb-123.10.azuredatabricks.net", "token": "secret-token"},
    )
```
### Capture Lineage from Datasets
The Observe SDK also provides functionality for logging dataset lineage directly, without requiring SQL query parsing.

The primary entry point is the `astro_observe_sdk.log_lineage` function. This function accepts OpenLineage `Dataset` objects representing input and output datasets, and emits a RUNNING event directly associated with the corresponding Airflow task run.

Refer to the `astro_observe_sdk.log_lineage` function's docstring for a detailed guide and additional examples.
- Simple lineage logging:

```python
import astro_observe_sdk as observe
from openlineage.client.event_v2 import Dataset

@task
def my_task():
    observe.log_lineage(
        inputs=[
            Dataset(namespace="snowflake://account", name="analytics.public.users", facets={}),
        ],
        outputs=[
            Dataset(namespace="snowflake://account", name="analytics.public.users_processed", facets={}),
        ],
    )
```
- Lineage logging with facets (metadata):

```python
import astro_observe_sdk as observe
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import schema_dataset

@task
def my_task():
    schema_facet = schema_dataset.SchemaDatasetFacet(
        fields=[
            schema_dataset.SchemaDatasetFacetFields(name="id", type="INTEGER"),
            schema_dataset.SchemaDatasetFacetFields(name="name", type="VARCHAR"),
        ]
    )
    observe.log_lineage(
        inputs=[
            Dataset(
                namespace="snowflake://account",
                name="analytics.public.users",
                facets={"schema": schema_facet},
            ),
        ],
        outputs=[
            Dataset(
                namespace="snowflake://account",
                name="analytics.public.users_processed",
                facets={"schema": schema_facet},
            ),
        ],
    )
```
### Choosing Between `log_query` and `log_lineage`
Both `log_query` and `log_lineage` enable lineage tracking, but they serve different use cases.

Use `log_query` when:

- You have SQL queries and want automatic parsing to extract dataset lineage.
- You're working with supported database connectors (Snowflake, Databricks) and want automatic metadata retrieval.
- You want to track query execution metadata (query ID, query text, execution time, status).

Use `log_lineage` when:

- You already have dataset lineage information (e.g., from file operations, APIs, or custom logic).
- You're working with non-SQL data transformations or operations that don't involve SQL queries.
- You want full control over the exact datasets and metadata included in the lineage event.

In summary: `log_query` is ideal for SQL-based workflows where you want automatic parsing and complete lifecycle tracking, while `log_lineage` is better for operations where you already have explicit dataset information.
## File details: astro_observe_sdk-0.0.11.tar.gz

File metadata:

- Download URL: astro_observe_sdk-0.0.11.tar.gz
- Size: 279.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes:

| Algorithm | Hash digest |
|---|---|
| SHA256 | e77ad45a363798a27c18dd9c60b0b5642d86a77ab81341ef91ed77016db26ed6 |
| MD5 | deb823bc283b5b6fcb34e7f98e7905ab |
| BLAKE2b-256 | e7f233a29dcf9d245f812500d7d3bef77241dc5b93a769470b921bebd2d7b876 |

Provenance: the following attestation bundle was made for astro_observe_sdk-0.0.11.tar.gz.

- Publisher: release.yml on astronomer/observe-sdk
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: astro_observe_sdk-0.0.11.tar.gz
- Subject digest: e77ad45a363798a27c18dd9c60b0b5642d86a77ab81341ef91ed77016db26ed6
- Sigstore transparency entry: 829027446
- Permalink: astronomer/observe-sdk@d7ea84214ea0e32287994515873d2b6e5c5c4aa8
- Branch / Tag: refs/tags/v0.0.11
- Owner: https://github.com/astronomer
- Access: internal
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@d7ea84214ea0e32287994515873d2b6e5c5c4aa8
- Trigger Event: release
## File details: astro_observe_sdk-0.0.11-py3-none-any.whl

File metadata:

- Download URL: astro_observe_sdk-0.0.11-py3-none-any.whl
- Size: 27.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes:

| Algorithm | Hash digest |
|---|---|
| SHA256 | b6cd89d2a7664ffb6c123adbd812eda8adc6d0563e51bb74bcd7ca1cfe579428 |
| MD5 | a3a4c103a6d52cc253ffcb2e63cf48f1 |
| BLAKE2b-256 | 9ee3abb60b8fe359b7a4128896994c5a2dd5a470a51b7f25e5840a8985d8cf50 |

Provenance: the following attestation bundle was made for astro_observe_sdk-0.0.11-py3-none-any.whl.

- Publisher: release.yml on astronomer/observe-sdk
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: astro_observe_sdk-0.0.11-py3-none-any.whl
- Subject digest: b6cd89d2a7664ffb6c123adbd812eda8adc6d0563e51bb74bcd7ca1cfe579428
- Sigstore transparency entry: 829027449
- Permalink: astronomer/observe-sdk@d7ea84214ea0e32287994515873d2b6e5c5c4aa8
- Branch / Tag: refs/tags/v0.0.11
- Owner: https://github.com/astronomer
- Access: internal
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@d7ea84214ea0e32287994515873d2b6e5c5c4aa8
- Trigger Event: release