A robust asynchronous transaction management library for Python functions.
Python Async Transaction Manager
A robust asynchronous transaction management library for Python functions, designed to ensure data consistency by automatically rolling back preceding operations if any step in a defined transaction fails. This library is ideal for orchestrating complex workflows that involve multiple discrete, reversible steps.
Features
- Automatic Rollback on Failure: If any `execute_func` within a transaction fails, all previously completed `execute_func`s are automatically rolled back using their associated `rollback_func`s.
- Per-Step Timeout: Each transaction step can have an optional timeout, preventing long-running or unresponsive operations from holding up the entire transaction.
- Configurable Retry Mechanism: Individual `execute_func` and `rollback_func` calls can be configured with a maximum number of retries and an exponential backoff interval, enhancing resilience against transient failures.
- Transaction Hooks: Users can register custom hooks to execute logic at various stages of the transaction lifecycle (e.g., `on_step_success`, `on_transaction_complete`), enabling flexible integration for monitoring, logging, or custom event handling.
- Customizable Logging: The library uses Python's standard `logging` module, allowing users to configure log levels and handlers to suit their application's logging infrastructure.
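The timeout and retry features can be pictured with a small, library-independent sketch. The helper `call_with_retry` below is hypothetical and is not part of this package; it assumes the backoff delay doubles on each attempt (`retry_interval * 2 ** attempt`), which the description above does not spell out.

```python
import asyncio


async def call_with_retry(func, *args, timeout=None, max_retries=0,
                          retry_interval=0.5, **kwargs):
    """Await func(*args, **kwargs), retrying failures with exponential backoff."""
    attempt = 0
    while True:
        try:
            # With timeout=None, wait_for simply awaits the coroutine.
            return await asyncio.wait_for(func(*args, **kwargs), timeout)
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            # Assumed schedule: interval, 2 * interval, 4 * interval, ...
            await asyncio.sleep(retry_interval * (2 ** attempt))
            attempt += 1


async def demo():
    calls = {"count": 0}

    async def flaky():
        # Fails twice, then succeeds, to mimic a transient error.
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("transient failure")
        return "ok"

    result = await call_with_retry(flaky, timeout=1.0, max_retries=3,
                                   retry_interval=0.01)
    return result, calls["count"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A timeout is treated here like any other failure, so a step that times out is retried until `max_retries` is exhausted. Whether the library does the same is not stated in this description.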
Installation
pip install async-transaction-manager
Usage
Basic Transaction
Here's how to define and run a simple transaction:
import asyncio
from src.transaction_manager import TransactionManager

class UserService:
    async def create_user(self, username, email):
        print(f"Creating user: {username} ({email})")
        # Simulate some work
        await asyncio.sleep(0.1)
        return {"user_id": "user_abc", "username": username, "email": email}

    async def delete_user(self, user_id):
        print(f"Deleting user: {user_id}")
        await asyncio.sleep(0.05)

class VMService:
    async def create_vm(self, vm_name, user_id):
        print(f"Creating VM: {vm_name} for user {user_id}")
        await asyncio.sleep(0.2)
        return {"vm_id": "vm_xyz", "vm_name": vm_name}

    async def delete_vm(self, vm_id):
        print(f"Deleting VM: {vm_id}")
        await asyncio.sleep(0.1)

user_service = UserService()
vm_service = VMService()

async def main():
    async with TransactionManager("create_user_and_vm_transaction", "Provision User and VM") as tx:
        # Step 1: Create user
        user_data = await tx.add_step(
            "create_user_step",
            user_service.create_user,
            rollback_func=user_service.delete_user,
            username="jane.doe",
            email="jane@example.com"
        )
        print(f"User created: {user_data}")

        # Step 2: Create VM, using data from previous step
        vm_data = await tx.add_step(
            "create_vm_step",
            vm_service.create_vm,
            rollback_func=vm_service.delete_vm,
            vm_name="jane-vm",
            user_id=user_data["user_id"]
        )
        print(f"VM created: {vm_data}")

    print("Transaction completed successfully!")
    summary = tx.get_transaction_summary()
    print("Transaction Summary:", summary)

if __name__ == "__main__":
    asyncio.run(main())
Handling Failures and Rollbacks
If a step fails, all previously completed steps are rolled back.
import asyncio
from src.transaction_manager import TransactionManager, TransactionHook
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class MyUserService:
    async def create_user(self, username):
        print(f"Creating user {username}")
        return {"user_id": f"user_{username}"}

    async def delete_user(self, user_id):
        print(f"Deleting user {user_id}")

class MyPaymentService:
    async def process_payment(self, user_id, amount):
        print(f"Processing payment for {user_id} of {amount}")
        if amount > 100:
            raise ValueError("Payment amount too high!")
        return {"payment_id": "pay_123"}

    async def refund_payment(self, payment_id):
        print(f"Refunding payment {payment_id}")

async def main_failure():
    user_service = MyUserService()
    payment_service = MyPaymentService()
    try:
        async with TransactionManager("payment_transaction", "Process Payment") as tx:
            user_data = await tx.add_step(
                "create_user",
                user_service.create_user,
                rollback_func=user_service.delete_user,
                username="test_user"
            )
            print(f"User created: {user_data['user_id']}")

            payment_data = await tx.add_step(
                "process_payment",
                payment_service.process_payment,
                rollback_func=payment_service.refund_payment,
                user_id=user_data['user_id'],
                amount=200
            )
            print(f"Payment processed: {payment_data['payment_id']}")
    except ValueError as e:
        print(f"Transaction failed with error: {e}")
    finally:
        summary = tx.get_transaction_summary()
        print("Transaction Summary:", summary)

if __name__ == "__main__":
    asyncio.run(main_failure())
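The rollback behaviour in this section follows the general compensation pattern, which can be sketched without the library. The function `run_with_rollback` below is a hypothetical illustration, not the package's implementation. It assumes completed steps are undone in reverse order and that each rollback receives its step's result; neither detail is confirmed by this description.

```python
import asyncio


async def run_with_rollback(steps):
    """Run (execute, rollback) coroutine pairs; undo completed ones on failure."""
    completed = []  # (rollback, result) for every step that finished
    try:
        for execute, rollback in steps:
            result = await execute()
            completed.append((rollback, result))
    except Exception:
        # Assumed order: revert the most recent step first.
        for rollback, result in reversed(completed):
            await rollback(result)
        raise
    return [result for _, result in completed]


async def demo():
    log = []

    async def create_user():
        log.append("create_user")
        return "user_1"

    async def delete_user(user_id):
        log.append(f"delete_user:{user_id}")

    async def charge():
        log.append("charge")
        raise ValueError("Payment amount too high!")

    async def refund(payment_id):
        log.append(f"refund:{payment_id}")

    try:
        await run_with_rollback([(create_user, delete_user), (charge, refund)])
    except ValueError:
        log.append("failed")
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the failing step's own rollback (`refund`) is never called, because the step did not complete; only `delete_user` runs before the error is re-raised.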
Using rollback_args and override_args
Sometimes, the arguments needed for a rollback function are different from the original execution function, or need to be derived from the execution result.
import asyncio
# TransactionStepStatus is assumed to be exported from the same module
from src.transaction_manager import TransactionManager, TransactionStepStatus
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class NetworkService:
    async def assign_ip(self, vm_id, ip_address):
        print(f"Assigning IP {ip_address} to VM {vm_id}")
        # In a real scenario, this might return the assigned IP
        return {"assigned_ip": ip_address, "rollback_data": {"ip_to_release": ip_address}}

    async def release_ip(self, ip_to_release):
        print(f"Releasing IP {ip_to_release}")

async def main_rollback_args():
    network_service = NetworkService()
    async with TransactionManager("ip_assignment_transaction") as tx:
        # Assume vm_id is 'vm_abc'
        assigned_ip_data = await tx.add_step(
            "assign_ip",
            network_service.assign_ip,
            rollback_func=network_service.release_ip,
            args=("vm_abc", "192.168.1.100"),  # args for assign_ip
        )
        print(f"Assigned IP: {assigned_ip_data['assigned_ip']}")

        # Simulate a failure to trigger rollback
        tx.steps[-1].status = TransactionStepStatus.FAILED
        tx.steps[-1].error = ValueError("Simulated IP assignment failure")

    print("Transaction with rollback args completed.")
    print("Final Status:", tx.get_transaction_summary())

if __name__ == "__main__":
    asyncio.run(main_rollback_args())
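The example above returns a `rollback_data` mapping from `assign_ip` whose key matches the `release_ip` parameter name. One plausible reading is that this mapping is handed to the rollback function as keyword arguments. The helper `rollback_with_result` below is a hypothetical sketch of that idea only; it is not taken from the library, and it says nothing about how `rollback_args` or `override_args` are handled.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def rollback_with_result(
    rollback_func: Callable[..., Awaitable[Any]], result: Any
) -> Any:
    """Call rollback_func with the result's "rollback_data" mapping as kwargs."""
    data: Dict[str, Any] = {}
    if isinstance(result, dict):
        # Assumption: rollback inputs travel under the "rollback_data" key.
        data = result.get("rollback_data") or {}
    return await rollback_func(**data)


async def demo():
    released = []

    async def assign_ip(vm_id, ip_address):
        return {"assigned_ip": ip_address,
                "rollback_data": {"ip_to_release": ip_address}}

    async def release_ip(ip_to_release):
        released.append(ip_to_release)

    result = await assign_ip("vm_abc", "192.168.1.100")
    await rollback_with_result(release_ip, result)
    return released


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Deriving rollback inputs from the execution result keeps the undo step independent of the original call's arguments, which is useful when the forward and reverse operations take different parameters.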
Implementing Custom Transaction Hooks
You can create custom hooks to react to different transaction events.
import asyncio
from src.transaction_manager import TransactionManager, TransactionHook
import logging
from typing import Dict, Any
from unittest.mock import AsyncMock  # needed for the mock step functions below

class MyCustomHook(TransactionHook):
    async def on_transaction_start(self, transaction_id: str, operation_name: str):
        logging.info(f"HOOK: Transaction '{operation_name}' ({transaction_id}) starting...")

    async def on_step_success(self, step):
        logging.info(f"HOOK: Step '{step.name}' completed successfully. Result: {step.result}")

    async def on_step_failure(self, step):
        logging.error(f"HOOK: Step '{step.name}' failed with error: {step.error}")

    async def on_rollback_start(self, transaction_id: str):
        logging.warning(f"HOOK: Rollback initiated for transaction {transaction_id}")

    async def on_step_rollback(self, step):
        logging.info(f"HOOK: Step '{step.name}' successfully rolled back.")

    async def on_step_rollback_failure(self, step):
        logging.error(f"HOOK: Step '{step.name}' rollback failed with error: {step.error}")

    async def on_transaction_complete(self, transaction_id: str, success: bool, summary: Dict[str, Any]):
        status = "succeeded" if success else "failed"
        logging.info(f"HOOK: Transaction {transaction_id} {status}. Summary: {summary}")

async def main_hooks():
    # Configure logging to see hook messages
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
    tx_manager = TransactionManager("hook_demo_transaction", "Demo with Hooks")
    tx_manager.register_hook(MyCustomHook())

    # Mock functions
    mock_execute_success = AsyncMock(return_value="data")
    mock_execute_fail = AsyncMock(side_effect=ValueError("Simulated failure"))
    mock_rollback = AsyncMock()

    try:
        async with tx_manager as tx:
            await tx.add_step("StepA", mock_execute_success, mock_rollback)
            await tx.add_step("StepB", mock_execute_fail, mock_rollback)
    except Exception as e:
        logging.error(f"Caught main exception: {e}")

if __name__ == "__main__":
    asyncio.run(main_hooks())
Configuring Logging
The library's internal logger can be configured directly.
import asyncio
import logging
from unittest.mock import AsyncMock  # needed for the mock step functions below
from src.transaction_manager import TransactionManager

async def main_logging_config():
    # Option 1: Configure using basicConfig (affects root logger and potentially others)
    # logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Option 2: Configure the library's logger specifically
    # By default, TransactionManager uses a NullHandler.
    # To see logs, you need to add a handler.
    # Example: Set level to DEBUG and add a StreamHandler
    TransactionManager.configure_logging(level=logging.DEBUG)

    async with TransactionManager("logging_demo") as tx:
        await tx.add_step("LoggedStep", AsyncMock(return_value=True))
        # Simulate a failure to see error logs
        await tx.add_step("FailingLoggedStep", AsyncMock(side_effect=ValueError("Log test failure")))

    print("Logging configuration demo complete.")

if __name__ == "__main__":
    asyncio.run(main_logging_config())
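Because the library relies on the standard `logging` module, its logger can presumably also be configured with plain standard-library calls. The sketch below attaches a `StreamHandler` to a named logger. The logger name `"transaction_manager"` is an assumption made for illustration; check the package source for the name it actually uses.

```python
import logging

# Hypothetical logger name; the real one is not given in this description.
LOGGER_NAME = "transaction_manager"


def enable_library_logs(level: int = logging.DEBUG) -> logging.Logger:
    """Attach a StreamHandler to the (assumed) library logger."""
    logger = logging.getLogger(LOGGER_NAME)
    logger.setLevel(level)
    # Avoid stacking duplicate handlers if this is called more than once.
    if not any(isinstance(h, logging.StreamHandler) for h in logger.handlers):
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
    return logger


if __name__ == "__main__":
    enable_library_logs().debug("library logging enabled")
```

Configuring a named logger this way leaves the root logger untouched, so other libraries' output is unaffected.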
TransactionStep Dataclass
@dataclass
class TransactionStep:
    name: str
    execute_func: Callable[..., Awaitable[Any]]
    rollback_func: Optional[Callable[..., Awaitable[Any]]] = None
    execute_args: Any = field(default_factory=tuple)  # Positional arguments for execute_func
    execute_kwargs: Any = field(default_factory=dict)  # Keyword arguments for execute_func
    status: TransactionStepStatus = TransactionStepStatus.PENDING
    result: Any = None
    error: Optional[Exception] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    rollback_data: Optional[Dict[str, Any]] = None  # Data passed to rollback_func as kwargs
    rollback_args: Optional[Tuple] = None  # Positional arguments specifically for rollback_func
    override_args: Optional[bool] = False  # If True, rollback_args override execute_args for rollback
    operation_name: Optional[str] = None
    timeout: Optional[int] = None  # Timeout for this specific step
    max_retries: int = 0  # Max retries for this step
    retry_interval: float = 0.5  # Base interval for exponential backoff retries
TransactionHook Interface
Implement this abstract class to create custom hooks:
class TransactionHook(ABC):
    async def on_transaction_start(self, transaction_id: str, operation_name: str): pass
    async def on_step_execute(self, step: "TransactionStep"): pass
    async def on_step_success(self, step: "TransactionStep"): pass
    async def on_step_failure(self, step: "TransactionStep"): pass
    async def on_rollback_start(self, transaction_id: str): pass
    async def on_step_rollback(self, step: "TransactionStep"): pass
    async def on_step_rollback_failure(self, step: "TransactionStep"): pass
    async def on_transaction_complete(self, transaction_id: str, success: bool, summary: Dict[str, Any]): pass
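Since every method in the interface above has a default body, a hook only needs to override the events it cares about. The sketch below shows a hook that records events in memory, which can be handy in tests. To stay runnable without the package installed, it defines a local stand-in `TransactionHook` with a subset of the methods; `RecordingHook` and the hand-driven calls in `demo` are hypothetical, and in real use the manager would invoke the hook.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict, List, Tuple


class TransactionHook:
    """Local stand-in mirroring part of the interface so the sketch runs alone."""
    async def on_step_success(self, step): pass
    async def on_step_failure(self, step): pass
    async def on_transaction_complete(self, transaction_id: str, success: bool,
                                      summary: Dict[str, Any]): pass


class RecordingHook(TransactionHook):
    """Collects (event, detail) tuples for later inspection."""
    def __init__(self) -> None:
        self.events: List[Tuple[str, str]] = []

    async def on_step_success(self, step):
        self.events.append(("step_success", step.name))

    async def on_step_failure(self, step):
        self.events.append(("step_failure", step.name))

    async def on_transaction_complete(self, transaction_id, success, summary):
        self.events.append(("complete", "ok" if success else "failed"))


async def demo():
    hook = RecordingHook()
    # Drive the hook by hand; a real manager would make these calls itself.
    await hook.on_step_success(SimpleNamespace(name="StepA"))
    await hook.on_step_failure(SimpleNamespace(name="StepB"))
    await hook.on_transaction_complete("tx-1", False, {})
    return hook.events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```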
File details
Details for the file async_transaction_manager-0.1.2.tar.gz.
File metadata
- Download URL: async_transaction_manager-0.1.2.tar.gz
- Upload date:
- Size: 11.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 98d554eac43baff82f1bda6d2b5f52700c35dfc3ffd91a5ca58cb15b1ea0577c |
| MD5 | 6cd964b6a8846c7dc1e08dbde07c405c |
| BLAKE2b-256 | 3b993f4f5d9e015739eaa0c73458fb54086fddf864ee31180f924b286cb69f30 |
File details
Details for the file async_transaction_manager-0.1.2-py3-none-any.whl.
File metadata
- Download URL: async_transaction_manager-0.1.2-py3-none-any.whl
- Upload date:
- Size: 9.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8f68ef8f8a51cd8c786bdfa11e4486e81eda8df19a24d51751a773ce3addc105 |
| MD5 | 0aedc271b1beb3a966d763c1075b226d |
| BLAKE2b-256 | 74b47076769017f56ef84071e724410c2230ed9ade98d14ef0b2c15fef475cc1 |