Skip to main content

A SQLAlchemy dialect for Cloudflare's D1 Serverless SQLite Database

Project description

SQLAlchemy      Cloudflare D1

SQLAlchemy Cloudflare D1 Dialect

A SQLAlchemy dialect for Cloudflare's D1 Serverless SQLite Database supporting both the REST API and Python Workers.

Features

  • Full SQLAlchemy ORM and Core support
  • Sync and async engines via D1 REST API (create_engine and create_async_engine)
  • Python Workers support with direct D1 binding (create_engine_from_binding)
  • SQLite/D1 compatible SQL compilation
  • Prepared statement support with parameter binding
  • pandas DataFrame.to_sql() with upsert support
  • JSON column filtering with json_each()
  • Type mapping for D1/SQLite data types

Installation

pip install sqlalchemy-cloudflare-d1

For async SQLAlchemy engine support (create_async_engine):

pip install sqlalchemy-cloudflare-d1[async]

Why two install options? SQLAlchemy's async engine requires greenlet, a C extension that doesn't work in Cloudflare Workers (Pyodide). The base install works everywhere, including Workers. The [async] extra adds greenlet for server-side async engine usage.

Or install from source:

git clone https://github.com/collierking/sqlalchemy-cloudflare-d1.git
cd sqlalchemy-cloudflare-d1
pip install -e ".[async]"

Prerequisites

  1. A Cloudflare account with D1 enabled
  2. A D1 database created via the Cloudflare dashboard or CLI
  3. A Cloudflare API token with D1 permissions

Creating a D1 Database

Using the Cloudflare CLI:

wrangler d1 create my-database

Or via the Cloudflare dashboard.

Creating an API Token

  1. Go to Cloudflare API Tokens
  2. Click "Create Token"
  3. Use the "Custom token" template
  4. Add permissions: Account:D1:Edit
  5. Add your account in "Account Resources"
  6. Click "Continue to summary" and "Create Token"

Usage

Connection String Format

from sqlalchemy import create_engine

# Format: cloudflare_d1://account_id:api_token@database_id
engine = create_engine(
    "cloudflare_d1://your_account_id:your_api_token@your_database_id"
)

Basic Example

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Create engine
engine = create_engine(
    "cloudflare_d1://account_id:api_token@database_id"
)

# Create base and define model
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))

# Create tables
Base.metadata.create_all(engine)

# Create session and add data
Session = sessionmaker(bind=engine)
session = Session()

# Add a user
user = User(name="Alice", email="alice@example.com")
session.add(user)
session.commit()

# Query users
users = session.query(User).all()
for user in users:
    print(f"{user.name}: {user.email}")

session.close()

Core API Example

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

metadata = MetaData()
users = Table('users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('email', String(100))
)

# Create table
metadata.create_all(engine)

with engine.connect() as conn:
    # Insert data
    conn.execute(users.insert().values(name="Bob", email="bob@example.com"))

    # Query data
    result = conn.execute(select(users))
    for row in result:
        print(row)

Raw SQL Example

from sqlalchemy import create_engine, text

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

with engine.connect() as conn:
    # Execute raw SQL
    result = conn.execute(text("SELECT * FROM sqlite_master WHERE type='table'"))

    for row in result:
        print(row)

Async Engine Example

For async applications, use create_async_engine (requires pip install sqlalchemy-cloudflare-d1[async]):

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import select, Column, Integer, String, MetaData, Table

# Note the +async suffix in the URL
engine = create_async_engine(
    "cloudflare_d1+async://account_id:api_token@database_id"
)

metadata = MetaData()
users = Table('users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50))
)

async def main():
    # Create tables
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)

    # Query data
    async with engine.connect() as conn:
        result = await conn.execute(select(users))
        rows = result.fetchall()
        for row in rows:
            print(row)

    await engine.dispose()

Async Connection (Without SQLAlchemy)

For direct async access without SQLAlchemy overhead:

from sqlalchemy_cloudflare_d1 import AsyncConnection

async with AsyncConnection(
    account_id="your_account_id",
    database_id="your_database_id",
    api_token="your_api_token",
) as conn:
    cursor = await conn.cursor()
    await cursor.execute("SELECT * FROM users WHERE name = ?", ("Alice",))
    rows = await cursor.fetchall()
    print(rows)

Python Workers Example

Inside Cloudflare Python Workers, use create_engine_from_binding() for direct D1 binding access (no REST API calls):

from workers import WorkerEntrypoint
from sqlalchemy import MetaData, Table, select
from sqlalchemy_cloudflare_d1 import create_engine_from_binding

class MyWorker(WorkerEntrypoint):
    async def fetch(self, request):
        # Create engine from D1 binding (defined in wrangler.toml)
        engine = create_engine_from_binding(self.env.DB)

        # Use SQLAlchemy Core as normal
        metadata = MetaData()
        users = Table('users', metadata, autoload_with=engine)

        with engine.connect() as conn:
            result = conn.execute(select(users).limit(10))
            rows = result.fetchall()

        return Response.json({"users": [dict(row) for row in rows]})

For raw cursor access in Workers:

from sqlalchemy_cloudflare_d1 import WorkerConnection

class MyWorker(WorkerEntrypoint):
    async def fetch(self, request):
        conn = WorkerConnection(self.env.DB)
        cursor = conn.cursor()
        await cursor.execute_async("SELECT * FROM users")
        rows = cursor.fetchall()
        conn.close()
        return Response.json({"users": rows})

pandas DataFrame.to_sql()

Insert DataFrames with conflict handling:

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.sqlite import insert as sqlite_insert

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

df = pd.DataFrame({
    "name": ["Alice", "Bob", "Charlie"],
    "score": [85, 92, 78]
})

# Basic insert
df.to_sql("scores", con=engine, if_exists="append", index=False)

# Upsert with OR REPLACE (updates existing rows on conflict)
def sqlite_upsert(table, conn, keys, data_iter):
    from sqlalchemy.dialects.sqlite import insert as sqlite_insert
    sa_table = getattr(table, "table", table)
    rows = [dict(zip(keys, row)) for row in data_iter]
    if rows:
        stmt = sqlite_insert(sa_table).values(rows).prefix_with("OR REPLACE")
        conn.execute(stmt)

df.to_sql("scores", con=engine, if_exists="append", index=False, method=sqlite_upsert)

JSON Column Filtering

Query JSON arrays stored in TEXT columns using SQLite's json_each():

import json
from sqlalchemy import MetaData, Table, Column, Integer, String, select, exists, func

metadata = MetaData()
posts = Table('posts', metadata,
    Column('id', Integer, primary_key=True),
    Column('title', String(100)),
    Column('tags', String),  # JSON array stored as TEXT: '["python", "sql"]'
)

with engine.connect() as conn:
    # Find posts tagged with "python"
    je = func.json_each(posts.c.tags).table_valued("value").alias("je")
    stmt = (
        select(posts.c.title)
        .where(exists(
            select(1).select_from(je).where(je.c.value == "python")
        ))
    )
    result = conn.execute(stmt)
    for row in result:
        print(row.title)

Configuration

Connection Parameters

You can pass additional parameters via the connection string or engine creation:

from sqlalchemy import create_engine

# Via connection string query parameters
engine = create_engine(
    "cloudflare_d1://account_id:api_token@database_id?timeout=60"
)

# Via connect_args
engine = create_engine(
    "cloudflare_d1://account_id:api_token@database_id",
    connect_args={
        "timeout": 60,
    }
)

Environment Variables

You can also use environment variables:

import os
from sqlalchemy import create_engine

engine = create_engine(
    f"cloudflare_d1://{os.getenv('CF_ACCOUNT_ID')}:"
    f"{os.getenv('CF_API_TOKEN')}@{os.getenv('CF_DATABASE_ID')}"
)

Limitations

This dialect has some limitations due to D1's REST API nature:

  1. No transactions: D1 REST API doesn't support explicit transactions. Each query is auto-committed.
  2. No isolation levels: Connection isolation levels are not supported.
  3. Limited concurrency: Connections are HTTP-based, not persistent database connections.
  4. No stored procedures: D1 doesn't support stored procedures or custom functions.
  5. Rate limiting: Subject to Cloudflare API rate limits.

Known Issues Fixed

  • Empty result sets (v0.3.0+): cursor.description is now correctly populated even when queries return zero rows, fixing NoSuchColumnError exceptions in SQLAlchemy.

Type Mapping

SQLAlchemy Type D1/SQLite Type Notes
Integer INTEGER
String(n) VARCHAR(n)
Text TEXT
Float REAL
Numeric NUMERIC
Boolean INTEGER Stored as 0/1, auto-converted via D1Boolean
DateTime TEXT ISO 8601 string, auto-converted via D1DateTime
Date TEXT ISO 8601 string, auto-converted via D1Date
Time TEXT ISO 8601 string, auto-converted via D1Time
LargeBinary BLOB Base64-encoded, auto-converted via D1LargeBinary

Error Handling

The dialect will raise appropriate SQLAlchemy exceptions:

from sqlalchemy.exc import SQLAlchemyError, OperationalError

try:
    result = conn.execute("SELECT * FROM nonexistent_table")
except OperationalError as e:
    print(f"Database error: {e}")
except SQLAlchemyError as e:
    print(f"SQLAlchemy error: {e}")

Development

For detailed development instructions, see .github/DEVELOPMENT.md.

Quick Start

git clone https://github.com/collierking/sqlalchemy-cloudflare-d1.git
cd sqlalchemy-cloudflare-d1

# Install dependencies and setup pre-commit hooks
make install
make setup_hooks

# Run tests and linting
make check

# Build package
make build

Development Tools

  • Ruff: Fast Python linter and formatter
  • mypy: Static type checking
  • codespell: Spell checking
  • pre-commit: Automated pre-commit checks
  • pytest: Testing framework with socket control

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for your changes
  5. Run the test suite
  6. Submit a pull request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Related Projects

Support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sqlalchemy_cloudflare_d1-0.3.8.tar.gz (412.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sqlalchemy_cloudflare_d1-0.3.8-py3-none-any.whl (27.3 kB view details)

Uploaded Python 3

File details

Details for the file sqlalchemy_cloudflare_d1-0.3.8.tar.gz.

File metadata

  • Download URL: sqlalchemy_cloudflare_d1-0.3.8.tar.gz
  • Upload date:
  • Size: 412.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for sqlalchemy_cloudflare_d1-0.3.8.tar.gz
Algorithm Hash digest
SHA256 e72f7f3d40b6dd231277a9a59947fca972848b36bab5cab11b4c0de250ebeede
MD5 84d48535671e3ff27ac77b76753ff13c
BLAKE2b-256 16fafd1f2ea0231e37902d3aca1fc50d3c696d0a79190cec6a16db356808b7f7

See more details on using hashes here.

Provenance

The following attestation bundles were made for sqlalchemy_cloudflare_d1-0.3.8.tar.gz:

Publisher: release.yml on CollierKing/sqlalchemy-cloudflare-d1

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file sqlalchemy_cloudflare_d1-0.3.8-py3-none-any.whl.

File metadata

File hashes

Hashes for sqlalchemy_cloudflare_d1-0.3.8-py3-none-any.whl
Algorithm Hash digest
SHA256 2a12d45101af000d1d2c0bded317bad5636b7ad75ab70ffcb8e37b85d925070d
MD5 1fbc5495fcdb8a98ab59c95ba8756c3d
BLAKE2b-256 9f4fdf2c0cab07be7f2d74f5ca1d60814ab3eb4d47cb34eb9f15793567f6466b

See more details on using hashes here.

Provenance

The following attestation bundles were made for sqlalchemy_cloudflare_d1-0.3.8-py3-none-any.whl:

Publisher: release.yml on CollierKing/sqlalchemy-cloudflare-d1

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page