core-redis
This project/library contains common elements related to Redis integration.
Installation
pip install core-redis
uv pip install core-redis # or using uv
Features
RedisClient: thin connection wrapper that decouples the ecosystem from the underlying redis library.
cache_redis_based: write-through caching decorator backed by Redis (L2) with an in-memory LRU as L1.
FixedWindow: fixed-window rate limiter backed by Redis.
SlidingWindowLog: sliding-window log rate limiter backed by Redis; eliminates the burst problem at the cost of per-request sorted-set writes.
TokenBucket: token-bucket rate limiter backed by Redis; supports bursts up to bucket capacity while enforcing a smooth long-term refill rate.
LeakyBucket: leaky-bucket rate limiter backed by Redis; enforces a strictly constant output rate by queuing requests and draining at a fixed leak rate.
RedisClient
RedisClient abstracts the redis library so the rest of the ecosystem never imports it directly. The connection is created lazily on first use and is thread-safe.
from core_redis import RedisClient
client = RedisClient(host="localhost", port=6379, db=0)
client.set("key", b"value", ex=60) # store with 60 s TTL
data = client.get("key") # b"value" or None
count = client.delete("key") # 1
n = client.exists("key", "other") # 0–N
alive = client.ping() # True
Additional keyword arguments are forwarded verbatim to redis.Redis (e.g. ssl=True, socket_timeout=5).
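The lazy, thread-safe connection behaviour can be sketched with double-checked locking. This is an illustrative pattern only, not the library's actual code; the `LazyConnection` class and `factory` callable are hypothetical stand-ins:

```python
import threading

class LazyConnection:
    """Sketch of lazy, thread-safe client creation (double-checked locking)."""

    def __init__(self, factory):
        self._factory = factory       # callable that builds the real client
        self._conn = None
        self._lock = threading.Lock()

    @property
    def conn(self):
        if self._conn is None:        # fast path: already created, no lock
            with self._lock:          # slow path: create exactly once
                if self._conn is None:
                    self._conn = self._factory()
        return self._conn
```

The lock is only taken before the first use, so steady-state reads stay cheap while concurrent first calls still create a single connection.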
cache_redis_based
Write-through caching decorator: L1 is a bounded in-memory LRU; L2 is Redis. TTL is handled natively by Redis (SET … EX), so no background threads or manual expiry are needed.
from core_redis.decorators import cache_redis_based
@cache_redis_based(
key_prefix="myapp/",
ttl=3600,
redis_kwargs={"host": "localhost", "port": 6379},
)
def fetch_reference_data(dataset: str) -> dict:
...
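The L1/L2 lookup order the decorator implies can be sketched in plain Python. This is illustrative only: a plain dict stands in for Redis (where `SET … EX` would handle TTL), and `cached_fetch`, `l1`, `l2`, and `L1_MAX` are hypothetical names, not the library's internals:

```python
from collections import OrderedDict

l1 = OrderedDict()   # bounded in-memory LRU (L1)
L1_MAX = 2
l2 = {}              # stands in for Redis (L2); TTL omitted in this sketch

def cached_fetch(key, compute):
    if key in l1:                        # 1) L1 hit
        l1.move_to_end(key)
        return l1[key]
    if key in l2:                        # 2) L2 hit: promote into L1
        value = l2[key]
    else:                                # 3) miss: compute, write through to L2
        value = compute(key)
        l2[key] = value
    l1[key] = value
    if len(l1) > L1_MAX:
        l1.popitem(last=False)           # evict least-recently-used entry
    return value
```

A hot key is served from memory, a warm key from Redis without recomputation, and only a cold key invokes the wrapped function.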
Rate Limiters
Rate-limiting algorithms that count requests in Redis and reject traffic once a threshold is reached. Each algorithm lives in core_redis.rate_limits.
FixedWindow
Divides time into fixed-size buckets and tracks a request counter per bucket. A request is allowed while the counter is within limit; once the bucket rolls over, the counter resets.
from core_redis.rate_limits import FixedWindow
limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
allowed = limiter.is_allowed("user_123", limit=100, window=60)
The counter is incremented and the TTL is set in a single Redis pipeline call (INCR + EXPIRE), keeping round-trips to one per request.
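The algorithm reduces to a counter keyed by the window index. The following in-process simulation is illustrative only (the `INCR` + `EXPIRE` pipeline collapses to a dict update here), not the library's Redis implementation:

```python
import time

counters = {}

def fixed_window_allowed(identifier, limit, window, now=None):
    now = time.time() if now is None else now
    bucket = (identifier, int(now // window))        # window index acts as the key
    counters[bucket] = counters.get(bucket, 0) + 1   # INCR; EXPIRE would bound memory
    return counters[bucket] <= limit
```

Requests in the same window share one counter; the first request of the next window starts a fresh one, which is also why back-to-back bursts at a window boundary can briefly exceed the nominal rate.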
A common pattern is to guard outbound HTTP calls so a client never exceeds an upstream API’s rate limit:
import requests
from core_redis.rate_limits import FixedWindow
limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
def call_api(user_id: str) -> None:
if not limiter.is_allowed(user_id, limit=100, window=60):
print(f"[{user_id}] BLOCKED -> rate limit exceeded")
return
response = requests.get("https://api.example.com/data", timeout=5)
print(f"[{user_id}] {response.status_code}")
SlidingWindowLog
Stores a timestamp for every request in a Redis sorted set. On each call, entries older than now − window are pruned before counting, so the window always reflects exactly the last window seconds and the fixed-window burst problem cannot occur.
Returns an (allowed, remaining) tuple so callers know how many slots are left without a second round-trip.
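The prune-then-count flow can be sketched in plain Python, with a list of timestamps standing in for the Redis sorted set. This is illustrative only, not the library's implementation; `log` and `sliding_window_allowed` are hypothetical names:

```python
log = {}

def sliding_window_allowed(identifier, limit, window, now):
    # prune entries older than now - window (ZREMRANGEBYSCORE in Redis)
    entries = [t for t in log.get(identifier, []) if t > now - window]
    allowed = len(entries) < limit
    if allowed:
        entries.append(now)              # record this request (ZADD in Redis)
    log[identifier] = entries
    return allowed, max(limit - len(entries), 0)
```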
from core_redis.rate_limits import SlidingWindowLog
limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})
allowed, remaining = limiter.is_allowed("user_123", limit=100, window=60)
if not allowed:
print("Rate limit exceeded")
else:
print(f"{remaining} requests remaining in this window")
The same HTTP-guard pattern works here:
import requests
from core_redis.rate_limits import SlidingWindowLog
limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})
def call_api(user_id: str) -> None:
allowed, remaining = limiter.is_allowed(user_id, limit=100, window=60)
if not allowed:
print(f"[{user_id}] BLOCKED -> rate limit exceeded")
return
response = requests.get("https://api.example.com/data", timeout=5)
print(f"[{user_id}] {response.status_code} ({remaining} remaining)")
TokenBucket
Maintains a virtual token bucket per identifier in a Redis hash. Tokens refill continuously at refill_rate per second up to capacity. Each request consumes tokens_per_request tokens. A request is allowed when the bucket has enough tokens; otherwise it is rejected.
Returns an (allowed, available_tokens) tuple.
from core_redis.rate_limits import TokenBucket
limiter = TokenBucket(redis_kwargs={"host": "localhost", "port": 6379})
allowed, tokens = limiter.is_allowed(
"user_123",
capacity=100, # max burst size
refill_rate=10.0, # tokens added per second
)
if not allowed:
print(f"Rate limited -> {tokens} tokens available")
else:
print(f"Allowed -> {tokens} tokens remaining")
Variable-cost operations are supported via tokens_per_request:
# A bulk export costs 10 tokens; a lightweight read costs 1
allowed, tokens = limiter.is_allowed(
"user_123", capacity=100, refill_rate=10.0, tokens_per_request=10
)
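The continuous-refill arithmetic the description implies can be sketched as follows, with a dict standing in for the per-identifier Redis hash. Illustrative only; `buckets` and `token_bucket_allowed` are hypothetical names, not the library's internals:

```python
buckets = {}

def token_bucket_allowed(identifier, capacity, refill_rate, now, tokens_per_request=1):
    tokens, last = buckets.get(identifier, (float(capacity), now))
    # refill continuously since the last call, capped at capacity
    tokens = min(capacity, tokens + (now - last) * refill_rate)
    allowed = tokens >= tokens_per_request
    if allowed:
        tokens -= tokens_per_request
    buckets[identifier] = (tokens, now)
    return allowed, tokens
```

Because refill is computed from elapsed time on each call, no background process is needed: a full bucket allows a burst of `capacity` requests, after which throughput settles to `refill_rate` per second.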
LeakyBucket
Maintains a virtual queue per identifier in a Redis hash. Incoming requests fill the queue; the queue drains at a fixed leak_rate requests per second regardless of arrival rate. A request is accepted when the queue has room; otherwise it is rejected immediately. Unlike TokenBucket, the output rate is strictly constant: bursts are absorbed into the queue and processed at the leak rate, never faster.
Returns an (allowed, available) tuple where available is the number of free queue slots after this request (0 when blocked).
from core_redis.rate_limits import LeakyBucket
limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})
allowed, available = limiter.is_allowed(
"user_123",
capacity=100, # max queue depth
leak_rate=10.0, # requests drained per second
)
if not allowed:
print("Queue full - retry later")
else:
print(f"Queued - {available} slots remaining")
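The drain arithmetic described above can be sketched in-process, with a dict standing in for the per-identifier Redis hash. Illustrative only; `queues` and `leaky_bucket_allowed` are hypothetical names, not the library's internals:

```python
queues = {}

def leaky_bucket_allowed(identifier, capacity, leak_rate, now):
    level, last = queues.get(identifier, (0.0, now))
    level = max(0.0, level - (now - last) * leak_rate)   # drain at fixed rate
    allowed = level + 1 <= capacity                      # room for this request?
    if allowed:
        level += 1
    queues[identifier] = (level, now)
    return allowed, int(capacity - level) if allowed else 0
```

Like the token bucket, drain is computed lazily from elapsed time, so no background worker is needed; the difference is that the queue level bounds admissions rather than a token balance.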
HTTP-guard pattern:
import requests
from core_redis.rate_limits import LeakyBucket
limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})
def call_api(user_id: str) -> None:
allowed, available = limiter.is_allowed(user_id, capacity=100, leak_rate=10.0)
if not allowed:
print(f"[{user_id}] BLOCKED - queue full")
return
response = requests.get("https://api.example.com/data", timeout=5)
print(f"[{user_id}] {response.status_code} ({available} slots remaining)")
Local Redis with Docker
Start a Redis server on the default port:
docker run -d --name redis-local -p 6379:6379 redis:latest
Stop and remove it when done:
docker stop redis-local && docker rm redis-local
Setting Up for Development
pip install --upgrade pip
pip install virtualenv
virtualenv --python=python3.12 .venv
source .venv/bin/activate
pip install -e ".[dev]"
Running Tests
python manager.py run-tests # unit tests
python manager.py run-tests --test-type integration
python manager.py run-coverage # unit + coverage
Functional tests require a running Redis server (see Local Redis with Docker):
# defaults: REDIS_HOST=localhost REDIS_PORT=6379 REDIS_DB=15
python manager.py run-tests --test-type functional --pattern "*.py"
Contributing
Contributions are welcome! Please:
Fork the repository
Create a feature branch
Write tests for new functionality
Ensure all tests pass: python manager.py run-tests
Run linting: pylint core_redis
Run security checks: bandit -r core_redis
Submit a pull request
License
This project is licensed under the MIT License. See the LICENSE file for details.
Support
For questions or support, please open an issue on GitLab or contact the maintainers.
File details
Details for the file core_redis-1.2.0.tar.gz.
File metadata
- Download URL: core_redis-1.2.0.tar.gz
- Upload date:
- Size: 18.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b0bf8e852a76ee561d9392acd56d42d8a5f33b483c8943668892f5cf71eee7da |
| MD5 | 23a74f20447c077e98b40b9dfa274121 |
| BLAKE2b-256 | 44ff3155ccff25978eb7a5058e5373d7abe95bb4915b935dd975dda0f5ec2c23 |
File details
Details for the file core_redis-1.2.0-py3-none-any.whl.
File metadata
- Download URL: core_redis-1.2.0-py3-none-any.whl
- Upload date:
- Size: 20.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a833ecb2976ca2c54f3a09daf46b28fc09491cd07500222f2470cdef94cd1a0c |
| MD5 | e919a4376b044857bd0d1904521a319b |
| BLAKE2b-256 | 5097741807065dca118bfc7ff54dced2e24c5fcf4562e31eabde043b52b63c22 |