Unified Redis Python SDK for the Safari Pro architecture.
🦁 Safari Pro – Enterprise Unified SDK Master API Reference
Welcome to the Safari Pro Unified SDK Master API Reference. This guide is designed to help developers—both new and experienced—understand, configure, and operate the core infrastructure of the Safari Pro Super App.
This SDK unifies Temporal Orchestration, Enterprise Custom Redis Connectivity, Resilient Messaging (Pulsar/Redis Failover), and Observability Monitoring into a single, high-performance toolkit with built-in failover capabilities.
📐 Core SDK Architecture Overview
The Safari Pro SDK suite provides a high-reliability abstraction layer over databases, message buses, and workflow systems. It simplifies building microservices by exposing unified clients with built-in resiliency patterns.
graph TD
User[Application Code] --> tc[TemporalClient]
User --> rc[RedisClientFactory]
User --> mc[MessageClient]
User --> mon[Observability Monitors]
tc -->|Orchestration| TempServer[Temporal Cluster]
rc -->|Key-Value / Streams| RedisCluster[Redis Sentinel / Cluster]
mc -->|Resilient Messaging| PubSub[Pulsar / Redis PubSub]
🛠️ Complete Enums & Configurations Guide
1. Architectural & Routing Enums
smartinno.redis_custom.config.Architecture
Defines the backend deployment topology for Redis connections.
- SENTINEL ("sentinel"): High-Availability Sentinel setup with active-passive replication and automated master failover.
- CLUSTER ("cluster"): Hash-sharded cluster distributing keys across 16,384 slots (CRC16 of the key modulo 16,384) over multiple master nodes.
- HYBRID ("hybrid"): Routes through a Predixy/Envoy proxy gateway to abstract cluster topology away from clients.
- SMART_ROUTER ("smart_router"): Meta-architecture routing commands dynamically to different Redis topologies.
smartinno.redis_custom.config.UseCase
Guides the factory to auto-select the optimal Redis architecture for a given workload.
- HIGH_CONCURRENCY: Maps to CLUSTER (ideal for massive horizontal read/write scale).
- HEAVY_TRANSACTIONS: Maps to SENTINEL (ideal for multi-key pipelines and transactions).
- MICROSERVICES: Maps to HYBRID (ideal for polyglot systems and proxy gateways).
- AUTO: Maps to SMART_ROUTER (dynamically selects based on payload structure).
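The use-case mapping can be pictured as a plain lookup table. The following is a minimal sketch reconstructed from the descriptions in this guide, not the SDK's source: the UseCase string values, the USE_CASE_TO_ARCHITECTURE dict, the resolve_architecture helper, and the default used when nothing is passed are all assumptions made for illustration.

```python
from enum import Enum
from typing import Optional


class Architecture(Enum):
    SENTINEL = "sentinel"
    CLUSTER = "cluster"
    HYBRID = "hybrid"
    SMART_ROUTER = "smart_router"


class UseCase(Enum):
    # String values are assumed; the guide only names the members.
    HIGH_CONCURRENCY = "high_concurrency"
    HEAVY_TRANSACTIONS = "heavy_transactions"
    MICROSERVICES = "microservices"
    AUTO = "auto"


# Mapping as described in the guide.
USE_CASE_TO_ARCHITECTURE = {
    UseCase.HIGH_CONCURRENCY: Architecture.CLUSTER,
    UseCase.HEAVY_TRANSACTIONS: Architecture.SENTINEL,
    UseCase.MICROSERVICES: Architecture.HYBRID,
    UseCase.AUTO: Architecture.SMART_ROUTER,
}


def resolve_architecture(
    architecture: Optional[Architecture] = None,
    use_case: Optional[UseCase] = None,
) -> Architecture:
    """Hypothetical helper: an explicit architecture wins over a use case."""
    if architecture is not None:
        return architecture
    if use_case is not None:
        return USE_CASE_TO_ARCHITECTURE[use_case]
    return Architecture.SMART_ROUTER  # assumed default when nothing is given
```

Keeping the mapping in one dict makes the precedence rule easy to audit: the use case is only consulted when no explicit architecture is supplied.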
smartinno.temporal.client.Frequency
Defines scheduling intervals for recurring cron orchestrations.
- ONCE ("once"): Fires exactly once.
- REPEATED ("repeated"): Fires every minute.
- HOURLY ("hourly"): Fires at the start of every hour.
- DAILY ("daily"): Fires at midnight every day.
- WEEKLY ("weekly"): Fires at midnight every Sunday.
- MONTHLY ("monthly"): Fires at midnight on the first day of every month.
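Each recurring frequency corresponds to a standard five-field cron expression. The sketch below spells out that correspondence; the CRON_BY_FREQUENCY table and the to_cron helper are hypothetical names, and how the SDK actually translates a Frequency internally is not documented here.

```python
from enum import Enum
from typing import Optional


class Frequency(Enum):
    ONCE = "once"
    REPEATED = "repeated"
    HOURLY = "hourly"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"


# Five-field cron: minute hour day-of-month month day-of-week
CRON_BY_FREQUENCY = {
    Frequency.REPEATED: "* * * * *",  # every minute
    Frequency.HOURLY: "0 * * * *",    # minute 0 of every hour
    Frequency.DAILY: "0 0 * * *",     # midnight every day
    Frequency.WEEKLY: "0 0 * * 0",    # midnight on Sunday (day-of-week 0)
    Frequency.MONTHLY: "0 0 1 * *",   # midnight on the 1st of the month
}


def to_cron(frequency: Frequency) -> Optional[str]:
    """Return a cron string, or None for ONCE, which has no recurrence."""
    return CRON_BY_FREQUENCY.get(frequency)
```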
smartinno.redis_custom.smart_router.DataSensitivity
Categorizes data payloads based on PII/critical parameters to dictate caching TTL policies.
- CRITICAL (matches passport, visa): Bypasses persistent storage; cached for 60 seconds max.
- HIGH (matches password, payment, card, cvv, secret, ssn, token): Cached for 300 seconds.
- MEDIUM (matches email, phone, user_name): Standard caching rules apply.
- LOW: Default category for standard operational logs or cache elements.
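To make the matching rules concrete, here is a minimal sketch of a classifier driven by the keyword lists above. It is not the SDK's implementation: the enum string values, the substring match against field names, the classify helper, and the use of None for "no special TTL cap" are assumptions for illustration.

```python
from enum import Enum
from typing import Dict, Optional


class DataSensitivity(Enum):
    # String values are assumed; the guide only names the members.
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


# Keyword lists copied from the guide, checked from most to least sensitive.
KEYWORDS = [
    (DataSensitivity.CRITICAL, ("passport", "visa")),
    (DataSensitivity.HIGH, ("password", "payment", "card", "cvv", "secret", "ssn", "token")),
    (DataSensitivity.MEDIUM, ("email", "phone", "user_name")),
]

# TTL caps in seconds; None stands for "standard caching rules" (assumed).
MAX_TTL_SECONDS: Dict[DataSensitivity, Optional[int]] = {
    DataSensitivity.CRITICAL: 60,
    DataSensitivity.HIGH: 300,
    DataSensitivity.MEDIUM: None,
    DataSensitivity.LOW: None,
}


def classify(payload: dict) -> DataSensitivity:
    """Return the highest sensitivity whose keyword appears in any field name."""
    names = [str(name).lower() for name in payload]
    for level, keywords in KEYWORDS:
        if any(word in name for name in names for word in keywords):
            return level
    return DataSensitivity.LOW
```

Checking the levels in descending order means a payload that mixes a card number with an email address is treated as HIGH, not MEDIUM.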
🔑 Redis Key Generation & Data Serialization Envelopes
To prevent key collision and ensure clean data parsing across heterogeneous systems (e.g. Go, Python, and Node.js microservices), the SDK enforces standardized key structures and JSON packaging envelopes.
1. Standardized Key Formatting Pattern
All keys written to Redis are transparently run through the BaseRedisAdapter._format_key() handler. This prefixes keys according to a strict namespace convention:
$$\text{redis\_key} = \text{service\_name} : \text{action} : \text{original\_key}$$
- Parameters:
  - service_name (str, default "safari_pro"): Declared in RedisConfig. Prevents cross-app database contamination.
  - action (str, default "data"): Scopes key usage (e.g., data, cache, stream, queue).
- Example Generation:
  - Original Key: "booking:8848"
  - Generated Key: safari_pro:data:booking:8848
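The naming convention amounts to joining three parts with colons. A minimal sketch, assuming nothing beyond the pattern and defaults stated above (format_key is a stand-in name, not the real BaseRedisAdapter._format_key signature):

```python
def format_key(original_key: str, service_name: str = "safari_pro", action: str = "data") -> str:
    """Build service_name:action:original_key as described in the guide."""
    return f"{service_name}:{action}:{original_key}"


generated = format_key("booking:8848")
cache_key = format_key("booking:8848", action="cache")
```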
2. Cache Data Envelope Serialization
For caching payloads (strings, hashes, simple JSON targets), values are packed into a typed JSON envelope containing contextual metadata.
{
"request_id": "4020a6e3-fcf3-40e1-b4f6-6cbe702e5b85",
"event": "update",
"type": "cache",
"service": "safari_pro",
"tenant_name": null,
"payload": {
"booking_id": "BK-9989",
"total_amount": 1500.0
},
"timestamp": "2026-06-29T10:24:18.123456"
}
- Envelope Fields:
  - request_id (str): UUIDv4 tracking identifier.
  - event (str): Action trigger description (e.g., update, invalidate).
  - type (str): Target cache category type.
  - service (str): Originating application namespace name.
  - tenant_name (Optional[str]): Partition indicator for multi-tenancy.
  - payload (Any): The actual payload data structure.
  - timestamp (str): UTC timestamp of serialization in ISO-8601 formatting.
- Note: Read operations automatically deserialize the envelope, returning only the underlying payload value.
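The write and read sides of the envelope can be sketched with the standard library alone. This is an illustration of the documented field layout, not the SDK's serializer: wrap and unwrap are hypothetical helper names, and the timestamp is made timezone-naive only to match the sample envelope shown earlier.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Optional


def wrap(
    payload: Any,
    event: str = "update",
    type_: str = "cache",
    service: str = "safari_pro",
    tenant_name: Optional[str] = None,
) -> str:
    """Serialize a payload into the documented envelope layout."""
    envelope = {
        "request_id": str(uuid.uuid4()),
        "event": event,
        "type": type_,
        "service": service,
        "tenant_name": tenant_name,
        "payload": payload,
        # UTC time without an offset suffix, mirroring the sample envelope.
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }
    return json.dumps(envelope)


def unwrap(raw: str) -> Any:
    """Return only the payload, as read operations are described to do."""
    return json.loads(raw)["payload"]


raw = wrap({"booking_id": "BK-9989", "total_amount": 1500.0})
restored = unwrap(raw)
```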
3. Messaging Emergency Fallback Envelopes
When circuit breakers trip or connection drops occur, message payloads are wrapped with failover metadata.
Redis Emergency Fallback
Stranded messages waiting for Pulsar recovery are written to a Redis Stream with the key pattern: $$\text{key} = \text{emergency-fallback} : \{\text{topic}\}$$
- Envelope Schema:
{ "origin_topic": "booking-requests", "failed_at": 1782728658.452, "data": { ... } }
Pulsar Emergency Fallback
When Redis is down and fallback is routed directly to Pulsar, messages are packaged with origin metadata:
- Target Topic: persistent://public/default/emergency-fallback-redis-{topic}
- Envelope Schema:
{ "origin": "booking-requests", "time": 1782728658.452, "data": { ... } }
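The two fallback envelopes differ only in their destination and field names. The sketch below builds both from the patterns documented above; redis_fallback and pulsar_fallback are hypothetical helper names, and the real SDK may add fields or serialize differently.

```python
import time
from typing import Any, Dict, Tuple


def redis_fallback(topic: str, data: Any) -> Tuple[str, Dict[str, Any]]:
    """Stream key and envelope for messages parked in Redis while Pulsar is down."""
    key = f"emergency-fallback:{topic}"
    return key, {"origin_topic": topic, "failed_at": time.time(), "data": data}


def pulsar_fallback(topic: str, data: Any) -> Tuple[str, Dict[str, Any]]:
    """Topic and envelope for messages routed to Pulsar while Redis is down."""
    target = f"persistent://public/default/emergency-fallback-redis-{topic}"
    return target, {"origin": topic, "time": time.time(), "data": data}


stream_key, redis_envelope = redis_fallback("booking-requests", {"booking_id": "BK-100"})
pulsar_topic, pulsar_envelope = pulsar_fallback("booking-requests", {"booking_id": "BK-100"})
```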
1. ⏳ Temporal Orchestration SDK
TemporalConfig
A configuration dataclass used to initialize the TemporalClient.
| Field Name | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | Endpoint hostname of the Temporal frontend service. |
| port | int | 7233 | Endpoint port of the Temporal frontend service. |
| namespace | str | "default" | Target namespace partition. |
| ssl | bool | False | Set to True to enable secure TLS communication. |
| client_cert_path | Optional[str] | None | Absolute path to the client certificate (.crt/.pem). |
| client_key_path | Optional[str] | None | Absolute path to the client private key (.key). |
| auto_start_worker | bool | False | Automatically boots background worker thread on execution. |
| queues | List[str] | ["default"] | List of task queues the background worker monitors. |
| mock_mode | Optional[bool] | None | Explicit override. If omitted, mode is auto-detected from environment variables. |
@tc.workflow (Alias: @tc.process)
Marks an async function as a stateful workflow process coordinator.
- Parameters:
  - queue (str, default "default"): The task queue where this workflow registration and its tasks reside.
- Attributes Attached to Wrapper:
  - .delay(*args, **kwargs): Triggers the workflow asynchronously, returning a TaskHandle.
  - .workflow_name: Registered string identifier (the decorated function's name).
  - ._queue: Target queue.
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Orchestration logic goes here...
    return {"status": "success"}
@tc.task (Alias: @tc.step)
Marks a function (sync or async) as an activity step. Activity steps execute real network operations, queries, or API calls.
- Parameters:
  - queue (str, default "default"): Task queue where this step executes.
  - retry_attempts (Optional[int], default None): Maximum times to retry if an exception occurs.
  - backoff_seconds (Optional[int], default None): Initial backoff delay (seconds) before retrying.
  - fallback (Optional[Callable], default None): Backup function run if all retries fail.
  - timeout (int, default 300): Maximum time allowed for a single run in seconds.
  - heartbeat_timeout (Optional[int], default None): Time limit between activity status heartbeats.
def log_failed_payment(booking_id: str, amount: float):
    print(f"Payment failed for {booking_id} after retries.")
    return {"status": "payment_failed_logged"}
@tc.step(
    queue="billing-queue",
    retry_attempts=3,
    backoff_seconds=5,
    fallback=log_failed_payment,
    timeout=60
)
def process_payment(booking_id: str, amount: float):
    # Charge gateway API call...
    return {"txn_id": "TXN-8848"}
tc.execute_step(step_func, *args, **kwargs)
Runs an activity step durably within the workflow context.
- Parameters:
  - step_func (Callable): The decorated @tc.step function to execute.
  - *args: Positional arguments forwarded to step_func.
  - **kwargs: Keyword arguments forwarded to step_func.
- Returns: Any return value from step_func.
result = await tc.execute_step(process_payment, "booking-id-123", 450.0)
tc.execute_local_step(step_func, *args, **kwargs)
Runs an activity step locally inside the worker process running the workflow. Bypasses the Temporal server queues.
- Parameters:
  - step_func (Callable): Decorated @tc.step function.
  - *args / **kwargs: Forwarded arguments.
  - timeout (int, default 5): Maximum local timeout in seconds.
  - retry_attempts (int, default 3): Maximum retries.
- Returns: Any return value from step_func.
is_valid = await tc.execute_local_step(check_promo_validity, "SAFARI10", 450.0)
tc.execute_in_parallel(*calls) (Alias: fan_out)
Executes multiple step functions concurrently (Fan-Out/Fan-In).
- Parameters:
  - *calls (tuple): Varargs of tuples. Each tuple is formatted as:
    - step_func (Callable): The @tc.step function to run.
    - args_list (list): List of positional arguments.
    - kwargs_dict (dict, optional): Dict of keyword arguments.
- Returns: list containing results in corresponding order.
results = await tc.execute_in_parallel(
    (search_hotels, ["Arusha"]),
    (search_flights, ["JRO"])
)
hotels, flights = results[0], results[1]
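The fan-out/fan-in pattern itself can be shown with plain asyncio. This is a generic sketch of the technique using the same (func, args, kwargs) tuple shape; it does not go through Temporal and makes no claim about how tc.execute_in_parallel is implemented. The fan_out function and the two sample search functions are hypothetical.

```python
import asyncio
import inspect
from typing import Any, List, Sequence


async def fan_out(*calls: Sequence) -> List[Any]:
    """Run every (func, args[, kwargs]) call concurrently; results keep call order."""

    async def run(call: Sequence) -> Any:
        func, args = call[0], call[1]
        kwargs = call[2] if len(call) > 2 else {}
        if inspect.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        # Sync functions run in a worker thread so they do not block the loop.
        return await asyncio.to_thread(func, *args, **kwargs)

    # gather() returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(run(call) for call in calls))


async def search_hotels(city: str) -> list:
    await asyncio.sleep(0.01)  # stand-in for a network call
    return [f"{city} Lodge"]


def search_flights(airport: str, limit: int = 1) -> list:
    return [f"{airport}-001"][:limit]


results = asyncio.run(
    fan_out((search_hotels, ["Arusha"]), (search_flights, ["JRO"], {"limit": 1}))
)
hotels, flights = results[0], results[1]
```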
tc.execute_in_series(*calls)
Executes multiple step functions sequentially.
- Parameters: Same format as execute_in_parallel.
- Returns: list containing results in order of completion.
results = await tc.execute_in_series(
    (create_invoice, ["book-99"]),
    (send_receipt, ["book-99"])
)
tc.start_process(process_name, queue, *args, **kwargs)
Launches a workflow process asynchronously and returns a TaskHandle.
- Parameters:
  - process_name (str): Name of the @tc.process workflow.
  - queue (str): Target queue.
  - *args / **kwargs: Inputs forwarded to the workflow.
- Returns: TaskHandle to query or await workflow status and results.
handle = tc.start_process("safari_booking_workflow", "booking-queue", {"tour_id": "T1"})
TaskHandle Methods
- get(timeout: Optional[float] = None) -> Any: Blocks until completion and returns workflow output.
- cancel() -> None: Sends a cancellation request to the workflow.
- then(next_process_func: Callable, queue: Optional[str] = None, *args, **kwargs) -> TaskHandle: Chains another process that starts automatically with the current workflow's result.
- status (str): Returns "RUNNING", "COMPLETED", or "CANCELLED".
result = handle.get(timeout=60.0)
tc.set_state(process_id, key, value)
Signals a running workflow to update its state dict out-of-band.
- Parameters:
  - process_id (str): The workflow's ID.
  - key (str): State key name.
  - value (Any): Value to set.
tc.set_state(process_id, "guide_assigned", "Alex Kibona")
tc.get_state(process_id, key) -> Any
Queries a running workflow for a state value.
guide = tc.get_state(process_id, "guide_assigned")
tc.side_effect(func, *args) -> Any
Safely records non-deterministic operations (UUIDs, timestamps, database lookups) into workflow history.
- Parameters:
  - func (Callable): A function returning a non-deterministic value.
  - *args: Positional arguments passed to func.
- Returns: The result of func(*args).
import uuid
txn_ref = tc.side_effect(lambda: f"REF-{uuid.uuid4().hex[:8].upper()}")
tc.continue_process_as_new(process_func, *args, **kwargs)
Restarts the active workflow with a clean history to prevent unbounded log growth.
tc.continue_process_as_new(safari_booking_workflow, new_payload)
Saga Transaction Coordinator
Rolls back actions in Last-In-First-Out (LIFO) order if an error is encountered.
from smartinno.temporal.client import Saga
@tc.process(queue="booking-queue")
async def booking_saga_workflow(payload: dict):
    saga = Saga(tc)
    try:
        # Step 1: Reserve Hotel
        hotel = await tc.execute_step(reserve_hotel, payload["hotel_id"])
        saga.add_compensation(release_hotel_reservation, hotel["reservation_id"])
        # Step 2: Book Tour (might fail)
        await tc.execute_step(book_safari_tour, payload["tour_id"])
    except Exception as e:
        await saga.compensate()
        raise e
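The LIFO rollback order is the core of the saga pattern, and it can be demonstrated without Temporal. The MiniSaga class below is a hypothetical, in-memory stand-in written for this guide; the SDK's Saga takes a client and presumably runs compensations durably, which this sketch does not attempt.

```python
import asyncio
import inspect
from typing import Any, Callable, List, Tuple


class MiniSaga:
    """In-memory illustration of LIFO compensation (not the SDK's Saga)."""

    def __init__(self) -> None:
        self._compensations: List[Tuple[Callable[..., Any], tuple, dict]] = []

    def add_compensation(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self._compensations.append((func, args, kwargs))

    async def compensate(self) -> None:
        # Pop from the end so the most recent action is undone first.
        while self._compensations:
            func, args, kwargs = self._compensations.pop()
            result = func(*args, **kwargs)
            if inspect.isawaitable(result):
                await result


undo_log: List[str] = []


async def demo() -> None:
    saga = MiniSaga()
    saga.add_compensation(undo_log.append, "release_hotel")
    saga.add_compensation(undo_log.append, "refund_payment")
    try:
        raise RuntimeError("tour booking failed")  # simulated step failure
    except RuntimeError:
        await saga.compensate()


asyncio.run(demo())
```

Undoing in reverse order matters when later actions depend on earlier ones: the payment is refunded before the hotel reservation it paid for is released.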
Dynamic Queue Routing (tc.route_step)
Routes steps dynamically to specific queues at runtime.
- Parameters:
  - step_func (Callable): @tc.step function to execute.
  - router (Callable[..., str]): Function returning the target queue string.
  - *args / **kwargs: Arguments passed to step_func and router.
def select_billing_queue(amount: float):
    return "high-value-payments" if amount >= 10000 else "standard-payments"
await tc.route_step(process_payment, select_billing_queue, amount=12500)
Namespace Management APIs
- create_namespace(namespace_name: str, retention_days: int = 3) -> bool
- describe_namespace(namespace_name: str) -> dict
- update_namespace(namespace_name: str, retention_days: Optional[int] = None, description: Optional[str] = None, owner_email: Optional[str] = None) -> bool
- delete_namespace(namespace_name: str) -> bool
tc.create_namespace("tenant-serengeti-lodges", retention_days=5)
Scheduler (tc.create_schedule)
- schedule_id (str): Unique identifier for the scheduled task.
- process_func (Callable): Workflow process to launch.
- frequency (Frequency): Pre-defined frequency (ONCE, REPEATED, HOURLY, DAILY, WEEKLY, MONTHLY).
- cron_expression (Optional[str]): Raw cron string (e.g., 0 0 * * 1-5).
- count (Optional[int]): Maximum execution runs allowed before termination.
- interval_minutes (Optional[int]): Run workflow at specific minute intervals.
await tc.create_schedule(
    schedule_id="daily-reconciliation",
    process_func=reconciliation_workflow,
    frequency=Frequency.DAILY
)
2. 🔌 Custom Redis Connectivity SDK
Provides a unified interface IRedisClient with support for Sentinel, Cluster, Hybrid proxies (e.g., Predixy), and an auto-routing Intelligent Router.
RedisConfig
Configuration settings for Redis instances. Full configuration properties:
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | Redis hostname or IP address. |
| port | int | 6379 | Redis port number. |
| password | Optional[str] | None | Password authentication string. |
| db | int | 0 | Redis logical database index. |
| service_name | str | "safari_pro" | Key namespace prefix to prevent collisions. |
| max_connections | int | 100 | Maximum size of the client connection pool. |
| socket_timeout | float | 0.5 | Socket read/write timeout in seconds (500ms). |
| socket_connect_timeout | float | 0.2 | Initial socket connection timeout in seconds (200ms). |
| socket_keepalive | bool | True | Sends TCP keepalive probes to keep connection open. |
| health_check_interval | int | 30 | Liveness check interval in seconds. |
| retry_on_timeout | bool | True | Retries commands automatically when socket timeouts happen. |
| retry_backoff | bool | True | Enforces exponential backoff on retry operations. |
| retry_backoff_retries | int | 3 | Maximum retry attempts under backoff (Range: 3-5). |
| retry_backoff_min | float | 0.2 | Minimum backoff delay in seconds (200ms). |
| retry_backoff_max | float | 0.5 | Maximum backoff delay in seconds (500ms). |
| handle_moved_redirects | bool | True | Follows MOVED slot redirections transparently in Cluster mode. |
| handle_ask_redirects | bool | True | Follows ASK slot migration redirections in Cluster mode. |
| slot_map_caching | bool | True | Caches cluster slots locally to avoid routing hops. |
| slot_refresh_automatic | bool | True | Triggers slot map refreshes when node redirections fail. |
| reinitialize_steps | int | 5 | Redirection failures before performing a full slot refresh. |
| failover_awareness | bool | True | Monitors topology changes during master failover. |
| read_from_replicas | bool | True | Reads from replica nodes to distribute cluster read load. |
| require_full_coverage | bool | False | Continues execution even if some hash slots are down. |
| ssl | bool | False | Enables TLS connection protection. |
| ssl_cert_reqs | str | "required" | Validation requirement for server certificate. |
| ssl_keyfile | Optional[str] | None | Path to client TLS private key file. |
| ssl_certfile | Optional[str] | None | Path to client TLS certificate file. |
| ssl_ca_certs | Optional[str] | None | Path to Certificate Authority bundle file. |
| startup_nodes | Optional[List[dict]] | None | Seed list of Cluster startup nodes: [{"host": "...", "port": 7001}]. |
| sentinel_nodes | Optional[List[tuple]] | None | List of Sentinel endpoints: [("host1", 26379), ("host2", 26379)]. |
| sentinel_master_name | Optional[str] | None | Master group name monitored by Sentinel. |
| sentinel_port | Optional[int] | None | Custom Sentinel connection port override. |
| hybrid_port | Optional[int] | None | Custom Hybrid proxy connection port override. |
| cluster_host_mapping | Optional[dict] | None | NAT/Docker cluster node host map translation dictionary. |
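The three retry_backoff_* settings bound an exponential retry schedule. The sketch below shows what the defaults would produce under one common interpretation; the doubling factor, the absence of jitter, and the backoff_delays helper are assumptions, since the guide does not state the exact formula.

```python
from typing import List


def backoff_delays(
    retries: int = 3,       # retry_backoff_retries
    minimum: float = 0.2,   # retry_backoff_min, seconds
    maximum: float = 0.5,   # retry_backoff_max, seconds
    factor: float = 2.0,    # assumed growth factor
) -> List[float]:
    """Delay before each retry: minimum * factor**attempt, capped at maximum."""
    return [min(maximum, minimum * factor ** attempt) for attempt in range(retries)]


delays = backoff_delays()
worst_case_wait = sum(delays)
```

With a 0.5 second cap the schedule saturates after the second retry, so raising retry_backoff_retries toward 5 adds at most half a second per extra attempt.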
RedisClientFactory
The factory class used to resolve and instantiate specific client adapters matching the target architecture.
Method Signature:
@staticmethod
def get_client(
    config: RedisConfig = None,
    architecture: Architecture = None,
    use_case: UseCase = None,
    connection: Any = None,
    sentinel_connection: Any = None,
    hybrid_connection: Any = None,
    native_connection: Any = None
) -> IRedisClient:
- Parameters:
  - config (RedisConfig): Custom connection configuration object.
  - architecture (Architecture): Explicit architecture choice override (takes precedence).
  - use_case (UseCase): Guiding workload type to automatically select target topology.
  - connection / sentinel_connection / hybrid_connection / native_connection (Any): Optional pre-established connections to bypass client instantiations.
- Adapter Resolution Output:
  - Architecture.SENTINEL -> Returns SentinelClientAdapter
  - Architecture.CLUSTER -> Returns ClusterClientAdapter
  - Architecture.HYBRID -> Returns HybridClientAdapter
  - Architecture.SMART_ROUTER -> Returns IntelligentRouterAdapter (holding nested config states for all adapters)
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.redis_custom.factory import RedisClientFactory
# Retrieve high concurrency cluster client
redis_client = RedisClientFactory.get_client(
    config=RedisConfig(host="localhost", port=7001),
    use_case=UseCase.HIGH_CONCURRENCY
)
Core Operations (IRedisClient)
- set(key: str, value: Any) -> bool
- get(key: str) -> Any
- mget(keys: List[str]) -> List[Any]
- hset(name: str, key: str = None, value: str = None, mapping: dict = None, ttl: int = None) -> int
- hget(name: str, key: str) -> Any
- lpush(name: str, *values, ttl: int = None) -> int
- rpop(name: str) -> Any
- pipeline(transaction: bool = True, shard_key: str = None) -> Pipeline
Multi-key Transaction Sharding
When using cluster architecture, transactions must route to the same slot using hash tags.
# Shard key guarantees slots match in Redis Cluster
with redis_client.pipeline(transaction=True, shard_key="hotel:987") as pipe:
    pipe.set("hotel:987:name", "Serengeti Lodge")
    pipe.set("hotel:987:available_rooms", 12)
    pipe.execute()
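Why hash tags keep keys together follows from how Redis Cluster assigns slots: it takes the CRC16 of the key modulo 16,384, and when the key contains a non-empty {...} section only that section is hashed. The sketch below computes slots with the standard library; hash_tag and key_slot are illustrative helpers, and how the SDK turns shard_key into a hash tag is an assumption not documented here.

```python
import binascii

HASH_SLOTS = 16384


def hash_tag(key: str) -> str:
    """Return the part of the key that Redis Cluster actually hashes."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:  # only a non-empty tag counts
            return key[start + 1:end]
    return key


def key_slot(key: str) -> int:
    # crc_hqx with an initial value of 0 is the CRC16 variant used by Redis Cluster.
    return binascii.crc_hqx(hash_tag(key).encode("utf-8"), 0) % HASH_SLOTS


# Both keys share the tag "hotel:987", so they land in the same slot.
name_slot = key_slot("{hotel:987}:name")
rooms_slot = key_slot("{hotel:987}:available_rooms")
same_slot = name_slot == rooms_slot
```

Without a shared tag the two keys would usually hash to different slots, and a MULTI/EXEC transaction spanning them would be rejected by the cluster.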
High Concurrency Redis Streams APIs
- xadd(name: str, fields: dict) -> str: Appends a message to a stream.
- xread(streams: dict, count: int = None, block: int = None) -> List: Reads entries from streams.
- xgroup_create(name: str, groupname: str, id: str = "$", mkstream: bool = False): Registers consumer groups.
- xreadgroup(groupname: str, consumername: str, streams: dict, count: int = None, block: int = None) -> List: Reads streams inside a consumer group.
- xack(name: str, groupname: str, *ids): Acknowledges message receipt.
# Publish to stream
msg_id = redis_client.xadd("safari-bookings", {"booking_id": "BK-99", "status": "pending"})
# Read group messages
entries = redis_client.xreadgroup("booking-group", "consumer-1", {"safari-bookings": ">"}, count=10)
3. 📡 Resilient Messaging Facade (Pulsar / Redis)
The messaging facade provides resilient event pub/sub. It supports dual-active routing, client-side circuit breakers, and automatic database-to-broker failover.
PulsarConfig
Configuration parameters for Apache Pulsar.
| Field Name | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | Pulsar broker host. |
| port | int | 6650 | Pulsar broker port. |
| connection_timeout_ms | int | 5000 | Connect timeout (default 5s). |
| operation_timeout_ms | int | 10000 | Read/write timeout (default 10s). |
@msg.on(topic, strategy="load_balanced", retries=2)
Subscribes a callback to a topic. Supports specific distribution strategies:
- Pulsar Subscription Types:
  - broadcast: Creates an exclusive subscription with a randomized ID so every node receives the message.
  - load_balanced: Creates a shared subscription (Shared) to distribute work across available workers.
  - sticky: Creates a key-shared subscription (Key_Shared) to route messages with matching partition keys to the same consumer.
  - failover: Creates a failover subscription (Failover) with active-standby consumers.
- Redis Subsystem Adaptation:
  - broadcast routes through standard PubSub.
  - load_balanced or sticky routes through Redis Streams consumer groups.
  - exclusive routes through Redis Lists (RPOP/LPUSH queue patterns).
@msg.on("payment-events", strategy="load_balanced")
def handle_payment(payload):
    print(f"Received payment notification: {payload}")
@msg.on_event(topic, queue="default")
Wires an event topic to automatically start a Temporal workflow when a message arrives.
# Event-driven trigger
@msg.on_event("booking-requests", queue="booking-queue")
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Workflow triggers automatically on message
    await tc.execute_step(confirm_booking, payload["booking_id"])
msg.smart_send(topic, payload, key=None)
Publishes a message to a topic with automatic database-to-broker fallback.
- Resiliency Workflow:
  - Attempts to publish to primary backend (e.g. Redis Stream).
  - If the connection fails or throws errors, it trips the circuit breaker to OPEN.
  - The message is automatically wrapped and published to the Pulsar emergency fallback topic: persistent://public/default/emergency-fallback-redis-{topic}.
  - Safe recovery loops buffer stranded payloads until the primary server recovers.
# Resilient send
msg.smart_send("booking-requests", {"booking_id": "BK-100", "user_id": "U5"})
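The resiliency workflow described for smart_send is a circuit breaker wrapped around a primary publish with a fallback path. The following self-contained sketch shows that control flow with injected callables; the CircuitBreaker class, its thresholds, and this standalone smart_send function are hypothetical and are not the SDK's internals.

```python
import time
from typing import Any, Callable, Optional


class CircuitBreaker:
    """Minimal breaker: opens after N failures, allows a probe after a cooldown."""

    def __init__(self, failure_threshold: int = 1, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "CLOSED"
        if self._clock() - self._opened_at >= self.reset_after:
            return "HALF_OPEN"  # cooldown elapsed, one probe is allowed
        return "OPEN"

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


def smart_send(topic: str, payload: Any, primary: Callable, fallback: Callable,
               breaker: CircuitBreaker) -> str:
    """Publish via primary unless the breaker is OPEN; otherwise use the fallback."""
    if breaker.state != "OPEN":
        try:
            primary(topic, payload)
            breaker.record_success()
            return "primary"
        except Exception:
            breaker.record_failure()
    fallback(f"persistent://public/default/emergency-fallback-redis-{topic}",
             {"origin": topic, "time": time.time(), "data": payload})
    return "fallback"


sent = []


def broken_primary(topic: str, payload: Any) -> None:
    raise ConnectionError("redis down")  # simulated outage


breaker = CircuitBreaker()
path = smart_send("booking-requests", {"booking_id": "BK-100"},
                  broken_primary, lambda t, e: sent.append((t, e)), breaker)
```

While the breaker is OPEN, later sends skip the primary entirely, which avoids paying the socket timeout on every message during an outage.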
msg.smart_listen(topic, callback, hint=None, criteria=None, explicit_type=None, block=True)
Dynamically listens on a Redis or Pulsar topic. It automatically inspects the channel type (e.g. detecting if a Redis key is a Stream, List, or PubSub channel) and launches the corresponding listener loops.
- Parameters:
  - topic (str): Target channel to monitor.
  - callback (Callable): Code executed on payload arrival.
  - hint (Any) / explicit_type: Used to guide target pattern selection.
  - block (bool, default True): Blocks execution and polls continuously.
# Dynamically resolves stream/list/pubsub loops
msg.smart_listen("booking-requests", process_incoming_booking, explicit_type="stream")
msg.smart_get(topic, hint=None, explicit_type=None, criteria=None)
Intelligently queries Redis keys, dynamically casting results based on type detection.
- Supported Type Resolutions:
  - string / cache -> returns the value parsed from JSON.
  - hash -> returns all mapped fields via hgetall.
  - zset / geo -> returns top score ranges via zrange.
  - stream -> returns pending stream entries.
booking_data = msg.smart_get("booking:987", explicit_type="hash")
msg.flush_emergency_fallbacks(topic, batch_size=100)
Drains the Redis emergency fallback queues and replays stranded payloads back into the primary Pulsar cluster once connectivity is restored.
# Recovers and replays stranded messages
recovered = msg.flush_emergency_fallbacks("booking-requests", batch_size=50)
print(f"Replayed {recovered} stranded messages.")
4. 📊 Observability & Monitoring SDK
Provides health checks, counters, and statistics from the different technological subsystems using the MonitoringProvider protocol.
from typing import Dict, Any, Protocol
class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...
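Because MonitoringProvider is a typing Protocol, any class with matching methods satisfies it without inheriting from it. The sketch below shows a custom provider plugged into a generic reporting function; InMemoryQueueMonitor, its threshold, and the report helper are hypothetical examples, and the runtime_checkable decorator is added here only so the structural match can be checked with isinstance.

```python
from typing import Any, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...


class InMemoryQueueMonitor:
    """Hypothetical provider; note that it does not subclass the protocol."""

    def __init__(self, queue: List[Any], max_depth: int = 1000) -> None:
        self._queue = queue
        self._max_depth = max_depth

    def get_real_time_stats(self) -> Dict[str, Any]:
        return {"depth": len(self._queue)}

    def check_health(self) -> Dict[str, Any]:
        return {"is_healthy": len(self._queue) < self._max_depth}


def report(provider: MonitoringProvider) -> Dict[str, Any]:
    """Collect stats and health from any object that matches the protocol."""
    return {"stats": provider.get_real_time_stats(), "health": provider.check_health()}


snapshot = report(InMemoryQueueMonitor(["msg-1", "msg-2"]))
```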
TemporalMonitor
Monitors workflow statuses, retry events, and active queues.
from smartinno.monitoring.temporal_monitor import TemporalMonitor
monitor = TemporalMonitor(config=t_config, temporal_client=tc)
# Get statistics on active workflow runs
stats = monitor.get_real_time_stats(workflow_ids=["wf-1", "wf-2"])
print(stats["summary"]) # {'total': 2, 'completed': 1, 'running': 1, ...}
# Retrieve namespace liveness
health = monitor.check_health()
print(health["is_healthy"]) # True / False
RedisMonitor
Monitors cluster topologies, slot redirections, and client pools.
from smartinno.monitoring.redis_monitor import RedisMonitor
r_monitor = RedisMonitor(redis_client)
print(r_monitor.get_real_time_stats())
PulsarMonitor
Checks Pulsar broker liveness and fallback subscriptions.
from smartinno.monitoring.pulsar_monitor import PulsarMonitor
p_monitor = PulsarMonitor(pulsar_config)
print(p_monitor.check_health())
5. 🎓 Step-by-Step Tutorial for New Developers
Follow this tutorial to build a fully resilient, event-driven Safari Tour booking pipeline from scratch.
Step 1: Initialize the Configs
Create configuration settings for Redis (using the High Concurrency UseCase mapped to Native Cluster), messaging (with the Pulsar configuration), and Temporal orchestration.
from smartinno.temporal.config import TemporalConfig
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.apache_pulsar.config import PulsarConfig
# 1. Config for Redis Shards
redis_cfg = RedisConfig(
    host="127.0.0.1",
    port=7001,
    cluster_host_mapping={
        "redis-node-1": ("127.0.0.1", 7001),
        "redis-node-2": ("127.0.0.1", 7002),
        "redis-node-3": ("127.0.0.1", 7003),
    }
)
# 2. Config for Pulsar Fallback
pulsar_cfg = PulsarConfig(
    host="164.68.120.77",
    port=6650,
    connection_timeout_ms=5000,
    operation_timeout_ms=10000
)
# 3. Config for Temporal Orchestrator
temporal_cfg = TemporalConfig(
    host="127.0.0.1",
    port=7233,
    namespace="tenant-safari-corp"
)
Step 2: Instantiate SDK Clients
Create the client objects. The message client acts as the dispatcher, and the temporal client coordinates stateful workflows.
from smartinno.temporal.client import TemporalClient
from smartinno.messaging.client import MessageClient
tc = TemporalClient(config=temporal_cfg)
msg = MessageClient(
    backend="redis",
    redis_config=redis_cfg,
    pulsar_config=pulsar_cfg,
    use_case=UseCase.HIGH_CONCURRENCY,
    temporal_client=tc
)
Step 3: Define Workflows and Activity Steps
Decorate your operational tasks using the task decorators.
# An activity step executing transactional calculations
@tc.step(queue="safari-queue", retry_attempts=3, backoff_seconds=2)
def create_booking_record(user_id: str, tour_id: str):
    # Perform database insertion or Redis write
    return {"booking_id": "BK-9092", "status": "reserved"}
# The main process orchestrator
@msg.on_event("booking-requests", queue="safari-queue")  # Dynamic workflow start decorator
@tc.process(queue="safari-queue")
async def safari_booking_workflow(payload: dict):
    # Execute step durably
    booking = await tc.execute_step(create_booking_record, payload["user_id"], payload["tour_id"])
    print(f"Booking created: {booking}")
    return booking
Step 4: Boot Up Workers & Start Routing
Instantiate and run the background execution threads.
import threading
from smartinno.temporal.worker import TemporalWorkerManager
# 1. Start Temporal Orchestrator worker manager
worker_manager = TemporalWorkerManager(tc)
worker_manager.start()
# 2. Start Message Client routing loop in a separate thread
msg_thread = threading.Thread(target=lambda: msg.start(block=True), daemon=True)
msg_thread.start()
print("All background worker loops are listening...")
Step 5: Trigger Event Execution
Publish an event to the stream topic. The messaging client intercepts it, auto-routes it to the correct Redis structure, starts the dynamic workflow process, and resolves the step activities durably.
# Publish message -> triggers safari_booking_workflow automatically!
msg.smart_send("booking-requests", {"user_id": "user-88", "tour_id": "tour-maasai-mara"})
🦁 Safari Pro – Enterprise Unified SDK Master API Reference
Welcome to the Safari Pro Unified SDK Master API Reference. This guide is designed to help developers—both new and experienced—understand, configure, and operate the core infrastructure of the Safari Pro Super App.
This SDK unifies Temporal Orchestration, Enterprise Custom Redis Connectivity, Resilient Messaging (Pulsar/Redis Failover), and Observability Monitoring into a single, high-performance toolkit with built-in failover capabilities.
📐 Core SDK Architecture Overview
The Safari Pro SDK suite provides a high-reliability abstraction layer over databases, message buses, and workflow systems. It simplifies building microservices by exposing unified clients with built-in resiliency patterns.
graph TD
User[Application Code] --> tc[TemporalClient]
User --> rc[RedisClientFactory]
User --> mc[MessageClient]
User --> mon[Observability Monitors]
tc -->|Orchestration| TempServer[Temporal Cluster]
rc -->|Key-Value / Streams| RedisCluster[Redis Sentinel / Cluster]
mc -->|Resilient Messaging| PubSub[Pulsar / Redis PubSub]
🛠️ Complete Enums & Configurations Guide
1. Architectural & Routing Enums
smartinno.redis_custom.config.Architecture
Defines the backend deployment topology for Redis connections.
SENTINEL("sentinel"): High-Availability Sentinel setup with active-passive replication and automated master failover.CLUSTER("cluster"): Cryptographically sharded cluster distributing keys across 16,384 slots over multiple master nodes.HYBRID("hybrid"): Routes through a Predixy/Envoy proxy gateway to abstract cluster topology away from clients.SMART_ROUTER("smart_router"): Meta-architecture routing commands dynamically to different Redis topologies.
smartinno.redis_custom.config.UseCase
Guides the factory to auto-select the optimal Redis architecture for a given workload.
HIGH_CONCURRENCY: Maps toCLUSTER(Ideal for massive horizontal read/write scale).HEAVY_TRANSACTIONS: Maps toSENTINEL(Ideal for multi-key pipelines and transactions).MICROSERVICES: Maps toHYBRID(Ideal for polyglot systems and proxy gateways).AUTO: Maps toSMART_ROUTER(Dynamically selects based on payload structure).
smartinno.temporal.client.Frequency
Defines scheduling intervals for recurring cron orchestrations.
ONCE("once"): Fires exactly once.REPEATED("repeated"): Fires every minute.HOURLY("hourly"): Fires at the start of every hour.DAILY("daily"): Fires at midnight every day.WEEKLY("weekly"): Fires at midnight every Sunday.MONTHLY("monthly"): Fires at midnight on the first day of every month.
smartinno.redis_custom.smart_router.DataSensitivity
Categorizes data payloads based on PII/critical parameters to dictate caching TTL policies.
CRITICAL(Matchespassport,visa): Bypasses persistent storage; cached for 60 seconds max.HIGH(Matchespassword,payment,card,cvv,secret,ssn,token): Cached for 300 seconds.MEDIUM(Matchesemail,phone,user_name): Standard caching rules apply.LOW: Default category for standard operational logs or cache elements.
🔑 Redis Key Generation & Data Serialization Envelopes
To prevent key collision and ensure clean data parsing across heterogeneous systems (e.g. Go, Python, and Node.js microservices), the SDK enforces standardized key structures and JSON packaging envelopes.
1. Standardized Key Formatting Pattern
All keys written to Redis are transparently ran through the BaseRedisAdapter._format_key() handler. This prefixes keys according to a strict namespace convention:
$$\text{redis_key} = \text{service_name} : \text{action} : \text{original_key}$$
- Parameters:
service_name(str, default"safari_pro"): Declared inRedisConfig. Prevents cross-app database contamination.action(str, default"data"): Scopes key usage (e.g.,data,cache,stream,queue).
- Example Generation:
- Original Key:
"booking:8848" - Generated Key:
safari_pro:data:booking:8848
- Original Key:
2. Cache Data Envelope Serialization
For caching payloads (strings, hashes, simple JSON targets), values are packed into a typed JSON envelope containing contextual metadata.
{
  "request_id": "4020a6e3-fcf3-40e1-b4f6-6cbe702e5b85",
  "event": "update",
  "type": "cache",
  "service": "safari_pro",
  "tenant_name": null,
  "payload": {
    "booking_id": "BK-9989",
    "total_amount": 1500.0
  },
  "timestamp": "2026-06-29T10:24:18.123456"
}
- Envelope Fields:
  - `request_id` (`str`): UUIDv4 tracking identifier.
  - `event` (`str`): Action trigger description (e.g., `update`, `invalidate`).
  - `type` (`str`): Target cache category type.
  - `service` (`str`): Originating application namespace name.
  - `tenant_name` (`Optional[str]`): Partition indicator for multi-tenancy.
  - `payload` (`Any`): The actual payload data structure.
  - `timestamp` (`str`): UTC timestamp of serialization in ISO-8601 formatting.
- Note: Read operations automatically deserialize the envelope, returning only the underlying `payload` value.
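The write and read sides of the envelope can be sketched with the standard library. The field names follow the schema above. The `wrap` and `unwrap` helpers are hypothetical, and the SDK's own serializer may differ in details such as timestamp precision.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Optional


def wrap(payload: Any, event: str = "update", type_: str = "cache",
         service: str = "safari_pro", tenant_name: Optional[str] = None) -> str:
    """Pack a payload into a JSON envelope with the documented fields."""
    envelope = {
        "request_id": str(uuid.uuid4()),
        "event": event,
        "type": type_,
        "service": service,
        "tenant_name": tenant_name,
        "payload": payload,
        # UTC time rendered without an offset, like the sample envelope.
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }
    return json.dumps(envelope)


def unwrap(raw: str) -> Any:
    """Mirror the documented read path: return only the payload."""
    return json.loads(raw)["payload"]


raw = wrap({"booking_id": "BK-9989", "total_amount": 1500.0})
restored = unwrap(raw)
```

Callers only ever see `restored`; the metadata fields stay available to tooling that reads the raw value directly.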
3. Messaging Emergency Fallback Envelopes
When circuit breakers trip or connection drops occur, message payloads are wrapped with failover metadata.
Redis Emergency Fallback
Stranded messages waiting for Pulsar recovery are written to a Redis Stream with the key pattern: $$\text{key} = \text{emergency-fallback} : \{\text{topic}\}$$
- Envelope Schema:
{ "origin_topic": "booking-requests", "failed_at": 1782728658.452, "data": { ... } }
Pulsar Emergency Fallback
When Redis is down and fallback is routed directly to Pulsar, messages are packaged with origin metadata:
- Target Topic: `persistent://public/default/emergency-fallback-redis-{topic}`
- Envelope Schema:
{ "origin": "booking-requests", "time": 1782728658.452, "data": { ... } }
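The two fallback envelopes differ only in their destination and field names. The sketch below builds both from a topic and a payload, using the key pattern, topic pattern, and field names given above. The helper names are hypothetical, and the actual write to Redis or Pulsar is left out.

```python
import time
from typing import Any, Dict, Tuple


def redis_fallback(topic: str, data: Any) -> Tuple[str, Dict[str, Any]]:
    """Return (stream key, envelope) for a message stranded while Pulsar is down."""
    key = f"emergency-fallback:{topic}"
    return key, {"origin_topic": topic, "failed_at": time.time(), "data": data}


def pulsar_fallback(topic: str, data: Any) -> Tuple[str, Dict[str, Any]]:
    """Return (target topic, envelope) for a message rerouted while Redis is down."""
    target = f"persistent://public/default/emergency-fallback-redis-{topic}"
    return target, {"origin": topic, "time": time.time(), "data": data}


redis_key, redis_envelope = redis_fallback("booking-requests", {"booking_id": "BK-100"})
pulsar_topic, pulsar_envelope = pulsar_fallback("booking-requests", {"booking_id": "BK-100"})
```

Both envelopes carry the original topic name, which is what lets a later replay route each message back to where it was first sent.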
⏳ 1. Temporal Orchestration SDK
TemporalConfig
A configuration dataclass used to initialize the TemporalClient.
| Field Name | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | `"localhost"` | Endpoint hostname of the Temporal frontend service. |
| `port` | `int` | `7233` | Endpoint port of the Temporal frontend service. |
| `namespace` | `str` | `"default"` | Target namespace partition. |
| `ssl` | `bool` | `False` | Set to `True` to enable secure TLS communication. |
| `client_cert_path` | `Optional[str]` | `None` | Absolute path to the client certificate (.crt/.pem). |
| `client_key_path` | `Optional[str]` | `None` | Absolute path to the client private key (.key). |
| `auto_start_worker` | `bool` | `False` | Automatically boots background worker thread on execution. |
| `queues` | `List[str]` | `["default"]` | List of task queues the background worker monitors. |
| `mock_mode` | `Optional[bool]` | `None` | Explicit override. If omitted, mode is auto-detected from environment variables. |
@tc.workflow (Alias: @tc.process)
Marks an async function as a stateful workflow process coordinator.
- Parameters:
  - `queue` (`str`, default `"default"`): The task queue where this workflow registration and its tasks reside.
- Attributes Attached to Wrapper:
  - `.delay(*args, **kwargs)`: Triggers the workflow asynchronously, returning a `TaskHandle`.
  - `.workflow_name`: Registered string identifier (the decorated function's name).
  - `._queue`: Target queue.
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Orchestration logic goes here...
    return {"status": "success"}
@tc.task (Alias: @tc.step)
Marks a function (sync or async) as an activity step. Activity steps execute real network operations, queries, or API calls.
- Parameters:
  - `queue` (`str`, default `"default"`): Task queue where this step executes.
  - `retry_attempts` (`Optional[int]`, default `None`): Maximum times to retry if an exception occurs.
  - `backoff_seconds` (`Optional[int]`, default `None`): Initial backoff delay (seconds) before retrying.
  - `fallback` (`Optional[Callable]`, default `None`): Backup function run if all retries fail.
  - `timeout` (`int`, default `300`): Maximum time allowed for a single run in seconds.
  - `heartbeat_timeout` (`Optional[int]`, default `None`): Time limit between activity status heartbeats.
def log_failed_payment(booking_id: str, amount: float):
    print(f"Payment failed for {booking_id} after retries.")
    return {"status": "payment_failed_logged"}

@tc.step(
    queue="billing-queue",
    retry_attempts=3,
    backoff_seconds=5,
    fallback=log_failed_payment,
    timeout=60
)
def process_payment(booking_id: str, amount: float):
    # Charge gateway API call...
    return {"txn_id": "TXN-8848"}
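The interaction between `retry_attempts`, `backoff_seconds`, and `fallback` can be pictured with a plain-Python sketch. It is not the SDK's implementation, which delegates retries to Temporal. `run_with_retries` is a hypothetical helper, and two points are assumptions: that `retry_attempts` counts retries after the first call, and that the delay doubles after each failure (the parameter description only says the value is the initial delay).

```python
import time
from typing import Any, Callable, Optional


def run_with_retries(func: Callable[..., Any], *args: Any,
                     retry_attempts: int = 3,
                     backoff_seconds: float = 5,
                     fallback: Optional[Callable[..., Any]] = None,
                     sleep: Callable[[float], None] = time.sleep,
                     **kwargs: Any) -> Any:
    """Call func, retrying on any exception, then hand over to fallback."""
    delay = backoff_seconds
    for attempt in range(retry_attempts + 1):  # first call plus the retries
        try:
            return func(*args, **kwargs)
        except Exception:
            if attempt == retry_attempts:
                if fallback is None:
                    raise  # no fallback configured: surface the last error
                return fallback(*args, **kwargs)
            sleep(delay)
            delay *= 2  # assumed exponential growth from the initial delay
```

The `sleep` parameter exists only so the waiting can be replaced in tests; the fallback receives the same arguments as the failed step, as `log_failed_payment` does in the example above.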
tc.execute_step(step_func, *args, **kwargs)
Runs an activity step durably within the workflow context.
- Parameters:
  - `step_func` (`Callable`): The decorated `@tc.step` function to execute.
  - `*args`: Positional arguments forwarded to `step_func`.
  - `**kwargs`: Keyword arguments forwarded to `step_func`.
- Returns: `Any` return value from `step_func`.
result = await tc.execute_step(process_payment, "booking-id-123", 450.0)
tc.execute_local_step(step_func, *args, **kwargs)
Runs an activity step locally inside the worker process running the workflow. Bypasses the Temporal server queues.
- Parameters:
  - `step_func` (`Callable`): Decorated `@tc.step` function.
  - `*args` / `**kwargs`: Forwarded arguments.
  - `timeout` (`int`, default `5`): Maximum local timeout in seconds.
  - `retry_attempts` (`int`, default `3`): Maximum retries.
- Returns: `Any` return value from `step_func`.
is_valid = await tc.execute_local_step(check_promo_validity, "SAFARI10", 450.0)
tc.execute_in_parallel(*calls) (Alias: fan_out)
Executes multiple step functions concurrently (Fan-Out/Fan-In).
- Parameters:
  - `*calls` (`tuple`): Varargs of tuples. Each tuple is formatted as:
    - `step_func` (`Callable`): The `@tc.step` function to run.
    - `args_list` (`list`): List of positional arguments.
    - `kwargs_dict` (`dict`, optional): Dict of keyword arguments.
- Returns: `list` containing results in corresponding order.
results = await tc.execute_in_parallel(
    (search_hotels, ["Arusha"]),
    (search_flights, ["JRO"])
)
hotels, flights = results[0], results[1]
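The fan-out/fan-in contract (run everything concurrently, return results in call order) is the same one `asyncio.gather` provides. The sketch below accepts the same tuple format as `execute_in_parallel` but runs plain coroutines; the local `fan_out` function and the two search coroutines are hypothetical stand-ins, and nothing here goes through Temporal.

```python
import asyncio
from typing import Any, List, Sequence


async def fan_out(*calls: Sequence[Any]) -> List[Any]:
    """Run (func, args_list[, kwargs_dict]) tuples concurrently."""
    async def run(call: Sequence[Any]) -> Any:
        func, args = call[0], call[1]
        kwargs = call[2] if len(call) > 2 else {}
        return await func(*args, **kwargs)

    # gather preserves input order even when later calls finish first.
    return await asyncio.gather(*(run(call) for call in calls))


async def search_hotels(city: str) -> str:
    await asyncio.sleep(0.02)  # deliberately the slower call
    return f"hotels in {city}"


async def search_flights(airport: str, direct: bool = False) -> str:
    await asyncio.sleep(0.01)
    return f"{'direct ' if direct else ''}flights to {airport}"
```

Even though `search_flights` finishes first here, its result still arrives in the second position, which is what makes `results[0], results[1]` unpacking safe.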
tc.execute_in_series(*calls)
Executes multiple step functions sequentially.
- Parameters: Same format as `execute_in_parallel`.
- Returns: `list` containing results in order of completion.
results = await tc.execute_in_series(
    (create_invoice, ["book-99"]),
    (send_receipt, ["book-99"])
)
tc.start_process(process_name, queue, *args, **kwargs)
Launches a workflow process asynchronously and returns a TaskHandle.
- Parameters:
  - `process_name` (`str`): Name of the `@tc.process` workflow.
  - `queue` (`str`): Target queue.
  - `*args` / `**kwargs`: Inputs forwarded to the workflow.
- Returns: `TaskHandle` to query or await workflow status and results.
handle = tc.start_process("safari_booking_workflow", "booking-queue", {"tour_id": "T1"})
TaskHandle Methods
- `get(timeout: Optional[float] = None) -> Any`: Blocks until completion and returns workflow output.
- `cancel() -> None`: Sends a cancellation request to the workflow.
- `then(next_process_func: Callable, queue: Optional[str] = None, *args, **kwargs) -> TaskHandle`: Chains another process that starts automatically with the current workflow's result.
- `status` (`str`): Returns `"RUNNING"`, `"COMPLETED"`, or `"CANCELLED"`.
result = handle.get(timeout=60.0)
tc.set_state(process_id, key, value)
Signals a running workflow to update its state dict out-of-band.
- Parameters:
  - `process_id` (`str`): The workflow's ID.
  - `key` (`str`): State key name.
  - `value` (`Any`): Value to set.
tc.set_state(process_id, "guide_assigned", "Alex Kibona")
tc.get_state(process_id, key) -> Any
Queries a running workflow for a state value.
guide = tc.get_state(process_id, "guide_assigned")
tc.side_effect(func, *args) -> Any
Safely records non-deterministic operations (UUIDs, timestamps, database lookups) into workflow history.
- Parameters:
  - `func` (`Callable`): A function returning a non-deterministic value.
  - `*args`: Positional arguments passed to `func`.
- Returns: The result of `func(*args)`.
import uuid
txn_ref = tc.side_effect(lambda: f"REF-{uuid.uuid4().hex[:8].upper()}")
tc.continue_process_as_new(process_func, *args, **kwargs)
Restarts the active workflow with a clean history to prevent unbounded log growth.
tc.continue_process_as_new(safari_booking_workflow, new_payload)
Saga Transaction Coordinator
Rolls back actions in Last-In-First-Out (LIFO) order if an error is encountered.
from smartinno.temporal.client import Saga

@tc.process(queue="booking-queue")
async def booking_saga_workflow(payload: dict):
    saga = Saga(tc)
    try:
        # Step 1: Reserve Hotel
        hotel = await tc.execute_step(reserve_hotel, payload["hotel_id"])
        saga.add_compensation(release_hotel_reservation, hotel["reservation_id"])
        # Step 2: Book Tour (might fail)
        await tc.execute_step(book_safari_tour, payload["tour_id"])
    except Exception:
        # Undo completed steps in LIFO order, then re-raise the original error
        await saga.compensate()
        raise
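The LIFO rollback rule is easiest to see with a toy coordinator. The sketch below is a hypothetical `MiniSaga` built on a plain list used as a stack; it mirrors the `add_compensation` and `compensate` method names from the example above but is not the SDK's `Saga` class and does not run compensations as durable steps.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


class MiniSaga:
    """Hypothetical stand-in for the SDK's Saga: a stack of undo actions."""

    def __init__(self) -> None:
        self._stack: List[Tuple[Callable[..., Awaitable[Any]], tuple]] = []

    def add_compensation(self, func: Callable[..., Awaitable[Any]], *args: Any) -> None:
        self._stack.append((func, args))

    async def compensate(self) -> None:
        # Pop from the end so the most recently registered undo runs first.
        while self._stack:
            func, args = self._stack.pop()
            await func(*args)


async def demo() -> List[str]:
    log: List[str] = []

    async def undo(label: str) -> None:
        log.append(f"undo {label}")

    saga = MiniSaga()
    try:
        saga.add_compensation(undo, "hotel")      # registered first
        saga.add_compensation(undo, "transfer")   # registered second
        raise RuntimeError("tour booking failed")
    except RuntimeError:
        await saga.compensate()
    return log
```

Running `demo()` undoes the transfer before the hotel, which matters whenever a later step depends on resources created by an earlier one.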
Dynamic Queue Routing (tc.route_step)
Routes steps dynamically to specific queues at runtime.
- Parameters:
  - `step_func` (`Callable`): `@tc.step` function to execute.
  - `router` (`Callable[..., str]`): Function returning the target queue string.
  - `*args` / `**kwargs`: Arguments passed to `step_func` and `router`.
def select_billing_queue(amount: float):
    return "high-value-payments" if amount >= 10000 else "standard-payments"

await tc.route_step(process_payment, select_billing_queue, amount=12500)
Namespace Management APIs
- `create_namespace(namespace_name: str, retention_days: int = 3) -> bool`
- `describe_namespace(namespace_name: str) -> dict`
- `update_namespace(namespace_name: str, retention_days: Optional[int] = None, description: Optional[str] = None, owner_email: Optional[str] = None) -> bool`
- `delete_namespace(namespace_name: str) -> bool`
tc.create_namespace("tenant-serengeti-lodges", retention_days=5)
Scheduler (tc.create_schedule)
- `schedule_id` (`str`): Unique identifier for the scheduled task.
- `process_func` (`Callable`): Workflow process to launch.
- `frequency` (`Frequency`): Pre-defined frequency (`ONCE`, `REPEATED`, `HOURLY`, `DAILY`, `WEEKLY`, `MONTHLY`).
- `cron_expression` (`Optional[str]`): Raw cron string (e.g., `0 0 * * 1-5`).
- `count` (`Optional[int]`): Maximum execution runs allowed before termination.
- `interval_minutes` (`Optional[int]`): Run workflow at specific minute intervals.
await tc.create_schedule(
    schedule_id="daily-reconciliation",
    process_func=reconciliation_workflow,
    frequency=Frequency.DAILY
)
2. 🔌 Custom Redis Connectivity SDK
Provides a unified interface IRedisClient with support for Sentinel, Cluster, Hybrid proxies (e.g., Predixy), and an auto-routing Intelligent Router.
RedisConfig
Configuration settings for Redis instances. Key configuration fields:
| Field Name | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | `"localhost"` | Redis server host. |
| `port` | `int` | `6379` | Redis server port. |
| `password` | `Optional[str]` | `None` | Authentication password. |
| `db` | `int` | `0` | Database index. |
| `max_connections` | `int` | `100` | Max connection pool size. |
| `socket_timeout` | `float` | `0.5` | 500ms operations socket timeout. |
| `socket_connect_timeout` | `float` | `0.2` | 200ms connection establish timeout. |
| `retry_on_timeout` | `bool` | `True` | Automatically retry on socket timeout. |
| `cluster_host_mapping` | `Optional[dict]` | `None` | NAT/Docker cluster host maps. |
RedisClientFactory
Instantiates the correct adapter according to specified architecture or use case.
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.redis_custom.factory import RedisClientFactory
# Retrieve high concurrency cluster client
redis_client = RedisClientFactory.get_client(
    config=RedisConfig(host="localhost", port=7001),
    use_case=UseCase.HIGH_CONCURRENCY
)
Core Operations (IRedisClient)
- `set(key: str, value: Any) -> bool`
- `get(key: str) -> Any`
- `mget(keys: List[str]) -> List[Any]`
- `hset(name: str, key: str = None, value: str = None, mapping: dict = None, ttl: int = None) -> int`
- `hget(name: str, key: str) -> Any`
- `lpush(name: str, *values, ttl: int = None) -> int`
- `rpop(name: str) -> Any`
- `pipeline(transaction: bool = True, shard_key: str = None) -> Pipeline`
Multi-key Transaction Sharding
When using cluster architecture, transactions must route to the same slot using hash tags.
# Shard key guarantees slots match in Redis Cluster
with redis_client.pipeline(transaction=True, shard_key="hotel:987") as pipe:
    pipe.set("hotel:987:name", "Serengeti Lodge")
    pipe.set("hotel:987:available_rooms", 12)
    pipe.execute()
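Why hash tags keep a transaction on one node follows from how Redis Cluster assigns slots: the slot is CRC16 (XMODEM variant) of the key modulo 16,384, and when the key contains a non-empty `{...}` section only that section is hashed. The sketch below computes slots with the standard library. The helper names are hypothetical, and the assumption that the SDK turns `shard_key` into a `{hotel:987}` tag is for illustration only; this guide does not say how `shard_key` is applied.

```python
import binascii


def hash_tag(key: str) -> str:
    """Return the part of the key that Redis Cluster hashes."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        # Only a non-empty tag between the first '{' and the next '}' counts.
        if end != -1 and end != start + 1:
            return key[start + 1:end]
    return key


def key_slot(key: str) -> int:
    """CRC16/XMODEM of the hashed part, modulo the 16,384 cluster slots."""
    return binascii.crc_hqx(hash_tag(key).encode("utf-8"), 0) % 16384


# Assumed tagged forms of the two keys used in the pipeline example.
name_slot = key_slot("{hotel:987}:name")
rooms_slot = key_slot("{hotel:987}:available_rooms")
```

Both tagged keys hash only `hotel:987`, so `name_slot` and `rooms_slot` are equal and a `MULTI`/`EXEC` block touching them can run on a single master.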
High Concurrency Redis Streams APIs
- `xadd(name: str, fields: dict) -> str`: Appends a message to a stream.
- `xread(streams: dict, count: int = None, block: int = None) -> List`: Reads entries from streams.
- `xgroup_create(name: str, groupname: str, id: str = "$", mkstream: bool = False)`: Registers consumer groups.
- `xreadgroup(groupname: str, consumername: str, streams: dict, count: int = None, block: int = None) -> List`: Reads streams inside a consumer group.
- `xack(name: str, groupname: str, *ids)`: Acknowledges message receipt.
# Publish to stream
msg_id = redis_client.xadd("safari-bookings", {"booking_id": "BK-99", "status": "pending"})
# Read group messages
entries = redis_client.xreadgroup("booking-group", "consumer-1", {"safari-bookings": ">"}, count=10)
3. 📡 Resilient Messaging Facade (Pulsar / Redis)
The messaging facade provides resilient event pub/sub. It supports dual-active routing, client-side circuit breakers, and automatic database-to-broker failover.
PulsarConfig
Configuration parameters for Apache Pulsar.
| Field Name | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | `"localhost"` | Pulsar broker host. |
| `port` | `int` | `6650` | Pulsar broker port. |
| `connection_timeout_ms` | `int` | `5000` | Connect timeout (default 5s). |
| `operation_timeout_ms` | `int` | `10000` | Read/write timeout (default 10s). |
@msg.on(topic, strategy="load_balanced", retries=2)
Subscribes a callback to a topic. Supports specific distribution strategies:
- Pulsar Subscription Types:
  - `broadcast`: Creates an exclusive subscription with a randomized ID so every node receives the message.
  - `load_balanced`: Creates a shared subscription (`Shared`) to distribute work across available workers.
  - `sticky`: Creates a key-shared subscription (`Key_Shared`) to route messages with matching partition keys to the same consumer.
  - `failover`: Creates a failover subscription (`Failover`) with active-standby consumers.
- Redis Subsystem Adaptation:
  - `broadcast` routes through standard PubSub.
  - `load_balanced` or `sticky` routes through Redis Streams consumer groups.
  - `exclusive` routes through Redis Lists (`RPOP`/`LPUSH` queue patterns).
@msg.on("payment-events", strategy="load_balanced")
def handle_payment(payload):
    print(f"Received payment notification: {payload}")
@msg.on_event(topic, queue="default")
Wires an event topic to automatically start a Temporal workflow when a message arrives.
# Event-driven trigger
@msg.on_event("booking-requests", queue="booking-queue")
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Workflow triggers automatically on message
    await tc.execute_step(confirm_booking, payload["booking_id"])
msg.smart_send(topic, payload, key=None)
Publishes a message to a topic with automatic database-to-broker fallback.
- Resiliency Workflow:
  1. Attempts to publish to primary backend (e.g. Redis Stream).
  2. If the connection fails or throws errors, it trips the circuit breaker to `OPEN`.
  3. The message is automatically wrapped and published to the Pulsar emergency fallback topic: `persistent://public/default/emergency-fallback-redis-{topic}`.
  4. Safe recovery loops buffer stranded payloads until the primary server recovers.
# Resilient send
msg.smart_send("booking-requests", {"booking_id": "BK-100", "user_id": "U5"})
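The resiliency workflow above follows the classic circuit-breaker pattern. The sketch below shows that pattern in isolation, with plain callables standing in for the primary and fallback publishers. The `CircuitBreaker` class, its thresholds, the `HALF_OPEN` probe state, and the injectable clock are all assumptions for illustration; the SDK's actual breaker settings are not documented in this guide.

```python
import time
from typing import Any, Callable


class CircuitBreaker:
    """Hypothetical breaker: CLOSED -> OPEN on failure, probe after a cooldown."""

    def __init__(self, failure_threshold: int = 1, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._threshold = failure_threshold
        self._reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0
        self.state = "CLOSED"

    def call(self, primary: Callable[..., Any], fallback: Callable[..., Any], *args: Any) -> Any:
        if self.state == "OPEN":
            if self._clock() - self._opened_at < self._reset_after:
                return fallback(*args)   # still cooling down: skip primary
            self.state = "HALF_OPEN"     # cooldown over: allow one probe
        try:
            result = primary(*args)
        except Exception:
            self._failures += 1
            if self.state == "HALF_OPEN" or self._failures >= self._threshold:
                self.state = "OPEN"
                self._opened_at = self._clock()
            return fallback(*args)
        self._failures = 0
        self.state = "CLOSED"
        return result
```

While the breaker is open, messages go straight to the fallback without waiting on a failing connection; the first successful probe after the cooldown closes it again.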
msg.smart_listen(topic, callback, hint=None, criteria=None, explicit_type=None, block=True)
Dynamically listens on a Redis or Pulsar topic. It automatically inspects the channel type (e.g. detecting if a Redis key is a Stream, List, or PubSub channel) and launches the corresponding listener loops.
- Parameters:
  - `topic` (`str`): Target channel to monitor.
  - `callback` (`Callable`): Code executed on payload arrival.
  - `hint` (`Any`) / `explicit_type`: Used to guide target pattern selection.
  - `block` (`bool`, default `True`): Blocks execution and polls continuously.
# Dynamically resolves stream/list/pubsub loops
msg.smart_listen("booking-requests", process_incoming_booking, explicit_type="stream")
msg.smart_get(topic, hint=None, explicit_type=None, criteria=None)
Intelligently queries Redis keys, dynamically casting results based on type detection.
- Supported Type Resolutions:
  - `string`/`cache` -> returned value parsed from JSON.
  - `hash` -> returns all mapped fields via `hgetall`.
  - `zset`/`geo` -> returns top score ranges via `zrange`.
  - `stream` -> returns pending stream entries.
booking_data = msg.smart_get("booking:987", explicit_type="hash")
msg.flush_emergency_fallbacks(topic, batch_size=100)
Drains the Redis emergency fallback queues and replays stranded payloads back into the primary Pulsar cluster once connectivity is restored.
# Recovers and replays stranded messages
recovered = msg.flush_emergency_fallbacks("booking-requests", batch_size=50)
print(f"Replayed {recovered} stranded messages.")
4. 📊 Observability & Monitoring SDK
Provides health checks, counters, and statistics from the different technological subsystems using the MonitoringProvider protocol.
from typing import Dict, Any, Protocol

class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...
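Because `MonitoringProvider` is a structural protocol, any class with these two methods qualifies without inheriting from it. The sketch below redeclares the protocol locally (adding `runtime_checkable` so `isinstance` works) and implements a hypothetical `QueueDepthMonitor`. The monitor, its `max_depth` threshold, and the `collect` helper are invented for illustration; only the two method names and the `is_healthy` key come from this guide.

```python
from typing import Any, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...


class QueueDepthMonitor:
    """Hypothetical provider reporting backlog sizes per queue."""

    def __init__(self, depths: Dict[str, int], max_depth: int = 1000) -> None:
        self._depths = depths
        self._max_depth = max_depth

    def get_real_time_stats(self) -> Dict[str, Any]:
        return {"queues": dict(self._depths), "total": sum(self._depths.values())}

    def check_health(self) -> Dict[str, Any]:
        # Healthy only while every queue stays at or below the threshold.
        return {"is_healthy": all(d <= self._max_depth for d in self._depths.values())}


def collect(providers: List[MonitoringProvider]) -> Dict[str, Dict[str, Any]]:
    """Gather health reports keyed by provider class name."""
    return {type(p).__name__: p.check_health() for p in providers}


report = collect([QueueDepthMonitor({"booking-queue": 12, "billing-queue": 3})])
```

The same `collect` loop would accept the built-in monitors alongside custom ones, since it relies only on the protocol's method names.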
TemporalMonitor
Monitors workflow statuses, retry events, and active queues.
from smartinno.monitoring.temporal_monitor import TemporalMonitor
monitor = TemporalMonitor(config=t_config, temporal_client=tc)
# Get statistics on active workflow runs
stats = monitor.get_real_time_stats(workflow_ids=["wf-1", "wf-2"])
print(stats["summary"]) # {'total': 2, 'completed': 1, 'running': 1, ...}
# Retrieve namespace liveness
health = monitor.check_health()
print(health["is_healthy"]) # True / False
RedisMonitor
Monitors cluster topologies, slot redirections, and client pools.
from smartinno.monitoring.redis_monitor import RedisMonitor
r_monitor = RedisMonitor(redis_client)
print(r_monitor.get_real_time_stats())
PulsarMonitor
Checks Pulsar broker liveness and fallback subscriptions.
from smartinno.monitoring.pulsar_monitor import PulsarMonitor
p_monitor = PulsarMonitor(pulsar_config)
print(p_monitor.check_health())
🎓 5. Step-by-Step Tutorial for New Developers
Follow this tutorial to build a fully resilient, event-driven Safari Tour booking pipeline from scratch.
Step 1: Initialize the Configs
Create configuration settings for Redis (using the High Concurrency UseCase mapped to Native Cluster), messaging (with the Pulsar configuration), and Temporal orchestration.
from smartinno.temporal.config import TemporalConfig
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.apache_pulsar.config import PulsarConfig
# 1. Config for Redis Shards
redis_cfg = RedisConfig(
    host="127.0.0.1",
    port=7001,
    cluster_host_mapping={
        "redis-node-1": ("127.0.0.1", 7001),
        "redis-node-2": ("127.0.0.1", 7002),
        "redis-node-3": ("127.0.0.1", 7003),
    }
)
# 2. Config for Pulsar Fallback
pulsar_cfg = PulsarConfig(
    host="164.68.120.77",
    port=6650,
    connection_timeout_ms=5000,
    operation_timeout_ms=10000
)
# 3. Config for Temporal Orchestrator
temporal_cfg = TemporalConfig(
    host="127.0.0.1",
    port=7233,
    namespace="tenant-safari-corp"
)
Step 2: Instantiate SDK Clients
Create the client objects. The message client acts as the dispatcher, and the temporal client coordinates stateful workflows.
from smartinno.temporal.client import TemporalClient
from smartinno.messaging.client import MessageClient
tc = TemporalClient(config=temporal_cfg)
msg = MessageClient(
    backend="redis",
    redis_config=redis_cfg,
    pulsar_config=pulsar_cfg,
    use_case=UseCase.HIGH_CONCURRENCY,
    temporal_client=tc
)
Step 3: Define Workflows and Activity Steps
Decorate your operational tasks using the task decorators.
# An activity step executing transactional calculations
@tc.step(queue="safari-queue", retry_attempts=3, backoff_seconds=2)
def create_booking_record(user_id: str, tour_id: str):
    # Perform database insertion or Redis write
    return {"booking_id": "BK-9092", "status": "reserved"}

# The main process orchestrator
@msg.on_event("booking-requests", queue="safari-queue")  # Dynamic workflow start decorator
@tc.process(queue="safari-queue")
async def safari_booking_workflow(payload: dict):
    # Execute step durably
    booking = await tc.execute_step(create_booking_record, payload["user_id"], payload["tour_id"])
    print(f"Booking created: {booking}")
    return booking
Step 4: Boot Up Workers & Start Routing
Instantiate and run the background execution threads.
import threading
from smartinno.temporal.worker import TemporalWorkerManager
# 1. Start Temporal Orchestrator worker manager
worker_manager = TemporalWorkerManager(tc)
worker_manager.start()
# 2. Start Message Client routing loop in a separate thread
msg_thread = threading.Thread(target=lambda: msg.start(block=True), daemon=True)
msg_thread.start()
print("All background worker loops are listening...")
Step 5: Trigger Event Execution
Publish an event to the stream topic. The messaging client intercepts it, auto-routes it to the correct Redis structure, starts the dynamic workflow process, and resolves the step activities durably.
# Publish message -> triggers safari_booking_workflow automatically!
msg.smart_send("booking-requests", {"user_id": "user-88", "tour_id": "tour-maasai-mara"})