
Project description

Redisify

Redisify is a lightweight Python library that provides Redis-backed data structures and distributed synchronization primitives. It is designed for distributed systems where persistent, shared, and async-compatible data structures are needed.

Python 3.10+ · License: MIT · PyPI

🚀 Features

📦 Data Structures

  • RedisDict: A dictionary-like interface backed by a Redis hash with full CRUD operations
  • RedisList: A list-like structure supporting indexing, insertion, deletion, and iteration
  • RedisQueue: A FIFO queue with blocking and async operations
  • RedisSet: A set-like structure with union, intersection, difference operations

🔐 Distributed Synchronization

  • RedisLock: Distributed locking mechanism with automatic cleanup

  • RedisSemaphore: Semaphore for controlling concurrent access

  • RedisLimiter: Rate limiting with a token bucket algorithm

⚡ Advanced Features

  • Async/Await Support: All operations are async-compatible
  • Smart Serialization: Automatic serialization of complex objects using dill
  • Context Manager Support: Use with async with statements
  • Comprehensive Testing: Full test coverage for all components
  • Type Safety: Full type hints and documentation
  • Thread-Safe: All operations are thread and process safe

📦 Installation

pip install redisify

Or for development and testing:

git clone https://github.com/Hambaobao/redisify.git
cd redisify
pip install -e .[test]

⚡ Quick Start

import asyncio
from redisify import RedisDict, RedisList, RedisQueue, RedisSet, RedisLock, RedisSemaphore, RedisLimiter, connect_to_redis

async def main():
    # Connect to Redis
    connect_to_redis(host="localhost", port=6379, db=0)
    
    # Dictionary operations
    rdict = RedisDict("example:dict")
    await rdict.set("user:1", {"name": "Alice", "age": 30})
    user = await rdict["user:1"]
    print(user)  # {'name': 'Alice', 'age': 30}
    
    # List operations
    rlist = RedisList("example:list")
    await rlist.append("item1")
    await rlist.append("item2")
    first_item = await rlist[0]
    print(first_item)  # item1
    
    # Queue operations
    rqueue = RedisQueue("example:queue")
    await rqueue.put("task1")
    await rqueue.put("task2")
    task = await rqueue.get()
    print(task)  # task1
    
    # Set operations
    rset = RedisSet("example:set")
    await rset.add("item1")
    await rset.add("item2")
    items = await rset.to_set()
    print(items)  # {'item1', 'item2'}

asyncio.run(main())

📚 Detailed Usage

RedisDict

A distributed dictionary that supports any serializable Python objects as keys and values.

from redisify import RedisDict

rdict = RedisDict("users")

# Basic operations
await rdict.set("user1", {"name": "Alice", "age": 30})
await rdict.set("user2", {"name": "Bob", "age": 25})

# Get values
user1 = await rdict["user1"]
print(user1)  # {'name': 'Alice', 'age': 30}

# Check existence (the `in` operator cannot be awaited)
if await rdict.get("user1") is not None:
    print("User exists")

# Delete items (subscript deletion cannot be awaited)
await rdict.delete("user2")

# Iterate over items
async for key, value in rdict.items():
    print(f"{key}: {value}")

# Get with default
user = await rdict.get("user3", {"name": "Default", "age": 0})

RedisList

A distributed list with full indexing and slicing support.

from redisify import RedisList

rlist = RedisList("tasks")

# Add items
await rlist.append("task1")
await rlist.append("task2")
await rlist.insert(0, "priority_task")

# Access by index
first_task = await rlist[0]
print(first_task)  # priority_task

# Slicing support
tasks = await rlist[1:3]  # Get items at index 1 and 2

# Get length (len() cannot return an awaitable)
length = await rlist.size()
print(length)  # 3

# Iterate
async for item in rlist:
    print(item)

# Remove items
await rlist.remove("task1", count=1)

RedisQueue

A distributed FIFO queue with blocking and non-blocking operations.

import asyncio

from redisify import RedisQueue

rqueue = RedisQueue("job_queue", maxsize=100)

# Producer
await rqueue.put("job1")
await rqueue.put("job2")

# Consumer (blocking)
job = await rqueue.get()  # Blocks until item available
print(job)  # job1

# Non-blocking get
try:
    job = await rqueue.get_nowait()
except asyncio.QueueEmpty:
    print("Queue is empty")

# Peek at next item without removing
next_job = await rqueue.peek()

# Check queue status
size = await rqueue.qsize()
is_empty = await rqueue.empty()

RedisSet

A distributed set with full set operations support.

from redisify import RedisSet

set1 = RedisSet("set1")
set2 = RedisSet("set2")

# Add items
await set1.add("item1")
await set1.add("item2")
await set2.add("item2")
await set2.add("item3")

# Set operations
union = await set1.union(set2)
intersection = await set1.intersection(set2)
difference = await set1.difference(set2)

print(union)  # {'item1', 'item2', 'item3'}
print(intersection)  # {'item2'}
print(difference)  # {'item1'}

# Membership testing (the `in` operator cannot be awaited)
if "item1" in await set1.to_set():
    print("Item exists")

# Convert to Python set
python_set = await set1.to_set()

RedisLock

A distributed lock for critical section protection.

from redisify import RedisLock

lock = RedisLock("resource_lock")

# Manual lock/unlock
await lock.acquire()
try:
    # Critical section
    print("Resource locked")
finally:
    await lock.release()

# Context manager (recommended)
async with RedisLock("resource_lock"):
    print("Resource locked automatically")
    # Lock is automatically released

RedisSemaphore

A distributed semaphore for controlling concurrent access.

import asyncio

from redisify import RedisSemaphore

# Limit to 3 concurrent operations
semaphore = RedisSemaphore("api_limit", 3)

async def api_call():
    async with semaphore:
        print("API call executing")
        await asyncio.sleep(1)

# Run multiple concurrent calls
tasks = [api_call() for _ in range(10)]
await asyncio.gather(*tasks)

# Check current semaphore value
current_value = await semaphore.value()
print(f"Currently {current_value} permits are held")

# Non-blocking check
if await semaphore.can_acquire():
    await semaphore.acquire()

RedisLimiter

A distributed rate limiter using the token bucket algorithm.

from redisify import RedisLimiter

# Rate limit: 10 requests per minute
limiter = RedisLimiter("api_rate", 10, 60)

async def make_request():
    if await limiter.acquire():
        print("Request allowed")
        # Make API call
    else:
        print("Rate limit exceeded")

# Context manager with automatic retry
async with RedisLimiter("api_rate", 10, 60):
    print("Request allowed")
    # Make API call
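For intuition, the token bucket idea behind RedisLimiter can be sketched as a plain in-process Python class. This is an illustration only, not Redisify's Redis-backed implementation; the injectable clock is just to make the example deterministic:

```python
import time


class TokenBucket:
    """Minimal in-process token bucket: `capacity` tokens, refilled at `rate` tokens/sec."""

    def __init__(self, capacity: float, rate: float, now=time.monotonic):
        self.capacity = capacity
        self.rate = rate
        self.tokens = capacity  # start with a full bucket
        self.now = now
        self.last = now()

    def acquire(self, n: float = 1.0) -> bool:
        current = self.now()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (current - self.last) * self.rate)
        self.last = current
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


# Example: 10 requests per 60 seconds, driven by a fake clock
clock = [0.0]
bucket = TokenBucket(capacity=10, rate=10 / 60, now=lambda: clock[0])

allowed = sum(bucket.acquire() for _ in range(15))
print(allowed)  # 10 -- the first 10 calls succeed, the rest are throttled

clock[0] += 6.0  # six seconds later, one token (6 * 10/60) has been refilled
print(bucket.acquire())  # True
```

The Redis-backed version applies the same accounting atomically on the server, so the bucket state is shared by every process that uses the same key.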

🔧 Serialization

Redisify includes a smart serializer that handles complex objects using dill:

from pydantic import BaseModel
from redisify import RedisDict

class User(BaseModel):
    name: str
    age: int

user = User(name="Alice", age=30)
rdict = RedisDict("users")

# Pydantic models are automatically serialized
await rdict["user1"] = user

# And automatically deserialized
retrieved_user = await rdict["user1"]
print(type(retrieved_user))  # <class '__main__.User'>
print(retrieved_user.name)  # Alice

# Custom objects work too
class CustomObject:
    def __init__(self, data):
        self.data = data
    
    def __repr__(self):
        return f"CustomObject({self.data})"

obj = CustomObject("test")
await rdict["custom"] = obj
retrieved_obj = await rdict["custom"]
print(retrieved_obj)  # CustomObject(test)

📖 API Documentation

For detailed API documentation, see the docstrings in the source code.

⚡ Performance Considerations

Memory Usage

  • All objects are serialized before storage, which increases memory usage
  • Consider using simple data types for large datasets
  • Use clear() method to free memory when structures are no longer needed

Network Latency

  • All operations are async and non-blocking
  • Use connection pooling for better performance
  • Consider using Redis clusters for high-availability setups
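Because every operation is awaitable, independent calls can be overlapped to hide round-trip latency rather than awaited one at a time. A standalone sketch of the pattern, simulating the network delay with asyncio.sleep (no Redis required):

```python
import asyncio
import time


async def fake_redis_call(i: int) -> int:
    # Stand-in for one Redis round-trip (~10 ms of network latency)
    await asyncio.sleep(0.01)
    return i


async def main() -> None:
    start = time.perf_counter()
    # Issue 20 calls concurrently instead of awaiting each one sequentially
    results = await asyncio.gather(*(fake_redis_call(i) for i in range(20)))
    elapsed = time.perf_counter() - start
    print(results[:3])  # [0, 1, 2]
    print(elapsed < 0.1)  # True: roughly one round-trip in total, not twenty


asyncio.run(main())
```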

Serialization Overhead

  • Complex objects take longer to serialize/deserialize
  • Consider using simple data structures for frequently accessed data
  • The dill serializer handles most Python objects efficiently
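As a rough illustration of why complex objects cost more than plain values, compare serialized payload sizes. This sketch uses the stdlib pickle, whose interface dill extends; Record is a hypothetical class standing in for a Pydantic model or other custom object:

```python
import pickle


class Record:
    """Hypothetical rich object, standing in for a Pydantic model or custom class."""

    def __init__(self, data):
        self.data = data


plain = pickle.dumps("task1")
rich = pickle.dumps(Record({"name": "Alice", "age": 30}))

print(len(plain), len(rich))
print(len(plain) < len(rich))  # True: the rich payload also carries class metadata
```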

🧪 Testing

Make sure you have Redis running (locally or via Docker), then:

# Run all tests
pytest -v tests

# Run with coverage
pytest --cov=redisify tests

# Run specific test file
pytest tests/test_redis_dict.py -v

# Run with async support
pytest --asyncio-mode=auto tests/

Running Redis with Docker

# Start Redis server
docker run -d -p 6379:6379 redis:latest

# Or with Redis Stack (includes RedisInsight)
docker run -d -p 6379:6379 -p 8001:8001 redis/redis-stack:latest

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests for new functionality
  5. Run the test suite (pytest tests/)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

Development Setup

git clone https://github.com/Hambaobao/redisify.git
cd redisify
pip install -e .[test]
pre-commit install  # Optional: for code formatting

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

📋 Requirements

  • Python 3.10+
  • Redis server (local or remote)
  • redis Python client (redis-py)
  • dill (for object serialization)

🙏 Acknowledgments

📞 Support

Download files

Download the file for your platform.

Source Distribution

redisify-0.2.0.tar.gz (24.2 kB)

Uploaded Source

Built Distribution


redisify-0.2.0-py3-none-any.whl (23.5 kB)

Uploaded Python 3

File details

Details for the file redisify-0.2.0.tar.gz.

File metadata

  • Download URL: redisify-0.2.0.tar.gz
  • Upload date:
  • Size: 24.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for redisify-0.2.0.tar.gz:

  • SHA256: d309d60334575aaf5b1f773f3292098b01b6b35cfb9a42aedd212ee4eae3ad82
  • MD5: 630f876684873eec911c097b97ba7dd6
  • BLAKE2b-256: c1eed405aca52db537c0642ac779b485140ce3fe672e17ccb2034ecc1353510a


Provenance

The following attestation bundles were made for redisify-0.2.0.tar.gz:

Publisher: publish-pypi.yml on Hambaobao/redisify

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file redisify-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: redisify-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 23.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for redisify-0.2.0-py3-none-any.whl:

  • SHA256: 6040dc28cd2c35ed17e9522f514344f7fdd7e4917a6f1591852a0c9dba82cc4e
  • MD5: a5fcf92ecdac2f11baf0496e70d80ff0
  • BLAKE2b-256: 03dd0845ea8c23d850974c61a4344328ae1c841999b78e5619683c31bf5db7c5


Provenance

The following attestation bundles were made for redisify-0.2.0-py3-none-any.whl:

Publisher: publish-pypi.yml on Hambaobao/redisify

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
