Azure Functions DB
Database integration for Azure Functions Python v2 — poll-based change detection trigger and input/output bindings using SQLAlchemy.
Part of the Azure Functions Python DX Toolkit, which brings a FastAPI-like developer experience to Azure Functions.
Why this exists
Azure Functions Python v2 has no built-in database integration story:
- No DB trigger — unlike Cosmos DB, there is no native trigger for relational databases
- No input/output bindings — no declarative way to read or write DB rows from a function
- Driver confusion — each database requires different drivers, connection strings, and setup
- No change detection — polling, CDC, or outbox patterns must be built from scratch every time
What it does
- Pseudo DB trigger — poll-based change detection with checkpoint, lease, and at-least-once delivery
- Multi-DB support — PostgreSQL, MySQL, and SQL Server via SQLAlchemy dialects
- Single `pip install` — one package with optional extras for each database driver
- Data injection — `input` injects query results directly; `output` auto-writes return values
- Client injection — `inject_reader`/`inject_writer` for imperative control when needed
Shared Core
azure-functions-db now exposes shared infrastructure for upcoming bindings. Use `DbConfig` for normalized connection settings and `EngineProvider` when multiple components should reuse the same lazily created SQLAlchemy engine.
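The idea behind an engine provider is a lazy, cached factory: the expensive object is created once on first use and every later caller gets the same instance. The sketch below illustrates that pattern in plain Python — it is not the actual `EngineProvider` API, and the `factory` callable stands in for something like `sqlalchemy.create_engine`:

```python
class LazyEngineProvider:
    """Illustrative stand-in: creates an 'engine' once per URL and caches it."""

    def __init__(self, factory):
        self._factory = factory  # e.g. sqlalchemy.create_engine
        self._engines = {}

    def get(self, url: str):
        # Create lazily on first request, then reuse the same instance.
        if url not in self._engines:
            self._engines[url] = self._factory(url)
        return self._engines[url]


calls = []
provider = LazyEngineProvider(lambda url: calls.append(url) or object())

e1 = provider.get("postgresql://db")
e2 = provider.get("postgresql://db")
assert e1 is e2          # same engine reused
assert len(calls) == 1   # factory ran only once
```

Sharing one provider across a trigger source and an output binding avoids opening a separate connection pool per component.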
Installation
```shell
# Core package (pick your database)
pip install azure-functions-db[postgres]
pip install azure-functions-db[mysql]
pip install azure-functions-db[mssql]

# Multiple databases
pip install azure-functions-db[postgres,mysql]

# All drivers
pip install azure-functions-db[all]
```
Your Function App dependencies should include:

```
azure-functions
azure-functions-db[postgres]
```
Quick Start
Which decorator to use?
| Need | Decorator | Mode |
|---|---|---|
| Read data into handler | `input` | Declarative (data injection) |
| Write data to DB | `output` | Declarative (data injection) |
| Complex reads (multiple queries) | `inject_reader` | Imperative (client injection) |
| Complex writes (transactions) | `inject_writer` | Imperative (client injection) |
| React to DB changes | `trigger` | Event-driven (pseudo-trigger) |
Input Binding (data injection)
`input` injects the actual query result into your handler — no client needed.
Row lookup mode — fetch a single row by primary key:
```python
from azure_functions_db import DbBindings

db = DbBindings()

# Static primary key
@db.input("user", url="%DB_URL%", table="users", pk={"id": 42})
def load_user(user: dict | None) -> None:
    if user:
        print(user["name"])

# Dynamic primary key — resolved from handler kwargs
@db.input("user", url="%DB_URL%", table="users",
          pk=lambda req: {"id": req.params["id"]})
def get_user(req, user: dict | None) -> None:
    print(user)
```
Query mode — fetch multiple rows with SQL:
```python
# Multiple rows by SQL query
@db.input("users", url="%DB_URL%",
          query="SELECT * FROM users WHERE active = :active",
          params={"active": True})
def list_active_users(users: list[dict]) -> None:
    for user in users:
        print(user["email"])
```
Output Binding (data injection)
`output` injects a `DbOut` instance into your handler — call `.set()` to write explicitly.
```python
from azure_functions_db import DbBindings, DbOut

db = DbBindings()

# Insert — call .set() with a dict for a single row, list[dict] for a batch
@db.output("out", url="%DB_URL%", table="orders")
def create_order(out: DbOut) -> str:
    out.set({"id": 1, "status": "pending", "total": 99.99})
    return "Created"

# Upsert — set action and conflict_columns
@db.output("out", url="%DB_URL%", table="orders",
           action="upsert", conflict_columns=["id"])
def upsert_orders(out: DbOut) -> str:
    out.set([
        {"id": 1, "status": "shipped", "total": 99.99},
        {"id": 2, "status": "pending", "total": 49.99},
    ])
    return "Upserted"
```
The handler's return value is independent of the write — use it for HTTP responses or anything else:
```python
import azure.functions as func
from azure_functions_db import DbBindings, DbOut

db = DbBindings()

@db.output("out", url="%DB_URL%", table="orders")
def create_order(req: func.HttpRequest, out: DbOut) -> func.HttpResponse:
    out.set({"id": 1, "status": "pending"})
    return func.HttpResponse("Created", status_code=201)
```
Supported upsert dialects: PostgreSQL, SQLite, MySQL.
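Conceptually, an upsert maps to each dialect's native conflict clause; this is the general SQL pattern, not this package's internals. For SQLite (one of the supported dialects), the raw equivalent of `action="upsert"`, `conflict_columns=["id"]` looks like:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, total REAL)")

# ON CONFLICT on the PK: insert if new, update the existing row otherwise.
sql = """
INSERT INTO orders (id, status, total) VALUES (:id, :status, :total)
ON CONFLICT (id) DO UPDATE SET status = excluded.status, total = excluded.total
"""
conn.execute(sql, {"id": 1, "status": "pending", "total": 99.99})
conn.execute(sql, {"id": 1, "status": "shipped", "total": 99.99})  # updates, no duplicate

rows = conn.execute("SELECT id, status FROM orders").fetchall()
print(rows)  # [(1, 'shipped')]
```

PostgreSQL uses the same `ON CONFLICT ... DO UPDATE` form; MySQL's equivalent is `INSERT ... ON DUPLICATE KEY UPDATE`.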
Client Injection (imperative escape hatches)
For complex operations (multiple queries, transactions, update/delete), use `inject_reader`/`inject_writer` to get a client instance:
```python
from azure_functions_db import DbBindings, DbReader, DbWriter

db = DbBindings()

@db.inject_reader("reader", url="%DB_URL%", table="users")
def complex_read(reader: DbReader) -> None:
    user = reader.get(pk={"id": 42})
    orders = reader.query("SELECT * FROM orders WHERE user_id = :uid", params={"uid": 42})

@db.inject_writer("writer", url="%DB_URL%", table="orders")
def complex_write(writer: DbWriter) -> None:
    writer.insert(data={"id": 1, "status": "pending"})
    writer.update(data={"status": "shipped"}, pk={"id": 1})
    writer.delete(pk={"id": 1})
```
Trigger (change detection)
```python
import azure.functions as func
from azure.storage.blob import ContainerClient
from azure_functions_db import BlobCheckpointStore, DbBindings, RowChange, SqlAlchemySource

app = func.FunctionApp()
db = DbBindings()

source = SqlAlchemySource(
    url="%ORDERS_DB_URL%",
    table="orders",
    schema="public",
    cursor_column="updated_at",
    pk_columns=["id"],
)

checkpoint_store = BlobCheckpointStore(
    container_client=ContainerClient.from_connection_string(
        conn_str="%AzureWebJobsStorage%",
        container_name="db-state",
    ),
    source_fingerprint=source.source_descriptor.fingerprint,
)

@app.function_name(name="orders_poll")
@app.schedule(schedule="0 */1 * * * *", arg_name="timer", use_monitor=True)
@db.trigger(arg_name="events", source=source, checkpoint_store=checkpoint_store)
def orders_poll(timer: func.TimerRequest, events: list[RowChange]) -> None:
    for event in events:
        print(f"Order {event.pk}: {event.op}")
```
This is a pseudo-trigger — it requires an actual Azure Functions trigger (e.g. timer) to fire.
See Python API Spec for the full API reference.
Combined: Trigger + Binding
Process database changes and write results to another table. Uses EngineProvider for shared connection pooling.
```python
import azure.functions as func
from azure.storage.blob import ContainerClient
from azure_functions_db import (
    BlobCheckpointStore,
    DbBindings,
    DbOut,
    EngineProvider,
    RowChange,
    SqlAlchemySource,
)

app = func.FunctionApp()
db = DbBindings()
engine_provider = EngineProvider()

source = SqlAlchemySource(
    url="%SOURCE_DB_URL%",
    table="orders",
    cursor_column="updated_at",
    pk_columns=["id"],
    engine_provider=engine_provider,
)

checkpoint_store = BlobCheckpointStore(
    container_client=ContainerClient.from_connection_string(
        conn_str="%AzureWebJobsStorage%",
        container_name="db-state",
    ),
    source_fingerprint=source.source_descriptor.fingerprint,
)

@app.function_name(name="orders_poll")
@app.schedule(schedule="0 */1 * * * *", arg_name="timer", use_monitor=True)
@db.trigger(arg_name="events", source=source, checkpoint_store=checkpoint_store)
@db.output(
    "out",
    url="%DEST_DB_URL%",
    table="processed_orders",
    action="upsert",
    conflict_columns=["order_id"],
    engine_provider=engine_provider,
)
def orders_poll(timer: func.TimerRequest, events: list[RowChange], out: DbOut) -> None:
    out.set([
        {
            "order_id": event.pk["id"],
            "customer": event.after["name"],
            "processed_at": str(event.cursor),
        }
        for event in events
        if event.after is not None
    ])
```
See examples/trigger_with_binding/ for a complete runnable sample.
Supported Databases
| Database | Extra | Driver |
|---|---|---|
| PostgreSQL | `azure-functions-db[postgres]` | psycopg |
| MySQL | `azure-functions-db[mysql]` | PyMySQL |
| SQL Server | `azure-functions-db[mssql]` | pyodbc |
Scope
- Azure Functions Python v2 programming model
- Timer-triggered functions for poll-based change detection
- SQLAlchemy 2.0+ for database abstraction
- Checkpoint storage via Azure Blob Storage
- Read/write bindings via HTTP/Queue/Event triggers
This package does not implement a native Azure Functions trigger extension. It uses a poll-based approach on top of the existing timer trigger.
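The poll-based approach boils down to "select rows past the last checkpoint, then advance the checkpoint". A minimal sketch of that loop using only `sqlite3` (illustrative; the package's `SqlAlchemySource` and checkpoint store layer leases, tie-breaking, and persistence on top of this):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 100), (2, 200), (3, 300)])

def poll(conn, cursor):
    """Fetch rows changed after `cursor`; return (changes, new_cursor)."""
    rows = conn.execute(
        "SELECT id, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at",
        (cursor,),
    ).fetchall()
    return rows, (rows[-1][1] if rows else cursor)

changes, checkpoint = poll(conn, 100)        # rows 2 and 3 are past the checkpoint
print(changes)                               # [(2, 200), (3, 300)]
changes, checkpoint = poll(conn, checkpoint)
print(changes)                               # [] — nothing new; checkpoint now 300
```

Because the checkpoint only advances after a successful poll, a crash between fetching and committing re-delivers the same rows — which is exactly why delivery is at-least-once.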
Observability
azure-functions-db exposes structured log helpers plus a lightweight MetricsCollector protocol so you can connect your own metrics backend without adding hard dependencies.
```python
from collections.abc import Mapping

from azure_functions_db import MetricsCollector, PollTrigger

class PrintMetricsCollector:
    def increment(
        self, name: str, value: float = 1, *, labels: Mapping[str, str] | None = None
    ) -> None:
        print("increment", name, value, labels)

    def observe(
        self, name: str, value: float, *, labels: Mapping[str, str] | None = None
    ) -> None:
        print("observe", name, value, labels)

    def set_gauge(
        self, name: str, value: float, *, labels: Mapping[str, str] | None = None
    ) -> None:
        print("gauge", name, value, labels)

# source and checkpoint_store as defined in the trigger example above
trigger = PollTrigger(
    name="orders",
    source=source,
    checkpoint_store=checkpoint_store,
    metrics=PrintMetricsCollector(),
)
```
Key Design Decisions
- Pseudo trigger — timer-based polling instead of native C# extension (ADR-001)
- SQLAlchemy-centric — single ORM layer for all databases (ADR-002)
- Blob checkpoint — Azure Blob Storage for checkpoint persistence (ADR-003)
- At-least-once — default delivery guarantee with idempotency support (ADR-004)
- Unified package — trigger + binding in one package (ADR-005)
Duplicate Handling
This package provides at-least-once delivery. Duplicates may occur during process crashes, lease transitions, or commit failures. Handlers must be idempotent. See Semantics — Duplicate Windows for details.
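In practice, the simplest way to make a handler idempotent is to key writes on the event's primary key so that a redelivered event overwrites instead of duplicating. A sketch in plain `sqlite3` (with this package, `action="upsert"` on the output binding achieves the same effect):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (order_id INTEGER PRIMARY KEY, status TEXT)")

def handle(event: dict) -> None:
    # Keyed on the PK: replaying the same event is a harmless overwrite.
    conn.execute(
        "INSERT OR REPLACE INTO processed (order_id, status) VALUES (:id, :status)",
        event,
    )

event = {"id": 42, "status": "shipped"}
handle(event)
handle(event)  # duplicate delivery — safe, still one row

count = conn.execute("SELECT COUNT(*) FROM processed").fetchone()[0]
print(count)  # 1
```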
Documentation
- Full docs: yeongseon.github.io/azure-functions-db
- Examples: examples/
- Architecture
- Semantics
- Python API Spec
- Adapter SDK
Ecosystem
Part of the Azure Functions Python DX Toolkit:
| Package | Role |
|---|---|
| azure-functions-openapi | OpenAPI spec and Swagger UI |
| azure-functions-validation | Request and response validation |
| azure-functions-db | Database trigger and bindings |
| azure-functions-logging | Structured logging and observability |
| azure-functions-doctor | Pre-deploy diagnostic CLI |
| azure-functions-scaffold | Project scaffolding |
| azure-functions-python-cookbook | Recipes and examples |
Disclaimer
This project is an independent community project and is not affiliated with, endorsed by, or maintained by Microsoft.
Azure and Azure Functions are trademarks of Microsoft Corporation.
License
MIT