A SQLAlchemy dialect for Cloudflare's D1 Serverless SQLite Database

SQLAlchemy Cloudflare D1 Dialect

A SQLAlchemy dialect for Cloudflare's D1 Serverless SQLite Database supporting both the REST API and Python Workers.

Features

  • Full SQLAlchemy ORM and Core support
  • Sync and async engines via D1 REST API (create_engine and create_async_engine)
  • Python Workers support with direct D1 binding (create_engine_from_binding)
  • SQLite/D1 compatible SQL compilation
  • Prepared statement support with parameter binding
  • pandas DataFrame.to_sql() with upsert support
  • JSON column filtering with json_each()
  • Type mapping for D1/SQLite data types

Installation

pip install sqlalchemy-cloudflare-d1

For async SQLAlchemy engine support (create_async_engine):

pip install sqlalchemy-cloudflare-d1[async]

Why two install options? SQLAlchemy's async engine requires greenlet, a C extension that doesn't work in Cloudflare Workers (Pyodide). The base install works everywhere, including Workers. The [async] extra adds greenlet for server-side async engine usage.

Or install from source:

git clone https://github.com/collierking/sqlalchemy-cloudflare-d1.git
cd sqlalchemy-cloudflare-d1
pip install -e ".[async]"

Prerequisites

  1. A Cloudflare account with D1 enabled
  2. A D1 database created via the Cloudflare dashboard or CLI
  3. A Cloudflare API token with D1 permissions

Creating a D1 Database

Using the Cloudflare CLI:

wrangler d1 create my-database

Or via the Cloudflare dashboard.

Creating an API Token

  1. Go to Cloudflare API Tokens
  2. Click "Create Token"
  3. Use the "Custom token" template
  4. Add permissions: Account:D1:Edit
  5. Add your account in "Account Resources"
  6. Click "Continue to summary" and "Create Token"

Usage

Connection String Format

from sqlalchemy import create_engine

# Format: cloudflare_d1://account_id:api_token@database_id
engine = create_engine(
    "cloudflare_d1://your_account_id:your_api_token@your_database_id"
)
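To see which part of the connection string lands where, the URL can be parsed with SQLAlchemy's own `make_url`, which does not need the dialect to be loaded. This is a small sketch with placeholder credentials; how the dialect consumes each field internally is not shown here, only how SQLAlchemy splits the string.

```python
from sqlalchemy.engine import make_url

# Placeholder values, not real credentials.
url = make_url("cloudflare_d1://my_account:my_token@my_database")

print(url.drivername)  # "cloudflare_d1": the dialect name SQLAlchemy looks up
print(url.username)    # account_id slot
print(url.password)    # api_token slot
print(url.host)        # database_id slot

# Rendering with hide_password=True masks the token, which is handy for logs.
safe_for_logs = url.render_as_string(hide_password=True)
print(safe_for_logs)
```

Because the API token sits in the password slot, logging the masked form rather than the raw string avoids leaking it.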

Basic Example

from sqlalchemy import create_engine, Column, Integer, String
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4;
# the sqlalchemy.ext.declarative import path is deprecated.
from sqlalchemy.orm import declarative_base, sessionmaker

# Create engine
engine = create_engine(
    "cloudflare_d1://account_id:api_token@database_id"
)

# Create base and define model
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))

# Create tables
Base.metadata.create_all(engine)

# Create session and add data
Session = sessionmaker(bind=engine)
session = Session()

# Add a user
user = User(name="Alice", email="alice@example.com")
session.add(user)
session.commit()

# Query users
users = session.query(User).all()
for user in users:
    print(f"{user.name}: {user.email}")

session.close()

Core API Example

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

metadata = MetaData()
users = Table('users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('email', String(100))
)

# Create table
metadata.create_all(engine)

with engine.connect() as conn:
    # Insert data
    conn.execute(users.insert().values(name="Bob", email="bob@example.com"))
    # Explicit commit: engine.connect() is commit-as-you-go in SQLAlchemy 2.0
    conn.commit()

    # Query data
    result = conn.execute(select(users))
    for row in result:
        print(row)

Raw SQL Example

from sqlalchemy import create_engine, text

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

with engine.connect() as conn:
    # Execute raw SQL
    result = conn.execute(text("SELECT * FROM sqlite_master WHERE type='table'"))

    for row in result:
        print(row)
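Raw SQL combines with parameter binding through `text()` and named `:param` placeholders, which keeps user input out of the SQL string. The sketch below runs against an in-memory `sqlite://` engine as a stand-in (an assumption made so the example is runnable offline); the `text()` calls themselves are standard SQLAlchemy and do not depend on the backend.

```python
from sqlalchemy import create_engine, text

# Stand-in engine (assumption): swap in the cloudflare_d1:// URL for real use.
engine = create_engine("sqlite://")

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))

    # A list of dicts executes the statement once per parameter set.
    conn.execute(
        text("INSERT INTO users (name) VALUES (:name)"),
        [{"name": "Alice"}, {"name": "Bob"}],
    )

    # A single dict binds the named parameter for one execution.
    rows = conn.execute(
        text("SELECT id, name FROM users WHERE name = :name"),
        {"name": "Alice"},
    ).all()

print([tuple(row) for row in rows])
```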

Async Engine Example

For async applications, use create_async_engine (requires pip install sqlalchemy-cloudflare-d1[async]):

import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import select, Column, Integer, String, MetaData, Table

# Note the +async suffix in the URL
engine = create_async_engine(
    "cloudflare_d1+async://account_id:api_token@database_id"
)

metadata = MetaData()
users = Table('users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50))
)

async def main():
    # Create tables
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)

    # Query data
    async with engine.connect() as conn:
        result = await conn.execute(select(users))
        rows = result.fetchall()
        for row in rows:
            print(row)

    # Release the engine's connections before the event loop exits
    await engine.dispose()

asyncio.run(main())

Async Connection (Without SQLAlchemy)

For direct async access without SQLAlchemy overhead:

import asyncio

from sqlalchemy_cloudflare_d1 import AsyncConnection

async def main():
    # "async with" is only valid inside a coroutine
    async with AsyncConnection(
        account_id="your_account_id",
        database_id="your_database_id",
        api_token="your_api_token",
    ) as conn:
        cursor = await conn.cursor()
        await cursor.execute("SELECT * FROM users WHERE name = ?", ("Alice",))
        rows = await cursor.fetchall()
        print(rows)

asyncio.run(main())

Python Workers Example

Inside Cloudflare Python Workers, use create_engine_from_binding() for direct D1 binding access (no REST API calls):

from workers import WorkerEntrypoint, Response
from sqlalchemy import MetaData, Table, select
from sqlalchemy_cloudflare_d1 import create_engine_from_binding

class MyWorker(WorkerEntrypoint):
    async def fetch(self, request):
        # Create engine from D1 binding (defined in wrangler.toml)
        engine = create_engine_from_binding(self.env.DB)

        # Use SQLAlchemy Core as normal
        metadata = MetaData()
        users = Table('users', metadata, autoload_with=engine)

        with engine.connect() as conn:
            result = conn.execute(select(users).limit(10))
            rows = result.fetchall()

        # Row objects are not dict-convertible in SQLAlchemy 2.0; use ._mapping
        return Response.json({"users": [dict(row._mapping) for row in rows]})

For raw cursor access in Workers:

from workers import WorkerEntrypoint, Response
from sqlalchemy_cloudflare_d1 import WorkerConnection

class MyWorker(WorkerEntrypoint):
    async def fetch(self, request):
        conn = WorkerConnection(self.env.DB)
        cursor = conn.cursor()
        await cursor.execute_async("SELECT * FROM users")
        rows = cursor.fetchall()
        conn.close()
        return Response.json({"users": rows})

pandas DataFrame.to_sql()

Insert DataFrames with conflict handling:

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.sqlite import insert as sqlite_insert

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

df = pd.DataFrame({
    "name": ["Alice", "Bob", "Charlie"],
    "score": [85, 92, 78]
})

# Basic insert
df.to_sql("scores", con=engine, if_exists="append", index=False)

# Upsert with OR REPLACE (replaces existing rows on conflict)
def sqlite_upsert(table, conn, keys, data_iter):
    # pandas passes its own table wrapper; the SQLAlchemy Table is on .table
    sa_table = getattr(table, "table", table)
    rows = [dict(zip(keys, row)) for row in data_iter]
    if rows:
        stmt = sqlite_insert(sa_table).values(rows).prefix_with("OR REPLACE")
        conn.execute(stmt)

df.to_sql("scores", con=engine, if_exists="append", index=False, method=sqlite_upsert)
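`INSERT OR REPLACE` only replaces a row when the insert collides with a PRIMARY KEY or UNIQUE constraint. Tables that `to_sql()` creates on its own carry no such constraint, so the upsert above behaves like a plain append unless the table was created beforehand with one. The sketch below illustrates the SQL behavior with Python's built-in `sqlite3` module as a local stand-in for D1; the `scores` schema with `name` as primary key is an assumption made for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Assumption: `name` is the conflict key. Without a PRIMARY KEY or UNIQUE
# constraint, OR REPLACE never finds a conflict and simply inserts.
conn.execute("CREATE TABLE scores (name TEXT PRIMARY KEY, score INTEGER)")
conn.executemany(
    "INSERT INTO scores (name, score) VALUES (?, ?)",
    [("Alice", 85), ("Bob", 92)],
)

# Alice collides with an existing row and is replaced; Charlie is new.
conn.executemany(
    "INSERT OR REPLACE INTO scores (name, score) VALUES (?, ?)",
    [("Alice", 90), ("Charlie", 78)],
)

rows = conn.execute("SELECT name, score FROM scores ORDER BY name").fetchall()
print(rows)
```

Note that `OR REPLACE` deletes the conflicting row and inserts a new one, so columns missing from the DataFrame fall back to their defaults rather than keeping their old values.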

JSON Column Filtering

Query JSON arrays stored in TEXT columns using SQLite's json_each():

from sqlalchemy import (
    MetaData, Table, Column, Integer, String, create_engine, select, exists, func
)

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

metadata = MetaData()
posts = Table('posts', metadata,
    Column('id', Integer, primary_key=True),
    Column('title', String(100)),
    Column('tags', String),  # JSON array stored as TEXT: '["python", "sql"]'
)

with engine.connect() as conn:
    # Find posts tagged with "python"
    je = func.json_each(posts.c.tags).table_valued("value").alias("je")
    stmt = (
        select(posts.c.title)
        .where(exists(
            select(1).select_from(je).where(je.c.value == "python")
        ))
    )
    result = conn.execute(stmt)
    for row in result:
        print(row.title)
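For reference, the query above corresponds roughly to a correlated `EXISTS` over `json_each()`. The sketch below runs the equivalent hand-written SQL with Python's built-in `sqlite3` module as a local stand-in for D1. It assumes the local SQLite build includes the JSON functions (the default in recent releases), and the sample rows are invented for the illustration.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, tags TEXT)")

# Tags are serialized to a JSON array and stored as TEXT.
conn.executemany(
    "INSERT INTO posts (title, tags) VALUES (?, ?)",
    [
        ("Intro to D1", json.dumps(["python", "sql"])),
        ("Edge caching", json.dumps(["cdn"])),
    ],
)

# json_each() expands the array into one row per element; EXISTS keeps a
# post if any of its elements equals the bound value.
query = (
    "SELECT title FROM posts "
    "WHERE EXISTS ("
    "  SELECT 1 FROM json_each(posts.tags) AS je WHERE je.value = ?"
    ")"
)
titles = [row[0] for row in conn.execute(query, ("python",))]
print(titles)
```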

Configuration

Connection Parameters

You can pass additional parameters via the connection string or engine creation:

from sqlalchemy import create_engine

# Via connection string query parameters
engine = create_engine(
    "cloudflare_d1://account_id:api_token@database_id?timeout=60"
)

# Via connect_args
engine = create_engine(
    "cloudflare_d1://account_id:api_token@database_id",
    connect_args={
        "timeout": 60,
    }
)
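One difference between the two forms is worth knowing: values in a URL query string are always parsed as strings, while `connect_args` keeps the Python type you pass. The sketch below shows this with SQLAlchemy's `make_url`; whether the dialect converts the string `"60"` to a number is not stated here, so `connect_args` is the safer choice when the type matters.

```python
from sqlalchemy.engine import make_url

url = make_url("cloudflare_d1://account_id:api_token@database_id?timeout=60")

# Query parameters arrive as strings, not integers.
timeout_from_url = url.query["timeout"]
print(repr(timeout_from_url))

# connect_args is a plain dict, so the integer stays an integer.
connect_args = {"timeout": 60}
print(repr(connect_args["timeout"]))
```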

Environment Variables

You can also use environment variables:

import os
from sqlalchemy import create_engine

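# os.environ[...] raises KeyError if a variable is unset; os.getenv would
# silently put the string "None" into the URL instead.
engine = create_engine(
    f"cloudflare_d1://{os.environ['CF_ACCOUNT_ID']}:"
    f"{os.environ['CF_API_TOKEN']}@{os.environ['CF_DATABASE_ID']}"
)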
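If the token may contain characters such as `@` or `:`, building the URL with `URL.create` avoids manual escaping. The helper below is a hypothetical sketch, not part of this package: `d1_url_from_env` is an invented name, and it reuses the `CF_ACCOUNT_ID`, `CF_API_TOKEN`, and `CF_DATABASE_ID` variable names from the example above.

```python
import os

from sqlalchemy.engine import URL

REQUIRED_VARS = ("CF_ACCOUNT_ID", "CF_API_TOKEN", "CF_DATABASE_ID")


def d1_url_from_env(env=os.environ) -> URL:
    """Build a cloudflare_d1 URL from environment variables (hypothetical helper)."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    # URL.create escapes special characters when the URL is rendered.
    return URL.create(
        "cloudflare_d1",
        username=env["CF_ACCOUNT_ID"],
        password=env["CF_API_TOKEN"],
        host=env["CF_DATABASE_ID"],
    )


# Demo with a plain dict standing in for os.environ.
demo_url = d1_url_from_env(
    {"CF_ACCOUNT_ID": "acct", "CF_API_TOKEN": "t@k:en", "CF_DATABASE_ID": "db"}
)
print(demo_url.render_as_string(hide_password=True))
```

The resulting `URL` object can be passed to `create_engine` directly in place of a string.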

Limitations

Because the dialect talks to D1 over its REST API, it has some limitations:

  1. No transactions: D1 REST API doesn't support explicit transactions. Each query is auto-committed.
  2. No isolation levels: Connection isolation levels are not supported.
  3. Limited concurrency: Connections are HTTP-based, not persistent database connections.
  4. No stored procedures: D1 doesn't support stored procedures or custom functions.
  5. Rate limiting: Subject to Cloudflare API rate limits.
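One way to cope with rate limits is to retry failed calls with exponential backoff. The helper below is a generic sketch, not part of this package: `with_retries` is an invented name, and which exception the dialect raises for a rate-limited request is not documented here, so the exception types to retry on are left to the caller.

```python
import time


def with_retries(fn, *, retry_on=(Exception,), attempts=4, base_delay=0.5,
                 sleep=time.sleep):
    """Call fn(), retrying on the given exceptions with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...


# Demo: an operation that fails twice before succeeding. The delays are
# recorded instead of slept so the example finishes instantly.
state = {"calls": 0}
delays = []


def flaky_query():
    state["calls"] += 1
    if state["calls"] < 3:
        raise TimeoutError("simulated rate limit")
    return "ok"


result = with_retries(flaky_query, retry_on=(TimeoutError,), sleep=delays.append)
print(result, delays)
```

Since every statement is auto-committed, retrying a write that may already have succeeded can apply it twice; this pattern is safest for reads and idempotent writes.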

Known Issues Fixed

  • Empty result sets (v0.3.0+): cursor.description is now correctly populated even when queries return zero rows, fixing NoSuchColumnError exceptions in SQLAlchemy.

Type Mapping

SQLAlchemy Type   D1/SQLite Type   Notes
Integer           INTEGER
String(n)         VARCHAR(n)
Text              TEXT
Float             REAL
Numeric           NUMERIC
Boolean           INTEGER          Stored as 0/1
DateTime          TEXT             ISO format string
Date              TEXT             ISO format string
Time              TEXT             ISO format string
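To make the Boolean and DateTime rows concrete, the sketch below stores the converted values by hand with Python's built-in `sqlite3` module and inspects them with `typeof()`. It mimics what the table describes rather than calling the dialect, and the exact ISO layout the dialect writes (for example `T` versus a space separator) is an assumption. The `events` table is invented for the illustration.

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (active INTEGER, created_at TEXT)")

created = datetime(2024, 1, 2, 3, 4, 5)

# Boolean -> 0/1 integer, DateTime -> ISO 8601 string.
conn.execute(
    "INSERT INTO events (active, created_at) VALUES (?, ?)",
    (int(True), created.isoformat()),
)

row = conn.execute(
    "SELECT active, typeof(active), created_at, typeof(created_at) FROM events"
).fetchone()
print(row)

# Reading back reverses the conversion.
restored_flag = bool(row[0])
restored_time = datetime.fromisoformat(row[2])
```

Because dates are stored as text, comparisons and ordering in SQL work lexicographically, which matches chronological order only when every value uses the same ISO layout.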

Error Handling

The dialect will raise appropriate SQLAlchemy exceptions:

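from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError, OperationalError

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

try:
    with engine.connect() as conn:
        # Raw SQL strings must be wrapped in text() in SQLAlchemy 2.0
        result = conn.execute(text("SELECT * FROM nonexistent_table"))
except OperationalError as e:
    # Catch the specific subclass before the SQLAlchemyError base class
    print(f"Database error: {e}")
except SQLAlchemyError as e:
    print(f"SQLAlchemy error: {e}")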

Development

For detailed development instructions, see .github/DEVELOPMENT.md.

Quick Start

git clone https://github.com/collierking/sqlalchemy-cloudflare-d1.git
cd sqlalchemy-cloudflare-d1

# Install dependencies and setup pre-commit hooks
make install
make setup_hooks

# Run tests and linting
make check

# Build package
make build

Development Tools

  • Ruff: Fast Python linter and formatter
  • mypy: Static type checking
  • codespell: Spell checking
  • pre-commit: Automated pre-commit checks
  • pytest: Testing framework with socket control

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for your changes
  5. Run the test suite
  6. Submit a pull request

License

This project is licensed under the MIT License - see the LICENSE file for details.
