A robust asynchronous transaction management library for Python functions.

Python Async Transaction Manager

A robust asynchronous transaction management library for Python functions, designed to ensure data consistency by automatically rolling back preceding operations if any step in a defined transaction fails. This library is ideal for orchestrating complex workflows that involve multiple discrete, reversible steps.

Features

  • Automatic Rollback on Failure: If any execute_func within a transaction fails, all previously completed execute_funcs are automatically rolled back using their associated rollback_funcs.
  • Per-Step Timeout: Each transaction step can have an optional timeout, preventing long-running or unresponsive operations from holding up the entire transaction.
  • Configurable Retry Mechanism: Individual execute_func and rollback_func calls can be configured with a maximum number of retries and an exponential backoff interval, enhancing resilience against transient failures.
  • Transaction Hooks: Users can register custom hooks to execute logic at various stages of the transaction lifecycle (e.g., on_step_success, on_transaction_complete), enabling flexible integration for monitoring, logging, or custom event handling.
  • Customizable Logging: The library uses Python's standard logging module, allowing users to configure log levels and handlers to suit their application's logging infrastructure.
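
The automatic-rollback feature listed above can be pictured with a small stand-alone sketch. It does not use the library: run_with_rollback is a hypothetical helper, and undoing steps in reverse order is an assumption about how such rollbacks are usually sequenced, not something this README states.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

# Hypothetical type alias: one (execute, rollback) pair per step.
Step = Tuple[Callable[[], Awaitable[Any]], Callable[[], Awaitable[Any]]]


async def run_with_rollback(steps: List[Step]) -> List[Any]:
    """Run each step in order; if one fails, undo the steps that completed."""
    completed: List[Callable[[], Awaitable[Any]]] = []
    results: List[Any] = []
    try:
        for execute, rollback in steps:
            results.append(await execute())
            # Only steps that finished successfully are registered for undo.
            completed.append(rollback)
    except Exception:
        # Assumption: undo in reverse order, most recent step first.
        for rollback in reversed(completed):
            await rollback()
        raise  # the original error still reaches the caller
    return results
```

The failing step itself is never rolled back here because it did not complete; only the steps before it are compensated.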

Installation

pip install async-transaction-manager

Usage

Basic Transaction

Here's how to define and run a simple transaction:

import asyncio
from src.transaction_manager import TransactionManager

class UserService:
    async def create_user(self, username, email):
        print(f"Creating user: {username} ({email})")
        # Simulate some work
        await asyncio.sleep(0.1)
        return {"user_id": "user_abc", "username": username, "email": email}

    async def delete_user(self, user_id):
        print(f"Deleting user: {user_id}")
        await asyncio.sleep(0.05)

class VMService:
    async def create_vm(self, vm_name, user_id):
        print(f"Creating VM: {vm_name} for user {user_id}")
        await asyncio.sleep(0.2)
        return {"vm_id": "vm_xyz", "vm_name": vm_name}

    async def delete_vm(self, vm_id):
        print(f"Deleting VM: {vm_id}")
        await asyncio.sleep(0.1)

user_service = UserService()
vm_service = VMService()

async def main():
    async with TransactionManager("create_user_and_vm_transaction", "Provision User and VM") as tx:
        # Step 1: Create user
        user_data = await tx.add_step(
            "create_user_step",
            user_service.create_user,
            rollback_func=user_service.delete_user,
            username="jane.doe",
            email="jane@example.com"
        )
        print(f"User created: {user_data}")

        # Step 2: Create VM, using data from previous step
        vm_data = await tx.add_step(
            "create_vm_step",
            vm_service.create_vm,
            rollback_func=vm_service.delete_vm,
            vm_name="jane-vm",
            user_id=user_data["user_id"]
        )
        print(f"VM created: {vm_data}")

    print("Transaction completed successfully!")
    summary = tx.get_transaction_summary()
    print("Transaction Summary:", summary)

if __name__ == "__main__":
    asyncio.run(main())

Handling Failures and Rollbacks

If a step fails, all previously completed steps are rolled back and the exception propagates out of the async with block, where the caller can handle it.

import asyncio
from src.transaction_manager import TransactionManager, TransactionHook
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class MyUserService:
    async def create_user(self, username):
        print(f"Creating user {username}")
        return {"user_id": f"user_{username}"}

    async def delete_user(self, user_id):
        print(f"Deleting user {user_id}")

class MyPaymentService:
    async def process_payment(self, user_id, amount):
        print(f"Processing payment for {user_id} of {amount}")
        if amount > 100:
            raise ValueError("Payment amount too high!")
        return {"payment_id": "pay_123"}

    async def refund_payment(self, payment_id):
        print(f"Refunding payment {payment_id}")

async def main_failure():
    user_service = MyUserService()
    payment_service = MyPaymentService()

    try:
        async with TransactionManager("payment_transaction", "Process Payment") as tx:
            user_data = await tx.add_step(
                "create_user",
                user_service.create_user,
                rollback_func=user_service.delete_user,
                username="test_user"
            )
            print(f"User created: {user_data['user_id']}")

            payment_data = await tx.add_step(
                "process_payment",
                payment_service.process_payment,
                rollback_func=payment_service.refund_payment,
                user_id=user_data['user_id'],
                amount=200
            )
            print(f"Payment processed: {payment_data['payment_id']}")

    except ValueError as e:
        print(f"Transaction failed with error: {e}")
    finally:
        summary = tx.get_transaction_summary()
        print("Transaction Summary:", summary)

if __name__ == "__main__":
    asyncio.run(main_failure())

Using rollback_args and override_args

Sometimes the arguments a rollback function needs differ from those of the execution function, or must be derived from the execution result.

import asyncio
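# TransactionStepStatus is used below; it is assumed to be importable from the same module.
from src.transaction_manager import TransactionManager, TransactionStepStatus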
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class NetworkService:
    async def assign_ip(self, vm_id, ip_address):
        print(f"Assigning IP {ip_address} to VM {vm_id}")
        # In a real scenario, this might return the assigned IP
        return {"assigned_ip": ip_address, "rollback_data": {"ip_to_release": ip_address}}

    async def release_ip(self, ip_to_release):
        print(f"Releasing IP {ip_to_release}")

async def main_rollback_args():
    network_service = NetworkService()

    async with TransactionManager("ip_assignment_transaction") as tx:
        # Assume vm_id is 'vm_abc'
        assigned_ip_data = await tx.add_step(
            "assign_ip",
            network_service.assign_ip,
            rollback_func=network_service.release_ip,
            args=("vm_abc", "192.168.1.100"), # args for assign_ip
        )
        print(f"Assigned IP: {assigned_ip_data['assigned_ip']}")

        # Simulate a failure to trigger rollback
        tx.steps[-1].status = TransactionStepStatus.FAILED
        tx.steps[-1].error = ValueError("Simulated IP assignment failure")

    print("Transaction with rollback args completed.")
    print("Final Status:", tx.get_transaction_summary())

if __name__ == "__main__":
    asyncio.run(main_rollback_args())
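
The README does not show how rollback_args, rollback_data and override_args are combined when a rollback runs. The sketch below is one possible reading of the field comments in the TransactionStep dataclass, written as a stand-alone helper. StepArgs and resolve_rollback_call are hypothetical names, and the fallback to execute_args when override_args is False is a guess; the library's actual logic may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple


@dataclass
class StepArgs:
    """Stand-in holding only the argument-related fields of TransactionStep."""
    execute_args: Tuple = ()
    execute_kwargs: Dict[str, Any] = field(default_factory=dict)
    rollback_data: Optional[Dict[str, Any]] = None
    rollback_args: Optional[Tuple] = None
    override_args: bool = False


def resolve_rollback_call(step: StepArgs) -> Tuple[Tuple, Dict[str, Any]]:
    """Return the (args, kwargs) a rollback function might be called with."""
    if step.override_args and step.rollback_args is not None:
        # Documented case: rollback_args replace execute_args.
        args = step.rollback_args
    else:
        # Assumed fallback: reuse the arguments of the execute call.
        args = step.execute_args
    # Documented case: rollback_data is passed as keyword arguments.
    kwargs = dict(step.rollback_data or {})
    return args, kwargs
```

Under this reading, a step with rollback_args=("192.168.1.100",) and override_args=True would call release_ip with just the IP address rather than the original (vm_id, ip_address) pair.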

Implementing Custom Transaction Hooks

You can create custom hooks to react to different transaction events.

import asyncio
from src.transaction_manager import TransactionManager, TransactionHook
import logging
from typing import Dict, Any
from unittest.mock import AsyncMock  # provides the mock step functions used below

class MyCustomHook(TransactionHook):
    async def on_transaction_start(self, transaction_id: str, operation_name: str):
        logging.info(f"HOOK: Transaction '{operation_name}' ({transaction_id}) starting...")

    async def on_step_success(self, step):
        logging.info(f"HOOK: Step '{step.name}' completed successfully. Result: {step.result}")

    async def on_step_failure(self, step):
        logging.error(f"HOOK: Step '{step.name}' failed with error: {step.error}")

    async def on_rollback_start(self, transaction_id: str):
        logging.warning(f"HOOK: Rollback initiated for transaction {transaction_id}")

    async def on_step_rollback(self, step):
        logging.info(f"HOOK: Step '{step.name}' successfully rolled back.")

    async def on_step_rollback_failure(self, step):
        logging.error(f"HOOK: Step '{step.name}' rollback failed with error: {step.error}")

    async def on_transaction_complete(self, transaction_id: str, success: bool, summary: Dict[str, Any]):
        status = "succeeded" if success else "failed"
        logging.info(f"HOOK: Transaction {transaction_id} {status}. Summary: {summary}")

async def main_hooks():
    # Configure logging to see hook messages
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

    tx_manager = TransactionManager("hook_demo_transaction", "Demo with Hooks")
    tx_manager.register_hook(MyCustomHook())

    # Mock functions
    mock_execute_success = AsyncMock(return_value="data")
    mock_execute_fail = AsyncMock(side_effect=ValueError("Simulated failure"))
    mock_rollback = AsyncMock()

    try:
        async with tx_manager as tx:
            await tx.add_step("StepA", mock_execute_success, mock_rollback)
            await tx.add_step("StepB", mock_execute_fail, mock_rollback)
    except Exception as e:
        logging.error(f"Caught main exception: {e}")

if __name__ == "__main__":
    asyncio.run(main_hooks())

Configuring Logging

The library's internal logger can be configured directly.

import asyncio
import logging
from unittest.mock import AsyncMock  # provides the mock step functions used below
from src.transaction_manager import TransactionManager

async def main_logging_config():
    # Option 1: Configure using basicConfig (affects root logger and potentially others)
    # logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Option 2: Configure the library's logger specifically
    # By default, TransactionManager uses a NullHandler.
    # To see logs, you need to add a handler.
    
    # Example: Set level to DEBUG and add a StreamHandler
    TransactionManager.configure_logging(level=logging.DEBUG)

    try:
        async with TransactionManager("logging_demo") as tx:
            await tx.add_step("LoggedStep", AsyncMock(return_value=True))
            # Simulate a failure to see error logs
            await tx.add_step("FailingLoggedStep", AsyncMock(side_effect=ValueError("Log test failure")))
    except ValueError:
        # The simulated failure propagates out of the transaction; catch it so the demo can finish.
        pass

    print("Logging configuration demo complete.")

if __name__ == "__main__":
    asyncio.run(main_logging_config())
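
For readers who prefer to wire up the handler themselves, the NullHandler pattern mentioned in the comments above can be sketched with the standard logging module alone. The logger name "transaction_manager" and the helper enable_library_logs are hypothetical; this README does not state the name of the library's logger.

```python
import logging

# Hypothetical logger name, used here only for illustration.
LIB_LOGGER_NAME = "transaction_manager"

# What a library typically does on import: stay silent unless the app opts in.
lib_logger = logging.getLogger(LIB_LOGGER_NAME)
lib_logger.addHandler(logging.NullHandler())


def enable_library_logs(level: int = logging.DEBUG, stream=None) -> logging.Handler:
    """Attach a StreamHandler to the library logger only, not the root logger."""
    handler = logging.StreamHandler(stream)  # stream=None means sys.stderr
    handler.setFormatter(
        logging.Formatter("%(name)s - %(levelname)s - %(message)s")
    )
    lib_logger.addHandler(handler)
    lib_logger.setLevel(level)
    return handler
```

Configuring the named logger instead of calling logging.basicConfig keeps the change scoped to one library and leaves other loggers in the application untouched.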

TransactionStep Dataclass

@dataclass
class TransactionStep:
    name: str
    execute_func: Callable[..., Awaitable[Any]]
    rollback_func: Optional[Callable[..., Awaitable[Any]]] = None
    execute_args: Any = field(default_factory=tuple) # Positional arguments for execute_func
    execute_kwargs: Any = field(default_factory=dict) # Keyword arguments for execute_func
    status: TransactionStepStatus = TransactionStepStatus.PENDING
    result: Any = None
    error: Optional[Exception] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    rollback_data: Optional[Dict[str, Any]] = None # Data passed to rollback_func as kwargs
    rollback_args: Optional[Tuple] = None # Positional arguments specifically for rollback_func
    override_args: Optional[bool] = False # If True, rollback_args override execute_args for rollback
    operation_name: Optional[str] = None
    timeout: Optional[int] = None # Timeout for this specific step
    max_retries: int = 0 # Max retries for this step
    retry_interval: float = 0.5 # Base interval for exponential backoff retries
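
The timeout, max_retries and retry_interval fields suggest a retry loop with a per-attempt deadline. A minimal sketch of that technique follows, using only asyncio. call_with_retry is a hypothetical helper, and the doubling schedule (retry_interval, then 2x, then 4x) is an assumed form of exponential backoff; the library's exact schedule is not given in this README.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def call_with_retry(
    func: Callable[[], Awaitable[Any]],
    timeout: Optional[float] = None,
    max_retries: int = 0,
    retry_interval: float = 0.5,
) -> Any:
    """Await func(), retrying failed or timed-out attempts with growing delays."""
    attempt = 0
    while True:
        try:
            # With timeout=None, wait_for simply awaits without a deadline.
            return await asyncio.wait_for(func(), timeout=timeout)
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            # Assumed schedule: retry_interval * 1, 2, 4, ...
            await asyncio.sleep(retry_interval * (2 ** attempt))
            attempt += 1
```

With max_retries=0, the default shown in the dataclass, the function is attempted exactly once and any error is raised immediately.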

TransactionHook Interface

Subclass this base class and override only the callbacks you need; every method defaults to a no-op:

class TransactionHook(ABC):
    async def on_transaction_start(self, transaction_id: str, operation_name: str): pass
    async def on_step_execute(self, step: "TransactionStep"): pass
    async def on_step_success(self, step: "TransactionStep"): pass
    async def on_step_failure(self, step: "TransactionStep"): pass
    async def on_rollback_start(self, transaction_id: str): pass
    async def on_step_rollback(self, step: "TransactionStep"): pass
    async def on_step_rollback_failure(self, step: "TransactionStep"): pass
    async def on_transaction_complete(self, transaction_id: str, success: bool, summary: Dict[str, Any]): pass
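
How a manager might fan events out to registered hooks can be sketched without the library. Hook is a simplified stand-in for TransactionHook with two callbacks that take a step name instead of a TransactionStep, and notify is a hypothetical dispatcher. Swallowing hook errors is a design assumption made for this sketch, not documented library behaviour.

```python
import asyncio
from typing import List


class Hook:
    """Simplified stand-in for TransactionHook: callbacks default to no-ops."""

    async def on_step_success(self, step_name: str) -> None:
        pass

    async def on_step_failure(self, step_name: str) -> None:
        pass


class CountingHook(Hook):
    """Records which steps succeeded and which failed."""

    def __init__(self) -> None:
        self.succeeded: List[str] = []
        self.failed: List[str] = []

    async def on_step_success(self, step_name: str) -> None:
        self.succeeded.append(step_name)

    async def on_step_failure(self, step_name: str) -> None:
        self.failed.append(step_name)


async def notify(hooks: List[Hook], event: str, *args) -> None:
    """Call the named callback on every hook, in registration order."""
    for hook in hooks:
        try:
            await getattr(hook, event)(*args)
        except Exception:
            # Assumption: a misbehaving hook should not break the transaction.
            pass
```

Because the base callbacks do nothing, a hook such as CountingHook only needs to override the events it cares about.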

Download files

Source Distribution

async_transaction_manager-0.1.2.tar.gz (11.1 kB)

Algorithm Hash digest
SHA256 98d554eac43baff82f1bda6d2b5f52700c35dfc3ffd91a5ca58cb15b1ea0577c
MD5 6cd964b6a8846c7dc1e08dbde07c405c
BLAKE2b-256 3b993f4f5d9e015739eaa0c73458fb54086fddf864ee31180f924b286cb69f30

Built Distribution

async_transaction_manager-0.1.2-py3-none-any.whl (9.5 kB)

Algorithm Hash digest
SHA256 8f68ef8f8a51cd8c786bdfa11e4486e81eda8df19a24d51751a773ce3addc105
MD5 0aedc271b1beb3a966d763c1075b226d
BLAKE2b-256 74b47076769017f56ef84071e724410c2230ed9ade98d14ef0b2c15fef475cc1
