Cloud-agnostic abstraction for storage, messaging, document databases, cache, secrets, and pub/sub

cloudrift

Cloud-agnostic abstraction for storage, messaging, document databases, and cache — built for Lyzr microservices.

  • Async-first. Every public method is async def. All four categories use native-async SDK clients (aioboto3, azure.*.aio, motor, redis.asyncio) — no thread-pool wrapping.
  • Drop-in providers. Same interface across AWS, Azure, and self-hosted backends. Swap s3 for azure_blob (or sqs for azure_bus, documentdb for cosmos, redis for elasticache or azure_redis) by changing one string.
  • Multiple auth methods per provider. Static keys, IAM roles, profiles, managed identity, service principals, SAS tokens, mTLS, IAM auth — pick what your microservice already has.

Category      AWS           Azure                      Self-hosted
Storage       S3            Blob Storage               -
Messaging     SQS           Service Bus                -
Document DB   DocumentDB    Cosmos DB (MongoDB API)    -
Cache         ElastiCache   Azure Cache for Redis      Redis

Install

Pick the extras your service needs:

pip install "lyzr-cloudrift[aws]"     # S3 + SQS + DocumentDB + Redis client
pip install "lyzr-cloudrift[azure]"   # Blob + Service Bus + Cosmos + Redis client
pip install "lyzr-cloudrift[cache]"   # Just Redis (any flavour)
pip install "lyzr-cloudrift[all]"     # Everything

Requires Python 3.11+.


Quick start

Every backend is constructed via a factory function and held for the lifetime of the service. Reuse one instance per resource — the underlying client is connection-pooled.

import asyncio

from cloudrift.storage import get_storage


async def main() -> None:
    # Construct once at startup
    storage = get_storage(
        "s3",
        bucket="my-bucket",
        aws_access_key_id="AKIA...",
        aws_secret_access_key="...",
        region="us-east-1",
    )

    # Use anywhere
    await storage.upload("docs/hello.txt", b"hello world", content_type="text/plain")
    data = await storage.download("docs/hello.txt")
    url = await storage.presigned_url("docs/hello.txt", expires_in=3600)

    # Release sockets at shutdown
    await storage.close()


# The remaining snippets assume they run inside a coroutine like main()
asyncio.run(main())

Or as an async context manager (auto-close):

async with get_storage("s3", bucket="b", region="us-east-1") as storage:
    await storage.upload("k", b"v")

Microservice integration

Configuration via env vars

Pick the provider per environment with a single env var:

import os
from cloudrift.storage import get_storage

storage = get_storage(
    os.environ["STORAGE_PROVIDER"],   # "s3" in prod, "azure_blob" in dev
    **{
        k.lower().removeprefix("storage_"): v
        for k, v in os.environ.items()
        if k.startswith("STORAGE_") and k != "STORAGE_PROVIDER"
    },
)

Storage

from cloudrift.storage import get_storage

# AWS S3
s3 = get_storage("s3", bucket="b", region="us-east-1")                       # IAM role
s3 = get_storage("s3", bucket="b", aws_access_key_id="...",                  # static keys
                 aws_secret_access_key="...", region="us-east-1")
s3 = get_storage("s3", bucket="b", profile_name="dev")                       # ~/.aws/credentials

# Azure Blob
blob = get_storage("azure_blob", connection_string="...", container="c")
blob = get_storage("azure_blob", account_url="https://acct.blob.core.windows.net",
                   account_key="...", container="c")
blob = get_storage("azure_blob", account_url="...", sas_token="...", container="c")
blob = get_storage("azure_blob", account_url="...", container="c")           # managed identity
blob = get_storage("azure_blob", account_url="...", container="c",
                   tenant_id="...", client_id="...", client_secret="...")    # service principal

Operations — same on every backend:

await storage.upload(key, data, content_type="application/json")
data: bytes = await storage.download(key)
await storage.delete(key)
exists: bool = await storage.exists(key)
keys: list[str] = await storage.list(prefix="logs/")
url: str = await storage.presigned_url(key, expires_in=3600)
await storage.close()

Messaging

from cloudrift.messaging import get_queue

# AWS SQS
sqs = get_queue("sqs", queue_url="https://sqs.us-east-1.amazonaws.com/.../q",
                region="us-east-1")

# Azure Service Bus
bus = get_queue("azure_bus", connection_string="...", queue_name="my-queue")
bus = get_queue("azure_bus", fully_qualified_namespace="ns.servicebus.windows.net",
                queue_name="my-queue")  # managed identity

Operations:

msg_id = await queue.send({"action": "process", "id": 42}, delay=0)
ids = await queue.send_batch([{"n": 1}, {"n": 2}])

messages = await queue.receive(max_messages=10, wait_time=20)   # long-poll
for m in messages:
    handle_job(m.body)
    await queue.delete(m.receipt_handle)   # ack (SQS only — see below)

await queue.purge()
await queue.close()

Azure Service Bus note: delete(receipt_handle) raises NotImplementedError because Service Bus completes messages via the receiver's lock token, not by handle. Until the abstraction is reworked, complete messages inside a custom receiver loop using azure-servicebus directly, or use the purge() helper.
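
A worker that has to run against both providers can make this difference explicit instead of crashing on the first ack. A minimal sketch, assuming only the behaviour described above: receive() returns messages with body and receipt_handle, and delete() raises NotImplementedError on the Service Bus backend. process_batch, FakeMessage, and FakeQueue are hypothetical names; the fakes stand in for real queues so the example runs offline.

```python
import asyncio
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any


async def process_batch(
    queue: Any,
    handler: Callable[[Any], None],
    *,
    max_messages: int = 10,
    wait_time: int = 20,
) -> tuple[int, int]:
    """Handle one received batch and return (handled, acked)."""
    handled = acked = 0
    for message in await queue.receive(max_messages=max_messages, wait_time=wait_time):
        handler(message.body)
        handled += 1
        try:
            await queue.delete(message.receipt_handle)
        except NotImplementedError:
            # Backend cannot ack by handle; the caller must complete the
            # message some other way (see the note above).
            continue
        acked += 1
    return handled, acked


@dataclass
class FakeMessage:
    body: Any
    receipt_handle: str


class FakeQueue:
    """Hypothetical stand-in for a queue backend, not a cloudrift class."""

    def __init__(self, bodies: list[Any], *, supports_delete: bool = True) -> None:
        self.pending = [FakeMessage(b, f"h{i}") for i, b in enumerate(bodies)]
        self.deleted: list[str] = []
        self.supports_delete = supports_delete

    async def receive(self, max_messages: int = 10, wait_time: int = 20) -> list[FakeMessage]:
        batch, self.pending = self.pending[:max_messages], self.pending[max_messages:]
        return batch

    async def delete(self, receipt_handle: str) -> None:
        if not self.supports_delete:
            raise NotImplementedError
        self.deleted.append(receipt_handle)


async def demo() -> tuple[tuple[int, int], tuple[int, int], list[str]]:
    seen: list[Any] = []
    sqs_like = FakeQueue([{"n": 1}, {"n": 2}])
    bus_like = FakeQueue([{"n": 3}], supports_delete=False)
    first = await process_batch(sqs_like, seen.append, wait_time=0)
    second = await process_batch(bus_like, seen.append, wait_time=0)
    return first, second, sqs_like.deleted
```

A gap between handled and acked tells the caller that messages were processed but not acknowledged through the abstraction, which is the cue to complete them via azure-servicebus directly.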


Document Database

get_mongodb(...) returns a configured Motor AsyncIOMotorClient. Both providers speak the MongoDB wire protocol — AWS DocumentDB natively, Azure Cosmos via its MongoDB-API endpoint — so the caller uses Motor's API directly:

from cloudrift.document import get_mongodb

# AWS DocumentDB (MongoDB-compatible)
client = get_mongodb(
    "documentdb",
    uri="mongodb://user:pass@cluster.docdb.amazonaws.com:27017/?tls=true",
    tls_ca_file="/etc/ssl/rds-ca-bundle.pem",
    max_pool_size=200,
)

# Azure Cosmos DB (MongoDB API)
client = get_mongodb("cosmos", connection_string="mongodb://...")
client = get_mongodb("cosmos", account="myacct", account_key="...")

Operations — full Motor / pymongo surface, no wrappers:

db = client["lyzr"]
users = db["users"]

result = await users.insert_one({"name": "Alice", "age": 30})
doc_id = result.inserted_id

doc = await users.find_one({"name": "Alice"})
async for u in users.find({"age": {"$gte": 18}}).skip(0).limit(100):
    ...

await users.update_one({"_id": doc_id}, {"$set": {"age": 31}})
await db["events"].delete_many({"v": 1})
total = await users.count_documents({"age": {"$gte": 18}})

# bulk writes, aggregations, change streams, transactions, GridFS — all
# of Motor is available; nothing is hidden behind a wrapper.

client.close()

Cosmos auth note. Cosmos for MongoDB (RU) is keys-only at the wire protocol layer — Azure AD tokens are not accepted. Use the connection string from the portal or the account name + account key. Earlier versions of cloudrift exposed managed-identity / service-principal factories for Cosmos that called the SQL API; those have been removed in favour of a single Motor-based path.


Cache

from cloudrift.cache import get_cache

# Self-hosted Redis
cache = get_cache("redis", "from_url", url="redis://localhost:6379/0")
cache = get_cache("redis", "from_credentials",
                  host="redis.internal", port=6379, password="...", db=0)

# AWS ElastiCache
cache = get_cache("elasticache", "from_auth_token",
                  host="my-cluster.cache.amazonaws.com", auth_token="...")
cache = get_cache("elasticache", "from_iam_auth",
                  host="my-cluster.cache.amazonaws.com",
                  username="lyzr-app", region="us-east-1")  # SigV4 + auto-refresh

# Azure Cache for Redis
cache = get_cache("azure_redis", "from_access_key",
                  host="my-cache.redis.cache.windows.net", access_key="...")
cache = get_cache("azure_redis", "from_managed_identity",
                  host="my-cache.redis.cache.windows.net", username="lyzr-app")

Operations — KV, hash, list, counters:

await cache.set("session:abc", b"data", ttl=3600)
value: bytes | None = await cache.get("session:abc")
await cache.delete("session:abc")

await cache.hset("user:1", "name", "Alice")
fields = await cache.hgetall("user:1")

await cache.lpush("jobs", "job-1", "job-2")
batch = await cache.lrange("jobs", 0, 99)

count = await cache.incr("hits:home")
ok = await cache.ping()
await cache.close()

Connection pooling & lifecycle

Every backend holds one long-lived async client that is reused across all operations. Reusing that client is the single biggest performance lever:

  • Don't call get_storage(...) inside a request handler.
  • Do construct it once at app startup and share it (e.g. app.state.storage, FastAPI dependency, or module-level singleton).

Pool sizes are configurable per backend:

get_storage("s3", bucket="b", region="us-east-1",
            max_pool_connections=100, connect_timeout=5.0, read_timeout=30.0)

get_mongodb("documentdb", uri="...",
            max_pool_size=200, min_pool_size=10)

Always release sockets on shutdown with await backend.close() — or wrap the whole lifetime in async with.
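
One way to follow the construct-once, close-on-shutdown rule for several backends at once is contextlib.AsyncExitStack. A minimal sketch, assuming each backend exposes an awaitable close() as the storage, queue, and cache examples do (the Motor client's close() is synchronous and would be registered with stack.callback instead). open_backends and FakeBackend are hypothetical names; FakeBackend only records when it is closed.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Any


@asynccontextmanager
async def open_backends(**factories: Callable[[], Any]) -> AsyncIterator[dict[str, Any]]:
    """Build each backend once, yield them by name, close all on exit."""
    async with AsyncExitStack() as stack:
        backends: dict[str, Any] = {}
        for name, factory in factories.items():
            backend = factory()
            # Registered callbacks run in reverse order, even if a later
            # factory or the body of the `async with` raises.
            stack.push_async_callback(backend.close)
            backends[name] = backend
        yield backends


class FakeBackend:
    """Hypothetical stand-in that logs its own shutdown."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def close(self) -> None:
        self.log.append(f"close:{self.name}")


async def demo() -> list[str]:
    log: list[str] = []
    async with open_backends(
        storage=lambda: FakeBackend("storage", log),
        cache=lambda: FakeBackend("cache", log),
    ) as backends:
        log.append("serving:" + ",".join(backends))
    return log
```

In a real service the lambdas would call get_storage(...) or get_cache(...), and the context manager would be entered from the framework's startup hook so that the backends live as long as the app.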


Errors

All backends raise from a single hierarchy under cloudrift.core.exceptions:

from cloudrift.core.exceptions import (
    ObjectNotFoundError, StoragePermissionError, StorageError,
    QueueNotFoundError, MessageSendError, MessagingError,
    DocumentConnectionError,
    CacheKeyNotFoundError, CacheConnectionError, CacheError,
)

try:
    await storage.download("missing.txt")
except ObjectNotFoundError:
    ...

Provider-specific exceptions (e.g. botocore.exceptions.ClientError, azure.core.exceptions.HttpResponseError) are translated to the cloudrift hierarchy at the boundary. The document layer is the exception: get_mongodb(...) returns a Motor client, so any operation errors propagate as native pymongo exceptions (e.g. pymongo.errors.OperationFailure, DuplicateKeyError). Connect-time failures still surface as DocumentConnectionError.
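
Since provider errors are translated at the boundary, a single handler can cover every storage backend. A minimal sketch of a "missing object means default" helper. download_or_default is a hypothetical name, and the exception class is passed in as a parameter so the example runs without cloudrift installed; in a real service it would be ObjectNotFoundError. FakeNotFound and FakeStorage are stand-ins.

```python
import asyncio
from typing import Any


async def download_or_default(
    storage: Any,
    key: str,
    default: bytes,
    *,
    not_found: type[Exception],
) -> bytes:
    """Return the object's bytes, or default if the backend reports it missing."""
    try:
        return await storage.download(key)
    except not_found:
        return default


class FakeNotFound(Exception):
    """Stand-in for cloudrift.core.exceptions.ObjectNotFoundError."""


class FakeStorage:
    """Hypothetical stand-in that raises FakeNotFound for unknown keys."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self._objects = objects

    async def download(self, key: str) -> bytes:
        try:
            return self._objects[key]
        except KeyError:
            raise FakeNotFound(key) from None


async def demo() -> tuple[bytes, bytes]:
    storage = FakeStorage({"a.txt": b"hello"})
    hit = await download_or_default(storage, "a.txt", b"", not_found=FakeNotFound)
    miss = await download_or_default(
        storage, "missing.txt", b"fallback", not_found=FakeNotFound
    )
    return hit, miss
```

Only the not-found case is swallowed here; other errors such as StoragePermissionError still propagate to the caller.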


Testing

The dev extra ships moto and fakeredis so unit tests don't need real cloud credentials:

pip install "lyzr-cloudrift[dev]"
pytest

For local integration testing of the AWS backends, the suite uses ThreadedMotoServer (LocalStack-style in-process mock) — see tests/test_storage.py for the pattern. Azure backends are tested against Azurite / Service Bus emulators (configure endpoint via the relevant *_url kwarg). For DocumentDB and Cosmos (MongoDB API), tests/test_document.py covers connection construction; for live integration smoke tests, see scripts/test_cosmos_*.py.
