aiohttp-s3-client

A simple module for putting and getting objects from Amazon S3-compatible endpoints

Installation

pip install aiohttp-s3-client

Usage

import asyncio
from http import HTTPStatus

from aiohttp import ClientSession
from aiohttp_s3_client import S3Client


async def main():
    async with ClientSession(raise_for_status=True) as session:
        client = S3Client(
            url="http://s3-url",
            session=session,
            access_key_id="key-id",
            secret_access_key="hackme",
            region="us-east-1",
        )

        # Upload str object to bucket "bucket" and key "str"
        async with client.put("bucket/str", "hello, world") as resp:
            assert resp.status == HTTPStatus.OK

        # Upload bytes object to bucket "bucket" and key "bytes"
        async with client.put("bucket/bytes", b"hello, world") as resp:
            assert resp.status == HTTPStatus.OK

        # Upload AsyncIterable to bucket "bucket" and key "iterable"
        async def gen():
            yield b"some bytes"

        async with client.put("bucket/file", gen()) as resp:
            assert resp.status == HTTPStatus.OK

        # Upload file to bucket "bucket" and key "file"
        async with client.put_file("bucket/file", "/path_to_file") as resp:
            assert resp.status == HTTPStatus.OK

        # Check object exists using bucket+key
        async with client.head("bucket/key") as resp:
            assert resp.status == HTTPStatus.OK

        # Get object by bucket+key
        async with client.get("bucket/key") as resp:
            data = await resp.read()

        # Make presigned URL
        url = client.presign_url("GET", "bucket/key", expires=60 * 60)

        # Delete object using bucket+key
        async with client.delete("bucket/key") as resp:
            assert resp.status == HTTPStatus.NO_CONTENT

        # Server-side copy
        async with client.copy("bucket/src-key", "bucket/dst-key") as resp:
            assert resp.status == HTTPStatus.OK

        # Rename (copy + delete source, not atomic)
        await client.rename("bucket/old-key", "bucket/new-key")

        # List objects by prefix
        async for result, prefixes in client.list_objects_v2(
            "bucket/", prefix="prefix",
        ):
            # result is a list of metadata objects for the keys under the
            # prefix; prefixes is a list of common prefixes
            print(result, prefixes)


asyncio.run(main())
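
The presigned URL embeds the signature in its query string, so it can be fetched by any plain HTTP client until it expires. A minimal sketch, reusing the same placeholder credentials as above:

import asyncio

from aiohttp import ClientSession
from aiohttp_s3_client import S3Client


async def main():
    async with ClientSession(raise_for_status=True) as session:
        client = S3Client(
            url="http://s3-url",
            session=session,
            access_key_id="key-id",
            secret_access_key="hackme",
            region="us-east-1",
        )

        # The URL is valid for one hour; no Authorization header is needed.
        url = client.presign_url("GET", "bucket/key", expires=60 * 60)
        async with session.get(url) as resp:
            data = await resp.read()


asyncio.run(main())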

The bucket may be specified as a subdomain, in the object name, or in the base URL:

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client


async def main():
    async with aiohttp.ClientSession() as session:
        # As a subdomain
        client = S3Client(url="http://bucket.your-s3-host", session=session)
        async with client.put("key", b"data") as resp:
            ...

        # In the object name
        client = S3Client(url="http://your-s3-host", session=session)
        async with client.put("bucket/key", b"data") as resp:
            ...

        # In the base URL
        client = S3Client(url="http://your-s3-host/bucket", session=session)
        async with client.put("key", b"data") as resp:
            ...


asyncio.run(main())

Credentials may be specified with keyword arguments or in the URL:

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client


async def main():
    async with aiohttp.ClientSession() as session:
        client_credentials_as_kw = S3Client(
            url="http://your-s3-host",
            access_key_id="key_id",
            secret_access_key="access_key",
            session=session,
        )

        client_credentials_in_url = S3Client(
            url="http://key_id:access_key@your-s3-host",
            session=session,
        )


asyncio.run(main())

Credentials

By default, S3Client collects credentials from several sources in order: first from keyword arguments such as access_key_id= and secret_access_key=, then from the username and password in the passed url argument, then from environment variables, and finally from the config file.

You can also pass credentials explicitly using the aiohttp_s3_client.credentials module.
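
For illustration, the default lookup order is conceptually similar to merging the individual providers described in the sections below, with earlier sources taking priority (a sketch, not the client's exact internal logic):

from aiohttp_s3_client.credentials import (
    ConfigCredentials,
    EnvironmentCredentials,
    StaticCredentials,
    URLCredentials,
    merge_credentials,
)

# Roughly: keyword arguments, then the URL userinfo, then environment
# variables, then the config file; earlier entries take priority.
credentials = merge_credentials(
    StaticCredentials(
        access_key_id="key-id",
        secret_access_key="hackme",
        region="us-east-1",
    ),
    URLCredentials("http://key-id:hackme@your-s3-host"),
    EnvironmentCredentials(),
    ConfigCredentials(),
)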

aiohttp_s3_client.credentials.StaticCredentials

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import StaticCredentials


async def main():
    credentials = StaticCredentials(
        access_key_id="aaaa",
        secret_access_key="bbbb",
        region="us-east-1",
    )
    async with aiohttp.ClientSession() as session:
        client = S3Client(
            url="http://your-s3-host",
            session=session,
            credentials=credentials,
        )


asyncio.run(main())

aiohttp_s3_client.credentials.URLCredentials

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import URLCredentials


async def main():
    url = "http://key:hack-me@your-s3-host"
    credentials = URLCredentials(url, region="us-east-1")
    async with aiohttp.ClientSession() as session:
        client = S3Client(
            url="http://your-s3-host",
            session=session,
            credentials=credentials,
        )


asyncio.run(main())

aiohttp_s3_client.credentials.EnvironmentCredentials

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import EnvironmentCredentials


async def main():
    credentials = EnvironmentCredentials(region="us-east-1")
    async with aiohttp.ClientSession() as session:
        client = S3Client(
            url="http://your-s3-host",
            session=session,
            credentials=credentials,
        )


asyncio.run(main())

aiohttp_s3_client.credentials.ConfigCredentials

Using user config file:

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import ConfigCredentials


async def main():
    credentials = ConfigCredentials()  # Uses the ~/.aws/credentials config by default
    async with aiohttp.ClientSession() as session:
        client = S3Client(
            url="http://your-s3-host",
            session=session,
            credentials=credentials,
        )


asyncio.run(main())

Using the custom config location:

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import ConfigCredentials


async def main():
    credentials = ConfigCredentials("~/.my-custom-aws-credentials")
    async with aiohttp.ClientSession() as session:
        client = S3Client(
            url="http://your-s3-host",
            session=session,
            credentials=credentials,
        )


asyncio.run(main())

aiohttp_s3_client.credentials.merge_credentials

This function collects all passed credentials instances and returns a new one containing all non-blank fields from the passed instances. Earlier arguments take priority.

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import (
    ConfigCredentials, EnvironmentCredentials, merge_credentials,
)


async def main():
    credentials = merge_credentials(
        EnvironmentCredentials(),
        ConfigCredentials(),
    )
    async with aiohttp.ClientSession() as session:
        client = S3Client(
            url="http://your-s3-host",
            session=session,
            credentials=credentials,
        )


asyncio.run(main())

aiohttp_s3_client.credentials.MetadataCredentials

Fetches credentials from the metadata service:

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import MetadataCredentials


async def main():
    credentials = MetadataCredentials()

    # Start refreshing credentials from the metadata server
    await credentials.start()
    try:
        async with aiohttp.ClientSession() as session:
            client = S3Client(
                url="http://your-s3-host",
                session=session,
                credentials=credentials,
            )
    finally:
        await credentials.stop()


asyncio.run(main())

Multipart upload

Multipart upload can be used for uploading large files. It allows you to upload multiple parts of a file to S3 asynchronously. S3Client handles retries of part uploads and calculates part hashes for integrity checks.

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client


async def main():
    async with aiohttp.ClientSession() as session:
        client = S3Client(url="http://your-s3-host", session=session)
        await client.put_file_multipart(
            "test/bigfile.csv",
            "/path/to/bigfile.csv",
            headers={
                "Content-Type": "text/csv",
            },
            workers_count=8,
        )


asyncio.run(main())

Content-Type inference

When uploading objects, the client automatically infers the Content-Type header from the object key (or local file path) using Python's mimetypes.guess_type. For example, uploading to bucket/photo.jpg will set Content-Type: image/jpeg. If the type cannot be determined, it falls back to application/octet-stream.
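
The inference is essentially the standard-library lookup; a minimal sketch of the equivalent logic (not the client's exact code):

import mimetypes

content_type, _ = mimetypes.guess_type("bucket/photo.jpg")  # "image/jpeg"
if content_type is None:
    # Unknown extensions fall back to the generic binary type.
    content_type = "application/octet-stream"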

You can always override this by passing an explicit Content-Type header:

async with client.put(
    "bucket/data.bin",
    some_bytes,
    headers={"Content-Type": "application/x-custom"},
) as resp:
    ...

Custom metadata

S3 allows you to attach arbitrary key-value metadata to objects using x-amz-meta-<key> headers. You can pass these via the headers parameter on any upload method.

With client.put():

async with client.put(
    "bucket/report.json",
    b'{"result": 42}',
    headers={
        "x-amz-meta-author": "alice",
        "x-amz-meta-version": "3",
    },
) as resp:
    assert resp.status == 200

With client.put_file():

resp = await client.put_file(
    "bucket/photo.jpg",
    "/path/to/photo.jpg",
    headers={
        "x-amz-meta-camera": "Nikon D850",
        "x-amz-meta-location": "Paris",
    },
)

With client.put_file_multipart():

await client.put_file_multipart(
    "bucket/bigfile.csv",
    "/path/to/bigfile.csv",
    headers={
        "Content-Type": "text/csv",
        "x-amz-meta-source": "etl-pipeline",
    },
    workers_count=8,
)

Metadata can also be set or replaced during a server-side copy by passing replace_metadata=True:

async with client.copy(
    "bucket/src-key",
    "bucket/dst-key",
    replace_metadata=True,
    headers={
        "x-amz-meta-status": "archived",
    },
) as resp:
    assert resp.status == 200
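
S3 returns custom metadata as x-amz-meta-* response headers, so it can be read back with a HEAD or GET request. A minimal sketch using the client from the examples above:

async with client.head("bucket/report.json") as resp:
    # Custom metadata comes back as plain response headers.
    author = resp.headers.get("x-amz-meta-author")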

Parallel download to file

S3 supports GET requests with a Range header, so objects can be downloaded in parallel over multiple connections for a speedup. S3Client handles retries of the partial requests and uses the ETag header to make sure the file is not changed during the download. If your system supports the pwrite syscall (Linux, macOS, etc.), it is used to write to a single file simultaneously. Otherwise, each worker writes to its own file, and the files are concatenated after downloading.

import asyncio

import aiohttp
from aiohttp_s3_client import S3Client


async def main():
    async with aiohttp.ClientSession() as session:
        client = S3Client(url="http://your-s3-host", session=session)
        await client.get_file_parallel(
            "dump/bigfile.csv",
            "/home/user/bigfile.csv",
            workers_count=8,
        )


asyncio.run(main())

Manual multipart upload

You can also control the multipart upload process manually using the multipart_upload method. It returns an async context manager that handles upload creation and completion. This gives you more control over the upload: for example, you can specify the part size, add custom metadata, or control concurrency.

Important multipart restrictions and recommendations:

  • Minimum part size: 5 MiB (5 * 1024 * 1024 bytes). Every part must be at least 5 MiB in size, except for the final part.
  • Maximum number of parts: 10,000. The total number of uploaded parts must be <= 10,000.
  • Choosing a part size: pick a part size that satisfies both constraints. When you know the total object size, a safe formula (shown as a sketch after this list) is:
    part_size = max(5 * 1024 * 1024, math.ceil(total_size / 10000))
    
  • If you don't know the total size in advance, choose a conservative part size (for example 8 MiB or 16 MiB) so you are unlikely to exceed 10,000 parts.
  • The uploader implements retries for failed part uploads; you should still ensure parts (except the last) meet the 5 MiB minimum before uploading.
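
A sketch of the part-size calculation from the list above (the helper name is illustrative, not part of the library API):

import math

MIN_PART_SIZE = 5 * 1024 * 1024  # 5 MiB: S3 minimum for all but the last part
MAX_PARTS = 10_000  # S3 limit on the number of parts per upload


def choose_part_size(total_size: int) -> int:
    # Large enough that the object fits into at most 10,000 parts,
    # but never below the 5 MiB minimum.
    return max(MIN_PART_SIZE, math.ceil(total_size / MAX_PARTS))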

The put_part method returns a coroutine: calling put_part(...) does not perform the network upload immediately. It registers the part (and its part number) and returns a coroutine that performs the actual upload when awaited. This lets you schedule uploads first and then await them concurrently. The uploader handles integrity checks and automatically retries failed part uploads according to its retry policy, so you do not need to retry parts manually.

Important usage notes:

  • You MUST call put_part(...) in the logical part sequence so parts get the correct part numbers (the uploader assigns part numbers in call order).
  • You MAY await the returned coroutines later and in any concurrency pattern you like (for example with asyncio.gather), which enables concurrent part uploads.

Examples

Create parts then upload them concurrently:

import asyncio
import hashlib

import aiohttp
from aiohttp_s3_client import S3Client


async def main():
    async with aiohttp.ClientSession() as session:
        client = S3Client(url="http://your-s3-host", session=session)

        chunks = [b"x" * 5 * 1024 * 1024, b"y" * 5 * 1024 * 1024]

        async with client.multipart_upload("test/video.mov") as uploader:
            uploads = []

            # Call put_part in the correct part sequence and collect coroutines.
            # The uploader assigns part numbers in the order put_part is called.
            for chunk in chunks:
                uploads.append(
                    uploader.put_part(
                        chunk,
                        content_sha256=hashlib.sha256(chunk).hexdigest(),
                    ),
                )

            # Now execute all part uploads concurrently. The uploader will
            # handle retries and integrity checks for each part.
            await asyncio.gather(*uploads)


asyncio.run(main())
