Skip to main content

Async boto3 client generator.

Project description

aboto3

aboto3 is an async boto3 client generator!

There are other boto3-like libraries that offer asyncio but the interface can be quite different from normal boto3 clients. The goal of aboto3 is to closely replicate the boto3 client interface with acceptable performance from the python ThreadPoolExecutor!

API NOTE - aboto3 was created with boto3 API compatibility in mind. Because of this it does not support boto3 "Resources", and there is no plan to support them. New "Resources" are no longer being added to boto3.

Performance NOTE - Because aboto3 provides an async wrapper around boto3, it is not truly async to it's core. It sends a boto3 call to a thread and the thread runs asynchronously. asyncio tasks are much lighter weight than threads so if you want to run hundreds of concurrent calls with the best performance, pure async boto3 equivalents are a better fit.

Tutorial

To create an async client simply pass in the normal boto3 client to create an AIOClient.

Use the async client, in a coroutine, like you would if the boto3 client's API calls were all async! See boto3 docs for details.

import asyncio

import aboto3
import boto3

# create a normal boto3 client
ec2_client = boto3.client("ec2")
# create the asyncio version from the client
aio_ec2_client = aboto3.AIOClient(ec2_client)

# you can still use the other client as usual
instances = ec2_client.describe_instances()

# the async client must be used in a coroutine
# but acts exactly the same as the boto3 client except method calls are async
async def aio_tester():
    aio_instances = await aio_ec2_client.describe_instances()

    return aio_instances


aio_instances = asyncio.run(aio_tester())

Pass in parameters to the coroutine like a normal client.

import asyncio

import aboto3
import boto3

ec2_client = boto3.client("ec2")
aio_ec2_client = aboto3.AIOClient(ec2_client)


instances = ec2_client.describe_instances(InstanceIds=["i-123412341234"])


async def aio_tester():
    aio_instances = await aio_ec2_client.describe_instances(InstanceIds=["i-123412341234"])

    return aio_instances


aio_instances = asyncio.run(aio_tester())

Get an async paginator from the aboto3 client.

import asyncio

import aboto3
import boto3

ec2_client = boto3.client("ec2")
aio_ec2_client = aboto3.AIOClient(ec2_client)
filters = [
    {
        "Name": "instance-type",
        "Values": [
            "t2.micro"
        ]
    }
]

pages = []
pager = ec2_client.get_paginator("describe_instances")
for page in pager.paginate(Filters=filters):
    pages.append(page)

# note the use of an "async for" loop so calls for a page are non-blocking.
async def aio_tester():
    aio_pages = []
    aio_pager = aio_ec2_client.get_paginator("describe_instances")
    async for page in aio_pager.paginate(Filters=filters):
        aio_pages.append(page)

    return aio_pages


aio_pages = asyncio.run(aio_tester())

Client exceptions can be caught on the AIOClient just like a normal boto3 client. botocore exceptions are caught as normal.

import asyncio

import aboto3
import boto3

ssm_client = boto3.client("ssm")
aio_ssm_client = aboto3.AIOClient(ssm_client)

try:
    ssm_client.get_parameter(Name="/unknown/param")
except ssm_client.exceptions.ParameterNotFound as error:
    print("found an error here: {}".format(error))


async def aio_tester():
    try:
        aio_ssm_client.get_parameter(Name="/unknown/param")
    except aio_ssm_client.exceptions.ParameterNotFound as error:
        print("found an error here: {}".format(error))


aio_pages = asyncio.run(aio_tester())

You can also use boto3 augmenting libraries since aboto3 is only a wrapper.

Optimization

When an AIOClient is created it will automatically create a ThreadPoolExecutor to run the boto3 calls asynchronously. The size of max workers of the pool is determined by the boto3 client's config for max_pool_connections. By default this is 10. See botocore Config Reference for more details.

The thread pool adds a small amount of overhead for each AIOClient that is created (though this is far less than the overhead of creating a boto3 client). To save some initialization time or have more control over total number of threads you can provide your own ThreadPoolExecutor and share this between clients.

import asyncio
from concurrent.futures import ThreadPoolExecutor

import aboto3
import boto3

boto3_thread_pool = ThreadPoolExecutor(max_workers=16)

ec2_client = boto3.client("ec2")
aio_ec2_client = AIOClient(
    boto3_client=ec2_client, 
    thread_pool_executor=boto3_thread_pool
)

rds_client = boto3.client("rds")
aio_rds_client = AIOClient(
    boto3_client=rds_client, 
    thread_pool_executor=boto3_thread_pool
)

In general, for applications, you will want to cache the clients if possible. Try not to create a new one in every function. For applications, a shared thread pool can be useful in limiting the total number of threads, when necessary.

If you are making large numbers of concurrent calls with the same AIOClient you may want to pass in a custom botocore.config.Config to the boto3 client with a higher max_pool_connections. If you are using a shared thread pool you may also need to increase the max workers in that as well.

The example below will allow up to 32 concurrent calls to be in flight for the EC2 and RDS AIOClient's.

import asyncio
from concurrent.futures import ThreadPoolExecutor

import aboto3
import boto3
from botocore.config import Config

boto_config = Config(
    max_pool_connections=32
)
boto3_thread_pool = ThreadPoolExecutor(max_workers=64)

ec2_client = boto3.client("ec2", config=boto_config)
aio_ec2_client = AIOClient(
    boto3_client=ec2_client, 
    thread_pool_executor=boto3_thread_pool
)
rds_client = boto3.client("rds", config=boto_config)
aio_rds_client = AIOClient(
    boto3_client=rds_client, 
    thread_pool_executor=boto3_thread_pool
)

Or if you don't care about sharing the thread pool just pass in the config and each AIOClient will have it's own pool of 32 threads.

import asyncio

import aboto3
import boto3
from botocore.config import Config

boto_config = Config(
    max_pool_connections=32
)

ec2_client = boto3.client("ec2", config=boto_config)
aio_ec2_client = AIOClient(boto3_client=ec2_client)

rds_client = boto3.client("rds", config=boto_config)
aio_rds_client = AIOClient(boto3_client=rds_client)

Development

Install the package in editable mode with dev dependencies.

(venv) $ pip install -e .[dev]

nox is used to manage various dev functions. Start with

(venv) $ nox --help

pyenv is used to manage python versions. To run the nox tests for applicable python version you will first need to install them. In the root project dir run:

(venv) $ pyenv install

Changelog

Changelog for aboto3. All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

[0.1.1] - 2024-01-28

Added

Tests for python 3.12

Changed

Updated README to reflect recommendations.

Removed

Support for python 3.7

[0.1.0] - 2023-06-22

Initial Release.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aboto3-0.1.1.tar.gz (6.8 kB view hashes)

Uploaded Source

Built Distribution

aboto3-0.1.1-py3-none-any.whl (6.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page