Skip to main content

Bulkhead pattern implementation with Trio for structured concurrency and resilient circuit breaking

Project description

Bulkman

Bulkman is a robust implementation of the Bulkhead Pattern for Python, built on Trio for structured concurrency and resilient-circuit for circuit breaking.

The bulkhead pattern isolates resources and prevents cascading failures in distributed systems by limiting concurrent access to critical resources.

Features

  • Structured Concurrency: Built on Trio for proper async/await support
  • Circuit Breaker Integration: Uses resilient-circuit with PostgreSQL support for distributed systems
  • Resource Isolation: Limit concurrent executions to prevent resource exhaustion
  • Automatic Failure Detection: Circuit breaker opens automatically after threshold failures
  • Comprehensive Metrics: Track executions, failures, queue sizes, and circuit states
  • Type Safe: Full type hints for better IDE support
  • Well Tested: 92%+ test coverage

Installation

pip install bulkman

For development with all dependencies:

pip install bulkman[dev]

Quick Start

import trio
from bulkman import Bulkhead, BulkheadConfig

async def main():
    # Create a bulkhead with configuration
    config = BulkheadConfig(
        name="api_calls",
        max_concurrent_calls=5,
        timeout_seconds=10.0,
        circuit_breaker_enabled=True,
    )
    bulkhead = Bulkhead(config)

    # Execute a function through the bulkhead
    result = await bulkhead.execute(my_function, arg1, arg2)

    if result.success:
        print(f"Result: {result.result}")
    else:
        print(f"Error: {result.error}")

trio.run(main)

Basic Usage

Simple Function Execution

import trio
from bulkman import Bulkhead, BulkheadConfig

async def fetch_data(url: str) -> dict:
    # Your async function
    await trio.sleep(0.1)
    return {"data": "example"}

async def main():
    config = BulkheadConfig(name="api", max_concurrent_calls=3)
    bulkhead = Bulkhead(config)

    result = await bulkhead.execute(fetch_data, "https://api.example.com")
    print(result.result)

trio.run(main)

Using Decorators

from bulkman import Bulkhead, BulkheadConfig
from bulkman.core import with_bulkhead

config = BulkheadConfig(name="database", max_concurrent_calls=10)
bulkhead = Bulkhead(config)

@with_bulkhead(bulkhead)
async def query_database(query: str):
    # Your database query
    return await db.execute(query)

# The decorator automatically wraps execution
result = await query_database("SELECT * FROM users")

Managing Multiple Bulkheads

import trio
from bulkman import BulkheadManager, BulkheadConfig

async def main():
    manager = BulkheadManager()

    # Create multiple bulkheads for different resources
    await manager.create_bulkhead(
        BulkheadConfig(name="database", max_concurrent_calls=20)
    )
    await manager.create_bulkhead(
        BulkheadConfig(name="external_api", max_concurrent_calls=5)
    )

    # Execute in specific bulkhead
    result = await manager.execute_in_bulkhead(
        "database",
        lambda: db.query("SELECT * FROM users")
    )

    # Get health status of all bulkheads
    health = await manager.get_health_status()
    print(health)  # {'database': True, 'external_api': True}

trio.run(main)

Configuration

BulkheadConfig Options

from bulkman import BulkheadConfig

config = BulkheadConfig(
    name="my_bulkhead",              # Unique name for the bulkhead
    max_concurrent_calls=10,         # Max concurrent executions
    max_queue_size=100,              # Max queued tasks (currently for reference)
    timeout_seconds=30.0,            # Execution timeout in seconds
    failure_threshold=5,             # Failures before circuit opens
    success_threshold=3,             # Successes to close circuit
    isolation_duration=30.0,         # Seconds circuit stays open
    circuit_breaker_enabled=True,    # Enable/disable circuit breaker
    health_check_interval=5.0,       # Health check interval (for reference)
)

Circuit Breaker Integration

Bulkman integrates with resilient-circuit for sophisticated circuit breaking:

from bulkman import Bulkhead, BulkheadConfig
from resilient_circuit.storage import create_storage

# Use PostgreSQL for distributed circuit breaker state
storage = create_storage(namespace="my_app")

config = BulkheadConfig(
    name="external_service",
    failure_threshold=3,
    isolation_duration=60.0,
)

bulkhead = Bulkhead(config, circuit_storage=storage)

The circuit breaker has three states:

  • CLOSED (Healthy): Normal operation
  • OPEN (Isolated): Blocking requests after failures
  • HALF_OPEN (Degraded): Testing if service recovered

Monitoring and Metrics

Get Statistics

stats = await bulkhead.get_stats()
print(stats)
# {
#     'name': 'api',
#     'state': 'healthy',
#     'total_executions': 150,
#     'successful_executions': 145,
#     'failed_executions': 5,
#     'rejected_executions': 0,
#     'active_tasks': 3,
#     'max_concurrent_calls': 10,
#     'max_queue_size': 100,
#     'circuit_breaker_enabled': True,
#     'circuit_status': 'CLOSED'
# }

Check Health

is_healthy = await bulkhead.is_healthy()
state = await bulkhead.get_state()  # BulkheadState enum

Reset Statistics

await bulkhead.reset_stats()

Advanced Usage

Context Manager

async with bulkhead.context():
    result1 = await bulkhead.execute(func1)
    result2 = await bulkhead.execute(func2)

Handling Sync and Async Functions

Bulkman automatically detects and handles both sync and async functions:

# Async function
async def async_operation():
    await trio.sleep(1)
    return "async result"

# Sync function
def sync_operation():
    import time
    time.sleep(1)
    return "sync result"

# Both work seamlessly
result1 = await bulkhead.execute(async_operation)
result2 = await bulkhead.execute(sync_operation)

Concurrent Execution

import trio
from bulkman import Bulkhead, BulkheadConfig

async def main():
    config = BulkheadConfig(name="workers", max_concurrent_calls=5)
    bulkhead = Bulkhead(config)

    async def worker(task_id: int):
        result = await bulkhead.execute(process_task, task_id)
        return result

    # Run many tasks concurrently, bulkhead limits concurrency
    async with trio.open_nursery() as nursery:
        for i in range(100):
            nursery.start_soon(worker, i)

trio.run(main)

Error Handling

Bulkman provides specific exceptions for different failure scenarios:

from bulkman.exceptions import (
    BulkheadError,
    BulkheadCircuitOpenError,
    BulkheadTimeoutError,
    BulkheadFullError,
)

try:
    result = await bulkhead.execute(my_function)
    if not result.success:
        print(f"Execution failed: {result.error}")
except BulkheadCircuitOpenError:
    print("Circuit breaker is open")
except BulkheadTimeoutError:
    print("Operation timed out")
except BulkheadFullError:
    print("Bulkhead is at capacity")

Testing

Run the test suite:

# Install dev dependencies
pip install -e ".[dev]"

# Run tests with coverage
pytest tests/ --cov=bulkman --cov-report=term-missing

# Run specific test file
pytest tests/test_bulkhead.py -v

Architecture

Bulkman uses:

  • Trio Semaphores for concurrency control
  • Trio Locks for thread-safe statistics
  • resilient-circuit for circuit breaking logic
  • Structured concurrency for clean resource management

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the Apache Software License 2.0 - see the LICENSE file for details.

Credits

  • Built with Trio for structured concurrency
  • Circuit breaker powered by resilient-circuit
  • Inspired by Michael Nygard's "Release It!" and Martin Fowler's circuit breaker pattern

Related Patterns

  • Circuit Breaker: Prevents cascading failures (integrated via resilient-circuit)
  • Rate Limiting: Controls request rate (complementary pattern)
  • Retry: Automatically retries failed operations (can be combined)
  • Timeout: Prevents indefinite waits (built-in via timeout_seconds)

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

bulkman-1.0.2.tar.gz (24.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

bulkman-1.0.2-py3-none-any.whl (16.1 kB view details)

Uploaded Python 3

File details

Details for the file bulkman-1.0.2.tar.gz.

File metadata

  • Download URL: bulkman-1.0.2.tar.gz
  • Upload date:
  • Size: 24.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for bulkman-1.0.2.tar.gz
Algorithm Hash digest
SHA256 d8f91b766855c74bc684f5511577e3f81322a91c788ae7b5064228280cb48ef6
MD5 b0410c27b40ceecc417d1559db42ce39
BLAKE2b-256 20a2f4331be829a5cc1d89ecd61a2d5d15566bbe7034e12ae53ebeac9921d6a4

See more details on using hashes here.

File details

Details for the file bulkman-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: bulkman-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 16.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for bulkman-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 f0370568d3c8883776925c5279b19979fd4e3ffbd99b42655516b43050af2159
MD5 e95285f2b67ed83f0f1513f649dfdae8
BLAKE2b-256 e5353809a21eafda652133a0fb95e2f99ec6622a71ad75bbaff38c0063d113ba

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page