DuckDB (duckdb.org) provider for Apache Airflow
airflow-provider-duckdb
A DuckDB provider for Airflow. This provider exposes a hook/connection that returns a DuckDB connection.
Installation
pip install airflow-provider-duckdb
Connection
The connection type is duckdb. It supports setting the following parameters:
file (optional): The path to the DuckDB database file. If not set, operations will be done in-memory.
Example connection strings:
duckdb://:memory:
duckdb:///tmp/duckdb.db
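One way to supply such a connection string without going through the Airflow UI is Airflow's AIRFLOW_CONN_<CONN_ID> environment variable convention. The sketch below assumes the connection ID duckdb_default used in the Usage section; the helper conn_env_var is a hypothetical name introduced only for illustration and is not part of this provider.

```python
import os


def conn_env_var(conn_id: str) -> str:
    """Return the environment variable name Airflow checks for a connection ID."""
    # Airflow looks up connections under AIRFLOW_CONN_<CONN_ID>, upper-cased.
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# File-backed database, using the second example connection string above.
# Assign "duckdb://:memory:" instead to use the in-memory form.
os.environ[conn_env_var("duckdb_default")] = "duckdb:///tmp/duckdb.db"
```

The variable has to be set in the environment of the Airflow scheduler and workers before the tasks run, so in practice it usually lives in the deployment configuration rather than in DAG code.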
Usage
import pandas as pd
import pendulum
from airflow.decorators import dag, task
from duckdb_provider.hooks.duckdb_hook import DuckDBHook


@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
)
def duckdb_transform():
    @task
    def create_df() -> pd.DataFrame:
        """
        Create a dataframe with some sample data
        """
        df = pd.DataFrame(
            {
                "a": [1, 2, 3],
                "b": [4, 5, 6],
                "c": [7, 8, 9],
            }
        )
        return df

    @task
    def simple_select(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to select a subset of the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()
        # execute a simple query
        res = conn.execute("SELECT a, b, c FROM df WHERE a >= 2").df()
        return res

    @task
    def add_col(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to add a column to the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()
        # add a column
        conn.execute("CREATE TABLE tb AS SELECT *, a + b AS d FROM df")
        # get the table
        return conn.execute("SELECT * FROM tb").df()

    @task
    def aggregate(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to aggregate the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()
        # aggregate
        return conn.execute("SELECT SUM(a), COUNT(b) FROM df").df()

    create_df_res = create_df()
    simple_select_res = simple_select(create_df_res)
    add_col_res = add_col(simple_select_res)
    aggregate_res = aggregate(add_col_res)


duckdb_transform()
Source Distribution
Hashes for airflow-provider-duckdb-0.0.1.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 99f13d94b39b7c38404087c30099d2c15bf59dfe03c57fc318182e7290d9a25a
MD5 | 7b13886bed3855a193aa994fd1d78926
BLAKE2b-256 | 4588b0256da303d2d239affae48607d31d67cef96aa517845cb3f271c75b5735
Built Distributions
Hashes for airflow_provider_duckdb-0.0.1-py3.10.egg
Algorithm | Hash digest
---|---
SHA256 | b05f47f677209d4e22972fcb05d855d915c41b3fb11a833c555e7031e51cb7b5
MD5 | 89ddbf28ad92ae7b89b3c00f41a52433
BLAKE2b-256 | 3bdf46118f7d0c7c146254b93d0d9843a370ec939ac437dee94d15ec4886d400
Hashes for airflow_provider_duckdb-0.0.1-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 81ad88963ba4e9a9c9cb63bfabcc80dc83c51494a0dd35127ef32c911ab9bf1c
MD5 | 8af085d479d478126ac3b5c891bb6b73
BLAKE2b-256 | 6cfd130d9a2cd7fa326e206ca1115aa948e3686f8d706f7f1c6a02aea3e9ee72