AWS boto3 bedrock client in async
aiobedrock
An asynchronous Python client for AWS Bedrock, providing non-blocking access to Amazon's foundation model service.
Features
- Fully Asynchronous: Non-blocking API calls using aiohttp
- Low Overhead: Minimal dependencies with efficient implementation
- Converse API: Unified API for all Bedrock models with structured messages
- Streaming Support: Stream responses for real-time AI model interactions
- Guardrail Integration: Support for AWS Bedrock Guardrails
- Service Tier Support: Configure processing tiers (priority, default, flex, reserved)
- AWS SigV4 Auth: Proper AWS authentication for secure API calls
- Batch Processing: Concurrent batch invocations with invoke_many and converse_many
- Error Handling: Comprehensive error handling with descriptive exceptions
- Type Hints: Optional type checking support with mypy-boto3-bedrock-runtime
Installation
pip install aiobedrock
For type checking support (optional):
pip install aiobedrock[types]
Requirements
- Python 3.9 or later (tested through Python 3.14)
- AWS credentials configured in your environment
- boto3 1.38.21 or newer (installed automatically via dependencies)
Quick Start
Converse API (Recommended)
The Converse API provides a unified interface for all Bedrock models:
import json
import asyncio

from aiobedrock import Client


async def main():
    async with Client(region_name="us-west-2") as client:
        messages = [
            {
                "role": "user",
                "content": [{"text": "What is the capital of France?"}],
            }
        ]
        response = await client.converse(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            messages=messages,
            inferenceConfig={
                "maxTokens": 1024,
                "temperature": 0.7,
            },
        )
        # converse() returns raw bytes; decode and parse them as JSON.
        result = json.loads(response.decode("utf-8"))
        print(json.dumps(result, indent=2))


if __name__ == "__main__":
    asyncio.run(main())
Converse Streaming
import asyncio

from aiobedrock import Client


async def main():
    async with Client(region_name="us-west-2") as client:
        messages = [
            {
                "role": "user",
                "content": [{"text": "Tell me a short story about a robot."}],
            }
        ]
        print("Assistant: ", end="", flush=True)
        async for event in client.converse_stream(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            messages=messages,
            inferenceConfig={
                "maxTokens": 1024,
                "temperature": 0.7,
            },
        ):
            if "contentBlockDelta" in event:
                # Text arrives incrementally in contentBlockDelta events.
                delta = event["contentBlockDelta"].get("delta", {})
                if "text" in delta:
                    print(delta["text"], end="", flush=True)
            elif "messageStop" in event:
                print(f"\n[Stop reason: {event['messageStop'].get('stopReason')}]")
            elif "metadata" in event:
                usage = event["metadata"].get("usage", {})
                print(f"[Tokens - Input: {usage.get('inputTokens')}, Output: {usage.get('outputTokens')}]")


if __name__ == "__main__":
    asyncio.run(main())
Basic Model Invocation (More Control)
For direct model invocation with model-specific request formats:
import json
import asyncio

from aiobedrock import Client


async def main():
    async with Client(region_name="YOUR_AWS_REGION") as client:
        # Request body in the model's native (Anthropic Messages) format.
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "temperature": 0.7,
            "top_p": 0.9,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "What can you do?"},
                    ],
                }
            ],
        }
        response = await client.invoke_model(
            body=json.dumps(body),
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            accept="application/json",
            contentType="application/json",
        )
        print(json.loads(response.decode("utf-8")))


if __name__ == "__main__":
    asyncio.run(main())
Streaming Response (More Control)
import json
import asyncio

from aiobedrock import Client


async def main():
    async with Client(region_name="YOUR_AWS_REGION") as client:
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "temperature": 0.7,
            "top_p": 0.9,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "What can you do?"},
                    ],
                }
            ],
        }
        async for chunk in client.invoke_model_with_response_stream(
            body=json.dumps(body),
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            accept="application/json",
            contentType="application/json",
        ):
            print(json.loads(chunk.decode("utf-8")))


if __name__ == "__main__":
    asyncio.run(main())
Using Guardrails
import json
import asyncio

from aiobedrock import Client


async def main():
    async with Client(region_name="YOUR_AWS_REGION") as client:
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "temperature": 0.7,
            "top_p": 0.9,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "What can you do?"},
                    ],
                }
            ],
        }
        # The guardrail is attached through two extra keyword arguments.
        response = await client.invoke_model(
            body=json.dumps(body),
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            accept="application/json",
            contentType="application/json",
            guardrailIdentifier="arn:aws:bedrock:YOUR_REGION:YOUR_ACCOUNT_ID:guardrail/YOUR_GUARDRAIL_ID",
            guardrailVersion="LATEST",
        )
        print(json.loads(response.decode("utf-8")))


if __name__ == "__main__":
    asyncio.run(main())
API Reference
Client
Client(
    region_name: str,
    assume_role_arn: Optional[str] = None,
    *,
    max_connections: int = 10000,
    request_timeout: Optional[float] = None,
    max_concurrency: Optional[int] = None,
    max_retries: int = 2,
    retry_backoff: float = 0.5,
    max_backoff: float = 6.0,
    retry_statuses: Optional[Sequence[int]] = None,
)
Creates a new Bedrock client instance.
The underlying aiohttp.ClientSession is created lazily when first used. You
can interact with the client by using async with or by awaiting individual
methods directly; both patterns will create a shared session automatically.
- region_name: AWS region where Bedrock is available (e.g., "us-east-1", "us-west-2")
- assume_role_arn: Optional ARN of an IAM role to assume for cross-account access
- max_connections: Maximum number of connections in the pool (default: 10000)
- request_timeout: Optional request timeout in seconds
- max_concurrency: Optional maximum concurrent requests
- max_retries: Maximum number of retry attempts (default: 2)
- retry_backoff: Initial backoff delay in seconds (default: 0.5)
- max_backoff: Maximum backoff delay in seconds (default: 6.0)
- retry_statuses: HTTP status codes to retry (default: 408, 424, 429, 500, 502, 503, 504)
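To get a feel for how retry_backoff and max_backoff interact, the sketch below computes a capped exponential schedule. It assumes the delay doubles on each attempt and applies no jitter; the library's actual retry policy is not spelled out here and may differ. backoff_delays is a hypothetical helper, not part of aiobedrock.

```python
from typing import List


def backoff_delays(
    max_retries: int = 2,
    retry_backoff: float = 0.5,
    max_backoff: float = 6.0,
) -> List[float]:
    """Return the wait before each retry, assuming the delay doubles per attempt."""
    return [
        min(retry_backoff * (2 ** attempt), max_backoff)
        for attempt in range(max_retries)
    ]


print(backoff_delays())               # [0.5, 1.0]
print(backoff_delays(max_retries=6))  # [0.5, 1.0, 2.0, 4.0, 6.0, 6.0]
```

Under this assumption the defaults add at most 1.5 seconds of waiting per request, and max_backoff only starts to matter from the fifth retry onward.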
Methods
converse
async converse(
    modelId: str,
    messages: Sequence[MessageTypeDef],
    *,
    system: Optional[Sequence[SystemContentBlockTypeDef]] = None,
    inferenceConfig: Optional[InferenceConfigurationTypeDef] = None,
    toolConfig: Optional[ToolConfigurationTypeDef] = None,
    guardrailConfig: Optional[GuardrailConfigurationTypeDef] = None,
    additionalModelRequestFields: Optional[Mapping[str, Any]] = None,
    additionalModelResponseFieldPaths: Optional[Sequence[str]] = None,
    promptVariables: Optional[Mapping[str, Any]] = None,
    requestMetadata: Optional[Mapping[str, str]] = None,
    performanceConfig: Optional[PerformanceConfigurationTypeDef] = None,
    serviceTier: Optional[ServiceTierConfigTypeDef] = None,
) -> bytes
Invokes a Bedrock model using the Converse API and returns the complete response as bytes.
- modelId: Bedrock model identifier
- messages: List of conversation messages with role and content
- system: Optional system prompts
- inferenceConfig: Optional inference parameters (maxTokens, temperature, topP, stopSequences)
- toolConfig: Optional tool configuration for function calling
- guardrailConfig: Optional guardrail configuration
- additionalModelRequestFields: Optional model-specific parameters
- performanceConfig: Optional performance configuration (latency: "standard" or "optimized")
- serviceTier: Optional service tier configuration (type: "priority", "default", "flex", or "reserved")
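Because converse returns raw bytes, most callers will want a small helper that decodes the body and pulls out the assistant text. The sketch below assumes the standard Converse response shape (output.message.content as a list of blocks); extract_text and the sample payload are illustrative and not part of aiobedrock.

```python
import json
from typing import Any, Dict


def extract_text(response: bytes) -> str:
    """Join every text block found in a Converse response body."""
    payload: Dict[str, Any] = json.loads(response.decode("utf-8"))
    blocks = payload.get("output", {}).get("message", {}).get("content", [])
    # Non-text blocks (for example toolUse) are skipped.
    return "".join(block["text"] for block in blocks if "text" in block)


# Hand-written sample in the Converse response shape, standing in for
# the bytes returned by `await client.converse(...)`.
sample = json.dumps({
    "output": {"message": {"role": "assistant", "content": [{"text": "Paris."}]}},
    "stopReason": "end_turn",
    "usage": {"inputTokens": 14, "outputTokens": 3, "totalTokens": 17},
}).encode("utf-8")

print(extract_text(sample))  # Paris.
```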
converse_stream
async converse_stream(
    modelId: str,
    messages: Sequence[MessageTypeDef],
    *,
    # Same optional parameters as converse()
) -> AsyncGenerator[Dict[str, Any], None]
Invokes a Bedrock model using the ConverseStream API and yields streaming events.
Event types:
- {"messageStart": {"role": "assistant"}} - Message started
- {"contentBlockStart": {...}} - Content block started
- {"contentBlockDelta": {"delta": {"text": "..."}, "contentBlockIndex": 0}} - Text delta
- {"contentBlockStop": {"contentBlockIndex": 0}} - Content block completed
- {"messageStop": {"stopReason": "end_turn"}} - Message completed
- {"metadata": {"usage": {...}, "metrics": {...}}} - Usage metadata
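When the full text is needed rather than incremental printing, the events can be folded into a single string. The sketch below is one way to do that; collect_stream is a hypothetical helper, and fake_events is a stand-in that yields the event shapes listed above so the example runs without AWS access.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Optional, Tuple


async def collect_stream(
    events: AsyncIterator[Dict[str, Any]],
) -> Tuple[str, Optional[str]]:
    """Concatenate text deltas and remember the stop reason."""
    parts = []
    stop_reason = None
    async for event in events:
        if "contentBlockDelta" in event:
            delta = event["contentBlockDelta"].get("delta", {})
            parts.append(delta.get("text", ""))
        elif "messageStop" in event:
            stop_reason = event["messageStop"].get("stopReason")
    return "".join(parts), stop_reason


async def fake_events() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for client.converse_stream(...)."""
    yield {"messageStart": {"role": "assistant"}}
    yield {"contentBlockDelta": {"delta": {"text": "Hello, "}, "contentBlockIndex": 0}}
    yield {"contentBlockDelta": {"delta": {"text": "world"}, "contentBlockIndex": 0}}
    yield {"contentBlockStop": {"contentBlockIndex": 0}}
    yield {"messageStop": {"stopReason": "end_turn"}}


text, stop_reason = asyncio.run(collect_stream(fake_events()))
print(text, stop_reason)  # Hello, world end_turn
```

In real use, the argument would be the generator returned by client.converse_stream(...) in place of fake_events().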
converse_many
async converse_many(
    requests: Iterable[Mapping[str, Any]],
    *,
    concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> Sequence[Union[bytes, BaseException]]
Runs multiple converse invocations concurrently while preserving the order of results.
Each entry in requests must include modelId and messages; any additional key/value
pairs are forwarded to converse.
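A request list for converse_many can be assembled from plain prompts. The sketch below shows one possible shape; build_requests is a hypothetical helper, and the only keys taken from the description above are modelId and messages, with any shared extras forwarded as-is.

```python
from typing import Any, Dict, List


def build_requests(
    model_id: str, prompts: List[str], **shared: Any
) -> List[Dict[str, Any]]:
    """Build one converse_many request mapping per prompt."""
    return [
        {
            "modelId": model_id,
            "messages": [{"role": "user", "content": [{"text": prompt}]}],
            **shared,  # e.g. inferenceConfig, forwarded to converse
        }
        for prompt in prompts
    ]


requests = build_requests(
    "anthropic.claude-3-haiku-20240307-v1:0",
    ["What is the capital of France?", "What is the capital of Japan?"],
    inferenceConfig={"maxTokens": 256},
)
print(len(requests), sorted(requests[0]))
```

The resulting list could then be passed along the lines of `await client.converse_many(requests, concurrency=4)`; results come back in the same order as the prompts.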
invoke_model
async invoke_model(body: str, modelId: str, **kwargs) -> bytes
Invokes a Bedrock model and returns the complete response.
- body: JSON string with model parameters and prompt
- modelId: Bedrock model identifier
- kwargs: Optional parameters
  - accept: Accept header (default: "application/json")
  - contentType: Content-Type header (default: "application/json")
  - trace: Tracing level: "ENABLED", "ENABLED_FULL" or "DISABLED" (default: "DISABLED")
  - guardrailIdentifier: ARN of the guardrail to use
  - guardrailVersion: Version of the guardrail (e.g., "1" or "LATEST")
  - performanceConfigLatency: Performance configuration for latency. Valid values are "standard" or "optimized".
  - serviceTier: Processing tier type. Valid values are "priority", "default", "flex", or "reserved".
invoke_model_with_response_stream
async invoke_model_with_response_stream(body: str, modelId: str, **kwargs) -> AsyncGenerator[Union[Dict[str, Any], bytes], None]
Invokes a Bedrock model and returns an asynchronous generator. The generator yields either parsed JSON objects or raw byte chunks depending on the payload.
- Parameters are the same as invoke_model
- Streaming error events from Bedrock raise aiobedrock.main.BedrockStreamError and surface the error payload in the exception message so you can respond or retry appropriately.
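Since the generator may yield either parsed objects or raw bytes, consumer code is simpler if it normalizes both. The sketch below assumes an Anthropic model, whose native stream carries text in content_block_delta events; chunk_text is a hypothetical helper, and other model families use different event shapes.

```python
import json
from typing import Any, Dict, Union


def chunk_text(chunk: Union[Dict[str, Any], bytes]) -> str:
    """Return the text carried by one streamed chunk, or '' if there is none."""
    if isinstance(chunk, (bytes, bytearray)):
        try:
            chunk = json.loads(chunk.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            return ""  # opaque binary payload: nothing to print
    if not isinstance(chunk, dict):
        return ""
    if chunk.get("type") == "content_block_delta":
        return chunk.get("delta", {}).get("text", "")
    return ""


print(chunk_text({"type": "content_block_delta",
                  "delta": {"type": "text_delta", "text": "Hi"}}))  # Hi
print(repr(chunk_text(b'{"type": "message_stop"}')))  # ''
```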
invoke_many
async invoke_many(requests: Iterable[Mapping[str, Any]], *, concurrency: Optional[int] = None, return_exceptions: bool = False) -> Sequence[Union[bytes, Exception]]
Runs multiple invocations concurrently while preserving the order of results.
Each entry in requests must include body (JSON string) and modelId; any
additional key/value pairs are forwarded to invoke_model.
- concurrency: Optional per-call limit that overrides the client's global max_concurrency.
- return_exceptions: Mirrors asyncio.gather; when True, exceptions are returned alongside successful responses instead of aborting the batch.
See example/invoke_many.py for a complete usage example.
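The behavior described for concurrency and return_exceptions matches a common asyncio pattern: a semaphore bounds the calls in flight, and asyncio.gather keeps results in input order. The sketch below illustrates that general pattern and is not the library's implementation; bounded_gather and fake_invoke are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def bounded_gather(
    func: Callable[[Any], Awaitable[Any]],
    items: Sequence[Any],
    *,
    concurrency: int = 2,
    return_exceptions: bool = False,
) -> List[Any]:
    """Run func over items with at most `concurrency` calls in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(item: Any) -> Any:
        async with semaphore:
            return await func(item)

    # gather returns results in argument order, not completion order.
    return await asyncio.gather(
        *(run_one(item) for item in items),
        return_exceptions=return_exceptions,
    )


async def fake_invoke(n: int) -> bytes:
    """Hypothetical stand-in for client.invoke_model."""
    await asyncio.sleep(0.01 * (3 - n))  # later items finish sooner
    if n == 1:
        raise RuntimeError("throttled")
    return f"result-{n}".encode()


results = asyncio.run(
    bounded_gather(fake_invoke, [0, 1, 2], concurrency=2, return_exceptions=True)
)
ok = [r for r in results if not isinstance(r, BaseException)]
failed = [r for r in results if isinstance(r, BaseException)]
print(ok, failed)
```

With return_exceptions=True, the failed call occupies its original slot in the result list, so a caller can pair each result with the request that produced it and retry only the failures.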
invoke_sagemaker_endpoint
async invoke_sagemaker_endpoint(
    endpoint_name: str,
    *,
    body: Union[str, bytes],
    content_type: Optional[str] = None,
    accept: Optional[str] = None,
    # ... additional SageMaker-specific headers
) -> bytes
Invokes a SageMaker endpoint asynchronously.
close
async close()
Closes the aiohttp session.
Supported Models
aiobedrock supports all models available on AWS Bedrock, as well as endpoints hosted on AWS SageMaker. Ensure you have appropriate permissions to access these models in your AWS account.
Error Handling
The client provides detailed error messages for common Bedrock API errors:
- 403: AccessDeniedException
- 408: ModelTimeoutException
- 424: ModelErrorException
- 429: ThrottlingException
- 500: InternalServerException
- 503: ServiceUnavailableException
In addition, when the streaming API surfaces an error event the library raises
BedrockStreamError with the exception type that Bedrock reported (for
example ModelStreamError) and the payload returned by the service.
For more error details, refer to the AWS Bedrock API documentation.
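Combining the status list above with the default retry_statuses documented for Client gives a quick view of which errors are retried automatically. The sketch below is a lookup table built only from those two lists; describe_status is a hypothetical helper and does not reflect how the library raises exceptions internally.

```python
# Status-to-exception names as listed in this section.
BEDROCK_ERRORS = {
    403: "AccessDeniedException",
    408: "ModelTimeoutException",
    424: "ModelErrorException",
    429: "ThrottlingException",
    500: "InternalServerException",
    503: "ServiceUnavailableException",
}

# Default retry_statuses documented for Client.
DEFAULT_RETRY_STATUSES = frozenset({408, 424, 429, 500, 502, 503, 504})


def describe_status(status: int) -> str:
    """Summarize a status code and whether the defaults would retry it."""
    name = BEDROCK_ERRORS.get(status, "UnknownError")
    action = "retried" if status in DEFAULT_RETRY_STATUSES else "not retried"
    return f"{status} {name}: {action} by default"


print(describe_status(429))  # 429 ThrottlingException: retried by default
print(describe_status(403))  # 403 AccessDeniedException: not retried by default
```

Of the listed errors, only 403 falls outside the default retry set, which fits its nature: a permissions problem will not resolve by retrying.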
License
MIT License - See LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.