Skip to main content

DB-API 2.0 client and SQLAlchemy dialect for rqlite distributed SQLite clusters

Project description

tangled-pyrqlite - python client

PyPI PyPI Downloads Supported Versions License: MIT

A pure Python client for rqlite distributed SQLite clusters, providing:

  • DB-API 2.0 - Standard Python database API (PEP 249)
  • SQLAlchemy dialect - Full ORM support via SQLAlchemy 2.0
  • Parameterized queries - Safe, SQL injection-proof query execution
  • Serializable transaction support - Atomic batch operations using locking mechanism — bring your own distributed locking implementation (Redis, Valkey)

Installation

# Using uv (recommended)
uv add tangled-pyrqlite

# Using pip
pip install tangled-pyrqlite

Quick Start

Starting rqlite Server

Before using the client, start an rqlite server:

Podman (recommended - no root required):

podman rm -f rqlite-test
podman run -d --name rqlite-test -p 4001:4001 docker.io/rqlite/rqlite

Docker:

docker rm -f rqlite-test
docker run -d --name rqlite-test -p 4001:4001 rqlite/rqlite

DB-API 2.0 Usage

import rqlite
from rqlite import ReadConsistency, ThreadLock

# Basic connection (uses LINEARIZABLE consistency by default)
conn = rqlite.connect(host="localhost", port=4001)
cursor = conn.cursor()

# With custom read consistency and lock for transaction support
conn = rqlite.connect(
    host="localhost",
    port=4001,
    read_consistency=ReadConsistency.WEAK,  # or "weak" string
    lock=ThreadLock()  # Suppresses transaction warnings
)
cursor = conn.cursor()

# Create table
cursor.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        email TEXT UNIQUE
    )
""")
conn.commit()

# Insert with positional parameters (recommended)
cursor.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    ("Alice", "alice@example.com")
)

# Insert with named parameters (also supported)
cursor.execute(
    "INSERT INTO users (name, email) VALUES (:name, :email)",
    {"name": "Bob", "email": "bob@example.com"}
)
conn.commit()

# Query with positional parameters
cursor.execute("SELECT * FROM users WHERE name=?", ("Alice",))
row = cursor.fetchone()
print(row)  # (1, "Alice", "alice@example.com")

# Fetch all
cursor.execute("SELECT * FROM users")
for row in cursor:
    print(row)

# Close
cursor.close()
conn.close()

# Or use context managers
with rqlite.connect() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users")
        for row in cursor:
            print(row)

Parameter Binding

The client supports both parameter styles per DB-API 2.0 standard:

Positional parameters (?) - Recommended:

cursor.execute("SELECT * FROM users WHERE id=? AND name=?", (42, "Alice"))

Named parameters (:name) - Also supported:

cursor.execute(
    "SELECT * FROM users WHERE id=:id AND name=:name",
    {"id": 42, "name": "Alice"}
)

Note for SQLAlchemy users: SQLAlchemy automatically uses positional parameters (?) for all queries. The ORM and Core layers handle parameter binding before reaching the dialect, so you don't need to worry about parameter format when using SQLAlchemy.

SQLAlchemy Usage

from sqlalchemy import Integer, String, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
from rqlite import ReadConsistency, ThreadLock


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(nullable=False)
    email: Mapped[str | None] = mapped_column(unique=True)


# Basic engine (uses LINEARIZABLE consistency by default)
engine = create_engine("rqlite://localhost:4001")

# With custom read consistency and lock via connect_args
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={
        "read_consistency": ReadConsistency.WEAK,  # or "weak" string
        "lock": ThreadLock()  # Suppresses transaction warnings
    }
)

# Or use URL query parameter for read_consistency only
engine = create_engine("rqlite://localhost:4001?read_consistency=weak")

# Create tables (echo=True shows SQL)
engine = create_engine("rqlite://localhost:4001", echo=True)
Base.metadata.create_all(engine)

# Use with Session
with Session(engine) as session:
    user = User(name="Charlie", email="charlie@example.com")
    session.add(user)
    session.commit()
    
    # Query
    user = session.query(User).filter_by(name="Charlie").first()
    print(user.name)  # Charlie

Features

Read Consistency Levels

rqlite supports multiple read consistency levels to balance between data freshness and performance. The client defaults to LINEARIZABLE for guaranteed fresh reads.

Level Speed Freshness Best For
LINEARIZABLE (default) Moderate Guaranteed fresh Critical reads requiring latest data
WEAK Fast Usually current (sub-second staleness possible) General-purpose reads
NONE Fastest No guarantee Read-only nodes, max performance
STRONG Slow Guaranteed fresh + applied Testing only
AUTO Varies Varies Mixed node type clusters

Usage:

import rqlite
from rqlite import ReadConsistency

# Use LINEARIZABLE (default) for guaranteed fresh reads
conn = rqlite.connect()

# Use WEAK for faster reads with possible sub-second staleness
# Supports both enum and string:
conn = rqlite.connect(read_consistency=ReadConsistency.WEAK)
conn = rqlite.connect(read_consistency="weak")

# Use NONE for read-only nodes or maximum performance
conn = rqlite.connect(read_consistency="none")

SQLAlchemy:

from sqlalchemy import create_engine

# Via URL query parameter
engine = create_engine("rqlite://localhost:4001?read_consistency=weak")

# Via connect_args
from rqlite import ReadConsistency

engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"read_consistency": ReadConsistency.WEAK}
)

See rqlite documentation for detailed explanations of each consistency level.

DB-API 2.0 Compliance

Feature Status Notes
connect() Returns Connection
Connection.cursor() Returns Cursor
Connection.commit() Queues then sends statements
Connection.rollback() Discards queued statements
Connection.close() Clears queue, closes resources
Cursor.execute() Supports positional and named params
Cursor.executemany() ⚠️ Executes sequentially
Cursor.fetchall() Returns all rows as tuples
Cursor.fetchmany() Respects arraysize
Cursor.fetchone() Returns single row or None
Cursor.description Column metadata after SELECT
Cursor.rowcount ⚠️ Only for write operations
Cursor.lastrowid Available after INSERT

SQLAlchemy Support

Feature Status Notes
Core SELECT/INSERT/UPDATE/DELETE Full support
ORM Models Full support
Relationships Via SQLite dialect
Sessions Standard SQLAlchemy sessions
Transactions ⚠️ Limited (see below)
Reflection ⚠️ Basic table/column introspection

Transaction Model

rqlite's transaction model differs from traditional databases:

How It Works

  1. Queue-based: Statements are queued locally until commit() is called
  2. Atomic batch: All queued statements sent in single HTTP request with ?transaction=true
  3. All-or-nothing: Either all statements succeed or none do

Important Limitations

  • ❌ No savepoints
  • ⚠️ Explicit BEGIN/COMMIT/ROLLBACK SQL is ignored (use Python API)
  • ❌ No native transaction isolation levels
  • ⚠️ rowcount not available for SELECT statements

Connection URLs

DB-API 2.0

# Basic connection (uses LINEARIZABLE consistency by default)
conn = rqlite.connect(host="localhost", port=4001)

# With authentication
conn = rqlite.connect(
    host="localhost",
    port=4001,
    username="admin",
    password="secret"
)

# Custom timeout
conn = rqlite.connect(host="localhost", port=4001, timeout=60.0)

# Custom read consistency (enum or string)
conn = rqlite.connect(host="localhost", port=4001, read_consistency="weak")

# Or using the enum:
from rqlite import ReadConsistency

conn = rqlite.connect(
    host="localhost",
    port=4001,
    read_consistency=ReadConsistency.WEAK
)

# With lock for transaction support (suppresses warnings)
from rqlite import ThreadLock

conn = rqlite.connect(
    host="localhost",
    port=4001,
    lock=ThreadLock()
)

# Combining read_consistency and lock
conn = rqlite.connect(
    host="localhost",
    port=4001,
    read_consistency=ReadConsistency.WEAK,
    lock=ThreadLock()
)

Redis Distributed Lock (optional)

For true cross-process transaction serialization, use the built-in Redis-backed locks. Install with the redis extra:

uv add tangled-pyrqlite[redis]

Start a Redis server:

podman rm -f redis-test
podman run -d --name redis-test -p 6379:6379 docker.io/redis

Sync DB-API 2.0 with RedisLock:

import rqlite
from rqlite import RedisLock

# Connect with Redis distributed lock
lock = RedisLock(name="transfer", timeout=10.0)
conn = rqlite.connect(host="localhost", port=4001, lock=lock)
cursor = conn.cursor()

# Use lock for serialized bank transfer
with lock:
    cursor.execute("SELECT balance FROM accounts WHERE id=?", (1,))
    balance = cursor.fetchone()[0]
    cursor.execute("UPDATE accounts SET balance=? WHERE id=?", (balance - 100, 1))
    conn.commit()

Async DB-API 2.0 with AioRedisLock:

import asyncio
import rqlite
from rqlite import AioRedisLock

async def transfer():
    lock = AioRedisLock(name="transfer", timeout=10.0)
    conn = rqlite.async_connect(lock=lock)
    cursor = await conn.cursor()

    async with lock:
        await cursor.execute("SELECT balance FROM accounts WHERE id=?", (1,))
        balance = cursor.fetchone()[0]
        await cursor.execute(
            "UPDATE accounts SET balance=? WHERE id=?",
            (balance - 100, 1),
        )
        await conn.commit()

    await cursor.close()
    await conn.close()

asyncio.run(transfer())

SQLAlchemy with RedisLock:

from sqlalchemy import create_engine
from rqlite import RedisLock

lock = RedisLock(name="sa_lock", timeout=10.0)
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"lock": lock}
)

Available locks:

Lock Sync/Async Scope Use case
ThreadLock sync In-process threads Single process, thread-safe transactions
threading.Lock sync In-process threads Direct stdlib lock
RedisLock sync Cross-process (distributed) Multi-process, ACID isolation
AioLock async In-process tasks Single process, async-safe transactions
AioRedisLock async Cross-process (distributed) Multi-process async, ACID isolation

For full examples see: examples/sync_redis_lock_basic_usage.py, examples/async_redis_lock_basic_usage.py, examples/sync_redis_lock_distributed_transfer.py.


### SQLAlchemy

**Note:** For SQLAlchemy, custom parameters like `read_consistency` and `lock` must be passed via `connect_args` dictionary, not directly to `create_engine()`. This is because SQLAlchemy validates kwargs before passing them to the dialect.

```python
# Basic (uses LINEARIZABLE consistency by default)
engine = create_engine("rqlite://localhost:4001")

# With authentication
engine = create_engine("rqlite://admin:secret@localhost:4001")

# Enable SQL echo for debugging
engine = create_engine("rqlite://localhost:4001", echo=True)

# Custom read consistency via URL query parameter
engine = create_engine("rqlite://localhost:4001?read_consistency=weak")

# Custom read consistency via connect_args
from rqlite import ReadConsistency

engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"read_consistency": ReadConsistency.WEAK}
)

# With lock for transaction support (via connect_args)
from rqlite import ThreadLock

engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"lock": ThreadLock()}
)

# Both read_consistency and lock together
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={
        "read_consistency": ReadConsistency.WEAK,
        "lock": ThreadLock()
    }
)

Error Handling

import rqlite

try:
    conn = rqlite.connect()
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM nonexistent_table")
except rqlite.ProgrammingError as e:
    print(f"SQL error: {e}")
except rqlite.OperationalError as e:
    print(f"Connection error: {e}")
except rqlite.DatabaseError as e:
    print(f"Database error: {e}")

Exception Hierarchy

  • Error - Base exception
    • InterfaceError - Interface-related errors
    • DatabaseError - Database errors
      • DataError - Data-related errors
      • OperationalError - Connection/operation errors
      • IntegrityError - Constraint violations
      • InternalError - Internal database errors
      • ProgrammingError - SQL syntax errors
      • NotSupportedError - Unsupported operations

Examples

See the examples/ directory for complete working examples.

Running Sync Examples

Without lock (shows transaction warnings):

# Sync ThreadLock DB-API 2.0 examples with warnings
uv run python -B examples/sync_thread_lock_basic_usage.py

# Sync ThreadLock SQLAlchemy ORM examples with warnings
uv run python -B examples/sync_thread_lock_sqlalchemy_orm.py

With lock (no transaction warnings):

# Sync ThreadLock DB-API 2.0 examples without warnings
uv run python -B examples/sync_thread_lock_basic_usage.py --with-lock

# Sync ThreadLock SQLAlchemy ORM examples without warnings
uv run python -B examples/sync_thread_lock_sqlalchemy_orm.py --with-lock

The -B flag disables byte-code generation for cleaner output.

Example Files

File Description
sync_thread_lock_basic_usage.py Sync DB-API 2.0 CRUD with ThreadLock
async_aio_lock_basic_usage.py Async DB-API 2.0 examples with AioLock
sync_thread_lock_sqlalchemy_orm.py Sync SQLAlchemy ORM usage
async_aio_lock_sqlalchemy_orm.py Async SQLAlchemy ORM usage
sync_redis_lock_basic_usage.py Sync Redis distributed lock examples
async_redis_lock_basic_usage.py Async Redis distributed lock examples
sync_valkey_lock_basic_usage.py Sync Valkey distributed lock examples
async_valkey_lock_basic_usage.py Async Valkey distributed lock examples
sync_redis_lock_distributed_transfer.py Cross-process bank transfer demo (proves Redis lock works)

Locking Mechanism

The examples demonstrate the optional locking mechanism for transaction support:

  • Without lock: Shows warnings about explicit BEGIN/COMMIT/ROLLBACK SQL not being supported
  • With lock (--with-lock): Uses ThreadLock to suppress warnings, allowing explicit transaction SQL

For more details, see the Locking Mechanism section below.

Locking Mechanism

rqlite provides an optional locking mechanism to support transactions and suppress warnings about explicit transaction SQL commands (BEGIN/COMMIT/ROLLBACK/SAVEPOINT).

Why Use Locks?

By default, when using the rqlite library without a lock, you will receive a UserWarning:

UserWarning: Explicit BEGIN/COMMIT/ROLLBACK/SAVEPOINT SQL is not supported.

This warning is expected behavior and indicates that:

  • You are aware of rqlite's transaction model (queue-based, atomic batch)
  • You understand that explicit transaction SQL commands are not supported in the traditional sense
  • You are using the Python API (commit(), rollback()) for transaction control

This is fine if you understand how rqlite transactions work. However, if you need true ACID compliance with proper isolation guarantees, it is recommended to use a lock (e.g., ThreadLock()). The lock:

  • Suppresses the warning
  • Indicates intentional handling of transaction limitations
  • Provides thread-safety for concurrent operations

When you provide a lock, these warnings are suppressed, indicating that you're aware of the limitations and handling transactions appropriately.

Available Lock Classes

  1. ThreadLock (recommended) - Thread-safe wrapper around threading.Lock
  2. threading.Lock - Use directly (satisfies LockProtocol)
  3. Custom locks - Any class implementing LockProtocol

Usage Examples

DB-API 2.0 with ThreadLock:

import rqlite
from rqlite import ThreadLock

# Connect with lock to suppress warnings
conn = rqlite.connect(lock=ThreadLock())
cursor = conn.cursor()

# No warning about explicit transaction SQL
cursor.execute("BEGIN")
cursor.execute("INSERT INTO users (name) VALUES ('Alice')")
cursor.execute("COMMIT")

Using threading.Lock directly:

import rqlite
import threading

conn = rqlite.connect(lock=threading.Lock())
# Same behavior as ThreadLock

SQLAlchemy with lock:

from sqlalchemy import create_engine
from rqlite import ThreadLock

engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"lock": ThreadLock()}
)
# No warnings when using explicit transaction SQL

Custom lock implementation:

import rqlite
from rqlite import LockProtocol

class MyLock:
    """Custom lock satisfying LockProtocol."""
    def __init__(self): pass
    def acquire(self, blocking=True, timeout=-1): return True
    def release(self): pass
    def __enter__(self): return self
    def __exit__(
        self,
        exc_type: type[Exception] | None,
        exc_val: Exception | None,
        exc_tb: object,
    ) -> None: pass

conn = rqlite.connect(lock=MyLock())

LockProtocol

Any lock implementation must satisfy the LockProtocol:

from typing import Protocol

class LockProtocol(Protocol):
    def __init__(self) -> None: ...
    def acquire(self, blocking: bool = ..., timeout: float = ...) -> bool: ...
    def release(self) -> None: ...
    def __enter__(self) -> "LockProtocol": ...
    def __exit__(
        self,
        exc_type: type[Exception] | None,
        exc_val: Exception | None,
        exc_tb: object,
    ) -> None: ...

Abstract Lock Class

The rqlite.Lock class is abstract and should NOT be instantiated directly. Use ThreadLock or threading.Lock instead:

from rqlite import Lock

# ❌ Don't do this - raises NotImplementedError
lock = Lock()

# ✅ Do this instead
from rqlite import ThreadLock
lock = ThreadLock()

Development

Setup

uv sync

Run Tests

# Start fresh rqlite first
podman rm -f rqlite-test
podman run -d --name rqlite-test -p 4001:4001 docker.io/rqlite/rqlite

# Run tests
pytest -v

# With coverage
pytest --cov=rqlite --cov-report=term-missing

Linting & Type Checking

uv run ruff check .
uv run ty check

Architecture

rqlite/
├── __init__.py              # Package init, exports
├── connection.py            # Connection class (DB-API 2.0 sync)
├── cursor.py                # Cursor class (DB-API 2.0 sync)
├── types.py                 # Type helpers & sync locks (ThreadLock, LockProtocol)
├── exceptions.py            # Exception classes
├── async_connection.py      # Async Connection class
├── async_cursor.py          # Async Cursor class
├── async_types.py           # Async locks (AioLock, AsyncLockProtocol)
├── redis_lock.py            # Redis distributed lock (sync, RedisLock)
├── async_redis_lock.py      # Async Redis distributed lock (AioRedisLock)
├── valkey_lock.py           # Valkey distributed lock (sync, ValkeyLock)
├── async_valkey_lock.py     # Async Valkey distributed lock (AioValkeyLock)
└── sqlalchemy/              # SQLAlchemy dialect
    ├── __init__.py          # Dialect exports
    ├── dialect.py           # RQLiteDialect implementation (sync)
    └── async_dialect.py     # AioRQLiteDialect implementation (async)

References

License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tangled_pyrqlite-0.1.5.tar.gz (75.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tangled_pyrqlite-0.1.5-py3-none-any.whl (57.0 kB view details)

Uploaded Python 3

File details

Details for the file tangled_pyrqlite-0.1.5.tar.gz.

File metadata

  • Download URL: tangled_pyrqlite-0.1.5.tar.gz
  • Upload date:
  • Size: 75.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Arch Linux","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for tangled_pyrqlite-0.1.5.tar.gz
Algorithm Hash digest
SHA256 d02295ec6787fc6191aee257e0c399d0faafcd944d395fc663534752ecc778bb
MD5 6fa150139657bcc458b7d7cdff15a6bd
BLAKE2b-256 e487e63dbec2cb8aa699bf77b24ac7f2a6942e9f793d38f5a56f8e4f7084ea4d

See more details on using hashes here.

File details

Details for the file tangled_pyrqlite-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: tangled_pyrqlite-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 57.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Arch Linux","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for tangled_pyrqlite-0.1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 03ef8d4781186dddf2f2cd1e33edfc050c29bc9a70074b5f4d588ee0f2faed38
MD5 bec2daeac9b474bdb5bb09ab9d6b34ca
BLAKE2b-256 b70d4a3e0c01d36604764ba7a68f3b778c57011388917276ee3861410f8ce923

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page