Cloud-agnostic abstraction for storage, messaging, document databases, cache, secrets, and pub/sub
Project description
cloudrift
Cloud-agnostic abstraction for storage, messaging, document databases, and cache — built for Lyzr microservices.
- Async-first. Every public method is
async def. All four categories use native-async SDK clients (aioboto3,azure.*.aio,motor,redis.asyncio) — no thread-pool wrapping. - Drop-in providers. Same interface across AWS, Azure, and self-hosted backends. Swap
s3↔azure_blob(orsqs↔azure_bus,documentdb↔cosmos,redis↔elasticache↔azure_redis) by changing one string. - Multiple auth methods per provider. Static keys, IAM roles, profiles, managed identity, service principals, SAS tokens, mTLS, IAM auth — pick what your microservice already has.
| Category | AWS | Azure | Self-hosted |
|---|---|---|---|
| Storage | S3 | Blob Storage | — |
| Messaging | SQS | Service Bus | — |
| Document DB | DocumentDB | Cosmos DB (Core/SQL) | — |
| Cache | ElastiCache | Azure Cache for Redis | Redis |
Install
Pick the extras your service needs:
pip install "cloudrift[aws]" # S3 + SQS + DocumentDB + Redis client
pip install "cloudrift[azure]" # Blob + Service Bus + Cosmos + Redis client
pip install "cloudrift[cache]" # Just Redis (any flavour)
pip install "cloudrift[all]" # Everything
Python 3.11+.
Quick start
Every backend is constructed via a factory function and held for the lifetime of the service. Reuse one instance per resource — the underlying client is connection-pooled.
from cloudrift.storage import get_storage
# Construct once at startup
storage = get_storage(
"s3",
bucket="my-bucket",
aws_access_key_id="AKIA...",
aws_secret_access_key="...",
region="us-east-1",
)
# Use anywhere
await storage.upload("docs/hello.txt", b"hello world", content_type="text/plain")
data = await storage.download("docs/hello.txt")
url = await storage.presigned_url("docs/hello.txt", expires_in=3600)
# Release sockets at shutdown
await storage.close()
Or as an async context manager (auto-close):
async with get_storage("s3", bucket="b", region="us-east-1") as storage:
await storage.upload("k", b"v")
Microservice integration
Configuration via env vars
Pick the provider per environment with a single env var:
import os
from cloudrift.storage import get_storage
storage = get_storage(
os.environ["STORAGE_PROVIDER"], # "s3" in prod, "azure_blob" in dev
**{
k.lower().removeprefix("storage_"): v
for k, v in os.environ.items()
if k.startswith("STORAGE_") and k != "STORAGE_PROVIDER"
},
)
Storage
from cloudrift.storage import get_storage
# AWS S3
s3 = get_storage("s3", bucket="b", region="us-east-1") # IAM role
s3 = get_storage("s3", bucket="b", aws_access_key_id="...", # static keys
aws_secret_access_key="...", region="us-east-1")
s3 = get_storage("s3", bucket="b", profile_name="dev") # ~/.aws/credentials
# Azure Blob
blob = get_storage("azure_blob", connection_string="...", container="c")
blob = get_storage("azure_blob", account_url="https://acct.blob.core.windows.net",
account_key="...", container="c")
blob = get_storage("azure_blob", account_url="...", sas_token="...", container="c")
blob = get_storage("azure_blob", account_url="...", container="c") # managed identity
blob = get_storage("azure_blob", account_url="...", container="c",
tenant_id="...", client_id="...", client_secret="...") # service principal
Operations — same on every backend:
await storage.upload(key, data, content_type="application/json")
data: bytes = await storage.download(key)
await storage.delete(key)
exists: bool = await storage.exists(key)
keys: list[str] = await storage.list(prefix="logs/")
url: str = await storage.presigned_url(key, expires_in=3600)
await storage.close()
Messaging
from cloudrift.messaging import get_queue
# AWS SQS
sqs = get_queue("sqs", queue_url="https://sqs.us-east-1.amazonaws.com/.../q",
region="us-east-1")
# Azure Service Bus
bus = get_queue("azure_bus", connection_string="...", queue_name="my-queue")
bus = get_queue("azure_bus", fully_qualified_namespace="ns.servicebus.windows.net",
queue_name="my-queue") # managed identity
Operations:
msg_id = await queue.send({"action": "process", "id": 42}, delay=0)
ids = await queue.send_batch([{"n": 1}, {"n": 2}])
messages = await queue.receive(max_messages=10, wait_time=20) # long-poll
for m in messages:
handle_job(m.body)
await queue.delete(m.receipt_handle) # ack (SQS only — see below)
await queue.purge()
await queue.close()
Azure Service Bus note:
delete(receipt_handle)raisesNotImplementedErrorbecause Service Bus completes messages via the receiver's lock token, not by handle. Until the abstraction is reworked, complete messages inside a custom receiver loop usingazure-servicebusdirectly, or use thepurge()helper.
Document Database
from cloudrift.document import get_mongodb
# AWS DocumentDB (MongoDB-compatible)
db = get_mongodb(
"documentdb",
uri="mongodb://user:pass@cluster.docdb.amazonaws.com:27017/?tls=true",
database="lyzr",
tls_ca_file="/etc/ssl/rds-ca-bundle.pem",
max_pool_size=200,
)
# Azure Cosmos DB (Core/SQL API)
cdb = get_mongodb("cosmos", connection_string="...", database="lyzr")
cdb = get_mongodb("cosmos", url="https://acct.documents.azure.com:443/",
account_key="...", database="lyzr")
Operations (MongoDB-style on both):
doc_id = await db.insert_one("users", {"name": "Alice", "age": 30})
ids = await db.insert_many("events", [{"v": 1}, {"v": 2}])
doc = await db.find_one("users", {"name": "Alice"})
docs = await db.find("events", {"v": {"$gte": 1}}, limit=100, skip=0)
modified = await db.update_one("users", {"_id": doc_id}, {"$set": {"age": 31}})
deleted = await db.delete_many("events", {"v": 1})
total = await db.count("users", {"age": {"$gte": 18}})
await db.close()
Cache
from cloudrift.cache import get_cache
# Self-hosted Redis
cache = get_cache("redis", "from_url", url="redis://localhost:6379/0")
cache = get_cache("redis", "from_credentials",
host="redis.internal", port=6379, password="...", db=0)
# AWS ElastiCache
cache = get_cache("elasticache", "from_auth_token",
host="my-cluster.cache.amazonaws.com", auth_token="...")
cache = get_cache("elasticache", "from_iam_auth",
host="my-cluster.cache.amazonaws.com",
username="lyzr-app", region="us-east-1") # SigV4 + auto-refresh
# Azure Cache for Redis
cache = get_cache("azure_redis", "from_access_key",
host="my-cache.redis.cache.windows.net", access_key="...")
cache = get_cache("azure_redis", "from_managed_identity",
host="my-cache.redis.cache.windows.net", username="lyzr-app")
Operations — KV, hash, list, counters:
await cache.set("session:abc", b"data", ttl=3600)
value: bytes | None = await cache.get("session:abc")
await cache.delete("session:abc")
await cache.hset("user:1", "name", "Alice")
fields = await cache.hgetall("user:1")
await cache.lpush("jobs", "job-1", "job-2")
batch = await cache.lrange("jobs", 0, 99)
count = await cache.incr("hits:home")
ok = await cache.ping()
await cache.close()
Connection pooling & lifecycle
Every backend holds one long-lived async client that is reused across all operations. This is the single biggest perf knob:
- Don't call
get_storage(...)inside a request handler. - Do construct it once at app startup and share it (e.g.
app.state.storage, FastAPI dependency, or module-level singleton).
Pool sizes are configurable per backend:
get_storage("s3", bucket="b", region="us-east-1",
max_pool_connections=100, connect_timeout=5.0, read_timeout=30.0)
get_mongodb("documentdb", uri="...", database="db",
max_pool_size=200, min_pool_size=10)
Always release sockets on shutdown with await backend.close() — or wrap the whole lifetime in async with.
Errors
All backends raise from a single hierarchy under cloudrift.core.exceptions:
from cloudrift.core.exceptions import (
ObjectNotFoundError, StoragePermissionError, StorageError,
QueueNotFoundError, MessageSendError, MessagingError,
DocumentNotFoundError, DocumentConnectionError, DocumentError,
CacheKeyNotFoundError, CacheConnectionError, CacheError,
)
try:
await storage.download("missing.txt")
except ObjectNotFoundError:
...
Provider-specific exceptions (e.g. botocore.ClientError, azure.core.exceptions.HttpResponseError) are translated to the cloudrift hierarchy at the boundary.
Testing
The dev extra ships moto + a Motor mock so unit tests don't need real cloud credentials:
pip install "cloudrift[dev]"
pytest
For local integration testing of the AWS backends, the suite uses ThreadedMotoServer (LocalStack-style in-process mock) — see tests/test_storage.py for the pattern. Azure backends are tested against Azurite / Service Bus / Cosmos emulators (configure endpoint via the relevant *_url kwarg).
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file lyzr_cloudrift-0.1.2.tar.gz.
File metadata
- Download URL: lyzr_cloudrift-0.1.2.tar.gz
- Upload date:
- Size: 189.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
46e9ac70864f0fb1afecbe4e53f79e01ca2b0ddf53899bc9e380bf75dc95581a
|
|
| MD5 |
e06b1cfc04e7945255e875068dc8594a
|
|
| BLAKE2b-256 |
2feb2e01401400048d7585e88c3f73502f2abb54fc56586d08ba016a9fa6648c
|
Provenance
The following attestation bundles were made for lyzr_cloudrift-0.1.2.tar.gz:
Publisher:
release.yml on NeuralgoLyzr/lyzr-cloudrift
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
lyzr_cloudrift-0.1.2.tar.gz -
Subject digest:
46e9ac70864f0fb1afecbe4e53f79e01ca2b0ddf53899bc9e380bf75dc95581a - Sigstore transparency entry: 1439111568
- Sigstore integration time:
-
Permalink:
NeuralgoLyzr/lyzr-cloudrift@91bbb2f8f1a3734ff34acf372681688cf911f5af -
Branch / Tag:
refs/tags/v0.1.2 - Owner: https://github.com/NeuralgoLyzr
-
Access:
internal
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@91bbb2f8f1a3734ff34acf372681688cf911f5af -
Trigger Event:
push
-
Statement type:
File details
Details for the file lyzr_cloudrift-0.1.2-py3-none-any.whl.
File metadata
- Download URL: lyzr_cloudrift-0.1.2-py3-none-any.whl
- Upload date:
- Size: 46.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
292de5ef20d11b7603bf7950c15acf60409fee7928a43f9e5642063b40938e23
|
|
| MD5 |
35430a545bb340b060f25df42bdf94f7
|
|
| BLAKE2b-256 |
75637cb485fe1c5b649ae50f8e09e713fcc81d7e97bec93601e22d6f837de1b3
|
Provenance
The following attestation bundles were made for lyzr_cloudrift-0.1.2-py3-none-any.whl:
Publisher:
release.yml on NeuralgoLyzr/lyzr-cloudrift
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
lyzr_cloudrift-0.1.2-py3-none-any.whl -
Subject digest:
292de5ef20d11b7603bf7950c15acf60409fee7928a43f9e5642063b40938e23 - Sigstore transparency entry: 1439111574
- Sigstore integration time:
-
Permalink:
NeuralgoLyzr/lyzr-cloudrift@91bbb2f8f1a3734ff34acf372681688cf911f5af -
Branch / Tag:
refs/tags/v0.1.2 - Owner: https://github.com/NeuralgoLyzr
-
Access:
internal
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@91bbb2f8f1a3734ff34acf372681688cf911f5af -
Trigger Event:
push
-
Statement type: