Skip to main content

Unified Redis Python SDK for the Safari Pro architecture.

Project description

🦁 Safari Pro – Enterprise Unified SDK Master API Reference

Welcome to the Safari Pro Unified SDK Master API Reference. This guide is designed to help developers—both new and experienced—understand, configure, and operate the core infrastructure of the Safari Pro Super App.

This SDK unifies Temporal Orchestration, Enterprise Custom Redis Connectivity, Resilient Messaging (Pulsar/Redis Failover), and Observability Monitoring into a single, high-performance toolkit with built-in failover capabilities.


📐 Core SDK Architecture Overview

The Safari Pro SDK suite provides a high-reliability abstraction layer over databases, message buses, and workflow systems. It simplifies building microservices by exposing unified clients with built-in resiliency patterns.

graph TD
    User[Application Code] --> tc[TemporalClient]
    User --> rc[RedisClientFactory]
    User --> mc[MessageClient]
    User --> mon[Observability Monitors]
    
    tc -->|Orchestration| TempServer[Temporal Cluster]
    rc -->|Key-Value / Streams| RedisCluster[Redis Sentinel / Cluster]
    mc -->|Resilient Messaging| PubSub[Pulsar / Redis PubSub]

🛠️ Complete Enums & Configurations Guide

1. Architectural & Routing Enums

smartinno.redis_custom.config.Architecture

Defines the backend deployment topology for Redis connections.

  • SENTINEL ("sentinel"): High-Availability Sentinel setup with active-passive replication and automated master failover.
  • CLUSTER ("cluster"): Cryptographically sharded cluster distributing keys across 16,384 slots over multiple master nodes.
  • HYBRID ("hybrid"): Routes through a Predixy/Envoy proxy gateway to abstract cluster topology away from clients.
  • SMART_ROUTER ("smart_router"): Meta-architecture routing commands dynamically to different Redis topologies.

smartinno.redis_custom.config.UseCase

Guides the factory to auto-select the optimal Redis architecture for a given workload.

  • HIGH_CONCURRENCY: Maps to CLUSTER (Ideal for massive horizontal read/write scale).
  • HEAVY_TRANSACTIONS: Maps to SENTINEL (Ideal for multi-key pipelines and transactions).
  • MICROSERVICES: Maps to HYBRID (Ideal for polyglot systems and proxy gateways).
  • AUTO: Maps to SMART_ROUTER (Dynamically selects based on payload structure).

smartinno.temporal.client.Frequency

Defines scheduling intervals for recurring cron orchestrations.

  • ONCE ("once"): Fires exactly once.
  • REPEATED ("repeated"): Fires every minute.
  • HOURLY ("hourly"): Fires at the start of every hour.
  • DAILY ("daily"): Fires at midnight every day.
  • WEEKLY ("weekly"): Fires at midnight every Sunday.
  • MONTHLY ("monthly"): Fires at midnight on the first day of every month.

smartinno.redis_custom.smart_router.DataSensitivity

Categorizes data payloads based on PII/critical parameters to dictate caching TTL policies.

  • CRITICAL (Matches passport, visa): Bypasses persistent storage; cached for 60 seconds max.
  • HIGH (Matches password, payment, card, cvv, secret, ssn, token): Cached for 300 seconds.
  • MEDIUM (Matches email, phone, user_name): Standard caching rules apply.
  • LOW: Default category for standard operational logs or cache elements.

🔑 Redis Key Generation & Data Serialization Envelopes

To prevent key collision and ensure clean data parsing across heterogeneous systems (e.g. Go, Python, and Node.js microservices), the SDK enforces standardized key structures and JSON packaging envelopes.

1. Standardized Key Formatting Pattern

All keys written to Redis are transparently ran through the BaseRedisAdapter._format_key() handler. This prefixes keys according to a strict namespace convention:

$$\text{redis_key} = \text{service_name} : \text{action} : \text{original_key}$$

  • Parameters:
    • service_name (str, default "safari_pro"): Declared in RedisConfig. Prevents cross-app database contamination.
    • action (str, default "data"): Scopes key usage (e.g., data, cache, stream, queue).
  • Example Generation:
    • Original Key: "booking:8848"
    • Generated Key: safari_pro:data:booking:8848

2. Cache Data envelope Serialization

For caching payloads (strings, hashes, simple JSON targets), keys are packed into a typed JSON envelope containing contextual metadata.

{
    "request_id": "4020a6e3-fcf3-40e1-b4f6-6cbe702e5b85",
    "event": "update",
    "type": "cache",
    "service": "safari_pro",
    "tenant_name": null,
    "payload": {
        "booking_id": "BK-9989",
        "total_amount": 1500.0
    },
    "timestamp": "2026-06-29T10:24:18.123456"
}
  • Envelope Fields:

    • request_id (str): UUIDv4 tracking identifier.
    • event (str): Action trigger description (e.g., update, invalidate).
    • type (str): Target cache category type.
    • service (str): Originating application namespace name.
    • tenant_name (Optional[str]): Partition indicator for multi-tenancy.
    • payload (Any): The actual payload data structure.
    • timestamp (str): UTC timestamp of serialization in ISO-8601 formatting.
  • Note: Read operations automatically deserialize the envelope, returning only the underlying payload value.

3. Messaging Emergency Fallback Envelopes

When circuit breakers trip or connection drops occur, message payloads are wrapped with failover metadata.

Redis Emergency Fallback

Stranded messages waiting for Pulsar recovery are written to a Redis Stream with the key pattern: $$\text{key} = \text{emergency-fallback} : {topic}$$

  • Envelope Schema:
    {
        "origin_topic": "booking-requests",
        "failed_at": 1782728658.452,
        "data": { ... }
    }
    

Pulsar Emergency Fallback

When Redis is down and fallback is routed directly to Pulsar, messages are packaged with origin metadata:

  • Target Topic: persistent://public/default/emergency-fallback-redis-{topic}
  • Envelope Schema:
    {
        "origin": "booking-requests",
        "time": 1782728658.452,
        "data": { ... }
    }
    

⏳ 1. Temporal Orchestration SDK

TemporalConfig

A configuration dataclass used to initialize the TemporalClient.

Field Name Type Default Description
host str "localhost" Endpoint hostname of the Temporal frontend service.
port int 7233 Endpoint port of the Temporal frontend service.
namespace str "default" Target namespace partition.
ssl bool False Set to True to enable secure TLS communication.
client_cert_path Optional[str] None Absolute path to the client certificate (.crt/.pem).
client_key_path Optional[str] None Absolute path to the client private key (.key).
auto_start_worker bool False Automatically boots background worker thread on execution.
queues List[str] ["default"] List of task queues the background worker monitors.
mock_mode Optional[bool] None Explicit override. If omitted, mode is auto-detected from environment variables.

@tc.workflow (Alias: @tc.process)

Marks an async function as a stateful workflow process coordinator.

  • Parameters:
    • queue (str, default "default"): The task queue where this workflow registration and its tasks reside.
  • Attributes Attached to Wrapper:
    • .delay(*args, **kwargs): Triggers the workflow asynchronously, returning a TaskHandle.
    • .workflow_name: Registered string identifier (the decorated function's name).
    • ._queue: Target queue.
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Orchestration logic goes here...
    return {"status": "success"}

@tc.task (Alias: @tc.step)

Marks a function (sync or async) as an activity step. Activity steps execute real network operations, queries, or API calls.

  • Parameters:
    • queue (str, default "default"): Task queue where this step executes.
    • retry_attempts (Optional[int], default None): Maximum times to retry if an exception occurs.
    • backoff_seconds (Optional[int], default None): Initial backoff delay (seconds) before retrying.
    • fallback (Optional[Callable], default None): Backup function run if all retries fail.
    • timeout (int, default 300): Maximum time allowed for a single run in seconds.
    • heartbeat_timeout (Optional[int], default None): Time limit between activity status heartbeats.
def log_failed_payment(booking_id: str, amount: float):
    print(f"Payment failed for {booking_id} after retries.")
    return {"status": "payment_failed_logged"}

@tc.step(
    queue="billing-queue",
    retry_attempts=3,
    backoff_seconds=5,
    fallback=log_failed_payment,
    timeout=60
)
def process_payment(booking_id: str, amount: float):
    # Charge gateway API call...
    return {"txn_id": "TXN-8848"}

tc.execute_step(step_func, *args, **kwargs)

Runs an activity step durably within the workflow context.

  • Parameters:
    • step_func (Callable): The decorated @tc.step function to execute.
    • *args: Positional arguments forwarded to step_func.
    • **kwargs: Keyword arguments forwarded to step_func.
  • Returns: Any return value from step_func.
result = await tc.execute_step(process_payment, "booking-id-123", 450.0)

tc.execute_local_step(step_func, *args, **kwargs)

Runs an activity step locally inside the worker process running the workflow. Bypasses the Temporal server queues.

  • Parameters:
    • step_func (Callable): Decorated @tc.step function.
    • *args / **kwargs: Forwarded arguments.
    • timeout (int, default 5): Maximum local timeout in seconds.
    • retry_attempts (int, default 3): Maximum retries.
  • Returns: Any return value from step_func.
is_valid = await tc.execute_local_step(check_promo_validity, "SAFARI10", 450.0)

tc.execute_in_parallel(*calls) (Alias: fan_out)

Executes multiple step functions concurrently (Fan-Out/Fan-In).

  • Parameters:
    • *calls (tuple): Varargs of tuples. Each tuple is formatted as:
      1. step_func (Callable): The @tc.step function to run.
      2. args_list (list): List of positional arguments.
      3. kwargs_dict (dict, optional): Dict of keyword arguments.
  • Returns: list containing results in corresponding order.
results = await tc.execute_in_parallel(
    (search_hotels, ["Arusha"]),
    (search_flights, ["JRO"])
)
hotels, flights = results[0], results[1]

tc.execute_in_series(*calls)

Executes multiple step functions sequentially.

  • Parameters: Same format as execute_in_parallel.
  • Returns: list containing results in order of completion.
results = await tc.execute_in_series(
    (create_invoice, ["book-99"]),
    (send_receipt, ["book-99"])
)

tc.start_process(process_name, queue, *args, **kwargs)

Launches a workflow process asynchronously and returns a TaskHandle.

  • Parameters:
    • process_name (str): Name of the @tc.process workflow.
    • queue (str): Target queue.
    • *args / **kwargs: Inputs forwarded to the workflow.
  • Returns: TaskHandle to query or await workflow status and results.
handle = tc.start_process("safari_booking_workflow", "booking-queue", {"tour_id": "T1"})

TaskHandle Methods

  • get(timeout: Optional[float] = None) -> Any: Blocks until completion and returns workflow output.
  • cancel() -> None: Sends a cancellation request to the workflow.
  • then(next_process_func: Callable, queue: Optional[str] = None, *args, **kwargs) -> TaskHandle: Chains another process that starts automatically with the current workflow's result.
  • status (str): Returns "RUNNING", "COMPLETED", or "CANCELLED".
result = handle.get(timeout=60.0)

tc.set_state(process_id, key, value)

Signals a running workflow to update its state dict out-of-band.

  • Parameters:
    • process_id (str): The workflow's ID.
    • key (str): State key name.
    • value (Any): Value to set.
tc.set_state(process_id, "guide_assigned", "Alex Kibona")

tc.get_state(process_id, key) -> Any

Queries a running workflow for a state value.

guide = tc.get_state(process_id, "guide_assigned")

tc.side_effect(func, *args) -> Any

Safely records non-deterministic operations (UUIDs, timestamps, database lookups) into workflow history.

  • Parameters:
    • func (Callable): A function returning a non-deterministic value.
    • *args: Positional arguments passed to func.
  • Returns: The result of func(*args).
txn_ref = tc.side_effect(lambda: f"REF-{uuid.uuid4().hex[:8].upper()}")

tc.continue_process_as_new(process_func, *args, **kwargs)

Restarts the active workflow with a clean history to prevent unbounded log growth.

tc.continue_process_as_new(safari_booking_workflow, new_payload)

Saga Transaction Coordinator

Rolls back actions in Last-In-First-Out (LIFO) order if an error is encountered.

from smartinno.temporal.client import Saga

@tc.process(queue="booking-queue")
async def booking_saga_workflow(payload: dict):
    saga = Saga(tc)
    try:
        # Step 1: Reserve Hotel
        hotel = await tc.execute_step(reserve_hotel, payload["hotel_id"])
        saga.add_compensation(release_hotel_reservation, hotel["reservation_id"])
        
        # Step 2: Book Tour (might fail)
        await tc.execute_step(book_safari_tour, payload["tour_id"])
        
    except Exception as e:
        await saga.compensate()
        raise e

Dynamic Queue Routing (tc.route_step)

Routes steps dynamically to specific queues at runtime.

  • Parameters:
    • step_func (Callable): @tc.step function to execute.
    • router (Callable[..., str]): Function returning the target queue string.
    • *args / **kwargs: Arguments passed to step_func and router.
def select_billing_queue(amount: float):
    return "high-value-payments" if amount >= 10000 else "standard-payments"

await tc.route_step(process_payment, select_billing_queue, amount=12500)

Namespace Management APIs

  • create_namespace(namespace_name: str, retention_days: int = 3) -> bool
  • describe_namespace(namespace_name: str) -> dict
  • update_namespace(namespace_name: str, retention_days: Optional[int] = None, description: Optional[str] = None, owner_email: Optional[str] = None) -> bool
  • delete_namespace(namespace_name: str) -> bool
tc.create_namespace("tenant-serengeti-lodges", retention_days=5)

Scheduler (tc.create_schedule)

  • schedule_id (str): Unique identifier for the scheduled task.
  • process_func (Callable): Workflow process to launch.
  • frequency (Frequency): Pre-defined frequency (ONCE, REPEATED, HOURLY, DAILY, WEEKLY, MONTHLY).
  • cron_expression (Optional[str]): Raw cron string (e.g., 0 0 * * 1-5).
  • count (Optional[int]): Maximum execution runs allowed before termination.
  • interval_minutes (Optional[int]): Run workflow at specific minute intervals.
await tc.create_schedule(
    schedule_id="daily-reconciliation",
    process_func=reconciliation_workflow,
    frequency=Frequency.DAILY
)

2. 🔌 Custom Redis Connectivity SDK

Provides a unified interface IRedisClient with support for Sentinel, Cluster, Hybrid proxies (e.g., Predixy), and an auto-routing Intelligent Router.

RedisConfig

Configuration settings for Redis instances. Full configuration properties:

Parameter Type Default Description
host str "localhost" Redis hostname or IP address.
port int 6379 Redis port number.
password Optional[str] None Password authentication string.
db int 0 Redis logical database index.
service_name str "safari_pro" Key prefix namespace prefix to prevent collisions.
max_connections int 100 Maximum size of the client connection pool.
socket_timeout float 0.5 Socket read/write timeout in seconds (500ms).
socket_connect_timeout float 0.2 Initial socket connection timeout in seconds (200ms).
socket_keepalive bool True Sends TCP keepalive probes to keep connection open.
health_check_interval int 30 Liveness check interval in seconds.
retry_on_timeout bool True Retries commands automatically when socket timeouts happen.
retry_backoff bool True Enforce exponential backoff on retry operations.
retry_backoff_retries int 3 Maximum retry attempts under backoff (Range: 3-5).
retry_backoff_min float 0.2 Minimum backoff delay in seconds (200ms).
retry_backoff_max float 0.5 Maximum backoff delay in seconds (500ms).
handle_moved_redirects bool True Redirects MOVED slot errors transparently in Cluster mode.
handle_ask_redirects bool True Redirects ASK slot migration errors in Cluster mode.
slot_map_caching bool True Cache cluster slots locally to avoid routing hops.
slot_refresh_automatic bool True Triggers slot map refreshes when node redirections fail.
reinitialize_steps int 5 Redirection failures before performing full slot refreshes.
failover_awareness bool True Monitors topology changes during master failover.
read_from_replicas bool True Reads from replica nodes to distribute cluster read load.
require_full_coverage bool False Continues execution even if slot slots are partially down.
ssl bool False Enables TLS connection protection.
ssl_cert_reqs str "required" Validation requirement for server certificate.
ssl_keyfile Optional[str] None Path to client TLS private key file.
ssl_certfile Optional[str] None Path to client TLS certificate file.
ssl_ca_certs Optional[str] None Path to Certificate Authority bundle file.
startup_nodes Optional[List[dict]] None Seeds list of Cluster startup nodes: [{"host": "...", "port": 7001}].
sentinel_nodes Optional[List[tuple]] None List of Sentinel endpoints: [("host1", 26379), ("host2", 26379)].
sentinel_master_name Optional[str] None Master group name monitored by Sentinel.
sentinel_port Optional[int] None Custom Sentinel connection port override.
hybrid_port Optional[int] None Custom Hybrid proxy connection port override.
cluster_host_mapping Optional[dict] None NAT/Docker cluster node host map translation dictionary.

RedisClientFactory

The factory class used to resolve and instantiate specific client adapters matching the target architecture.

Method Signature:

@staticmethod
def get_client(
    config: RedisConfig = None, 
    architecture: Architecture = None, 
    use_case: UseCase = None,
    connection: Any = None,
    sentinel_connection: Any = None,
    hybrid_connection: Any = None,
    native_connection: Any = None
) -> IRedisClient:
  • Parameters:
    • config (RedisConfig): Custom connection configuration object.
    • architecture (Architecture): Explicit architecture choice override (takes precedence).
    • use_case (UseCase): Guiding workload type to automatically select target topology.
    • connection / sentinel_connection / hybrid_connection / native_connection (Any): Optional pre-established connections to bypass client instantiations.
  • Adapter Resolution Output:
    • Architecture.SENTINEL -> Returns SentinelClientAdapter
    • Architecture.CLUSTER -> Returns ClusterClientAdapter
    • Architecture.HYBRID -> Returns HybridClientAdapter
    • Architecture.SMART_ROUTER -> Returns IntelligentRouterAdapter (holding nested config states for all adapters)
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.redis_custom.factory import RedisClientFactory

# Retrieve high concurrency cluster client
redis_client = RedisClientFactory.get_client(
    config=RedisConfig(host="localhost", port=7001),
    use_case=UseCase.HIGH_CONCURRENCY
)

Core Operations (IRedisClient)

  • set(key: str, value: Any) -> bool
  • get(key: str) -> Any
  • mget(keys: List[str]) -> List[Any]
  • hset(name: str, key: str = None, value: str = None, mapping: dict = None, ttl: int = None) -> int
  • hget(name: str, key: str) -> Any
  • lpush(name: str, *values, ttl: int = None) -> int
  • rpop(name: str) -> Any
  • pipeline(transaction: bool = True, shard_key: str = None) -> Pipeline

Multi-key Transaction Sharding

When using cluster architecture, transactions must route to the same slot using hash tags.

# Shard key guarantees slots match in Redis Cluster
with redis_client.pipeline(transaction=True, shard_key="hotel:987") as pipe:
    pipe.set("hotel:987:name", "Serengeti Lodge")
    pipe.set("hotel:987:available_rooms", 12)
    pipe.execute()

High Concurrency Redis Streams APIs

  • xadd(name: str, fields: dict) -> str: Appends a message to a stream.
  • xread(streams: dict, count: int = None, block: int = None) -> List: Reads entries from streams.
  • xgroup_create(name: str, groupname: str, id: str = "$", mkstream: bool = False): Registers consumer groups.
  • xreadgroup(groupname: str, consumername: str, streams: dict, count: int = None, block: int = None) -> List: Reads streams inside a consumer group.
  • xack(name: str, groupname: str, *ids): Acknowledges message receipt.
# Publish to stream
msg_id = redis_client.xadd("safari-bookings", {"booking_id": "BK-99", "status": "pending"})

# Read group messages
entries = redis_client.xreadgroup("booking-group", "consumer-1", {"safari-bookings": ">"}, count=10)

3. 📡 Resilient Messaging Facade (Pulsar / Redis)

The messaging facade provides resilient event pub/sub. It supports dual-active routing, client-side circuit breakers, and automatic database-to-broker failover.

PulsarConfig

Configuration parameters for Apache Pulsar.

Field Name Type Default Description
host str "localhost" Pulsar broker host.
port int 6650 Pulsar broker port.
connection_timeout_ms int 5000 Connect timeout (default 5s).
operation_timeout_ms int 10000 Read/write timeout (default 10s).

@msg.on(topic, strategy="load_balanced", retries=2)

Subscribes a callback to a topic. Supports specific distribution strategies:

  • Pulsar Subscription Types:
    • broadcast: Creates an exclusive subscription with a randomized ID so every node receives the message.
    • load_balanced: Creates a shared subscription (Shared) to distribute work across available workers.
    • sticky: Creates a key-shared subscription (Key_Shared) to route messages with matching partition keys to the same consumer.
    • failover: Creates a failover subscription (Failover) with active-standby consumers.
  • Redis Subsystem Adaptation:
    • broadcast routes through standard PubSub.
    • load_balanced or sticky routes through Redis Streams consumer groups.
    • exclusive routes through Redis Lists (RPOP/LPUSH queue patterns).
@msg.on("payment-events", strategy="load_balanced")
def handle_payment(payload):
    print(f"Received payment notification: {payload}")

@msg.on_event(topic, queue="default")

Wires an event topic to automatically start a Temporal workflow when a message arrives.

# Event-driven trigger
@msg.on_event("booking-requests", queue="booking-queue")
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Workflow triggers automatically on message
    await tc.execute_step(confirm_booking, payload["booking_id"])

msg.smart_send(topic, payload, key=None)

Publishes a message to a topic with automatic database-to-broker fallback.

  • Resiliency Workflow:
    1. Attempts to publish to primary backend (e.g. Redis Stream).
    2. If the connection fails or throws errors, it trips the circuit breaker to OPEN.
    3. The message is automatically wrapped and published to the Pulsar emergency fallback topic: persistent://public/default/emergency-fallback-redis-{topic}.
    4. Safe recovery loops buffer stranded payloads until the primary server recovers.
# Resilient send
msg.smart_send("booking-requests", {"booking_id": "BK-100", "user_id": "U5"})

msg.smart_listen(topic, callback, hint=None, criteria=None, explicit_type=None, block=True)

Dynamically listens on a Redis or Pulsar topic. It automatically inspects the channel type (e.g. detecting if a Redis key is a Stream, List, or PubSub channel) and launches the corresponding listener loops.

  • Parameters:
    • topic (str): Target channel to monitor.
    • callback (Callable): Code executed on payload arrival.
    • hint (Any / explicit_type): Used to guide target pattern selection.
    • block (bool, default True): Blocks execution and polls continuously.
# Dynamically resolves stream/list/pubsub loops
msg.smart_listen("booking-requests", process_incoming_booking, explicit_type="stream")

msg.smart_get(topic, hint=None, explicit_type=None, criteria=None)

Intelligently queries Redis keys, dynamically casting results based on type detection.

  • Supported Type Resolutions:
    • string / cache -> returned value parsed from JSON.
    • hash -> returns all mapped fields via hgetall.
    • zset / geo -> returns top score ranges via zrange.
    • stream -> returns pending stream entries.
booking_data = msg.smart_get("booking:987", explicit_type="hash")

msg.flush_emergency_fallbacks(topic, batch_size=100)

Drains the Redis emergency fallback queues and replays stranded payloads back into the primary Pulsar cluster once connectivity has restored.

# Recovers and replays stranded messages
recovered = msg.flush_emergency_fallbacks("booking-requests", batch_size=50)
print(f"Replayed {recovered} stranded messages.")

4. 📊 Observability & Monitoring SDK

Provides health checks, counters, and statistics from the different technological subsystems using the MonitoringProvider protocol.

from typing import Dict, Any, Protocol

class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...

TemporalMonitor

Monitors workflow statuses, retry events, and active queues.

from smartinno.monitoring.temporal_monitor import TemporalMonitor

monitor = TemporalMonitor(config=t_config, temporal_client=tc)

# Get statistics on active workflow runs
stats = monitor.get_real_time_stats(workflow_ids=["wf-1", "wf-2"])
print(stats["summary"])  # {'total': 2, 'completed': 1, 'running': 1, ...}

# Retrieve namespace liveness
health = monitor.check_health()
print(health["is_healthy"])  # True / False

RedisMonitor

Monitors cluster topologies, slot redirections, and client pools.

from smartinno.monitoring.redis_monitor import RedisMonitor

r_monitor = RedisMonitor(redis_client)
print(r_monitor.get_real_time_stats())

PulsarMonitor

Checks Pulsar broker liveness and fallback subscriptions.

from smartinno.monitoring.pulsar_monitor import PulsarMonitor

p_monitor = PulsarMonitor(pulsar_config)
print(p_monitor.check_health())

🎓 5. Step-by-Step Tutorial for New Developers

Follow this tutorial to build a fully resilient, event-driven Safari Tour booking pipeline from scratch.

Step 1: Initialize the Configs

Create configuration settings for Redis (using the High Concurrency UseCase mapped to Native Cluster), messaging (with the Pulsar configuration), and Temporal orchestration.

from smartinno.temporal.config import TemporalConfig
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.apache_pulsar.config import PulsarConfig

# 1. Config for Redis Shards
redis_cfg = RedisConfig(
    host="127.0.0.1",
    port=7001,
    cluster_host_mapping={
        "redis-node-1": ("127.0.0.1", 7001),
        "redis-node-2": ("127.0.0.1", 7002),
        "redis-node-3": ("127.0.0.1", 7003),
    }
)

# 2. Config for Pulsar Fallback
pulsar_cfg = PulsarConfig(
    host="164.68.120.77",
    port=6650,
    connection_timeout_ms=5000,
    operation_timeout_ms=10000
)

# 3. Config for Temporal Orchestrator
temporal_cfg = TemporalConfig(
    host="127.0.0.1",
    port=7233,
    namespace="tenant-safari-corp"
)

Step 2: Instantiate SDK Clients

Create the client objects. The message client acts as the dispatcher, and the temporal client coordinates stateful workflows.

from smartinno.temporal.client import TemporalClient
from smartinno.messaging.client import MessageClient

tc = TemporalClient(config=temporal_cfg)

msg = MessageClient(
    backend="redis",
    redis_config=redis_cfg,
    pulsar_config=pulsar_cfg,
    use_case=UseCase.HIGH_CONCURRENCY,
    temporal_client=tc
)

Step 3: Define Workflows and Activity Steps

Decorate your operational tasks using the task decorators.

# An activity step executing transactional calculations
@tc.step(queue="safari-queue", retry_attempts=3, backoff_seconds=2)
def create_booking_record(user_id: str, tour_id: str):
    # Perform database insertion or Redis write
    return {"booking_id": "BK-9092", "status": "reserved"}

# The main process orchestrator
@msg.on_event("booking-requests", queue="safari-queue") # Dynamic workflow start decorator
@tc.process(queue="safari-queue")
async def safari_booking_workflow(payload: dict):
    # Execute step durably
    booking = await tc.execute_step(create_booking_record, payload["user_id"], payload["tour_id"])
    print(f"Booking created: {booking}")
    return booking

Step 4: Boot Up Workers & Start Routing

Instantiate and run the background execution threads.

import threading
from smartinno.temporal.worker import TemporalWorkerManager

# 1. Start Temporal Orchestrator worker manager
worker_manager = TemporalWorkerManager(tc)
worker_manager.start()

# 2. Start Message Client routing loop in a separate thread
msg_thread = threading.Thread(target=lambda: msg.start(block=True), daemon=True)
msg_thread.start()

print("All background worker loops are listening...")

Step 5: Trigger Event Execution

Publish an event to the stream topic. The messaging client intercepts it, auto-routes it to the correct Redis structure, starts the dynamic workflow process, and resolves the step activities durably.

# Publish message -> triggers safari_booking_workflow automatically!
msg.smart_send("booking-requests", {"user_id": "user-88", "tour_id": "tour-maasai-mara"})

🦁 Safari Pro – Enterprise Unified SDK Master API Reference

Welcome to the Safari Pro Unified SDK Master API Reference. This guide is designed to help developers—both new and experienced—understand, configure, and operate the core infrastructure of the Safari Pro Super App.

This SDK unifies Temporal Orchestration, Enterprise Custom Redis Connectivity, Resilient Messaging (Pulsar/Redis Failover), and Observability Monitoring into a single, high-performance toolkit with built-in failover capabilities.


📐 Core SDK Architecture Overview

The Safari Pro SDK suite provides a high-reliability abstraction layer over databases, message buses, and workflow systems. It simplifies building microservices by exposing unified clients with built-in resiliency patterns.

graph TD
    User[Application Code] --> tc[TemporalClient]
    User --> rc[RedisClientFactory]
    User --> mc[MessageClient]
    User --> mon[Observability Monitors]
    
    tc -->|Orchestration| TempServer[Temporal Cluster]
    rc -->|Key-Value / Streams| RedisCluster[Redis Sentinel / Cluster]
    mc -->|Resilient Messaging| PubSub[Pulsar / Redis PubSub]

🛠️ Complete Enums & Configurations Guide

1. Architectural & Routing Enums

smartinno.redis_custom.config.Architecture

Defines the backend deployment topology for Redis connections.

  • SENTINEL ("sentinel"): High-Availability Sentinel setup with active-passive replication and automated master failover.
  • CLUSTER ("cluster"): Cryptographically sharded cluster distributing keys across 16,384 slots over multiple master nodes.
  • HYBRID ("hybrid"): Routes through a Predixy/Envoy proxy gateway to abstract cluster topology away from clients.
  • SMART_ROUTER ("smart_router"): Meta-architecture routing commands dynamically to different Redis topologies.

smartinno.redis_custom.config.UseCase

Guides the factory to auto-select the optimal Redis architecture for a given workload.

  • HIGH_CONCURRENCY: Maps to CLUSTER (Ideal for massive horizontal read/write scale).
  • HEAVY_TRANSACTIONS: Maps to SENTINEL (Ideal for multi-key pipelines and transactions).
  • MICROSERVICES: Maps to HYBRID (Ideal for polyglot systems and proxy gateways).
  • AUTO: Maps to SMART_ROUTER (Dynamically selects based on payload structure).

smartinno.temporal.client.Frequency

Defines scheduling intervals for recurring cron orchestrations.

  • ONCE ("once"): Fires exactly once.
  • REPEATED ("repeated"): Fires every minute.
  • HOURLY ("hourly"): Fires at the start of every hour.
  • DAILY ("daily"): Fires at midnight every day.
  • WEEKLY ("weekly"): Fires at midnight every Sunday.
  • MONTHLY ("monthly"): Fires at midnight on the first day of every month.

smartinno.redis_custom.smart_router.DataSensitivity

Categorizes data payloads based on PII/critical parameters to dictate caching TTL policies.

  • CRITICAL (Matches passport, visa): Bypasses persistent storage; cached for 60 seconds max.
  • HIGH (Matches password, payment, card, cvv, secret, ssn, token): Cached for 300 seconds.
  • MEDIUM (Matches email, phone, user_name): Standard caching rules apply.
  • LOW: Default category for standard operational logs or cache elements.

🔑 Redis Key Generation & Data Serialization Envelopes

To prevent key collision and ensure clean data parsing across heterogeneous systems (e.g. Go, Python, and Node.js microservices), the SDK enforces standardized key structures and JSON packaging envelopes.

1. Standardized Key Formatting Pattern

All keys written to Redis are transparently ran through the BaseRedisAdapter._format_key() handler. This prefixes keys according to a strict namespace convention:

$$\text{redis_key} = \text{service_name} : \text{action} : \text{original_key}$$

  • Parameters:
    • service_name (str, default "safari_pro"): Declared in RedisConfig. Prevents cross-app database contamination.
    • action (str, default "data"): Scopes key usage (e.g., data, cache, stream, queue).
  • Example Generation:
    • Original Key: "booking:8848"
    • Generated Key: safari_pro:data:booking:8848

2. Cache Data envelope Serialization

For caching payloads (strings, hashes, simple JSON targets), keys are packed into a typed JSON envelope containing contextual metadata.

{
    "request_id": "4020a6e3-fcf3-40e1-b4f6-6cbe702e5b85",
    "event": "update",
    "type": "cache",
    "service": "safari_pro",
    "tenant_name": null,
    "payload": {
        "booking_id": "BK-9989",
        "total_amount": 1500.0
    },
    "timestamp": "2026-06-29T10:24:18.123456"
}
  • Envelope Fields:

    • request_id (str): UUIDv4 tracking identifier.
    • event (str): Action trigger description (e.g., update, invalidate).
    • type (str): Target cache category type.
    • service (str): Originating application namespace name.
    • tenant_name (Optional[str]): Partition indicator for multi-tenancy.
    • payload (Any): The actual payload data structure.
    • timestamp (str): UTC timestamp of serialization in ISO-8601 formatting.
  • Note: Read operations automatically deserialize the envelope, returning only the underlying payload value.

3. Messaging Emergency Fallback Envelopes

When circuit breakers trip or connection drops occur, message payloads are wrapped with failover metadata.

Redis Emergency Fallback

Stranded messages waiting for Pulsar recovery are written to a Redis Stream with the key pattern: $$\text{key} = \text{emergency-fallback} : {topic}$$

  • Envelope Schema:
    {
        "origin_topic": "booking-requests",
        "failed_at": 1782728658.452,
        "data": { ... }
    }
    

Pulsar Emergency Fallback

When Redis is down and fallback is routed directly to Pulsar, messages are packaged with origin metadata:

  • Target Topic: persistent://public/default/emergency-fallback-redis-{topic}
  • Envelope Schema:
    {
        "origin": "booking-requests",
        "time": 1782728658.452,
        "data": { ... }
    }
    

⏳ 1. Temporal Orchestration SDK

TemporalConfig

A configuration dataclass used to initialize the TemporalClient.

Field Name Type Default Description
host str "localhost" Endpoint hostname of the Temporal frontend service.
port int 7233 Endpoint port of the Temporal frontend service.
namespace str "default" Target namespace partition.
ssl bool False Set to True to enable secure TLS communication.
client_cert_path Optional[str] None Absolute path to the client certificate (.crt/.pem).
client_key_path Optional[str] None Absolute path to the client private key (.key).
auto_start_worker bool False Automatically boots background worker thread on execution.
queues List[str] ["default"] List of task queues the background worker monitors.
mock_mode Optional[bool] None Explicit override. If omitted, mode is auto-detected from environment variables.

@tc.workflow (Alias: @tc.process)

Marks an async function as a stateful workflow process coordinator.

  • Parameters:
    • queue (str, default "default"): The task queue where this workflow registration and its tasks reside.
  • Attributes Attached to Wrapper:
    • .delay(*args, **kwargs): Triggers the workflow asynchronously, returning a TaskHandle.
    • .workflow_name: Registered string identifier (the decorated function's name).
    • ._queue: Target queue.
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Orchestration logic goes here...
    return {"status": "success"}

@tc.task (Alias: @tc.step)

Marks a function (sync or async) as an activity step. Activity steps execute real network operations, queries, or API calls.

  • Parameters:
    • queue (str, default "default"): Task queue where this step executes.
    • retry_attempts (Optional[int], default None): Maximum times to retry if an exception occurs.
    • backoff_seconds (Optional[int], default None): Initial backoff delay (seconds) before retrying.
    • fallback (Optional[Callable], default None): Backup function run if all retries fail.
    • timeout (int, default 300): Maximum time allowed for a single run in seconds.
    • heartbeat_timeout (Optional[int], default None): Time limit between activity status heartbeats.
def log_failed_payment(booking_id: str, amount: float):
    print(f"Payment failed for {booking_id} after retries.")
    return {"status": "payment_failed_logged"}

@tc.step(
    queue="billing-queue",
    retry_attempts=3,
    backoff_seconds=5,
    fallback=log_failed_payment,
    timeout=60
)
def process_payment(booking_id: str, amount: float):
    # Charge gateway API call...
    return {"txn_id": "TXN-8848"}

tc.execute_step(step_func, *args, **kwargs)

Runs an activity step durably within the workflow context.

  • Parameters:
    • step_func (Callable): The decorated @tc.step function to execute.
    • *args: Positional arguments forwarded to step_func.
    • **kwargs: Keyword arguments forwarded to step_func.
  • Returns: Any return value from step_func.
result = await tc.execute_step(process_payment, "booking-id-123", 450.0)

tc.execute_local_step(step_func, *args, **kwargs)

Runs an activity step locally inside the worker process running the workflow. Bypasses the Temporal server queues.

  • Parameters:
    • step_func (Callable): Decorated @tc.step function.
    • *args / **kwargs: Forwarded arguments.
    • timeout (int, default 5): Maximum local timeout in seconds.
    • retry_attempts (int, default 3): Maximum retries.
  • Returns: Any return value from step_func.
is_valid = await tc.execute_local_step(check_promo_validity, "SAFARI10", 450.0)

tc.execute_in_parallel(*calls) (Alias: fan_out)

Executes multiple step functions concurrently (Fan-Out/Fan-In).

  • Parameters:
    • *calls (tuple): Varargs of tuples. Each tuple is formatted as:
      1. step_func (Callable): The @tc.step function to run.
      2. args_list (list): List of positional arguments.
      3. kwargs_dict (dict, optional): Dict of keyword arguments.
  • Returns: list containing results in corresponding order.
results = await tc.execute_in_parallel(
    (search_hotels, ["Arusha"]),
    (search_flights, ["JRO"])
)
hotels, flights = results[0], results[1]

tc.execute_in_series(*calls)

Executes multiple step functions sequentially.

  • Parameters: Same format as execute_in_parallel.
  • Returns: list containing results in order of completion.
results = await tc.execute_in_series(
    (create_invoice, ["book-99"]),
    (send_receipt, ["book-99"])
)

tc.start_process(process_name, queue, *args, **kwargs)

Launches a workflow process asynchronously and returns a TaskHandle.

  • Parameters:
    • process_name (str): Name of the @tc.process workflow.
    • queue (str): Target queue.
    • *args / **kwargs: Inputs forwarded to the workflow.
  • Returns: TaskHandle to query or await workflow status and results.
handle = tc.start_process("safari_booking_workflow", "booking-queue", {"tour_id": "T1"})

TaskHandle Methods

  • get(timeout: Optional[float] = None) -> Any: Blocks until completion and returns workflow output.
  • cancel() -> None: Sends a cancellation request to the workflow.
  • then(next_process_func: Callable, queue: Optional[str] = None, *args, **kwargs) -> TaskHandle: Chains another process that starts automatically with the current workflow's result.
  • status (str): Returns "RUNNING", "COMPLETED", or "CANCELLED".
result = handle.get(timeout=60.0)

tc.set_state(process_id, key, value)

Signals a running workflow to update its state dict out-of-band.

  • Parameters:
    • process_id (str): The workflow's ID.
    • key (str): State key name.
    • value (Any): Value to set.
tc.set_state(process_id, "guide_assigned", "Alex Kibona")

tc.get_state(process_id, key) -> Any

Queries a running workflow for a state value.

guide = tc.get_state(process_id, "guide_assigned")

tc.side_effect(func, *args) -> Any

Safely records non-deterministic operations (UUIDs, timestamps, database lookups) into workflow history.

  • Parameters:
    • func (Callable): A function returning a non-deterministic value.
    • *args: Positional arguments passed to func.
  • Returns: The result of func(*args).
txn_ref = tc.side_effect(lambda: f"REF-{uuid.uuid4().hex[:8].upper()}")

tc.continue_process_as_new(process_func, *args, **kwargs)

Restarts the active workflow with a clean history to prevent unbounded log growth.

tc.continue_process_as_new(safari_booking_workflow, new_payload)

Saga Transaction Coordinator

Rolls back actions in Last-In-First-Out (LIFO) order if an error is encountered.

from smartinno.temporal.client import Saga

@tc.process(queue="booking-queue")
async def booking_saga_workflow(payload: dict):
    saga = Saga(tc)
    try:
        # Step 1: Reserve Hotel
        hotel = await tc.execute_step(reserve_hotel, payload["hotel_id"])
        saga.add_compensation(release_hotel_reservation, hotel["reservation_id"])
        
        # Step 2: Book Tour (might fail)
        await tc.execute_step(book_safari_tour, payload["tour_id"])
        
    except Exception as e:
        await saga.compensate()
        raise e

Dynamic Queue Routing (tc.route_step)

Routes steps dynamically to specific queues at runtime.

  • Parameters:
    • step_func (Callable): @tc.step function to execute.
    • router (Callable[..., str]): Function returning the target queue string.
    • *args / **kwargs: Arguments passed to step_func and router.
def select_billing_queue(amount: float):
    return "high-value-payments" if amount >= 10000 else "standard-payments"

await tc.route_step(process_payment, select_billing_queue, amount=12500)

Namespace Management APIs

  • create_namespace(namespace_name: str, retention_days: int = 3) -> bool
  • describe_namespace(namespace_name: str) -> dict
  • update_namespace(namespace_name: str, retention_days: Optional[int] = None, description: Optional[str] = None, owner_email: Optional[str] = None) -> bool
  • delete_namespace(namespace_name: str) -> bool
tc.create_namespace("tenant-serengeti-lodges", retention_days=5)

Scheduler (tc.create_schedule)

  • schedule_id (str): Unique identifier for the scheduled task.
  • process_func (Callable): Workflow process to launch.
  • frequency (Frequency): Pre-defined frequency (ONCE, REPEATED, HOURLY, DAILY, WEEKLY, MONTHLY).
  • cron_expression (Optional[str]): Raw cron string (e.g., 0 0 * * 1-5).
  • count (Optional[int]): Maximum execution runs allowed before termination.
  • interval_minutes (Optional[int]): Run workflow at specific minute intervals.
await tc.create_schedule(
    schedule_id="daily-reconciliation",
    process_func=reconciliation_workflow,
    frequency=Frequency.DAILY
)

2. 🔌 Custom Redis Connectivity SDK

Provides a unified interface IRedisClient with support for Sentinel, Cluster, Hybrid proxies (e.g., Predixy), and an auto-routing Intelligent Router.

RedisConfig

Configuration settings for Redis instances. Key configuration fields:

Field Name Type Default Description
host str "localhost" Redis server host.
port int 6379 Redis server port.
password Optional[str] None Authentication password.
db int 0 Database index.
max_connections int 100 Max connection pool size.
socket_timeout float 0.5 500ms operations socket timeout.
socket_connect_timeout float 0.2 200ms connection establish timeout.
retry_on_timeout bool True Automatically retry on socket timeout.
cluster_host_mapping Optional[dict] None NAT/Docker cluster host maps.

RedisClientFactory

Instantiates the correct adapter according to specified architecture or use case.

from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.redis_custom.factory import RedisClientFactory

# Retrieve high concurrency cluster client
redis_client = RedisClientFactory.get_client(
    config=RedisConfig(host="localhost", port=7001),
    use_case=UseCase.HIGH_CONCURRENCY
)

Core Operations (IRedisClient)

  • set(key: str, value: Any) -> bool
  • get(key: str) -> Any
  • mget(keys: List[str]) -> List[Any]
  • hset(name: str, key: str = None, value: str = None, mapping: dict = None, ttl: int = None) -> int
  • hget(name: str, key: str) -> Any
  • lpush(name: str, *values, ttl: int = None) -> int
  • rpop(name: str) -> Any
  • pipeline(transaction: bool = True, shard_key: str = None) -> Pipeline

Multi-key Transaction Sharding

When using cluster architecture, transactions must route to the same slot using hash tags.

# Shard key guarantees slots match in Redis Cluster
with redis_client.pipeline(transaction=True, shard_key="hotel:987") as pipe:
    pipe.set("hotel:987:name", "Serengeti Lodge")
    pipe.set("hotel:987:available_rooms", 12)
    pipe.execute()

High Concurrency Redis Streams APIs

  • xadd(name: str, fields: dict) -> str: Appends a message to a stream.
  • xread(streams: dict, count: int = None, block: int = None) -> List: Reads entries from streams.
  • xgroup_create(name: str, groupname: str, id: str = "$", mkstream: bool = False): Registers consumer groups.
  • xreadgroup(groupname: str, consumername: str, streams: dict, count: int = None, block: int = None) -> List: Reads streams inside a consumer group.
  • xack(name: str, groupname: str, *ids): Acknowledges message receipt.
# Publish to stream
msg_id = redis_client.xadd("safari-bookings", {"booking_id": "BK-99", "status": "pending"})

# Read group messages
entries = redis_client.xreadgroup("booking-group", "consumer-1", {"safari-bookings": ">"}, count=10)

3. 📡 Resilient Messaging Facade (Pulsar / Redis)

The messaging facade provides resilient event pub/sub. It supports dual-active routing, client-side circuit breakers, and automatic database-to-broker failover.

PulsarConfig

Configuration parameters for Apache Pulsar.

Field Name Type Default Description
host str "localhost" Pulsar broker host.
port int 6650 Pulsar broker port.
connection_timeout_ms int 5000 Connect timeout (default 5s).
operation_timeout_ms int 10000 Read/write timeout (default 10s).

@msg.on(topic, strategy="load_balanced", retries=2)

Subscribes a callback to a topic. Supports specific distribution strategies:

  • Pulsar Subscription Types:
    • broadcast: Creates an exclusive subscription with a randomized ID so every node receives the message.
    • load_balanced: Creates a shared subscription (Shared) to distribute work across available workers.
    • sticky: Creates a key-shared subscription (Key_Shared) to route messages with matching partition keys to the same consumer.
    • failover: Creates a failover subscription (Failover) with active-standby consumers.
  • Redis Subsystem Adaptation:
    • broadcast routes through standard PubSub.
    • load_balanced or sticky routes through Redis Streams consumer groups.
    • exclusive routes through Redis Lists (RPOP/LPUSH queue patterns).
@msg.on("payment-events", strategy="load_balanced")
def handle_payment(payload):
    print(f"Received payment notification: {payload}")

@msg.on_event(topic, queue="default")

Wires an event topic to automatically start a Temporal workflow when a message arrives.

# Event-driven trigger
@msg.on_event("booking-requests", queue="booking-queue")
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Workflow triggers automatically on message
    await tc.execute_step(confirm_booking, payload["booking_id"])

msg.smart_send(topic, payload, key=None)

Publishes a message to a topic with automatic database-to-broker fallback.

  • Resiliency Workflow:
    1. Attempts to publish to primary backend (e.g. Redis Stream).
    2. If the connection fails or throws errors, it trips the circuit breaker to OPEN.
    3. The message is automatically wrapped and published to the Pulsar emergency fallback topic: persistent://public/default/emergency-fallback-redis-{topic}.
    4. Safe recovery loops buffer stranded payloads until the primary server recovers.
# Resilient send
msg.smart_send("booking-requests", {"booking_id": "BK-100", "user_id": "U5"})

msg.smart_listen(topic, callback, hint=None, criteria=None, explicit_type=None, block=True)

Dynamically listens on a Redis or Pulsar topic. It automatically inspects the channel type (e.g. detecting if a Redis key is a Stream, List, or PubSub channel) and launches the corresponding listener loops.

  • Parameters:
    • topic (str): Target channel to monitor.
    • callback (Callable): Code executed on payload arrival.
    • hint (Any / explicit_type): Used to guide target pattern selection.
    • block (bool, default True): Blocks execution and polls continuously.
# Dynamically resolves stream/list/pubsub loops
msg.smart_listen("booking-requests", process_incoming_booking, explicit_type="stream")

msg.smart_get(topic, hint=None, explicit_type=None, criteria=None)

Intelligently queries Redis keys, dynamically casting results based on type detection.

  • Supported Type Resolutions:
    • string / cache -> returned value parsed from JSON.
    • hash -> returns all mapped fields via hgetall.
    • zset / geo -> returns top score ranges via zrange.
    • stream -> returns pending stream entries.
booking_data = msg.smart_get("booking:987", explicit_type="hash")

msg.flush_emergency_fallbacks(topic, batch_size=100)

Drains the Redis emergency fallback queues and replays stranded payloads back into the primary Pulsar cluster once connectivity has restored.

# Recovers and replays stranded messages
recovered = msg.flush_emergency_fallbacks("booking-requests", batch_size=50)
print(f"Replayed {recovered} stranded messages.")

4. 📊 Observability & Monitoring SDK

Provides health checks, counters, and statistics from the different technological subsystems using the MonitoringProvider protocol.

from typing import Dict, Any, Protocol

class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...

TemporalMonitor

Monitors workflow statuses, retry events, and active queues.

from smartinno.monitoring.temporal_monitor import TemporalMonitor

monitor = TemporalMonitor(config=t_config, temporal_client=tc)

# Get statistics on active workflow runs
stats = monitor.get_real_time_stats(workflow_ids=["wf-1", "wf-2"])
print(stats["summary"])  # {'total': 2, 'completed': 1, 'running': 1, ...}

# Retrieve namespace liveness
health = monitor.check_health()
print(health["is_healthy"])  # True / False

RedisMonitor

Monitors cluster topologies, slot redirections, and client pools.

from smartinno.monitoring.redis_monitor import RedisMonitor

r_monitor = RedisMonitor(redis_client)
print(r_monitor.get_real_time_stats())

PulsarMonitor

Checks Pulsar broker liveness and fallback subscriptions.

from smartinno.monitoring.pulsar_monitor import PulsarMonitor

p_monitor = PulsarMonitor(pulsar_config)
print(p_monitor.check_health())

🎓 5. Step-by-Step Tutorial for New Developers

Follow this tutorial to build a fully resilient, event-driven Safari Tour booking pipeline from scratch.

Step 1: Initialize the Configs

Create configuration settings for Redis (using the High Concurrency UseCase mapped to Native Cluster), messaging (with the Pulsar configuration), and Temporal orchestration.

from smartinno.temporal.config import TemporalConfig
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.apache_pulsar.config import PulsarConfig

# 1. Config for Redis Shards
redis_cfg = RedisConfig(
    host="127.0.0.1",
    port=7001,
    cluster_host_mapping={
        "redis-node-1": ("127.0.0.1", 7001),
        "redis-node-2": ("127.0.0.1", 7002),
        "redis-node-3": ("127.0.0.1", 7003),
    }
)

# 2. Config for Pulsar Fallback
pulsar_cfg = PulsarConfig(
    host="164.68.120.77",
    port=6650,
    connection_timeout_ms=5000,
    operation_timeout_ms=10000
)

# 3. Config for Temporal Orchestrator
temporal_cfg = TemporalConfig(
    host="127.0.0.1",
    port=7233,
    namespace="tenant-safari-corp"
)

Step 2: Instantiate SDK Clients

Create the client objects. The message client acts as the dispatcher, and the temporal client coordinates stateful workflows.

from smartinno.temporal.client import TemporalClient
from smartinno.messaging.client import MessageClient

tc = TemporalClient(config=temporal_cfg)

msg = MessageClient(
    backend="redis",
    redis_config=redis_cfg,
    pulsar_config=pulsar_cfg,
    use_case=UseCase.HIGH_CONCURRENCY,
    temporal_client=tc
)

Step 3: Define Workflows and Activity Steps

Decorate your operational tasks using the task decorators.

# An activity step executing transactional calculations
@tc.step(queue="safari-queue", retry_attempts=3, backoff_seconds=2)
def create_booking_record(user_id: str, tour_id: str):
    # Perform database insertion or Redis write
    return {"booking_id": "BK-9092", "status": "reserved"}

# The main process orchestrator
@msg.on_event("booking-requests", queue="safari-queue") # Dynamic workflow start decorator
@tc.process(queue="safari-queue")
async def safari_booking_workflow(payload: dict):
    # Execute step durably
    booking = await tc.execute_step(create_booking_record, payload["user_id"], payload["tour_id"])
    print(f"Booking created: {booking}")
    return booking

Step 4: Boot Up Workers & Start Routing

Instantiate and run the background execution threads.

import threading
from smartinno.temporal.worker import TemporalWorkerManager

# 1. Start Temporal Orchestrator worker manager
worker_manager = TemporalWorkerManager(tc)
worker_manager.start()

# 2. Start Message Client routing loop in a separate thread
msg_thread = threading.Thread(target=lambda: msg.start(block=True), daemon=True)
msg_thread.start()

print("All background worker loops are listening...")

Step 5: Trigger Event Execution

Publish an event to the stream topic. The messaging client intercepts it, auto-routes it to the correct Redis structure, starts the dynamic workflow process, and resolves the step activities durably.

# Publish message -> triggers safari_booking_workflow automatically!
msg.smart_send("booking-requests", {"user_id": "user-88", "tour_id": "tour-maasai-mara"})

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

smartinno-1.0.3.tar.gz (45.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

smartinno-1.0.3-py3-none-any.whl (31.5 kB view details)

Uploaded Python 3

File details

Details for the file smartinno-1.0.3.tar.gz.

File metadata

  • Download URL: smartinno-1.0.3.tar.gz
  • Upload date:
  • Size: 45.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for smartinno-1.0.3.tar.gz
Algorithm Hash digest
SHA256 7ec8183b22a391124b24d4d0f6f4781502c113753d8207225c6c921c8c85b587
MD5 9ef421878377c17cf9320cb668a58a25
BLAKE2b-256 6349d3a3e9fe82ff293111245d5178e9d9d22a2ca392acc8e2be21f8e9396223

See more details on using hashes here.

Provenance

The following attestation bundles were made for smartinno-1.0.3.tar.gz:

Publisher: publish_redis_sdk.yml on smart-Ino-Engineering/redis-pulsar-temporal-setups

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file smartinno-1.0.3-py3-none-any.whl.

File metadata

  • Download URL: smartinno-1.0.3-py3-none-any.whl
  • Upload date:
  • Size: 31.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for smartinno-1.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 d50022624649dd92a89267b4b0d0e5ba39d0c1cc0b12bd1d6608f67d7bee7777
MD5 809a175eaaacb0f2935129305f783ed1
BLAKE2b-256 cfbfdeab0363725fa1ad481a54e9d59215ab24a2977603970ad1913822c1d866

See more details on using hashes here.

Provenance

The following attestation bundles were made for smartinno-1.0.3-py3-none-any.whl:

Publisher: publish_redis_sdk.yml on smart-Ino-Engineering/redis-pulsar-temporal-setups

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page