# databricks-bundle-decorators

Decorator-based framework for defining Databricks jobs and tasks as Python code. Define pipelines using `@task`, `@job`, and `job_cluster()` — they compile into Databricks Asset Bundle resources.

## Why databricks-bundle-decorators?
Writing Databricks jobs in raw YAML is tedious and disconnects task logic from orchestration configuration. databricks-bundle-decorators lets you express both in Python:
- **Airflow TaskFlow-inspired pattern** — define `@task` functions inside a `@job` body; dependencies are captured automatically from call arguments.
- **IoManager pattern** — large data (DataFrames, datasets) flows through permanent storage (Delta tables, Unity Catalog volumes), supporting a multi-hop architecture.
- **Explicit task values** — small scalars (`str`, `int`, `float`, `bool`) can be passed between tasks via `set_task_value`/`get_task_value`, like Airflow XComs.
- **Deploy-time codegen** — when you run `databricks bundle deploy`, the framework imports your Python files, discovers all `@job`/`@task` definitions, and generates Databricks Job configurations. The result is a Databricks job with all tasks and dependencies set up.
- **Runtime dispatch** — when Databricks runs the job (on schedule or manually), each task executes on a cluster via the `dbxdec-run` entry point, which loads upstream data through IoManagers and calls your task function.
## Installation

```bash
uv add databricks-bundle-decorators
```
## Quickstart

### 1. Scaffold your pipeline project

```bash
uv init my-pipeline && cd my-pipeline
uv add 'databricks-bundle-decorators[azure]'  # or [aws], [gcp], [polars]
uv run dbxdec init
```
`dbxdec init` creates:

| File | Purpose |
|---|---|
| `resources/__init__.py` | `load_resources()` entry point for `databricks bundle deploy` |
| `src/<package>/pipelines/__init__.py` | Auto-discovery module that imports all pipeline files |
| `src/<package>/pipelines/example.py` | Starter pipeline with `@task`, `@job`, `job_cluster()` |
| `databricks.yaml` | Databricks Asset Bundle configuration (created if not already present) |
| `pyproject.toml` | Updated with the pipeline package entry point |
2. Define your pipeline
# src/my_pipeline/pipelines/github_events.py
import polars as pl
from databricks_bundle_decorators import job, job_cluster, params, task, set_task_value
from databricks_bundle_decorators.io_managers import PolarsParquetIoManager
staging_io = PolarsParquetIoManager(
base_path="abfss://datalake@mystorageaccount.dfs.core.windows.net/staging",
# storage_options={"account_name": "...", "account_key": "..."},
)
small_cluster = job_cluster(
name="small_cluster",
spark_version="16.4.x-scala2.12",
node_type_id="Standard_E8ds_v4",
num_workers=1,
)
@job(
tags={"source": "github", "type": "api"},
schedule="0 * * * *",
params={"url": "https://api.github.com/events"},
cluster="small_cluster",
)
def github_events():
@task(io_manager=staging_io)
def extract():
import requests
r = requests.get(params["url"])
df = pl.DataFrame(r.json())
set_task_value("row_count", len(df))
return df
@task
def transform(raw_df):
print(raw_df.head(10))
df = extract()
transform(df)
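The `row_count` published by `extract()` can be read back downstream with `get_task_value` (documented under Task Values below). A hedged sketch of a variant `transform` that combines the IoManager input with the task value:

```python
from databricks_bundle_decorators.task_values import get_task_value

@task
def transform(raw_df):
    # Illustrative variant of transform(): read the scalar published by
    # extract() alongside the DataFrame injected through the IoManager.
    count = get_task_value("extract", "row_count")
    print(f"extract reported {count} rows")
    print(raw_df.head(10))
```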
### 3. Deploy

```bash
databricks bundle deploy --target dev
```
## How It Works

### Deploy time (`databricks bundle deploy`)

When you run `databricks bundle deploy`, the Databricks CLI imports your Python pipeline files. This triggers the `@job` and `@task` decorators, which register your tasks and their dependencies into an internal DAG — no task code actually runs yet. The framework then generates Databricks Job definitions from this DAG and uploads them to your workspace.

The result: a Databricks Job appears in the UI with all your tasks, their dependency edges, cluster configs, and parameters fully wired up.
```text
your_pipeline.py
@job / @task / job_cluster()
        ▼
Framework builds task DAG from decorator metadata
        ▼
codegen → Databricks Job definition
        ▼
databricks bundle deploy → Job created in workspace
```
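For intuition, here is roughly what the codegen emits for the quickstart pipeline, expressed as a Jobs-API-shaped Python dict. This is a hedged illustration: the field names follow the Databricks Jobs API, but the framework's actual output format is not reproduced here.

```python
# Illustrative sketch only; the real codegen output may differ in detail.
generated_job = {
    "name": "github_events",
    "tags": {"source": "github", "type": "api"},
    "parameters": [{"name": "url", "default": "https://api.github.com/events"}],
    "tasks": [
        {
            "task_key": "extract",
            "job_cluster_key": "small_cluster",
            # Each task runs the installed wheel's dbxdec-run entry point.
            "python_wheel_task": {"entry_point": "dbxdec-run"},
        },
        {
            "task_key": "transform",
            "depends_on": [{"task_key": "extract"}],  # edge captured from transform(df)
            "job_cluster_key": "small_cluster",
            "python_wheel_task": {"entry_point": "dbxdec-run"},
        },
    ],
}
```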
### Runtime (when the job runs on Databricks)

When the job is triggered (on schedule or manually), Databricks launches each task as a separate `python_wheel_task` on a cluster. For each task:

1. The `dbxdec-run` entry point starts.
2. It looks up the upstream tasks and calls `IoManager.read()` to fetch their outputs.
3. It injects the loaded data as arguments to your task function and calls it.
4. If the task has an IoManager, it calls `IoManager.write()` to persist the return value for downstream tasks.
```text
Databricks triggers job
  → launches each task as python_wheel_task
        ▼
dbxdec-run entry point
        ▼
IoManager.read() upstream data → call your task function
        ▼
IoManager.write() return value for downstream tasks
```
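The same flow as a small, self-contained simulation. All names here (`TaskSpec`, `run_task`, the in-memory `storage` dict standing in for an IoManager backend) are illustrative, not the framework's real API:

```python
from dataclasses import dataclass, field
from typing import Any, Callable

storage: dict[str, Any] = {}  # stand-in for an IoManager's backing store

@dataclass
class TaskSpec:
    fn: Callable[..., Any]
    upstream: dict[str, str] = field(default_factory=dict)  # param name -> upstream task key
    persists: bool = False

def run_task(tasks: dict[str, TaskSpec], task_key: str) -> None:
    spec = tasks[task_key]
    # 1-2. Look up upstream outputs (IoManager.read() in the real framework).
    kwargs = {param: storage[up] for param, up in spec.upstream.items()}
    # 3. Call the task function with the loaded data injected as arguments.
    result = spec.fn(**kwargs)
    # 4. Persist the return value for downstream tasks (IoManager.write()).
    if spec.persists:
        storage[task_key] = result

tasks = {
    "extract": TaskSpec(fn=lambda: [1, 2, 3], persists=True),
    "transform": TaskSpec(fn=lambda raw: print(raw), upstream={"raw": "extract"}),
}
run_task(tasks, "extract")
run_task(tasks, "transform")  # prints [1, 2, 3]
```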
## API Reference

### `@task`

Registers a function as a Databricks task.
```python
@task
def my_task():
    ...

@task(io_manager=my_io_manager)
def my_task_with_io():
    return some_dataframe
```
| Parameter | Type | Description |
|---|---|---|
| `io_manager` | `IoManager \| None` | Controls how the return value is persisted and read by downstream tasks. |
| `**kwargs` | `TaskConfig` | SDK-native Task fields (`max_retries`, `timeout_seconds`, `retry_on_timeout`, etc.). |
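For example, retry settings can be passed straight through the decorator. A hedged sketch, assuming the SDK-native fields listed above are accepted as keyword arguments:

```python
from databricks_bundle_decorators import task

# Assumes max_retries / timeout_seconds / retry_on_timeout pass through as
# SDK-native Task fields, per the table above.
@task(max_retries=3, timeout_seconds=600, retry_on_timeout=True)
def flaky_extract():
    ...
```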
### `@job`

Registers a function as a Databricks job.

The `@job` body runs once when Python imports the file (not on Databricks). Its purpose is to let the framework discover which tasks exist and how they depend on each other. Inside the body, `@task` functions don't execute your business logic — they return lightweight `TaskProxy` objects that record dependency edges. Think of the `@job` body as a declaration, not execution.
```python
@job(
    tags={"team": "data-eng"},
    schedule="0 * * * *",
    params={"url": "https://api.example.com"},
    cluster="small_cluster",
)
def my_job():
    @task
    def extract(): ...        # Not called yet — just registered

    @task
    def transform(data): ...  # Not called yet — just registered

    data = extract()   # Returns a TaskProxy (not real data)
    transform(data)    # Records: transform depends on extract
```
When this file is imported during `databricks bundle deploy`, the framework sees the DAG: `extract → transform`. Your actual `extract()` and `transform()` code only runs later when Databricks executes the job.
| Parameter | Type | Description |
|---|---|---|
| `params` | `dict[str, str] \| None` | Default job-level parameters, accessible via `from databricks_bundle_decorators import params`. |
| `cluster` | `str \| None` | Name of a `job_cluster()` to use as the shared job cluster. |
| `**kwargs` | `JobConfig` | SDK-native Job fields (`tags`, `schedule`, `max_concurrent_runs`, etc.). |
**Task namespacing:** tasks inside a `@job` body are registered under qualified keys (`job_name.task_name`), preventing name collisions across jobs, as in the sketch below.
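A hedged sketch of the namespacing (whether `@job` is callable with an empty argument list is an assumption):

```python
from databricks_bundle_decorators import job, task

@job()  # assumption: @job accepts no arguments
def daily_sync():
    @task
    def extract(): ...   # registered as "daily_sync.extract"
    extract()

@job()
def hourly_sync():
    @task
    def extract(): ...   # registered as "hourly_sync.extract" — no collision
    extract()
```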
### `job_cluster()`

Registers a reusable ephemeral job-cluster configuration and returns its name.
```python
gpu_cluster = job_cluster(
    name="gpu_cluster",
    spark_version="14.0.x-gpu-ml-scala2.12",
    node_type_id="Standard_NC6s_v3",
    num_workers=4,
)
```
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Cluster name, referenced from `@job(cluster=…)`. |
| `**kwargs` | `ClusterConfig` | SDK-native ClusterSpec fields (`spark_version`, `node_type_id`, `num_workers`, etc.). |
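Other ClusterSpec fields should pass through the same way; for instance `spark_conf` (a standard ClusterSpec field; that it is forwarded unchanged here is an assumption):

```python
from databricks_bundle_decorators import job_cluster

tuned_cluster = job_cluster(
    name="tuned_cluster",
    spark_version="16.4.x-scala2.12",
    node_type_id="Standard_E8ds_v4",
    num_workers=2,
    # Assumption: spark_conf is forwarded verbatim to the ClusterSpec.
    spark_conf={"spark.sql.shuffle.partitions": "64"},
)
```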
### `IoManager`

Abstract base class for inter-task data persistence. The producing task declares its IoManager; downstream tasks receive data automatically.

```text
IoManager (attached to producer)
┌──────────┐
│ write()  │ ← called after producer runs, persists return value
│ read()   │ ← called before consumer runs, injects data as argument
└──────────┘
```
```python
import polars as pl

from databricks_bundle_decorators import IoManager, OutputContext, InputContext

class MyCustomIoManager(IoManager):
    def __init__(self, catalog: str, schema: str):
        self.catalog = catalog
        self.schema = schema

    def write(self, context: OutputContext, obj) -> None:
        table = f"{self.catalog}.{self.schema}.{context.task_key}"
        obj.write_delta(table, mode="overwrite")

    def read(self, context: InputContext):
        table = f"{self.catalog}.{self.schema}.{context.upstream_task_key}"
        return pl.read_delta(table)
```
Downstream tasks are storage-agnostic — they receive plain Python objects and don't need to know the storage backend.
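Wiring it up looks the same as with the built-in IoManagers; a minimal sketch (catalog and schema names are illustrative):

```python
from databricks_bundle_decorators import task

delta_io = MyCustomIoManager(catalog="main", schema="staging")

@task(io_manager=delta_io)
def extract():
    ...  # return a DataFrame; delta_io.write() persists it as a Delta table

@task
def transform(raw_df):
    ...  # raw_df is whatever delta_io.read() returned
```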
#### Built-in IoManagers

| IoManager | Backend | Import |
|---|---|---|
| `PolarsParquetIoManager` | Polars Parquet on any cloud or local filesystem | `from databricks_bundle_decorators.io_managers import PolarsParquetIoManager` |
`PolarsParquetIoManager` dispatches automatically based on type:

| Return / parameter type | Write method | Read method |
|---|---|---|
| `pl.DataFrame` | `write_parquet` | `read_parquet` |
| `pl.LazyFrame` (or unannotated) | `sink_parquet` | `scan_parquet` |
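In practice this means annotations on the producer and consumer select the eager or streaming path. A hedged sketch (the local `base_path` is illustrative):

```python
import polars as pl

from databricks_bundle_decorators import task
from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

staging_io = PolarsParquetIoManager(base_path="/tmp/staging")  # illustrative path

@task(io_manager=staging_io)
def extract() -> pl.LazyFrame:
    # LazyFrame return type -> persisted via sink_parquet (streaming write)
    return pl.LazyFrame({"a": [1, 2, 3]})

@task
def transform(raw: pl.LazyFrame):
    # LazyFrame annotation -> loaded via scan_parquet, so nothing is
    # materialized until .collect()
    print(raw.collect())
```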
#### IoManager vs Task Values

| Mechanism | Use case | Size limit |
|---|---|---|
| `IoManager` | DataFrames, datasets, large objects | Unlimited (external storage) |
| `set_task_value` / `get_task_value` | Row counts, status flags, small strings | < 48 KB |
#### Context Objects

`OutputContext` (passed to `write()`): `job_name`, `task_key`, `run_id`

`InputContext` (passed to `read()`): `job_name`, `task_key`, `upstream_task_key`, `run_id`, `expected_type`

The `expected_type` field contains the downstream parameter's type annotation (e.g. `polars.DataFrame` or `polars.LazyFrame`), allowing IoManagers to return the appropriate type. It is `None` when no annotation is present.
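A hedged sketch of an IoManager that honors `expected_type`, mirroring the dispatch table above (the class name and path layout are illustrative):

```python
import polars as pl

from databricks_bundle_decorators import IoManager, OutputContext, InputContext

class AnnotationAwareParquetIoManager(IoManager):
    def __init__(self, base_path: str):
        self.base_path = base_path

    def write(self, context: OutputContext, obj) -> None:
        path = f"{self.base_path}/{context.task_key}.parquet"
        if isinstance(obj, pl.LazyFrame):
            obj.sink_parquet(path)    # streaming write for lazy frames
        else:
            obj.write_parquet(path)   # eager write for DataFrames

    def read(self, context: InputContext):
        path = f"{self.base_path}/{context.upstream_task_key}.parquet"
        if context.expected_type is pl.DataFrame:
            return pl.read_parquet(path)   # eager, matching the annotation
        return pl.scan_parquet(path)       # lazy for pl.LazyFrame or no annotation
```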
### Task Values

For passing small scalar data between tasks without permanent storage:
```python
from databricks_bundle_decorators.task_values import set_task_value, get_task_value

@task
def producer():
    set_task_value("row_count", 42)

@task
def consumer():
    count = get_task_value("producer", "row_count")
```

Maps to `dbutils.jobs.taskValues` at runtime, with a local dict fallback for testing.
### Parameters

Job-level parameters are accessible via the global `params` dict:

```python
from databricks_bundle_decorators import params

@task
def my_task():
    url = params["url"]
```
## Pipeline Discovery

Pipeline packages register via entry points:

```toml
[project.entry-points."databricks_bundle_decorators.pipelines"]
my_pipeline = "my_pipeline.pipelines"
```
The referenced module should import all modules containing `@task`/`@job` decorators:

```python
# my_pipeline/pipelines/__init__.py
import importlib
import pkgutil

# Import every module in this package so decorator registration runs as a side effect.
for _loader, _name, _is_pkg in pkgutil.walk_packages(__path__):
    importlib.import_module(f"{__name__}.{_name}")
```
## CLI

```bash
uv run dbxdec init   # Scaffold a pipeline project in the current directory
```
## Packaging Model

```text
┌──────────────────────────────┐      ┌───────────────────────┐
│ databricks-bundle-decorators │      │ my-pipeline (repo)    │
│ (library, PyPI)              │◄─────│                       │
│                              │      │ pyproject.toml        │
│ @task, @job, job_cluster()   │      │ src/my_pipeline/      │
│ IoManager ABC                │      │   pipelines/          │
│ codegen, runtime, discovery  │      │ resources/__init__.py │
│ dbxdec CLI                   │      │ databricks.yaml       │
└──────────────────────────────┘      └───────────────────────┘
```
The framework is a reusable library. Pipeline repos contain only business logic — upgrading is a single dependency bump.
## Development

```bash
git clone https://github.com/<org>/databricks-bundle-decorators.git
cd databricks-bundle-decorators
uv sync
uv run pytest tests/ -v
```
## Project Structure

```text
├── pyproject.toml
├── examples/
│   ├── databricks.yaml
│   ├── resources/__init__.py
│   └── example_pipeline.py
├── src/databricks_bundle_decorators/
│   ├── __init__.py      # Public API exports
│   ├── cli.py           # dbxdec init command
│   ├── codegen.py       # Registry → Job objects
│   ├── context.py       # Global params dict
│   ├── decorators.py    # @task, @job, job_cluster(), TaskProxy
│   ├── discovery.py     # Entry-point pipeline discovery
│   ├── io_manager.py    # IoManager ABC, OutputContext, InputContext
│   ├── registry.py      # Global registries, DuplicateResourceError
│   ├── runtime.py       # dbxdec-run entry point
│   ├── sdk_types.py     # JobConfig, TaskConfig, ClusterConfig TypedDicts
│   └── task_values.py   # set_task_value / get_task_value
└── tests/
```
## Release

See `RELEASING.md` for the PyPI release process.