Unified Redis Python SDK for the Safari Pro architecture.
Project description
Unified Redis Python SDK
A unified, robust Python wrapper for interacting with multiple Redis architectures seamlessly. This SDK uses the Factory Design Pattern to abstract away the complexity of connecting to different Redis setups. It natively enforces data standardization (JSON enveloping) and integrates enterprise-grade features out of the box.
Core Features
- Automatic Data Standardization: All payloads sent to Redis are automatically wrapped in a standard JSON envelope containing
request_id(a UUID),event,type,service,payload, andtimestamp. The SDK transparently unwraps this when reading data so your application only ever sees the raw data it passed. - Key Namespacing: All keys are prefixed with
{service_name}:{action}:{key}to prevent collisions across microservices. - Intelligent Routing: Use
UseCase.AUTOto automatically route cache commands to a Native Cluster, Pub/Sub to Sentinel, and Streams to a Hybrid proxy. - Enterprise-Grade Configurations: Built-in support for Connection Pooling, SSL/TLS, Exponential Backoff Retries, and Socket Keepalives.
Supported Architectures & Use Cases
| Architecture | UseCase Enum |
Underlying Driver | Best For |
|---|---|---|---|
| Intelligent Router | AUTO |
IntelligentRouterAdapter |
Hands-off optimal routing based on command type. |
| Native Redis Cluster | HIGH_CONCURRENCY |
redis.cluster.RedisCluster |
Horizontal scaling, massive reads/writes. |
| Sentinel + HAProxy | HEAVY_TRANSACTIONS |
redis.Redis (HAProxy) |
Pub/Sub, robust transactions. |
| Hybrid (Predixy Proxy) | MICROSERVICES |
redis.Redis (Standard) |
Easy multi-tenant connections, stateless clients. |
Usage Guide
1. Basic Import
from redis_sdk.config import RedisConfig, UseCase
from redis_sdk.factory import RedisClientFactory
2. Configuration & Enterprise Settings
Initialize your configuration and pass service_name to define your application's namespace. You can also pass enterprise features like retries and connection limits.
cluster_config = RedisConfig(
host="redis-node-1",
port=6379,
password="your_password",
startup_nodes=[{"host": "redis-node-1", "port": 6379}],
# Core SDK configs
service_name="my_billing_app",
# Enterprise configs
ssl=False,
max_connections=100,
socket_timeout=5.0,
retry_on_timeout=True,
retry_backoff=True, # Enables Exponential Backoff
retry_backoff_retries=3
)
3. Native Cluster (HIGH_CONCURRENCY)
client = RedisClientFactory.get_client(cluster_config, UseCase.HIGH_CONCURRENCY)
# Automatically formats key to 'my_billing_app:cache:foo'
# Automatically wraps data in a JSON envelope.
client.set("foo", {"some": "data"})
# Transparently unwraps the envelope and returns {"some": "data"}
print(client.get("foo"))
4. Hybrid Proxy (MICROSERVICES)
Connects to the Native Cluster through a Predixy proxy. The client patches pipeline transactions automatically because Predixy handles clustering efficiently.
hybrid_config = RedisConfig(
host="localhost",
port=6381,
password="your_password",
service_name="my_billing_app"
)
client = RedisClientFactory.get_client(hybrid_config, UseCase.MICROSERVICES)
# Pipelines are patched automatically!
pipe = client.pipeline(transaction=True)
pipe.set("a", 1)
pipe.set("b", 2)
pipe.execute()
Note on Predixy Pipelines: Predixy Proxy does not support cross-node
MULTI/EXECefficiently via python pipelines. If you passtransaction=True, the Hybrid adapter catches it, prints a notice, and gracefully overrides it totransaction=False.
5. Intelligent Router (AUTO)
The absolute easiest way to consume Redis. Provide the configurations for all environments to the factory, request UseCase.AUTO, and the router will send your queries to the optimal architecture.
router_client = RedisClientFactory.get_client(
config=global_config,
use_case=UseCase.AUTO
)
# Routes to Native Cluster
router_client.set("cache_key", "fast_data")
# Routes to Sentinel
router_client.publish("global_events", "system_ready")
# Routes to Hybrid Proxy
router_client.xadd("audit_trail", {"action": "login"})
Running the Tests
To verify that the SDK routes correctly to all three architectures:
- Inside Docker (Recommended): Use
run_tests_in_docker.ps1to run the tests securely inside the same docker network as the Redis instances. - Local Windows: Run
python test_sdk.py. (Note: Native Cluster tests may fail gracefully if it requires docker DNS resolution).
Redis SDK Developer Guide
This developer guide provides step-by-step instructions on how to use the Safari Pro Redis SDK, including how to connect to Redis, read data (both pending and new) from streams, and utilize the built-in Communication Service for sending SMS, emails, and in-app notifications.
1. Connecting to Redis
The SDK abstracts the underlying Redis architecture (Cluster, Sentinel, or Hybrid/Predixy) using the RedisClientFactory and the UseCase enum.
from redis_sdk.config import RedisConfig, UseCase
from redis_sdk.factory import RedisClientFactory
# Initialize the configuration with your service namespace
config = RedisConfig(
host="redis-node-1",
port=6379,
password="your_password",
startup_nodes=[{"host": "redis-node-1", "port": 6379}],
service_name="my_service"
)
# Request a client tailored for high concurrency (e.g., Native Cluster)
client = RedisClientFactory.get_client(config, UseCase.HIGH_CONCURRENCY)
2. Reading Data from Redis Streams
Redis Streams is the backbone of the SDK's messaging architecture. You can use the unified xread method to consume data from streams. The SDK handles JSON envelop unwrapping automatically.
Reading Pending Messages
To read all existing/pending messages in a stream from the beginning, pass 0 (or "0-0") as the stream ID.
# Read pending messages from the beginning (ID 0)
messages = client.xread({"communication:events": 0}, count=10)
if messages:
for stream_name, events in messages:
print(f"Found {len(events)} pending messages in {stream_name}")
for msg_id, msg_data in events:
print(f"Message ID: {msg_id}, Data: {msg_data}")
Reading New Messages
To listen for new messages, pass $ as the stream ID and use the block parameter (in milliseconds). The client will block and wait until a new message arrives.
# Block and wait for up to 5000ms for new messages
new_messages = client.xread({"communication:events": "$"}, count=1, block=5000)
if new_messages:
for stream_name, events in new_messages:
for msg_id, msg_data in events:
print(f"New event received: {msg_id}")
[!TIP] The Intelligent Router (
UseCase.AUTO) will automatically route stream commands likeXADDandXREADto the Hybrid architecture!
3. Using the Communication Service
The CommunicationService is a robust module integrated into the SDK that queues messages onto Redis Streams for external communication workers to process.
from redis_sdk.communication_service import communication_service
Sending SMS
You can send SMS messages to one or multiple recipients.
success = communication_service.send_sms(
recipients=["+254700123456", "+254711654321"],
message="Your Safari Pro booking is confirmed!",
template="booking_confirmation",
metadata={"booking_id": "BK-001"}
)
Sending Emails
The email functionality supports plain text, HTML, and templates.
email_result = communication_service.send_email(
recipients=["guest@example.com"],
subject="Booking Confirmation",
body="Your safari booking BK-001 has been confirmed.",
html_content="<h1>Booking Confirmed</h1>",
template_name="booking_email",
context_data={"guest_name": "Alice"},
metadata={"booking_id": "BK-001"}
)
print(email_result["message_id"]) # Message ID tracking
Sending In-App Notifications
Queue an in-app notification that will be pushed to the user's notification center in real-time.
inapp_result = communication_service.send_inapp_notification(
user_id="user_123",
title="Booking Confirmed",
message="Your booking #BK-001 has been confirmed",
notification_type="booking_confirmed",
action_url="/bookings/BK-001",
priority="high"
)
Multi-Channel Notifications (All-in-One)
Send to multiple channels simultaneously using send_notification().
results = communication_service.send_notification(
notification_type="payment_received",
recipients={
"email": ["finance@safari.co"],
"sms": ["+254700999888"],
"user_id": "user_456",
},
content={
"email_subject": "Payment Received",
"email_body": "Payment of $500 received for booking BK-002.",
"sms_message": "Payment of $500 received. Ref: BK-002",
"inapp_title": "Payment Received",
"inapp_message": "We received your $500 payment for booking BK-002",
},
metadata={"booking_id": "BK-002", "amount": "500.00", "priority": "high"}
)
print(results) # {"email": {...}, "sms": True, "inapp": True}
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file smartinno-1.0.0.tar.gz.
File metadata
- Download URL: smartinno-1.0.0.tar.gz
- Upload date:
- Size: 26.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a60f2c5005e28aadaf1dce551296c0477dfa268a673aaf7c25266223e771a241
|
|
| MD5 |
b8fe8a13b4a4809761b2b41222c7fd63
|
|
| BLAKE2b-256 |
49a5553e52a90df21ce35842d160a7ba20f9aebd4b6398423ad60e582d0a2331
|
Provenance
The following attestation bundles were made for smartinno-1.0.0.tar.gz:
Publisher:
publish_redis_sdk.yml on smart-Ino-Engineering/redis-pulsar-temporal-setups
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
smartinno-1.0.0.tar.gz -
Subject digest:
a60f2c5005e28aadaf1dce551296c0477dfa268a673aaf7c25266223e771a241 - Sigstore transparency entry: 1911435700
- Sigstore integration time:
-
Permalink:
smart-Ino-Engineering/redis-pulsar-temporal-setups@a6adb71b78d8f229d9b5e14ac98680d8cdf1d562 -
Branch / Tag:
refs/tags/v0.0.0.1 - Owner: https://github.com/smart-Ino-Engineering
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish_redis_sdk.yml@a6adb71b78d8f229d9b5e14ac98680d8cdf1d562 -
Trigger Event:
push
-
Statement type:
File details
Details for the file smartinno-1.0.0-py3-none-any.whl.
File metadata
- Download URL: smartinno-1.0.0-py3-none-any.whl
- Upload date:
- Size: 25.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a1a1598e39664c681939ef5b4dd3aecdab85b8816e8211aa52eddaf747af05c5
|
|
| MD5 |
47847d915923e333c5244abe6cdf2d21
|
|
| BLAKE2b-256 |
527697bb20d5dc28580fec1e63deeb7bfee9479cc308be1b8bd39138b55bd8da
|
Provenance
The following attestation bundles were made for smartinno-1.0.0-py3-none-any.whl:
Publisher:
publish_redis_sdk.yml on smart-Ino-Engineering/redis-pulsar-temporal-setups
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
smartinno-1.0.0-py3-none-any.whl -
Subject digest:
a1a1598e39664c681939ef5b4dd3aecdab85b8816e8211aa52eddaf747af05c5 - Sigstore transparency entry: 1911435832
- Sigstore integration time:
-
Permalink:
smart-Ino-Engineering/redis-pulsar-temporal-setups@a6adb71b78d8f229d9b5e14ac98680d8cdf1d562 -
Branch / Tag:
refs/tags/v0.0.0.1 - Owner: https://github.com/smart-Ino-Engineering
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish_redis_sdk.yml@a6adb71b78d8f229d9b5e14ac98680d8cdf1d562 -
Trigger Event:
push
-
Statement type: