Unified Redis Python SDK for the Safari Pro architecture.

Unified Redis Python SDK

A unified Python wrapper for working with multiple Redis architectures through one client interface. The SDK uses the Factory design pattern to hide the differences between Redis deployments. It wraps every payload in a standard JSON envelope and ships with connection pooling, SSL/TLS, retries with exponential backoff, and socket keepalives.

Core Features

  • Automatic Data Standardization: All payloads sent to Redis are automatically wrapped in a standard JSON envelope containing request_id (a UUID), event, type, service, payload, and timestamp. The SDK transparently unwraps this when reading data so your application only ever sees the raw data it passed.
  • Key Namespacing: Every key is rewritten as {service_name}:{action}:{key} to prevent collisions across microservices.
  • Intelligent Routing: Use UseCase.AUTO to automatically route cache commands to a Native Cluster, Pub/Sub to Sentinel, and Streams to a Hybrid proxy.
  • Enterprise-Grade Configurations: Built-in support for Connection Pooling, SSL/TLS, Exponential Backoff Retries, and Socket Keepalives.
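
The envelope described in the first bullet can be pictured with a short sketch. This is not the SDK's implementation: the function names wrap and unwrap, the default values for event and type, and the ISO 8601 timestamp format are assumptions made for illustration. Only the six field names come from the description above.

```python
import json
import uuid
from datetime import datetime, timezone


def wrap(payload, service, event="set", type_="cache"):
    """Wrap a payload in an envelope carrying the six documented fields."""
    envelope = {
        "request_id": str(uuid.uuid4()),
        "event": event,      # assumed default, for illustration only
        "type": type_,       # assumed default, for illustration only
        "service": service,
        "payload": payload,
        "timestamp": datetime.now(timezone.utc).isoformat(),  # assumed format
    }
    return json.dumps(envelope)


def unwrap(raw):
    """Return only the original payload from a stored envelope."""
    return json.loads(raw)["payload"]


stored = wrap({"some": "data"}, service="my_billing_app")
print(unwrap(stored))  # {'some': 'data'}
```

Because the reader only ever receives the payload field, the envelope metadata stays invisible to application code.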

Supported Architectures & Use Cases

| Architecture | UseCase Enum | Underlying Driver | Best For |
| --- | --- | --- | --- |
| Intelligent Router | AUTO | IntelligentRouterAdapter | Hands-off optimal routing based on command type. |
| Native Redis Cluster | HIGH_CONCURRENCY | redis.cluster.RedisCluster | Horizontal scaling, massive reads/writes. |
| Sentinel + HAProxy | HEAVY_TRANSACTIONS | redis.Redis (HAProxy) | Pub/Sub, robust transactions. |
| Hybrid (Predixy Proxy) | MICROSERVICES | redis.Redis (Standard) | Easy multi-tenant connections, stateless clients. |
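
The table pairs each UseCase value with one driver, which is the kind of dispatch a factory performs. A minimal sketch of that idea follows. The adapter classes, the enum values, and the registry are hypothetical stand-ins rather than the SDK's internals; only the four enum names and the architecture labels are taken from the table.

```python
from enum import Enum


class UseCase(Enum):
    """Local stand-in mirroring the enum names in the table (values assumed)."""
    AUTO = "auto"
    HIGH_CONCURRENCY = "high_concurrency"
    HEAVY_TRANSACTIONS = "heavy_transactions"
    MICROSERVICES = "microservices"


class _Adapter:
    """Hypothetical adapter base; real adapters would open connections."""
    label = "base"

    def __init__(self, config):
        self.config = config


class RouterAdapter(_Adapter):
    label = "Intelligent Router"


class ClusterAdapter(_Adapter):
    label = "Native Redis Cluster"


class SentinelAdapter(_Adapter):
    label = "Sentinel + HAProxy"


class HybridAdapter(_Adapter):
    label = "Hybrid (Predixy Proxy)"


_REGISTRY = {
    UseCase.AUTO: RouterAdapter,
    UseCase.HIGH_CONCURRENCY: ClusterAdapter,
    UseCase.HEAVY_TRANSACTIONS: SentinelAdapter,
    UseCase.MICROSERVICES: HybridAdapter,
}


def get_client(config, use_case):
    """Look up the adapter class for a use case and instantiate it."""
    try:
        adapter_cls = _REGISTRY[use_case]
    except KeyError:
        raise ValueError(f"unsupported use case: {use_case!r}") from None
    return adapter_cls(config)


print(get_client({}, UseCase.MICROSERVICES).label)  # Hybrid (Predixy Proxy)
```

Callers depend only on the use case they request, so the underlying driver can change without touching application code.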

Usage Guide

1. Basic Import

from smartinno.config import RedisConfig, UseCase
from smartinno.factory import RedisClientFactory

2. Configuration & Enterprise Settings

Initialize your configuration and pass service_name to define your application's key namespace. The same object accepts the connection settings: SSL, pool size, socket timeout, and retry behavior.

cluster_config = RedisConfig(
    host="redis-node-1", 
    port=6379, 
    password="your_password",
    startup_nodes=[{"host": "redis-node-1", "port": 6379}],
    
    # Core SDK configs
    service_name="my_billing_app",
    
    # Enterprise configs
    ssl=False,
    max_connections=100,
    socket_timeout=5.0,
    retry_on_timeout=True,
    retry_backoff=True, # Enables Exponential Backoff
    retry_backoff_retries=3
)
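
For readers unfamiliar with exponential backoff, the sketch below shows the general technique that retry_backoff and retry_backoff_retries refer to. How the SDK actually schedules its retries is not documented here, so the base delay, the cap, the helper names, and the choice to retry only on TimeoutError are all assumptions. Jitter is left out for brevity.

```python
import time


def backoff_delays(retries, base=0.1, cap=2.0):
    """Delay before each retry: base * 2**attempt, capped at `cap` seconds."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(retries)]


def call_with_retries(func, retries=3, base=0.1, cap=2.0, sleep=time.sleep):
    """Call func(); on TimeoutError, wait with exponential backoff and retry."""
    for delay in backoff_delays(retries, base, cap):
        try:
            return func()
        except TimeoutError:
            sleep(delay)
    return func()  # final attempt; any error propagates to the caller


print(backoff_delays(3))  # [0.1, 0.2, 0.4]
```

With retries=3 the function makes at most four calls: the first attempt plus three retries, each waiting twice as long as the previous one.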

3. Native Cluster (HIGH_CONCURRENCY)

client = RedisClientFactory.get_client(cluster_config, UseCase.HIGH_CONCURRENCY)

# Automatically formats key to 'my_billing_app:cache:foo'
# Automatically wraps data in a JSON envelope.
client.set("foo", {"some": "data"})

# Transparently unwraps the envelope and returns {"some": "data"}
print(client.get("foo"))
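
The key shown in the comment above follows the {service_name}:{action}:{key} layout. A minimal sketch of building and splitting such keys is given below; the helper names are hypothetical and are not part of the SDK.

```python
def namespaced_key(service_name, action, key):
    """Build a key in the {service_name}:{action}:{key} layout."""
    return f"{service_name}:{action}:{key}"


def split_key(full_key):
    """Split a namespaced key; colons inside the original key are preserved."""
    service_name, action, key = full_key.split(":", 2)
    return service_name, action, key


print(namespaced_key("my_billing_app", "cache", "foo"))  # my_billing_app:cache:foo
```

Splitting with a limit of two keeps application keys such as user:42 intact when they contain colons themselves.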

4. Hybrid Proxy (MICROSERVICES)

Connects to the Native Cluster through a Predixy proxy. Because Predixy does not handle cross-node MULTI/EXEC well, the client patches pipeline transactions automatically.

hybrid_config = RedisConfig(
    host="localhost", 
    port=6381, 
    password="your_password",
    service_name="my_billing_app"
)

client = RedisClientFactory.get_client(hybrid_config, UseCase.MICROSERVICES)

# transaction=True is accepted, but the Hybrid adapter overrides it to transaction=False
pipe = client.pipeline(transaction=True)
pipe.set("a", 1)
pipe.set("b", 2)
pipe.execute()

Note on Predixy Pipelines: Predixy does not support cross-node MULTI/EXEC efficiently through Python pipelines. If you pass transaction=True, the Hybrid adapter intercepts it, prints a notice, and overrides it to transaction=False. The queued commands are still sent as one batch, but they are not executed atomically.
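
The override described in the note can be sketched as a thin wrapper around a client. The classes below are hypothetical stand-ins, not the SDK's Hybrid adapter, and the sketch logs a warning where the SDK is described as printing a notice.

```python
import logging

logger = logging.getLogger("hybrid_adapter_sketch")


class FakeRedis:
    """Stand-in for a real client; records the flag it receives."""

    def __init__(self):
        self.last_transaction_flag = None

    def pipeline(self, transaction=True):
        self.last_transaction_flag = transaction
        return []  # placeholder for a real pipeline object


class HybridAdapterSketch:
    """Forces non-transactional pipelines, as the note describes."""

    def __init__(self, client):
        self._client = client

    def pipeline(self, transaction=True):
        if transaction:
            logger.warning(
                "transaction=True is not supported through the proxy; "
                "falling back to transaction=False"
            )
            transaction = False
        return self._client.pipeline(transaction=transaction)


backend = FakeRedis()
HybridAdapterSketch(backend).pipeline(transaction=True)
print(backend.last_transaction_flag)  # False
```

Code that relies on atomic MULTI/EXEC semantics should therefore not assume them when running through the Hybrid architecture.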

5. Intelligent Router (AUTO)

UseCase.AUTO is the simplest way to use the SDK. Provide the configurations for all environments to the factory, request UseCase.AUTO, and the router sends each command to the architecture suited to it.

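# global_config is assumed to hold the connection settings for all three
# architectures; how it is built is not shown in this guide.
router_client = RedisClientFactory.get_client(
    config=global_config,
    use_case=UseCase.AUTO
)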

# Routes to Native Cluster
router_client.set("cache_key", "fast_data")

# Routes to Sentinel
router_client.publish("global_events", "system_ready")

# Routes to Hybrid Proxy
router_client.xadd("audit_trail", {"action": "login"})
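
The routing shown above amounts to a lookup from command type to backend. The sketch below illustrates the idea with plain dictionaries. It is not the IntelligentRouterAdapter: the backend labels are made up, and the entries for get, delete, and subscribe are assumptions, since the source only confirms the routes for set, publish, xadd, and xread.

```python
# Backend labels are illustrative names, not SDK identifiers.
ROUTES = {
    "cache": "native_cluster",
    "pubsub": "sentinel",
    "stream": "hybrid_proxy",
}

COMMAND_FAMILY = {
    "set": "cache",
    "get": "cache",         # assumed
    "delete": "cache",      # assumed
    "publish": "pubsub",
    "subscribe": "pubsub",  # assumed
    "xadd": "stream",
    "xread": "stream",
}


def route(command):
    """Return the backend label for a command name (case-insensitive)."""
    family = COMMAND_FAMILY.get(command.lower())
    if family is None:
        raise ValueError(f"no route for command: {command!r}")
    return ROUTES[family]


print(route("set"), route("publish"), route("xadd"))
# native_cluster sentinel hybrid_proxy
```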

Running the Tests

To verify that the SDK routes correctly to all three architectures:

  1. Inside Docker (Recommended): Use run_tests_in_docker.ps1 to run the tests inside the same Docker network as the Redis instances.
  2. Local Windows: Run python test_sdk.py. (Note: the Native Cluster tests may fail gracefully because they require Docker DNS resolution.)

Redis SDK Developer Guide

This developer guide shows how to use the Safari Pro Redis SDK: how to connect to Redis, how to read existing and new entries from streams, and how to use the built-in Communication Service to send SMS, email, and in-app notifications.

1. Connecting to Redis

The SDK abstracts the underlying Redis architecture (Cluster, Sentinel, or Hybrid/Predixy) using the RedisClientFactory and the UseCase enum.

from smartinno.config import RedisConfig, UseCase
from smartinno.factory import RedisClientFactory

# Initialize the configuration with your service namespace
config = RedisConfig(
    host="redis-node-1",
    port=6379,
    password="your_password",
    startup_nodes=[{"host": "redis-node-1", "port": 6379}],
    service_name="my_service"
)

# Request a client tailored for high concurrency (Native Redis Cluster)
client = RedisClientFactory.get_client(config, UseCase.HIGH_CONCURRENCY)

2. Reading Data from Redis Streams

Redis Streams is the backbone of the SDK's messaging architecture. Use the unified xread method to consume data from streams. The SDK unwraps the JSON envelope automatically.

Reading Pending Messages

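To read the messages that already exist in a stream, starting from the beginning, pass 0 (or "0-0") as the stream ID. "Pending" here means entries already stored in the stream; XREAD does not use consumer groups, so this is unrelated to a consumer group's pending entries list. The count argument caps how many entries a single call returns.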

# Read pending messages from the beginning (ID 0)
messages = client.xread({"communication:events": 0}, count=10)

if messages:
    for stream_name, events in messages:
        print(f"Found {len(events)} pending messages in {stream_name}")
        for msg_id, msg_data in events:
            print(f"Message ID: {msg_id}, Data: {msg_data}")

Reading New Messages

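To listen for new messages, pass $ as the stream ID and set the block parameter (in milliseconds). The call blocks until a new message arrives or the timeout expires. If the timeout expires first, nothing is returned, which is why the example checks the result before iterating.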

# Block and wait for up to 5000ms for new messages
new_messages = client.xread({"communication:events": "$"}, count=1, block=5000)

if new_messages:
    for stream_name, events in new_messages:
        for msg_id, msg_data in events:
            print(f"New event received: {msg_id}")
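
Calling XREAD with $ repeatedly can miss entries that arrive between two calls, because $ always means "newer than the latest entry at the time of this call". A common pattern is to remember the last ID seen and pass it to the next call. The sketch below shows that loop against an in-memory stand-in; FakeStreamClient and read_all are hypothetical names, and the sketch assumes the SDK's xread returns the same (stream_name, events) shape used in the loops above, with string IDs.

```python
def _id_key(entry_id):
    """Turn '1-0' into (1, 0) so IDs compare numerically."""
    ms, seq = str(entry_id).split("-")
    return int(ms), int(seq)


class FakeStreamClient:
    """In-memory stand-in that mimics the shape of an XREAD reply."""

    def __init__(self, entries):
        self._entries = entries  # list of (id, data), oldest first

    def xread(self, streams, count=None, block=None):
        reply = []
        for name, last_id in streams.items():
            after = _id_key("0-0" if str(last_id) == "0" else last_id)
            newer = [e for e in self._entries if _id_key(e[0]) > after]
            if newer:
                reply.append((name, newer[:count]))
        return reply


def read_all(client, stream, batch_size=10):
    """Read a stream from the start, resuming each call from the last ID seen."""
    last_id = "0"
    while True:
        reply = client.xread({stream: last_id}, count=batch_size)
        if not reply:
            return
        for _stream_name, events in reply:
            for msg_id, msg_data in events:
                yield msg_id, msg_data
                last_id = msg_id


client = FakeStreamClient([("1-0", {"n": 1}), ("2-0", {"n": 2}), ("3-0", {"n": 3})])
print([msg_id for msg_id, _ in read_all(client, "communication:events", batch_size=2)])
# ['1-0', '2-0', '3-0']
```

To keep tailing a live stream instead of stopping when it is empty, the same loop could pass block to xread and continue after an empty reply.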

Tip: The Intelligent Router (UseCase.AUTO) automatically routes stream commands such as XADD and XREAD to the Hybrid architecture.


3. Using the Communication Service

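The CommunicationService is an SDK module that queues messages onto Redis Streams; external communication workers consume those streams and deliver the messages. A successful return therefore indicates that the message was queued, not that it was delivered.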

from smartinno.communication_service import communication_service

Sending SMS

You can send SMS messages to one or multiple recipients.

success = communication_service.send_sms(
    recipients=["+254700123456", "+254711654321"],
    message="Your Safari Pro booking is confirmed!",
    template="booking_confirmation",
    metadata={"booking_id": "BK-001"}
)

Sending Emails

The email functionality supports plain text, HTML, and templates.

email_result = communication_service.send_email(
    recipients=["guest@example.com"],
    subject="Booking Confirmation",
    body="Your safari booking BK-001 has been confirmed.",
    html_content="<h1>Booking Confirmed</h1>",
    template_name="booking_email",
    context_data={"guest_name": "Alice"},
    metadata={"booking_id": "BK-001"}
)

print(email_result["message_id"]) # Message ID tracking

Sending In-App Notifications

Queue an in-app notification that will be pushed to the user's notification center in real time.

inapp_result = communication_service.send_inapp_notification(
    user_id="user_123",
    title="Booking Confirmed",
    message="Your booking #BK-001 has been confirmed",
    notification_type="booking_confirmed",
    action_url="/bookings/BK-001",
    priority="high"
)

Multi-Channel Notifications (All-in-One)

Send to multiple channels simultaneously using send_notification().

results = communication_service.send_notification(
    notification_type="payment_received",
    recipients={
        "email": ["finance@safari.co"],
        "sms": ["+254700999888"],
        "user_id": "user_456",
    },
    content={
        "email_subject": "Payment Received",
        "email_body": "Payment of $500 received for booking BK-002.",
        "sms_message": "Payment of $500 received. Ref: BK-002",
        "inapp_title": "Payment Received",
        "inapp_message": "We received your $500 payment for booking BK-002",
    },
    metadata={"booking_id": "BK-002", "amount": "500.00", "priority": "high"}
)

print(results) # {"email": {...}, "sms": True, "inapp": True}
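
Since send_notification() returns one entry per channel, a caller may want to check which channels were not queued. The helper below is a hypothetical sketch, not part of the SDK. It assumes that a failed channel is reported as a falsy value (False, None, or an empty dict) or is missing from the result, which the source does not state. The sample message_id is made-up data.

```python
def failed_channels(results, expected=("email", "sms", "inapp")):
    """Return the expected channels whose result is missing or falsy."""
    return [channel for channel in expected if not results.get(channel)]


# Sample result in the shape shown above; values are illustrative.
sample = {"email": {"message_id": "abc"}, "sms": True, "inapp": False}
print(failed_channels(sample))  # ['inapp']
```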
