Unified Redis Python SDK for the Safari Pro architecture.

Unified Redis Python SDK

A unified Python wrapper for working with multiple Redis architectures through one interface. The SDK uses the Factory design pattern to hide the details of connecting to different Redis setups. It enforces data standardization (JSON enveloping) on every write and ships with connection pooling, SSL/TLS, retries with exponential backoff, and socket keepalives.

Core Features

  • Automatic Data Standardization: All payloads sent to Redis are automatically wrapped in a standard JSON envelope containing request_id (a UUID), event, type, service, payload, and timestamp. The SDK transparently unwraps this when reading data so your application only ever sees the raw data it passed.
  • Key Namespacing: Every key is stored as {service_name}:{action}:{key}, so keys from different microservices cannot collide.
  • Intelligent Routing: Use UseCase.AUTO to automatically route cache commands to a Native Cluster, Pub/Sub to Sentinel, and Streams to a Hybrid proxy.
  • Enterprise-Grade Configurations: Built-in support for Connection Pooling, SSL/TLS, Exponential Backoff Retries, and Socket Keepalives.
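
The envelope behaviour described above can be pictured with a minimal, standard-library-only sketch. It is not the SDK's implementation: the function names wrap and unwrap, the ISO 8601 timestamp, and the example values for event and type are assumptions made for illustration. Only the six field names come from the feature list.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any


def wrap(payload: Any, service: str, event: str, type_: str = "cache") -> str:
    """Serialize payload inside an envelope carrying the six documented fields."""
    envelope = {
        "request_id": str(uuid.uuid4()),  # a UUID, as the feature list states
        "event": event,                   # example value; real values are SDK-defined
        "type": type_,                    # example value; real values are SDK-defined
        "service": service,
        "payload": payload,
        "timestamp": datetime.now(timezone.utc).isoformat(),  # format is an assumption
    }
    return json.dumps(envelope)


def unwrap(raw: str) -> Any:
    """Return only the original payload from a stored envelope."""
    return json.loads(raw)["payload"]


stored = wrap({"some": "data"}, service="my_billing_app", event="set")
print(unwrap(stored))  # {'some': 'data'}
```

The caller only ever handles the payload; the metadata fields travel with the stored value and are discarded on read.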

Supported Architectures & Use Cases

Architecture           | UseCase Enum       | Underlying Driver          | Best For
Intelligent Router     | AUTO               | IntelligentRouterAdapter   | Hands-off optimal routing based on command type.
Native Redis Cluster   | HIGH_CONCURRENCY   | redis.cluster.RedisCluster | Horizontal scaling, massive reads/writes.
Sentinel + HAProxy     | HEAVY_TRANSACTIONS | redis.Redis (HAProxy)      | Pub/Sub, robust transactions.
Hybrid (Predixy Proxy) | MICROSERVICES      | redis.Redis (Standard)     | Easy multi-tenant connections, stateless clients.

Usage Guide

1. Basic Import

from redis_sdk.config import RedisConfig, UseCase
from redis_sdk.factory import RedisClientFactory

2. Configuration & Enterprise Settings

Initialize your configuration and pass service_name to define your application's namespace. You can also pass enterprise features like retries and connection limits.

cluster_config = RedisConfig(
    host="redis-node-1", 
    port=6379, 
    password="your_password",
    startup_nodes=[{"host": "redis-node-1", "port": 6379}],
    
    # Core SDK configs
    service_name="my_billing_app",
    
    # Enterprise configs
    ssl=False,
    max_connections=100,
    socket_timeout=5.0,
    retry_on_timeout=True,
    retry_backoff=True, # Enables Exponential Backoff
    retry_backoff_retries=3
)
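
To give a feel for what retry_backoff=True with retry_backoff_retries=3 implies, here is a minimal sketch of retrying with exponentially growing delays. The base delay, the cap, and the helper names backoff_delays and call_with_retries are assumptions; the SDK's actual schedule may differ (for example, it may add jitter).

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(retries: int, base: float = 0.1, cap: float = 2.0) -> List[float]:
    """Delay before each retry: base * 2**attempt, never more than cap."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(retries)]


def call_with_retries(
    func: Callable[[], T],
    retries: int = 3,
    base: float = 0.1,
    cap: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on TimeoutError up to `retries` times."""
    delays = backoff_delays(retries, base, cap)
    for attempt in range(retries + 1):
        try:
            return func()
        except TimeoutError:
            if attempt == retries:
                raise  # retries exhausted: surface the last timeout
            sleep(delays[attempt])
    raise RuntimeError("unreachable")  # the loop always returns or raises


print(backoff_delays(3))  # [0.1, 0.2, 0.4]
```

Passing sleep as a parameter keeps the helper easy to test without real waiting.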

3. Native Cluster (HIGH_CONCURRENCY)

client = RedisClientFactory.get_client(cluster_config, UseCase.HIGH_CONCURRENCY)

# Automatically formats key to 'my_billing_app:cache:foo'
# Automatically wraps data in a JSON envelope.
client.set("foo", {"some": "data"})

# Transparently unwraps the envelope and returns {"some": "data"}
print(client.get("foo"))
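
The key shown in the comment above follows the {service_name}:{action}:{key} pattern. A small sketch of building and reversing such a key is given below; the helper names are hypothetical and the SDK performs this step internally.

```python
def namespaced_key(service_name: str, action: str, key: str) -> str:
    """Build the full Redis key from its three parts."""
    return f"{service_name}:{action}:{key}"


def strip_namespace(full_key: str, service_name: str, action: str) -> str:
    """Recover the caller's key; reject keys from another namespace."""
    prefix = f"{service_name}:{action}:"
    if not full_key.startswith(prefix):
        raise ValueError(f"{full_key!r} is outside namespace {prefix!r}")
    return full_key[len(prefix):]


print(namespaced_key("my_billing_app", "cache", "foo"))  # my_billing_app:cache:foo
```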

4. Hybrid Proxy (MICROSERVICES)

Connects to the Native Cluster through a Predixy proxy. Because Predixy does not handle cross-node MULTI/EXEC efficiently, the client automatically downgrades transactional pipelines to non-transactional ones.

hybrid_config = RedisConfig(
    host="localhost", 
    port=6381, 
    password="your_password",
    service_name="my_billing_app"
)

client = RedisClientFactory.get_client(hybrid_config, UseCase.MICROSERVICES)

# transaction=True is overridden to transaction=False automatically (see the note below)
pipe = client.pipeline(transaction=True)
pipe.set("a", 1)
pipe.set("b", 2)
pipe.execute()

Note on Predixy Pipelines: Predixy does not efficiently support cross-node MULTI/EXEC through Python pipelines. If you pass transaction=True, the Hybrid adapter intercepts it, prints a notice, and falls back to transaction=False, so the queued commands are sent without MULTI/EXEC atomicity.
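
The override described in the note can be sketched as a thin wrapper around the underlying client. This is an illustration only: RecordingBackend and HybridClientSketch are hypothetical classes, and the sketch emits the notice with warnings.warn, whereas the note says the adapter prints it.

```python
import warnings


class RecordingBackend:
    """Stand-in for a Redis client; records the pipeline argument it receives."""

    def __init__(self) -> None:
        self.last_transaction = None

    def pipeline(self, transaction: bool = True) -> list:
        self.last_transaction = transaction
        return []  # placeholder for a real pipeline object


class HybridClientSketch:
    """Forwards pipeline() calls but never requests a transaction."""

    def __init__(self, backend: RecordingBackend) -> None:
        self._backend = backend

    def pipeline(self, transaction: bool = True) -> list:
        if transaction:
            warnings.warn(
                "transaction=True is not supported through the proxy; "
                "using transaction=False",
                stacklevel=2,
            )
            transaction = False
        return self._backend.pipeline(transaction=transaction)


backend = RecordingBackend()
HybridClientSketch(backend).pipeline(transaction=True)
print(backend.last_transaction)  # False
```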

5. Intelligent Router (AUTO)

The simplest way to consume Redis. Provide the configurations for all environments to the factory, request UseCase.AUTO, and the router sends each command to the architecture suited to it.

# global_config is assumed to hold the connection settings for all three
# environments; its construction is not shown in this guide.
router_client = RedisClientFactory.get_client(
    config=global_config,
    use_case=UseCase.AUTO
)

# Routes to Native Cluster
router_client.set("cache_key", "fast_data")

# Routes to Sentinel
router_client.publish("global_events", "system_ready")

# Routes to Hybrid Proxy
router_client.xadd("audit_trail", {"action": "login"})
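
The routing rule in the example above (cache to Native Cluster, Pub/Sub to Sentinel, Streams to Hybrid) can be sketched as a lookup on the command name. The command sets, the backend labels, and the choice to send every other command to the cluster are assumptions made for this sketch, not the router's actual tables.

```python
CLUSTER = "native_cluster"
SENTINEL = "sentinel"
HYBRID = "hybrid_proxy"

# Assumed command groups; the real router may recognise more commands.
PUBSUB_COMMANDS = {"publish", "subscribe", "psubscribe"}
STREAM_COMMANDS = {"xadd", "xread", "xreadgroup", "xack"}


def choose_backend(command: str) -> str:
    """Pick a backend label for a Redis command name (case-insensitive)."""
    name = command.lower()
    if name in PUBSUB_COMMANDS:
        return SENTINEL
    if name in STREAM_COMMANDS:
        return HYBRID
    return CLUSTER  # assumption: anything else is treated as a cache command


for cmd in ("set", "publish", "xadd"):
    print(cmd, "->", choose_backend(cmd))
```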

Running the Tests

To verify that the SDK routes correctly to all three architectures, use one of the following:

  1. Inside Docker (Recommended): Use run_tests_in_docker.ps1 to run the tests inside the same Docker network as the Redis instances.
  2. Local Windows: Run python test_sdk.py. (Note: Native Cluster tests may fail gracefully if they require Docker DNS resolution.)

Redis SDK Developer Guide

This guide explains step by step how to use the Safari Pro Redis SDK: connecting to Redis, reading existing and new data from streams, and using the built-in Communication Service to send SMS, emails, and in-app notifications.

1. Connecting to Redis

The SDK abstracts the underlying Redis architecture (Cluster, Sentinel, or Hybrid/Predixy) using the RedisClientFactory and the UseCase enum.

from redis_sdk.config import RedisConfig, UseCase
from redis_sdk.factory import RedisClientFactory

# Initialize the configuration with your service namespace
config = RedisConfig(
    host="redis-node-1",
    port=6379,
    password="your_password",
    startup_nodes=[{"host": "redis-node-1", "port": 6379}],
    service_name="my_service"
)

# Request a client tailored for high concurrency (e.g., Native Cluster)
client = RedisClientFactory.get_client(config, UseCase.HIGH_CONCURRENCY)

2. Reading Data from Redis Streams

Redis Streams are the backbone of the SDK's messaging architecture. Use the unified xread method to consume data from streams; the SDK unwraps the JSON envelope automatically.

Reading Pending Messages

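To read messages that already exist in a stream, starting from the beginning, pass 0 (or "0-0") as the stream ID. Each call returns at most count entries. Here "pending" means entries already stored in the stream, not the pending entries list of a consumer group.

# Read up to 10 existing messages from the beginning (ID 0)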
messages = client.xread({"communication:events": 0}, count=10)

if messages:
    for stream_name, events in messages:
        print(f"Found {len(events)} pending messages in {stream_name}")
        for msg_id, msg_data in events:
            print(f"Message ID: {msg_id}, Data: {msg_data}")

Reading New Messages

To listen for new messages, pass $ as the stream ID and set the block parameter (in milliseconds). The call blocks until a new message arrives or the timeout expires, in which case it returns an empty result.

# Block and wait for up to 5000ms for new messages
new_messages = client.xread({"communication:events": "$"}, count=1, block=5000)

if new_messages:
    for stream_name, events in new_messages:
        for msg_id, msg_data in events:
            print(f"New event received: {msg_id}")
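
When a consumer calls xread in a loop, passing $ every time means entries that arrive between two calls are skipped, because $ always refers to the newest entry at the moment of the call. The usual Redis pattern is to start from an ID and then pass the ID of the last entry received. The sketch below shows that loop against an in-memory stand-in; drain and make_fake_xread are hypothetical helpers, and the fake only imitates the xread(streams, count=..., block=...) call shape used in this guide.

```python
from typing import Callable, Dict, Iterator, List, Optional, Tuple

Entry = Tuple[str, dict]
XRead = Callable[..., List[Tuple[str, List[Entry]]]]


def drain(xread: XRead, stream: str, start_id: str = "0", count: int = 10) -> Iterator[Entry]:
    """Yield every entry after start_id, resuming from the last ID seen."""
    last_id = start_id
    while True:
        batches = xread({stream: last_id}, count=count)
        if not batches:
            return  # nothing newer than last_id
        for _name, entries in batches:
            for msg_id, data in entries:
                last_id = msg_id  # the next call continues after this entry
                yield msg_id, data


def make_fake_xread(entries: List[Entry]) -> XRead:
    """Build an in-memory stand-in that returns entries newer than the given ID."""

    def parse(entry_id: str) -> Tuple[int, int]:
        ms, _, seq = str(entry_id).partition("-")
        return int(ms), int(seq or 0)

    def fake_xread(streams: Dict[str, str], count: Optional[int] = None,
                   block: Optional[int] = None):
        result = []
        for name, after in streams.items():
            newer = [e for e in entries if parse(e[0]) > parse(after)]
            if newer:
                result.append((name, newer[:count]))
        return result

    return fake_xread


log = [("1-0", {"n": 1}), ("2-0", {"n": 2}), ("3-0", {"n": 3})]
for msg_id, data in drain(make_fake_xread(log), "communication:events", count=2):
    print(msg_id, data)
```

With count=2 the three entries arrive in two batches, and none is read twice or skipped.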

Tip: The Intelligent Router (UseCase.AUTO) automatically routes stream commands such as XADD and XREAD to the Hybrid architecture.

3. Using the Communication Service

The CommunicationService is a robust module integrated into the SDK that queues messages onto Redis Streams for external communication workers to process.

from redis_sdk.communication_service import communication_service

Sending SMS

You can send an SMS message to one or more recipients.

success = communication_service.send_sms(
    recipients=["+254700123456", "+254711654321"],
    message="Your Safari Pro booking is confirmed!",
    template="booking_confirmation",
    metadata={"booking_id": "BK-001"}
)

Sending Emails

The email functionality supports plain text, HTML, and templates.

email_result = communication_service.send_email(
    recipients=["guest@example.com"],
    subject="Booking Confirmation",
    body="Your safari booking BK-001 has been confirmed.",
    html_content="<h1>Booking Confirmed</h1>",
    template_name="booking_email",
    context_data={"guest_name": "Alice"},
    metadata={"booking_id": "BK-001"}
)

print(email_result["message_id"]) # Message ID tracking

Sending In-App Notifications

Queue an in-app notification that will be pushed to the user's notification center in real time.

inapp_result = communication_service.send_inapp_notification(
    user_id="user_123",
    title="Booking Confirmed",
    message="Your booking #BK-001 has been confirmed",
    notification_type="booking_confirmed",
    action_url="/bookings/BK-001",
    priority="high"
)

Multi-Channel Notifications (All-in-One)

Send to multiple channels simultaneously using send_notification().

results = communication_service.send_notification(
    notification_type="payment_received",
    recipients={
        "email": ["finance@safari.co"],
        "sms": ["+254700999888"],
        "user_id": "user_456",
    },
    content={
        "email_subject": "Payment Received",
        "email_body": "Payment of $500 received for booking BK-002.",
        "sms_message": "Payment of $500 received. Ref: BK-002",
        "inapp_title": "Payment Received",
        "inapp_message": "We received your $500 payment for booking BK-002",
    },
    metadata={"booking_id": "BK-002", "amount": "500.00", "priority": "high"}
)

print(results) # {"email": {...}, "sms": True, "inapp": True}
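
Since send_notification() returns one entry per channel, a caller may want to know which channels did not go through. The helper below is a small sketch that assumes a failed channel is reported as a falsy value (False, None, or an empty dict); that convention is an assumption, so check the actual return values before relying on it.

```python
from typing import Any, Dict, List


def failed_channels(results: Dict[str, Any]) -> List[str]:
    """List the channels whose result is falsy (assumed to mean failure)."""
    return [channel for channel, outcome in results.items() if not outcome]


# Hand-written sample shaped like the printed result above.
sample = {"email": {"message_id": "abc"}, "sms": True, "inapp": False}
print(failed_channels(sample))  # ['inapp']
```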

Download files

Source Distribution

smartinno-1.0.0.tar.gz (26.9 kB)

Uploaded Source

Built Distribution

smartinno-1.0.0-py3-none-any.whl (25.3 kB)

Uploaded Python 3

File details

Details for the file smartinno-1.0.0.tar.gz.

File metadata

  • Download URL: smartinno-1.0.0.tar.gz
  • Upload date:
  • Size: 26.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for smartinno-1.0.0.tar.gz
Algorithm Hash digest
SHA256 a60f2c5005e28aadaf1dce551296c0477dfa268a673aaf7c25266223e771a241
MD5 b8fe8a13b4a4809761b2b41222c7fd63
BLAKE2b-256 49a5553e52a90df21ce35842d160a7ba20f9aebd4b6398423ad60e582d0a2331

Provenance

The following attestation bundles were made for smartinno-1.0.0.tar.gz:

Publisher: publish_redis_sdk.yml on smart-Ino-Engineering/redis-pulsar-temporal-setups

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file smartinno-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: smartinno-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 25.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for smartinno-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a1a1598e39664c681939ef5b4dd3aecdab85b8816e8211aa52eddaf747af05c5
MD5 47847d915923e333c5244abe6cdf2d21
BLAKE2b-256 527697bb20d5dc28580fec1e63deeb7bfee9479cc308be1b8bd39138b55bd8da

Provenance

The following attestation bundles were made for smartinno-1.0.0-py3-none-any.whl:

Publisher: publish_redis_sdk.yml on smart-Ino-Engineering/redis-pulsar-temporal-setups

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
