Purreal

Production-grade async connection pooler for SurrealDB.
Quick Test (After Git Pull)
```bash
# 1. Start SurrealDB
surreal start --bind 0.0.0.0:8000 --user root --pass root

# 2. Install purreal
pip install -e .

# 3. Test connectivity (takes 5 seconds)
python tests/test_connectivity.py

# 4. Test 500 concurrent connections (optional)
python tests/stress_test_simple.py 500
```
Quick test runner:
```bash
# Linux/Mac
./test.sh

# Windows
test.bat
```
See QUICKSTART.md for full setup guide.
Production-Grade SurrealDB Connection Pooling
Overview
Purreal is a production-grade async connection pooler for SurrealDB that solves the critical websockets.exceptions.ConcurrencyError: cannot call recv while another coro is calling recv issue in high-concurrency Python applications.
By ensuring exclusive connection leasing and sophisticated connection lifecycle management, Purreal enables your async applications to safely handle thousands of concurrent database operations without race conditions or connection conflicts.
Why Purreal?
The Problem: SurrealDB's Python client connections cannot safely handle concurrent operations from multiple coroutines. Attempting to share a single connection results in ConcurrencyError crashes.
The Solution: Purreal provides:
- Exclusive connection leasing - each coroutine gets its own connection
- Automatic pool management - connections created/destroyed based on demand
- Robust error handling - connections auto-replaced on failure
- Production-ready - battle-tested with comprehensive logging and stats
Key Features
Core Pooling
- Exclusive Connection Leasing: Prevents ConcurrencyError by ensuring only one coroutine uses a connection at a time
- Dynamic Pool Sizing: Auto-scales between min_connections and max_connections based on load
- Connection Queueing: Fair FIFO queue when pool exhausted, with configurable timeout
- Async Context Manager: Safe async with pattern guarantees connection return
Reliability & Health
- Automatic Health Checks: Background maintenance loop validates connection health
- Connection Lifecycle Management: Tracks usage count, age, and health status
- Intelligent Retry Logic: Exponential backoff with jitter for transient failures
- Graceful Degradation: Continues operating even when some connections fail
Production Features
- Connection State Reset: Optional reset_on_return to clear session state
- Schema Initialization: Auto-execute .surql schema files on connection creation
- Comprehensive Stats: Track acquisitions, timeouts, errors, peak usage
- Detailed Logging: Debug-level connection lifecycle tracing
- Query Logging: Optional request/response logging for debugging
Installation
```bash
pip install purreal
```
Requirements:
- Python 3.11+
- surrealdb >= 0.3.0
Quick Start
Basic Usage
```python
import asyncio
from purreal import SurrealDBConnectionPool

async def main():
    # Initialize pool
    pool = SurrealDBConnectionPool(
        uri="ws://localhost:8000/rpc",
        credentials={"username": "root", "password": "root"},
        namespace="test",
        database="test",
        min_connections=5,
        max_connections=20,
    )

    # Use async context manager (handles init + cleanup)
    async with pool:
        # Acquire exclusive connection
        async with pool.acquire() as conn:
            result = await conn.query("SELECT * FROM users")
            print(result)

asyncio.run(main())
```
Production Example with Error Handling
```python
import asyncio
import logging

from purreal import SurrealDBConnectionPool

logging.basicConfig(level=logging.INFO)

async def process_user_batch(pool, user_ids):
    """Process multiple users concurrently using the pool."""

    async def process_user(user_id):
        try:
            async with pool.acquire() as conn:
                # Each coroutine gets exclusive connection access
                user = await conn.query(f"SELECT * FROM user:{user_id}")
                await conn.query(
                    f"UPDATE user:{user_id} SET last_accessed = time::now()"
                )
                return user
        except asyncio.TimeoutError:
            logging.warning(f"Timeout acquiring connection for user {user_id}")
            return None
        except Exception as e:
            logging.error(f"Error processing user {user_id}: {e}")
            return None

    # Process all users concurrently - pool handles queueing
    results = await asyncio.gather(*[process_user(uid) for uid in user_ids])
    return [r for r in results if r is not None]

async def main():
    pool = SurrealDBConnectionPool(
        uri="wss://mydb.surreal.cloud",
        credentials={"username": "admin", "password": "secure_pass"},
        namespace="production",
        database="app",
        min_connections=10,
        max_connections=50,
        acquisition_timeout=30.0,    # Wait up to 30s for connection
        health_check_interval=60.0,  # Check health every 60s
        log_queries=True,            # Log all queries (disable in prod)
        schema_file="schema.surql",  # Auto-execute on new connections
    )

    async with pool:
        # Simulate high-concurrency workload
        user_ids = [f"user_{i}" for i in range(100)]
        results = await process_user_batch(pool, user_ids)

        # Check pool stats
        stats = await pool.get_stats()
        logging.info(f"Pool stats: {stats}")

asyncio.run(main())
```
Configuration Options
Required Parameters
- uri (str): SurrealDB connection URI (e.g., ws://localhost:8000/rpc, wss://cloud.surreal.io)
- credentials (dict): Authentication credentials, e.g. {"username": "...", "password": "..."}
- namespace (str): SurrealDB namespace
- database (str): SurrealDB database name
Pool Sizing
- min_connections (int, default: 4): Minimum connections maintained in pool
- max_connections (int, default: 10): Maximum connections allowed
- acquisition_timeout (float, default: 10.0): Seconds to wait for available connection
Connection Lifecycle
- max_idle_time (float, default: 300.0): Seconds before idle connection is recycled
- max_usage_count (int, default: 1000): Max queries before connection recycled
- connection_timeout (float, default: 25.0): Seconds to establish new connection
- connection_retry_attempts (int, default: 3): Retries for failed connections
- connection_retry_delay (float, default: 1.0): Base delay between retries (exponential backoff)
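For intuition, "base delay with exponential backoff" plus jitter can be sketched as follows (an illustration of the strategy, not purreal's internal code; the full-jitter variant and the 30s cap are assumptions):

```python
import random

def backoff_delays(attempts: int = 3, base: float = 1.0, cap: float = 30.0) -> list[float]:
    """Full-jitter exponential backoff: delay n is drawn from U(0, min(cap, base * 2**n))."""
    return [random.uniform(0.0, min(cap, base * (2 ** n))) for n in range(attempts)]

# With the defaults above (3 attempts, 1.0s base delay),
# the upper bounds for the three delays are 1s, 2s, and 4s.
```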
Health & Maintenance
- health_check_interval (float, default: 30.0): Seconds between health checks
- reset_on_return (bool, default: True): Reset connection state on release
Advanced
- schema_file (str, optional): Path to .surql file to execute on new connections
- on_connection_create (callable, optional): Async callback when connection created
- log_queries (bool, default: False): Log all queries (useful for debugging)
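For reference, a minimal example of what a schema_file (.surql) might contain; the table and field names below are purely illustrative:

```sql
-- Executed on each new connection when schema_file is set
DEFINE TABLE user SCHEMAFULL;
DEFINE FIELD email ON user TYPE string;
DEFINE FIELD last_accessed ON user TYPE datetime;
```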
API Reference
SurrealDBConnectionPool
Initialization
```python
pool = SurrealDBConnectionPool(
    uri, credentials, namespace, database,
    min_connections=4, max_connections=10,
    acquisition_timeout=10.0, **kwargs
)
```
Context Manager (Recommended)
```python
async with pool:  # Calls initialize() and close() automatically
    async with pool.acquire() as conn:
        await conn.query("SELECT * FROM table")
```
Methods
async with acquire() -> SurrealConnection
- Acquires exclusive connection from pool
- Must use with async with to ensure proper release
- Raises asyncio.TimeoutError if pool exhausted for acquisition_timeout seconds
- Raises RuntimeError if pool closed or not initialized
async initialize()
- Creates minimum connections and starts maintenance loop
- Safe to call multiple times (idempotent)
- Automatically called by async with pool
async close()
- Gracefully closes all connections
- Cancels maintenance loop
- Notifies waiting coroutines
- Safe to call multiple times
- Automatically called on async with pool exit
async get_stats() -> dict
- Returns pool statistics:
```python
{
    "total_connections_created": int,
    "total_connections_closed": int,
    "total_acquisitions": int,
    "total_releases": int,
    "acquisition_timeouts": int,
    "connection_errors": int,
    "health_check_failures": int,
    "peak_connections": int,
    "peak_waiters": int,
    "current_pool_size": int,
    "current_available": int,
    "current_in_use": int,
    "current_waiters": int,
}
```
async execute_query(query: str, params: dict = None) -> Any
- Convenience method: acquires connection, executes query, releases
- Equivalent to:

```python
async with pool.acquire() as conn:
    return await conn.query(query, params)
```
Best Practices
✅ DO
```python
# Use async context managers for guaranteed cleanup
async with pool:
    async with pool.acquire() as conn:
        await conn.query("SELECT * FROM users")
```

```python
# Set appropriate pool sizes for your workload
pool = SurrealDBConnectionPool(
    min_connections=10,  # Based on baseline load
    max_connections=50,  # Based on peak load
)
```

```python
# Handle timeouts gracefully
try:
    async with pool.acquire() as conn:
        result = await conn.query("SLOW QUERY")
except asyncio.TimeoutError:
    logger.warning("Pool exhausted, consider scaling")
```
❌ DON'T
```python
# Don't share connections between coroutines
conn = await pool.acquire()  # Missing async with!
await asyncio.gather(
    conn.query("SELECT 1"),  # ❌ ConcurrencyError!
    conn.query("SELECT 2"),
)
```

```python
# Don't forget to close the pool
pool = SurrealDBConnectionPool(...)
await pool.initialize()
# ... use pool ...
# ❌ Missing: await pool.close()
```

```python
# Don't set pool sizes too small
pool = SurrealDBConnectionPool(
    min_connections=1,  # ❌ Too small for production
    max_connections=2,  # ❌ Will bottleneck quickly
)
```
Troubleshooting
ConcurrencyError: cannot call recv
Cause: Sharing a connection between coroutines
Solution: Always use async with pool.acquire() - never store or share the connection object
asyncio.TimeoutError during acquisition
Cause: Pool exhausted (all connections in use)
Solutions:
- Increase max_connections
- Increase acquisition_timeout
- Reduce query execution time
- Check for connection leaks (not releasing connections)
Connections failing health checks
Cause: Network issues, database restart, or connection timeout
Solution: Purreal auto-replaces unhealthy connections. Check logs for patterns:
```python
# Enable debug logging to diagnose
logging.getLogger('purreal').setLevel(logging.DEBUG)
```
Memory usage growing
Cause: Connections not being released
Solution: Verify all pool.acquire() uses are in async with blocks
Performance Tips
Pool Sizing
```python
# Calculate based on your workload
requests_per_second = 100  # Peak request arrival rate
avg_query_time = 0.1       # Average query time in seconds
connections_needed = requests_per_second * avg_query_time

pool = SurrealDBConnectionPool(
    min_connections=int(connections_needed * 0.5),  # 50% for baseline
    max_connections=int(connections_needed * 2),    # 200% for peaks
)
```
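This heuristic is essentially Little's law (average queries in flight = arrival rate × service time), so the first input should be treated as a request rate. Worked through with the numbers above:

```python
requests_per_second = 100
avg_query_time = 0.1  # seconds per query

# Little's law: average queries in flight = arrival rate * service time
connections_needed = requests_per_second * avg_query_time  # ~10

min_connections = int(connections_needed * 0.5)  # 5
max_connections = int(connections_needed * 2)    # 20
```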
Connection Lifecycle
```python
# Tune based on your database workload
pool = SurrealDBConnectionPool(
    max_usage_count=10000,     # Higher for read-heavy workloads
    max_idle_time=600,         # Longer for stable connections
    health_check_interval=60,  # Less frequent for stable environments
)
```
Monitoring
```python
# Periodically check pool health
async def monitor_pool(pool):
    while True:
        stats = await pool.get_stats()
        if stats['acquisition_timeouts'] > 10:
            logger.warning(f"High timeout rate: {stats}")
        if stats['current_in_use'] / stats['current_pool_size'] > 0.8:
            logger.warning("Pool utilization above 80%")
        await asyncio.sleep(60)

asyncio.create_task(monitor_pool(pool))
```
Known Limitations
Burst Load > max_connections
Issue: When burst traffic exceeds max_connections, waiting tasks may timeout instead of queuing properly.
Example:
- Pool: max_connections=50
- Burst: 100 concurrent requests
- Result: 50 succeed, 50 timeout (instead of queuing)
Workaround:
```python
# Size pool for peak burst load
pool = SurrealDBConnectionPool(
    max_connections=150,  # 1.5x expected peak
    ...
)

# Or use a semaphore to limit concurrency
semaphore = asyncio.Semaphore(50)

async def limited_query():
    async with semaphore:
        async with pool.acquire() as conn:
            await conn.query(...)
```
Status: Known issue in v0.1.0. See KNOWN_ISSUES.md for details.
Impact: Sustained load and gradual ramp-up work fine. Only affects sudden bursts >> pool size.
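The semaphore workaround can be sanity-checked in isolation; the sketch below replaces the real pool with a short sleep so the concurrency cap is observable (everything here is a self-contained simulation, not purreal code):

```python
import asyncio

async def run_burst(total: int, limit: int) -> int:
    """Fire `total` tasks through a semaphore; return peak observed concurrency."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def task():
        nonlocal active, peak
        async with semaphore:           # at most `limit` tasks past this point
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.005)  # stand-in for pool.acquire() + query
            active -= 1

    await asyncio.gather(*(task() for _ in range(total)))
    return peak

peak = asyncio.run(run_burst(total=100, limit=50))
assert peak <= 50  # the burst never exceeds the semaphore limit
```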
Contributing
We welcome contributions! Please feel free to submit a Pull Request.
Guidelines
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Make your changes
- Write tests for your changes (pytest tests/)
- Ensure all tests pass (pytest)
- Submit a pull request
Development Setup
```bash
git clone https://github.com/dyleeeeeeee/purreal.git
cd purreal
pip install -e ".[dev]"
pytest
```
License
This project is licensed under the GNU General Public License v3 (GPLv3) - see the LICENSE file for details.
Acknowledgments
- Solves the critical ConcurrencyError issue affecting SurrealDB Python applications
- Built for production use in high-concurrency async applications
- Thanks to the SurrealDB team for an excellent database