Dagster IO managers and type handlers for databases

dagster-db

Dagster IO managers and type handlers for databases. Wraps the standard IO managers with useful functions that can be specific to each type handler, and provides better metadata out of the box.

  • Apply custom generic transformations to ensure all outputs comply with the database.
  • Apply custom validation checks before deleting from / writing to the database.
  • Add custom metadata.

Use polars or pandas, or execute a jinja-templated SQL query on the database with the custom SqlQuery class, which builds Dagster's powerful table slice logic into an IO-manager-ready framework.

Use TypeHandlers out of the box, or extend to implement custom behaviours.

duckdb

Installation

uv add "dagster-db[duckdb]"

Definition

import dagster as dg
from dagster_db import build_custom_duckdb_io_manager
custom_io_manager = build_custom_duckdb_io_manager().configured({"database": "./.tmp/database.duckdb"})

defs = dg.Definitions(
    ...,
    resources={"io_manager": custom_io_manager},
)

Usage

import dagster as dg
import polars as pl
from dagster_db import SqlQuery

@dg.asset
def my_asset(context: dg.AssetExecutionContext) -> pl.DataFrame:
    return pl.DataFrame({"a": [1, 2, 3]})

@dg.asset
def my_asset_downstream(
    context: dg.AssetExecutionContext,
    my_asset: SqlQuery,
) -> SqlQuery:
    return SqlQuery("SELECT *, a+1 AS b FROM {{ my_asset }}", my_asset=my_asset)

Why should I use dagster-db instead of just querying via database resources?

Partitioned assets

If an asset is partitioned, a downstream asset that loads it must filter it to the partitions selected for the current run (as determined by the partition mapping). The IO manager already handles this through Dagster's table slice functionality.

For example, if my_asset is date-partitioned and a downstream asset builds the query SELECT * FROM {{ my_asset }}, the partition selection comes for free. The load_input method can render this as SELECT * FROM (SELECT * FROM my_asset WHERE partition_expr >= ... AND partition_expr < ...), whereas with database resources you would have to write that filter yourself.
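The rendering step described above can be sketched in plain Python. This is a minimal illustration rather than dagster-db's implementation: `PartitionWindow`, `table_slice_sql` and `render` are hypothetical names, the `date` column and its bounds are made-up values, and a small regular expression stands in for real Jinja rendering.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PartitionWindow:
    """Half-open [start, end) window on a partition column (hypothetical helper)."""

    column: str
    start: str
    end: str


def table_slice_sql(table: str, window: Optional[PartitionWindow]) -> str:
    """Return a table reference, wrapped in a filtering subquery when partitioned."""
    if window is None:
        return table
    return (
        f"(SELECT * FROM {table} "
        f"WHERE {window.column} >= '{window.start}' "
        f"AND {window.column} < '{window.end}')"
    )


def render(query: str, **bindings: str) -> str:
    """Substitute {{ name }} placeholders; a stand-in for real Jinja rendering."""

    def replace(match: "re.Match") -> str:
        return bindings[match.group(1)]

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", replace, query)


# Assumed example values: a daily partition on a column named "date".
window = PartitionWindow(column="date", start="2024-01-01", end="2024-01-02")
rendered = render(
    "SELECT * FROM {{ my_asset }}",
    my_asset=table_slice_sql("my_asset", window),
)
print(rendered)
```

The asset author only writes the outer query. Whether the placeholder expands to a bare table name or to a filtered subquery is decided at load time.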

Standardised features and metadata

When you use a database IO manager in Dagster, it clears the existing rows in the target table (or in the partition being written) before inserting the new records. With database resources, you would have to make all of these database calls yourself.

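from dagster_duckdb import DuckDBResource

@dg.asset(deps=[my_asset_upstream])
def my_asset_downstream(duckdb: DuckDBResource):
    my_asset_downstream_query = "SELECT *, True AS new_col FROM my_asset_upstream"
    # Without an IO manager, the asset must clear and repopulate the table itself.
    with duckdb.get_connection() as conn:
        conn.execute("DELETE FROM my_asset_downstream")
        conn.execute(f"INSERT INTO my_asset_downstream ({my_asset_downstream_query})")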

vs.

@dg.asset()
def my_asset_downstream(my_asset_upstream: SqlQuery):
    return SqlQuery("SELECT *, True AS new_col FROM {{ my_asset_upstream }}", my_asset_upstream=my_asset_upstream)

You can also add features to the IO manager, such as useful metadata or primary key validation, that apply to every asset without being called manually in each asset's code.

This preserves the separation of IO code from business logic, which is one of Dagster's best features.
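To illustrate how a check and some metadata can live in the IO layer rather than in each asset, here is a small stand-alone sketch. It does not use dagster-db's API: `ValidatingStore`, `check_primary_key` and `describe` are hypothetical names, and an in-memory dictionary stands in for the database.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def check_primary_key(rows: List[Row], key: Tuple[str, ...]) -> None:
    """Raise ValueError if two rows share the same primary key value."""
    seen = set()
    for row in rows:
        value = tuple(row[column] for column in key)
        if value in seen:
            raise ValueError(f"duplicate primary key {value} for columns {key}")
        seen.add(value)


def describe(rows: List[Row]) -> Dict[str, Any]:
    """Build simple metadata: row count and sorted column names."""
    columns = sorted({column for row in rows for column in row})
    return {"row_count": len(rows), "columns": columns}


class ValidatingStore:
    """Hypothetical stand-in for an IO manager's write path (not dagster-db's API)."""

    def __init__(self, primary_keys: Dict[str, Tuple[str, ...]]) -> None:
        self.primary_keys = primary_keys
        self.tables: Dict[str, List[Row]] = {}

    def handle_output(self, table: str, rows: List[Row]) -> Dict[str, Any]:
        key = self.primary_keys.get(table)
        if key is not None:
            check_primary_key(rows, key)  # validate before touching stored data
        self.tables[table] = list(rows)  # replace the table's previous contents
        return describe(rows)  # metadata produced the same way for every asset


store = ValidatingStore(primary_keys={"my_asset": ("a",)})
metadata = store.handle_output("my_asset", [{"a": 1, "b": 2}, {"a": 2, "b": 3}])
print(metadata)
```

Because the validation runs before the stored rows are replaced, a failing check leaves the previous contents of the table untouched.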

Different databases in different environments

Many workflows use a duckdb database for local development but bigquery or postgresql in production. These clients and resources have completely different method names and arguments. The best place to handle those differences is in TypeHandlers, which keeps further IO code out of your assets.
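The idea can be sketched without any database at all. Everything below is illustrative: `DuckDbHandler`, `BigQueryHandler`, `handler_for` and the `DEPLOY_ENV` variable are assumptions made for this example, not part of dagster-db, and the handlers only return a message where real ones would call their database client.

```python
from typing import Any, Dict, List, Mapping

Row = Dict[str, Any]


class DuckDbHandler:
    """Hypothetical local handler; a real one would call the duckdb client."""

    def write(self, table: str, rows: List[Row]) -> str:
        return f"duckdb: wrote {len(rows)} rows to {table}"


class BigQueryHandler:
    """Hypothetical production handler; a real one would call the bigquery client."""

    def write(self, table: str, rows: List[Row]) -> str:
        return f"bigquery: wrote {len(rows)} rows to {table}"


HANDLERS = {"local": DuckDbHandler, "prod": BigQueryHandler}


def handler_for(environ: Mapping[str, str]):
    """Pick a handler from the (assumed) DEPLOY_ENV variable, defaulting to local."""
    env = environ.get("DEPLOY_ENV", "local")
    if env not in HANDLERS:
        raise ValueError(f"unknown environment: {env!r}")
    return HANDLERS[env]()


def my_asset() -> List[Row]:
    """Business logic only: no database calls, so it is identical everywhere."""
    return [{"a": 1}, {"a": 2}, {"a": 3}]


local_message = handler_for({}).write("my_asset", my_asset())
prod_message = handler_for({"DEPLOY_ENV": "prod"}).write("my_asset", my_asset())
print(local_message)
print(prod_message)
```

The asset function is the same in both environments. Only the handler chosen at configuration time changes.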
