Adapters following hexagonal architecture to connect various AWS services.
Project description
aws-hexagonal-adapters
Adapters following hexagonal architecture to connect various AWS services
Example usage
S3
Download object
from os import environ
from aws_hexagonal_adapters.s3_service import S3Service
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])
# Download file
S3_SERVICE.download(
bucket="bucket_name", local_path=f"/tmp/object", remote_path="object"
)
Upload object
from os import environ
from aws_hexagonal_adapters.s3_service import S3Service
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])
# Upload file
S3_SERVICE.upload(
bucket="bucket_name",
local_path=f"/tmp/object_name",
remote_path="object_name",
extra_args={"StorageClass": "STANDARD_IA"},
)
List objects
from os import environ
from aws_hexagonal_adapters.s3_service import S3Service
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])
# List files under the given prefix
S3_SERVICE.list_files(bucket="bucket_name", prefix="folder_name")
Delete object
from os import environ
from aws_hexagonal_adapters.s3_service import S3Service
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])
# Delete a single object
S3_SERVICE.delete_object(bucket="bucket_name", key="object_name")
Delete objects
from os import environ
from aws_hexagonal_adapters.s3_service import S3Service
S3_SERVICE = S3Service(region_name=environ["CURRENT_REGION"])
objects = ["folder/object1.txt", "folder/object2.txt", "folder/object3.txt"]
# Delete multiple objects
S3_SERVICE.delete_objects(bucket="bucket_name", keys=objects)
DynamoDB
Get Item
from os import environ
from aws_hexagonal_adapters.dynamo_db_service import DynamoDBService
DYNAMODB_SERVICE = DynamoDBService(region_name=environ["REGION_NAME"])
item = DYNAMODB_SERVICE.get_item(
table_name=environ["DYNAMODB_TABLE_NAME"],
key={"pk": f'{environ["DYNAMODB_TABLE_ITEM_NAME"]}', "sk": "1"},
)
Get Items
from os import environ
from boto3.dynamodb.conditions import Key
from aws_hexagonal_adapters.dynamo_db_service import DynamoDBService
DYNAMODB_SERVICE = DynamoDBService(region_name=environ["REGION_NAME"])
item = DYNAMODB_SERVICE.get_items(
table_name=environ["DYNAMODB_TABLE_NAME"],
filter_expression=Key("sk").eq("2"),
)
Scan
from os import environ
from aws_hexagonal_adapters.dynamo_db_service import DynamoDBService
DYNAMODB_SERVICE = DynamoDBService(region_name=environ["REGION_NAME"])
item = DYNAMODB_SERVICE.scan_items(
table_name=environ["DYNAMODB_TABLE_NAME"],
index_name=environ["DYNAMODB_INDEX_NAME"],
)
SQS
from os import environ
from aws_hexagonal_adapters.sqs_service import SQSService
SQS_SERVICE = SQSService(region_name=environ["REGION_NAME"])
messages = SQS_SERVICE.receive_messages(queue_url=environ["SQS_QUEUE_NAME"])
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for aws_hexagonal_adapters-1.2.6.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2b99e0b858376e655cba066691f1bd6f502219257d763e5ae6a31f8783e8f1f1 |
| MD5 | 12315df2b0c3e2258e20407bd716d028 |
| BLAKE2b-256 | 5136f2bebdfe2f534a000905fb6ed8a2968039647f5f7bf78be026589c9e2ab9 |
Close
Hashes for aws_hexagonal_adapters-1.2.6-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7fb6eb67611082ae4ca5c5c75fbb8485a66a84a140efa59735d870e20a4bd928 |
| MD5 | c74b6b7851dc7d4a281a72679018b43c |
| BLAKE2b-256 | cd58e335d907c8a73901d6d71d61a44a993357c42c7d2e5fca0a8a092d5fe5cf |