
Bruin Python SDK

The official Python SDK for Bruin CLI. Query databases, access connections, and read pipeline context — all with zero boilerplate.

from bruin import query, get_connection, context

# One-liner: query any database Bruin manages
df = query("SELECT * FROM users WHERE created_at > '{{start_date}}'")

# Access pipeline context
print(context.start_date)    # datetime.date(2024, 6, 1)
print(context.pipeline)      # "my_pipeline"
print(context.asset_name)    # "my_asset"

# Get a typed database client
conn = get_connection("my_bigquery")
client = conn.client  # google.cloud.bigquery.Client, ready to use

Installation

Add bruin-sdk to the requirements.txt that sits next to your Python assets:

bruin-sdk
pandas

For specific database connections, install the corresponding extras:

bruin-sdk[bigquery]     # Google BigQuery
bruin-sdk[snowflake]    # Snowflake
bruin-sdk[postgres]     # PostgreSQL / Redshift
bruin-sdk[redshift]     # Redshift (alias for postgres extra)
bruin-sdk[mssql]        # Microsoft SQL Server
bruin-sdk[mysql]        # MySQL
bruin-sdk[duckdb]       # DuckDB
bruin-sdk[sheets]       # Google Sheets (for GCP connections)
bruin-sdk[all]          # Everything

Quick Start

Before (manual boilerplate)

""" @bruin
name: my_asset
secrets:
    - key: bigquery_conn
@bruin """

import os
import json
from google.cloud import bigquery

# Parse connection JSON from env var
raw = json.loads(os.environ["bigquery_conn"])
sa_info = json.loads(raw["service_account_json"])

# Create client manually
client = bigquery.Client.from_service_account_info(
    sa_info, project=raw["project_id"]
)

# Execute query
start = os.environ["BRUIN_START_DATE"]
df = client.query(f"SELECT * FROM users WHERE dt >= '{start}'").to_dataframe()

After (with SDK)

""" @bruin
name: my_asset
connection: bigquery_conn
@bruin """

from bruin import query, context

df = query(f"SELECT * FROM users WHERE dt >= '{context.start_date}'")

API Reference

context

A module-level object that provides access to all BRUIN_* environment variables as properly typed Python values. Each property reads the env var fresh on every access — no caching, no stale values.

from bruin import context
Property Type Env Var Description
context.start_date date | None BRUIN_START_DATE Pipeline run start date
context.end_date date | None BRUIN_END_DATE Pipeline run end date
context.start_datetime datetime | None BRUIN_START_DATETIME Start date with time
context.end_datetime datetime | None BRUIN_END_DATETIME End date with time
context.execution_date date | None BRUIN_EXECUTION_DATE Execution date
context.run_id str | None BRUIN_RUN_ID Unique run identifier
context.pipeline str | None BRUIN_PIPELINE Pipeline name
context.asset_name str | None BRUIN_ASSET Current asset name
context.connection str | None BRUIN_CONNECTION Asset's default connection
context.is_full_refresh bool BRUIN_FULL_REFRESH True when --full-refresh flag is set
context.vars dict BRUIN_VARS Pipeline variables (types preserved from JSON Schema)

All properties return None when the corresponding env var is missing (except is_full_refresh which returns False, and vars which returns {}).
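
Outside a Bruin run (for example, when testing a script locally) these variables are unset, so guard against None where it matters:

from bruin import context
import datetime

# Fall back to today's date when not running under Bruin
start = context.start_date or datetime.date.today()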

from bruin import context

# Dates
print(context.start_date)       # datetime.date(2024, 6, 1)
print(context.end_date)         # datetime.date(2024, 6, 2)

# Pipeline variables (types preserved from pipeline.yml JSON Schema)
segment = context.vars["segment"]     # str: "enterprise"
horizon = context.vars["horizon"]     # int: 30
cohorts = context.vars["cohorts"]     # list[dict]

# Conditional logic
if context.is_full_refresh:
    df = query("SELECT * FROM users")
else:
    df = query(f"SELECT * FROM users WHERE dt >= '{context.start_date}'")

query(sql, connection=None)

Execute SQL and return results.

from bruin import query

Parameters:

Parameter Type Default Description
sql str (required) SQL statement to execute
connection str | None None Connection name. When None, uses the asset's default connection (BRUIN_CONNECTION)

Returns: pandas.DataFrame for data-returning statements (SELECT, WITH, SHOW, DESCRIBE, EXPLAIN), None for DDL/DML (CREATE, INSERT, UPDATE, DELETE, DROP, etc.).

# Uses the asset's default connection (from the `connection:` field in asset definition)
df = query("SELECT * FROM users")

# Explicit connection name
df = query("SELECT * FROM users", connection="my_bigquery")

# DDL/DML returns None
query("CREATE TABLE temp_users AS SELECT * FROM users")
query("INSERT INTO audit_log VALUES ('ran_asset', NOW())")

# Works with any supported database
df_bq = query("SELECT * FROM users", connection="my_bigquery")
df_sf = query("SELECT * FROM users", connection="my_snowflake")
df_pg = query("SELECT * FROM users", connection="my_postgres")

Every query is automatically annotated with @bruin.config metadata for observability and cost tracking.


get_connection(name)

Get a typed connection object with a lazy database client.

from bruin import get_connection

Parameters:

Parameter Type Description
name str Connection name as defined in .bruin.yml and injected via secrets

Returns: Connection or GCPConnection depending on the connection type.

conn = get_connection("my_bigquery")
conn.name    # "my_bigquery"
conn.type    # "google_cloud_platform"
conn.raw     # dict — the parsed connection JSON
conn.client  # Lazy-initialized database client

Connection types

Type .client returns Install extra
google_cloud_platform bigquery.Client bruin-sdk[bigquery]
snowflake snowflake.connector.Connection bruin-sdk[snowflake]
postgres psycopg2.connection bruin-sdk[postgres]
redshift psycopg2.connection bruin-sdk[redshift]
mssql pymssql.Connection bruin-sdk[mssql]
mysql mysql.connector.Connection bruin-sdk[mysql]
duckdb duckdb.DuckDBPyConnection bruin-sdk[duckdb]
generic N/A (raises ConnectionTypeError)

Client creation is lazy — the actual database connection is only established when .client is first accessed.
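
In practice, constructing the connection object is cheap and nothing touches the network until .client is accessed; a sketch of the expected behavior:

from bruin import get_connection

conn = get_connection("my_bigquery")  # parses connection metadata, no network I/O
print(conn.type)                      # inspect metadata without connecting

client = conn.client                  # the database connection is established here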

GCP connections

GCP connections have extra methods since one connection can access multiple Google services:

conn = get_connection("my_gcp")

# BigQuery (most common — also available as .client)
bq_client = conn.bigquery()
df = bq_client.query("SELECT 1").to_dataframe()

# Google Sheets
sheets_client = conn.sheets()  # requires bruin-sdk[sheets]

# Cloud Storage
gcs_client = conn.storage()  # requires google-cloud-storage

# Raw credentials for any Google API
creds = conn.credentials  # google.oauth2.Credentials
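
The credentials object plugs into any Google client library. For instance, with google-api-python-client (assumed installed; it is not an SDK extra):

from googleapiclient.discovery import build
from bruin import get_connection

conn = get_connection("my_gcp")

# Illustrative: build any Google API client from the same credentials
drive = build("drive", "v3", credentials=conn.credentials)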

Generic connections

Generic connections hold a raw string value (like an API key or webhook URL). They don't have a database client:

conn = get_connection("slack_webhook")
conn.type    # "generic"
conn.raw     # "https://hooks.slack.com/services/T00/B00/xxx"
conn.client  # raises ConnectionTypeError
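
Use the raw value directly. For instance, posting to the webhook (assuming the requests library is installed):

import requests
from bruin import get_connection

conn = get_connection("slack_webhook")
response = requests.post(conn.raw, json={"text": "Pipeline finished"})
response.raise_for_status()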

Connection.query(sql)

Connections also have a .query() method — an alternative to the top-level query():

conn = get_connection("my_bigquery")

# These are equivalent:
df = conn.query("SELECT * FROM users")
df = query("SELECT * FROM users", connection="my_bigquery")

Same return behavior: DataFrame for SELECT, None for DDL/DML.


Exceptions

All SDK exceptions inherit from BruinError:

from bruin.exceptions import (
    BruinError,              # Base class
    ConnectionNotFoundError, # Connection name not found or env var missing
    ConnectionParseError,    # Invalid JSON in connection env var
    ConnectionTypeError,     # Unsupported or generic connection type
    QueryError,              # SQL execution failed
)

try:
    df = query("SELECT * FROM users", connection="missing")
except ConnectionNotFoundError as e:
    print(e)
    # Connection 'missing' not found. Available connections: my_bigquery, my_snowflake.
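
QueryError wraps failures during SQL execution the same way; the exact message comes from the underlying database:

from bruin import query
from bruin.exceptions import QueryError

try:
    query("SELECT * FROM table_that_does_not_exist")
except QueryError as e:
    print(e)  # underlying database error message (wording varies by database)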

When an optional dependency is missing, the error gives clear install instructions:

conn = get_connection("my_snowflake")
conn.client
# ImportError: Install bruin-sdk[snowflake] to use Snowflake connections:
#   pip install 'bruin-sdk[snowflake]'

Asset Setup

The SDK reads connection data from environment variables that Bruin injects. To make a connection available in your Python asset, use the secrets key in your asset definition:

""" @bruin
name: my_asset
secrets:
    - key: my_bigquery
@bruin """

from bruin import get_connection

conn = get_connection("my_bigquery")

For the default connection (used by query() when no connection argument is given), set the connection field:

""" @bruin
name: my_asset
connection: my_bigquery
secrets:
    - key: my_bigquery
@bruin """

from bruin import query

# Uses my_bigquery automatically
df = query("SELECT * FROM users")

Examples

Incremental load with date filtering

""" @bruin
name: analytics.daily_events
connection: my_bigquery
secrets:
    - key: my_bigquery
@bruin """

from bruin import query, context

if context.is_full_refresh:
    df = query("SELECT * FROM raw.events")
else:
    df = query(f"""
        SELECT * FROM raw.events
        WHERE event_date BETWEEN '{context.start_date}' AND '{context.end_date}'
    """)

print(f"Loaded {len(df)} events")

Cross-database ETL

""" @bruin
name: sync.postgres_to_bigquery
secrets:
    - key: my_postgres
    - key: my_bigquery
@bruin """

from bruin import query, get_connection

# Read from Postgres
df = query("SELECT * FROM users WHERE active = true", connection="my_postgres")

# Write to BigQuery
bq = get_connection("my_bigquery")
df.to_gbq(
    "staging.active_users",
    project_id=bq.raw["project_id"],
    credentials=bq.credentials,
    if_exists="replace",
)
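
Alternatively, the same write can go through the connection's BigQuery client directly (this path needs pyarrow, while to_gbq needs the separate pandas-gbq package):

job = bq.client.load_table_from_dataframe(df, "staging.active_users")
job.result()  # block until the load job completes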

Using pipeline variables

# pipeline.yml
name: marketing
variables:
  segment:
    type: string
    default: "enterprise"
  lookback_days:
    type: integer
    default: 30

""" @bruin
name: marketing.segment_report
connection: my_snowflake
secrets:
    - key: my_snowflake
@bruin """

from bruin import query, context

segment = context.vars["segment"]
lookback = context.vars["lookback_days"]

df = query(f"""
    SELECT * FROM customers
    WHERE segment = '{segment}'
    AND created_at >= DATEADD(day, -{lookback}, CURRENT_DATE())
""")

print(f"Found {len(df)} {segment} customers in last {lookback} days")

DDL operations

""" @bruin
name: setup.create_tables
connection: my_postgres
secrets:
    - key: my_postgres
@bruin """

from bruin import query

# DDL returns None
query("CREATE TABLE IF NOT EXISTS audit_log (event TEXT, ts TIMESTAMP)")
query("INSERT INTO audit_log VALUES ('setup_complete', NOW())")

# SELECT returns DataFrame
df = query("SELECT COUNT(*) as cnt FROM audit_log")
print(f"Audit log has {df['cnt'][0]} entries")
