Skip to main content

Bulkhead pattern implementation with Trio for structured concurrency and resilient circuit breaking

Project description

Bulkman

Bulkman is a robust implementation of the Bulkhead Pattern for Python, built on Trio for structured concurrency and resilient-circuit for circuit breaking.

The bulkhead pattern isolates resources and prevents cascading failures in distributed systems by limiting concurrent access to critical resources.

Features

  • Structured Concurrency: Built on Trio for proper async/await support
  • Circuit Breaker Integration: Uses resilient-circuit with PostgreSQL support for distributed systems
  • Resource Isolation: Limit concurrent executions to prevent resource exhaustion
  • Automatic Failure Detection: Circuit breaker opens automatically after threshold failures
  • Comprehensive Metrics: Track executions, failures, queue sizes, and circuit states
  • Type Safe: Full type hints for better IDE support
  • Well Tested: 92%+ test coverage

Installation

pip install bulkman

For development with all dependencies:

pip install bulkman[dev]

Quick Start

import trio
from bulkman import Bulkhead, BulkheadConfig

async def main():
    # Create a bulkhead with configuration
    config = BulkheadConfig(
        name="api_calls",
        max_concurrent_calls=5,
        timeout_seconds=10.0,
        circuit_breaker_enabled=True,
    )
    bulkhead = Bulkhead(config)

    # Execute a function through the bulkhead
    result = await bulkhead.execute(my_function, arg1, arg2)

    if result.success:
        print(f"Result: {result.result}")
    else:
        print(f"Error: {result.error}")

trio.run(main)

Basic Usage

Simple Function Execution

import trio
from bulkman import Bulkhead, BulkheadConfig

async def fetch_data(url: str) -> dict:
    # Your async function
    await trio.sleep(0.1)
    return {"data": "example"}

async def main():
    config = BulkheadConfig(name="api", max_concurrent_calls=3)
    bulkhead = Bulkhead(config)

    result = await bulkhead.execute(fetch_data, "https://api.example.com")
    print(result.result)

trio.run(main)

Using Decorators

from bulkman import Bulkhead, BulkheadConfig
from bulkman.core import with_bulkhead

config = BulkheadConfig(name="database", max_concurrent_calls=10)
bulkhead = Bulkhead(config)

@with_bulkhead(bulkhead)
async def query_database(query: str):
    # Your database query
    return await db.execute(query)

# The decorator automatically wraps execution
result = await query_database("SELECT * FROM users")

Managing Multiple Bulkheads

import trio
from bulkman import BulkheadManager, BulkheadConfig

async def main():
    manager = BulkheadManager()

    # Create multiple bulkheads for different resources
    await manager.create_bulkhead(
        BulkheadConfig(name="database", max_concurrent_calls=20)
    )
    await manager.create_bulkhead(
        BulkheadConfig(name="external_api", max_concurrent_calls=5)
    )

    # Execute in specific bulkhead
    result = await manager.execute_in_bulkhead(
        "database",
        lambda: db.query("SELECT * FROM users")
    )

    # Get health status of all bulkheads
    health = await manager.get_health_status()
    print(health)  # {'database': True, 'external_api': True}

trio.run(main)

Configuration

BulkheadConfig Options

from bulkman import BulkheadConfig

config = BulkheadConfig(
    name="my_bulkhead",              # Unique name for the bulkhead
    max_concurrent_calls=10,         # Max concurrent executions
    max_queue_size=100,              # Max queued tasks (currently for reference)
    timeout_seconds=30.0,            # Execution timeout in seconds
    failure_threshold=5,             # Failures before circuit opens
    success_threshold=3,             # Successes to close circuit
    isolation_duration=30.0,         # Seconds circuit stays open
    circuit_breaker_enabled=True,    # Enable/disable circuit breaker
    health_check_interval=5.0,       # Health check interval (for reference)
)

Circuit Breaker Integration

Bulkman integrates with resilient-circuit for sophisticated circuit breaking:

from bulkman import Bulkhead, BulkheadConfig
from resilient_circuit.storage import create_storage

# Use PostgreSQL for distributed circuit breaker state
storage = create_storage(namespace="my_app")

config = BulkheadConfig(
    name="external_service",
    failure_threshold=3,
    isolation_duration=60.0,
)

bulkhead = Bulkhead(config, circuit_storage=storage)

The circuit breaker has three states:

  • CLOSED (Healthy): Normal operation
  • OPEN (Isolated): Blocking requests after failures
  • HALF_OPEN (Degraded): Testing if service recovered

Monitoring and Metrics

Get Statistics

stats = await bulkhead.get_stats()
print(stats)
# {
#     'name': 'api',
#     'state': 'healthy',
#     'total_executions': 150,
#     'successful_executions': 145,
#     'failed_executions': 5,
#     'rejected_executions': 0,
#     'active_tasks': 3,
#     'max_concurrent_calls': 10,
#     'max_queue_size': 100,
#     'circuit_breaker_enabled': True,
#     'circuit_status': 'CLOSED'
# }

Check Health

is_healthy = await bulkhead.is_healthy()
state = await bulkhead.get_state()  # BulkheadState enum

Reset Statistics

await bulkhead.reset_stats()

Advanced Usage

Context Manager

async with bulkhead.context():
    result1 = await bulkhead.execute(func1)
    result2 = await bulkhead.execute(func2)

Handling Sync and Async Functions

Bulkman automatically detects and handles both sync and async functions:

# Async function
async def async_operation():
    await trio.sleep(1)
    return "async result"

# Sync function
def sync_operation():
    import time
    time.sleep(1)
    return "sync result"

# Both work seamlessly
result1 = await bulkhead.execute(async_operation)
result2 = await bulkhead.execute(sync_operation)

Concurrent Execution

import trio
from bulkman import Bulkhead, BulkheadConfig

async def main():
    config = BulkheadConfig(name="workers", max_concurrent_calls=5)
    bulkhead = Bulkhead(config)

    async def worker(task_id: int):
        result = await bulkhead.execute(process_task, task_id)
        return result

    # Run many tasks concurrently, bulkhead limits concurrency
    async with trio.open_nursery() as nursery:
        for i in range(100):
            nursery.start_soon(worker, i)

trio.run(main)

Error Handling

Bulkman provides specific exceptions for different failure scenarios:

from bulkman.exceptions import (
    BulkheadError,
    BulkheadCircuitOpenError,
    BulkheadTimeoutError,
    BulkheadFullError,
)

try:
    result = await bulkhead.execute(my_function)
    if not result.success:
        print(f"Execution failed: {result.error}")
except BulkheadCircuitOpenError:
    print("Circuit breaker is open")
except BulkheadTimeoutError:
    print("Operation timed out")
except BulkheadFullError:
    print("Bulkhead is at capacity")

Testing

Run the test suite:

# Install dev dependencies
pip install -e ".[dev]"

# Run tests with coverage
pytest tests/ --cov=bulkman --cov-report=term-missing

# Run specific test file
pytest tests/test_bulkhead.py -v

Architecture

Bulkman uses:

  • Trio Semaphores for concurrency control
  • Trio Locks for thread-safe statistics
  • resilient-circuit for circuit breaking logic
  • Structured concurrency for clean resource management

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the Apache Software License 2.0 - see the LICENSE file for details.

Credits

  • Built with Trio for structured concurrency
  • Circuit breaker powered by resilient-circuit
  • Inspired by Michael Nygard's "Release It!" and Martin Fowler's circuit breaker pattern

Related Patterns

  • Circuit Breaker: Prevents cascading failures (integrated via resilient-circuit)
  • Rate Limiting: Controls request rate (complementary pattern)
  • Retry: Automatically retries failed operations (can be combined)
  • Timeout: Prevents indefinite waits (built-in via timeout_seconds)

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

bulkman-1.0.0.tar.gz (23.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

bulkman-1.0.0-py3-none-any.whl (14.1 kB view details)

Uploaded Python 3

File details

Details for the file bulkman-1.0.0.tar.gz.

File metadata

  • Download URL: bulkman-1.0.0.tar.gz
  • Upload date:
  • Size: 23.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for bulkman-1.0.0.tar.gz
Algorithm Hash digest
SHA256 458a4d1f469a55de0ade60f357c09d276d4589ef7d1820fb8ee5af0a0fbb57e8
MD5 7be48615f2d5f28461811f571e027548
BLAKE2b-256 cc15e4e660f738a71a37fca505c89cb15614a441ea13214c6a1d976a44297ca0

See more details on using hashes here.

File details

Details for the file bulkman-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: bulkman-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 14.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for bulkman-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2db9197e89ec65944c420c8d51a533f9a12bcc3c219acafeea444e0e63a28658
MD5 514256d8762635ae093e4135adcd00cf
BLAKE2b-256 36a221156acae405d76a3186713eb8d13a87b8e7d4de3fc8e922afe3a17ea12b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page