DB-API 2.0 client and SQLAlchemy dialect for rqlite distributed SQLite clusters

tangled-pyrqlite - python client

A pure Python client for rqlite distributed SQLite clusters, providing:

  • DB-API 2.0 - Standard Python database API (PEP 249)
  • SQLAlchemy dialect - Full ORM support via SQLAlchemy 2.0
  • Parameterized queries - Safe, SQL injection-proof query execution
  • Serializable transaction support - Atomic batch operations guarded by a locking mechanism; bring your own distributed lock implementation (Redis, Valkey, etcd3)

Installation

# Using uv (recommended)
uv add tangled-pyrqlite

# Using pip
pip install tangled-pyrqlite

Quick Start

Starting rqlite Server

Before using the client, start an rqlite server:

Podman (recommended - no root required):

podman rm -f rqlite-test
podman run -d --name rqlite-test -p 4001:4001 docker.io/rqlite/rqlite

Docker:

docker rm -f rqlite-test
docker run -d --name rqlite-test -p 4001:4001 rqlite/rqlite

DB-API 2.0 Usage

import rqlite
from rqlite import ReadConsistency, ThreadLock

# Basic connection (uses LINEARIZABLE consistency by default)
conn = rqlite.connect(host="localhost", port=4001)
cursor = conn.cursor()

# With custom read consistency and lock for transaction support
conn = rqlite.connect(
    host="localhost",
    port=4001,
    read_consistency=ReadConsistency.WEAK,  # or "weak" string
    lock=ThreadLock()  # Suppresses transaction warnings
)
cursor = conn.cursor()

# Create table
cursor.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        email TEXT UNIQUE
    )
""")
conn.commit()

# Insert with positional parameters (recommended)
cursor.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    ("Alice", "alice@example.com")
)

# Insert with named parameters (also supported)
cursor.execute(
    "INSERT INTO users (name, email) VALUES (:name, :email)",
    {"name": "Bob", "email": "bob@example.com"}
)
conn.commit()

# Query with positional parameters
cursor.execute("SELECT * FROM users WHERE name=?", ("Alice",))
row = cursor.fetchone()
print(row)  # (1, "Alice", "alice@example.com")

# Fetch all
cursor.execute("SELECT * FROM users")
for row in cursor:
    print(row)

# Close
cursor.close()
conn.close()

# Or use context managers
with rqlite.connect() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users")
        for row in cursor:
            print(row)

Parameter Binding

The client supports both parameter styles per DB-API 2.0 standard:

Positional parameters (?) - Recommended:

cursor.execute("SELECT * FROM users WHERE id=? AND name=?", (42, "Alice"))

Named parameters (:name) - Also supported:

cursor.execute(
    "SELECT * FROM users WHERE id=:id AND name=:name",
    {"id": 42, "name": "Alice"}
)

Note for SQLAlchemy users: SQLAlchemy automatically uses positional parameters (?) for all queries. The ORM and Core layers handle parameter binding before reaching the dialect, so you don't need to worry about parameter format when using SQLAlchemy.

SQLAlchemy Usage

from sqlalchemy import Integer, String, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
from rqlite import ReadConsistency, ThreadLock


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(nullable=False)
    email: Mapped[str | None] = mapped_column(unique=True)


# Basic engine (uses LINEARIZABLE consistency by default)
engine = create_engine("rqlite://localhost:4001")

# With custom read consistency and lock via connect_args
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={
        "read_consistency": ReadConsistency.WEAK,  # or "weak" string
        "lock": ThreadLock()  # Suppresses transaction warnings
    }
)

# Or use URL query parameter for read_consistency only
engine = create_engine("rqlite://localhost:4001?read_consistency=weak")

# Create tables (echo=True shows SQL)
engine = create_engine("rqlite://localhost:4001", echo=True)
Base.metadata.create_all(engine)

# Use with Session
with Session(engine) as session:
    user = User(name="Charlie", email="charlie@example.com")
    session.add(user)
    session.commit()
    
    # Query
    user = session.query(User).filter_by(name="Charlie").first()
    print(user.name)  # Charlie

Features

Read Consistency Levels

rqlite supports multiple read consistency levels to balance between data freshness and performance. The client defaults to LINEARIZABLE for guaranteed fresh reads.

| Level | Speed | Freshness | Best for |
| --- | --- | --- | --- |
| `LINEARIZABLE` (default) | Moderate | Guaranteed fresh | Critical reads requiring latest data |
| `WEAK` | Fast | Usually current (sub-second staleness possible) | General-purpose reads |
| `NONE` | Fastest | No guarantee | Read-only nodes, max performance |
| `STRONG` | Slow | Guaranteed fresh + applied | Testing only |
| `AUTO` | Varies | Varies | Mixed node-type clusters |

Usage:

import rqlite
from rqlite import ReadConsistency

# Use LINEARIZABLE (default) for guaranteed fresh reads
conn = rqlite.connect()

# Use WEAK for faster reads with possible sub-second staleness
# Supports both enum and string:
conn = rqlite.connect(read_consistency=ReadConsistency.WEAK)
conn = rqlite.connect(read_consistency="weak")

# Use NONE for read-only nodes or maximum performance
conn = rqlite.connect(read_consistency="none")

SQLAlchemy:

from sqlalchemy import create_engine

# Via URL query parameter
engine = create_engine("rqlite://localhost:4001?read_consistency=weak")

# Via connect_args
from rqlite import ReadConsistency

engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"read_consistency": ReadConsistency.WEAK}
)

See rqlite documentation for detailed explanations of each consistency level.

DB-API 2.0 Compliance

| Feature | Status | Notes |
| --- | --- | --- |
| `connect()` | ✅ | Returns `Connection` |
| `Connection.cursor()` | ✅ | Returns `Cursor` |
| `Connection.commit()` | ✅ | Queues then sends statements |
| `Connection.rollback()` | ✅ | Discards queued statements |
| `Connection.close()` | ✅ | Clears queue, closes resources |
| `Cursor.execute()` | ✅ | Supports positional and named params |
| `Cursor.executemany()` | ⚠️ | Executes sequentially |
| `Cursor.fetchall()` | ✅ | Returns all rows as tuples |
| `Cursor.fetchmany()` | ✅ | Respects `arraysize` |
| `Cursor.fetchone()` | ✅ | Returns single row or `None` |
| `Cursor.description` | ✅ | Column metadata after SELECT |
| `Cursor.rowcount` | ⚠️ | Only for write operations |
| `Cursor.lastrowid` | ✅ | Available after INSERT |
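
executemany() follows the standard PEP 249 shape: one statement plus a sequence of parameter sets (this client runs each set sequentially rather than as one batch). The call pattern, illustrated with the stdlib sqlite3 module since it shares the same DB-API interface:

```python
import sqlite3

# In-memory SQLite stands in for an rqlite connection; the
# executemany() call pattern is identical for any PEP 249 driver.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# One statement, many parameter sets
cursor.executemany(
    "INSERT INTO users (name) VALUES (?)",
    [("Alice",), ("Bob",), ("Carol",)],
)
conn.commit()

cursor.execute("SELECT COUNT(*) FROM users")
print(cursor.fetchone()[0])  # 3
```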

SQLAlchemy Support

| Feature | Status | Notes |
| --- | --- | --- |
| Core SELECT/INSERT/UPDATE/DELETE | ✅ | Full support |
| ORM models | ✅ | Full support |
| Relationships | ✅ | Via SQLite dialect |
| Sessions | ✅ | Standard SQLAlchemy sessions |
| Transactions | ⚠️ | Limited (see below) |
| Reflection | ⚠️ | Basic table/column introspection |
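
Basic introspection goes through SQLAlchemy's standard `inspect()` interface. A sketch against in-memory SQLite for illustration; the same calls are what basic reflection means against an rqlite engine:

```python
from sqlalchemy import create_engine, inspect, text

# sqlite:// stands in for rqlite://localhost:4001 in this sketch
engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))

# Inspector exposes table and column metadata
inspector = inspect(engine)
print(inspector.get_table_names())                           # ['users']
print([col["name"] for col in inspector.get_columns("users")])  # ['id', 'name']
```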

Transaction Model

rqlite's transaction model differs from traditional databases:

How It Works

  1. Queue-based: Statements are queued locally until commit() is called
  2. Atomic batch: All queued statements sent in single HTTP request with ?transaction=true
  3. All-or-nothing: Either all statements succeed or none do
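
The three steps above can be modeled as a toy queue-then-flush connection. This is a simplified sketch of the behavior described, not the client's actual implementation:

```python
class QueueingConnection:
    """Toy model of rqlite's queue-based commit: statements accumulate
    locally and are flushed as one atomic batch on commit()."""

    def __init__(self) -> None:
        self._queue: list[tuple[str, tuple]] = []
        # Stands in for HTTP POSTs sent with ?transaction=true
        self.sent_batches: list[list[tuple[str, tuple]]] = []

    def execute(self, sql: str, params: tuple = ()) -> None:
        self._queue.append((sql, params))  # nothing reaches the server yet

    def commit(self) -> None:
        if self._queue:
            # Real client: one HTTP request carrying every queued statement
            self.sent_batches.append(list(self._queue))
            self._queue.clear()

    def rollback(self) -> None:
        self._queue.clear()  # queued statements are simply discarded


conn = QueueingConnection()
conn.execute("INSERT INTO t VALUES (?)", (1,))
conn.execute("INSERT INTO t VALUES (?)", (2,))
conn.commit()
print(len(conn.sent_batches))     # 1 -- both statements went as a single batch
print(len(conn.sent_batches[0]))  # 2
```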

Important Limitations

  • ❌ No savepoints
  • ⚠️ Explicit BEGIN/COMMIT/ROLLBACK SQL is ignored (use Python API)
  • ❌ No native transaction isolation levels
  • ⚠️ rowcount not available for SELECT statements

Connection URLs

DB-API 2.0

# Basic connection (uses LINEARIZABLE consistency by default)
conn = rqlite.connect(host="localhost", port=4001)

# With authentication
conn = rqlite.connect(
    host="localhost",
    port=4001,
    username="admin",
    password="secret"
)

# Custom timeout
conn = rqlite.connect(host="localhost", port=4001, timeout=60.0)

# Custom read consistency (enum or string)
conn = rqlite.connect(host="localhost", port=4001, read_consistency="weak")

# Or using the enum:
from rqlite import ReadConsistency

conn = rqlite.connect(
    host="localhost",
    port=4001,
    read_consistency=ReadConsistency.WEAK
)

# With lock for transaction support (suppresses warnings)
from rqlite import ThreadLock

conn = rqlite.connect(
    host="localhost",
    port=4001,
    lock=ThreadLock()
)

# Combining read_consistency and lock
conn = rqlite.connect(
    host="localhost",
    port=4001,
    read_consistency=ReadConsistency.WEAK,
    lock=ThreadLock()
)

Redis Distributed Lock (optional)

For true cross-process transaction serialization, use the built-in Redis-backed locks. Install with the redis extra:

uv add "tangled-pyrqlite[redis]"

Start a Redis server:

podman rm -f redis-test
podman run -d --name redis-test -p 6379:6379 docker.io/redis

Sync DB-API 2.0 with RedisLock:

import rqlite
from rqlite import RedisLock

# Connect with Redis distributed lock
lock = RedisLock(name="transfer", timeout=10.0)
conn = rqlite.connect(host="localhost", port=4001, lock=lock)
cursor = conn.cursor()

# Use lock for serialized bank transfer
with lock:
    cursor.execute("SELECT balance FROM accounts WHERE id=?", (1,))
    balance = cursor.fetchone()[0]
    cursor.execute("UPDATE accounts SET balance=? WHERE id=?", (balance - 100, 1))
    conn.commit()

Async DB-API 2.0 with AioRedisLock:

import asyncio
import rqlite
from rqlite import AioRedisLock

async def transfer():
    lock = AioRedisLock(name="transfer", timeout=10.0)
    conn = rqlite.async_connect(lock=lock)
    cursor = await conn.cursor()

    async with lock:
        await cursor.execute("SELECT balance FROM accounts WHERE id=?", (1,))
        balance = cursor.fetchone()[0]
        await cursor.execute(
            "UPDATE accounts SET balance=? WHERE id=?",
            (balance - 100, 1),
        )
        await conn.commit()

    await cursor.close()
    await conn.close()

asyncio.run(transfer())

SQLAlchemy with RedisLock:

from sqlalchemy import create_engine
from rqlite import RedisLock

lock = RedisLock(name="sa_lock", timeout=10.0)
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"lock": lock}
)

Available locks:

| Lock | Sync/Async | Scope | Use case |
| --- | --- | --- | --- |
| `ThreadLock` | sync | In-process threads | Single process, thread-safe transactions |
| `threading.Lock` | sync | In-process threads | Direct stdlib lock |
| `RedisLock` | sync | Cross-process (distributed) | Multi-process, ACID isolation |
| `AioLock` | async | In-process tasks | Single process, async-safe transactions |
| `AioRedisLock` | async | Cross-process (distributed) | Multi-process async, ACID isolation |

For full examples see: examples/redis_lock_sync.py, examples/redis_lock_async.py, examples/distributed_transfer.py.


SQLAlchemy

Note: For SQLAlchemy, custom parameters like read_consistency and lock must be passed via the connect_args dictionary, not directly to create_engine(). SQLAlchemy validates kwargs before passing them to the dialect.

# Basic (uses LINEARIZABLE consistency by default)
engine = create_engine("rqlite://localhost:4001")

# With authentication
engine = create_engine("rqlite://admin:secret@localhost:4001")

# Enable SQL echo for debugging
engine = create_engine("rqlite://localhost:4001", echo=True)

# Custom read consistency via URL query parameter
engine = create_engine("rqlite://localhost:4001?read_consistency=weak")

# Custom read consistency via connect_args
from rqlite import ReadConsistency

engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"read_consistency": ReadConsistency.WEAK}
)

# With lock for transaction support (via connect_args)
from rqlite import ThreadLock

engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"lock": ThreadLock()}
)

# Both read_consistency and lock together
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={
        "read_consistency": ReadConsistency.WEAK,
        "lock": ThreadLock()
    }
)

Error Handling

import rqlite

try:
    conn = rqlite.connect()
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM nonexistent_table")
except rqlite.ProgrammingError as e:
    print(f"SQL error: {e}")
except rqlite.OperationalError as e:
    print(f"Connection error: {e}")
except rqlite.DatabaseError as e:
    print(f"Database error: {e}")

Exception Hierarchy

  • Error - Base exception
    • InterfaceError - Interface-related errors
    • DatabaseError - Database errors
      • DataError - Data-related errors
      • OperationalError - Connection/operation errors
      • IntegrityError - Constraint violations
      • InternalError - Internal database errors
      • ProgrammingError - SQL syntax errors
      • NotSupportedError - Unsupported operations
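
Because the hierarchy follows PEP 249, order your except clauses from most to least specific. Illustrated with the stdlib sqlite3 module, which shares the same exception hierarchy:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT UNIQUE)")
conn.execute("INSERT INTO users VALUES ('a@example.com')")

try:
    # Violates the UNIQUE constraint
    conn.execute("INSERT INTO users VALUES ('a@example.com')")
except sqlite3.IntegrityError:
    caught = "integrity"   # most specific subclass wins
except sqlite3.DatabaseError:
    caught = "database"    # would also match, but only as a fallback

print(caught)  # integrity
```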

Examples

See the examples/ directory for complete working examples.

Running Sync Examples

Without lock (shows transaction warnings):

# DB-API 2.0 examples with warnings
uv run python -B examples/basic_usage.py

# SQLAlchemy ORM examples with warnings
uv run python -B examples/sqlalchemy_orm.py

With lock (no transaction warnings):

# DB-API 2.0 examples without warnings
uv run python -B examples/basic_usage.py --with-lock

# SQLAlchemy ORM examples without warnings
uv run python -B examples/sqlalchemy_orm.py --with-lock

The -B flag stops Python from writing .pyc bytecode files, keeping the examples directory free of __pycache__.

Example Files

| File | Description |
| --- | --- |
| `basic_usage.py` | DB-API 2.0 CRUD operations |
| `async_basic_usage.py` | Async DB-API 2.0 examples |
| `sqlalchemy_orm.py` | SQLAlchemy ORM usage |
| `redis_lock_sync.py` | Sync Redis distributed lock examples |
| `redis_lock_async.py` | Async Redis distributed lock examples |
| `distributed_transfer.py` | Cross-process bank transfer demo (proves the Redis lock works) |

Locking Mechanism

The examples demonstrate the optional locking mechanism for transaction support:

  • Without lock: Shows warnings about explicit BEGIN/COMMIT/ROLLBACK SQL not being supported
  • With lock (--with-lock): Uses ThreadLock to suppress warnings, allowing explicit transaction SQL

For more details, see the Locking Mechanism section below.

Locking Mechanism

rqlite provides an optional locking mechanism to support transactions and suppress warnings about explicit transaction SQL commands (BEGIN/COMMIT/ROLLBACK/SAVEPOINT).

Why Use Locks?

By default, when using the rqlite library without a lock, you will receive a UserWarning:

UserWarning: Explicit BEGIN/COMMIT/ROLLBACK/SAVEPOINT SQL is not supported.

This warning is expected behavior. rqlite's transaction model is queue-based: statements are batched locally and sent atomically on commit(), so explicit transaction SQL cannot work in the traditional sense, and the Python API (commit(), rollback()) is the supported way to control transactions.

This is fine if you understand how rqlite transactions work. If you need serialized transactions with isolation guarantees, provide a lock: ThreadLock() within a single process, or a Redis-backed lock across processes. Providing a lock:

  • Suppresses the warning
  • Signals that you are aware of the limitations and are handling them intentionally
  • Serializes concurrent transactions for thread (or process) safety

Available Lock Classes

  1. ThreadLock (recommended) - Thread-safe wrapper around threading.Lock
  2. threading.Lock - Use directly (satisfies LockProtocol)
  3. Custom locks - Any class implementing LockProtocol

Usage Examples

DB-API 2.0 with ThreadLock:

import rqlite
from rqlite import ThreadLock

# Connect with lock to suppress warnings
conn = rqlite.connect(lock=ThreadLock())
cursor = conn.cursor()

# No warning about explicit transaction SQL
cursor.execute("BEGIN")
cursor.execute("INSERT INTO users (name) VALUES ('Alice')")
cursor.execute("COMMIT")

Using threading.Lock directly:

import rqlite
import threading

conn = rqlite.connect(lock=threading.Lock())
# Same behavior as ThreadLock

SQLAlchemy with lock:

from sqlalchemy import create_engine
from rqlite import ThreadLock

engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"lock": ThreadLock()}
)
# No warnings when using explicit transaction SQL

Custom lock implementation:

import rqlite
from rqlite import LockProtocol

class MyLock:
    """Custom lock satisfying LockProtocol."""
    def __init__(self): pass
    def acquire(self, blocking=True, timeout=-1): return True
    def release(self): pass
    def __enter__(self): return self
    def __exit__(
        self,
        exc_type: type[Exception] | None,
        exc_val: Exception | None,
        exc_tb: object,
    ) -> None: pass

conn = rqlite.connect(lock=MyLock())

LockProtocol

Any lock implementation must satisfy the LockProtocol:

from typing import Protocol

class LockProtocol(Protocol):
    def __init__(self) -> None: ...
    def acquire(self, blocking: bool = ..., timeout: float = ...) -> bool: ...
    def release(self) -> None: ...
    def __enter__(self) -> "LockProtocol": ...
    def __exit__(
        self,
        exc_type: type[Exception] | None,
        exc_val: Exception | None,
        exc_tb: object,
    ) -> None: ...
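
A quick structural check that stdlib threading.Lock matches this shape. LockLike below is an illustrative stand-in defined locally, not the library's LockProtocol:

```python
import threading
from typing import Protocol, runtime_checkable


# @runtime_checkable lets isinstance() verify the method names exist
# (it does not check signatures).
@runtime_checkable
class LockLike(Protocol):
    def acquire(self, blocking: bool = ..., timeout: float = ...) -> bool: ...
    def release(self) -> None: ...


print(isinstance(threading.Lock(), LockLike))  # True
```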

Abstract Lock Class

The rqlite.Lock class is abstract and should NOT be instantiated directly. Use ThreadLock or threading.Lock instead:

from rqlite import Lock

# ❌ Don't do this - raises NotImplementedError
lock = Lock()

# ✅ Do this instead
from rqlite import ThreadLock
lock = ThreadLock()

Development

Setup

uv sync

Run Tests

# Start fresh rqlite first
podman rm -f rqlite-test
podman run -d --name rqlite-test -p 4001:4001 docker.io/rqlite/rqlite

# Run tests
pytest -v

# With coverage
pytest --cov=rqlite --cov-report=term-missing

Linting & Type Checking

uv run ruff check .
uv run ty check

Architecture

rqlite/
├── __init__.py              # Package init, exports
├── connection.py            # Connection class (DB-API 2.0)
├── cursor.py                # Cursor class (DB-API 2.0)
├── types.py                 # Type helpers & sync locks
├── exceptions.py            # Exception classes
├── redis_lock.py            # Redis distributed lock (sync)
├── async_redis_lock.py      # Async Redis distributed lock
├── async_connection.py      # Async Connection class
├── async_cursor.py          # Async Cursor class
├── async_types.py           # Async locks
└── sqlalchemy/              # SQLAlchemy dialect
    ├── __init__.py          # Dialect exports
    ├── dialect.py           # RQLiteDialect implementation
    └── async_dialect.py     # AioRQLiteDialect implementation

License

MIT License - see LICENSE file for details.
