Skip to main content

Stateful FastAPI primitives — rate limit, distributed lock, idempotency, session, cache, dedup, circuit breaker, feature flags, OTP — sharing one Backend protocol.

Project description

wyolet-primitives

Stateful ASGI primitives — rate limit, distributed lock, idempotency, session, user-session, cache, dedup, circuit breaker, feature flags, OTP — sharing one pluggable Backend. Memory, SQLite, Redis, Valkey, Postgres.

First package under the wyolet namespace.

uv add wyolet-primitives                 # core (memory backend)
uv add "wyolet-primitives[sqlite]"       # + SQLiteBackend
uv add "wyolet-primitives[redis]"        # + RedisBackend
uv add "wyolet-primitives[valkey]"       # + ValkeyBackend
uv add "wyolet-primitives[postgres]"     # + PostgresBackend + PostgresUserSessionRepo
uv add "wyolet-primitives[cache]"        # + orjson JSON serializer

Why

FastAPI projects keep reinventing the same primitives. Existing packages are each scoped to one — slowapi for rate limiting, redlock-py for locks, cashews for caching — each with its own storage abstraction, lifecycle, and framework coupling. wyolet.primitives unifies them: implement one Backend, get every primitive that needs state. All middlewares are pure ASGI — FastAPI, Starlette, Litestar, BlackSheep, Quart all work.

Primitives

Module What Needs
ratelimit FixedWindow strategy + ASGI middleware with key extractors (IP, header, cookie, scope) and standard X-RateLimit-* headers. any Backend
lock DistributedLock with TTL, fencing-safe release (compare-and-delete), optional auto-renew. any Backend
idempotency Idempotency-Key header middleware. Caches original response, serializes concurrent replays via lock, never caches 5xx. any Backend
session Ephemeral cookie-session. Flat payload, dirty-tracking, regenerate() for fixation defense, pure-ASGI signed-cookie middleware. any Backend
user_session Device/login records (user_id, ua, ip, os, device, …). Pluggable repo + cache-aside wrapper + service. Ship-your-own schema via pydantic subclassing. UserSessionRepo impl (Postgres shipped)
cache Cache class + @cached decorator with stampede protection via lock. HTTPCacheMiddleware for response-level caching (honors Cache-Control, explicit Vary). Pluggable serializers (PickleSerializer, JSONSerializer via orjson). any Backend
dedup claim() / once() — atomic "have I seen this ID?" for webhook receivers, queue consumers, cron-once-across-replicas. any Backend
circuit_breaker Stateless per-process breaker. Three states (closed/open/half-open) with single-probe recovery. Decorator, context-manager, and .call(fn) forms.
feature_flags Booleans + percentage rollout + allow/block lists with in-process TTL cache. Deterministic per-subject bucketing, salted by flag name. any Backend
otp Issue / verify single-use codes. Hashed storage, constant-time compare, attempt limits, resend cooldown. any Backend

Backends

TTL Locks Incr Notes
MemoryBackend asyncio-safe, monotonic-clock TTL, lazy eviction + sweep. Dev / test / single-process.
SQLiteBackend via aiosqlite. WAL mode, single-connection + lock. For single-node deploys and local dev without docker.
RedisBackend over redis.asyncio.Redis. Tiny Lua script for compare-and-delete.
ValkeyBackend over valkey.asyncio.Valkey. Drop-in for Redis.
PostgresBackend over psycopg_pool.AsyncConnectionPool. TIMESTAMPTZ server-clock TTL, upsert-based set_nx / incr. For teams with Postgres but no Redis.

Bring-your-own: subclass wyolet.primitives.backends.base.Backend (ABC), implement get / set / delete / incr / expire / set_nx / compare_and_delete. Every primitive works on it.

Quickstart — FastAPI

from fastapi import FastAPI
from redis.asyncio import Redis

from wyolet.primitives.backends.redis import RedisBackend
from wyolet.primitives.session import SessionMiddleware
from wyolet.primitives.ratelimit import FixedWindow, RateLimitMiddleware, compose, from_scope, from_ip
from wyolet.primitives.idempotency import IdempotencyMiddleware

backend = RedisBackend(Redis.from_url("redis://localhost"))
app = FastAPI()

# Rate limit: prefer user_id if upstream middleware set it, else client IP
app.add_middleware(
    RateLimitMiddleware,
    strategy=FixedWindow(backend, limit=100, window=60),
    key=compose(from_scope("user_id"), from_ip()),
)

# Idempotent POST/PUT/PATCH/DELETE on `Idempotency-Key` header
app.add_middleware(IdempotencyMiddleware, backend=backend, ttl=86400)

# Signed-cookie session
app.add_middleware(
    SessionMiddleware,
    backend=backend,
    secret_key="change-me",
    cookie_name="sid",
    max_age=14 * 24 * 3600,
)

@app.post("/orders")
async def create_order(request):
    session = request.scope["session"]
    session["last_order"] = "ord-123"  # auto-saved at response
    ...

Distributed lock

from wyolet.primitives.lock import DistributedLock, LockAcquireError

lock = DistributedLock(backend, ttl=30, wait=5.0)

async with lock.acquire(f"order:{order_id}"):
    await process_order(order_id)  # auto-renewed while this runs

Fencing is handled for you: if the TTL expires and another holder takes over, the original holder's release() returns False without deleting the new holder's key.

Cache

from wyolet.primitives.cache import Cache, cached, HTTPCacheMiddleware

cache = Cache(backend, ttl=300)

@cached(cache, ttl=60)
async def get_user(user_id: str) -> dict:
    return await db.fetch_one(...)  # 1000 concurrent misses → 1 DB call

# Or response-level caching:
app.add_middleware(HTTPCacheMiddleware, backend=backend, ttl=60, vary=("accept-encoding",))

Dedup

from wyolet.primitives.dedup import Dedup

dedup = Dedup(backend, ttl=3 * 24 * 3600)  # Stripe retries for 3 days

@app.post("/webhook/stripe")
async def stripe_webhook(event: dict):
    async with dedup.once(f"stripe:{event['id']}") as first:
        if not first:
            return {"status": "duplicate"}
        await process(event)  # rolled back on exception
        return {"status": "ok"}

Circuit breaker

from wyolet.primitives.circuit_breaker import CircuitBreaker, CircuitOpenError

stripe_breaker = CircuitBreaker(fail_threshold=5, recovery_timeout=30)

@stripe_breaker.guard
async def charge(amount: int):
    return await stripe.charge(amount)

try:
    await charge(1000)
except CircuitOpenError:
    return {"error": "payment provider unavailable"}  # fail fast, don't pile on

Feature flags

from wyolet.primitives.feature_flags import FeatureFlags

flags = FeatureFlags(backend, cache_ttl=10.0)

# Admin:
await flags.set("new_checkout", enabled=True, rollout=25, allow=["beta-user-1"])

# Hot path:
if await flags.is_enabled("new_checkout", subject=user.id):
    return new_flow()
return old_flow()

Bucketing identity is caller-chosen: user.id for logged-in features, session.id for pre-login A/B, org.id for B2B per-tenant rollout.

OTP

from wyolet.primitives.otp import Otp, OtpNotFoundError, OtpTooManyAttemptsError

otp = Otp(backend, ttl=300, code_length=6, max_attempts=5, resend_cooldown=30)

code = await otp.issue(phone_number)
await sms.send(phone_number, f"Your code is {code}")

# Verification endpoint:
try:
    ok = await otp.verify(phone_number, submitted_code)
except OtpNotFoundError:
    return {"error": "code expired or never sent"}
except OtpTooManyAttemptsError:
    return {"error": "too many attempts, request a new code"}
return {"ok": ok}

User sessions (devices logged in)

from psycopg_pool import AsyncConnectionPool

from wyolet.primitives.user_session import CachedUserSessionRepo, UserSession, UserSessionService
from wyolet.primitives.user_session.postgres import PostgresUserSessionRepo

pool = AsyncConnectionPool("postgresql://...")
pg_repo = PostgresUserSessionRepo(pool, model=UserSession)
await pg_repo.ensure_schema()

repo = CachedUserSessionRepo(pg_repo, backend, model=UserSession, ttl=3600)
service = UserSessionService(repo, model=UserSession)

# On login:
session = await service.create(
    user_id=user.id, user_agent=ua, ip=request.client.host, os="macOS", device="Desktop",
)

# Devices-logged-in UI:
devices = await service.list_active(user.id)

# Revoke remotely:
await service.revoke(some_sid)

Extend by subclassing:

class MyUserSession(UserSession):
    tenant_id: str
    last_location: str | None = None

Pass model=MyUserSession everywhere; extras live in JSONB extra column unless you also override PostgresUserSessionRepo._row_to_model / _model_to_row.

Lifecycle (FastAPI lifespan)

No separate pool/health composer — just use the stdlib pattern:

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    async with redis_client, pg_pool:
        await backend.start()  # for Memory/SQLite/Postgres (no-op for Redis/Valkey)
        yield
        await backend.close()

app = FastAPI(lifespan=lifespan)

Running tests

uv run pytest                      # unit tests, MemoryBackend + SQLiteBackend
docker compose up -d               # redis/valkey/postgres for the rest
uv run pytest                      # parametrized backend tests now hit all five
uv run pytest -m integration       # live-service tests (user_session postgres)

Deliberately out of scope

  • Retry / backoff — use tenacity or stamina. Stateless; no backend unification story.
  • Pool composer / health composer — each async lib ships its own pool. Stdlib @asynccontextmanager is a 5-line solution.
  • WebSockets — FastAPI native is enough.
  • ORM / DB layersqlmodel / sqlalchemy own this.
  • Background task queues — use taskiq / arq.

Roadmap

v0.1 (shipped): ratelimit (FixedWindow), lock, idempotency, session, user_session, cache (+ HTTPCache), dedup, circuit_breaker, feature_flags, otp; Memory/SQLite/Redis/Valkey/Postgres backends.

Next: more rate-limit strategies (token bucket, sliding window) via ScriptableBackend / LockableBackend capability mixins.

License

TBD.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wyolet_primitives-0.1.0.tar.gz (50.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wyolet_primitives-0.1.0-py3-none-any.whl (58.4 kB view details)

Uploaded Python 3

File details

Details for the file wyolet_primitives-0.1.0.tar.gz.

File metadata

  • Download URL: wyolet_primitives-0.1.0.tar.gz
  • Upload date:
  • Size: 50.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.8

File hashes

Hashes for wyolet_primitives-0.1.0.tar.gz
Algorithm Hash digest
SHA256 6bc9de84099e5de9fa69bb2e7df6e09a9018c4ef4814ccd22f4bb0eadd27d6b2
MD5 291e99b2b9e49e5282029a73e5933c1a
BLAKE2b-256 006b308ffeb96c8485401ce7db3e82fdb357f8325b9e056542159aeeddda1f69

See more details on using hashes here.

File details

Details for the file wyolet_primitives-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for wyolet_primitives-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 78b9d7e7b1aaac6843538c9391de0e376e3cc05db6ebca650a5794b813e3786d
MD5 642977673d796db299e7dbb0075b5e51
BLAKE2b-256 298d53cff2af3e68b7453eb98640d0a3555c64647138a51277a9bc3ed2c53792

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page