Skip to main content

Reactive signals for Python with async support

Project description

reaktiv

Python Version PyPI Version PyPI - Downloads Documentation Status License Checked with pyright

Reactive Computation Graphs for Python with first-class async support, inspired by Angular's reactivity model.

reaktiv

Installation

pip install reaktiv
# or with uv
uv pip install reaktiv

reaktiv creates efficient reactive computation graphs that only recalculate values when their dependencies change. The system automatically tracks dependencies between signals, computed values, and effects, eliminating the need for manual subscription management.

Key features:

  • 🔄 Efficient computation graph: Only recomputes values affected by a change
  • 🔍 Automatic dependency tracking: Dependencies are discovered at runtime
  • 🧠 Intelligent memoization: Computed values are cached until dependencies change
  • 🔌 Side effects via subscriptions: Changes propagate to effects for integration with external systems
  • Async-first design: Built for Python's asyncio ecosystem

Documentation

Full documentation is available at https://reaktiv.readthedocs.io/.

Quick Start

Basic Reactivity

from reaktiv import Signal, Computed, Effect

# Create some signals
name = Signal("Alice")
age = Signal(30)
city = Signal("New York")

# Computed value with automatically tracked dependencies
# The system detects that this depends on name and age
greeting = Computed(lambda: f"{name()} is {age()} years old")

# Another computed value with different dependencies
# The system detects this depends only on name and city
location = Computed(lambda: f"{name()} lives in {city()}")

# Create effects to demonstrate updates
print("Initial Setup:")
greeting_effect = Effect(lambda: print(f"Greeting: {greeting()}"))
location_effect = Effect(lambda: print(f"Location: {location()}"))

# Changing age only triggers recomputation of greeting
print("\nChanging age to 31:")
age.set(31)
# Only greeting recomputed (location unaffected)

# Changing city only triggers recomputation of location
print("\nChanging city to Boston:")
city.set("Boston")
# Only location recomputed (greeting unaffected)

# Changing name triggers recomputation of both derived values
print("\nChanging name to Bob:")
name.set("Bob")
# Both greeting and location recomputed

Using update()

Instead of calling set(new_value), update() lets you modify a signal based on its current value.

from reaktiv import Signal

counter = Signal(0)

# Standard way
counter.set(counter() + 1)

# Using update() for cleaner syntax
counter.update(lambda x: x + 1)

print(counter())  # 2

Computed Values

from reaktiv import Signal, Computed

# Synchronous context example
price = Signal(100)
tax_rate = Signal(0.2)
total = Computed(lambda: price() * (1 + tax_rate()))

print(total())  # 120.0
tax_rate.set(0.25)
print(total())  # 125.0

Core Concepts

graph LR
    A[Signal] -->|Value| B[Computed Signal]
    A -->|Change| C[Effect]
    B -->|Value| C
    B -->|Change| C
    C -->|Update| D[External System]
    
    classDef signal fill:#4CAF50,color:white;
    classDef computed fill:#2196F3,color:white;
    classDef effect fill:#FF9800,color:white;
    
    class A,B signal;
    class B computed;
    class C effect;

How it Works

reaktiv provides three core primitives:

  1. Signals: Store a value and notify dependents when it changes
  2. Computed Signals: Derive values that automatically update when dependencies change
  3. Effects: Run side effects when signals or computed signals change

Why reaktiv?

If you've worked with modern frontend frameworks like React, Vue, or Angular, you're familiar with the power of reactive state management. The idea is simple but transformative: when data changes, everything that depends on it updates automatically.

While this pattern revolutionized frontend development, its benefits are equally powerful in backend systems where complex state management is often overlooked or implemented with brittle, ad-hoc solutions.

reaktiv brings these reactive programming advantages to your Python backend projects:

  • Automatic state dependency tracking: No more manually tracing which components need updating when data changes
  • Declarative state relationships: Define how data is transformed once, not every time it changes
  • Efficient fine-grained updates: Only recompute what actually needs to change
  • Async-first design: Seamlessly integrates with Python's asyncio for managing real-time data flows
  • Zero external dependencies: Lightweight with minimal overhead
  • Type-safe: Fully annotated for clarity and maintainability

Benefits for Backend Development

reaktiv addresses key challenges in backend state management:

  1. Eliminates manual dependency tracking: No more forgotten update logic when state changes
  2. Prevents state synchronization bugs: Updates happen automatically and consistently
  3. Improves performance: Only affected computations are recalculated
  4. Reduces cognitive load: Declare relationships once, not throughout your codebase
  5. Simplifies testing: Clean separation of state, derivation, and effects

Even in "stateless" architectures, ephemeral state still exists during request processing. reaktiv helps manage this complexity without the boilerplate of observers, callbacks, or event dispatchers.

Beyond Pub/Sub: State Management in Backend Systems

Many backend developers view reactive libraries as just another pub/sub system and question their value in "stateless" architectures. However, reaktiv addresses fundamentally different problems:

Traditional Pub/Sub vs. Reaktiv

Pub/Sub Systems Reaktiv
Message delivery between components Automatic state dependency tracking
Point-to-point or broadcast messaging Fine-grained computation graphs
Manual subscription management Automatic dependency detection
Focus on message transport Focus on state derivation
Stateless by design Intentional state management

State in "Stateless" Systems

Even in "stateless" microservices and serverless functions, state exists during request processing:

  • Configuration management
  • Request context propagation
  • In-memory caching
  • Rate limiting and circuit breaking
  • Feature flag evaluation
  • Connection pooling
  • Runtime metrics collection

reaktiv helps manage this ephemeral state with less code, fewer bugs, and better maintainability.

Basic Examples

Here are some simple examples to help you understand reaktiv's benefits:

Example 1: A Price Calculator

from reaktiv import Signal, Computed, Effect

# Create base values (signals)
price = Signal(10.0)
quantity = Signal(2)
tax_rate = Signal(0.1)  # 10% tax

# Create derived values (computed)
subtotal = Computed(lambda: price() * quantity())
tax = Computed(lambda: subtotal() * tax_rate())
total = Computed(lambda: subtotal() + tax())

# Create a side effect for logging
logger = Effect(lambda: print(f"Order Total: ${total():.2f}"))

# Initial output: "Order Total: $22.00"

# Change the quantity
quantity.set(3)
# Automatically logs: "Order Total: $33.00"

# Change the price
price.set(12.0) 
# Automatically logs: "Order Total: $39.60"

# Change tax rate
tax_rate.set(0.15)
# Automatically logs: "Order Total: $41.40"

Example 2: Simple Status Monitoring

from reaktiv import Signal, Computed, Effect

# Base signals: system metrics
memory_usage = Signal(75)  # percent
cpu_usage = Signal(50)     # percent

# Computed value: system status based on thresholds
system_status = Computed(lambda: 
    "critical" if memory_usage() > 90 or cpu_usage() > 90 else
    "warning" if memory_usage() > 70 or cpu_usage() > 70 else
    "normal"
)

# Effect: alert when system status changes
def alert_on_status():
    status = system_status()
    print(f"System status: {status}")
    if status != "normal":
        print(f"  Memory: {memory_usage()}%, CPU: {cpu_usage()}%")

status_monitor = Effect(alert_on_status)
# Initial output: "System status: warning"
#                "  Memory: 75%, CPU: 50%"

# Simulate memory dropping
memory_usage.set(60)
# Output: "System status: normal"

# Simulate CPU spiking
cpu_usage.set(95)
# Output: "System status: critical"
#         "  Memory: 60%, CPU: 95%" 

Example 2b: Async Status Monitoring

from reaktiv import Signal, Computed, Effect
import asyncio

async def demo_async_monitoring():
    # Base signals: system metrics
    memory_usage = Signal(75)  # percent
    cpu_usage = Signal(50)     # percent

    # Computed value: system status
    system_status = Computed(lambda: 
        "critical" if memory_usage() > 90 or cpu_usage() > 90 else
        "warning" if memory_usage() > 70 or cpu_usage() > 70 else
        "normal"
    )

    # Async effect: alert and take action when status changes
    async def async_alert_handler():
        status = system_status()
        print(f"System status: {status}")
        
        if status == "critical":
            print(f"  Memory: {memory_usage()}%, CPU: {cpu_usage()}%")
            # Simulate sending notification
            await asyncio.sleep(0.1)  # non-blocking wait
            print("  ✉️ Alert notification sent")
            
            # Simulate recovery action
            if cpu_usage() > 90:
                await asyncio.sleep(0.2)  # simulating some async operation
                print("  🔄 Triggered automatic scaling")

    # Register the async effect
    status_monitor = Effect(async_alert_handler)
    print("Starting monitoring...")
    
    # Give effect time to run initially
    await asyncio.sleep(0.2)
    
    # Simulate CPU spike
    print("\nSimulating CPU spike:")
    cpu_usage.set(95)
    await asyncio.sleep(0.5)  # Allow effect to complete
    
    # Simulate recovery
    print("\nSimulating recovery:")
    cpu_usage.set(50)
    memory_usage.set(60)
    await asyncio.sleep(0.2)

asyncio.run(demo_async_monitoring())

Example 3: Configuration Management

from reaktiv import Signal, Computed, Effect

# Different configuration sources
default_config = Signal({"timeout": 30, "retries": 3, "log_level": "INFO"})
user_config = Signal({})

# Effective config combines both sources, with user settings taking precedence
effective_config = Computed(lambda: {**default_config(), **user_config()})

# Log when config changes
config_logger = Effect(lambda: print(f"Active config: {effective_config()}"))
# Initial output: "Active config: {'timeout': 30, 'retries': 3, 'log_level': 'INFO'}"

# When user updates their preferences
user_config.set({"timeout": 60, "log_level": "DEBUG"})
# Automatically logs: "Active config: {'timeout': 60, 'retries': 3, 'log_level': 'DEBUG'}"

# When defaults are updated (e.g., from a config file)
default_config.update(lambda cfg: {**cfg, "retries": 5, "max_connections": 100})
# Automatically logs: "Active config: {'timeout': 60, 'retries': 5, 'log_level': 'DEBUG', 'max_connections': 100}"

Example 4: Simple Caching

from reaktiv import Signal, Computed

# Database simulation
database = {"user1": "Alice", "user2": "Bob"}

# Cache invalidation signal
last_update = Signal(0)  # timestamp or version number

# User data with automatic cache refresh
def fetch_user(user_id):
    # In a real app, this would actually query a database
    return database.get(user_id)

active_user_id = Signal("user1")

# This computed value automatically refreshes when active_user_id changes
# or when the cache is invalidated via last_update
user_data = Computed(lambda: {
    "id": active_user_id(),
    "name": fetch_user(active_user_id()),
    "cache_version": last_update()
})

print(user_data())  # {'id': 'user1', 'name': 'Alice', 'cache_version': 0}

# Switch to another user - cache updates automatically
active_user_id.set("user2")
print(user_data())  # {'id': 'user2', 'name': 'Bob', 'cache_version': 0}

# Backend data changes - we update the database and invalidate cache
database["user2"] = "Robert"
last_update.set(1)  # increment version number
print(user_data())  # {'id': 'user2', 'name': 'Robert', 'cache_version': 1}

Example 5: Processing a Stream of Events

from reaktiv import Signal, Computed, Effect
import time

# Event stream (simulating incoming data)
events = Signal([])

# Compute statistics from the events
event_count = Computed(lambda: len(events()))
recent_events = Computed(lambda: [e for e in events() if time.time() - e["timestamp"] < 60])
error_count = Computed(lambda: len([e for e in events() if e["level"] == "ERROR"]))

# Monitor for problems
def check_errors():
    if error_count() >= 3:
        print(f"ALERT: {error_count()} errors detected in the event stream!")
    
error_monitor = Effect(check_errors)

# Process new events
def add_event(level, message):
    new_event = {"timestamp": time.time(), "level": level, "message": message}
    events.update(lambda current: current + [new_event])

# Simulate incoming events
add_event("INFO", "Application started")
add_event("INFO", "Processing request")
add_event("ERROR", "Database connection failed")
add_event("ERROR", "Retry failed")
add_event("ERROR", "Giving up after 3 attempts")
# Automatically triggers: "ALERT: 3 errors detected in the event stream!"

Example 5b: Async Event Processing Pipeline

from reaktiv import Signal, Computed, Effect
import asyncio
import time

# Demo async event processing
async def demo_async_events():
    # Event stream
    events = Signal([])

    # Derived statistics
    error_count = Computed(lambda: len([e for e in events() if e["level"] == "ERROR"]))
    warning_count = Computed(lambda: len([e for e in events() if e["level"] == "WARNING"]))

    # Async effect for error handling
    async def handle_errors():
        errors = error_count()
        if errors > 0:
            print(f"Found {errors} errors")
            
            # Simulate error processing that takes time
            await asyncio.sleep(0.2)
            
            if errors >= 3:
                print("🚨 Critical error threshold reached")
                await asyncio.sleep(0.1)  # Simulate sending alert
                print("✉️ Error notification dispatched to on-call team")

    # Register async effect
    error_handler = Effect(handle_errors)

    # Function to add events
    async def add_event_async(level, message):
        new_event = {"timestamp": time.time(), "level": level, "message": message}
        events.update(lambda current: current + [new_event])
        # Simulate some processing time
        await asyncio.sleep(0.1)
    print("Starting event monitoring...")
    
    # Add some events
    await add_event_async("INFO", "Application started")
    await add_event_async("INFO", "User logged in")
    await add_event_async("WARNING", "Slow database query detected")
    
    # Add error events that will trigger our effect
    print("\nSimulating error condition:")
    await add_event_async("ERROR", "Database connection timeout")
    await add_event_async("ERROR", "Retry failed")
    await add_event_async("ERROR", "Service unavailable")
    
    # Give effect time to complete
    await asyncio.sleep(0.5)
    
    print(f"\nFinal status: {error_count()} errors, {warning_count()} warnings")

asyncio.run(demo_async_events())

More Examples

You can find more example scripts in the examples folder to help you get started with using this project.


Inspired by Angular Signals • Built for Python's async-first world • Made in Hamburg

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

reaktiv-0.14.7.tar.gz (37.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

reaktiv-0.14.7-py3-none-any.whl (20.0 kB view details)

Uploaded Python 3

File details

Details for the file reaktiv-0.14.7.tar.gz.

File metadata

  • Download URL: reaktiv-0.14.7.tar.gz
  • Upload date:
  • Size: 37.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.6.17

File hashes

Hashes for reaktiv-0.14.7.tar.gz
Algorithm Hash digest
SHA256 ac0b506c290a2b67f2f4a59b13c374fbe1b6a6b6af92296c8bec1d780cede062
MD5 7a9ae15ba07193d37088f9789aedd691
BLAKE2b-256 e7a9f365d78a44f97544f9a66ede54f4cc3eb2f123e1ddf767860725915074a7

See more details on using hashes here.

File details

Details for the file reaktiv-0.14.7-py3-none-any.whl.

File metadata

  • Download URL: reaktiv-0.14.7-py3-none-any.whl
  • Upload date:
  • Size: 20.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.6.17

File hashes

Hashes for reaktiv-0.14.7-py3-none-any.whl
Algorithm Hash digest
SHA256 cc4861dc40f8737c4afa2e62bdf8cf88a83c6c3b51fe1f090af55a4a5f086c5f
MD5 b0bc9f00b6fe27c586d63922922e8732
BLAKE2b-256 c6d869dfb5fa83438e21713fb484dec079a372bdbc93e02c6b282cede16c5397

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page