Schema-aware S3 adapter with SNS hooks, streaming uploads, and multipart chunking.
Project description
🪣 daplug-s3 (da•plug)
Schema-Driven S3 Normalization & Event Publishing for Python
daplug-s3 is an adapter that wraps boto3 S3 primitives with schema-friendly helpers, SNS publishing, and convenience utilities for streaming, multipart uploads, and presigned URL generation.
Need deeper operational context? See .agents/.AGENTS.md for the full consumer guide and .agents/CODEX.md for contributor workflow expectations.
✨ Highlights
- Schema-aware writes – Convert Python dictionaries into JSON payloads automatically.
- Event-driven – Every write publishes presigned URLs via the daplug_core publisher so downstream services can react in real time.
- Complete S3 toolkit – Simple helpers for CRUD, streaming uploads, multipart chunking, directory listings, and rename/delete patterns.
- Local-first – Works seamlessly with LocalStack using the endpoint parameter.
📦 Installation
```shell
pip install daplug-s3

# or with Pipenv
pipenv install daplug-s3
```
The library targets Python 3.9+.
🚀 Quick Start
```python
from daplug_s3 import adapter

s3 = adapter(
    endpoint="https://s3.us-east-1.amazonaws.com",  # optional override
    bucket="my-team-bucket",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret",
    region="us-east-1",
    sns_arn="arn:aws:sns:us-east-1:123456789012:my-topic",
    sns_endpoint="https://sns.us-east-1.amazonaws.com",
)
```
Adapter Configuration Arguments
| Kwarg | Type | Required | Description |
|---|---|---|---|
| `endpoint` | `str`, `None` | No | Custom S3/LocalStack endpoint. Leave `None` for the AWS default. |
| `bucket` | `str` | Yes | Default bucket applied to every request. |
| `aws_access_key_id` | `str` | Yes | IAM access key. |
| `aws_secret_access_key` | `str` | Yes | IAM secret key. |
| `region` | `str` | Yes | AWS region name. |
| `sns_arn` | `str` | No | SNS topic ARN used by `BaseAdapter.publish`. |
| `sns_endpoint` | `str` | No | SNS endpoint override (LocalStack). |
| `sns_attributes` | `dict` | No | Default SNS message attributes (`str`, `int`, `float`, or `bool` values). |
All public methods below accept keyword arguments only.
🧰 API Reference & Examples
put(**kwargs) (alias: create)
Store data in S3 with optional JSON encoding.
```python
payload = {"type": "invoice", "id": 256}
s3.put(s3_path="docs/invoice-256.json", data=payload, json=True)
```
| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `s3_path` | `str` | Yes | — | Object key. |
| `data` | `bytes`, `str`, or `dict` | Yes | — | Content to write. |
| `json` | `bool` | No | `False` | Encode via jsonpickle. |
| `encode` | `bool` | No | `True` | Convert strings to UTF-8 bytes. |
| `public_read` | `bool` | No | `False` | Applies the public-read ACL. |
Always triggers BaseAdapter.publish with a presigned URL payload.
upload_stream(**kwargs)
Upload streamed or buffered data.
```python
from pathlib import Path

with Path("brochure.pdf").open("rb") as fh:
    s3.upload_stream(s3_path="docs/brochure.pdf", io=fh, public_read=True)
```
| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `s3_path` | `str` | Yes | — | Object key. |
| `io` | binary file object | One of `io`/`data` | — | File-like object to stream from. |
| `data` | `bytes` | One of `io`/`data` | — | Raw bytes used when `io` is missing. |
| `threshold` | `int` | No | `10000` | Multipart threshold for uploads. |
| `concurrency` | `int` | No | `4` | Parallel worker count. |
| `public_read` | `bool` | No | `False` | Public ACL toggle. |
Either io or data must be supplied.
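That either/or contract can be sketched as a small validation helper. This is illustrative only — `resolve_stream` is a hypothetical function showing the documented rule, not the library's actual implementation:

```python
import io


def resolve_stream(stream=None, data=None):
    """Enforce the upload_stream contract: exactly one of a file-like
    object (`stream`, standing in for the `io` kwarg) or raw bytes
    (`data`) must be supplied."""
    if (stream is None) == (data is None):
        raise ValueError("supply exactly one of io or data")
    # Bytes are wrapped in an in-memory buffer so callers can always stream.
    return stream if stream is not None else io.BytesIO(data)
```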
get(**kwargs) (alias: read)
Retrieve and optionally decode objects.
```python
config = s3.get(s3_path="docs/invoice-256.json", json=True)
```
| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `s3_path` | `str` | Yes | — | Object key. |
| `json` | `bool` | No | `False` | Decode JSON via jsonpickle. |
| `decode` | `bool` | No | `True` | When `False`, returns the raw boto3 response. |
read simply calls get.
download(**kwargs)
Save S3 content locally.
```python
target = s3.download(s3_path="reports/weekly.csv", download_path="/tmp/weekly.csv")
```
| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Remote key. |
| `download_path` | `str` | Yes | Destination path; directories are created automatically. |
Returns the download_path string.
multipart_upload(**kwargs)
Manual chunk uploads.
```python
chunks = [b"chunk-1", b"chunk-2", b"chunk-3"]
s3.multipart_upload(s3_path="large/data.bin", chunks=chunks)
```
| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Target key. |
| `chunks` | `list[bytes]` | Yes | Ordered byte chunks uploaded sequentially. |
Publishes a presigned URL when complete.
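If you need to produce the ordered chunks list from a larger buffer, a fixed-size splitter is enough. `chunk_bytes` is a hypothetical helper shown for illustration; it is not part of the library:

```python
def chunk_bytes(data: bytes, size: int) -> list:
    """Split a buffer into ordered, fixed-size chunks; the final
    chunk may be shorter than `size`."""
    return [data[i:i + size] for i in range(0, len(data), size)]
```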
create_public_url(**kwargs)
Generate unsigned URLs for public objects.
```python
public_url = s3.create_public_url(s3_path="docs/brochure.pdf")
```
| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Object key. |
Only works for objects uploaded with public_read=True.
create_presigned_read_url(**kwargs)
Time-limited access.
```python
signed_url = s3.create_presigned_read_url(s3_path="docs/invoice-256.json", expiration=900)
```
| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `s3_path` | `str` | Yes | — | Object key. |
| `expiration` | `int` | No | `3600` | Seconds before the URL expires. |
create_presigned_post_url(**kwargs)
Generate POST policies for browser uploads.
```python
post_config = s3.create_presigned_post_url(
    s3_path="uploads/raw.txt",
    required_fields={"acl": "private"},
    required_conditions=[["content-length-range", 0, 1048576]],
    expiration=600,
)
```
| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Destination key. |
| `required_fields` | `dict[str, str]` | No | Pre-populated form fields. |
| `required_conditions` | `list[list]` | No | Additional policy conditions. |
| `expiration` | `int` | No | Policy lifetime in seconds (default `3600`). |
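Assuming `post_config` mirrors boto3's `generate_presigned_post` return shape — a dict with `"url"` and `"fields"` keys, which is an assumption rather than something the docs above confirm — the pieces of a POST upload can be assembled like this. The `build_post_request` helper is hypothetical:

```python
def build_post_request(post_config, filename, file_bytes):
    """Assemble (url, form fields, file parts) from a presigned POST
    policy. All policy fields must be submitted as form fields, and S3
    requires the file part to come after them in the request body."""
    fields = dict(post_config["fields"])  # copy so the policy dict stays intact
    files = {"file": (filename, file_bytes)}
    return post_config["url"], fields, files
```

The resulting tuple can then be handed to an HTTP client, e.g. `requests.post(url, data=fields, files=files)`.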
object_exist(**kwargs)
Check for object existence.
```python
if not s3.object_exist(s3_path="docs/invoice-999.json"):
    raise LookupError("missing doc")
```
| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Object key. |
Returns True when the object exists, False on 404, otherwise raises ClientError.
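That 404-vs-error split follows the usual head-object pattern. Here is a runnable sketch of the semantics using a stand-in error class instead of botocore's `ClientError`, so it executes without AWS; none of these names come from the library itself:

```python
class FakeClientError(Exception):
    """Stand-in for botocore.exceptions.ClientError, which carries the
    error code under response["Error"]["Code"]."""
    def __init__(self, code):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


def object_exists(head_object, key):
    """HEAD the key; map a 404 to False, re-raise anything else
    (permission denied, throttling, ...)."""
    try:
        head_object(key)
        return True
    except FakeClientError as err:
        if err.response["Error"]["Code"] == "404":
            return False
        raise
```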
list_dir_subfolders(**kwargs)
```python
folders = s3.list_dir_subfolders(dir_name="reports/")
```
| Kwarg | Type | Required | Description |
|---|---|---|---|
| `dir_name` | `str` | Yes | Prefix ending with `/`. |
Returns prefixes for child folders.
list_dir_files(**kwargs)
```python
from datetime import datetime

recent = s3.list_dir_files(dir_name="reports/", date=datetime.utcnow())
```
| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `dir_name` | `str` | Yes | — | Prefix to list. |
| `date` | `datetime` | No | `None` | Only return objects newer than this timestamp. |
Outputs a list of object keys.
rename_object(**kwargs)
Copy + delete convenience.
```python
s3.rename_object(old_file_name="logs/old.txt", new_file_name="logs/new.txt")
```
| Kwarg | Type | Required | Description |
|---|---|---|---|
| `old_file_name` | `str` | Yes | Existing key. |
| `new_file_name` | `str` | Yes | New key. |
delete(**kwargs)
Remove an object.
```python
s3.delete(s3_path="archives/data.bin")
```
| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Key to delete. |
Returns the boto3 delete_object response.
🧪 Local Development & Testing
```shell
pipenv install --dev

# lint & type check
pipenv run lint
pipenv run type-check

# unit tests (moto-backed)
pipenv run test

# integration tests (LocalStack S3 via docker-compose)
pipenv run integrations
```
Set S3_ENDPOINT=http://localhost:4566 plus fake AWS credentials when using LocalStack manually.
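A minimal environment for that manual LocalStack setup might look like the following. Port 4566 is LocalStack's default edge port; adjust it and the region to match your docker-compose file:

```shell
# Fake credentials are fine — LocalStack does not validate them.
export S3_ENDPOINT=http://localhost:4566
export AWS_ACCESS_KEY_ID=testing
export AWS_SECRET_ACCESS_KEY=testing
export AWS_DEFAULT_REGION=us-east-1
```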
📚 Additional Resources
- Consumer-facing playbook: .agents/.AGENTS.md
- Contributor guide & automation notes: .agents/CODEX.md
🤝 Contributing
Issues and pull requests are welcome! Please run the full quality gate before opening a PR:
```shell
pipenv run lint
pipenv run type-check
pipenv run test
pipenv run integrations
```
📄 License
Apache License 2.0 – see LICENSE for details.
Built to keep S3 integrations predictable, event-driven, and schema-friendly.
Project details
File details
Details for the file daplug_s3-1.0.0b1.tar.gz.
- Download URL: daplug_s3-1.0.0b1.tar.gz
- Size: 21.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.17

| Algorithm | Hash digest |
|---|---|
| SHA256 | `0376044499992e3c34c7bc4f24bbaeefd6ad03c8ce09a129092d857a477655b1` |
| MD5 | `5d879f5c18ce8e32294b6a85ff876149` |
| BLAKE2b-256 | `4c820a20060e98c87c99fe74d095ff4cbda6549df4b109e00bbac624925c116b` |
File details
Details for the file daplug_s3-1.0.0b1-py3-none-any.whl.
- Download URL: daplug_s3-1.0.0b1-py3-none-any.whl
- Size: 20.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.17

| Algorithm | Hash digest |
|---|---|
| SHA256 | `a8d0585eb4780dde927561718537d4f971509ed9382f892c7114116d174e802f` |
| MD5 | `476cef1e959c100e9d3c2508c7ec9c6b` |
| BLAKE2b-256 | `82544e1e9e13d0634b05f7297ed78d05136653304f8e0519905af672527ba3a7` |