A Python auth library that handles JWT token management, password hashing, OAuth2 login flow, and route protection with full sync and async support so you don't have to wire it together yourself.
Most auth libraries do one thing. PyJWT gives you JWT encoding. bcrypt gives you password hashing. You still have to write the login flow, build the guards, handle the exceptions, and repeat that boilerplate across every project. gatevault wraps all of it into one coherent package with a clean API you can drop into any Python project regardless of framework, whether you're using FastAPI with async SQLAlchemy, Flask with a sync ORM, or Django with its built-in ORM.
pip install richard-gatevault
Table of Contents
- Installation
- Quick Start
- The Full Picture
- Password Hashing
- Token Management
- Login Flow
- Protecting Routes
- Exception Handling
- Warnings
- Framework Integration
- Using gatevault in Parts
- Token Refresh & Rotation
- Security Guide
- API Reference
- Design Decisions
- Known Limitations
- Future Improvements
- Contributing
- License
Installation
pip install richard-gatevault
Requires Python 3.9+. The dependencies, PyJWT and bcrypt, are installed automatically; no extra setup is needed.
Everything in gatevault is importable from the top level:
from gatevault import (
    TokenManager,
    OAuthHandler,
    GateVault,
    hash_password,
    verify_password,
)
Quick Start
The fastest path from zero to a working auth setup. This assumes a sync database; if you're using an async ORM, swap login for async_login and call it from an async def function.
import os
from gatevault import TokenManager, OAuthHandler, GateVault, hash_password
from gatevault import InvalidCredentialsError, UnauthorizedError, GuardError
# `db`, `email`, `password`, and `plain_password` are placeholders for your own data layer and inputs
# 1. Initialize once at startup
tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
gate = GateVault(token_manager=tm)
oauth = OAuthHandler(token_manager=tm, get_user=lambda u: db.get_user(u))
# 2. Hash passwords at registration
hashed = hash_password(plain_password)
db.save_user(email=email, hashed_password=hashed)
# 3. Login returns tokens
tokens = oauth.login(email, password)
# → {"access_token": "eyJ...", "refresh_token": "eyJ...", "token_type": "bearer"}
access_token = tokens["access_token"]
# 4. Protect any function with a decorator
@gate.protected
def get_profile(payload=None):
    return db.get_user(payload["user_id"])
# Works on async functions too
@gate.protected
async def get_profile_async(payload=None):
    return await db.get_user(payload["user_id"])
# 5. Call the protected function with the token
result = get_profile(token=access_token)
result = await get_profile_async(token=access_token)  # only valid inside an async function
Choosing the right method
| Situation | Use |
|---|---|
| Sync database (Django ORM, SQLAlchemy sync) | oauth.login(...) |
| Async database (async SQLAlchemy, asyncpg) | await oauth.async_login(...) |
| Protecting a regular function | @gate.protected + def |
| Protecting an async route/function | @gate.protected + async def |
The Full Picture
Here is what a complete auth setup looks like end to end: registration, login, token storage, protected routes, and token refresh.
This example uses async_login and async def protected routes for an async SQLAlchemy setup. If you are using a synchronous database, replace async_login with login, async def protected functions with regular def, and remove the async/await keywords from the login route.
import os
from sqlalchemy import select
from gatevault import (
    TokenManager, OAuthHandler, GateVault,
    hash_password,
    InvalidCredentialsError, UnauthorizedError,
    TokenExpiredError, InvalidTokenError, TokenDecodeError
)
# `db` and `User` stand in for your own async session/data layer and model
# ---------------------------------------------------------------------------
# Setup: do this once at app startup
# ---------------------------------------------------------------------------
tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
gate = GateVault(token_manager=tm)
# ---------------------------------------------------------------------------
# get_user: async version for use with async_login
# ---------------------------------------------------------------------------
async def get_user(username: str):
    result = await db.execute(select(User).where(User.email == username))
    return result.scalar_one_or_none()
oauth = OAuthHandler(token_manager=tm, get_user=get_user)
# ---------------------------------------------------------------------------
# Registration: hash and store the password, never store plain text
# ---------------------------------------------------------------------------
async def register(username: str, plain_password: str):
    hashed = hash_password(plain_password)
    await db.create_user(username=username, hashed_password=hashed)
    return {"message": "registered"}
# ---------------------------------------------------------------------------
# Login: uses async_login since get_user is async
# Access token goes in the response body.
# Refresh token goes in an httpOnly cookie, never in the body.
# ---------------------------------------------------------------------------
async def login(username: str, password: str, response):
    try:
        tokens = await oauth.async_login(username, password)
    except (InvalidCredentialsError, UnauthorizedError):
        return {"error": "invalid credentials"}, 401
    response.set_cookie(
        key="refresh_token",
        value=tokens["refresh_token"],
        httponly=True,
        secure=True,
        samesite="strict"
    )
    return {"access_token": tokens["access_token"]}
# ---------------------------------------------------------------------------
# Protected functions: gate.protected works on both sync and async
# ---------------------------------------------------------------------------
@gate.protected
async def get_profile(payload=None):
    user_id = payload["user_id"]
    return await db.get_user(user_id)
@gate.protected
def get_orders(payload=None):
    user_id = payload["user_id"]
    return db.get_orders(user_id)
# ---------------------------------------------------------------------------
# Token refresh: client sends the refresh token via httpOnly cookie,
# server rotates it and issues a new access token
# ---------------------------------------------------------------------------
async def refresh_route(refresh_token: str, response):
    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        return {"error": "session expired, please log in again"}, 401
    except (InvalidTokenError, TokenDecodeError):
        # decode_token raises token errors, not guard errors
        return {"error": "invalid token"}, 401
    if payload["type"] != "refresh":
        return {"error": "wrong token type"}, 400
    # Rotate: revoke the old refresh token and store its replacement
    await db.revoke_refresh_token(refresh_token)
    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])
    await db.store_refresh_token(new_refresh, user_id=payload["user_id"])
    # Deliver the rotated refresh token the same way as at login
    response.set_cookie(
        key="refresh_token", value=new_refresh,
        httponly=True, secure=True, samesite="strict"
    )
    return {"access_token": new_access}
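The db.revoke_refresh_token and db.store_refresh_token calls in the refresh route are placeholders for storage you provide yourself. As a rough illustration of what such storage might do, here is a minimal in-memory sketch. The class name RefreshTokenStore and its methods are hypothetical and not part of gatevault; a real application would back this with a database table.

```python
import hashlib


class RefreshTokenStore:
    """Hypothetical in-memory store of active refresh tokens."""

    def __init__(self):
        # Maps the SHA-256 digest of a token to the owning user id,
        # so raw tokens are never kept at rest.
        self._active = {}

    @staticmethod
    def _digest(token: str) -> str:
        return hashlib.sha256(token.encode("utf-8")).hexdigest()

    def store(self, token: str, user_id) -> None:
        self._active[self._digest(token)] = user_id

    def is_active(self, token: str) -> bool:
        return self._digest(token) in self._active

    def revoke(self, token: str) -> bool:
        """Remove the token; return True only if it was active."""
        digest = self._digest(token)
        if digest in self._active:
            del self._active[digest]
            return True
        return False

    def rotate(self, old_token: str, new_token: str, user_id) -> bool:
        """Swap old for new only if the old token is still active."""
        if not self.revoke(old_token):
            return False  # unknown or already used: possible replay
        self.store(new_token, user_id)
        return True


store = RefreshTokenStore()
store.store("refresh-1", user_id=42)
print(store.rotate("refresh-1", "refresh-2", user_id=42))  # True
print(store.rotate("refresh-1", "refresh-3", user_id=42))  # False
print(store.is_active("refresh-2"))                        # True
```

Rejecting a rotation whose old token is no longer active is what makes a replayed refresh token detectable; how to react to that (for example, revoking the whole session) is left to the application.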
Password Hashing
gatevault uses bcrypt for password hashing. Passwords are one-way hashed: there is no way to reverse a hash back to the original password. If your database is ever compromised, attackers get hashes, not passwords.
Hashing a password
from gatevault import hash_password
hashed = hash_password("user_plain_password")
print(hashed)
# $2b$12$Kq8J3mNrandom...
Always hash at the point of registration and store the result. Never store or log the plain password.
def register(username: str, plain_password: str):
    hashed = hash_password(plain_password)
    db.insert(username=username, hashed_password=hashed)
    return {"message": "account created"}
Verifying a password
from gatevault import verify_password
is_match = verify_password("user_plain_password", stored_hash)
# True or False
verify_password returns a boolean. It never raises on a wrong password; it just returns False. What you do with that result is your decision.
from gatevault import verify_password, InvalidCredentialsError, UnauthorizedError
def authenticate(username: str, plain_password: str):
    user = db.get_user(username)
    if not user:
        raise InvalidCredentialsError("user not found")
    if not verify_password(plain_password, user.hashed_password):
        raise UnauthorizedError("wrong password")
    return user
About bcrypt salting
You do not need to manage salts yourself. bcrypt generates a unique random salt for every hash and embeds it in the output string. Two calls to hash_password with the same password produce different hashes, and both are valid.
h1 = hash_password("same_password")
h2 = hash_password("same_password")
print(h1 == h2) # False: different salts
print(verify_password("same_password", h1)) # True
print(verify_password("same_password", h2)) # True
print(verify_password("wrong", h1)) # False
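To make the salting behaviour concrete without depending on bcrypt, the sketch below reproduces the same idea with PBKDF2 from the standard library: a fresh random salt per hash, stored inside the output string. This is a different algorithm from the one gatevault uses, and the names toy_hash, toy_verify, and ITERATIONS are hypothetical; it is only meant to show why equal passwords yield unequal hashes that both verify.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 50_000  # illustrative work factor, not a recommendation


def toy_hash(password: str) -> str:
    salt = secrets.token_bytes(16)  # new random salt on every call
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    # The salt travels with the digest, so nothing else needs storing.
    return f"{salt.hex()}${digest.hex()}"


def toy_verify(password: str, stored: str) -> bool:
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    # Constant-time comparison avoids leaking how many bytes matched.
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


h1 = toy_hash("same_password")
h2 = toy_hash("same_password")
print(h1 == h2)                         # False
print(toy_verify("same_password", h1))  # True
print(toy_verify("same_password", h2))  # True
print(toy_verify("wrong", h1))          # False
```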
Standalone usage
hash_password and verify_password have no dependency on the rest of gatevault. You can use them without setting up TokenManager or anything else:
from gatevault import hash_password, verify_password
stored = hash_password("mypassword")
if verify_password("mypassword", stored):
    print("access granted")
else:
    print("access denied")
Token Management
TokenManager handles all JWT creation and verification. It is the core of gatevault: OAuthHandler uses it to create tokens and GateVault uses it to verify them. Create one instance at startup and share it across your app.
Setup
import os
from gatevault import TokenManager
tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
The secret_key is what signs your tokens. Anyone with this key can forge valid tokens, so keep it in an environment variable, never in source code.
To generate a secure key:
python -c "import secrets; print(secrets.token_hex(32))"
Creating tokens
access_token = tm.create_access_token(user_id=42)
refresh_token = tm.create_refresh_token(user_id=42)
Access tokens are short-lived (minutes) and sent with every authenticated request. Refresh tokens are long-lived (days) and used only to obtain a new access token when the current one expires.
Extra claims
You can embed additional data in the token payload using keyword arguments. This is useful for role-based access control, because you can check the role from the token itself without a database lookup on every request:
access_token = tm.create_access_token(user_id=42, role="admin", org_id=7)
payload = tm.decode_token(access_token)
print(payload)
# {
#     "user_id": 42,
#     "exp": 1234567890,
#     "type": "access",
#     "role": "admin",
#     "org_id": 7
# }
@gate.protected
def admin_dashboard(payload=None):
    if payload.get("role") != "admin":
        raise UnauthorizedError("admin access required")
    return get_admin_data()
Decoding tokens
payload = tm.decode_token(token)
user_id = payload["user_id"]
token_type = payload["type"] # "access" or "refresh"
expiry = payload["exp"] # unix timestamp
decode_token verifies the signature and checks expiry in one call. It raises specific exceptions on failure:
from gatevault import TokenExpiredError, InvalidTokenError, TokenDecodeError
try:
    payload = tm.decode_token(token)
except TokenExpiredError:
    # token has passed its exp time: send client to refresh endpoint
    return {"error": "token expired"}, 401
except InvalidTokenError:
    # signature mismatch: token was tampered with
    return {"error": "invalid token"}, 401
except TokenDecodeError:
    # token string is malformed and can't be parsed at all
    return {"error": "malformed token"}, 400
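For readers who want to see what "verifies the signature and checks expiry" involves, here is a standard-library sketch of HS256 signing and verification. It is not gatevault's implementation (gatevault delegates to PyJWT); the names toy_encode, toy_decode, ToyMalformed, ToyInvalid, and ToyExpired are hypothetical stand-ins that mirror the three failure modes above, and the sketch assumes every payload carries an exp claim.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


class ToyMalformed(Exception):
    """Token cannot be parsed (compare TokenDecodeError)."""


class ToyInvalid(Exception):
    """Signature does not match (compare InvalidTokenError)."""


class ToyExpired(Exception):
    """exp is in the past (compare TokenExpiredError)."""


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, key: bytes) -> bytes:
    return hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()


def toy_encode(payload: dict, key: bytes) -> str:
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url_encode(json.dumps(payload).encode())
    return f"{header}.{body}.{_b64url_encode(_sign(f'{header}.{body}', key))}"


def toy_decode(token: str, key: bytes, now: Optional[float] = None) -> dict:
    parts = token.split(".")
    if len(parts) != 3:
        raise ToyMalformed("expected header.payload.signature")
    header, body, signature = parts
    try:
        given = _b64url_decode(signature)
        expected = _sign(f"{header}.{body}", key)
    except ValueError as exc:
        raise ToyMalformed("not valid base64url") from exc
    # Recompute the signature and compare in constant time.
    if not hmac.compare_digest(expected, given):
        raise ToyInvalid("signature mismatch")
    payload = json.loads(_b64url_decode(body))
    if payload["exp"] <= (time.time() if now is None else now):
        raise ToyExpired("token has passed its exp time")
    return payload


key = b"k" * 32
token = toy_encode({"user_id": 42, "type": "access", "exp": 2_000}, key)
print(toy_decode(token, key, now=1_000)["user_id"])  # 42
```

The signature is checked before the payload is parsed, so nothing inside an unverified token influences the result.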
Access vs refresh: telling them apart
Every token carries a type claim. Always check it on your refresh endpoint, where only refresh tokens should be accepted:
payload = tm.decode_token(token)
if payload["type"] != "refresh":
    return {"error": "wrong token type"}, 400
TokenManager is shared
OAuthHandler creates tokens. GateVault verifies them. They don't communicate directly; the shared TokenManager instance is the trust anchor. Same secret key in, same secret key out.
tm = TokenManager(secret_key=os.environ["AUTH_SECRET_KEY"], access_expiry_minutes=15, refresh_expiry_days=7)
oauth = OAuthHandler(token_manager=tm, get_user=get_user) # uses tm to create tokens
gate = GateVault(token_manager=tm) # uses tm to verify tokens
Login Flow
OAuthHandler wires together user lookup, password verification, and token creation into one call. It follows the OAuth2 Resource Owner Password Credentials flow.
It supports both synchronous and asynchronous user lookup: use login for sync databases (SQLAlchemy sync, Django ORM, raw psycopg2) and async_login for async databases (async SQLAlchemy, asyncpg, Tortoise ORM).
Setup
from gatevault import OAuthHandler
def get_user(username: str):
    # return a user object with `id` and `hashed_password` attributes
    # return None if the user doesn't exist
    return db.query(User).filter(User.email == username).first()
oauth = OAuthHandler(token_manager=tm, get_user=get_user)
Your get_user function must return an object with two attributes:
- id: the user's identifier, embedded in the token payload
- hashed_password: the bcrypt hash stored at registration
If your model uses different field names, add a property:
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String)
    password_hash = Column(String)
    @property
    def hashed_password(self):
        return self.password_hash # gatevault expects hashed_password
Sync Login
Use login when your get_user function is a regular synchronous function:
tokens = oauth.login("john@example.com", "their_password")
print(tokens)
# {
#     "access_token": "eyJhbGci...",
#     "refresh_token": "eyJhbGci...",
#     "token_type": "bearer"
# }
login does three things in order:
- Calls get_user(username) and raises InvalidCredentialsError if None is returned
- Calls verify_password(password, user.hashed_password) and raises UnauthorizedError if it returns False
- Calls create_access_token and create_refresh_token and raises GuardError if token creation fails
Async Login
Use async_login when your get_user function is defined as async def, typically when using async SQLAlchemy, asyncpg, or any async ORM. It performs the same three steps as login, but awaits the get_user call.
async def get_user(username: str):
    result = await db.execute(select(User).where(User.email == username))
    return result.scalar_one_or_none()
oauth = OAuthHandler(token_manager=tm, get_user=get_user)
# must be awaited inside an async context
tokens = await oauth.async_login("john@example.com", "their_password")
print(tokens)
# {
#     "access_token": "eyJhbGci...",
#     "refresh_token": "eyJhbGci...",
#     "token_type": "bearer"
# }
Important: Always match the method to your get_user type. Calling async_login with a sync get_user raises TypeError because you cannot await a plain value. Calling login with an async get_user returns a coroutine object instead of a user, so authentication will silently fail. When in doubt, check whether your get_user is async def.
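One way to avoid a mismatch is to check the callable at startup. The sketch below uses inspect.iscoroutinefunction from the standard library; the helper pick_login_method and the two sample get_user functions are hypothetical and only illustrate the check, they are not part of gatevault.

```python
import asyncio
import inspect


def pick_login_method(get_user) -> str:
    """Return the OAuthHandler method name that matches get_user's type."""
    return "async_login" if inspect.iscoroutinefunction(get_user) else "login"


def sync_get_user(username):
    return {"id": 1, "username": username}


async def async_get_user(username):
    return {"id": 1, "username": username}


print(pick_login_method(sync_get_user))   # login
print(pick_login_method(async_get_user))  # async_login

# Calling an async get_user without awaiting yields a coroutine, not a user.
pending = async_get_user("john@example.com")
print(inspect.iscoroutine(pending))  # True
print(asyncio.run(pending)["id"])    # 1
```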
What to do with the tokens
The access token goes back to the client in the response body. The refresh token should be set as an httpOnly cookie, which is invisible to JavaScript, so injected scripts (XSS) cannot read it:
async def login_route(username, password, response):
    try:
        tokens = await oauth.async_login(username, password)
    except (InvalidCredentialsError, UnauthorizedError):
        return {"error": "invalid credentials"}, 401
    response.set_cookie(
        key="refresh_token",
        value=tokens["refresh_token"],
        httponly=True,
        secure=True, # HTTPS only
        samesite="strict"
    )
    return {"access_token": tokens["access_token"]}
The client stores the access token in memory (a JavaScript variable, not localStorage) and attaches it to every protected request via the Authorization header:
Authorization: Bearer <access_token>
The refresh token lives in the browser's httpOnly cookie store. The browser sends it automatically on requests to the refresh endpoint; the client never touches it directly.
Client-side example:
// After login: store access token in memory only
const { access_token } = await fetch("/login", {
  method: "POST",
  body: JSON.stringify({ username, password })
}).then(r => r.json())
// Every protected request: send access token in Authorization header
const profile = await fetch("/profile", {
  headers: { "Authorization": `Bearer ${access_token}` }
}).then(r => r.json())
// When access token expires: browser sends refresh cookie automatically
const refreshed = await fetch("/refresh", {
  method: "POST",
  credentials: "include" // tells browser to send the httpOnly cookie
}).then(r => r.json())
const new_access_token = refreshed.access_token
Handling login errors
from gatevault import InvalidCredentialsError, UnauthorizedError, GuardError
try:
    tokens = await oauth.async_login(username, password)
except InvalidCredentialsError:
    return {"error": "invalid credentials"}, 401
except UnauthorizedError:
    return {"error": "invalid credentials"}, 401
except GuardError:
    return {"error": "authentication failed"}, 500
Return the same error message for both InvalidCredentialsError and UnauthorizedError. Distinguishing between them tells an attacker which usernames exist in your system.
Protecting Routes
GateVault wraps any function, sync or async, with token verification. The wrapped function never executes if the token is missing, expired, or invalid. On success, the decoded payload is injected as the payload keyword argument.
gate.protected automatically detects whether the decorated function is a coroutine (async def) and applies the appropriate wrapper. You use the same decorator for both.
Setup
from gatevault import GateVault
gate = GateVault(token_manager=tm)
Sync Protection
@gate.protected
def get_profile(payload=None):
    user_id = payload["user_id"]
    return db.get_user(user_id)
# call with token= keyword argument
result = get_profile(token="eyJhbGci...")
Async Protection
@gate.protected
async def get_profile(payload=None):
    user_id = payload["user_id"]
    return await db.get_user(user_id)
# await the call: gatevault detected async def and returns a coroutine
result = await get_profile(token="eyJhbGci...")
The token extraction and verification logic is identical in both cases. The only difference is that the async wrapper awaits the decorated function after injecting the payload.
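The dispatch described above can be sketched with inspect.iscoroutinefunction. This is an illustration of the general pattern, not gatevault's source: make_protected and fake_verify are hypothetical names, and fake_verify stands in for real JWT verification.

```python
import asyncio
import functools
import inspect


def make_protected(verify):
    """Build a decorator; `verify` maps a token to a payload dict or raises."""

    def protected(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, token=None, **kwargs):
                payload = verify(token)  # runs before the wrapped function
                return await func(*args, payload=payload, **kwargs)
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, token=None, **kwargs):
            payload = verify(token)  # same check, no await needed
            return func(*args, payload=payload, **kwargs)
        return sync_wrapper

    return protected


def fake_verify(token):
    # Stand-in for signature and expiry checks.
    if token != "good-token":
        raise PermissionError("invalid or missing token")
    return {"user_id": 42}


protected = make_protected(fake_verify)


@protected
def whoami(payload=None):
    return payload["user_id"]


@protected
async def whoami_async(payload=None):
    return payload["user_id"]


print(whoami(token="good-token"))                     # 42
print(asyncio.run(whoami_async(token="good-token")))  # 42
```

Because verification raises before the wrapped function is called, a rejected token never reaches the function body in either branch.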
How the token reaches your function
In a real app, the client sends the token in the Authorization header. Your framework gives you access to that header. You extract the token string and pass it to the protected function:
# FastAPI example
@app.get("/profile")
async def profile_route(authorization: str = Header(...)):
    token = authorization.replace("Bearer ", "")
    try:
        return await get_profile(token=token)
    except (GuardError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="unauthorized")
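Stripping the prefix with replace is fine for a quick example, but it passes other schemes (such as "Basic ...") through unchanged. A slightly stricter parser might look like the sketch below; extract_bearer_token is a hypothetical helper, not a gatevault function.

```python
from typing import Optional


def extract_bearer_token(header_value: Optional[str]) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if the header is unusable."""
    if not header_value:
        return None
    parts = header_value.split()
    # Exactly two parts, and the scheme name is matched case-insensitively.
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None
    return parts[1]


print(extract_bearer_token("Bearer eyJhbGci"))  # eyJhbGci
print(extract_bearer_token("bearer eyJhbGci"))  # eyJhbGci
print(extract_bearer_token("Basic dXNlcg=="))   # None
print(extract_bearer_token(""))                 # None
```

A None result can be passed straight on as token=None, leaving the rejection to the guard, or turned into a 401 before the protected function is called.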
Multiple protected routes
@gate.protected
async def get_profile(payload=None):
    return await db.get_user(payload["user_id"])
@gate.protected
async def get_orders(payload=None):
    return await db.get_orders(payload["user_id"])
@gate.protected
async def update_settings(settings: dict, payload=None):
    return await db.update_settings(payload["user_id"], settings)
Passing arguments alongside the token
Your function can accept any arguments alongside payload:
@gate.protected
async def get_post(post_id: int, payload=None):
    user_id = payload["user_id"]
    post = await db.get_post(post_id)
    if post.owner_id != user_id:
        raise UnauthorizedError("not your post")
    return post
result = await get_post(post_id=7, token="eyJhbGci...")
Role-based access using claims
@gate.protected
def admin_only(payload=None):
    if payload.get("role") != "admin":
        raise UnauthorizedError("admin access required")
    return get_admin_data()
@gate.protected
async def moderator_or_above(payload=None):
    if payload.get("role") not in ("admin", "moderator"):
        raise UnauthorizedError("insufficient permissions")
    return await get_mod_tools()
Handling guard errors
from gatevault import GuardError, UnauthorizedError
try:
    result = await get_profile(token=incoming_token)
except UnauthorizedError as e:
    # invalid signature or permission check failed inside the function
    # (caught first because UnauthorizedError is a subclass of GuardError)
    return {"error": str(e)}, 403
except GuardError as e:
    # token missing, expired, or malformed
    return {"error": str(e)}, 401
Exception Handling
All gatevault exceptions inherit from GatevaultError. Catch broadly or specifically depending on what you need.
GatevaultError
├── TokenError
│ ├── TokenExpiredError
│ ├── InvalidTokenError
│ └── TokenDecodeError
├── HashingError
└── GuardError
├── UnauthorizedError
└── InvalidCredentialsError
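The tree has a practical consequence for handler order: an except clause for a parent class also catches its children, so the more specific handler has to come first. The sketch below rebuilds the hierarchy as local stand-in classes (same names as above, but defined here rather than imported from gatevault) and uses a hypothetical status_for helper to show the ordering.

```python
class GatevaultError(Exception):
    pass


class TokenError(GatevaultError):
    pass


class TokenExpiredError(TokenError):
    pass


class InvalidTokenError(TokenError):
    pass


class TokenDecodeError(TokenError):
    pass


class HashingError(GatevaultError):
    pass


class GuardError(GatevaultError):
    pass


class UnauthorizedError(GuardError):
    pass


class InvalidCredentialsError(GuardError):
    pass


def status_for(exc: Exception) -> int:
    """Map an exception to an HTTP status, most specific handler first."""
    try:
        raise exc
    except UnauthorizedError:
        return 403  # must precede GuardError, its parent
    except (GuardError, TokenError):
        return 401
    except GatevaultError:
        return 500


print(status_for(UnauthorizedError()))        # 403
print(status_for(InvalidCredentialsError()))  # 401
print(status_for(TokenExpiredError()))        # 401
print(status_for(HashingError()))             # 500
```

If the GuardError clause were listed before UnauthorizedError, the 403 branch would never run.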
Importing exceptions
from gatevault import (
GatevaultError,
TokenError,
TokenExpiredError,
InvalidTokenError,
TokenDecodeError,
HashingError,
GuardError,
UnauthorizedError,
InvalidCredentialsError,
)
Catching broadly: one handler for everything
try:
    tokens = await oauth.async_login(username, password)
except GatevaultError as e:
    return {"error": str(e)}, 401
Catching specifically: different response per failure
try:
    payload = tm.decode_token(token)
except TokenExpiredError:
    # access token expired: tell client to use refresh token
    return {"error": "token expired", "action": "refresh"}, 401
except InvalidTokenError:
    # signature mismatch: possible tampering, do not trust
    return {"error": "invalid token"}, 401
except TokenDecodeError:
    # token string is completely malformed
    return {"error": "malformed token"}, 400
Catching by category
try:
    payload = tm.decode_token(token)
except TokenError:
    # catches all three: TokenExpiredError, InvalidTokenError, TokenDecodeError
    return {"error": "token error"}, 401
try:
    tokens = await oauth.async_login(username, password)
except GuardError:
    # catches InvalidCredentialsError and UnauthorizedError
    return {"error": "authentication failed"}, 401
Real-world error handling pattern
from gatevault import (
    InvalidCredentialsError, UnauthorizedError,
    TokenExpiredError, TokenDecodeError, InvalidTokenError, GuardError
)
async def handle_login(username: str, password: str):
    try:
        tokens = await oauth.async_login(username, password)
        return {"access_token": tokens["access_token"]}, 200
    except (InvalidCredentialsError, UnauthorizedError):
        return {"error": "invalid credentials"}, 401
    except GuardError:
        return {"error": "authentication failed"}, 500
async def handle_protected_request(token: str):
    try:
        return await get_profile(token=token)
    except TokenExpiredError:
        return {"error": "token expired", "action": "refresh"}, 401
    except UnauthorizedError:
        # listed before GuardError, its parent class, so this branch is reachable
        return {"error": "forbidden"}, 403
    except (InvalidTokenError, TokenDecodeError, GuardError):
        return {"error": "unauthorized"}, 401
Warnings
ShortKeyWarning
Issued at TokenManager creation if the secret key is shorter than 32 bytes. RFC 7518 requires HS256 keys to be at least 256 bits (32 bytes). This is a warning, not an error: your app will still run, but your tokens will be less secure.
import warnings
from gatevault import TokenManager, ShortKeyWarning
# Suppress in tests where key length doesn't matter
warnings.filterwarnings("ignore", category=ShortKeyWarning)
tm = TokenManager(secret_key="short", access_expiry_minutes=15, refresh_expiry_days=7)
Treating as an error in CI
# Catch misconfigurations early: fail the build if key is too short
warnings.filterwarnings("error", category=ShortKeyWarning)
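To see how the two filters behave, the sketch below defines a local stand-in for the warning and a hypothetical check_key function that warns on short keys. It assumes the warning derives from UserWarning, which the text above does not state; only the warnings-module mechanics are the point here.

```python
import warnings


class ShortKeyWarning(UserWarning):
    """Local stand-in; the base class is an assumption."""


MIN_KEY_BYTES = 32


def check_key(secret_key: str) -> None:
    # Warn, rather than raise, when the key is shorter than 32 bytes.
    if len(secret_key.encode("utf-8")) < MIN_KEY_BYTES:
        warnings.warn(
            f"secret key is shorter than {MIN_KEY_BYTES} bytes",
            ShortKeyWarning,
            stacklevel=2,
        )


# Default behaviour: the warning is emitted and execution continues.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    check_key("short")
print(len(caught), caught[0].category.__name__)  # 1 ShortKeyWarning

# With the "error" filter, the same call raises instead.
with warnings.catch_warnings():
    warnings.simplefilter("error", category=ShortKeyWarning)
    try:
        check_key("short")
    except ShortKeyWarning as exc:
        print("build fails:", exc)
```

Using catch_warnings as a context manager keeps the filter change local, which is useful in test suites where a global filterwarnings call would leak into other tests.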
Framework Integration
gatevault is framework-agnostic. The same library works across FastAPI, Flask, and Django. You wire it into each framework's request/response cycle in the same way: initialize once at startup, define protected functions with @gate.protected, and pass the token from the Authorization header into the protected function at request time.
| Framework | Recommended login method | Async support |
|---|---|---|
| FastAPI + async SQLAlchemy | async_login | Full: use async def protected functions |
| FastAPI + sync DB | login | N/A |
| Flask | login | Not applicable (Flask is sync) |
| Django (sync ORM) | login | N/A |
| Django 4.1+ async views | async_login | Full: use async def protected functions |
| Django REST Framework | login | N/A |
FastAPI (Async)
The recommended FastAPI setup uses async_login, async def protected functions, and async SQLAlchemy:
import os
from fastapi import FastAPI, HTTPException, Response, Cookie, Depends, Header
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from gatevault import TokenManager, OAuthHandler, GateVault, hash_password
from gatevault import (
    InvalidCredentialsError, UnauthorizedError,
    GuardError, TokenExpiredError, InvalidTokenError, TokenDecodeError
)
# `User`, `UserCreate`, `get_db`, and the module-level `db` are placeholders
# for your own model, request schema, session dependency, and data layer
app = FastAPI()
tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
gate = GateVault(token_manager=tm)
# Protected functions: async, defined once, reused across routes
@gate.protected
async def get_current_profile(payload=None):
    return await db.get_user(payload["user_id"])
@gate.protected
async def get_user_orders(payload=None):
    return await db.get_orders(payload["user_id"])
# Registration
@app.post("/register")
async def register(body: UserCreate, db: AsyncSession = Depends(get_db)):
    hashed = hash_password(body.password)
    new_user = User(email=body.email, hashed_password=hashed)
    db.add(new_user)
    await db.commit()
    return {"message": "registered"}
# Login: async DB lookup requires async_login
@app.post("/login")
async def login(
    response: Response,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    async def _get_user(username: str):
        result = await db.execute(select(User).where(User.email == username))
        return result.scalar_one_or_none()
    oauth = OAuthHandler(token_manager=tm, get_user=_get_user)
    try:
        tokens = await oauth.async_login(form_data.username, form_data.password)
    except (InvalidCredentialsError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="invalid credentials")
    response.set_cookie(
        key="refresh_token",
        value=tokens["refresh_token"],
        httponly=True,
        secure=True,
        samesite="strict"
    )
    return {"access_token": tokens["access_token"], "token_type": "bearer"}
# Protected routes: extract token from header, pass to protected function
@app.get("/profile")
async def profile(authorization: str = Header(...)):
    token = authorization.replace("Bearer ", "")
    try:
        return await get_current_profile(token=token)
    except (GuardError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="unauthorized")
# Token refresh
@app.post("/refresh")
async def refresh(response: Response, refresh_token: str = Cookie(...)):
    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        raise HTTPException(status_code=401, detail="session expired")
    except (InvalidTokenError, TokenDecodeError):
        raise HTTPException(status_code=401, detail="invalid token")
    if payload["type"] != "refresh":
        raise HTTPException(status_code=400, detail="wrong token type")
    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])
    response.set_cookie(
        key="refresh_token",
        value=new_refresh,
        httponly=True,
        secure=True,
        samesite="strict"
    )
    return {"access_token": new_access}
# Logout: clear the refresh token cookie
@app.post("/logout")
def logout(response: Response, refresh_token: str = Cookie(None)):
    response.delete_cookie("refresh_token")
    return {"message": "logged out"}
FastAPI (Sync)
If you are using a synchronous database driver, use login and regular def protected functions:
def get_user_from_db(username: str):
    return db.query(User).filter(User.email == username).first()
oauth = OAuthHandler(token_manager=tm, get_user=get_user_from_db)
@gate.protected
def get_current_profile(payload=None):
    return db.get_user(payload["user_id"])
@app.post("/login")
def login(response: Response, form_data: OAuth2PasswordRequestForm = Depends()):
    try:
        tokens = oauth.login(form_data.username, form_data.password)
    except (InvalidCredentialsError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="invalid credentials")
    response.set_cookie(
        key="refresh_token",
        value=tokens["refresh_token"],
        httponly=True,
        secure=True,
        samesite="strict"
    )
    return {"access_token": tokens["access_token"], "token_type": "bearer"}
@app.get("/profile")
def profile(authorization: str = Header(...)):
    token = authorization.replace("Bearer ", "")
    try:
        return get_current_profile(token=token)
    except (GuardError, UnauthorizedError):
        raise HTTPException(status_code=401, detail="unauthorized")
Flask
import os
from flask import Flask, request, jsonify, make_response
from gatevault import TokenManager, OAuthHandler, GateVault, hash_password
from gatevault import (
    InvalidCredentialsError, UnauthorizedError,
    GuardError, TokenExpiredError, InvalidTokenError, TokenDecodeError
)
# `db` and `get_user_from_db` are placeholders for your own data layer
app = Flask(__name__)
tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
gate = GateVault(token_manager=tm)
oauth = OAuthHandler(token_manager=tm, get_user=get_user_from_db)
def get_token():
    return request.headers.get("Authorization", "").replace("Bearer ", "")
@gate.protected
def get_current_profile(payload=None):
    return db.get_user(payload["user_id"])
@app.post("/register")
def register():
    data = request.json
    hashed = hash_password(data["password"])
    db.create_user(username=data["username"], hashed_password=hashed)
    return jsonify({"message": "registered"})
@app.post("/login")
def login():
    data = request.json
    try:
        tokens = oauth.login(data["username"], data["password"])
    except (InvalidCredentialsError, UnauthorizedError):
        return jsonify({"error": "invalid credentials"}), 401
    response = make_response(jsonify({"access_token": tokens["access_token"]}))
    response.set_cookie(
        "refresh_token",
        tokens["refresh_token"],
        httponly=True,
        secure=True,
        samesite="Strict"
    )
    return response
@app.get("/profile")
def profile():
    try:
        return jsonify(get_current_profile(token=get_token()))
    except (GuardError, UnauthorizedError):
        return jsonify({"error": "unauthorized"}), 401
@app.post("/refresh")
def refresh():
    refresh_token = request.cookies.get("refresh_token")
    if not refresh_token:
        return jsonify({"error": "no refresh token"}), 401
    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        return jsonify({"error": "session expired"}), 401
    except (InvalidTokenError, TokenDecodeError):
        return jsonify({"error": "invalid token"}), 401
    if payload["type"] != "refresh":
        return jsonify({"error": "wrong token type"}), 400
    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])
    response = make_response(jsonify({"access_token": new_access}))
    response.set_cookie(
        "refresh_token", new_refresh,
        httponly=True, secure=True, samesite="Strict"
    )
    return response
@app.post("/logout")
def logout():
    response = make_response(jsonify({"message": "logged out"}))
    response.delete_cookie("refresh_token")
    return response
Django
gatevault works with Django's sync ORM out of the box. Use login with a sync get_user. For async views (Django 4.1+), use async_login with an async get_user.
Setup: auth/gatevault_setup.py
Create a dedicated setup file and import from it across your views. This avoids reinitializing gatevault objects on every request.
# auth/gatevault_setup.py
import os
from gatevault import TokenManager, OAuthHandler, GateVault
tm = TokenManager(
    secret_key=os.environ["AUTH_SECRET_KEY"],
    access_expiry_minutes=15,
    refresh_expiry_days=7
)
gate = GateVault(token_manager=tm)
def get_user_from_db(username: str):
    # imported lazily so this module can load before Django's app registry is ready
    from myapp.models import User
    try:
        return User.objects.get(email=username)
    except User.DoesNotExist:
        return None
oauth = OAuthHandler(token_manager=tm, get_user=get_user_from_db)
Views auth/views.py
import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST, require_GET
from gatevault import hash_password
from gatevault import (
InvalidCredentialsError, UnauthorizedError,
GuardError, TokenExpiredError, InvalidTokenError, TokenDecodeError
)
from .gatevault_setup import tm, oauth, gate
from .models import User
# Protected functions defined once, called from any view
@gate.protected
def get_current_profile(payload=None):
return User.objects.get(id=payload["user_id"])
# Registration
@csrf_exempt
@require_POST
def register(request):
    data = json.loads(request.body)
    hashed = hash_password(data["password"])
    User.objects.create(email=data["email"], hashed_password=hashed)
    return JsonResponse({"message": "registered"}, status=201)


# Login
@csrf_exempt
@require_POST
def login(request):
    data = json.loads(request.body)
    try:
        tokens = oauth.login(data["username"], data["password"])
    except (InvalidCredentialsError, UnauthorizedError):
        return JsonResponse({"error": "invalid credentials"}, status=401)
    response = JsonResponse({"access_token": tokens["access_token"], "token_type": "bearer"})
    response.set_cookie(
        "refresh_token",
        tokens["refresh_token"],
        httponly=True,
        secure=True,  # set to False in local development
        samesite="Strict"
    )
    return response
# Protected route: extract the token from the header, pass it to the protected function
@require_GET
def profile(request):
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    try:
        user = get_current_profile(token=token)
        return JsonResponse({"id": user.id, "email": user.email})
    except GuardError:
        return JsonResponse({"error": "unauthorized"}, status=401)
    except UnauthorizedError:
        return JsonResponse({"error": "forbidden"}, status=403)
# Token refresh: reads the refresh token from the httpOnly cookie
@csrf_exempt
@require_POST
def refresh(request):
    refresh_token = request.COOKIES.get("refresh_token")
    if not refresh_token:
        return JsonResponse({"error": "no refresh token"}, status=401)
    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        return JsonResponse({"error": "session expired"}, status=401)
    except (InvalidTokenError, TokenDecodeError):
        return JsonResponse({"error": "invalid token"}, status=401)
    if payload["type"] != "refresh":
        return JsonResponse({"error": "wrong token type"}, status=400)
    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])
    response = JsonResponse({"access_token": new_access})
    response.set_cookie(
        "refresh_token", new_refresh,
        httponly=True, secure=True, samesite="Strict"
    )
    return response


# Logout: clear the refresh token cookie
@csrf_exempt
@require_POST
def logout(request):
    response = JsonResponse({"message": "logged out"})
    response.delete_cookie("refresh_token")
    return response
URL configuration: auth/urls.py
from django.urls import path

from . import views

urlpatterns = [
    path("register/", views.register),
    path("login/", views.login),
    path("refresh/", views.refresh),
    path("logout/", views.logout),
    path("profile/", views.profile),
]
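The views in this section pull the token out of the Authorization header with str.replace, which also accepts headers that use a different scheme. A stricter parse can be sketched as follows. extract_bearer_token is a hypothetical helper, not part of gatevault, and it returns an empty string for a missing or malformed header, which is the same value the views already pass along when the header is absent.

```python
from typing import Optional


def extract_bearer_token(header_value: Optional[str]) -> str:
    """Return the token from an 'Authorization: Bearer <token>' header value.

    Returns an empty string when the header is missing or malformed, so the
    caller still has a value to hand to the protected function.
    """
    if not header_value:
        return ""
    parts = header_value.strip().split()
    # Expect exactly two parts: the scheme and the token.
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return ""
    return parts[1]
```

In a view this would replace the inline expression, for example token = extract_bearer_token(request.headers.get("Authorization")).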
Django REST Framework
If you are using DRF, the same gate.protected functions work inside APIView or @api_view. Extract the token from the Authorization header and pass it in:
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework import status

from gatevault import GuardError, UnauthorizedError

from .gatevault_setup import gate


@gate.protected
def get_current_profile(payload=None):
    from myapp.models import User
    return User.objects.get(id=payload["user_id"])


@api_view(["GET"])
def profile(request):
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    try:
        user = get_current_profile(token=token)
        return Response({"id": user.id, "email": user.email})
    except GuardError:
        return Response({"error": "unauthorized"}, status=status.HTTP_401_UNAUTHORIZED)
    except UnauthorizedError:
        return Response({"error": "forbidden"}, status=status.HTTP_403_FORBIDDEN)
You can also use APIView for class-based views:
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status

from gatevault import GuardError, UnauthorizedError

# get_current_profile is the gate.protected function from the @api_view example


class ProfileView(APIView):
    def get(self, request):
        token = request.headers.get("Authorization", "").replace("Bearer ", "")
        try:
            user = get_current_profile(token=token)
            return Response({"id": user.id, "email": user.email})
        except (GuardError, UnauthorizedError):
            return Response({"error": "unauthorized"}, status=status.HTTP_401_UNAUTHORIZED)
Async Django views (Django 4.1+)
For async Django views, define an async get_user and use async_login:
# auth/gatevault_setup.py (async variant)
# tm and OAuthHandler are already defined and imported earlier in this file
from django.contrib.auth import get_user_model


async def get_user_async(username: str):
    User = get_user_model()
    try:
        return await User.objects.aget(email=username)
    except User.DoesNotExist:
        return None


oauth_async = OAuthHandler(token_manager=tm, get_user=get_user_async)
# auth/views.py (async login view)
import json

from django.http import JsonResponse

from .gatevault_setup import oauth_async
from gatevault import InvalidCredentialsError, UnauthorizedError


async def login_async(request):
    if request.method != "POST":
        return JsonResponse({"error": "method not allowed"}, status=405)
    data = json.loads(request.body)
    try:
        tokens = await oauth_async.async_login(data["username"], data["password"])
    except (InvalidCredentialsError, UnauthorizedError):
        return JsonResponse({"error": "invalid credentials"}, status=401)
    response = JsonResponse({"access_token": tokens["access_token"], "token_type": "bearer"})
    response.set_cookie(
        "refresh_token", tokens["refresh_token"],
        httponly=True, secure=True, samesite="Strict"
    )
    return response
Using gatevault in Parts
Just Hashing
from gatevault import hash_password, verify_password

stored = hash_password("user_password")
if verify_password("user_password", stored):
    print("access granted")
else:
    print("access denied")
Just Tokens
from gatevault import TokenManager
from gatevault import TokenExpiredError, InvalidTokenError, TokenDecodeError

tm = TokenManager(
    secret_key="your-very-secure-secret-key-32-bytes",
    access_expiry_minutes=30,
    refresh_expiry_days=14
)

# Create tokens after your own auth check
access = tm.create_access_token(user_id=1, role="admin", org_id=7)
refresh = tm.create_refresh_token(user_id=1)

# Decode and verify
try:
    payload = tm.decode_token(access)
    print(payload["user_id"])  # 1
    print(payload["role"])     # "admin"
    print(payload["type"])     # "access"
except TokenExpiredError:
    print("expired")
except InvalidTokenError:
    print("tampered")
except TokenDecodeError:
    print("malformed")
Just Guards
from gatevault import TokenManager, GateVault
from gatevault import GuardError, UnauthorizedError

tm = TokenManager(secret_key="...", access_expiry_minutes=15, refresh_expiry_days=7)
gate = GateVault(token_manager=tm)


# Works on both sync and async functions
@gate.protected
def get_dashboard(payload=None):
    return {"user_id": payload["user_id"]}


@gate.protected
async def get_dashboard_async(payload=None):
    return {"user_id": payload["user_id"]}


# handle_request is an illustrative caller: await and return need an enclosing async function
async def handle_request(incoming_token: str):
    try:
        result = get_dashboard(token=incoming_token)
        result = await get_dashboard_async(token=incoming_token)
    except GuardError:
        return {"error": "unauthorized"}, 401
    except UnauthorizedError:
        return {"error": "forbidden"}, 403
    return result
Token Refresh & Rotation
gatevault creates tokens on demand but does not manage storage or invalidation; that lives in your application. Without a refresh token store, a stolen refresh token is valid until it naturally expires, potentially days later.
Why you need a refresh token store
With a store you can:
- Revoke tokens immediately on logout
- Detect token reuse (a sign of theft)
- Force re-login on password change or suspicious activity
A minimal SQL table for this:
CREATE TABLE refresh_tokens (
    token TEXT PRIMARY KEY,
    user_id INTEGER NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    expires_at TIMESTAMP NOT NULL
);
Full rotation pattern
from gatevault import TokenExpiredError, InvalidTokenError, TokenDecodeError

# tm is your configured TokenManager; db stands for your own refresh token store


def rotate_tokens(refresh_token: str):
    # 1. Verify the refresh token
    try:
        payload = tm.decode_token(refresh_token)
    except TokenExpiredError:
        return {"error": "session expired, please log in again"}, 401
    except (InvalidTokenError, TokenDecodeError):
        return {"error": "invalid token"}, 401

    # 2. Confirm it is a refresh token
    if payload["type"] != "refresh":
        return {"error": "wrong token type"}, 400

    # 3. Check for reuse: if already rotated, someone may have stolen it
    if not db.is_refresh_token_valid(refresh_token):
        db.revoke_all_tokens_for_user(payload["user_id"])
        return {"error": "token reuse detected, please log in again"}, 401

    # 4. Invalidate the old token
    db.revoke_refresh_token(refresh_token)

    # 5. Issue new pair
    new_access = tm.create_access_token(user_id=payload["user_id"])
    new_refresh = tm.create_refresh_token(user_id=payload["user_id"])

    # 6. Store the new refresh token
    db.store_refresh_token(new_refresh, user_id=payload["user_id"])
    return {"access_token": new_access, "refresh_token": new_refresh}
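The db object in the rotation pattern is left to your application. One possible shape for it, sketched with the standard library's sqlite3 module, is shown here. RefreshTokenStore and its method names are assumptions chosen to match the db.* calls in the pattern; they are not part of gatevault. The sketch also departs from the minimal table in one way: it stores a SHA-256 digest of each token rather than the token itself, so a leaked table does not hand out usable tokens.

```python
import hashlib
import sqlite3
import time


class RefreshTokenStore:
    """Hypothetical store backing the db.* calls in the rotation pattern."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS refresh_tokens ("
            " token_hash TEXT PRIMARY KEY,"
            " user_id INTEGER NOT NULL,"
            " expires_at REAL NOT NULL)"
        )

    @staticmethod
    def _digest(token: str) -> str:
        # Only the digest is persisted, never the raw token.
        return hashlib.sha256(token.encode("utf-8")).hexdigest()

    def store_refresh_token(self, token: str, user_id: int,
                            ttl_seconds: float = 7 * 86400) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO refresh_tokens VALUES (?, ?, ?)",
            (self._digest(token), user_id, time.time() + ttl_seconds),
        )
        self.conn.commit()

    def is_refresh_token_valid(self, token: str) -> bool:
        # Valid means: still present (not revoked) and not past its expiry.
        row = self.conn.execute(
            "SELECT 1 FROM refresh_tokens WHERE token_hash = ? AND expires_at > ?",
            (self._digest(token), time.time()),
        ).fetchone()
        return row is not None

    def revoke_refresh_token(self, token: str) -> None:
        self.conn.execute(
            "DELETE FROM refresh_tokens WHERE token_hash = ?", (self._digest(token),)
        )
        self.conn.commit()

    def revoke_all_tokens_for_user(self, user_id: int) -> None:
        self.conn.execute("DELETE FROM refresh_tokens WHERE user_id = ?", (user_id,))
        self.conn.commit()


# In-memory database for illustration; a real deployment would use a persistent one.
db = RefreshTokenStore(sqlite3.connect(":memory:"))
db.store_refresh_token("old-token", user_id=1)
```

Because revocation deletes the row, a token that was already rotated is simply absent, which is what step 3 of the pattern treats as possible reuse.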
Security Guide
Secret key
- Use at least 32 bytes; gatevault warns you if you don't
- Generate with: python -c "import secrets; print(secrets.token_hex(32))"
- Store in an environment variable; never hardcode in source
- Rotating the key invalidates all existing tokens immediately, so plan accordingly
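The secret key rules can be enforced at startup. This is a minimal sketch, assuming you prefer the application to refuse to start on a weak key rather than rely on a warning. load_secret_key and MIN_KEY_BYTES are illustrative names, not gatevault API; AUTH_SECRET_KEY is the variable name used in the setup examples.

```python
import os

MIN_KEY_BYTES = 32  # matches the recommended minimum above


def load_secret_key(env_var: str = "AUTH_SECRET_KEY") -> str:
    """Read the signing secret from the environment and check its length."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"{env_var} is not set")
    if len(key.encode("utf-8")) < MIN_KEY_BYTES:
        raise RuntimeError(f"{env_var} must be at least {MIN_KEY_BYTES} bytes")
    return key
```

The returned value can then be passed as secret_key when constructing TokenManager.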
Token storage on the client
| Token | Where to store | Why |
|---|---|---|
| Access token | Memory (JS variable) | Short-lived, wiped on tab close, never persisted |
| Refresh token | httpOnly cookie | Invisible to JavaScript, so safe from XSS |
Never store tokens in localStorage: any JavaScript running on the page, including injected scripts, can read it.
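For reference, the cookie attributes used throughout the framework examples (httponly, secure, samesite) end up in a single Set-Cookie header. The sketch below builds that header with the standard library only, to show what each flag contributes. build_refresh_cookie is a hypothetical helper, and the path and max-age values are assumptions rather than anything gatevault sets.

```python
from http.cookies import SimpleCookie


def build_refresh_cookie(token: str, max_age_days: int = 7) -> str:
    """Return a Set-Cookie header value for the refresh token."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True        # not readable from JavaScript
    morsel["secure"] = True          # only sent over HTTPS
    morsel["samesite"] = "Strict"    # not sent on cross-site requests
    morsel["path"] = "/"
    morsel["max-age"] = max_age_days * 86400
    return morsel.OutputString()
```

Framework helpers such as response.set_cookie produce an equivalent header, so this is mainly useful when no framework is involved.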
What not to put in the payload
The JWT payload is Base64URL-encoded, not encrypted. Anyone with the token string can decode and read it:
# Fine: identifiers and non-sensitive metadata
tm.create_access_token(user_id=42, role="admin", org_id=7)

# Never do this: readable by anyone
tm.create_access_token(user_id=42, email="john@example.com", password_hash="$2b$...")
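To make the point concrete, here is a small standard-library sketch that reads the payload of a token-shaped string without any secret key. The token is assembled by hand with a placeholder signature, since reading the payload never checks the signature. The helper names are illustrative.

```python
import base64
import json


def b64url_encode(data: bytes) -> str:
    """Base64URL-encode without padding, as JWT segments are."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def read_payload_without_key(token: str) -> dict:
    """Decode the middle segment of a JWT. No secret key is involved."""
    payload_segment = token.split(".")[1]
    # JWT strips base64 padding; restore it before decoding.
    padded = payload_segment + "=" * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# A hand-built token: header.payload.signature
header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
payload = b64url_encode(json.dumps({"user_id": 42, "role": "admin"}).encode())
token = f"{header}.{payload}.placeholder-signature"

claims = read_payload_without_key(token)
```

The signature only proves the payload was not altered; it does nothing to hide it.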
Login enumeration
Return the same error for "user not found" and "wrong password":
# Good: attacker learns nothing
except (InvalidCredentialsError, UnauthorizedError):
    return {"error": "invalid credentials"}, 401

# Bad: reveals whether the username exists
except InvalidCredentialsError:
    return {"error": "user not found"}, 404
API Reference
TokenManager(secret_key, access_expiry_minutes, refresh_expiry_days)
| Parameter | Type | Description |
|---|---|---|
| secret_key | str | Secret for signing tokens. Minimum 32 bytes recommended. |
| access_expiry_minutes | int | Access token lifetime in minutes. |
| refresh_expiry_days | int | Refresh token lifetime in days. |

| Method | Returns | Description |
|---|---|---|
| create_access_token(user_id, **kwargs) | str | Creates a signed access token. Extra kwargs embedded in payload. |
| create_refresh_token(user_id, **kwargs) | str | Creates a signed refresh token. Extra kwargs embedded in payload. |
| decode_token(token) | dict | Verifies signature and expiry. Returns payload dict. Raises on failure. |
OAuthHandler(token_manager, get_user)
| Parameter | Type | Description |
|---|---|---|
| token_manager | TokenManager | Configured TokenManager instance. |
| get_user | Callable[[str], Any \| None] | User lookup function. Sync or async. Must return object with id and hashed_password, or None. |

| Method | Returns | Description |
|---|---|---|
| login(username, password) | dict | Authenticates synchronously. Use when get_user is a regular function. Returns {"access_token", "refresh_token", "token_type"}. |
| async_login(username, password) | Coroutine[dict] | Authenticates asynchronously. Use when get_user is async def. Must be called with await. |
GateVault(token_manager)
| Parameter | Type | Description |
|---|---|---|
| token_manager | TokenManager | Configured TokenManager instance. |

| Method | Returns | Description |
|---|---|---|
| protected(f) | Callable | Decorator. Verifies token before executing f. Works on both sync and async functions. Injects decoded payload as payload kwarg on success. |
hash_password(plain) -> str
| Parameter | Type | Description |
|---|---|---|
| plain | str | Plain text password. |
Returns bcrypt hash string. Raises HashingError on unexpected failure.
verify_password(plain, hashed) -> bool
| Parameter | Type | Description |
|---|---|---|
| plain | str | Plain text password to check. |
| hashed | str | Stored bcrypt hash. |
Returns True if match, False otherwise. Never raises on wrong password.
Design Decisions
Framework-agnostic
Tying gatevault to FastAPI or Flask would limit who can use it. Auth logic (hashing, signing, verifying) has nothing to do with HTTP. Pure Python means it works anywhere Python runs.
Both sync and async throughout
async_login and async-aware gate.protected were added to support async ORMs like async SQLAlchemy without requiring a separate library or workaround. The sync and async paths are intentionally separate in OAuthHandler: calling the wrong one for your get_user type fails loudly rather than silently misbehaving. gate.protected unifies both under one decorator by detecting the function type at decoration time.
Class-based TokenManager
The secret key and expiry settings are configuration: they belong on an instance, not passed into every function call. Configure once at startup, share everywhere without threading arguments through every call.
Shared TokenManager across OAuthHandler and GateVault
OAuthHandler creates tokens. GateVault verifies them. The shared TokenManager instance is the trust anchor: same secret key in, same secret key out.
Payload injected as a keyword argument
payload=payload is explicit. The decorated function always knows where its auth data comes from. Positional injection would silently break functions whose arguments don't match the expected order.
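The two ideas above (detecting the function type at decoration time, and injecting the payload as a keyword argument) can be illustrated with a small standalone decorator. This is not gatevault's implementation, only a sketch of the general technique. make_protected, the GuardError stand-in, and the toy decoder are all assumptions introduced so the example runs without gatevault installed.

```python
import asyncio
import functools
import inspect


class GuardError(Exception):
    """Stand-in for gatevault's GuardError so the sketch runs on its own."""


def make_protected(decode):
    """Build a decorator that verifies `token` and injects `payload`."""

    def protected(func):
        def check(kwargs):
            token = kwargs.pop("token", None)
            if not token:
                raise GuardError("missing token")
            # Keyword injection: the wrapped function names its own auth data.
            kwargs["payload"] = decode(token)

        # The sync/async decision is made once, when the decorator is applied.
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                check(kwargs)
                return await func(*args, **kwargs)
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            check(kwargs)
            return func(*args, **kwargs)
        return sync_wrapper

    return protected


# Toy decoder for illustration; a real one verifies the signature and expiry.
protected = make_protected(lambda token: {"user_id": int(token)})


@protected
def get_dashboard(payload=None):
    return payload["user_id"]


@protected
async def get_dashboard_async(payload=None):
    return payload["user_id"]


sync_result = get_dashboard(token="7")
async_result = asyncio.run(get_dashboard_async(token="8"))
```

Because the async wrapper is itself a coroutine function, callers still await it exactly as they would the undecorated function.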
Wrapping third-party exceptions
PyJWT and bcrypt exceptions never surface through the gatevault API. Consumers only need to know gatevault exceptions. If an underlying library changes its exception names in a future version, only gatevault updates.
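The wrapping pattern itself is small. The sketch below applies it to standard-library errors rather than PyJWT or bcrypt, so it stays self-contained. The TokenDecodeError class here is a local stand-in with the same name as gatevault's exception, and decode_segment is a hypothetical function.

```python
import base64
import binascii
import json


class TokenDecodeError(Exception):
    """Local stand-in for a library-level exception; not imported from gatevault."""


def decode_segment(segment: str):
    """Decode one Base64URL JSON segment, hiding the underlying error types."""
    try:
        padded = segment + "=" * (-len(segment) % 4)
        return json.loads(base64.urlsafe_b64decode(padded))
    except (binascii.Error, ValueError) as exc:
        # `from exc` keeps the original error attached for debugging, while
        # callers only ever need to catch TokenDecodeError.
        raise TokenDecodeError("malformed token segment") from exc
```

Callers see one exception type regardless of whether the base64 step or the JSON step failed.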
verify_password returns bool, not raises
A wrong password is an expected outcome, not an exceptional one. The caller decides whether to raise, log, increment a failed attempt counter, or do something else entirely.
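A bool return makes that kind of caller-side policy easy to write. Here is a minimal sketch of a failed-attempt counter, assuming an in-memory dict and a lockout threshold of five. check_login, failed_attempts, and MAX_FAILED_ATTEMPTS are illustrative names, and the verifier is passed in as an argument so the example does not depend on gatevault being installed.

```python
from collections import defaultdict
from typing import Callable, Dict

MAX_FAILED_ATTEMPTS = 5  # assumed policy, not a gatevault setting
failed_attempts: Dict[str, int] = defaultdict(int)


def check_login(
    username: str,
    plain: str,
    stored_hash: str,
    verify: Callable[[str, str], bool],
) -> str:
    """Return "ok", "denied", or "locked" based on a bool-returning verifier."""
    if failed_attempts[username] >= MAX_FAILED_ATTEMPTS:
        return "locked"
    if verify(plain, stored_hash):
        failed_attempts[username] = 0  # success resets the counter
        return "ok"
    failed_attempts[username] += 1
    return "denied"
```

With gatevault, verify_password can be passed as the verify argument, since it takes the plain password and the stored hash and returns a bool.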
Known Limitations
- Refresh token invalidation is not built in; you need a database or cache to track and revoke issued refresh tokens
- Only HS256 (symmetric signing) is supported; RS256 (asymmetric keypair) is not yet available
- No built-in rate limiting on login attempts; implement it at the application or infrastructure level
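For the rate limiting gap, an application-level limiter can be as simple as a sliding window per client key. The following is a sketch only, assuming a single process and in-memory state. LoginRateLimiter is a hypothetical class, and a multi-process deployment would need a shared store such as a cache or database instead.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class LoginRateLimiter:
    """In-memory sliding-window limiter; suitable for a single process only."""

    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0):
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._attempts: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        """Record an attempt for `key` and report whether it is within the limit."""
        now = time.monotonic() if now is None else now
        attempts = self._attempts[key]
        # Drop attempts that have aged out of the window.
        while attempts and now - attempts[0] >= self.window_seconds:
            attempts.popleft()
        if len(attempts) >= self.max_attempts:
            return False
        attempts.append(now)
        return True
```

A login view would call allow() with the client IP or username before attempting authentication, and return 429 when it comes back False.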
Future Improvements
- RS256 support for asymmetric key signing
- Built-in token blocklist interface for revocation
- Role-based access control helpers on GateVault
- FastAPI and Flask integration packages as optional extras
Contributing
Contributions are welcome. Open an issue first to discuss what you want to change, especially for anything touching the security-sensitive parts.
git clone https://github.com/RichardOyelowo/gatevault
cd gatevault
pip install -e ".[dev]"
pytest tests/ -v
License
Apache 2.0. See LICENSE for details.
Built by Richard for the love of development.