Skip to main content

A Python auth library that handles JWT token management, password hashing, OAuth2 login flow, and route protection with full sync and async support so you don't have to wire it together yourself.

Project description


A Python auth library that handles JWT token management, password hashing, OAuth2 login flow, and route protection with full sync and async support so you don't have to wire it together yourself.

Most auth libraries do one thing. PyJWT gives you JWT encoding. bcrypt gives you password hashing. You still have to write the login flow, build the guards, handle the exceptions, and repeat that boilerplate across every project. gatevault wraps all of it into one coherent package with a clean API you can drop into any Python project regardless of framework whether you're using FastAPI with async SQLAlchemy, Flask with a sync ORM, or Django with its built-in ORM.

pip install richard-gatevault

Table of Contents


Installation

pip install richard-gatevault

Requires Python 3.9+. Dependencies PyJWT and bcrypt are installed automatically no extra setup needed.

Everything in gatevault is importable from the top level:

from gatevault import (
    TokenManager,
    OAuthHandler,
    GateVault,
    hash_password,
    verify_password,
)

Quick Start

The fastest path from zero to a working auth setup. This assumes a sync database swap login for async_login and add async def if you're using an async ORM.

import os
from gatevault import TokenManager, OAuthHandler, GateVault, hash_password
from gatevault import InvalidCredentialsError, UnauthorizedError, GuardError

# 1. Initialize once at startup
tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
gate = GateVault(token_manager=tm)
oauth = OAuthHandler(token_manager=tm, get_user=lambda u: db.get_user(u))

# 2. Hash passwords at registration
hashed = hash_password(plain_password)
db.save_user(email=email, hashed_password=hashed)

# 3. Login returns tokens
tokens = oauth.login(email, password)
# → {"access_token": "eyJ...", "refresh_token": "eyJ...", "token_type": "bearer"}

# 4. Protect any function with a decorator
@gate.protected
def get_profile(payload=None):
    return db.get_user(payload["user_id"])

# Works on async functions too
@gate.protected
async def get_profile_async(payload=None):
    return await db.get_user(payload["user_id"])

# 5. Call the protected function with the token
result = get_profile(token=access_token)
result = await get_profile_async(token=access_token)

Choosing the right method

Situation Use
Sync database (Django ORM, SQLAlchemy sync) oauth.login(...)
Async database (async SQLAlchemy, asyncpg) await oauth.async_login(...)
Protecting a regular function @gate.protected + def
Protecting an async route/function @gate.protected + async def

The Full Picture

Here is what a complete auth setup looks like end to end registration, login, token storage, protected routes, and token refresh.

This example uses async_login and async def protected routes for an async SQLAlchemy setup. If you are using a synchronous database, replace async_login with login, async def protected functions with regular def, and remove the async/await keywords from the login route.

import os
from gatevault import (
    TokenManager, OAuthHandler, GateVault,
    hash_password,
    InvalidCredentialsError, UnauthorizedError,
    TokenExpiredError, GuardError
)

# ---------------------------------------------------------------------------
# Setup do this once at app startup
# ---------------------------------------------------------------------------

tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)

gate = GateVault(token_manager=tm)


# ---------------------------------------------------------------------------
# get_user async version for use with async_login
# ---------------------------------------------------------------------------

async def get_user(username: str):
    result = await db.execute(select(User).where(User.email == username))
    return result.scalar_one_or_none()

oauth = OAuthHandler(token_manager=tm, get_user=get_user)


# ---------------------------------------------------------------------------
# Registration hash and store the password, never store plain text
# ---------------------------------------------------------------------------

async def register(username: str, plain_password: str):
    hashed = hash_password(plain_password)
    await db.create_user(username=username, hashed_password=hashed)
    return {"message": "registered"}


# ---------------------------------------------------------------------------
# Login uses async_login since get_user is async
# Access token goes in the response body.
# Refresh token goes in an httpOnly cookie never in the body.
# ---------------------------------------------------------------------------

async def login(username: str, password: str, response):
    try:
        tokens = await oauth.async_login(username, password)
    except (InvalidCredentialsError, UnauthorizedError):
        return {"error": "invalid credentials"}, 401

    response.set_cookie(
        key="refresh_token",
        value=tokens["refresh_token"],
        httponly=True,
        secure=True,
        samesite="strict"
    )
    return {"access_token": tokens["access_token"]}


# ---------------------------------------------------------------------------
# Protected functions gatevault.protected works on both sync and async
# ---------------------------------------------------------------------------

@gate.protected
async def get_profile(payload=None):
    user_id = payload["user_id"]
    return await db.get_user(user_id)

@gate.protected
def get_orders(payload=None):
    user_id = payload["user_id"]
    return db.get_orders(user_id)


# ---------------------------------------------------------------------------
# Token refresh client sends the refresh token via httpOnly cookie
# Server issues a new access token
# ---------------------------------------------------------------------------

async def refresh_route(refresh_token: str):
    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        return {"error": "session expired, please log in again"}, 401
    except (GuardError, UnauthorizedError):
        return {"error": "invalid token"}, 401

    if payload["type"] != "refresh":
        return {"error": "wrong token type"}, 400

    await db.revoke_refresh_token(refresh_token)

    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])

    await db.store_refresh_token(new_refresh, user_id=payload["user_id"])

    return {"access_token": new_access}

Password Hashing

gatevault uses bcrypt for password hashing. Passwords are one-way hashed there is no way to reverse a hash back to the original password. If your database is ever compromised, attackers get hashes, not passwords.

Hashing a password

from gatevault import hash_password

hashed = hash_password("user_plain_password")
print(hashed)
# $2b$12$Kq8J3mNrandom...

Always hash at the point of registration and store the result. Never store or log the plain password.

def register(username: str, plain_password: str):
    hashed = hash_password(plain_password)
    db.insert(username=username, hashed_password=hashed)
    return {"message": "account created"}

Verifying a password

from gatevault import verify_password

is_match = verify_password("user_plain_password", stored_hash)
# True or False

verify_password returns a boolean. It never raises on a wrong password it just returns False. What you do with that result is your decision.

from gatevault import verify_password, InvalidCredentialsError, UnauthorizedError

def authenticate(username: str, plain_password: str):
    user = db.get_user(username)
    if not user:
        raise InvalidCredentialsError("user not found")
    if not verify_password(plain_password, user.hashed_password):
        raise UnauthorizedError("wrong password")
    return user

About bcrypt salting

You do not need to manage salts yourself. bcrypt generates a unique random salt for every hash and embeds it in the output string. Two calls to hash_password with the same password produce different hashes both are valid.

h1 = hash_password("same_password")
h2 = hash_password("same_password")

print(h1 == h2)                             # False different salts
print(verify_password("same_password", h1)) # True
print(verify_password("same_password", h2)) # True
print(verify_password("wrong", h1))         # False

Standalone usage

hash_password and verify_password have no dependency on the rest of gatevault. You can use them without setting up TokenManager or anything else:

from gatevault import hash_password, verify_password

stored = hash_password("mypassword")

if verify_password("mypassword", stored):
    print("access granted")
else:
    print("access denied")

Token Management

TokenManager handles all JWT creation and verification. It is the core of gatevault OAuthHandler uses it to create tokens and GateVault uses it to verify them. Create one instance at startup and share it across your app.

Setup

import os
from gatevault import TokenManager

tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)

The secret_key is what signs your tokens. Anyone with this key can forge valid tokens keep it in an environment variable, never in source code.

To generate a secure key:

python -c "import secrets; print(secrets.token_hex(32))"

Creating tokens

access_token = tm.create_access_token(user_id=42)
refresh_token = tm.create_refresh_token(user_id=42)

Access tokens are short-lived (minutes) sent with every authenticated request. Refresh tokens are long-lived (days) used only to obtain a new access token when the current one expires.

Extra claims

You can embed additional data in the token payload using keyword arguments. This is useful for role-based access control you can check the role from the token itself without a database lookup on every request:

access_token = tm.create_access_token(user_id=42, role="admin", org_id=7)

payload = tm.decode_token(access_token)
print(payload)
# {
#   "user_id": 42,
#   "exp": 1234567890,
#   "type": "access",
#   "role": "admin",
#   "org_id": 7
# }
@gate.protected
def admin_dashboard(payload=None):
    if payload.get("role") != "admin":
        raise UnauthorizedError("admin access required")
    return get_admin_data()

Decoding tokens

payload = tm.decode_token(token)

user_id = payload["user_id"]
token_type = payload["type"]   # "access" or "refresh"
expiry = payload["exp"]        # unix timestamp

decode_token verifies the signature and checks expiry in one call. It raises specific exceptions on failure:

from gatevault import TokenExpiredError, InvalidTokenError, TokenDecodeError

try:
    payload = tm.decode_token(token)
except TokenExpiredError:
    # token has passed its exp time send client to refresh endpoint
    return {"error": "token expired"}, 401
except InvalidTokenError:
    # signature mismatch token was tampered with
    return {"error": "invalid token"}, 401
except TokenDecodeError:
    # token string is malformed can't be parsed at all
    return {"error": "malformed token"}, 400

Access vs refresh telling them apart

Every token carries a type claim. Always check it on your refresh endpoint you only want refresh tokens there:

payload = tm.decode_token(token)

if payload["type"] != "refresh":
    return {"error": "wrong token type"}, 400

TokenManager is shared

OAuthHandler creates tokens. GateVault verifies them. They don't communicate directly the shared TokenManager instance is the trust anchor. Same secret key in, same secret key out.

tm = TokenManager(secret_key=os.environ["AUTH_SECRET_KEY"], access_expiry_minutes=15, refresh_expiry_days=7)

oauth = OAuthHandler(token_manager=tm, get_user=get_user)  # uses tm to create tokens
gate = GateVault(token_manager=tm)                          # uses tm to verify tokens

Login Flow

OAuthHandler wires together user lookup, password verification, and token creation into one call. It follows the OAuth2 Resource Owner Password Credentials flow.

It supports both synchronous and asynchronous user lookup use login for sync databases (SQLAlchemy sync, Django ORM, raw psycopg2) and async_login for async databases (async SQLAlchemy, asyncpg, Tortoise ORM).

Setup

from gatevault import OAuthHandler

def get_user(username: str):
    # return a user object with `id` and `hashed_password` attributes
    # return None if the user doesn't exist
    return db.query(User).filter(User.email == username).first()

oauth = OAuthHandler(token_manager=tm, get_user=get_user)

Your get_user function must return an object with two attributes:

  • id the user's identifier, embedded in the token payload
  • hashed_password the bcrypt hash stored at registration

If your model uses different field names, add a property:

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String)
    password_hash = Column(String)

    @property
    def hashed_password(self):
        return self.password_hash  # gatevault expects hashed_password

Sync Login

Use login when your get_user function is a regular synchronous function:

tokens = oauth.login("john@example.com", "their_password")

print(tokens)
# {
#     "access_token": "eyJhbGci...",
#     "refresh_token": "eyJhbGci...",
#     "token_type": "bearer"
# }

login does three things in order:

  1. Calls get_user(username) raises InvalidCredentialsError if None is returned
  2. Calls verify_password(password, user.hashed_password) raises UnauthorizedError if it returns False
  3. Calls create_access_token and create_refresh_token raises GuardError if token creation fails

Async Login

Use async_login when your get_user function is defined as async def typically when using async SQLAlchemy, asyncpg, or any async ORM. It performs the exact same three steps as login, but awaits the get_user call.

async def get_user(username: str):
    result = await db.execute(select(User).where(User.email == username))
    return result.scalar_one_or_none()

oauth = OAuthHandler(token_manager=tm, get_user=get_user)

# must be awaited inside an async context
tokens = await oauth.async_login("john@example.com", "their_password")

print(tokens)
# {
#     "access_token": "eyJhbGci...",
#     "refresh_token": "eyJhbGci...",
#     "token_type": "bearer"
# }

Important: Always match the method to your get_user type. Calling async_login with a sync get_user raises TypeError because you cannot await a plain value. Calling login with an async get_user returns a coroutine object instead of a user authentication will silently fail. When in doubt, check whether your get_user is async def.


What to do with the tokens

The access token goes back to the client in the response body. The refresh token should be set as an httpOnly cookie it is invisible to JavaScript and therefore safe from XSS attacks:

async def login_route(username, password, response):
    try:
        tokens = await oauth.async_login(username, password)
    except (InvalidCredentialsError, UnauthorizedError):
        return {"error": "invalid credentials"}, 401

    response.set_cookie(
        key="refresh_token",
        value=tokens["refresh_token"],
        httponly=True,
        secure=True,      # HTTPS only
        samesite="strict"
    )
    return {"access_token": tokens["access_token"]}

The client stores the access token in memory (a JavaScript variable not localStorage) and attaches it to every protected request via the Authorization header:

Authorization: Bearer <access_token>

The refresh token lives in the browser's httpOnly cookie store. The browser sends it automatically on requests to the refresh endpoint the client never touches it directly.

Client-side example:

// After login store access token in memory only
const { access_token } = await fetch("/login", {
    method: "POST",
    body: JSON.stringify({ username, password })
}).then(r => r.json())

// Every protected request send access token in Authorization header
const profile = await fetch("/profile", {
    headers: { "Authorization": `Bearer ${access_token}` }
}).then(r => r.json())

// When access token expires browser sends refresh cookie automatically
const refreshed = await fetch("/refresh", {
    method: "POST",
    credentials: "include"  // tells browser to send the httpOnly cookie
}).then(r => r.json())

const new_access_token = refreshed.access_token

Handling login errors

from gatevault import InvalidCredentialsError, UnauthorizedError, GuardError

try:
    tokens = await oauth.async_login(username, password)
except InvalidCredentialsError:
    return {"error": "invalid credentials"}, 401
except UnauthorizedError:
    return {"error": "invalid credentials"}, 401
except GuardError:
    return {"error": "authentication failed"}, 500

Return the same error message for both InvalidCredentialsError and UnauthorizedError. Distinguishing between them tells an attacker which usernames exist in your system.


Protecting Routes

GateVault wraps any function sync or async with token verification. The wrapped function never executes if the token is missing, expired, or invalid. On success, the decoded payload is injected as the payload keyword argument.

gate.protected automatically detects whether the decorated function is a coroutine (async def) and applies the appropriate wrapper. You use the same decorator for both.

Setup

from gatevault import GateVault

gate = GateVault(token_manager=tm)

Sync Protection

@gate.protected
def get_profile(payload=None):
    user_id = payload["user_id"]
    return db.get_user(user_id)

# call with token= keyword argument
result = get_profile(token="eyJhbGci...")

Async Protection

@gate.protected
async def get_profile(payload=None):
    user_id = payload["user_id"]
    return await db.get_user(user_id)

# await the call gatevault detected async def and returns a coroutine
result = await get_profile(token="eyJhbGci...")

The token extraction and verification logic is identical in both cases. The only difference is that the async wrapper awaits the decorated function after injecting the payload.


How the token reaches your function

In a real app, the client sends the token in the Authorization header. Your framework gives you access to that header. You extract the token string and pass it to the protected function:

# FastAPI example
@app.get("/profile")
async def profile_route(authorization: str = Header(...)):
    token = authorization.replace("Bearer ", "")
    try:
        return await get_profile(token=token)
    except (GuardError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="unauthorized")

Multiple protected routes

@gate.protected
async def get_profile(payload=None):
    return await db.get_user(payload["user_id"])

@gate.protected
async def get_orders(payload=None):
    return await db.get_orders(payload["user_id"])

@gate.protected
async def update_settings(settings: dict, payload=None):
    return await db.update_settings(payload["user_id"], settings)

Passing arguments alongside the token

Your function can accept any arguments alongside payload:

@gate.protected
async def get_post(post_id: int, payload=None):
    user_id = payload["user_id"]
    post = await db.get_post(post_id)
    if post.owner_id != user_id:
        raise UnauthorizedError("not your post")
    return post

result = await get_post(post_id=7, token="eyJhbGci...")

Role-based access using claims

@gate.protected
def admin_only(payload=None):
    if payload.get("role") != "admin":
        raise UnauthorizedError("admin access required")
    return get_admin_data()

@gate.protected
async def moderator_or_above(payload=None):
    if payload.get("role") not in ("admin", "moderator"):
        raise UnauthorizedError("insufficient permissions")
    return await get_mod_tools()

Handling guard errors

from gatevault import GuardError, UnauthorizedError

try:
    result = await get_profile(token=incoming_token)
except GuardError as e:
    # token missing, expired, or malformed
    return {"error": str(e)}, 401
except UnauthorizedError as e:
    # invalid signature or permission check failed inside the function
    return {"error": str(e)}, 403

Exception Handling

All gatevault exceptions inherit from GatevaultError. Catch broadly or specifically depending on what you need.

GatevaultError
├── TokenError
│   ├── TokenExpiredError
│   ├── InvalidTokenError
│   └── TokenDecodeError
├── HashingError
└── GuardError
    ├── UnauthorizedError
    └── InvalidCredentialsError

Importing exceptions

from gatevault import (
    GatevaultError,
    TokenError,
    TokenExpiredError,
    InvalidTokenError,
    TokenDecodeError,
    HashingError,
    GuardError,
    UnauthorizedError,
    InvalidCredentialsError,
)

Catching broadly one handler for everything

try:
    tokens = await oauth.async_login(username, password)
except GatevaultError as e:
    return {"error": str(e)}, 401

Catching specifically different response per failure

try:
    payload = tm.decode_token(token)
except TokenExpiredError:
    # access token expired tell client to use refresh token
    return {"error": "token expired", "action": "refresh"}, 401
except InvalidTokenError:
    # signature mismatch possible tampering, do not trust
    return {"error": "invalid token"}, 401
except TokenDecodeError:
    # token string is completely malformed bad format
    return {"error": "malformed token"}, 400

Catching by category

try:
    payload = tm.decode_token(token)
except TokenError:
    # catches all three: TokenExpiredError, InvalidTokenError, TokenDecodeError
    return {"error": "token error"}, 401
try:
    tokens = await oauth.async_login(username, password)
except GuardError:
    # catches InvalidCredentialsError and UnauthorizedError
    return {"error": "authentication failed"}, 401

Real-world error handling pattern

from gatevault import (
    InvalidCredentialsError, UnauthorizedError,
    TokenExpiredError, TokenDecodeError, InvalidTokenError, GuardError
)

async def handle_login(username: str, password: str):
    try:
        tokens = await oauth.async_login(username, password)
        return {"access_token": tokens["access_token"]}, 200
    except (InvalidCredentialsError, UnauthorizedError):
        return {"error": "invalid credentials"}, 401
    except GuardError:
        return {"error": "authentication failed"}, 500

async def handle_protected_request(token: str):
    try:
        return await get_profile(token=token)
    except TokenExpiredError:
        return {"error": "token expired", "action": "refresh"}, 401
    except (InvalidTokenError, TokenDecodeError, GuardError):
        return {"error": "unauthorized"}, 401
    except UnauthorizedError:
        return {"error": "forbidden"}, 403

Warnings

ShortKeyWarning

Issued at TokenManager creation if the secret key is shorter than 32 bytes. HS256 requires at least 32 bytes per RFC 7518. This is a warning, not an error your app will still run, but your tokens will be less secure.

import warnings
from gatevault import TokenManager, ShortKeyWarning

# Suppress in tests where key length doesn't matter
warnings.filterwarnings("ignore", category=ShortKeyWarning)
tm = TokenManager(secret_key="short", access_expiry_minutes=15, refresh_expiry_days=7)

Treating as an error in CI

# Catch misconfigurations early fail the build if key is too short
warnings.filterwarnings("error", category=ShortKeyWarning)

Framework Integration

gatevault is framework-agnostic. The same library works across FastAPI, Flask, and Django you wire it into each framework's request/response cycle in the same way: initialize once at startup, define protected functions with @gate.protected, and pass the token from the Authorization header into the protected function at request time.

Framework Recommended login method Async support
FastAPI + async SQLAlchemy async_login Full use async def protected functions
FastAPI + sync DB login N/A
Flask login Not applicable (Flask is sync)
Django (sync ORM) login N/A
Django 4.1+ async views async_login Full use async def protected functions
Django REST Framework login N/A

FastAPI (Async)

The recommended FastAPI setup uses async_login, async def protected functions, and async SQLAlchemy:

import os
from fastapi import FastAPI, HTTPException, Response, Cookie, Depends, Header
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from gatevault import TokenManager, OAuthHandler, GateVault, hash_password
from gatevault import (
    InvalidCredentialsError, UnauthorizedError,
    GuardError, TokenExpiredError, InvalidTokenError, TokenDecodeError
)

app = FastAPI()

tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
gate = GateVault(token_manager=tm)


# Protected functions async, defined once, reused across routes
@gate.protected
async def get_current_profile(payload=None):
    return await db.get_user(payload["user_id"])

@gate.protected
async def get_user_orders(payload=None):
    return await db.get_orders(payload["user_id"])


# Registration
@app.post("/register")
async def register(body: UserCreate, db: AsyncSession = Depends(get_db)):
    hashed = hash_password(body.password)
    new_user = User(email=body.email, hashed_password=hashed)
    db.add(new_user)
    await db.commit()
    return {"message": "registered"}


# Login async DB lookup requires async_login
@app.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    response: Response = None,
    db: AsyncSession = Depends(get_db)
):
    async def _get_user(username: str):
        result = await db.execute(select(User).where(User.email == username))
        return result.scalar_one_or_none()

    oauth = OAuthHandler(token_manager=tm, get_user=_get_user)

    try:
        tokens = await oauth.async_login(form_data.username, form_data.password)
    except (InvalidCredentialsError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="invalid credentials")

    response.set_cookie(
        key="refresh_token",
        value=tokens["refresh_token"],
        httponly=True,
        secure=True,
        samesite="strict"
    )
    return {"access_token": tokens["access_token"], "token_type": "bearer"}


# Protected routes extract token from header, pass to protected function
@app.get("/profile")
async def profile(authorization: str = Header(...)):
    token = authorization.replace("Bearer ", "")
    try:
        return await get_current_profile(token=token)
    except (GuardError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="unauthorized")


# Token refresh
@app.post("/refresh")
async def refresh(response: Response, refresh_token: str = Cookie(...)):
    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        raise HTTPException(status_code=401, detail="session expired")
    except (InvalidTokenError, TokenDecodeError):
        raise HTTPException(status_code=401, detail="invalid token")

    if payload["type"] != "refresh":
        raise HTTPException(status_code=400, detail="wrong token type")

    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])

    response.set_cookie(
        key="refresh_token",
        value=new_refresh,
        httponly=True,
        secure=True,
        samesite="strict"
    )
    return {"access_token": new_access}


# Logout clear the refresh token cookie
@app.post("/logout")
def logout(response: Response, refresh_token: str = Cookie(None)):
    response.delete_cookie("refresh_token")
    return {"message": "logged out"}

FastAPI (Sync)

If you are using a synchronous database driver, use login and regular def protected functions:

def get_user_from_db(username: str):
    return db.query(User).filter(User.email == username).first()

oauth = OAuthHandler(token_manager=tm, get_user=get_user_from_db)

@gate.protected
def get_current_profile(payload=None):
    return db.get_user(payload["user_id"])

@app.post("/login")
def login(form_data: OAuth2PasswordRequestForm = Depends(), response: Response = None):
    try:
        tokens = oauth.login(form_data.username, form_data.password)
    except (InvalidCredentialsError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="invalid credentials")

    response.set_cookie(
        key="refresh_token",
        value=tokens["refresh_token"],
        httponly=True,
        secure=True,
        samesite="strict"
    )
    return {"access_token": tokens["access_token"], "token_type": "bearer"}

@app.get("/profile")
def profile(authorization: str = Header(...)):
    token = authorization.replace("Bearer ", "")
    try:
        return get_current_profile(token=token)
    except (GuardError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="unauthorized")

Flask

import os
from flask import Flask, request, jsonify, make_response
from gatevault import TokenManager, OAuthHandler, GateVault, hash_password
from gatevault import (
    InvalidCredentialsError, UnauthorizedError,
    GuardError, TokenExpiredError, InvalidTokenError, TokenDecodeError
)

app = Flask(__name__)

tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
gate = GateVault(token_manager=tm)
oauth = OAuthHandler(token_manager=tm, get_user=get_user_from_db)


def get_token():
    return request.headers.get("Authorization", "").replace("Bearer ", "")


@gate.protected
def get_current_profile(payload=None):
    return db.get_user(payload["user_id"])


@app.post("/register")
def register():
    data = request.json
    hashed = hash_password(data["password"])
    db.create_user(username=data["username"], hashed_password=hashed)
    return jsonify({"message": "registered"})


@app.post("/login")
def login():
    data = request.json
    try:
        tokens = oauth.login(data["username"], data["password"])
    except (InvalidCredentialsError, UnauthorizedError):
        return jsonify({"error": "invalid credentials"}), 401

    response = make_response(jsonify({"access_token": tokens["access_token"]}))
    response.set_cookie(
        "refresh_token",
        tokens["refresh_token"],
        httponly=True,
        secure=True,
        samesite="Strict"
    )
    return response


@app.get("/profile")
def profile():
    try:
        return jsonify(get_current_profile(token=get_token()))
    except (GuardError, UnauthorizedError):
        return jsonify({"error": "unauthorized"}), 401


@app.post("/refresh")
def refresh():
    refresh_token = request.cookies.get("refresh_token")
    if not refresh_token:
        return jsonify({"error": "no refresh token"}), 401

    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        return jsonify({"error": "session expired"}), 401
    except (InvalidTokenError, TokenDecodeError):
        return jsonify({"error": "invalid token"}), 401

    if payload["type"] != "refresh":
        return jsonify({"error": "wrong token type"}), 400

    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])

    response = make_response(jsonify({"access_token": new_access}))
    response.set_cookie(
        "refresh_token", new_refresh,
        httponly=True, secure=True, samesite="Strict"
    )
    return response


@app.post("/logout")
def logout():
    response = make_response(jsonify({"message": "logged out"}))
    response.delete_cookie("refresh_token")
    return response

Django

gatevault works with Django's sync ORM out of the box. Use login with a sync get_user. For async views (Django 4.1+), use async_login with an async get_user.

Setup auth/gatevault_setup.py

Create a dedicated setup file and import from it across your views. This avoids reinitializing gatevault objects on every request.

# auth/gatevault_setup.py
import os
from gatevault import TokenManager, OAuthHandler, GateVault

tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
gate = GateVault(token_manager=tm)


def get_user_from_db(username: str):
    from myapp.models import User
    try:
        return User.objects.get(email=username)
    except User.DoesNotExist:
        return None


oauth = OAuthHandler(token_manager=tm, get_user=get_user_from_db)

Views auth/views.py

import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST, require_GET
from gatevault import hash_password
from gatevault import (
    InvalidCredentialsError, UnauthorizedError,
    GuardError, TokenExpiredError, InvalidTokenError, TokenDecodeError
)
from .gatevault_setup import tm, oauth, gate
from .models import User


# Protected functions defined once, called from any view
@gate.protected
def get_current_profile(payload=None):
    return User.objects.get(id=payload["user_id"])


# Registration
@csrf_exempt
@require_POST
def register(request):
    data = json.loads(request.body)
    hashed = hash_password(data["password"])
    User.objects.create(email=data["email"], hashed_password=hashed)
    return JsonResponse({"message": "registered"}, status=201)


# Login
@csrf_exempt
@require_POST
def login(request):
    data = json.loads(request.body)
    try:
        tokens = oauth.login(data["username"], data["password"])
    except (InvalidCredentialsError, UnauthorizedError):
        return JsonResponse({"error": "invalid credentials"}, status=401)

    response = JsonResponse({"access_token": tokens["access_token"], "token_type": "bearer"})
    response.set_cookie(
        "refresh_token",
        tokens["refresh_token"],
        httponly=True,
        secure=True,        # set to False in local development
        samesite="Strict"
    )
    return response


# Protected route extract token from header, pass to protected function
@require_GET
def profile(request):
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    try:
        user = get_current_profile(token=token)
        return JsonResponse({"id": user.id, "email": user.email})
    except GuardError:
        return JsonResponse({"error": "unauthorized"}, status=401)
    except UnauthorizedError:
        return JsonResponse({"error": "forbidden"}, status=403)


# Token refresh reads refresh token from httpOnly cookie
@csrf_exempt
@require_POST
def refresh(request):
    refresh_token = request.COOKIES.get("refresh_token")
    if not refresh_token:
        return JsonResponse({"error": "no refresh token"}, status=401)

    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        return JsonResponse({"error": "session expired"}, status=401)
    except (InvalidTokenError, TokenDecodeError):
        return JsonResponse({"error": "invalid token"}, status=401)

    if payload["type"] != "refresh":
        return JsonResponse({"error": "wrong token type"}, status=400)

    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])

    response = JsonResponse({"access_token": new_access})
    response.set_cookie(
        "refresh_token", new_refresh,
        httponly=True, secure=True, samesite="Strict"
    )
    return response


# Logout clear the refresh token cookie
@csrf_exempt
@require_POST
def logout(request):
    response = JsonResponse({"message": "logged out"})
    response.delete_cookie("refresh_token")
    return response

URL configuration auth/urls.py

from django.urls import path
from . import views

urlpatterns = [
    path("register/", views.register),
    path("login/", views.login),
    path("refresh/", views.refresh),
    path("logout/", views.logout),
    path("profile/", views.profile),
]

Django REST Framework

If you are using DRF, the same gate.protected functions work inside APIView or @api_view extract the token from the Authorization header and pass it in:

from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework import status
from gatevault import GuardError, UnauthorizedError
from .gatevault_setup import gate


@gate.protected
def get_current_profile(payload=None):
    from myapp.models import User
    return User.objects.get(id=payload["user_id"])


@api_view(["GET"])
def profile(request):
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    try:
        user = get_current_profile(token=token)
        return Response({"id": user.id, "email": user.email})
    except GuardError:
        return Response({"error": "unauthorized"}, status=status.HTTP_401_UNAUTHORIZED)
    except UnauthorizedError:
        return Response({"error": "forbidden"}, status=status.HTTP_403_FORBIDDEN)

You can also use APIView for class-based views:

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status


class ProfileView(APIView):
    def get(self, request):
        token = request.headers.get("Authorization", "").replace("Bearer ", "")
        try:
            user = get_current_profile(token=token)
            return Response({"id": user.id, "email": user.email})
        except (GuardError, UnauthorizedError):
            return Response({"error": "unauthorized"}, status=status.HTTP_401_UNAUTHORIZED)

Async Django views (Django 4.1+)

For async Django views, define an async get_user and use async_login:

# auth/gatevault_setup.py async variant
from django.contrib.auth import get_user_model

async def get_user_async(username: str):
    User = get_user_model()
    try:
        return await User.objects.aget(email=username)
    except User.DoesNotExist:
        return None

oauth_async = OAuthHandler(token_manager=tm, get_user=get_user_async)
# auth/views.py async login view
import json
from django.http import JsonResponse
from .gatevault_setup import oauth_async
from gatevault import InvalidCredentialsError, UnauthorizedError


async def login_async(request):
    if request.method != "POST":
        return JsonResponse({"error": "method not allowed"}, status=405)
    data = json.loads(request.body)
    try:
        tokens = await oauth_async.async_login(data["username"], data["password"])
    except (InvalidCredentialsError, UnauthorizedError):
        return JsonResponse({"error": "invalid credentials"}, status=401)

    response = JsonResponse({"access_token": tokens["access_token"], "token_type": "bearer"})
    response.set_cookie(
        "refresh_token", tokens["refresh_token"],
        httponly=True, secure=True, samesite="Strict"
    )
    return response

Using gatevault in Parts

Just Hashing

from gatevault import hash_password, verify_password

stored = hash_password("user_password")

if verify_password("user_password", stored):
    print("access granted")
else:
    print("access denied")

Just Tokens

from gatevault import TokenManager
from gatevault import TokenExpiredError, InvalidTokenError, TokenDecodeError

tm = TokenManager(
    secret_key="your-very-secure-secret-key-32-bytes",
    access_expiry_minutes=30,
    refresh_expiry_days=14
)

# Create tokens after your own auth check
access = tm.create_access_token(user_id=1, role="admin", org_id=7)
refresh = tm.create_refresh_token(user_id=1)

# Decode and verify
try:
    payload = tm.decode_token(access)
    print(payload["user_id"])  # 1
    print(payload["role"])     # "admin"
    print(payload["type"])     # "access"
except TokenExpiredError:
    print("expired")
except InvalidTokenError:
    print("tampered")
except TokenDecodeError:
    print("malformed")

Just Guards

from gatevault import TokenManager, GateVault
from gatevault import GuardError, UnauthorizedError

tm = TokenManager(secret_key="...", access_expiry_minutes=15, refresh_expiry_days=7)
gate = GateVault(token_manager=tm)

# Works on both sync and async functions
@gate.protected
def get_dashboard(payload=None):
    return {"user_id": payload["user_id"]}

@gate.protected
async def get_dashboard_async(payload=None):
    return {"user_id": payload["user_id"]}

try:
    result = get_dashboard(token=incoming_token)
    result = await get_dashboard_async(token=incoming_token)
except GuardError:
    return {"error": "unauthorized"}, 401
except UnauthorizedError:
    return {"error": "forbidden"}, 403

Token Refresh & Rotation

gatevault creates tokens on demand but does not manage storage or invalidation that lives in your application. Without a refresh token store, a stolen refresh token is valid until it naturally expires, potentially days later.

Why you need a refresh token store

With a store you can:

  • Revoke tokens immediately on logout
  • Detect token reuse (a sign of theft)
  • Force re-login on password change or suspicious activity

A minimal SQL table for this:

CREATE TABLE refresh_tokens (
    token TEXT PRIMARY KEY,
    user_id INTEGER NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    expires_at TIMESTAMP NOT NULL
);

Full rotation pattern

from gatevault import TokenExpiredError, InvalidTokenError, TokenDecodeError

def rotate_tokens(refresh_token: str):
    # 1. Verify the refresh token
    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        return {"error": "session expired, please log in again"}, 401
    except (InvalidTokenError, TokenDecodeError):
        return {"error": "invalid token"}, 401

    # 2. Confirm it is a refresh token
    if payload["type"] != "refresh":
        return {"error": "wrong token type"}, 400

    # 3. Check for reuse if already rotated, someone may have stolen it
    if not db.is_refresh_token_valid(refresh_token):
        db.revoke_all_tokens_for_user(payload["user_id"])
        return {"error": "token reuse detected, please log in again"}, 401

    # 4. Invalidate the old token
    db.revoke_refresh_token(refresh_token)

    # 5. Issue new pair
    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])

    # 6. Store the new refresh token
    db.store_refresh_token(new_refresh, user_id=payload["user_id"])

    return {"access_token": new_access, "refresh_token": new_refresh}

Security Guide

Secret key

  • Use at least 32 bytes gatevault warns you if you don't
  • Generate with: python -c "import secrets; print(secrets.token_hex(32))"
  • Store in an environment variable never hardcode in source
  • Rotating the key invalidates all existing tokens immediately plan accordingly

Token storage on the client

Token Where to store Why
Access token Memory (JS variable) Short-lived, wiped on tab close, never persisted
Refresh token httpOnly cookie Invisible to JavaScript safe from XSS

Never store tokens in localStorage any JavaScript running on the page, including injected scripts, can read it.

What not to put in the payload

The JWT payload is base64 encoded, not encrypted. Anyone with the token string can decode and read it:

# Fine identifiers and non-sensitive metadata
tm.create_access_token(user_id=42, role="admin", org_id=7)

# Never do this readable by anyone
tm.create_access_token(user_id=42, email="john@example.com", password_hash="$2b$...")

Login enumeration

Return the same error for "user not found" and "wrong password":

# Good attacker learns nothing
except (InvalidCredentialsError, UnauthorizedError):
    return {"error": "invalid credentials"}, 401

# Bad confirms the username exists
except InvalidCredentialsError:
    return {"error": "user not found"}, 404

API Reference

TokenManager(secret_key, access_expiry_minutes, refresh_expiry_days)

Parameter Type Description
secret_key str Secret for signing tokens. Minimum 32 bytes recommended.
access_expiry_minutes int Access token lifetime in minutes.
refresh_expiry_days int Refresh token lifetime in days.
Method Returns Description
create_access_token(user_id, **kwargs) str Creates a signed access token. Extra kwargs embedded in payload.
create_refresh_token(user_id, **kwargs) str Creates a signed refresh token. Extra kwargs embedded in payload.
decode_token(token) dict Verifies signature and expiry. Returns payload dict. Raises on failure.

OAuthHandler(token_manager, get_user)

Parameter Type Description
token_manager TokenManager Configured TokenManager instance.
get_user Callable[[str], Any | None] User lookup function. Sync or async. Must return object with id and hashed_password, or None.
Method Returns Description
login(username, password) dict Authenticates synchronously. Use when get_user is a regular function. Returns {"access_token", "refresh_token", "token_type"}.
async_login(username, password) Coroutine[dict] Authenticates asynchronously. Use when get_user is async def. Must be called with await.

GateVault(token_manager)

Parameter Type Description
token_manager TokenManager Configured TokenManager instance.
Method Returns Description
protected(f) Callable Decorator. Verifies token before executing f. Works on both sync and async functions. Injects decoded payload as payload kwarg on success.

hash_password(plain) -> str

Parameter Type Description
plain str Plain text password.

Returns bcrypt hash string. Raises HashingError on unexpected failure.


verify_password(plain, hashed) -> bool

Parameter Type Description
plain str Plain text password to check.
hashed str Stored bcrypt hash.

Returns True if match, False otherwise. Never raises on wrong password.


Design Decisions

Framework-agnostic

Tying gatevault to FastAPI or Flask would limit who can use it. Auth logic hashing, signing, verifying has nothing to do with HTTP. Pure Python means it works anywhere Python runs.

Both sync and async throughout

async_login and async-aware gate.protected were added to support async ORMs like async SQLAlchemy without requiring a separate library or workaround. The sync and async paths are intentionally separate in OAuthHandler calling the wrong one for your get_user type fails loudly rather than silently misbehaving. gate.protected unifies both under one decorator by detecting the function type at decoration time.

Class-based TokenManager

The secret key and expiry settings are configuration they belong on an instance, not passed into every function call. Configure once at startup, share everywhere without threading arguments through every call.

Shared TokenManager across OAuthHandler and GateVault

OAuthHandler creates tokens. GateVault verifies them. The shared TokenManager instance is the trust anchor same secret key in, same secret key out.

Payload injected as a keyword argument

payload=payload is explicit. The decorated function always knows where its auth data comes from. Positional injection would silently break functions whose arguments don't match the expected order.

Wrapping third-party exceptions

PyJWT and bcrypt exceptions never surface through the gatevault API. Consumers only need to know gatevault exceptions. If an underlying library changes its exception names in a future version, only gatevault updates.

verify_password returns bool, not raises

A wrong password is an expected outcome, not an exceptional one. The caller decides whether to raise, log, increment a failed attempt counter, or do something else entirely.


Known Limitations

  • Refresh token invalidation is not built in you need a database or cache to track and revoke issued refresh tokens
  • Only HS256 (symmetric signing) is supported RS256 (asymmetric keypair) is not yet available
  • No built-in rate limiting on login attempts implement at the application or infrastructure level

Future Improvements

  • RS256 support for asymmetric key signing
  • Built-in token blocklist interface for revocation
  • Role-based access control helpers on GateVault
  • FastAPI and Flask integration packages as optional extras

Contributing

Contributions are welcome. Open an issue first to discuss what you want to change, especially for anything touching the security-sensitive parts.

git clone https://github.com/RichardOyelowo/gatevault
cd gatevault
pip install -e ".[dev]"
pytest tests/ -v

License

Apache 2.0 see LICENSE for details.


Built by Richard for the love of development.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

richard_gatevault-1.0.5.post1.tar.gz (115.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

richard_gatevault-1.0.5.post1-py3-none-any.whl (20.5 kB view details)

Uploaded Python 3

File details

Details for the file richard_gatevault-1.0.5.post1.tar.gz.

File metadata

  • Download URL: richard_gatevault-1.0.5.post1.tar.gz
  • Upload date:
  • Size: 115.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for richard_gatevault-1.0.5.post1.tar.gz
Algorithm Hash digest
SHA256 62be7a8a032c3c4961f7854bf5b0fef76412778c13cfa9c648bbae6655ed4eb2
MD5 650f8c4d919f3f7574857818fbcb4c3f
BLAKE2b-256 a0f2479409e9ba8ba6f4dd63fed544cce981385ea729876fff25811474be808d

See more details on using hashes here.

Provenance

The following attestation bundles were made for richard_gatevault-1.0.5.post1.tar.gz:

Publisher: publish.yml on RichardOyelowo/gatevault

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file richard_gatevault-1.0.5.post1-py3-none-any.whl.

File metadata

File hashes

Hashes for richard_gatevault-1.0.5.post1-py3-none-any.whl
Algorithm Hash digest
SHA256 cbcdb60e805389ed0d2a099a0a4103f579d32b3716f685d53a72089ea2e9b84c
MD5 6f5133202cfe3b99e599aeee0db90a7d
BLAKE2b-256 9596671fd2a6328bd66d1786a651ea76db1daca44656973d8921a4a427d0d914

See more details on using hashes here.

Provenance

The following attestation bundles were made for richard_gatevault-1.0.5.post1-py3-none-any.whl:

Publisher: publish.yml on RichardOyelowo/gatevault

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page