AWS boto3 bedrock client in async

aiobedrock

An asynchronous Python client for AWS Bedrock, providing non-blocking access to Amazon's foundation model service.

Features

  • Fully Asynchronous: Non-blocking API calls using aiohttp
  • Low Overhead: Minimal dependencies with efficient implementation
  • Converse API: Unified API for all Bedrock models with structured messages
  • Streaming Support: Stream responses for real-time AI model interactions
  • Guardrail Integration: Support for AWS Bedrock Guardrails
  • AWS SigV4 Auth: Proper AWS authentication for secure API calls
  • Batch Processing: Concurrent batch invocations with invoke_many and converse_many
  • Error Handling: Comprehensive error handling with descriptive exceptions
  • Type Hints: Optional type checking support with mypy-boto3-bedrock-runtime

Installation

pip install aiobedrock

For type checking support (optional):

pip install aiobedrock[types]
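
With the optional extra installed, a static type checker can validate message dictionaries before they reach the client. The following is a minimal sketch, assuming the extra makes the `mypy_boto3_bedrock_runtime` package importable; `user_message` is a hypothetical helper, not part of aiobedrock.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only by the type checker, so the extra stays optional at runtime.
    from mypy_boto3_bedrock_runtime.type_defs import MessageTypeDef


def user_message(text: str) -> "MessageTypeDef":
    """Build a single user turn in the Converse message format (hypothetical helper)."""
    return {"role": "user", "content": [{"text": text}]}


message = user_message("What is the capital of France?")
print(message)
```

Because the import sits behind `TYPE_CHECKING`, the same code runs unchanged when the extra is not installed.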

Requirements

  • Python 3.9 or later (tested through Python 3.14)
  • AWS credentials configured in your environment
  • boto3 1.38.21 or newer (installed automatically via dependencies)

Quick Start

Converse API (Recommended)

The Converse API provides a unified interface for all Bedrock models:

import json
import asyncio
from aiobedrock import Client

async def main():
    async with Client(region_name="us-west-2") as client:
        messages = [
            {
                "role": "user",
                "content": [{"text": "What is the capital of France?"}],
            }
        ]

        response = await client.converse(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            messages=messages,
            inferenceConfig={
                "maxTokens": 1024,
                "temperature": 0.7,
            },
        )

        # converse() returns the raw response body as bytes; decode and parse it.
        result = json.loads(response.decode("utf-8"))
        print(json.dumps(result, indent=2))

if __name__ == "__main__":
    asyncio.run(main())
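
Most callers only need the assistant's text rather than the whole JSON document. A minimal sketch of pulling it out, assuming the returned bytes are the unmodified Converse response body (`output.message.content` as documented by AWS); `extract_text` and the sample payload are illustrative, not part of aiobedrock.

```python
import json
from typing import Any, Dict


def extract_text(raw: bytes) -> str:
    """Join the text blocks of the assistant message in a Converse response."""
    result: Dict[str, Any] = json.loads(raw.decode("utf-8"))
    blocks = result.get("output", {}).get("message", {}).get("content", [])
    # Non-text blocks (for example toolUse) have no "text" key and are skipped.
    return "".join(block["text"] for block in blocks if "text" in block)


# Hand-written payload in the Converse response shape, for illustration only.
sample = json.dumps(
    {
        "output": {"message": {"role": "assistant", "content": [{"text": "Paris."}]}},
        "stopReason": "end_turn",
    }
).encode("utf-8")

print(extract_text(sample))
```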

Converse Streaming

import asyncio
from aiobedrock import Client

async def main():
    async with Client(region_name="us-west-2") as client:
        messages = [
            {
                "role": "user",
                "content": [{"text": "Tell me a short story about a robot."}],
            }
        ]

        print("Assistant: ", end="", flush=True)

        async for event in client.converse_stream(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            messages=messages,
            inferenceConfig={
                "maxTokens": 1024,
                "temperature": 0.7,
            },
        ):
            if "contentBlockDelta" in event:
                delta = event["contentBlockDelta"].get("delta", {})
                if "text" in delta:
                    print(delta["text"], end="", flush=True)
            elif "messageStop" in event:
                print(f"\n[Stop reason: {event['messageStop'].get('stopReason')}]")
            elif "metadata" in event:
                usage = event["metadata"].get("usage", {})
                print(f"[Tokens - Input: {usage.get('inputTokens')}, Output: {usage.get('outputTokens')}]")

if __name__ == "__main__":
    asyncio.run(main())
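
When the full text is needed after streaming finishes, the deltas can be accumulated instead of printed. This is a sketch under the assumption that events have the shape used in the example above; `collect_text` is a hypothetical helper and `fake_stream` stands in for `client.converse_stream(...)` so the snippet runs without AWS access.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Optional, Tuple


async def collect_text(
    events: AsyncIterator[Dict[str, Any]],
) -> Tuple[str, Optional[str]]:
    """Return the concatenated text deltas and the reported stop reason."""
    parts = []
    stop_reason = None
    async for event in events:
        if "contentBlockDelta" in event:
            delta = event["contentBlockDelta"].get("delta", {})
            parts.append(delta.get("text", ""))
        elif "messageStop" in event:
            stop_reason = event["messageStop"].get("stopReason")
    return "".join(parts), stop_reason


async def fake_stream():
    """Stand-in for client.converse_stream(...), yielding hand-written events."""
    yield {"messageStart": {"role": "assistant"}}
    yield {"contentBlockDelta": {"delta": {"text": "Beep "}, "contentBlockIndex": 0}}
    yield {"contentBlockDelta": {"delta": {"text": "boop."}, "contentBlockIndex": 0}}
    yield {"messageStop": {"stopReason": "end_turn"}}


text, reason = asyncio.run(collect_text(fake_stream()))
print(text, reason)
```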

Basic Model Invocation (Legacy)

For direct model invocation with model-specific request formats:

import json
import asyncio
from aiobedrock import Client

async def main():
    async with Client(region_name="YOUR_AWS_REGION") as client:
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "temperature": 0.7,
            "top_p": 0.9,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "What can you do?"},
                    ],
                }
            ],
        }

        response = await client.invoke_model(
            body=json.dumps(body),
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            accept="application/json",
            contentType="application/json",
        )

        print(json.loads(response.decode("utf-8")))

if __name__ == "__main__":
    asyncio.run(main())
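
With direct invocation the response format is model-specific. For Anthropic models on Bedrock the body follows the Messages format, where text lives in `content` blocks of type `text`. A minimal sketch, assuming `invoke_model` returns that body unchanged; `anthropic_text` and the sample payload are illustrative only.

```python
import json


def anthropic_text(raw: bytes) -> str:
    """Concatenate the text blocks of an Anthropic Messages response body."""
    payload = json.loads(raw.decode("utf-8"))
    return "".join(
        block.get("text", "")
        for block in payload.get("content", [])
        if block.get("type") == "text"
    )


# Hand-written payload in the Anthropic Messages response shape.
sample = json.dumps(
    {
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": "I can answer questions."}],
        "stop_reason": "end_turn",
        "usage": {"input_tokens": 12, "output_tokens": 6},
    }
).encode("utf-8")

print(anthropic_text(sample))
```

Other model families use different response schemas, which is one reason the Converse API is recommended above.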

Streaming Response (Legacy)

import json
import asyncio
from aiobedrock import Client

async def main():
    async with Client(region_name="YOUR_AWS_REGION") as client:
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "temperature": 0.7,
            "top_p": 0.9,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "What can you do?"},
                    ],
                }
            ],
        }

        async for chunk in client.invoke_model_with_response_stream(
            body=json.dumps(body),
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            accept="application/json",
            contentType="application/json",
        ):
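            # Chunks may arrive as parsed dicts or as raw bytes (see the API reference).
            if isinstance(chunk, (bytes, bytearray)):
                chunk = json.loads(chunk.decode("utf-8"))
            print(chunk)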

if __name__ == "__main__":
    asyncio.run(main())
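
For Anthropic models, each streamed chunk is an event object, and only `content_block_delta` events with a `text_delta` carry output text. The sketch below extracts that text and accepts either a parsed dict or raw bytes; it assumes the Anthropic Messages streaming format, and `chunk_text` is a hypothetical helper.

```python
import json
from typing import Any, Dict, Union


def chunk_text(chunk: Union[Dict[str, Any], bytes]) -> str:
    """Return the text carried by one streamed chunk, or "" if it has none."""
    if isinstance(chunk, (bytes, bytearray)):
        chunk = json.loads(chunk.decode("utf-8"))
    if chunk.get("type") == "content_block_delta":
        delta = chunk.get("delta", {})
        if delta.get("type") == "text_delta":
            return delta.get("text", "")
    return ""


# Hand-written chunks in the Anthropic streaming event shape.
chunks = [
    {"type": "message_start", "message": {"role": "assistant"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}},
    b'{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " there"}}',
    {"type": "message_stop"},
]

streamed = "".join(chunk_text(c) for c in chunks)
print(streamed)
```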

Using Guardrails

import json
import asyncio
from aiobedrock import Client

async def main():
    async with Client(region_name="YOUR_AWS_REGION") as client:
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "temperature": 0.7,
            "top_p": 0.9,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "What can you do?"},
                    ],
                }
            ],
        }

        response = await client.invoke_model(
            body=json.dumps(body),
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            accept="application/json",
            contentType="application/json",
            guardrailIdentifier="arn:aws:bedrock:YOUR_REGION:YOUR_ACCOUNT_ID:guardrail/YOUR_GUARDRAIL_ID",
            guardrailVersion="DRAFT",  # or a published version number such as "1"
        )

        print(json.loads(response.decode("utf-8")))

if __name__ == "__main__":
    asyncio.run(main())
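
Guardrails can also be attached to Converse calls through the `guardrailConfig` parameter. A minimal sketch of building that dictionary, assuming the field names of the AWS Converse `GuardrailConfiguration` structure; `guardrail_config` is a hypothetical helper and the ARN is the same placeholder used above.

```python
from typing import Dict


def guardrail_config(identifier: str, version: str, trace: str = "disabled") -> Dict[str, str]:
    """Build a guardrailConfig dict for converse() / converse_stream()."""
    return {
        "guardrailIdentifier": identifier,
        "guardrailVersion": version,
        "trace": trace,
    }


config = guardrail_config(
    "arn:aws:bedrock:YOUR_REGION:YOUR_ACCOUNT_ID:guardrail/YOUR_GUARDRAIL_ID",
    "1",
)
print(config)

# Intended use (requires AWS access, so it is not executed here):
#     await client.converse(modelId=..., messages=..., guardrailConfig=config)
```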

API Reference

Client

Client(
    region_name: str,
    assume_role_arn: Optional[str] = None,
    *,
    max_connections: int = 10000,
    request_timeout: Optional[float] = None,
    max_concurrency: Optional[int] = None,
    max_retries: int = 2,
    retry_backoff: float = 0.5,
    max_backoff: float = 6.0,
    retry_statuses: Optional[Sequence[int]] = None,
)

Creates a new Bedrock client instance.

The underlying aiohttp.ClientSession is created lazily on first use. You can use the client as an async context manager (async with) or await its methods directly; either way, a single shared session is created automatically. If you do not use async with, call close() when you are done.

  • region_name: AWS region where Bedrock is available (e.g., "us-east-1", "us-west-2")
  • assume_role_arn: Optional ARN of an IAM role to assume for cross-account access
  • max_connections: Maximum number of connections in the pool (default: 10000)
  • request_timeout: Optional request timeout in seconds
  • max_concurrency: Optional maximum concurrent requests
  • max_retries: Maximum number of retry attempts (default: 2)
  • retry_backoff: Initial backoff delay in seconds (default: 0.5)
  • max_backoff: Maximum backoff delay in seconds (default: 6.0)
  • retry_statuses: HTTP status codes to retry (default: 408, 424, 429, 500, 502, 503, 504)
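
To get a feel for how the retry parameters interact, the sketch below computes a delay schedule. It assumes plain exponential doubling capped at `max_backoff`; the library's actual formula (for example, whether it adds jitter) is not documented here, so treat the numbers as illustrative. `backoff_schedule` is a hypothetical helper.

```python
from typing import List


def backoff_schedule(
    max_retries: int = 2,
    retry_backoff: float = 0.5,
    max_backoff: float = 6.0,
) -> List[float]:
    """Delay before each retry, ASSUMING doubling capped at max_backoff."""
    return [min(max_backoff, retry_backoff * (2 ** attempt)) for attempt in range(max_retries)]


default_delays = backoff_schedule()
longer_delays = backoff_schedule(max_retries=5)
print(default_delays)
print(longer_delays)
```

Under this assumption the defaults wait at most 1.5 seconds in total, and the 6.0 second cap only comes into play from the fifth retry onward.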

Methods

converse

async converse(
    modelId: str,
    messages: Sequence[MessageTypeDef],
    *,
    system: Optional[Sequence[SystemContentBlockTypeDef]] = None,
    inferenceConfig: Optional[InferenceConfigurationTypeDef] = None,
    toolConfig: Optional[ToolConfigurationTypeDef] = None,
    guardrailConfig: Optional[GuardrailConfigurationTypeDef] = None,
    additionalModelRequestFields: Optional[Mapping[str, Any]] = None,
    additionalModelResponseFieldPaths: Optional[Sequence[str]] = None,
    promptVariables: Optional[Mapping[str, Any]] = None,
    requestMetadata: Optional[Mapping[str, str]] = None,
    performanceConfig: Optional[PerformanceConfigurationTypeDef] = None,
) -> bytes

Invokes a Bedrock model using the Converse API and returns the complete response as bytes.

  • modelId: Bedrock model identifier
  • messages: List of conversation messages with role and content
  • system: Optional system prompts
  • inferenceConfig: Optional inference parameters (maxTokens, temperature, topP, stopSequences)
  • toolConfig: Optional tool configuration for function calling
  • guardrailConfig: Optional guardrail configuration
  • additionalModelRequestFields: Optional model-specific parameters
  • performanceConfig: Optional performance configuration (latency: "standard" or "optimized")

converse_stream

async converse_stream(
    modelId: str,
    messages: Sequence[MessageTypeDef],
    *,
    # Same optional parameters as converse()
) -> AsyncGenerator[Dict[str, Any], None]

Invokes a Bedrock model using the ConverseStream API and yields streaming events.

Event types:

  • {"messageStart": {"role": "assistant"}} - Message started
  • {"contentBlockStart": {...}} - Content block started
  • {"contentBlockDelta": {"delta": {"text": "..."}, "contentBlockIndex": 0}} - Text delta
  • {"contentBlockStop": {"contentBlockIndex": 0}} - Content block completed
  • {"messageStop": {"stopReason": "end_turn"}} - Message completed
  • {"metadata": {"usage": {...}, "metrics": {...}}} - Usage metadata
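
A response can contain several content blocks, and `contentBlockIndex` identifies which block a delta belongs to. The sketch below reassembles text per block from a list of already collected events; `assemble_blocks` is a hypothetical helper, and the events are hand-written in the shapes listed above.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def assemble_blocks(events: Iterable[Dict[str, Any]]) -> List[str]:
    """Group text deltas by contentBlockIndex and join each group in order."""
    parts = defaultdict(list)
    for event in events:
        block = event.get("contentBlockDelta")
        if block and "text" in block.get("delta", {}):
            parts[block.get("contentBlockIndex", 0)].append(block["delta"]["text"])
    return ["".join(parts[index]) for index in sorted(parts)]


events = [
    {"messageStart": {"role": "assistant"}},
    {"contentBlockDelta": {"delta": {"text": "First "}, "contentBlockIndex": 0}},
    {"contentBlockDelta": {"delta": {"text": "block."}, "contentBlockIndex": 0}},
    {"contentBlockStop": {"contentBlockIndex": 0}},
    {"contentBlockDelta": {"delta": {"text": "Second block."}, "contentBlockIndex": 1}},
    {"contentBlockStop": {"contentBlockIndex": 1}},
    {"messageStop": {"stopReason": "end_turn"}},
]

blocks = assemble_blocks(events)
print(blocks)
```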

converse_many

async converse_many(
    requests: Iterable[Mapping[str, Any]],
    *,
    concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> Sequence[Union[bytes, BaseException]]

Runs multiple converse invocations concurrently while preserving the order of results. Each entry in requests must include modelId and messages; any additional key/value pairs are forwarded to converse.
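
A batch is just a list of dictionaries, so it can be generated from a list of prompts. A minimal sketch, where `build_requests` is a hypothetical helper and the shared keyword arguments are copied into every entry:

```python
from typing import Any, Dict, Iterable, List


def build_requests(model_id: str, prompts: Iterable[str], **shared: Any) -> List[Dict[str, Any]]:
    """One converse_many entry per prompt; extra keys are forwarded to converse."""
    return [
        {
            "modelId": model_id,
            "messages": [{"role": "user", "content": [{"text": prompt}]}],
            **shared,
        }
        for prompt in prompts
    ]


requests = build_requests(
    "anthropic.claude-3-haiku-20240307-v1:0",
    ["Capital of France?", "Capital of Japan?"],
    inferenceConfig={"maxTokens": 256},
)

# Intended use (requires AWS access, so it is not executed here):
#     results = await client.converse_many(requests, concurrency=4)
print(len(requests))
```

Because result order matches request order, `results[i]` corresponds to `requests[i]`.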

invoke_model

async invoke_model(body: str, modelId: str, **kwargs) -> bytes

Invokes a Bedrock model and returns the complete response.

  • body: JSON string with model parameters and prompt
  • modelId: Bedrock model identifier
  • kwargs: Optional parameters
    • accept: Accept header (default: "application/json")
    • contentType: Content-Type header (default: "application/json")
    • trace: Tracing level: "ENABLED", "ENABLED_FULL" or "DISABLED" (default: "DISABLED")
    • guardrailIdentifier: ARN of the guardrail to use
    • guardrailVersion: Version of the guardrail (e.g., "1" or "DRAFT")
    • performanceConfigLatency: Performance configuration for latency. Valid values are "standard" or "optimized".

invoke_model_with_response_stream

async invoke_model_with_response_stream(
    body: str,
    modelId: str,
    **kwargs,
) -> AsyncGenerator[Union[Dict[str, Any], bytes], None]

Invokes a Bedrock model and returns an asynchronous generator. The generator yields either parsed JSON objects or raw byte chunks depending on the payload.

  • Parameters are the same as invoke_model
  • Streaming error events from Bedrock raise aiobedrock.main.BedrockStreamError and surface the error payload in the exception message so you can respond or retry appropriately.
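
One way to respond to a mid-stream failure is to discard the partial output and start the stream again. The sketch below shows that pattern with the error type passed in as a parameter; in real use it would be `BedrockStreamError` imported from `aiobedrock.main`. `collect_with_retry`, `FakeStreamError`, and `flaky_stream` are all hypothetical stand-ins so the snippet runs without AWS access.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, List, Optional, Type


async def collect_with_retry(
    open_stream: Callable[[], AsyncIterator[Any]],
    error_type: Type[Exception],
    attempts: int = 2,
) -> List[Any]:
    """Collect a whole stream, restarting from scratch if it fails part-way."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error: Optional[Exception] = None
    for _ in range(attempts):
        chunks: List[Any] = []
        try:
            async for chunk in open_stream():
                chunks.append(chunk)
            return chunks
        except error_type as exc:
            last_error = exc  # drop the partial chunks and try again
    assert last_error is not None
    raise last_error


class FakeStreamError(Exception):
    """Stand-in for aiobedrock.main.BedrockStreamError."""


calls = {"count": 0}


async def flaky_stream():
    """Fails after the first chunk on the first call, succeeds afterwards."""
    calls["count"] += 1
    yield "a"
    if calls["count"] == 1:
        raise FakeStreamError("simulated stream error")
    yield "b"


collected = asyncio.run(collect_with_retry(flaky_stream, FakeStreamError))
print(collected)
```

Restarting re-sends the whole request, so this suits cases where nothing has been shown to the user yet.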

invoke_many

async invoke_many(
    requests: Iterable[Mapping[str, Any]],
    *,
    concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> Sequence[Union[bytes, Exception]]

Runs multiple invocations concurrently while preserving the order of results. Each entry in requests must include body (JSON string) and modelId; any additional key/value pairs are forwarded to invoke_model.

  • concurrency: Optional per-call limit that overrides the client's global max_concurrency.
  • return_exceptions: Mirrors asyncio.gather; when True, exceptions are returned alongside successful responses instead of aborting the batch.

See example/invoke_many.py for a complete usage example.
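
With `return_exceptions=True` the result list mixes response bytes and exception objects, in request order. A minimal sketch of separating the two so that failures can be logged or retried; `partition_results` is a hypothetical helper, and the inputs below are hand-written stand-ins for a real batch.

```python
from typing import Any, List, Mapping, Sequence, Tuple, Union

Pair = Tuple[Mapping[str, Any], Any]


def partition_results(
    requests: Sequence[Mapping[str, Any]],
    results: Sequence[Union[bytes, BaseException]],
) -> Tuple[List[Pair], List[Pair]]:
    """Split (request, result) pairs into successes and failures."""
    succeeded: List[Pair] = []
    failed: List[Pair] = []
    for request, result in zip(requests, results):
        target = failed if isinstance(result, BaseException) else succeeded
        target.append((request, result))
    return succeeded, failed


# Stand-ins for: results = await client.invoke_many(requests, return_exceptions=True)
requests = [{"modelId": "model-a", "body": "{}"}, {"modelId": "model-b", "body": "{}"}]
results = [b'{"ok": true}', RuntimeError("simulated failure")]

succeeded, failed = partition_results(requests, results)
print(len(succeeded), len(failed))
```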

invoke_sagemaker_endpoint

async invoke_sagemaker_endpoint(
    endpoint_name: str,
    *,
    body: Union[str, bytes],
    content_type: Optional[str] = None,
    accept: Optional[str] = None,
    # ... additional SageMaker-specific headers
) -> bytes

Invokes a SageMaker endpoint asynchronously.

close

async close()

Closes the aiohttp session.
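
When the client is used without `async with`, a `try`/`finally` block ensures the session is closed even if a call fails. The sketch below shows the pattern for a one-shot request; `ask_once` is a hypothetical helper, and `FakeClient` stands in for `aiobedrock.Client` so the snippet runs without AWS access.

```python
import asyncio
import json
from typing import Any, Dict


async def ask_once(client: Any, model_id: str, prompt: str) -> Dict[str, Any]:
    """Send one Converse request, then close the client whatever happens."""
    try:
        raw = await client.converse(
            modelId=model_id,
            messages=[{"role": "user", "content": [{"text": prompt}]}],
        )
        return json.loads(raw.decode("utf-8"))
    finally:
        await client.close()


class FakeClient:
    """Minimal stand-in exposing the two methods the helper relies on."""

    def __init__(self) -> None:
        self.closed = False

    async def converse(self, **kwargs: Any) -> bytes:
        return b'{"stopReason": "end_turn"}'

    async def close(self) -> None:
        self.closed = True


fake = FakeClient()
answer = asyncio.run(ask_once(fake, "some-model-id", "Hello"))
print(answer, fake.closed)
```

For a long-lived client that serves many requests, call `close()` once at application shutdown instead.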

Supported Models

aiobedrock supports all models available on AWS Bedrock, including:

  • Anthropic Claude (Claude 3.5 Sonnet, Claude 3 Opus/Sonnet/Haiku, Claude 4 models)
  • Amazon Nova (Nova Pro, Nova Lite, Nova Micro)
  • Amazon Titan (Titan Text, Titan Embeddings)
  • Meta Llama (Llama 3.x models)
  • Mistral AI (Mistral Large, Mixtral)
  • Cohere (Command R, Command R+)
  • AI21 Labs (Jamba models)

Ensure you have appropriate permissions to access these models in your AWS account.

Error Handling

The client provides detailed error messages for common Bedrock API errors:

  • 403: AccessDeniedException
  • 408: ModelTimeoutException
  • 424: ModelErrorException
  • 429: ThrottlingException
  • 500: InternalServerException
  • 503: ServiceUnavailableException

In addition, when the streaming API surfaces an error event, the library raises BedrockStreamError, which carries the exception type that Bedrock reported (for example, ModelStreamError) and the payload returned by the service.

For more error details, refer to the AWS Bedrock API documentation.
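
The status codes above can be cross-referenced with the client's default `retry_statuses` to see which errors are retried automatically. A small sketch using only the values listed in this document; `describe_status` is a hypothetical helper, not part of aiobedrock.

```python
ERROR_NAMES = {
    403: "AccessDeniedException",
    408: "ModelTimeoutException",
    424: "ModelErrorException",
    429: "ThrottlingException",
    500: "InternalServerException",
    503: "ServiceUnavailableException",
}

# Default retry_statuses as listed in the Client reference.
DEFAULT_RETRY_STATUSES = frozenset({408, 424, 429, 500, 502, 503, 504})


def describe_status(status: int) -> str:
    """Summarise a status code and whether the default settings retry it."""
    name = ERROR_NAMES.get(status, "UnknownError")
    retried = "retried by default" if status in DEFAULT_RETRY_STATUSES else "not retried by default"
    return f"{status} {name} ({retried})"


for code in (403, 429):
    print(describe_status(code))
```

Access errors such as 403 are not in the default retry set, since repeating the request without changing permissions is unlikely to help.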

License

MIT License - See LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
