Dagster IO managers and type handlers for databases
Project description
dagster-db
Dagster IO managers and type handlers for databases. Wraps the standard IO managers with useful functions that can be scpecific to each type handler, and provides better metadata out of the box.
- Apply custom generic transformations to ensure all outputs comply with database.
- Apply custom validation checks before deleting from / writing to the database.
- Add custom metadata.
Use polars, pandas or execute a jinja-templated SQL query on the database
with the custom SqlQuery class which builds dagsters powerful table slice
logic into an io-manager ready framework.
Use TypeHandlers out of the box, or extend to implement custom behaviours.
duckdb
Installation
uv add dagster-db[duckdb]
Definition
import dagster as dg
from dagster_db import build_custom_duckdb_io_manager
custom_io_manager = build_custom_duckdb_io_manager().configured({"database": "./.tmp/database.duckdb"})
defs = dg.Definitions(
...,
resources={"io_manager": custom_io_manager},
)
Usage
import dagster as dg
import polars as pl
from dagster_db import SqlQuery
@dg.asset
def my_asset(context: dg.AssetExecutionContext) -> pl.DataFrame:
return pl.DataFrame({"a": [1, 2, 3]})
@dg.asset
def my_asset_downstream(
context: dg.AssetExecutionContext,
my_asset: SqlQuery,
) -> SqlQuery:
return SqlQuery("SELECT *, a+1 AS b FROM {{ my_asset }}", my_asset)
Why should I use dagster-db instead of just querying via database resources?
Partitioned assets
If you have a partitioned asset, then when you use it in a downstream asset, it will need to be filtered for the partition we are running for (via the partition mapping). The IO manager already handles this using table slice functionality.
e.g. so if you have a date partitioned asset my_asset, when you create a SQL
query: SELECT * FROM {{ my_asset }} in a downstream asset, we get the
partition selection for free. The load_input methods can render this into
SELECT * FROM (SELECT * FROM my_asset WHERE partition_expr >= ... AND partition_expr < ...),
whereas we'd have to do this manually using the resources.
Standardised features and metadata
When you use any IO manager in dagster, dagster truncates the table for you, before inserting the records. Using database resources, you would have to make all of these database calls yourself.
@dg.asset(deps=[my_asset_upstream])
def my_asset_downstream(duckdb: DuckDbResource):
my_asset_downstream_query = "SELECT *, True AS new_col FROM my_asset_upstream"
duckdb.sql("DELETE FROM my_asset_downstream")
duckdb.sql(f"INSERT INTO my_asset_downstream ({my_asset_downstream_query})")
vs.
@dg.asset()
def my_asset_downstream(my_asset_upstream: SqlQuery):
return SqlQuery("SELECT *, True AS new_col FROM {{ my_asset_upstream }}", my_asset_upstream=my_asset_upstream)
You also get the opportunity to add features to your IO manager, such as adding useful metadata, primary key validation, etc. that apply to every asset without having to call manually within asset code in each asset.
Therefore, tt allows the continued separation of IO code and business logic which is such a great feature of dagster.
Different databases in different environments
Many workflows may consist of a duckdb database for local development, but bigquery or
postgresql for production.
These clients and resources have completely different method names and arguments.
The best place to handle these differences, would be in TypeHandlers, which would
clear your asset of further IO code.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dagster_db-0.3.2.tar.gz.
File metadata
- Download URL: dagster_db-0.3.2.tar.gz
- Upload date:
- Size: 119.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
04a8d136a0bfe28bf44ab4c01be231350d4da319181931755a08981e950ce55d
|
|
| MD5 |
77dff819a1ed9106c49c9fbbf6efb980
|
|
| BLAKE2b-256 |
d59f20933fed035449eb9595a695abadde2cecd362e42cbe1a834e815f07bdcb
|
File details
Details for the file dagster_db-0.3.2-py3-none-any.whl.
File metadata
- Download URL: dagster_db-0.3.2-py3-none-any.whl
- Upload date:
- Size: 16.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
865baa9cbb32d858989ba44555c30e0bba8d23573965808ef5e9e8315987a0aa
|
|
| MD5 |
799912956bcb9e7f0001a79b85d9ba66
|
|
| BLAKE2b-256 |
3e6c1a16321a844d3d64d7a555e5ff133c05145d597187029425afee47dfcf1b
|