Unified Redis Python SDK for the Safari Pro architecture.

Unified Redis Python SDK

A unified Python wrapper for working with several Redis deployment architectures through one interface. The SDK uses the Factory design pattern to hide the differences between connecting to each setup. It enforces data standardization (JSON enveloping) and ships with connection pooling, SSL/TLS, retries with exponential backoff, and socket keepalives.

Core Features

  • Automatic Data Standardization: All payloads sent to Redis are automatically wrapped in a standard JSON envelope containing request_id (a UUID), event, type, service, payload, and timestamp. The SDK transparently unwraps this when reading data so your application only ever sees the raw data it passed.
  • Key Namespacing: All keys are written in the form {service_name}:{action}:{key} to prevent collisions across microservices.
  • Intelligent Routing: Use UseCase.AUTO to automatically route cache commands to a Native Cluster, Pub/Sub to Sentinel, and Streams to a Hybrid proxy.
  • Enterprise-Grade Configurations: Built-in support for Connection Pooling, SSL/TLS, Exponential Backoff Retries, and Socket Keepalives.
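
The envelope and namespacing behavior listed above can be sketched in plain Python. This is an illustration only, not the SDK's implementation: the helper names wrap_envelope, unwrap_envelope, and namespaced_key are hypothetical, and the timestamp format and the default event/type values are assumptions. Only the envelope field names and the key pattern come from the feature list.

```python
import json
import uuid
from datetime import datetime, timezone


def namespaced_key(service_name: str, action: str, key: str) -> str:
    """Build a key following the {service_name}:{action}:{key} pattern."""
    return f"{service_name}:{action}:{key}"


def wrap_envelope(payload, service: str, event: str = "set", type_: str = "cache") -> str:
    """Serialize a payload inside an envelope with the documented field names."""
    envelope = {
        "request_id": str(uuid.uuid4()),
        "event": event,    # assumed default value
        "type": type_,     # assumed default value
        "service": service,
        "payload": payload,
        "timestamp": datetime.now(timezone.utc).isoformat(),  # assumed format
    }
    return json.dumps(envelope)


def unwrap_envelope(raw: str):
    """Return only the original payload, discarding the envelope metadata."""
    return json.loads(raw)["payload"]


key = namespaced_key("my_billing_app", "cache", "foo")
stored = wrap_envelope({"some": "data"}, service="my_billing_app")
print(key)
print(unwrap_envelope(stored))
```

The round trip returns exactly the value that was passed in, which is the property the SDK describes as transparent unwrapping.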

Supported Architectures & Use Cases

| Architecture | UseCase Enum | Underlying Driver | Best For |
| --- | --- | --- | --- |
| Intelligent Router | AUTO | IntelligentRouterAdapter | Hands-off optimal routing based on command type. |
| Native Redis Cluster | HIGH_CONCURRENCY | redis.cluster.RedisCluster | Horizontal scaling, massive reads/writes. |
| Sentinel + HAProxy | HEAVY_TRANSACTIONS | redis.Redis (HAProxy) | Pub/Sub, robust transactions. |
| Hybrid (Predixy Proxy) | MICROSERVICES | redis.Redis (Standard) | Easy multi-tenant connections, stateless clients. |

Usage Guide

1. Basic Import

from smartinno.config import RedisConfig, UseCase
from smartinno.factory import RedisClientFactory

2. Configuration & Enterprise Settings

Initialize your configuration and pass service_name to define your application's namespace. You can also pass enterprise features like retries and connection limits.

cluster_config = RedisConfig(
    host="redis-node-1", 
    port=6379, 
    password="your_password",
    startup_nodes=[{"host": "redis-node-1", "port": 6379}],
    
    # Core SDK configs
    service_name="my_billing_app",
    
    # Enterprise configs
    ssl=False,
    max_connections=100,
    socket_timeout=5.0,
    retry_on_timeout=True,
    retry_backoff=True, # Enables Exponential Backoff
    retry_backoff_retries=3
)
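
For readers unfamiliar with the term, the following is a minimal sketch of what retrying with exponential backoff means in general. It does not show how the SDK implements retry_backoff or retry_backoff_retries. The function names, the 0.1 s base delay, and the 2 s cap are assumptions chosen for illustration.

```python
import time


def backoff_delays(retries: int, base: float = 0.1, cap: float = 2.0) -> list[float]:
    """Delay before each retry: base * 2**attempt, capped at `cap` seconds."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(retries)]


def call_with_retries(func, retries: int = 3, base: float = 0.1, cap: float = 2.0, sleep=time.sleep):
    """Call func(); on TimeoutError, wait with exponential backoff and retry."""
    delays = backoff_delays(retries, base, cap)
    for attempt in range(retries + 1):
        try:
            return func()
        except TimeoutError:
            if attempt == retries:
                raise  # retries exhausted, surface the error
            sleep(delays[attempt])


# Simulated operation that times out twice and then succeeds.
attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


slept = []
result = call_with_retries(flaky, retries=3, sleep=slept.append)
print(result, slept)
```

Passing sleep=slept.append records the delays instead of waiting, which keeps the example instantaneous while still showing the doubling pattern.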

3. Native Cluster (HIGH_CONCURRENCY)

client = RedisClientFactory.get_client(cluster_config, UseCase.HIGH_CONCURRENCY)

# Automatically formats key to 'my_billing_app:cache:foo'
# Automatically wraps data in a JSON envelope.
client.set("foo", {"some": "data"})

# Transparently unwraps the envelope and returns {"some": "data"}
print(client.get("foo"))

4. Hybrid Proxy (MICROSERVICES)

Connects to the Native Cluster through a Predixy proxy. Because Predixy does not efficiently support cross-node MULTI/EXEC, the client automatically patches pipelines that request a transaction so that they run without one.

hybrid_config = RedisConfig(
    host="localhost", 
    port=6381, 
    password="your_password",
    service_name="my_billing_app"
)

client = RedisClientFactory.get_client(hybrid_config, UseCase.MICROSERVICES)

# transaction=True is overridden to transaction=False by the Hybrid adapter
pipe = client.pipeline(transaction=True)
pipe.set("a", 1)
pipe.set("b", 2)
pipe.execute()

Note on Predixy Pipelines: Predixy Proxy does not efficiently support cross-node MULTI/EXEC through Python pipelines. If you pass transaction=True, the Hybrid adapter catches it, prints a notice, and overrides it to transaction=False, so the queued commands are sent as a batch without MULTI/EXEC atomicity.
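
The override described in the note can be pictured with a small wrapper. This is a sketch under stated assumptions, not the adapter's actual code: HybridAdapterSketch and FakeRedis are hypothetical names, the notice text is invented, and FakeRedis returns a plain dict so the example runs without a Redis server.

```python
class FakeRedis:
    """Stand-in for a real client so the sketch runs without a server."""

    def pipeline(self, transaction: bool = True):
        # A real client would return a pipeline object; a dict is enough here.
        return {"transaction": transaction}


class HybridAdapterSketch:
    """Hypothetical wrapper that forces non-transactional pipelines."""

    def __init__(self, client):
        self._client = client

    def pipeline(self, transaction: bool = True, **kwargs):
        if transaction:
            # Invented notice text; the SDK's wording is not documented here.
            print("Notice: transaction=True is not supported via the proxy; using transaction=False")
            transaction = False
        return self._client.pipeline(transaction=transaction, **kwargs)


adapter = HybridAdapterSketch(FakeRedis())
pipe = adapter.pipeline(transaction=True)
print(pipe)
```

Callers keep the same pipeline() call in every architecture, at the cost of silently losing atomicity on the Hybrid path, which is why the adapter prints a notice.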

5. Intelligent Router (AUTO)

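The simplest way to use the SDK. Provide the configurations for all environments to the factory, request UseCase.AUTO, and the router sends each command to the architecture suited to it. In the example below, global_config stands for that combined configuration; its construction is not shown here.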

router_client = RedisClientFactory.get_client(
    config=global_config, 
    use_case=UseCase.AUTO
)

# Routes to Native Cluster
router_client.set("cache_key", "fast_data")

# Routes to Sentinel
router_client.publish("global_events", "system_ready")

# Routes to Hybrid Proxy
router_client.xadd("audit_trail", {"action": "login"})
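
The routing rule behind these three calls can be sketched as a lookup on the command name. The mapping of cache commands to the Native Cluster, Pub/Sub to Sentinel, and Streams to the Hybrid proxy comes from this document. Everything else is an assumption for illustration: the Backend enum, the route function, the exact command sets, and the choice to send unlisted commands to the cluster.

```python
from enum import Enum


class Backend(Enum):
    CLUSTER = "native_cluster"
    SENTINEL = "sentinel"
    HYBRID = "hybrid_proxy"


# Assumed command groupings; the SDK's real lists may differ.
PUBSUB_COMMANDS = {"publish", "subscribe", "psubscribe"}
STREAM_COMMANDS = {"xadd", "xread", "xreadgroup", "xack"}


def route(command: str) -> Backend:
    """Pick a backend from the command name (case-insensitive)."""
    name = command.lower()
    if name in PUBSUB_COMMANDS:
        return Backend.SENTINEL
    if name in STREAM_COMMANDS:
        return Backend.HYBRID
    return Backend.CLUSTER  # cache-style commands such as GET/SET


for cmd in ("set", "publish", "xadd"):
    print(cmd, "->", route(cmd).value)
```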

Running the Tests

To verify that the SDK routes correctly to all three architectures:

  1. Inside Docker (Recommended): Use run_tests_in_docker.ps1 to run the tests inside the same Docker network as the Redis instances.
  2. Local Windows: Run python test_sdk.py. (Note: Native Cluster tests may fail gracefully if they require Docker DNS resolution.)

Redis SDK Developer Guide

This developer guide provides step-by-step instructions on how to use the Safari Pro Redis SDK, including how to connect to Redis, read data (both pending and new) from streams, and utilize the built-in Communication Service for sending SMS, emails, and in-app notifications.

1. Connecting to Redis

The SDK abstracts the underlying Redis architecture (Cluster, Sentinel, or Hybrid/Predixy) using the RedisClientFactory and the UseCase enum.

from smartinno.config import RedisConfig, UseCase
from smartinno.factory import RedisClientFactory

# Initialize the configuration with your service namespace
config = RedisConfig(
    host="redis-node-1",
    port=6379,
    password="your_password",
    startup_nodes=[{"host": "redis-node-1", "port": 6379}],
    service_name="my_service"
)

# Request a client tailored for high concurrency (e.g., Native Cluster)
client = RedisClientFactory.get_client(config, UseCase.HIGH_CONCURRENCY)

2. Reading Data from Redis Streams

Redis Streams is the backbone of the SDK's messaging architecture. Use the unified xread method to consume data from streams. The SDK handles JSON envelope unwrapping automatically.

Reading Pending Messages

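To read all existing messages in a stream from the beginning, pass 0 (or "0-0") as the stream ID. Here "pending" means entries already stored in the stream; it is unrelated to the pending entries list that Redis keeps for consumer groups.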

# Read pending messages from the beginning (ID 0)
messages = client.xread({"communication:events": 0}, count=10)

if messages:
    for stream_name, events in messages:
        print(f"Found {len(events)} pending messages in {stream_name}")
        for msg_id, msg_data in events:
            print(f"Message ID: {msg_id}, Data: {msg_data}")

Reading New Messages

To listen for new messages, pass $ as the stream ID and use the block parameter (in milliseconds). The client blocks until a new message arrives or the timeout expires, in which case the result is empty.

# Block and wait for up to 5000ms for new messages
new_messages = client.xread({"communication:events": "$"}, count=1, block=5000)

if new_messages:
    for stream_name, events in new_messages:
        for msg_id, msg_data in events:
            print(f"New event received: {msg_id}")
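
When reading in a loop, passing $ on every call only returns entries added after that call starts, so entries that arrive between two calls can be missed. A common pattern is to remember the last ID received and pass it to the next read. The sketch below shows that bookkeeping with an in-memory stand-in for xread: fake_xread, ENTRIES, and consume are hypothetical names, and the assumption is that the SDK's xread returns the same (stream_name, events) shape used in the examples above.

```python
ENTRIES = [("1-0", {"n": 1}), ("2-0", {"n": 2}), ("3-0", {"n": 3})]


def _id_key(entry_id) -> tuple[int, int]:
    """Turn a stream ID such as '2-0' (or a bare 0) into a sortable tuple."""
    text = str(entry_id)
    if "-" in text:
        ms, seq = text.split("-")
        return int(ms), int(seq)
    return int(text), 0


def fake_xread(streams: dict, count: int = 2):
    """In-memory stand-in: return entries with IDs greater than the one requested."""
    result = []
    for name, after in streams.items():
        newer = [e for e in ENTRIES if _id_key(e[0]) > _id_key(after)][:count]
        if newer:
            result.append((name, newer))
    return result


def consume(read, stream: str, start_id: str = "0", batches: int = 3):
    """Read several batches, resuming each time after the last ID seen."""
    last_id = start_id
    seen = []
    for _ in range(batches):
        response = read({stream: last_id})
        for _stream_name, events in response or []:
            for msg_id, msg_data in events:
                seen.append((msg_id, msg_data))
                last_id = msg_id  # the next read starts after this entry
    return seen, last_id


seen, last_id = consume(fake_xread, "communication:events")
print(len(seen), last_id)
```

With a real client, the read argument would be the client's xread method, called with count and block as needed.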

Tip: The Intelligent Router (UseCase.AUTO) automatically routes stream commands such as XADD and XREAD to the Hybrid architecture.


3. Using the Communication Service

The CommunicationService is a robust module integrated into the SDK that queues messages onto Redis Streams for external communication workers to process.

from smartinno.communication_service import communication_service

Sending SMS

You can send SMS messages to one or multiple recipients.

success = communication_service.send_sms(
    recipients=["+254700123456", "+254711654321"],
    message="Your Safari Pro booking is confirmed!",
    template="booking_confirmation",
    metadata={"booking_id": "BK-001"}
)

Sending Emails

The email functionality supports plain text, HTML, and templates.

email_result = communication_service.send_email(
    recipients=["guest@example.com"],
    subject="Booking Confirmation",
    body="Your safari booking BK-001 has been confirmed.",
    html_content="<h1>Booking Confirmed</h1>",
    template_name="booking_email",
    context_data={"guest_name": "Alice"},
    metadata={"booking_id": "BK-001"}
)

print(email_result["message_id"]) # Message ID tracking

Sending In-App Notifications

Queue an in-app notification that will be pushed to the user's notification center in real time.

inapp_result = communication_service.send_inapp_notification(
    user_id="user_123",
    title="Booking Confirmed",
    message="Your booking #BK-001 has been confirmed",
    notification_type="booking_confirmed",
    action_url="/bookings/BK-001",
    priority="high"
)

Multi-Channel Notifications (All-in-One)

Send to multiple channels simultaneously using send_notification().

results = communication_service.send_notification(
    notification_type="payment_received",
    recipients={
        "email": ["finance@safari.co"],
        "sms": ["+254700999888"],
        "user_id": "user_456",
    },
    content={
        "email_subject": "Payment Received",
        "email_body": "Payment of $500 received for booking BK-002.",
        "sms_message": "Payment of $500 received. Ref: BK-002",
        "inapp_title": "Payment Received",
        "inapp_message": "We received your $500 payment for booking BK-002",
    },
    metadata={"booking_id": "BK-002", "amount": "500.00", "priority": "high"}
)

print(results) # {"email": {...}, "sms": True, "inapp": True}
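
Since the result holds one entry per channel, a caller may want to check which channels did not go through. The helper below is a hypothetical sketch: failed_channels is not part of the SDK, the sample message_id value is invented, and it assumes a failed channel is reported as a falsy value (False, None, or an empty dict), which the documentation above does not confirm.

```python
def failed_channels(results: dict) -> list[str]:
    """Return channel names whose result is falsy (assumed to mean failure)."""
    return [channel for channel, outcome in results.items() if not outcome]


# Sample shaped like the output comment above, with one simulated failure.
sample = {"email": {"message_id": "abc-123"}, "sms": True, "inapp": False}
print(failed_channels(sample))
```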
