
Azure Functions DB


Database integration for Azure Functions Python v2 — trigger, input/output bindings, and change detection for any database with a SQLAlchemy dialect.


Part of the Azure Functions Python DX Toolkit — bringing a FastAPI-like developer experience to Azure Functions

Why this exists

Azure Functions Python v2 has no built-in database integration story:

  • No DB trigger — unlike Cosmos DB, there is no native trigger for relational databases
  • No input/output bindings — no declarative way to read or write DB rows from a function
  • Driver confusion — each database requires different drivers, connection strings, and setup
  • No change detection — polling, CDC, or outbox patterns must be built from scratch every time

What it does

  • Pseudo DB trigger — poll-based change detection with checkpoint, lease, and at-least-once delivery
  • Any SQLAlchemy database — PostgreSQL, MySQL, SQL Server out of the box; Oracle, CockroachDB, DuckDB, and any other dialect with one extra pip install
  • Single pip install — one package with optional extras for each database driver
  • Data injection — input injects query results directly; output auto-writes return values
  • Client injection — inject_reader/inject_writer for imperative control when needed

Choose your integration path

| Path | When to use | What to do |
|---|---|---|
| Built-in extras | PostgreSQL, MySQL, or SQL Server | pip install azure-functions-db[postgres] and go |
| Bring your own SQLAlchemy database | Oracle, CockroachDB, DuckDB, or any other RDBMS with a SQLAlchemy dialect | Install the driver, use the SQLAlchemy connection URL |
| Custom trigger source (triggers only) | Non-SQL sources (MongoDB, Kafka, REST APIs) | Implement the SourceAdapter Protocol for db.trigger() |

Bring your own database

The bindings and SqlAlchemySource are designed to work with any database that has a SQLAlchemy dialect. The built-in extras just bundle common drivers for convenience.

Three steps:

  1. Install the driver — e.g. pip install oracledb for Oracle
  2. Use the SQLAlchemy URL — e.g. url="oracle+oracledb://user:pass@host/db"
  3. Pass engine options if needed — use engine_kwargs for driver-specific settings

from azure_functions_db import DbBindings

db = DbBindings()

@db.input("rows", url="oracle+oracledb://user:pass@host:1521/mydb",
          query="SELECT * FROM orders WHERE status = :status",
          params={"status": "pending"})
def read_oracle_orders(rows: list[dict]) -> None:
    for row in rows:
        print(row)

The same applies to triggers — SqlAlchemySource accepts any SQLAlchemy URL:

from azure_functions_db import SqlAlchemySource

source = SqlAlchemySource(
    url="oracle+oracledb://user:pass@host:1521/mydb",
    table="orders",
    cursor_column="updated_at",
    pk_columns=["id"],
)

Note: The built-in extras (PostgreSQL, MySQL, SQL Server) are the tested path. Other dialects work through SQLAlchemy compatibility but are not explicitly tested by this project. Exact connection URL syntax varies by driver — check your driver's documentation.

See examples/byod_oracle/ for a complete runnable Function App and examples/usage_byod.py for a minimal standalone script.

Custom trigger source

If your data source has no SQLAlchemy dialect, implement the SourceAdapter protocol and pass it directly to db.trigger(source=...). This applies only to the trigger feature. See the Adapter SDK for the full contract.

Shared Core

azure-functions-db now exposes shared infrastructure for upcoming bindings. Use DbConfig for normalized connection settings and EngineProvider when multiple components should reuse the same lazily created SQLAlchemy engine.
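The point of sharing an engine is that each SQLAlchemy engine owns a connection pool, so creating one per binding multiplies connections. The caching idea behind EngineProvider can be sketched in plain Python — this is an illustration of the concept, not this package's actual implementation, and the class and method names here are hypothetical:

```python
from typing import Any, Callable, Dict


class LazyEngineCache:
    """Illustrative sketch: create each engine at most once per URL and reuse it.
    (Hypothetical stand-in for the idea behind EngineProvider, not its real API.)"""

    def __init__(self, factory: Callable[[str], Any]) -> None:
        self._factory = factory
        self._engines: Dict[str, Any] = {}

    def get(self, url: str) -> Any:
        # Lazily create on first request, then hand out the shared instance
        # so every component reuses the same connection pool.
        if url not in self._engines:
            self._engines[url] = self._factory(url)
        return self._engines[url]


created = []
cache = LazyEngineCache(lambda url: created.append(url) or object())

e1 = cache.get("postgresql+psycopg://host/db")
e2 = cache.get("postgresql+psycopg://host/db")
assert e1 is e2           # same engine object reused
assert len(created) == 1  # factory ran exactly once
```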

Installation

# Core package (pick your database)
pip install azure-functions-db[postgres]
pip install azure-functions-db[mysql]
pip install azure-functions-db[mssql]

# Multiple databases
pip install azure-functions-db[postgres,mysql]

# All drivers
pip install azure-functions-db[all]

Your Function App dependencies should include:

azure-functions
azure-functions-db[postgres]

Quick Start

Which decorator to use?

| Need | Decorator | Mode |
|---|---|---|
| Read data into handler | input | Declarative (data injection) |
| Write data to DB | output | Declarative (data injection) |
| Complex reads (multiple queries) | inject_reader | Imperative (client injection) |
| Complex writes (transactions) | inject_writer | Imperative (client injection) |
| React to DB changes | trigger | Event-driven (pseudo-trigger) |

Input Binding (data injection)

input injects the actual query result into your handler — no client needed.

Row lookup mode — fetch a single row by primary key:

from azure_functions_db import DbBindings

db = DbBindings()

# Static primary key
@db.input("user", url="%DB_URL%", table="users", pk={"id": 42})
def load_user(user: dict | None) -> None:
    if user:
        print(user["name"])

# Dynamic primary key — resolved from handler kwargs
@db.input("user", url="%DB_URL%", table="users",
          pk=lambda req: {"id": req.params["id"]})
def get_user(req, user: dict | None) -> None:
    print(user)

Query mode — fetch multiple rows with SQL:

# Multiple rows by SQL query
@db.input("users", url="%DB_URL%",
          query="SELECT * FROM users WHERE active = :active",
          params={"active": True})
def list_active_users(users: list[dict]) -> None:
    for user in users:
        print(user["email"])

Output Binding (data injection)

output injects a DbOut instance into your handler — call .set() to write explicitly.

from azure_functions_db import DbBindings, DbOut

db = DbBindings()

# Insert — call .set() with a dict for single row, list[dict] for batch
@db.output("out", url="%DB_URL%", table="orders")
def create_order(out: DbOut) -> str:
    out.set({"id": 1, "status": "pending", "total": 99.99})
    return "Created"

# Upsert — set action and conflict_columns
@db.output("out", url="%DB_URL%", table="orders",
           action="upsert", conflict_columns=["id"])
def upsert_orders(out: DbOut) -> str:
    out.set([
        {"id": 1, "status": "shipped", "total": 99.99},
        {"id": 2, "status": "pending", "total": 49.99},
    ])
    return "Upserted"

The handler's return value is independent of the write — use it for HTTP responses or anything else:

import azure.functions as func
from azure_functions_db import DbBindings, DbOut

db = DbBindings()

@db.output("out", url="%DB_URL%", table="orders")
def create_order(req: func.HttpRequest, out: DbOut) -> func.HttpResponse:
    out.set({"id": 1, "status": "pending"})
    return func.HttpResponse("Created", status_code=201)

Supported upsert dialects: PostgreSQL, SQLite, MySQL.
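Conceptually, an upsert with conflict_columns corresponds to the standard INSERT ... ON CONFLICT DO UPDATE form. A sketch with the stdlib sqlite3 driver shows the behavior (this illustrates the SQL pattern, not the exact statement this package generates):

```python
import sqlite3

# Conceptual sketch of what action="upsert" with conflict_columns=["id"]
# amounts to on the SQLite dialect, using plain sqlite3.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, total REAL)")
conn.execute("INSERT INTO orders VALUES (1, 'pending', 99.99)")

# A second write with the same primary key updates the row instead of failing.
conn.execute(
    """
    INSERT INTO orders (id, status, total) VALUES (:id, :status, :total)
    ON CONFLICT (id) DO UPDATE SET status = excluded.status, total = excluded.total
    """,
    {"id": 1, "status": "shipped", "total": 99.99},
)

row = conn.execute("SELECT status FROM orders WHERE id = 1").fetchone()
print(row[0])  # shipped
```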

Client Injection (imperative escape hatches)

For complex operations (multiple queries, transactions, update/delete), use inject_reader/inject_writer to get a client instance:

from azure_functions_db import DbBindings, DbReader, DbWriter

db = DbBindings()

@db.inject_reader("reader", url="%DB_URL%", table="users")
def complex_read(reader: DbReader) -> None:
    user = reader.get(pk={"id": 42})
    orders = reader.query("SELECT * FROM orders WHERE user_id = :uid", params={"uid": 42})

@db.inject_writer("writer", url="%DB_URL%", table="orders")
def complex_write(writer: DbWriter) -> None:
    writer.insert(data={"id": 1, "status": "pending"})
    writer.update(data={"status": "shipped"}, pk={"id": 1})
    writer.delete(pk={"id": 1})

Trigger (change detection)

import azure.functions as func
from azure.storage.blob import ContainerClient
from azure_functions_db import BlobCheckpointStore, DbBindings, RowChange, SqlAlchemySource

app = func.FunctionApp()
db = DbBindings()

source = SqlAlchemySource(
    url="%ORDERS_DB_URL%",
    table="orders",
    schema="public",
    cursor_column="updated_at",
    pk_columns=["id"],
)

checkpoint_store = BlobCheckpointStore(
    container_client=ContainerClient.from_connection_string(
        conn_str="%AzureWebJobsStorage%",
        container_name="db-state",
    ),
    source_fingerprint=source.source_descriptor.fingerprint,
)

@app.function_name(name="orders_poll")
@app.schedule(schedule="0 */1 * * * *", arg_name="timer", use_monitor=True)
@db.trigger(arg_name="events", source=source, checkpoint_store=checkpoint_store)
def orders_poll(timer: func.TimerRequest, events: list[RowChange]) -> None:
    for event in events:
        print(f"Order {event.pk}: {event.op}")

This is a pseudo-trigger — it requires an actual Azure Functions trigger (e.g. timer) to fire.
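The cursor-column polling underneath can be illustrated with plain sqlite3, independent of this package's API — each timer tick reads rows past the last committed cursor, processes them, and only then advances the checkpoint (table and variable names here are illustrative):

```python
import sqlite3

# Minimal illustration of cursor-column polling — the idea behind the
# pseudo-trigger — not this package's implementation.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 100), (2, 200), (3, 300)])

checkpoint = 100  # last cursor value committed to the checkpoint store

# One poll: fetch only rows whose cursor moved past the checkpoint.
rows = conn.execute(
    "SELECT id, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at",
    (checkpoint,),
).fetchall()

for pk, cursor in rows:
    print(f"changed row pk={pk}")

# Advance the checkpoint only after successful processing — committing late
# is what yields at-least-once delivery (a crash before this line means
# the same rows are re-delivered next tick).
if rows:
    checkpoint = rows[-1][1]
```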

See Python API Spec for the full API reference.

Combined: Trigger + Binding

Process database changes and write results to another table. Uses EngineProvider for shared connection pooling.

import azure.functions as func
from azure.storage.blob import ContainerClient

from azure_functions_db import (
    BlobCheckpointStore,
    DbBindings,
    DbOut,
    EngineProvider,
    RowChange,
    SqlAlchemySource,
)

app = func.FunctionApp()
db = DbBindings()

engine_provider = EngineProvider()

source = SqlAlchemySource(
    url="%SOURCE_DB_URL%",
    table="orders",
    cursor_column="updated_at",
    pk_columns=["id"],
    engine_provider=engine_provider,
)

checkpoint_store = BlobCheckpointStore(
    container_client=ContainerClient.from_connection_string(
        conn_str="%AzureWebJobsStorage%",
        container_name="db-state",
    ),
    source_fingerprint=source.source_descriptor.fingerprint,
)

@app.function_name(name="orders_poll")
@app.schedule(schedule="0 */1 * * * *", arg_name="timer", use_monitor=True)
@db.trigger(arg_name="events", source=source, checkpoint_store=checkpoint_store)
@db.output(
    "out",
    url="%DEST_DB_URL%",
    table="processed_orders",
    action="upsert",
    conflict_columns=["order_id"],
    engine_provider=engine_provider,
)
def orders_poll(timer: func.TimerRequest, events: list[RowChange], out: DbOut) -> None:
    out.set([
        {
            "order_id": event.pk["id"],
            "customer": event.after["name"],
            "processed_at": str(event.cursor),
        }
        for event in events
        if event.after is not None
    ])

See examples/trigger_with_binding/ for a complete runnable sample.

Built-in Extras

These databases have pre-packaged driver dependencies. Install the matching extra and you're ready to go.

| Database | Extra | Driver |
|---|---|---|
| PostgreSQL | azure-functions-db[postgres] | psycopg |
| MySQL | azure-functions-db[mysql] | PyMySQL |
| SQL Server | azure-functions-db[mssql] | pyodbc |

Any other database with a SQLAlchemy dialect works too — just install the driver yourself. See Choose your integration path.

Scope

  • Azure Functions Python v2 programming model
  • Timer-triggered functions for poll-based change detection
  • SQLAlchemy 2.0+ for database abstraction
  • Checkpoint storage via Azure Blob Storage
  • Read/write bindings via HTTP/Queue/Event triggers

This package does not implement a native Azure Functions trigger extension. It uses a poll-based approach on top of the existing timer trigger.

Observability

azure-functions-db exposes structured log helpers plus a lightweight MetricsCollector protocol so you can connect your own metrics backend without adding hard dependencies.

from collections.abc import Mapping

from azure_functions_db import MetricsCollector, PollTrigger


class PrintMetricsCollector:
    def increment(
        self, name: str, value: float = 1, *, labels: Mapping[str, str] | None = None
    ) -> None:
        print("increment", name, value, labels)

    def observe(
        self, name: str, value: float, *, labels: Mapping[str, str] | None = None
    ) -> None:
        print("observe", name, value, labels)

    def set_gauge(
        self, name: str, value: float, *, labels: Mapping[str, str] | None = None
    ) -> None:
        print("gauge", name, value, labels)


trigger = PollTrigger(
    name="orders",
    source=source,
    checkpoint_store=checkpoint_store,
    metrics=PrintMetricsCollector(),
)

Key Design Decisions

  • Pseudo trigger — timer-based polling instead of native C# extension (ADR-001)
  • SQLAlchemy-centric — single ORM layer for all databases (ADR-002)
  • Blob checkpoint — Azure Blob Storage for checkpoint persistence (ADR-003)
  • At-least-once — default delivery guarantee with idempotency support (ADR-004)
  • Unified package — trigger + binding in one package (ADR-005)

Duplicate Handling

This package provides at-least-once delivery. Duplicates may occur during process crashes, lease transitions, or commit failures. Handlers must be idempotent. See Semantics — Duplicate Windows for details.
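One common way to make a handler idempotent is to key each event by primary key plus cursor and skip keys already seen. A sketch with an in-memory set shows the shape — in production the seen-set would live in durable storage, or the write itself would be an upsert:

```python
# Idempotent-handler sketch: deduplicate redelivered events by (pk, cursor).
# The in-memory set is for illustration only; real handlers need durable
# dedup state or naturally idempotent writes (e.g. upsert).
processed: set[tuple[int, str]] = set()
results: list[int] = []


def handle(events: list[dict]) -> None:
    for event in events:
        key = (event["pk"], event["cursor"])
        if key in processed:  # duplicate delivery — safe to skip
            continue
        processed.add(key)
        results.append(event["pk"])


batch = [{"pk": 1, "cursor": "t1"}, {"pk": 2, "cursor": "t2"}]
handle(batch)
handle(batch)  # redelivered after a crash — no double processing
print(results)  # [1, 2]
```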

Ecosystem

Part of the Azure Functions Python DX Toolkit:

| Package | Role |
|---|---|
| azure-functions-openapi | OpenAPI spec generation and Swagger UI |
| azure-functions-validation | Request/response validation and serialization |
| azure-functions-db | Database bindings for SQL, PostgreSQL, MySQL, SQLite, and Cosmos DB |
| azure-functions-langgraph | LangGraph deployment adapter for Azure Functions |
| azure-functions-scaffold | Project scaffolding CLI |
| azure-functions-logging | Structured logging and observability |
| azure-functions-doctor | Pre-deploy diagnostic CLI |
| azure-functions-durable-graph | Manifest-first graph runtime with Durable Functions (experimental) |
| azure-functions-python-cookbook | Recipes and examples |

Disclaimer

This project is an independent community project and is not affiliated with, endorsed by, or maintained by Microsoft.

Azure and Azure Functions are trademarks of Microsoft Corporation.

License

MIT
