Safe concurrent MySQL writes and Redis Streams queue operations
Project description
COTI SafeSync Framework
Safe concurrent MySQL writes and Redis Streams queue operations
COTI SafeSync Framework is a Python library for building robust, concurrent backend systems. It provides explicit concurrency control primitives for MySQL and safe message consumption from Redis Streams, designed for multi-process and multi-host environments.
Features
- 🔒 Explicit concurrency control - Pessimistic locking, optimistic concurrency control (OCC), and atomic SQL operations
- 🗄️ Transactional MySQL operations - Safe, explicit transaction management with
DbSession - 📨 Redis Streams queue consumption - At-least-once delivery with explicit acknowledgment
- 🚀 Multi-process safe - Designed for distributed systems with multiple workers
- 📊 Prometheus metrics - Built-in observability for operations and locks
- 🎯 Framework-agnostic - Works with FastAPI, CLI workers, schedulers, etc.
Installation
pip install coti-safesync-framework
Requirements
- Python >= 3.11
- MySQL 8.0+ with InnoDB storage engine
- Redis 5.0+ (for queue operations)
Quick Start
Database Operations
from sqlalchemy import create_engine
from coti_safesync_framework.db.session import DbSession
# Create engine (typically done once at application startup)
engine = create_engine("mysql+pymysql://user:password@host/database")
# Use DbSession for transactional operations
with DbSession(engine) as session:
# Execute SQL
session.execute(
"UPDATE accounts SET balance = balance + :amount WHERE id = :id",
{"id": 123, "amount": 100}
)
# Transaction commits automatically on success
Queue Consumption
from redis import Redis
from coti_safesync_framework.config import QueueConfig
from coti_safesync_framework.queue.consumer import QueueConsumer
from coti_safesync_framework.db.session import DbSession
from coti_safesync_framework.queue.models import QueueMessage
# Setup
redis_client = Redis(host="localhost", port=6379)
config = QueueConfig(
stream_key="orders",
consumer_group="workers",
consumer_name="worker_1"
)
consumer = QueueConsumer(redis_client, config)
# Process messages
def handle_message(msg: QueueMessage, session: DbSession) -> None:
order_id = msg.payload["order_id"]
session.execute(
"UPDATE orders SET status = 'processed' WHERE id = :id",
{"id": order_id}
)
consumer.run(handler=handle_message, engine=engine)
Database Examples
1. Atomic SQL Updates
For simple operations that can be expressed as a single SQL statement:
from coti_safesync_framework.db.session import DbSession
def increment_counter(engine, counter_id: int) -> None:
"""Increment a counter atomically - no locks needed."""
with DbSession(engine) as session:
session.execute(
"UPDATE counters SET value = value + 1 WHERE id = :id",
{"id": counter_id}
)
# MySQL guarantees atomicity for single statements
2. Pessimistic Row Locking
When you need strict serialization for read-modify-write operations:
from coti_safesync_framework.db.session import DbSession
from coti_safesync_framework.db.locking.row_lock import RowLock
def process_order(engine, order_id: int) -> None:
"""Process an order - only one worker can process a specific order."""
with DbSession(engine) as session:
# Acquire exclusive lock on the order row
order = RowLock(session, "orders", {"id": order_id}).acquire()
if order is None:
return # Order doesn't exist
if order["status"] == "processed":
return # Already processed
# Safe to modify - we hold the lock
session.execute(
"UPDATE orders SET status = :status WHERE id = :id",
{"id": order_id, "status": "processed"}
)
# Lock released when transaction commits
3. Optimistic Concurrency Control (OCC)
For high-throughput scenarios where conflicts are rare:
from coti_safesync_framework.db.session import DbSession
from coti_safesync_framework.db.helpers import occ_update
import time
import random
def update_account_balance(engine, account_id: int, amount_change: int) -> None:
"""Update account balance using OCC with retry."""
MAX_RETRIES = 10
for attempt in range(MAX_RETRIES):
with DbSession(engine) as session:
# Read current balance and version
account = session.fetch_one(
"SELECT balance, version FROM accounts WHERE id = :id",
{"id": account_id}
)
if account is None:
raise ValueError(f"Account {account_id} not found")
# Attempt OCC update
rowcount = occ_update(
session=session,
table="accounts",
id_column="id",
id_value=account_id,
version_column="version",
version_value=account["version"],
updates={"balance": account["balance"] + amount_change}
)
if rowcount == 1:
return # Success!
# Version mismatch - retry with new transaction
time.sleep(random.uniform(0.001, 0.01))
raise RuntimeError(f"Failed to update account after {MAX_RETRIES} retries")
Important: Each OCC attempt must use a new transaction. Never retry inside a single DbSession.
4. Advisory Locks
For application-level synchronization across multiple tables:
from coti_safesync_framework.db.session import DbSession
from coti_safesync_framework.db.locking.advisory_lock import AdvisoryLock
from coti_safesync_framework.errors import LockTimeoutError
def process_user_data(engine, user_id: int) -> None:
"""Process all data for a user - only one worker at a time."""
lock_key = f"user_processing:{user_id}"
try:
with DbSession(engine) as session:
with AdvisoryLock(session, lock_key, timeout=10):
# Lock acquired - we're the only worker processing this user
# Read user's orders
orders = session.fetch_all(
"SELECT id, total FROM orders WHERE user_id = :user_id",
{"user_id": user_id}
)
# Update user's summary
total_spent = sum(order["total"] for order in orders)
session.execute(
"UPDATE users SET total_spent = :total WHERE id = :id",
{"id": user_id, "total": total_spent}
)
# Lock released when connection closes (after commit)
except LockTimeoutError:
# Another worker is processing this user
print(f"Could not acquire lock for user {user_id}")
5. Idempotent INSERTs
For safe duplicate inserts using database constraints:
from coti_safesync_framework.db.session import DbSession
from coti_safesync_framework.db.helpers import insert_idempotent
def create_user_profile(engine, user_id: int, initial_data: dict) -> None:
"""Create user profile - safe to call multiple times."""
with DbSession(engine) as session:
inserted = insert_idempotent(
session,
"""
INSERT INTO user_profiles (user_id, display_name, created_at)
VALUES (:user_id, :display_name, NOW())
""",
{
"user_id": user_id,
"display_name": initial_data.get("display_name", "User")
}
)
if inserted:
print("Profile created")
else:
print("Profile already exists")
Queue Examples
1. Basic Message Consumption
from redis import Redis
from coti_safesync_framework.config import QueueConfig
from coti_safesync_framework.queue.consumer import QueueConsumer
redis_client = Redis(host="localhost", port=6379)
config = QueueConfig(
stream_key="orders",
consumer_group="workers",
consumer_name="worker_1",
block_ms=5_000, # Block 5 seconds when no messages
)
consumer = QueueConsumer(redis_client, config)
# Iterator-based consumption
for msg in consumer.iter_messages():
try:
process_message(msg.payload)
consumer.ack(msg) # Acknowledge after successful processing
except Exception as e:
# Don't ack on failure - message remains pending
print(f"Failed to process: {e}")
2. Template-Method Pattern (Recommended)
The run() method handles the complete flow: fetch → process → commit → ack:
from sqlalchemy import create_engine
from coti_safesync_framework.queue.models import QueueMessage
from coti_safesync_framework.db.session import DbSession
engine = create_engine("mysql+pymysql://user:password@host/database")
def handle_message(msg: QueueMessage, session: DbSession) -> None:
"""Process message within a database transaction."""
order_id = msg.payload["order_id"]
# Read current state
order = session.fetch_one(
"SELECT id, status FROM orders WHERE id = :id",
{"id": order_id}
)
if not order:
raise ValueError(f"Order {order_id} not found")
# Update order
session.execute(
"UPDATE orders SET status = :status WHERE id = :id",
{"id": order_id, "status": "processed"}
)
# Transaction commits automatically on exit
# Message is ACKed after commit
# Run the consumer
consumer.run(handler=handle_message, engine=engine)
Error handling: If handle_message raises an exception:
- Transaction rolls back automatically
- Message is NOT acknowledged
- Message remains pending for retry
3. Stale Message Recovery
Messages may become stale if a worker crashes before acknowledging them. These messages remain pending in Redis and are not automatically redelivered. Use run_claim_stale() in a separate worker process to recover stale messages:
def recovery_worker():
"""Run in a separate process to recover stale messages."""
consumer = QueueConsumer(redis_client, config)
consumer.run_claim_stale(
handler=handle_message, # Same handler as main consumer
engine=engine,
min_idle_ms=60_000, # Claim messages idle > 60 seconds
claim_interval_ms=5_000, # Check every 5 seconds
max_claim_count=10 # Claim up to 10 messages per check
)
How it works: run_claim_stale() periodically checks for stale messages (every claim_interval_ms), claims them, and processes them using the same handler pattern as run(). It loops until stop() is called.
Important: Run the recovery worker in a separate process alongside your main consumer. The recovery worker should use the same handler function for consistency.
4. Manual Message Fetching
For more control over the consumption loop:
while not consumer._stopping.is_set():
msg = consumer.next(block_ms=5_000)
if msg is None:
continue # No message available
try:
with DbSession(engine) as session:
process_message(msg.payload, session)
consumer.ack(msg)
except Exception:
# Transaction rolled back, message not acked
raise
5. Graceful Shutdown
import signal
consumer = QueueConsumer(redis_client, config)
def shutdown_handler(signum, frame):
consumer.stop()
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)
# Consumer will stop after current message completes
consumer.run(handler=handle_message, engine=engine)
Concurrency Strategies
COTI SafeSync Framework provides multiple strategies for safe concurrent operations:
| Strategy | Use When | Performance | Contention |
|---|---|---|---|
| Atomic SQL | Single-statement operations | Highest | Low |
| Idempotent INSERT | One-time initialization | High | Low |
| OCC | Low contention, can retry | High | Low |
| Row Lock | Need strict serialization | Medium | High |
| Advisory Lock | Cross-table synchronization | Medium | Medium |
See LOCKING_STRATEGIES.md for detailed guidance.
Design Principles
- Explicit over implicit - Locks and transactions are always explicit
- Primitives, not workflows - Building blocks you compose
- Control stays with you - You compose logic inside locks/transactions
- No magic retries - Retry logic is your decision
- Framework-agnostic - Works with any Python framework
Important Notes
Database Transactions
- ⚠️ Never retry inside a single
DbSession- Each retry must use a new transaction - ⚠️ Keep transactions short - Long-held locks increase contention
- ⚠️ Index WHERE clauses - Non-indexed predicates can cause performance issues
Queue Semantics
- ⚠️ At-least-once delivery - Messages may be redelivered if ACK fails
- ⚠️ Handlers must be idempotent - Or use DB constraints/locks/OCC
- ⚠️ Stale message recovery - Run a separate recovery worker for stale messages
OCC Usage
- ⚠️ Each attempt uses a new transaction - Never retry inside
DbSession - ⚠️ Must retry on
rowcount == 0- Indicates version mismatch - ⚠️ Must re-read before retrying - Don't reuse stale data
See docs/occ.md for the complete OCC usage guide.
Metrics
COTI SafeSync Framework exposes Prometheus metrics:
coti_safesync_db_write_total- DB write operation countscoti_safesync_db_write_latency_seconds- DB write latenciescoti_safesync_db_lock_acquire_latency_seconds- Lock acquisition timingcoti_safesync_queue_messages_read_total- Queue message readscoti_safesync_queue_messages_ack_total- Message acknowledgmentscoti_safesync_queue_messages_claimed_total- Stale messages claimed
Documentation
- Complete API Reference - Authoritative design document
- Locking Strategies Guide - When to use each strategy
- OCC Usage Guide - Optimistic concurrency control patterns
- Queue Consumer Guide - Redis Streams patterns
Requirements
- Database: MySQL 8.0+ with InnoDB storage engine
- Queue: Redis 5.0+ with Streams support
- Python: >= 3.11
License
MIT
Author
COTI - dev@coti.io
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file coti_safesync_framework-0.0.4.tar.gz.
File metadata
- Download URL: coti_safesync_framework-0.0.4.tar.gz
- Upload date:
- Size: 31.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
665b85117ee970885a12885452aba260cee4e3cf43360cc4157374d5cdb08a23
|
|
| MD5 |
1c415a4e3cb2f685c8de93324247d246
|
|
| BLAKE2b-256 |
8603e589ae5558417269438ab97a48faa5d531cc97a7ee8cda3a157b8aa6c9d6
|
Provenance
The following attestation bundles were made for coti_safesync_framework-0.0.4.tar.gz:
Publisher:
build-publish.yml on coti-io/coti-safesync-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
coti_safesync_framework-0.0.4.tar.gz -
Subject digest:
665b85117ee970885a12885452aba260cee4e3cf43360cc4157374d5cdb08a23 - Sigstore transparency entry: 1280694828
- Sigstore integration time:
-
Permalink:
coti-io/coti-safesync-framework@75b4e3464bba82b075c0cc9048b9596e3bb0d284 -
Branch / Tag:
refs/tags/v0.0.4 - Owner: https://github.com/coti-io
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
build-publish.yml@75b4e3464bba82b075c0cc9048b9596e3bb0d284 -
Trigger Event:
release
-
Statement type:
File details
Details for the file coti_safesync_framework-0.0.4-py3-none-any.whl.
File metadata
- Download URL: coti_safesync_framework-0.0.4-py3-none-any.whl
- Upload date:
- Size: 32.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
067ce0efb9826e393e904a309392b355c104231bdf4147ab01b11cb9112c4dec
|
|
| MD5 |
e12a7925d0b3ba8714cb7f3280a0db83
|
|
| BLAKE2b-256 |
701743f497bc025d209122f86804f20c2634823c8d828d18dd10eb6bb8d48a57
|
Provenance
The following attestation bundles were made for coti_safesync_framework-0.0.4-py3-none-any.whl:
Publisher:
build-publish.yml on coti-io/coti-safesync-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
coti_safesync_framework-0.0.4-py3-none-any.whl -
Subject digest:
067ce0efb9826e393e904a309392b355c104231bdf4147ab01b11cb9112c4dec - Sigstore transparency entry: 1280694831
- Sigstore integration time:
-
Permalink:
coti-io/coti-safesync-framework@75b4e3464bba82b075c0cc9048b9596e3bb0d284 -
Branch / Tag:
refs/tags/v0.0.4 - Owner: https://github.com/coti-io
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
build-publish.yml@75b4e3464bba82b075c0cc9048b9596e3bb0d284 -
Trigger Event:
release
-
Statement type: