Reactive signals for Python with async support

reaktiv

Reactive Signals for Python with first-class async support, inspired by Angular's reactivity model.

Documentation

Full documentation is available at https://reaktiv.readthedocs.io/.

Installation

pip install reaktiv
# or with uv
uv pip install reaktiv

Quick Start

Basic Reactivity

import asyncio
from reaktiv import signal, effect

async def main():
    name = signal("Alice")

    async def greet():
        print(f"Hello, {name()}!")

    # Create and schedule effect
    # IMPORTANT: Assign the effect to a variable to ensure it is not garbage collected.
    greeter = effect(greet)

    name.set("Bob")  # Prints: "Hello, Bob!"
    await asyncio.sleep(0)  # Process effects

asyncio.run(main())

Using update()

Instead of calling set(new_value), update() lets you modify a signal based on its current value.

from reaktiv import signal

counter = signal(0)

# Standard way
counter.set(counter() + 1)

# Using update() for cleaner syntax
counter.update(lambda x: x + 1)

print(counter())  # 2

Computed Values

from reaktiv import signal, computed

# Synchronous context example
price = signal(100)
tax_rate = signal(0.2)
total = computed(lambda: price() * (1 + tax_rate()))

print(total())  # 120.0
tax_rate.set(0.25)
print(total())  # 125.0

Core Concepts

graph LR
    A[Signal] -->|Value| B[Computed Signal]
    A -->|Change| C[Effect]
    B -->|Value| C
    B -->|Change| C
    C -->|Update| D[External System]
    
    classDef signal fill:#4CAF50,color:white;
    classDef computed fill:#2196F3,color:white;
    classDef effect fill:#FF9800,color:white;
    
    class A,B signal;
    class B computed;
    class C effect;

How it Works

reaktiv provides three core primitives:

  1. Signals: Store a value and notify dependents when it changes
  2. Computed Signals: Derive values that automatically update when dependencies change
  3. Effects: Run side effects when signals or computed signals change
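
To make the three primitives concrete, here is a toy sketch of how automatic dependency tracking can work in principle. It is not reaktiv's implementation: the `ToySignal`, `ToyComputed`, and `ToyEffect` classes are hypothetical names invented for illustration, and the sketch ignores async effects, cleanup, and update ordering in larger graphs.

```python
# Toy sketch of automatic dependency tracking; NOT reaktiv's implementation.
_active = []  # stack of observers currently being evaluated


class ToySignal:
    def __init__(self, value):
        self._value = value
        self._observers = set()

    def __call__(self):
        # Reading inside an observer registers that observer as a dependent.
        if _active:
            self._observers.add(_active[-1])
        return self._value

    def set(self, value):
        if value == self._value:
            return
        self._value = value
        for observer in list(self._observers):
            observer.notify()


class ToyComputed:
    def __init__(self, fn):
        self._fn = fn
        self._dirty = True
        self._value = None
        self._observers = set()

    def __call__(self):
        if _active:
            self._observers.add(_active[-1])
        if self._dirty:  # recompute lazily, only after a dependency changed
            _active.append(self)
            try:
                self._value = self._fn()
            finally:
                _active.pop()
            self._dirty = False
        return self._value

    def notify(self):
        self._dirty = True
        for observer in list(self._observers):
            observer.notify()


class ToyEffect:
    def __init__(self, fn):
        self._fn = fn
        self.notify()  # run once to discover dependencies

    def notify(self):
        _active.append(self)
        try:
            self._fn()
        finally:
            _active.pop()


log = []
price = ToySignal(100)
quantity = ToySignal(2)
total = ToyComputed(lambda: price() * quantity())
ToyEffect(lambda: log.append(total()))

price.set(150)  # the effect re-runs because total depends on price
print(log)      # [200, 300]
```

The key idea is that reading a value while an observer is running is what records the dependency, so nothing has to be subscribed by hand.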

Why reaktiv?

If you've worked with modern frontend frameworks like React, Vue, or Angular, you're familiar with the power of reactive state management. The idea is simple but transformative: when data changes, everything that depends on it updates automatically.

While this pattern revolutionized frontend development, its benefits are equally powerful in backend systems where complex state management is often overlooked or implemented with brittle, ad-hoc solutions.

reaktiv brings these reactive programming advantages to your Python backend projects:

  • Automatic state dependency tracking: No more manually tracing which components need updating when data changes
  • Declarative state relationships: Define how data is transformed once, not every time it changes
  • Efficient fine-grained updates: Only recompute what actually needs to change
  • Async-first design: Seamlessly integrates with Python's asyncio for managing real-time data flows
  • Zero external dependencies: Lightweight with minimal overhead
  • Type-safe: Fully annotated for clarity and maintainability

Benefits for Backend Development

reaktiv addresses key challenges in backend state management:

  1. Eliminates manual dependency tracking: No more forgotten update logic when state changes
  2. Prevents state synchronization bugs: Updates happen automatically and consistently
  3. Improves performance: Only affected computations are recalculated
  4. Reduces cognitive load: Declare relationships once, not throughout your codebase
  5. Simplifies testing: Clean separation of state, derivation, and effects

Even in "stateless" architectures, ephemeral state still exists during request processing. reaktiv helps manage this complexity without the boilerplate of observers, callbacks, or event dispatchers.

Beyond Pub/Sub: State Management in Backend Systems

Many backend developers view reactive libraries as just another pub/sub system and question their value in "stateless" architectures. However, reaktiv addresses fundamentally different problems:

Traditional Pub/Sub vs. Reaktiv

Pub/Sub Systems                        | Reaktiv
Message delivery between components    | Automatic state dependency tracking
Point-to-point or broadcast messaging  | Fine-grained computation graphs
Manual subscription management         | Automatic dependency detection
Focus on message transport             | Focus on state derivation
Stateless by design                    | Intentional state management
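
As an illustration of the "manual subscription management" side of this comparison, the following sketch uses a hand-rolled publisher. The `Topic` class, the `state` dictionary, and the handler are hypothetical and not part of any library. It shows how a derived value can silently go stale when nobody wires up a handler for one of its inputs.

```python
class Topic:
    """Hypothetical minimal pub/sub topic: subscribers must register by hand."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish(self, message):
        for callback in self._subscribers:
            callback(message)


price_changed = Topic()
state = {"price": 100, "tax_rate": 0.25, "total": 125.0}


def on_price(new_price):
    state["price"] = new_price
    # The derivation has to be repeated in every handler that affects it.
    state["total"] = new_price * (1 + state["tax_rate"])


price_changed.subscribe(on_price)
price_changed.publish(200)
print(state["total"])  # 250.0

# No handler was wired up for tax_rate, so the derived total is now stale.
state["tax_rate"] = 0.5
print(state["total"])  # 250.0
```

With a computed signal, the relationship between price, tax rate, and total is declared once, which is the difference the comparison above is pointing at.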

State in "Stateless" Systems

Even in "stateless" microservices and serverless functions, state exists during request processing:

  • Configuration management
  • Request context propagation
  • In-memory caching
  • Rate limiting and circuit breaking
  • Feature flag evaluation
  • Connection pooling
  • Runtime metrics collection

reaktiv helps manage this ephemeral state with less code, fewer bugs, and better maintainability.

Basic Examples

Feature Flag System with Dynamic Rules

from reaktiv import signal, computed, effect

# Core state
user_segments = signal({"user1": ["premium", "beta_tester"]})
feature_flags = signal({
    "new_dashboard": {"enabled": True, "segments": ["premium"]},
    "dark_mode": {"enabled": True, "segments": []},
    "beta_feature": {"enabled": True, "segments": ["beta_tester"]}
})

# Computed user permissions that update automatically
user_features = computed(lambda: {
    user_id: [
        flag_name 
        for flag_name, flag in feature_flags().items()
        if flag["enabled"] and (
            not flag["segments"] or 
            any(segment in user_segments().get(user_id, []) for segment in flag["segments"])
        )
    ]
    for user_id in user_segments()
})

# Add real-time monitoring
def monitor_features():
    features = user_features()
    for user_id, enabled_features in features.items():
        if "beta_feature" in enabled_features:
            print(f"User {user_id} has access to beta features")

feature_monitor = effect(monitor_features)

# When segments change, permissions automatically update
user_segments.update(lambda segments: {**segments, "user2": ["premium"]})
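
The matching rule inside `user_features` can be isolated as a plain function, which makes it easy to check on its own. This is a sketch only; the `enabled_features` helper is a hypothetical name and not part of reaktiv.

```python
def enabled_features(segments, flags):
    """Return the flag names visible to a user with the given segments.

    A flag applies when it is enabled and either targets no segment at all
    or targets at least one of the user's segments.
    """
    return [
        name
        for name, flag in flags.items()
        if flag["enabled"]
        and (not flag["segments"] or any(s in segments for s in flag["segments"]))
    ]


flags = {
    "new_dashboard": {"enabled": True, "segments": ["premium"]},
    "dark_mode": {"enabled": True, "segments": []},
    "beta_feature": {"enabled": True, "segments": ["beta_tester"]},
}

print(enabled_features(["premium", "beta_tester"], flags))
# ['new_dashboard', 'dark_mode', 'beta_feature']
print(enabled_features(["premium"], flags))
# ['new_dashboard', 'dark_mode']
print(enabled_features([], flags))
# ['dark_mode']
```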

Real-World Backend Use Cases

1. Intelligent Cache Management

from reaktiv import signal, computed, effect

# Database-derived state (imagine this comes from your database)
user_data = signal({})
permissions = signal({})
content_items = signal({})

# Computed caches that automatically update when source data changes
user_permissions = computed(lambda: {
    user_id: [perm for perm_id, perm in permissions().items() 
              if perm['user_id'] == user_id]
    for user_id in user_data()
})

authorized_content = computed(lambda: {
    user_id: [item for item_id, item in content_items().items()
              if any(p['level'] >= item['required_level'] for p in user_permissions().get(user_id, []))]
    for user_id in user_data()
})

# Demonstration of automatic cache updates
def demo_reactive_cache():
    # Initial data setup
    user_data.set({"user1": {"name": "Alice"}, "user2": {"name": "Bob"}})
    permissions.set({
        "p1": {"user_id": "user1", "level": 5},  # Admin
        "p2": {"user_id": "user2", "level": 2}   # Editor
    })
    content_items.set({
        "c1": {"title": "Public Content", "required_level": 1},
        "c2": {"title": "Admin Content", "required_level": 5}
    })
    
    # Access permissions and content (cache is computed on first access)
    print("Bob can access:", [item["title"] for item in authorized_content().get("user2", [])])
    # Output: Bob can access: ['Public Content']
    
    # Update Bob's permission level - cache automatically updates!
    permissions.update(lambda p: {**p, "p2": {"user_id": "user2", "level": 5}})
    
    # The computed cache is re-derived from the updated permissions
    print("Bob can now access:", [item["title"] for item in authorized_content().get("user2", [])])
    # Output: Bob can now access: ['Public Content', 'Admin Content']
    
    # No manual cache invalidation needed anywhere!

demo_reactive_cache()

2. Adaptive Rate Limiting & Circuit Breaking

from reaktiv import signal, computed, effect
import time
import asyncio

# Track API calls and failures
endpoint_calls = signal({})  # endpoint: [timestamp, timestamp...]
endpoint_failures = signal({})  # endpoint: [timestamp, timestamp...]

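# Computed circuit breakers that update when endpoint_failures changes
# (time.time() is not a tracked dependency, so elapsed time alone does not trigger a recompute)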
circuit_status = computed(lambda: {
    endpoint: "open" if len(failures) >= 5 and (time.time() - failures[-1]) < 30 else "closed"
    for endpoint, failures in endpoint_failures().items()
})

# Dynamic rate limiting based on traffic patterns
rate_limits = computed(lambda: {
    endpoint: max(10, min(1000, len(calls) // 10)) 
    for endpoint, calls in endpoint_calls().items() 
    if calls and time.time() - calls[0] < 60
})

# Add monitoring effect to see changes in real-time
def monitor_circuit_status():
    status = circuit_status()
    if status:
        print(f"Circuit Status: {status}")

circuit_monitor = effect(monitor_circuit_status)

# Simulate API calls and failures
async def simulate_traffic():
    print("\n=== Simulating API Traffic and Failures ===")
    
    # Record some successful calls
    endpoint_calls.update(lambda calls: {
        **calls, 
        "api/users": [time.time() - i for i in range(20)]
    })
    print(f"Rate limit for api/users: {rate_limits()['api/users']} requests/min")
    
    # Simulate failures for api/orders
    endpoint_failures.update(lambda failures: {
        **failures,
        "api/orders": [time.time() - i for i in range(3)]
    })
    print(f"Circuit status for api/orders: {circuit_status()['api/orders']}")
    
    # Simulate more failures to trigger circuit breaker
    print("Adding more failures to api/orders...")
    endpoint_failures.update(lambda failures: {
        **failures,
        "api/orders": failures["api/orders"] + [time.time() for _ in range(3)]
    })
    print(f"Circuit status for api/orders: {circuit_status()['api/orders']}")

# Run this example with: asyncio.run(simulate_traffic())
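
The open/closed rule used by `circuit_status` can also be written as a pure function that takes the current time as an argument, which makes it deterministic to test. This is a sketch; `circuit_state` and its `threshold` and `cooldown` parameters are hypothetical names that mirror the constants 5 and 30 in the example above.

```python
def circuit_state(failures, now, threshold=5, cooldown=30):
    """Return "open" when at least `threshold` failures were recorded and
    the last one in the list is less than `cooldown` seconds before `now`."""
    if len(failures) >= threshold and now - failures[-1] < cooldown:
        return "open"
    return "closed"


now = 1_000.0

# Three recent failures: below the threshold, so the circuit stays closed.
print(circuit_state([now - 2, now - 1, now], now))               # closed

# Six recent failures: the circuit opens.
print(circuit_state([now - i for i in range(5, -1, -1)], now))   # open

# Six failures, but the last one is 105 seconds old: closed again.
print(circuit_state([now - 100 - i for i in range(6)], now))     # closed
```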

3. Multi-Layer Configuration Management

from reaktiv import signal, computed, effect
import asyncio

# Configuration at different levels
global_config = signal({"log_level": "INFO", "timeout": 30})
service_config = signal({"auth": {"timeout": 10}})
instance_config = signal({"log_level": "DEBUG"})

# Computed effective configuration with correct precedence
effective_config = computed(lambda: {
    **global_config(),
    **{k: v for k, v in service_config().items() if not isinstance(v, dict)},
    **instance_config()
})

# Nested configs are merged properly
auth_config = computed(lambda: {
    **global_config(),
    **(service_config().get("auth", {}))
})

# When any config source changes, all systems update automatically
def log_config_changes():
    print(f"Current effective config: {effective_config()}")
    print(f"Auth specific config: {auth_config()}")

logger_config = effect(log_config_changes)

async def demo_configuration():
    print("\n=== Configuration Management Demo ===")
    
    # Initial configuration state
    print(f"Initial effective config: {effective_config()}")
    print(f"Initial auth config: {auth_config()}")
    
    # Change global config
    print("\nUpdating global timeout to 60 seconds...")
    global_config.update(lambda cfg: {**cfg, "timeout": 60})
    await asyncio.sleep(0.1)  # Allow effects to process
    
    # Override at service level
    print("\nAdding database configuration at service level...")
    service_config.update(lambda cfg: {
        **cfg, 
        "database": {"host": "localhost", "port": 5432}
    })
    await asyncio.sleep(0.1)  # Allow effects to process
    
    # Change instance config
    print("\nChanging instance log_level to TRACE...")
    instance_config.update(lambda cfg: {**cfg, "log_level": "TRACE"})
    await asyncio.sleep(0.1)  # Allow effects to process

# Run this example with: asyncio.run(demo_configuration())
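
The precedence in `effective_config` and `auth_config` comes from plain dictionary merging, where later layers override earlier ones. The sketch below reproduces that merge without any signals; `merge_layers` is a hypothetical helper introduced only for illustration.

```python
def merge_layers(*layers):
    """Merge config dicts left to right; later layers override earlier ones."""
    merged = {}
    for layer in layers:
        merged.update(layer)
    return merged


global_cfg = {"log_level": "INFO", "timeout": 30}
service_cfg = {"auth": {"timeout": 10}}
instance_cfg = {"log_level": "DEBUG"}

# Flat settings only: nested service sections are skipped, as in effective_config.
flat_service = {k: v for k, v in service_cfg.items() if not isinstance(v, dict)}
print(merge_layers(global_cfg, flat_service, instance_cfg))
# {'log_level': 'DEBUG', 'timeout': 30}

# Section-specific view: global defaults overridden by the "auth" section.
print(merge_layers(global_cfg, service_cfg.get("auth", {})))
# {'log_level': 'INFO', 'timeout': 10}
```

Note that this is a shallow merge: a nested dictionary in a later layer replaces the earlier one wholesale rather than being merged key by key.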

Advanced Examples

1. Resource Pool Management

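from reaktiv import signal, computed, effect
import time

# Hypothetical placeholders: replace with your real pool operations
def create_new_connections(count):
    print(f"Would open {count} new connection(s)")

def close_excess_connections():
    print("Would close excess idle connections")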

# Connection pool state
db_connections = signal({})  # id: {created_at, last_used, state}
connection_requests = signal(0)

# Auto-scaling connection pool
idle_connections = computed(lambda: [
    conn_id for conn_id, conn in db_connections().items() 
    if conn['state'] == 'idle' and time.time() - conn['last_used'] < 60
])

connections_needed = computed(lambda: max(0, connection_requests() - len(idle_connections())))

# Effect that manages pool size based on demand
def manage_pool():
    current_needed = connections_needed()
    if current_needed > 0:
        create_new_connections(current_needed)
    elif len(idle_connections()) > 10 and connection_requests() < 5:
        close_excess_connections()

pool_manager = effect(manage_pool)

2. Multi-Stage Data Processing Pipeline

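from reaktiv import signal, computed, effect, batch
from datetime import datetime, timezone

# Hypothetical helper: interprets "ts" as Unix epoch seconds (an assumption)
def parse_timestamp(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc)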

# Raw event stream
raw_events = signal([])

# Normalized data
normalized_events = computed(lambda: [
    {**event, "timestamp": parse_timestamp(event.get("ts", 0))}
    for event in raw_events()
])

# Filtered data
error_events = computed(lambda: [
    event for event in normalized_events()
    if event.get("level") == "ERROR"
])

# Aggregated metrics
error_count_by_service = computed(lambda: {
    service: len([e for e in error_events() if e.get("service") == service])
    for service in set(e.get("service", "unknown") for e in error_events())
})

# Effect to trigger alerts
def check_alerts():
    counts = error_count_by_service()
    for service, count in counts.items():
        if count > 5:
            print(f"ALERT: {service} has {count} errors")

alert_system = effect(check_alerts)

# Adding data triggers the entire pipeline automatically
def ingest_batch(new_events):
    raw_events.update(lambda events: events + new_events)

# Wrap updates in batch() to avoid intermediate recalculations when several changes happen together
with batch():
    raw_events.update(lambda events: events + [
        {"service": "auth", "level": "ERROR", "ts": 1619712000},
        {"service": "auth", "level": "ERROR", "ts": 1619712060}
    ])

3. Real-Time System Monitoring

from reaktiv import signal, computed, effect
import asyncio

# System metrics
cpu_usage = signal([])
memory_usage = signal([])
disk_io = signal([])

# Derived analytics
avg_cpu = computed(lambda: sum(cpu_usage()[-5:]) / 5 if len(cpu_usage()) >= 5 else 0)
avg_memory = computed(lambda: sum(memory_usage()[-5:]) / 5 if len(memory_usage()) >= 5 else 0)

# System status derived from multiple metrics
system_status = computed(lambda: 
    "critical" if avg_cpu() > 90 or avg_memory() > 90 else
    "warning" if avg_cpu() > 70 or avg_memory() > 70 else
    "normal"
)

# Single monitoring effect that updates based on derived status
def update_monitoring_dashboard():
    status = system_status()
    print(f"System status: {status}")
    print(f"CPU: {avg_cpu():.1f}%, Memory: {avg_memory():.1f}%")
    
    if status == "critical":
        print("⚠️ ALERT: System resources critical!")

dashboard = effect(update_monitoring_dashboard)

# When new metrics arrive, all derived values and the dashboard update automatically
async def simulate_metrics():
    for i in range(10):
        cpu_usage.update(lambda readings: readings + [50 + i * 5])
        memory_usage.update(lambda readings: readings + [60 + i * 4])
        await asyncio.sleep(1)

# No need to manually update the dashboard - it reacts to changes automatically
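
The derived values in this example are ordinary functions of the readings, so the thresholds can be checked without any signals. The sketch below uses the same numbers that `simulate_metrics` produces; `moving_average` and `classify` are hypothetical helper names.

```python
def moving_average(readings, window=5):
    """Mean of the last `window` readings, or 0 until enough samples exist."""
    if len(readings) < window:
        return 0
    return sum(readings[-window:]) / window


def classify(cpu, memory):
    """Map average CPU and memory usage to a status label."""
    if cpu > 90 or memory > 90:
        return "critical"
    if cpu > 70 or memory > 70:
        return "warning"
    return "normal"


# The same series that simulate_metrics appends one reading at a time.
cpu = [50 + i * 5 for i in range(10)]
memory = [60 + i * 4 for i in range(10)]

print(moving_average(cpu[:4]))   # 0 (fewer than 5 samples)
print(moving_average(cpu))       # 85.0
print(moving_average(memory))    # 88.0
print(classify(moving_average(cpu), moving_average(memory)))  # warning
```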

4. API Gateway Rate Limiting Example

from reaktiv import signal, computed, effect
import time

# Per-client request tracking
client_requests = signal({})  # client_id: [(timestamp, endpoint), ...]

# Computed rate limits that automatically update
requests_per_minute = computed(lambda: {
    client_id: len([timestamp for timestamp, _ in requests 
                   if time.time() - timestamp < 60])
    for client_id, requests in client_requests().items()
})

endpoint_requests = computed(lambda: {
    endpoint: sum(1 for client_reqs in client_requests().values() 
                 for _, ep in client_reqs if ep == endpoint)
    for endpoint in set(ep for reqs in client_requests().values() 
                       for _, ep in reqs)
})

# Rate limit decision making
should_rate_limit = computed(lambda: {
    client_id: requests_per_minute().get(client_id, 0) > 100
    for client_id in client_requests()
})

# Real-time monitoring
def monitor_rate_limits():
    limits = should_rate_limit()
    throttled = [client for client, limited in limits.items() if limited]
    if throttled:
        print(f"Rate limiting clients: {throttled}")

rate_limit_monitor = effect(monitor_rate_limits)

# When new requests come in, all rate limits automatically recalculate
def track_request(client_id, endpoint):
    client_requests.update(lambda reqs: {
        **reqs,
        client_id: reqs.get(client_id, []) + [(time.time(), endpoint)]
    })
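
The per-client count is a sliding-window calculation over `(timestamp, endpoint)` pairs. A standalone sketch with an explicit `now` argument makes the window logic easy to verify; `requests_in_window` and `is_rate_limited` are hypothetical names, and the limit of 100 mirrors the example above.

```python
def requests_in_window(requests, now, window=60):
    """Count (timestamp, endpoint) pairs newer than `window` seconds."""
    return sum(1 for timestamp, _ in requests if now - timestamp < window)


def is_rate_limited(requests, now, limit=100, window=60):
    """True when the client exceeded `limit` requests inside the window."""
    return requests_in_window(requests, now, window) > limit


now = 10_000.0
history = [(now - 120, "/a"), (now - 59, "/a"), (now - 1, "/b")]
print(requests_in_window(history, now))  # 2

burst = [(now - 1, "/a")] * 101
print(is_rate_limited(burst, now))       # True
print(is_rate_limited(history, now))     # False
```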

Example Application: Health Monitoring System

This example shows how reaktiv simplifies building a real-time health monitoring system that ingests metrics, computes derived health indicators, and triggers alerts.

from reaktiv import signal, computed, effect
import asyncio
import time

# Core state signals
server_metrics = signal({})  # server_id -> {cpu, memory, disk, last_seen}
alert_thresholds = signal({"cpu": 80, "memory": 90, "disk": 95})
maintenance_mode = signal({})  # server_id -> bool

# Derived state
servers_online = computed(lambda: {
    server_id: time.time() - metrics["last_seen"] < 60
    for server_id, metrics in server_metrics().items()
})

health_status = computed(lambda: {
    server_id: (
        "maintenance" if maintenance_mode().get(server_id, False) else
        "offline" if not servers_online().get(server_id, False) else
        "alert" if (
            metrics["cpu"] > alert_thresholds()["cpu"] or
            metrics["memory"] > alert_thresholds()["memory"] or
            metrics["disk"] > alert_thresholds()["disk"]
        ) else 
        "healthy"
    )
    for server_id, metrics in server_metrics().items()
})

servers_by_status = computed(lambda: {
    status: [server_id for server_id, s in health_status().items() if s == status]
    for status in ["healthy", "alert", "offline", "maintenance"]
})

# Effects for monitoring and alerting
def update_dashboard():
    statuses = servers_by_status()
    print(f"Dashboard: {len(statuses['healthy'])} healthy, {len(statuses['alert'])} in alert")
    
    if statuses["alert"]:
        print(f"⚠️ ALERT: Servers in alert state: {statuses['alert']}")

dashboard_effect = effect(update_dashboard)

async def main():
    # Update metrics - all derived values and effects update automatically
    server_metrics.set({
        "server1": {"cpu": 70, "memory": 65, "disk": 80, "last_seen": time.time()},
        "server2": {"cpu": 85, "memory": 50, "disk": 70, "last_seen": time.time()}
    })
    
    await asyncio.sleep(1)
    
    # Put a server in maintenance - dashboard updates automatically
    maintenance_mode.set({"server2": True})
    
    await asyncio.sleep(1)
    
    # Adjust thresholds - alerts recalculate automatically
    alert_thresholds.set({"cpu": 75, "memory": 90, "disk": 95})

asyncio.run(main())

More Examples

You can find more example scripts in the examples folder to help you get started.


Inspired by Angular Signals • Built for Python's async-first world • Made in Hamburg
