Redisify is a lightweight Python library that provides Redis-backed data structures like dicts, queues, locks, and semaphores, designed for distributed systems.
Project description
Redisify
Redisify is a lightweight Python library that provides Redis-backed data structures and distributed synchronization primitives. It is designed for distributed systems where persistent, shared, and async-compatible data structures are needed.
🚀 Features
📦 Data Structures
- RedisDict: A dictionary-like interface backed by Redis hash with full CRUD operations
- RedisList: A list-like structure supporting indexing, insertion, deletion, and iteration
- RedisQueue: A FIFO queue with blocking and async operations
- RedisSet: A set-like structure with union, intersection, difference operations
🔐 Distributed Synchronization
-
RedisLock: Distributed locking mechanism with automatic cleanup
-
RedisSemaphore: Semaphore for controlling concurrent access
-
RedisLimiter: Rate limiting with token bucket algorithm
⚡ Advanced Features
- Async/Await Support: All operations are async-compatible
- Smart Serialization: Automatic serialization of complex objects using dill
- Context Manager Support: Use with
async withstatements - Comprehensive Testing: Full test coverage for all components
- Type Safety: Full type hints and documentation
- Thread-Safe: All operations are thread and process safe
📦 Installation
pip install redisify
Or for development and testing:
git clone https://github.com/Hambaobao/redisify.git
cd redisify
pip install -e .[test]
⚡ Quick Start
import asyncio
from redisify import RedisDict, RedisList, RedisQueue, RedisSet, RedisLock, RedisSemaphore, RedisLimiter, connect_to_redis
async def main():
# Connect to Redis
connect_to_redis(host="localhost", port=6379, db=0)
# Dictionary operations
rdict = RedisDict("example:dict")
await rdict["user:1"] = {"name": "Alice", "age": 30}
user = await rdict["user:1"]
print(user) # {'name': 'Alice', 'age': 30}
# List operations
rlist = RedisList("example:list")
await rlist.append("item1")
await rlist.append("item2")
first_item = await rlist[0]
print(first_item) # item1
# Queue operations
rqueue = RedisQueue("example:queue")
await rqueue.put("task1")
await rqueue.put("task2")
task = await rqueue.get()
print(task) # task1
# Set operations
rset = RedisSet("example:set")
await rset.add("item1")
await rset.add("item2")
items = await rset.to_set()
print(items) # {'item1', 'item2'}
asyncio.run(main())
📚 Detailed Usage
RedisDict
A distributed dictionary that supports any serializable Python objects as keys and values.
from redisify import RedisDict
rdict = RedisDict("users")
# Basic operations
await rdict["user1"] = {"name": "Alice", "age": 30}
await rdict["user2"] = {"name": "Bob", "age": 25}
# Get values
user1 = await rdict["user1"]
print(user1) # {'name': 'Alice', 'age': 30}
# Check existence
if "user1" in rdict:
print("User exists")
# Delete items
del await rdict["user2"]
# Iterate over items
async for key, value in rdict.items():
print(f"{key}: {value}")
# Get with default
user = await rdict.get("user3", {"name": "Default", "age": 0})
RedisList
A distributed list with full indexing and slicing support.
from redisify import RedisList
rlist = RedisList("tasks")
# Add items
await rlist.append("task1")
await rlist.append("task2")
await rlist.insert(0, "priority_task")
# Access by index
first_task = await rlist[0]
print(first_task) # priority_task
# Slicing support
tasks = await rlist[1:3] # Get items at index 1 and 2
# Get length
length = await len(rlist)
print(length) # 3
# Iterate
async for item in rlist:
print(item)
# Remove items
await rlist.remove("task1", count=1)
RedisQueue
A distributed FIFO queue with blocking and non-blocking operations.
from redisify import RedisQueue
rqueue = RedisQueue(redis, "job_queue", maxsize=100)
# Producer
await rqueue.put("job1")
await rqueue.put("job2")
# Consumer (blocking)
job = await rqueue.get() # Blocks until item available
print(job) # job1
# Non-blocking get
try:
job = await rqueue.get_nowait()
except asyncio.QueueEmpty:
print("Queue is empty")
# Peek at next item without removing
next_job = await rqueue.peek()
# Check queue status
size = await rqueue.qsize()
is_empty = await rqueue.empty()
RedisSet
A distributed set with full set operations support.
from redisify import RedisSet
set1 = RedisSet(redis, "set1")
set2 = RedisSet(redis, "set2")
# Add items
await set1.add("item1")
await set1.add("item2")
await set2.add("item2")
await set2.add("item3")
# Set operations
union = await set1.union(set2)
intersection = await set1.intersection(set2)
difference = await set1.difference(set2)
print(union) # {'item1', 'item2', 'item3'}
print(intersection) # {'item2'}
print(difference) # {'item1'}
# Membership testing
if "item1" in set1:
print("Item exists")
# Convert to Python set
python_set = await set1.to_set()
RedisLock
A distributed lock for critical section protection.
from redisify import RedisLock
lock = RedisLock(redis, "resource_lock")
# Manual lock/unlock
await lock.acquire()
try:
# Critical section
print("Resource locked")
finally:
await lock.release()
# Context manager (recommended)
async with RedisLock(redis, "resource_lock"):
print("Resource locked automatically")
# Lock is automatically released
RedisSemaphore
A distributed semaphore for controlling concurrent access.
from redisify import RedisSemaphore
# Limit to 3 concurrent operations
semaphore = RedisSemaphore("api_limit", 3)
async def api_call():
async with semaphore:
print("API call executing")
await asyncio.sleep(1)
# Run multiple concurrent calls
tasks = [api_call() for _ in range(10)]
await asyncio.gather(*tasks)
# Check current semaphore value
current_value = await semaphore.value()
print(f"Currently {current_value} semaphores are acquired")
# Non-blocking check
if await semaphore.can_acquire():
await semaphore.acquire()
RedisLimiter
A distributed rate limiter using token bucket algorithm.
from redisify import RedisLimiter
# Rate limit: 10 requests per minute
limiter = RedisLimiter("api_rate", 10, 60)
async def make_request():
if await limiter.acquire():
print("Request allowed")
# Make API call
else:
print("Rate limit exceeded")
# Context manager with automatic retry
async with RedisLimiter("api_rate", 10, 60):
print("Request allowed")
# Make API call
🔧 Serialization
Redisify includes a smart serializer that handles complex objects using dill:
from pydantic import BaseModel
from redisify import RedisDict
class User(BaseModel):
name: str
age: int
user = User(name="Alice", age=30)
rdict = RedisDict("users")
# Pydantic models are automatically serialized
await rdict["user1"] = user
# And automatically deserialized
retrieved_user = await rdict["user1"]
print(type(retrieved_user)) # <class '__main__.User'>
print(retrieved_user.name) # Alice
# Custom objects work too
class CustomObject:
def __init__(self, data):
self.data = data
def __repr__(self):
return f"CustomObject({self.data})"
obj = CustomObject("test")
await rdict["custom"] = obj
retrieved_obj = await rdict["custom"]
print(retrieved_obj) # CustomObject(test)
📖 API Documentation
For detailed API documentation, see the docstrings in the source code:
-
RedisDict - Distributed dictionary
-
RedisList - Distributed list
-
RedisQueue - Distributed queue
-
RedisSet - Distributed set
-
RedisLock - Distributed lock
-
RedisSemaphore - Distributed semaphore
-
RedisLimiter - Rate limiter
-
Serializer - Object serialization
⚡ Performance Considerations
Memory Usage
- All objects are serialized before storage, which increases memory usage
- Consider using simple data types for large datasets
- Use
clear()method to free memory when structures are no longer needed
Network Latency
- All operations are async and non-blocking
- Use connection pooling for better performance
- Consider using Redis clusters for high-availability setups
Serialization Overhead
- Complex objects take longer to serialize/deserialize
- Consider using simple data structures for frequently accessed data
- The dill serializer handles most Python objects efficiently
🧪 Testing
Make sure you have Redis running (locally or via Docker), then:
# Run all tests
pytest -v tests
# Run with coverage
pytest --cov=redisify tests
# Run specific test file
pytest tests/test_redis_dict.py -v
# Run with async support
pytest --asyncio-mode=auto tests/
Running Redis with Docker
# Start Redis server
docker run -d -p 6379:6379 redis:latest
# Or with Redis Stack (includes RedisInsight)
docker run -d -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
🤝 Contributing
We welcome contributions! Please see our Contributing Guide for details.
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Make your changes
- Add tests for new functionality
- Run the test suite (
pytest tests/) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Development Setup
git clone https://github.com/Hambaobao/redisify.git
cd redisify
pip install -e .[test]
pre-commit install # Optional: for code formatting
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
📋 Requirements
- Python 3.10+
- Redis server (local or remote)
- redis Python client (redis-py)
- dill (for object serialization)
🙏 Acknowledgments
- redis-py - Redis Python client
- dill - Object serialization
- pytest-asyncio - Async testing support
📞 Support
- 📧 Email: jameszhang2880@gmail.com
- 🐛 Issues: GitHub Issues
- 📖 Documentation: Source Code
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file redisify-0.1.5.tar.gz.
File metadata
- Download URL: redisify-0.1.5.tar.gz
- Upload date:
- Size: 23.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e746c0383aed9932ed9502df5fea56c021926abddfb7d86027c236dcdf7d00a0
|
|
| MD5 |
fa65c711c5f5dd081f93a13e4bbb2529
|
|
| BLAKE2b-256 |
dfb4760d6c1b76e6994cd79dff0e1c0b19d8c8eebe428903cc2c4fd9a9d2bbbc
|
Provenance
The following attestation bundles were made for redisify-0.1.5.tar.gz:
Publisher:
publish-pypi.yml on Hambaobao/redisify
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
redisify-0.1.5.tar.gz -
Subject digest:
e746c0383aed9932ed9502df5fea56c021926abddfb7d86027c236dcdf7d00a0 - Sigstore transparency entry: 452315496
- Sigstore integration time:
-
Permalink:
Hambaobao/redisify@b4d378953ffdfb77d63004dc0337e2f13730c036 -
Branch / Tag:
refs/tags/v0.1.5 - Owner: https://github.com/Hambaobao
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-pypi.yml@b4d378953ffdfb77d63004dc0337e2f13730c036 -
Trigger Event:
push
-
Statement type:
File details
Details for the file redisify-0.1.5-py3-none-any.whl.
File metadata
- Download URL: redisify-0.1.5-py3-none-any.whl
- Upload date:
- Size: 23.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6d24ebc56c18fbe13940d16d461d1d88c0543535cdafc3052f260ce7b0a34a91
|
|
| MD5 |
eecaad46934ae3a5942ca4d8d3f0e565
|
|
| BLAKE2b-256 |
cda2d36ba76f2aae8f22870b3c091f620936b97b7b22ae9df168a9782e0fcdc3
|
Provenance
The following attestation bundles were made for redisify-0.1.5-py3-none-any.whl:
Publisher:
publish-pypi.yml on Hambaobao/redisify
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
redisify-0.1.5-py3-none-any.whl -
Subject digest:
6d24ebc56c18fbe13940d16d461d1d88c0543535cdafc3052f260ce7b0a34a91 - Sigstore transparency entry: 452315503
- Sigstore integration time:
-
Permalink:
Hambaobao/redisify@b4d378953ffdfb77d63004dc0337e2f13730c036 -
Branch / Tag:
refs/tags/v0.1.5 - Owner: https://github.com/Hambaobao
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-pypi.yml@b4d378953ffdfb77d63004dc0337e2f13730c036 -
Trigger Event:
push
-
Statement type: