DB-API 2.0 client and SQLAlchemy dialect for rqlite distributed SQLite clusters
tangled-pyrqlite - python client
A pure Python client for rqlite distributed SQLite clusters, providing:
- DB-API 2.0 - Standard Python database API (PEP 249)
- SQLAlchemy dialect - Full ORM support via SQLAlchemy 2.0
- Parameterized queries - Safe, SQL injection-proof query execution
- Serializable transaction support - Atomic batch operations guarded by a lock; bring your own distributed locking implementation (for example Redis or Valkey)
Installation
# Using uv (recommended)
uv add tangled-pyrqlite
# Using pip
pip install tangled-pyrqlite
Quick Start
Starting rqlite Server
Before using the client, start an rqlite server:
Podman (recommended - no root required):
podman rm -f rqlite-test
podman run -d --name rqlite-test -p 4001:4001 docker.io/rqlite/rqlite
Docker:
docker rm -f rqlite-test
docker run -d --name rqlite-test -p 4001:4001 rqlite/rqlite
DB-API 2.0 Usage
import rqlite
from rqlite import ReadConsistency, ThreadLock
# Basic connection (uses LINEARIZABLE consistency by default)
conn = rqlite.connect(host="localhost", port=4001)
cursor = conn.cursor()
# With custom read consistency and lock for transaction support
conn = rqlite.connect(
    host="localhost",
    port=4001,
    read_consistency=ReadConsistency.WEAK,  # or "weak" string
    lock=ThreadLock()  # Suppresses transaction warnings
)
cursor = conn.cursor()
# Create table
cursor.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        email TEXT UNIQUE
    )
""")
conn.commit()
# Insert with positional parameters (recommended)
cursor.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    ("Alice", "alice@example.com")
)
# Insert with named parameters (also supported)
cursor.execute(
    "INSERT INTO users (name, email) VALUES (:name, :email)",
    {"name": "Bob", "email": "bob@example.com"}
)
conn.commit()
# Query with positional parameters
cursor.execute("SELECT * FROM users WHERE name=?", ("Alice",))
row = cursor.fetchone()
print(row) # (1, "Alice", "alice@example.com")
# Fetch all
cursor.execute("SELECT * FROM users")
for row in cursor:
    print(row)
# Close
cursor.close()
conn.close()
# Or use context managers
with rqlite.connect() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users")
        for row in cursor:
            print(row)
Parameter Binding
The client supports both the positional (qmark) and named parameter styles defined by DB-API 2.0:
Positional parameters (?) - Recommended:
cursor.execute("SELECT * FROM users WHERE id=? AND name=?", (42, "Alice"))
Named parameters (:name) - Also supported:
cursor.execute(
    "SELECT * FROM users WHERE id=:id AND name=:name",
    {"id": 42, "name": "Alice"}
)
Note for SQLAlchemy users: SQLAlchemy automatically uses positional parameters (?) for all queries. The ORM and Core layers handle parameter binding before reaching the dialect, so you don't need to worry about parameter format when using SQLAlchemy.
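To make the safety claim concrete, here is a minimal sketch of what bound parameters protect against. It uses the standard library's sqlite3 module with an in-memory database as a stand-in for an rqlite node (an assumption for illustration only); sqlite3 accepts the same two placeholder styles, so the statements read the same way.

```python
import sqlite3

# In-memory SQLite stands in for an rqlite node (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))

hostile = "Alice' OR '1'='1"

# Positional style: the value is bound, never spliced into the SQL text.
positional = conn.execute(
    "SELECT name FROM users WHERE name=?", (hostile,)
).fetchall()

# Named style: same guarantee, keyed by placeholder name.
named = conn.execute(
    "SELECT name FROM users WHERE name=:name", {"name": hostile}
).fetchall()

# String formatting, shown only as the anti-pattern: the quote breaks out
# of the literal and the OR clause matches every row.
unsafe = conn.execute(
    f"SELECT name FROM users WHERE name='{hostile}'"
).fetchall()

print(positional, named, unsafe)
```

With either placeholder style the hostile string is compared as a plain value and matches nothing, while the formatted query returns the whole table.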
SQLAlchemy Usage
from sqlalchemy import Integer, String, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
from rqlite import ReadConsistency, ThreadLock
class Base(DeclarativeBase):
    pass
class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(nullable=False)
    email: Mapped[str | None] = mapped_column(unique=True)
# Basic engine (uses LINEARIZABLE consistency by default)
engine = create_engine("rqlite://localhost:4001")
# With custom read consistency and lock via connect_args
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={
        "read_consistency": ReadConsistency.WEAK,  # or "weak" string
        "lock": ThreadLock()  # Suppresses transaction warnings
    }
)
# Or use URL query parameter for read_consistency only
engine = create_engine("rqlite://localhost:4001?read_consistency=weak")
# Create tables (echo=True shows SQL)
engine = create_engine("rqlite://localhost:4001", echo=True)
Base.metadata.create_all(engine)
# Use with Session
with Session(engine) as session:
    user = User(name="Charlie", email="charlie@example.com")
    session.add(user)
    session.commit()
    # Query
    user = session.query(User).filter_by(name="Charlie").first()
    print(user.name)  # Charlie
Features
Read Consistency Levels
rqlite supports multiple read consistency levels to balance between data freshness and performance. The client defaults to LINEARIZABLE for guaranteed fresh reads.
| Level | Speed | Freshness | Best For |
|---|---|---|---|
| LINEARIZABLE (default) | Moderate | Guaranteed fresh | Critical reads requiring latest data |
| WEAK | Fast | Usually current (sub-second staleness possible) | General-purpose reads |
| NONE | Fastest | No guarantee | Read-only nodes, max performance |
| STRONG | Slow | Guaranteed fresh + applied | Testing only |
| AUTO | Varies | Varies | Mixed node type clusters |
Usage:
import rqlite
from rqlite import ReadConsistency
# Use LINEARIZABLE (default) for guaranteed fresh reads
conn = rqlite.connect()
# Use WEAK for faster reads with possible sub-second staleness
# Supports both enum and string:
conn = rqlite.connect(read_consistency=ReadConsistency.WEAK)
conn = rqlite.connect(read_consistency="weak")
# Use NONE for read-only nodes or maximum performance
conn = rqlite.connect(read_consistency="none")
SQLAlchemy:
from sqlalchemy import create_engine
# Via URL query parameter
engine = create_engine("rqlite://localhost:4001?read_consistency=weak")
# Via connect_args
from rqlite import ReadConsistency
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"read_consistency": ReadConsistency.WEAK}
)
See rqlite documentation for detailed explanations of each consistency level.
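As a rough illustration of what the setting amounts to on the wire: rqlite's HTTP query endpoint takes the consistency level as a `level` query parameter. The sketch below is not this client's code; the `Level` enum and `query_url` helper are hypothetical names that only show how an enum-or-string argument could be normalized and turned into a request URL.

```python
from __future__ import annotations

from enum import Enum
from urllib.parse import urlencode


class Level(Enum):
    """Hypothetical stand-in for the client's ReadConsistency enum."""
    NONE = "none"
    WEAK = "weak"
    LINEARIZABLE = "linearizable"
    STRONG = "strong"
    AUTO = "auto"


def normalize(level: Level | str) -> Level:
    # Accept either the enum member or a case-insensitive string.
    if isinstance(level, Level):
        return level
    return Level(level.lower())  # raises ValueError for unknown names


def query_url(host: str, port: int, level: Level | str = Level.LINEARIZABLE) -> str:
    # Build the URL of rqlite's query endpoint with the chosen level.
    params = urlencode({"level": normalize(level).value})
    return f"http://{host}:{port}/db/query?{params}"


print(query_url("localhost", 4001))
print(query_url("localhost", 4001, "WEAK"))
```

Normalizing once at the boundary means the rest of the code only ever deals with enum members, and a typo in the string form fails early with a ValueError.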
DB-API 2.0 Compliance
| Feature | Status | Notes |
|---|---|---|
| connect() | ✅ | Returns Connection |
| Connection.cursor() | ✅ | Returns Cursor |
| Connection.commit() | ✅ | Queues then sends statements |
| Connection.rollback() | ✅ | Discards queued statements |
| Connection.close() | ✅ | Clears queue, closes resources |
| Cursor.execute() | ✅ | Supports positional and named params |
| Cursor.executemany() | ⚠️ | Executes sequentially |
| Cursor.fetchall() | ✅ | Returns all rows as tuples |
| Cursor.fetchmany() | ✅ | Respects arraysize |
| Cursor.fetchone() | ✅ | Returns single row or None |
| Cursor.description | ✅ | Column metadata after SELECT |
| Cursor.rowcount | ⚠️ | Only for write operations |
| Cursor.lastrowid | ✅ | Available after INSERT |
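For readers less familiar with the cursor attributes in this table, the following sketch walks through `arraysize`, `fetchmany()`, `rowcount`, and `description`. It runs against the standard library's sqlite3 module as a stand-in (an assumption for illustration); the DB-API semantics are the same ones the table refers to, though exact values reported by this client may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE t (n INTEGER)")

# executemany runs the statement once per parameter tuple.
cur.executemany("INSERT INTO t (n) VALUES (?)", [(1,), (2,), (3,)])
insert_count = cur.rowcount        # rows written by the write statement

cur.execute("SELECT n FROM t ORDER BY n")
select_count = cur.rowcount        # -1: not meaningful for SELECT
columns = [col[0] for col in cur.description]  # column names after SELECT

cur.arraysize = 2
first_batch = cur.fetchmany()      # no size given, so arraysize is used
rest = cur.fetchall()              # whatever is left
after_end = cur.fetchone()         # None once the result set is exhausted

print(insert_count, select_count, columns, first_batch, rest, after_end)
```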
SQLAlchemy Support
| Feature | Status | Notes |
|---|---|---|
| Core SELECT/INSERT/UPDATE/DELETE | ✅ | Full support |
| ORM Models | ✅ | Full support |
| Relationships | ✅ | Via SQLite dialect |
| Sessions | ✅ | Standard SQLAlchemy sessions |
| Transactions | ⚠️ | Limited (see below) |
| Reflection | ⚠️ | Basic table/column introspection |
Transaction Model
rqlite's transaction model differs from traditional databases:
How It Works
- Queue-based: Statements are queued locally until commit() is called
- Atomic batch: All queued statements are sent in a single HTTP request with ?transaction=true
- All-or-nothing: Either all statements succeed or none do
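The three points above can be sketched in a few lines. This is a simplified model, not the client's actual implementation: `QueuedTransaction` and its `send` callback are hypothetical names, and the request is captured in a list instead of being posted to rqlite's `/db/execute` endpoint.

```python
import json
from typing import Any, Callable


class QueuedTransaction:
    """Hypothetical sketch of a queue-then-send transaction."""

    def __init__(self, send: Callable[[str, str], Any]) -> None:
        self._send = send                    # callable(path, json_body)
        self._queue: list[list[Any]] = []

    def execute(self, sql: str, params: tuple = ()) -> None:
        # Nothing goes over the wire yet; the statement is only queued.
        self._queue.append([sql, *params])

    def commit(self) -> Any:
        if not self._queue:
            return None
        batch, self._queue = self._queue, []
        # One request carries the whole batch; the server applies it atomically.
        return self._send("/db/execute?transaction=true", json.dumps(batch))

    def rollback(self) -> None:
        # Discarding the local queue is all a rollback needs to do.
        self._queue.clear()


sent: list[tuple[str, str]] = []
tx = QueuedTransaction(lambda path, body: sent.append((path, body)))

tx.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
tx.rollback()                                # Alice is never sent
tx.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))
tx.execute("UPDATE users SET name=? WHERE name=?", ("Robert", "Bob"))
tx.commit()                                  # one request for both statements

print(sent)
```

Because statements never reach the server before commit(), a read issued mid-transaction cannot see the transaction's own pending writes; that is the main practical difference from a conventional BEGIN/COMMIT session.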
Important Limitations
- ❌ No savepoints
- ⚠️ Explicit BEGIN/COMMIT/ROLLBACK SQL is ignored (use Python API)
- ❌ No native transaction isolation levels
- ⚠️ rowcount not available for SELECT statements
Connection URLs
DB-API 2.0
# Basic connection (uses LINEARIZABLE consistency by default)
conn = rqlite.connect(host="localhost", port=4001)
# With authentication
conn = rqlite.connect(
    host="localhost",
    port=4001,
    username="admin",
    password="secret"
)
# Custom timeout
conn = rqlite.connect(host="localhost", port=4001, timeout=60.0)
# Custom read consistency (enum or string)
conn = rqlite.connect(host="localhost", port=4001, read_consistency="weak")
# Or using the enum:
from rqlite import ReadConsistency
conn = rqlite.connect(
    host="localhost",
    port=4001,
    read_consistency=ReadConsistency.WEAK
)
# With lock for transaction support (suppresses warnings)
from rqlite import ThreadLock
conn = rqlite.connect(
    host="localhost",
    port=4001,
    lock=ThreadLock()
)
# Combining read_consistency and lock
conn = rqlite.connect(
    host="localhost",
    port=4001,
    read_consistency=ReadConsistency.WEAK,
    lock=ThreadLock()
)
Redis Distributed Lock (optional)
For true cross-process transaction serialization, use the built-in Redis-backed locks.
Install with the redis extra:
# Quoted so shells such as zsh do not treat the brackets as a glob pattern
uv add "tangled-pyrqlite[redis]"
Start a Redis server:
podman rm -f redis-test
podman run -d --name redis-test -p 6379:6379 docker.io/redis
Sync DB-API 2.0 with RedisLock:
import rqlite
from rqlite import RedisLock
# Connect with Redis distributed lock
lock = RedisLock(name="transfer", timeout=10.0)
conn = rqlite.connect(host="localhost", port=4001, lock=lock)
cursor = conn.cursor()
# Use lock for serialized bank transfer
with lock:
    cursor.execute("SELECT balance FROM accounts WHERE id=?", (1,))
    balance = cursor.fetchone()[0]
    cursor.execute("UPDATE accounts SET balance=? WHERE id=?", (balance - 100, 1))
    conn.commit()
Async DB-API 2.0 with AioRedisLock:
import asyncio
import rqlite
from rqlite import AioRedisLock
async def transfer():
    lock = AioRedisLock(name="transfer", timeout=10.0)
    conn = rqlite.async_connect(lock=lock)
    cursor = await conn.cursor()
    async with lock:
        await cursor.execute("SELECT balance FROM accounts WHERE id=?", (1,))
        balance = cursor.fetchone()[0]
        await cursor.execute(
            "UPDATE accounts SET balance=? WHERE id=?",
            (balance - 100, 1),
        )
        await conn.commit()
    await cursor.close()
    await conn.close()
asyncio.run(transfer())
SQLAlchemy with RedisLock:
from sqlalchemy import create_engine
from rqlite import RedisLock
lock = RedisLock(name="sa_lock", timeout=10.0)
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"lock": lock}
)
Available locks:
| Lock | Sync/Async | Scope | Use case |
|---|---|---|---|
| ThreadLock | sync | In-process threads | Single process, thread-safe transactions |
| threading.Lock | sync | In-process threads | Direct stdlib lock |
| RedisLock | sync | Cross-process (distributed) | Multi-process, ACID isolation |
| AioLock | async | In-process tasks | Single process, async-safe transactions |
| AioRedisLock | async | Cross-process (distributed) | Multi-process async, ACID isolation |
For full examples see: examples/sync_redis_lock_basic_usage.py, examples/async_redis_lock_basic_usage.py,
examples/sync_redis_lock_distributed_transfer.py.
SQLAlchemy
Note: For SQLAlchemy, custom parameters like read_consistency and lock must be passed via the connect_args dictionary, not directly to create_engine(). This is because SQLAlchemy validates kwargs before passing them to the dialect.
# Basic (uses LINEARIZABLE consistency by default)
engine = create_engine("rqlite://localhost:4001")
# With authentication
engine = create_engine("rqlite://admin:secret@localhost:4001")
# Enable SQL echo for debugging
engine = create_engine("rqlite://localhost:4001", echo=True)
# Custom read consistency via URL query parameter
engine = create_engine("rqlite://localhost:4001?read_consistency=weak")
# Custom read consistency via connect_args
from rqlite import ReadConsistency
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"read_consistency": ReadConsistency.WEAK}
)
# With lock for transaction support (via connect_args)
from rqlite import ThreadLock
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"lock": ThreadLock()}
)
# Both read_consistency and lock together
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={
        "read_consistency": ReadConsistency.WEAK,
        "lock": ThreadLock()
    }
)
Error Handling
import rqlite
try:
    conn = rqlite.connect()
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM nonexistent_table")
except rqlite.ProgrammingError as e:
    print(f"SQL error: {e}")
except rqlite.OperationalError as e:
    print(f"Connection error: {e}")
except rqlite.DatabaseError as e:
    print(f"Database error: {e}")
Exception Hierarchy
- Error - Base exception
  - InterfaceError - Interface-related errors
  - DatabaseError - Database errors
    - DataError - Data-related errors
    - OperationalError - Connection/operation errors
    - IntegrityError - Constraint violations
    - InternalError - Internal database errors
    - ProgrammingError - SQL syntax errors
    - NotSupportedError - Unsupported operations
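Because the more specific exceptions derive from DatabaseError, the order of `except` clauses matters: the narrowest class has to come first or it will never be reached. The sketch below demonstrates this with the standard library's sqlite3 module, which follows the same PEP 249 hierarchy; it is a stand-in for illustration, and which exception this client raises for a given failure may differ.

```python
import sqlite3


def insert_email(conn: sqlite3.Connection, email: str) -> str:
    """Insert a row and report the outcome as a short label."""
    try:
        conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        return "ok"
    except sqlite3.IntegrityError:
        # Most specific first: constraint violations.
        return "duplicate"
    except sqlite3.DatabaseError:
        # Catch-all for the remaining database errors.
        return "database error"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT UNIQUE)")

first = insert_email(conn, "alice@example.com")
second = insert_email(conn, "alice@example.com")  # violates UNIQUE

print(first, second)
print(issubclass(sqlite3.IntegrityError, sqlite3.DatabaseError))
```

If the two `except` clauses were swapped, the DatabaseError handler would swallow the constraint violation and the duplicate case could not be told apart.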
Examples
See the examples/ directory for complete working examples.
Running Sync Examples
Without lock (shows transaction warnings):
# Sync ThreadLock DB-API 2.0 examples with warnings
uv run python -B examples/sync_thread_lock_basic_usage.py
# Sync ThreadLock SQLAlchemy ORM examples with warnings
uv run python -B examples/sync_thread_lock_sqlalchemy_orm.py
With lock (no transaction warnings):
# Sync ThreadLock DB-API 2.0 examples without warnings
uv run python -B examples/sync_thread_lock_basic_usage.py --with-lock
# Sync ThreadLock SQLAlchemy ORM examples without warnings
uv run python -B examples/sync_thread_lock_sqlalchemy_orm.py --with-lock
The -B flag disables byte-code generation for cleaner output.
Example Files
| File | Description |
|---|---|
| sync_thread_lock_basic_usage.py | Sync DB-API 2.0 CRUD with ThreadLock |
| async_aio_lock_basic_usage.py | Async DB-API 2.0 examples with AioLock |
| sync_thread_lock_sqlalchemy_orm.py | Sync SQLAlchemy ORM usage |
| async_aio_lock_sqlalchemy_orm.py | Async SQLAlchemy ORM usage |
| sync_redis_lock_basic_usage.py | Sync Redis distributed lock examples |
| async_redis_lock_basic_usage.py | Async Redis distributed lock examples |
| sync_valkey_lock_basic_usage.py | Sync Valkey distributed lock examples |
| async_valkey_lock_basic_usage.py | Async Valkey distributed lock examples |
| sync_redis_lock_distributed_transfer.py | Cross-process bank transfer demo (proves Redis lock works) |
Locking Mechanism
The examples demonstrate the optional locking mechanism for transaction support:
- Without lock: Shows warnings about explicit BEGIN/COMMIT/ROLLBACK SQL not being supported
- With lock (--with-lock): Uses ThreadLock to suppress warnings, allowing explicit transaction SQL
For more details, see the Locking Mechanism section below.
Locking Mechanism
rqlite provides an optional locking mechanism to support transactions and suppress warnings about explicit transaction SQL commands (BEGIN/COMMIT/ROLLBACK/SAVEPOINT).
Why Use Locks?
By default, when using the rqlite library without a lock, you will receive a UserWarning:
UserWarning: Explicit BEGIN/COMMIT/ROLLBACK/SAVEPOINT SQL is not supported.
This warning is expected behavior. It is a reminder that:
- rqlite's transaction model is queue-based and sends statements as an atomic batch
- Explicit transaction SQL commands are not supported in the traditional sense
- Transaction control goes through the Python API (commit(), rollback())
This is fine if you understand how rqlite transactions work. However, if you need true ACID compliance with proper isolation guarantees, it is recommended to use a lock (e.g., ThreadLock()). The lock:
- Suppresses the warning
- Indicates intentional handling of transaction limitations
- Provides thread-safety for concurrent operations
When you provide a lock, these warnings are suppressed, indicating that you're aware of the limitations and handling transactions appropriately.
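The underlying problem a lock solves is the read-modify-write sequence: a balance is read, changed in application code, and written back, and two writers interleaving those steps can lose an update. The sketch below shows the pattern with a plain `threading.Lock` guarding a shared dictionary that stands in for an account row. It is only an illustration of the idea; none of the names come from this library.

```python
import threading

# A shared dict stands in for an account row (illustration only).
account = {"balance": 1000}
lock = threading.Lock()


def withdraw(amount: int, times: int) -> None:
    for _ in range(times):
        # The lock makes read + compute + write one indivisible step.
        with lock:
            current = account["balance"]          # read
            account["balance"] = current - amount  # modify + write


threads = [threading.Thread(target=withdraw, args=(1, 100)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(account["balance"])
```

A `threading.Lock` only serializes threads inside one process. When several processes or hosts write to the same cluster, the same pattern needs a lock they all share, which is the role the distributed locks play.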
Available Lock Classes
- ThreadLock (recommended) - Thread-safe wrapper around threading.Lock
- threading.Lock - Use directly (satisfies LockProtocol)
- Custom locks - Any class implementing LockProtocol
Usage Examples
DB-API 2.0 with ThreadLock:
import rqlite
from rqlite import ThreadLock
# Connect with lock to suppress warnings
conn = rqlite.connect(lock=ThreadLock())
cursor = conn.cursor()
# No warning about explicit transaction SQL
cursor.execute("BEGIN")
cursor.execute("INSERT INTO users (name) VALUES ('Alice')")
cursor.execute("COMMIT")
Using threading.Lock directly:
import rqlite
import threading
conn = rqlite.connect(lock=threading.Lock())
# Same behavior as ThreadLock
SQLAlchemy with lock:
from sqlalchemy import create_engine
from rqlite import ThreadLock
engine = create_engine(
    "rqlite://localhost:4001",
    connect_args={"lock": ThreadLock()}
)
# No warnings when using explicit transaction SQL
Custom lock implementation:
import rqlite
from rqlite import LockProtocol
class MyLock:
    """Custom lock satisfying LockProtocol."""
    def __init__(self): pass
    def acquire(self, blocking=True, timeout=-1): return True
    def release(self): pass
    def __enter__(self): return self
    def __exit__(
        self,
        exc_type: type[Exception] | None,
        exc_val: Exception | None,
        exc_tb: object,
    ) -> None: pass
conn = rqlite.connect(lock=MyLock())
LockProtocol
Any lock implementation must satisfy the LockProtocol:
from typing import Protocol
class LockProtocol(Protocol):
    def __init__(self) -> None: ...
    def acquire(self, blocking: bool = ..., timeout: float = ...) -> bool: ...
    def release(self) -> None: ...
    def __enter__(self) -> "LockProtocol": ...
    def __exit__(
        self,
        exc_type: type[Exception] | None,
        exc_val: Exception | None,
        exc_tb: object,
    ) -> None: ...
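A custom lock that does real work might look like the sketch below: it delegates to `threading.Lock` and counts successful acquisitions. To stay self-contained it checks against `LockLike`, a local, hypothetical copy of the protocol's method set rather than the `LockProtocol` imported from rqlite, and `CountingLock` is likewise an invented name.

```python
import threading
from typing import Protocol, runtime_checkable


@runtime_checkable
class LockLike(Protocol):
    """Local stand-in mirroring the methods listed in LockProtocol."""
    def acquire(self, blocking: bool = ..., timeout: float = ...) -> bool: ...
    def release(self) -> None: ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...


class CountingLock:
    """Hypothetical custom lock: wraps threading.Lock and counts acquisitions."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.acquisitions = 0

    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        acquired = self._lock.acquire(blocking, timeout)
        if acquired:
            self.acquisitions += 1
        return acquired

    def release(self) -> None:
        self._lock.release()

    def __enter__(self) -> "CountingLock":
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.release()


lock = CountingLock()
with lock:
    # While held, a non-blocking attempt fails instead of deadlocking.
    second_attempt = lock.acquire(blocking=False)

print(isinstance(lock, LockLike), second_attempt, lock.acquisitions)
```

Since the protocol is structural, the class does not need to inherit from anything; providing the methods with compatible signatures is enough.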
Abstract Lock Class
The rqlite.Lock class is abstract and should NOT be instantiated directly. Use ThreadLock or threading.Lock instead:
from rqlite import Lock
# ❌ Don't do this - raises NotImplementedError
lock = Lock()
# ✅ Do this instead
from rqlite import ThreadLock
lock = ThreadLock()
Development
Setup
uv sync
Run Tests
# Start fresh rqlite first
podman rm -f rqlite-test
podman run -d --name rqlite-test -p 4001:4001 docker.io/rqlite/rqlite
# Run tests
pytest -v
# With coverage
pytest --cov=rqlite --cov-report=term-missing
Linting & Type Checking
uv run ruff check .
uv run ty check
Architecture
rqlite/
├── __init__.py # Package init, exports
├── connection.py # Connection class (DB-API 2.0 sync)
├── cursor.py # Cursor class (DB-API 2.0 sync)
├── types.py # Type helpers & sync locks (ThreadLock, LockProtocol)
├── exceptions.py # Exception classes
├── async_connection.py # Async Connection class
├── async_cursor.py # Async Cursor class
├── async_types.py # Async locks (AioLock, AsyncLockProtocol)
├── redis_lock.py # Redis distributed lock (sync, RedisLock)
├── async_redis_lock.py # Async Redis distributed lock (AioRedisLock)
├── valkey_lock.py # Valkey distributed lock (sync, ValkeyLock)
├── async_valkey_lock.py # Async Valkey distributed lock (AioValkeyLock)
└── sqlalchemy/ # SQLAlchemy dialect
    ├── __init__.py # Dialect exports
    ├── dialect.py # RQLiteDialect implementation (sync)
    └── async_dialect.py # AioRQLiteDialect implementation (async)
References
- rqlite — Distributed SQLite database
- Python DB-API 2.0 (PEP 249) — Python database API specification
- SQLAlchemy Documentation — Python SQL toolkit and ORM
- Redis — Redis in-memory data store (distributed locking)
- Valkey — Valkey in-memory data store (distributed locking, Redis-compatible)
License
MIT License - see LICENSE file for details.