Skip to main content

Adapters that follow the hexagonal architecture pattern for connecting to various AWS services.

Project description

auto-merge tests

aws-hexagonal-adapters

Adapters that follow the hexagonal architecture pattern for connecting to various AWS services.

Example usage

S3

Download object

from os import environ

from aws_hexagonal_adapters.s3_service import S3Service

# Create the S3 adapter for the region named by the CURRENT_REGION environment variable.
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])

# Download the remote object "object" from the bucket to the local path /tmp/object.
S3_SERVICE.download(
    bucket="bucket_name", local_path="/tmp/object", remote_path="object"
)

Upload object

from os import environ

from aws_hexagonal_adapters.s3_service import S3Service

# Create the S3 adapter for the region named by the CURRENT_REGION environment variable.
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])

# Upload the local file /tmp/object_name to the bucket as "object_name".
# extra_args carries additional upload options; here it sets the storage class.
S3_SERVICE.upload(
    bucket="bucket_name",
    local_path="/tmp/object_name",
    remote_path="object_name",
    extra_args={"StorageClass": "STANDARD_IA"},
)

List objects

from os import environ

from aws_hexagonal_adapters.s3_service import S3Service

# Create the S3 adapter for the region named by the CURRENT_REGION environment variable.
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])

# List the objects in the bucket whose keys start with the given prefix.
S3_SERVICE.list_files(bucket="bucket_name", prefix="folder_name")

Delete object

from os import environ

from aws_hexagonal_adapters.s3_service import S3Service

# Create the S3 adapter for the region named by the CURRENT_REGION environment variable.
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])

# Delete a single object, identified by its key, from the bucket.
S3_SERVICE.delete_object(bucket="bucket_name", key="object_name")

Delete objects

from os import environ

from aws_hexagonal_adapters.s3_service import S3Service

# Create the S3 adapter for the region named by the CURRENT_REGION environment variable.
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])

# Keys of the objects to remove in one call.
objects = ["folder/object1.txt", "folder/object2.txt", "folder/object3.txt"]

# Delete several objects from the bucket at once.
S3_SERVICE.delete_objects(bucket="bucket_name", keys=objects)

DynamoDB

Get Item

from os import environ

from aws_hexagonal_adapters.dynamo_db_service import DynamoDBService

# Create the DynamoDB adapter for the region named by the REGION_NAME environment variable.
DYNAMODB_SERVICE = DynamoDBService(region_name=environ["REGION_NAME"])

# Fetch a single item by its full key: "pk" comes from the
# DYNAMODB_TABLE_ITEM_NAME environment variable and "sk" is the literal "1".
# Environment values are already strings, so no f-string wrapper is needed.
item = DYNAMODB_SERVICE.get_item(
    table_name=environ["DYNAMODB_TABLE_NAME"],
    key={"pk": environ["DYNAMODB_TABLE_ITEM_NAME"], "sk": "1"},
)

Get Items

from os import environ

from boto3.dynamodb.conditions import Key

from aws_hexagonal_adapters.dynamo_db_service import DynamoDBService

# Create the DynamoDB adapter for the region named by the REGION_NAME environment variable.
DYNAMODB_SERVICE = DynamoDBService(region_name=environ["REGION_NAME"])

# Table to read from, taken from the environment.
table_name = environ["DYNAMODB_TABLE_NAME"]
# Condition selecting entries whose "sk" attribute equals "2".
sort_key_condition = Key("sk").eq("2")

# Fetch the items of the table that satisfy the condition.
item = DYNAMODB_SERVICE.get_items(
    table_name=table_name, filter_expression=sort_key_condition
)

Scan

from os import environ

from aws_hexagonal_adapters.dynamo_db_service import DynamoDBService

# Create the DynamoDB adapter for the region named by the REGION_NAME environment variable.
DYNAMODB_SERVICE = DynamoDBService(region_name=environ["REGION_NAME"])

# Table and index names are both taken from the environment.
table_name = environ["DYNAMODB_TABLE_NAME"]
index_name = environ["DYNAMODB_INDEX_NAME"]

# Scan the table through the given index.
item = DYNAMODB_SERVICE.scan_items(table_name=table_name, index_name=index_name)

SQS

from os import environ

from aws_hexagonal_adapters.sqs_service import SQSService

# Create the SQS adapter for the region named by the REGION_NAME environment variable.
region = environ["REGION_NAME"]
SQS_SERVICE = SQSService(region_name=region)

# NOTE(review): the SQS_QUEUE_NAME variable is passed as queue_url, so it
# presumably has to hold the queue URL rather than the bare name; confirm.
queue_url = environ["SQS_QUEUE_NAME"]

# Receive messages from the queue.
messages = SQS_SERVICE.receive_messages(queue_url=queue_url)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aws_hexagonal_adapters-1.2.10.tar.gz (14.9 kB view hashes)

Uploaded Source

Built Distribution

aws_hexagonal_adapters-1.2.10-py3-none-any.whl (19.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page