Library for managing data exchange with manifests and shards via S3.
s3exchange
A Python library for per-service namespace management of S3 artifacts. Data is exchanged through manifests, and both loose objects and shard archives are supported.
Installation
uv add s3exchange
Quick Start
import boto3
from s3exchange import S3ExchangeStore

# Initialize store
s3_client = boto3.client('s3', endpoint_url='http://garage:3900')
store = S3ExchangeStore(
    s3_client=s3_client,
    bucket='my-bucket',
    base_prefix='prod',  # Optional: prefix all keys
    default_vars={'service_name': 'training-service'},
)
Key Features
Reading Objects
Read a single object:
stream = store.get_object("training/123/samples/file.wav")
data = stream.read()
stream.close()
Read from a manifest (lazy iteration):
# Iterate over all objects in a manifest
for stream, entry in store.iter_objects("training/123/samples/manifest.jsonl"):
    print(f"Reading {entry['key']}")
    data = stream.read()
    # Process data...
    stream.close()
Read manifest entries without fetching objects:
for entry in store.iter_manifest_entries("training/123/samples/manifest.jsonl"):
    print(f"Entry: {entry['kind']}, Key: {entry.get('key', entry.get('archive_key'))}")
Writing Objects
Put a single object:
entry = store.put_object(
    key="training/123/samples/file.wav",
    data=b"audio data...",
    id="file-001",
    meta={"sr": 16000, "length": 15342},
    content_type="audio/wav",
)
# Returns a FileEntry for the manifest
Put from file path:
entry = store.put_object(
    key="training/123/samples/file.wav",
    data="/path/to/file.wav",  # Path string or Path object
    id="file-001",
)
Writing Manifests
Overwrite mode (simple):
entries = [
    {"kind": "file", "key": "training/123/samples/file1.wav", "id": "001"},
    {"kind": "file", "key": "training/123/samples/file2.wav", "id": "002"},
]
store.write_manifest(
    key="training/123/samples/manifest.jsonl",
    entries=entries,
    mode="overwrite",
)
Append parts mode (recommended for updates):
# Add new entries without rewriting the entire manifest
new_entries = [
    {"kind": "file", "key": "training/123/samples/file3.wav", "id": "003"},
]
store.write_manifest(
    key="training/123/samples/manifest.jsonl",
    entries=new_entries,
    mode="append_parts",  # Creates a part file and updates the root manifest
)
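One way to picture append-parts mode, assuming the root manifest holds `manifest_ref` lines that point at part files, as the Manifest Reference Entry format suggests. This is a sketch over an in-memory dict, not the library's implementation: `append_parts`, the `bucket` dict, the `manifests/part-NNNNN.jsonl` numbering scheme, and `file4.wav` are all hypothetical.

```python
import json


def append_parts(bucket: dict[str, str], root_key: str, new_entries: list[dict]) -> str:
    """Write new_entries to a fresh part file and reference it from the root manifest."""
    root_lines = [ln for ln in bucket.get(root_key, "").splitlines() if ln.strip()]
    # Hypothetical numbering: one more than the number of existing part references.
    existing_refs = sum(1 for ln in root_lines if json.loads(ln).get("kind") == "manifest_ref")
    base = root_key.rsplit("/", 1)[0]
    part_key = f"{base}/manifests/part-{existing_refs + 1:05d}.jsonl"
    # The part file carries the new entries; existing parts are never rewritten.
    bucket[part_key] = "\n".join(json.dumps(e) for e in new_entries) + "\n"
    # The root manifest only gains one short reference line.
    root_lines.append(json.dumps({"kind": "manifest_ref", "key": part_key}))
    bucket[root_key] = "\n".join(root_lines) + "\n"
    return part_key


bucket: dict[str, str] = {}
root = "training/123/samples/manifest.jsonl"
first = append_parts(bucket, root, [{"kind": "file", "key": "training/123/samples/file3.wav", "id": "003"}])
second = append_parts(bucket, root, [{"kind": "file", "key": "training/123/samples/file4.wav", "id": "004"}])
```

Under this assumption, the cost of an update is proportional to the number of new entries rather than to the size of the whole manifest.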
Shard Archives
Shard archives are tar/tar.gz files containing multiple files with an internal manifest.
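To make the archive layout concrete, here is a minimal standard-library sketch that packs a few members into a gzip-compressed tar together with an internal JSONL manifest. The name `__manifest__.jsonl` comes from the Shard Entry example in this README; the fields written into the internal manifest and the helper names `build_shard` and `read_internal_manifest` are assumptions, not the library's actual format or API.

```python
import io
import json
import tarfile

INTERNAL_MANIFEST = "__manifest__.jsonl"


def build_shard(members: dict[str, bytes]) -> bytes:
    """Pack members into a tar.gz and add a JSONL manifest describing them."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        lines = []
        for member_path, payload in members.items():
            info = tarfile.TarInfo(name=member_path)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
            # Assumed internal manifest fields: member path and size only.
            lines.append(json.dumps({"member_path": member_path, "size_bytes": len(payload)}))
        manifest = ("\n".join(lines) + "\n").encode("utf-8")
        info = tarfile.TarInfo(name=INTERNAL_MANIFEST)
        info.size = len(manifest)
        tar.addfile(info, io.BytesIO(manifest))
    return buf.getvalue()


def read_internal_manifest(archive: bytes) -> list[dict]:
    """Read the internal manifest back out of a tar.gz held in memory."""
    with tarfile.open(fileobj=io.BytesIO(archive), mode="r:gz") as tar:
        raw = tar.extractfile(INTERNAL_MANIFEST).read().decode("utf-8")
    return [json.loads(line) for line in raw.splitlines() if line.strip()]


archive = build_shard({"0001.wav": b"abc", "0002.wav": b"defgh"})
internal = read_internal_manifest(archive)
```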
Create and upload shards:
from s3exchange import Shard

# Prepare items for sharding
items = [
    {
        "source": "/path/to/file1.wav",
        "member_path": "0001.wav",
        "id": "0001",
        "meta": {"sr": 16000},
        "size_bytes": 123456,
    },
    {
        "source": "/path/to/file2.wav",
        "member_path": "0002.wav",
        "id": "0002",
        "meta": {"sr": 16000},
        "size_bytes": 234567,
    },
    # ... more items
]

# Split into shards (max 10000 entries or 1 GB per shard)
shard_batches = Shard.split_items(
    items,
    max_entries=10000,
    max_bytes=1024 * 1024 * 1024,  # 1 GB
)

# Upload each shard
shard_entries = []
for i, batch in enumerate(shard_batches):
    archive_key = f"training/123/samples/shards/shard-{i:05d}.tar.gz"
    shard = store.put_shard_archive(
        archive_key=archive_key,
        shard_items=batch,
        format="tar",
        compression="gzip",
    )
    shard_entries.append(shard.entry)  # Extract ShardEntry from Shard object

# Write shard entries to manifest
store.put_sharded(
    manifest_key="training/123/samples/manifest.jsonl",
    shard_entries=shard_entries,
    update_mode="append_parts",
)
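The splitting step above can be pictured as a greedy packer that starts a new batch whenever adding the next item would exceed either limit. The sketch below is one plausible reading of the `max_entries` and `max_bytes` limits, not the actual behaviour of `Shard.split_items`; the function name `greedy_split` and the sample sizes are hypothetical.

```python
def greedy_split(items: list[dict], max_entries: int, max_bytes: int) -> list[list[dict]]:
    """Group items in order, closing a batch when either limit would be exceeded."""
    batches: list[list[dict]] = []
    current: list[dict] = []
    current_bytes = 0
    for item in items:
        size = item["size_bytes"]
        # Close the current batch if it is full or the next item would overflow it.
        if current and (len(current) >= max_entries or current_bytes + size > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        # An item larger than max_bytes still gets a batch of its own.
        current.append(item)
        current_bytes += size
    if current:
        batches.append(current)
    return batches


sample_items = [
    {"id": f"{i:04d}", "size_bytes": size}
    for i, size in enumerate([400, 400, 400, 900, 100], start=1)
]
sample_batches = greedy_split(sample_items, max_entries=2, max_bytes=1000)
batch_ids = [[item["id"] for item in batch] for batch in sample_batches]
# batch_ids == [["0001", "0002"], ["0003"], ["0004", "0005"]]
```

Greedy packing preserves item order, which keeps shard contents predictable at the cost of occasionally leaving a shard under-filled.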
Read from shards (automatically expanded):
# Shards are automatically expanded when iterating objects
for stream, entry in store.iter_objects("training/123/samples/manifest.jsonl"):
    # For shard members, entry includes:
    # - entry['archive_key']: The shard archive key
    # - entry['member_path']: Path inside the archive
    # - entry['key']: Virtual key like "archive.tar.gz#member.wav"
    print(f"Reading from shard: {entry['archive_key']}, member: {entry['member_path']}")
    data = stream.read()
    stream.close()
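If only the virtual key is at hand, it can be split back into its two parts. This sketch assumes the first `#` separates the archive key from the member path, which follows the `"archive.tar.gz#member.wav"` shape shown above but is not confirmed for keys that themselves contain `#`. The helper name `split_virtual_key` is hypothetical.

```python
def split_virtual_key(virtual_key: str) -> tuple[str, str]:
    """Split 'archive_key#member_path' into its two components."""
    archive_key, sep, member_path = virtual_key.partition("#")
    if not sep or not archive_key or not member_path:
        raise ValueError(f"not a shard member key: {virtual_key!r}")
    return archive_key, member_path


archive_key, member_path = split_virtual_key(
    "training/123/samples/shards/shard-00001.tar.gz#0001.wav"
)
```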
Deletion Operations
Delete a single object:
store.delete_key("training/123/samples/file.wav")
Delete by prefix:
# Delete all objects with prefix, optionally filtered by regex
count = store.delete_prefix(
    prefix="training/123/samples/",
    regex=r".*\.wav$",  # Only delete .wav files
)
print(f"Deleted {count} objects")
Delete by manifest (recursive):
# Deletes all objects/shards referenced in manifest, plus manifest files themselves
report = store.delete_manifest(
    manifest="training/123/samples/manifest.jsonl",
    delete_manifests=True,  # Also delete manifest files
    dedupe=True,  # Avoid double-deletion
)
print(f"Deleted {report['deleted_object_count']} objects")
print(f"Deleted {report['deleted_archive_count']} archives")
print(f"Deleted {report['deleted_manifest_count']} manifests")
Listing Operations
List S3 keys:
for key in store.list_keys(prefix="training/123/"):
    print(key)
List files in manifest:
# List all file entries (including shard members if include_shards=True)
for entry in store.list_manifest_files(
    manifest="training/123/samples/manifest.jsonl",
    include_shards=True,
):
    print(f"File: {entry.get('key', entry.get('member_path'))}")
Filter by prefix:
# Filter manifest entries by prefix (works for shard members too)
for entry in store.list_by_manifest_prefix(
    manifest="training/123/samples/manifest.jsonl",
    prefix_filter="training/123/samples/000",
):
    print(entry['key'])
Manifest Compaction
Flatten a manifest with many parts into a single clean manifest:
report = store.compact_manifest(
    src_manifest_key="training/123/samples/manifest.jsonl",
    dst_manifest_key="training/123/samples/manifest-compact.jsonl",
    resolve_refs=True,  # Resolve all manifest_ref entries
    expand_shards=False,  # Keep shard entries (set True to expand into files)
)
print(f"Compacted {report['total_entries']} entries")
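Conceptually, resolving references means walking every `manifest_ref` entry and inlining the entries of the manifest it points to. The sketch below illustrates that idea over an in-memory dict of manifest texts, keeping shard entries as they are (the `expand_shards=False` case). It is not the library's implementation; `flatten_manifest` and the `manifests` dict are hypothetical.

```python
import json


def flatten_manifest(
    manifests: dict[str, str], key: str, ancestors: frozenset[str] = frozenset()
) -> list[dict]:
    """Return all non-reference entries reachable from the manifest at key."""
    if key in ancestors:
        raise ValueError(f"manifest reference cycle at {key!r}")
    flat: list[dict] = []
    for line in manifests[key].splitlines():
        if not line.strip():
            continue
        entry = json.loads(line)
        if entry.get("kind") == "manifest_ref":
            # Inline the referenced manifest, tracking the path to detect cycles.
            flat.extend(flatten_manifest(manifests, entry["key"], ancestors | {key}))
        else:
            flat.append(entry)
    return flat


manifests = {
    "root.jsonl": "\n".join([
        json.dumps({"kind": "manifest_ref", "key": "part-00001.jsonl"}),
        json.dumps({"kind": "shard", "archive_key": "shards/shard-00000.tar.gz"}),
    ]),
    "part-00001.jsonl": "\n".join([
        json.dumps({"kind": "file", "key": "file1.wav", "id": "001"}),
        json.dumps({"kind": "file", "key": "file2.wav", "id": "002"}),
    ]),
}
flat_entries = flatten_manifest(manifests, "root.jsonl")
```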
Manifest Format
Manifests are JSONL files (one JSON object per line):
File Entry
{"kind":"file","key":"training/123/samples/0001.wav","id":"0001","meta":{"sr":16000},"size_bytes":123456}
Shard Entry
{"kind":"shard","archive_key":"training/123/samples/shards/shard-00001.tar.gz","format":"tar","compression":"gzip","internal_manifest_path":"__manifest__.jsonl","count":10000,"size_bytes":987654321}
Manifest Reference Entry
{"kind":"manifest_ref","key":"training/123/samples/manifests/part-00001.jsonl"}
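Because each line is an independent JSON object, a manifest can be inspected with the standard library alone. The sketch below parses manifest text and checks that each entry has the key field its kind appears to need, judging only from the three examples above; the `REQUIRED_FIELD` table and `parse_manifest` are assumptions, not the library's validation rules.

```python
import json

# Field each kind appears to require, inferred from the examples in this README.
REQUIRED_FIELD = {"file": "key", "shard": "archive_key", "manifest_ref": "key"}


def parse_manifest(text: str) -> list[dict]:
    """Parse JSONL manifest text into entries, rejecting unknown or incomplete ones."""
    entries = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # tolerate blank lines
        entry = json.loads(line)
        kind = entry.get("kind")
        if kind not in REQUIRED_FIELD:
            raise ValueError(f"line {lineno}: unknown kind {kind!r}")
        if REQUIRED_FIELD[kind] not in entry:
            raise ValueError(f"line {lineno}: {kind} entry lacks {REQUIRED_FIELD[kind]!r}")
        entries.append(entry)
    return entries


manifest_text = "\n".join([
    '{"kind":"file","key":"training/123/samples/0001.wav","id":"0001"}',
    '{"kind":"shard","archive_key":"training/123/samples/shards/shard-00001.tar.gz"}',
    '{"kind":"manifest_ref","key":"training/123/samples/manifests/part-00001.jsonl"}',
])
parsed = parse_manifest(manifest_text)
kinds = [entry["kind"] for entry in parsed]
```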
Error Handling
The library provides domain-specific exceptions:
from s3exchange import (
    ObjectNotFoundError,
    ManifestNotFoundError,
    MissingPlaceholderError,
    InvalidManifestError,
    ShardReadError,
)

try:
    stream = store.get_object("nonexistent.wav")
except ObjectNotFoundError as e:
    print(f"Object not found: {e.key}")

try:
    key = store._resolve_key("training/{job_id}/samples", {})  # Missing job_id
except MissingPlaceholderError as e:
    print(f"Missing placeholder: {e.placeholder}")
Type Hints
The library is fully typed with Python 3.12 type annotations. All types are available for import:
from s3exchange import (
    ManifestEntry,
    FileEntry,
    ShardEntry,
    ShardItem,
    DeleteReport,
    CompactReport,
)