Skip to main content

KRules Framework - Async event-driven framework

Project description

KRules Framework

Modern async-first event-driven application framework for Python

KRules is a Python framework for building reactive, event-driven applications with a focus on dynamic state management and declarative event handling.

Features

  • Reactive Subjects - Dynamic entities with schema-less properties that automatically emit events on changes
  • Declarative Handlers - Clean decorator-based API (@on, @when, @middleware)
  • Async Native - Built on asyncio for high-performance concurrent event processing
  • Type Safe - Full type hints for excellent IDE support and type checking
  • Dependency Injection - Container-based architecture for testability and flexibility
  • Storage Agnostic - Pluggable backends (Redis, PostgreSQL, in-memory, custom)
  • Production Ready - Middleware support, error isolation, monitoring hooks

Installation

pip install krules-framework

With optional features:

# Redis storage backend
pip install "krules-framework[redis]"

# PostgreSQL storage backend
pip install "krules-framework[postgres]"

# Google Cloud Pub/Sub
pip install "krules-framework[pubsub]"

# FastAPI integration
pip install "krules-framework[fastapi]"

Quick Example

This example demonstrates reactive state composition - building complex states from simple properties, where each layer reacts to changes in lower layers.

from krules_core.container import KRulesContainer
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED

container = KRulesContainer()
on, when, middleware, emit = container.handlers()

# Layer 1: Derive health status from metrics (ONLY for device: subjects)
@on(SUBJECT_PROPERTY_CHANGED)
@when(lambda ctx: ctx.subject.name.startswith("device:"))
@when(lambda ctx: ctx.property_name in ["cpu_usage", "memory_usage", "error_rate"])
async def compute_device_health(ctx):
    """Aggregate device metrics into health status"""
    device = ctx.subject
    # Read from subject's internal cache (even if not yet persisted)
    cpu = device.get("cpu_usage", 0)
    memory = device.get("memory_usage", 0)
    errors = device.get("error_rate", 0)

    if cpu > 90 or memory > 90 or errors > 10:
        device.set("health", "critical")
    elif cpu > 70 or memory > 70 or errors > 5:
        device.set("health", "warning")
    else:
        device.set("health", "healthy")

# Layer 2: React to health transitions (ONLY for device: subjects)
@on(SUBJECT_PROPERTY_CHANGED)
@when(lambda ctx: ctx.subject.name.startswith("device:"))
@when(lambda ctx: ctx.property_name == "health")
async def handle_device_health_change(ctx):
    """Take action based on health state transition"""
    print(f"{ctx.subject.name}: {ctx.old_value}{ctx.new_value}")

    if ctx.new_value == "critical":
        await ctx.emit("device.alert.critical", ctx.subject)
    elif ctx.new_value == "healthy" and ctx.old_value == "critical":
        await ctx.emit("device.alert.recovered", ctx.subject)

# Usage
device = container.subject("device:prod-01")

# Batch mode: multiple sets + single store
device.set("cpu_usage", 75)      # → triggers handler, health="warning"
device.set("memory_usage", 60)
device.set("error_rate", 2)
device.store()  # Single persistence, flushes cache to storage

# Single update mode: bypass cache, write directly
await device.set("cpu_usage", 95, use_cache=False)  # → health="critical" → alert!
await device.set("cpu_usage", 50, use_cache=False)  # → health="healthy" → recovered!
await device.set("cpu_usage", 45, use_cache=False)  # → NO EVENT (health unchanged)

Key Concepts:

  1. Reactive Composition - health state is automatically derived from metrics
  2. Subject Type Filtering - Handlers target device:* subjects using naming conventions
  3. Events on Change Only - Property change events fire only when values actually change
  4. State Transitions - Access old_value and new_value to handle transitions
  5. Efficient Persistence - Batch updates with single store(), or use_cache=False for single updates
  6. Bounded Entities - Devices are predictable, limited entities (not infinite like orders)

Core Concepts

Subjects - Reactive State Entities

Subjects are dynamic entities with persistent, reactive properties. Setting a property automatically emits a subject-property-changed event.

from krules_core.container import KRulesContainer

container = KRulesContainer()

# Create subject
device = container.subject("device-456")

# Set properties (schema-less, fully dynamic)
device.set("temperature", 75.5)
device.set("status", "online")
device.set("metadata", {"location": "room-1", "floor": 2})

# Lambda values for atomic operations
device.set("count", 0)
device.set("count", lambda c: c + 1)  # Atomic increment

# Pass extra context to handlers (audit trail, business context)
await device.set("status", "maintenance", extra={
    "reason": "scheduled_maintenance",
    "operator_id": "admin-123"
})

# Get with defaults
temp = device.get("temperature")
status = device.get("status", default="offline")

# Extended properties (metadata, no events)
device.set_ext("tags", ["production", "critical"])

# Persist to storage
device.store()

Event Handlers - Declarative Processing

Define handlers using decorators. Supports glob patterns and conditional filters.

from krules_core.container import KRulesContainer
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED

container = KRulesContainer()
on, when, middleware, emit = container.handlers()

# Single event
@on("order.created")
async def process_order(ctx):
    ctx.subject.set("status", "processing")
    await ctx.emit("order.processing")

# Multiple events
@on("user.created", "user.updated", "user.deleted")
async def log_user_change(ctx):
    print(f"User event: {ctx.event_type}")

# Glob patterns
@on("device.*")
async def handle_device(ctx):
    print(f"Device event: {ctx.event_type}")

# Property change with filters
@on(SUBJECT_PROPERTY_CHANGED)
@when(lambda ctx: ctx.property_name == "status")
@when(lambda ctx: ctx.new_value == "error")
async def on_error_status(ctx):
    await ctx.emit("alert.device_error", {
        "device_id": ctx.subject.name
    })

# Access extra context from set() operations
@on(SUBJECT_PROPERTY_CHANGED)
async def audit_property_change(ctx):
    if ctx.extra:  # Extra context passed from set()/delete()
        operator = ctx.extra.get("operator_id", "system")
        reason = ctx.extra.get("reason", "unspecified")
        print(f"{operator} changed {ctx.property_name}: {reason}")

Filters - Conditional Execution

Stack multiple @when decorators for conditional execution (all must pass).

# Multiple filters (AND logic)
@on("payment.process")
@when(lambda ctx: ctx.payload.get("amount") > 0)
@when(lambda ctx: ctx.subject.get("verified") == True)
async def process_payment(ctx):
    # Only for verified users with amount > 0
    pass

# Reusable filters
def is_premium(ctx):
    return ctx.subject.get("tier") == "premium"

def has_credits(ctx):
    return ctx.subject.get("credits", 0) > 0

@on("feature.use")
@when(is_premium)
@when(has_credits)
async def use_premium_feature(ctx):
    ctx.subject.set("credits", lambda c: c - 1)

Middleware - Cross-Cutting Concerns

Middleware runs for all events, enabling logging, timing, error handling, etc.

from krules_core.container import KRulesContainer
import time

container = KRulesContainer()
on, when, middleware, emit = container.handlers()

@middleware
async def timing_middleware(ctx, next):
    """Measure handler execution time"""
    start = time.time()
    await next()
    duration = time.time() - start
    print(f"{ctx.event_type} took {duration:.3f}s")

@middleware
async def error_handling(ctx, next):
    """Global error handler"""
    try:
        await next()
    except Exception as e:
        print(f"Handler error: {e}")
        await ctx.emit("error.handler_failed", {"error": str(e)})

Storage Backends

KRules supports pluggable storage backends for subject persistence.

Redis Storage

from dependency_injector import providers
from krules_core.container import KRulesContainer
from redis.asyncio import Redis
from redis_subjects_storage.storage_impl import create_redis_storage

# Create container
container = KRulesContainer()

# Create Redis client
redis_client = Redis.from_url("redis://localhost:6379")

# Override storage with Redis
redis_factory = create_redis_storage(
    redis_client=redis_client,
    redis_prefix="myapp:"
)
container.subject_storage.override(providers.Object(redis_factory))

# Now all subjects use Redis
user = container.subject("user-123")
await user.set("name", "John")  # Persisted in Redis
await user.store()

PostgreSQL Storage

from dependency_injector import providers
from krules_core.container import KRulesContainer
import asyncpg
from postgres_subjects_storage.storage_impl import create_postgres_storage

# Create container
container = KRulesContainer()

# Create PostgreSQL connection pool
pg_pool = await asyncpg.create_pool(
    database="krules",
    user="postgres",
    password="postgres",
    host="localhost",
    port=5432
)

# Override storage with PostgreSQL
pg_factory = create_postgres_storage(pool=pg_pool)
container.subject_storage.override(providers.Object(pg_factory))

# Now all subjects use PostgreSQL (tables auto-created)
user = container.subject("user-123")
await user.set("name", "John")  # Persisted in PostgreSQL with JSONB
await user.store()

Custom Storage

Implement the storage interface to create custom backends:

class CustomStorage:
    def __init__(self, subject_name, event_info=None, event_data=None):
        self._subject = subject_name

    def load(self):
        """Return (properties_dict, ext_properties_dict)"""
        return {}, {}

    def store(self, inserts=[], updates=[], deletes=[]):
        """Persist property changes"""
        pass

    def set(self, prop):
        """Set single property, return (new_value, old_value)"""
        pass

    def get(self, prop):
        """Get property value"""
        pass

    def delete(self, prop):
        """Delete property"""
        pass

    def flush(self):
        """Delete entire subject"""
        pass

    def get_ext_props(self):
        """Return extended properties dict"""
        return {}

Testing

KRules provides utilities for easy testing:

import pytest
from krules_core.container import KRulesContainer
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED

@pytest.fixture
def container():
    """Create fresh container for each test"""
    return KRulesContainer()

@pytest.mark.asyncio
async def test_user_login(container):
    """Test user login handler"""
    on, when, middleware, emit = container.handlers()
    results = []

    @on("user.login")
    async def handler(ctx):
        results.append(ctx.event_type)
        ctx.subject.set("logged_in", True)

    user = container.subject("test-user")
    await emit("user.login", user)

    assert len(results) == 1
    assert user.get("logged_in") == True

Documentation

Integrations

KRules supports event-driven communication with external systems through event receivers (inbound) and event emitters (outbound).

Event Receivers (Inbound)

FastAPI Integration - Receive HTTP CloudEvents

from krules_fastapi_env import KRulesApp
from krules_core.container import KRulesContainer

container = KRulesContainer()
on, when, middleware, emit = container.handlers()

# Define handlers (same as local events)
@on("order.created")
async def handle_order(ctx):
    print(f"Received order: {ctx.subject.name}")

# Create FastAPI app that receives CloudEvents
app = KRulesApp(krules_container=container)
# POST /krules endpoint now receives CloudEvents and triggers handlers

Pub/Sub Subscriber - Receive events from Google Pub/Sub

from krules_cloudevents_pubsub import PubSubSubscriber

# Subscribe to Pub/Sub topic
subscriber = PubSubSubscriber(
    project_id="my-project",
    subscription_name="my-subscription",
    container=container
)

# Same handlers work for Pub/Sub events
await subscriber.run()

Event Emitters (Outbound)

HTTP CloudEvents - Send events to external HTTP endpoints

from krules_cloudevents import CloudEventsDispatcher, create_dispatcher_middleware

# Create dispatcher
dispatcher = CloudEventsDispatcher(
    dispatch_url="https://api.example.com/events",
    source="my-service",
    krules_container=container
)

# Register as middleware
dispatcher_mw = create_dispatcher_middleware(dispatcher)
container.event_bus().add_middleware(dispatcher_mw)

# Now emit events to external URL
await emit("user.created", user, dispatch_url="https://api.example.com/events")

Pub/Sub Publisher - Send events to Google Pub/Sub

from krules_cloudevents_pubsub import CloudEventsDispatcher, create_dispatcher_middleware

# Create dispatcher
dispatcher = CloudEventsDispatcher(
    project_id="my-project",
    default_topic="krules-events",
    source="my-service",
    krules_container=container
)

# Register as middleware
dispatcher_mw = create_dispatcher_middleware(dispatcher)
container.event_bus().add_middleware(dispatcher_mw)

# Emit to Pub/Sub topic
await emit("user.created", user, topic="user-events")

See Integrations for detailed guides.

Requirements

  • Python >=3.11
  • asyncio support

License

Apache License 2.0

Contributing

This framework is maintained by Airspot for internal use, but contributions are welcome.

Support

For issues and questions, please open a GitHub issue.


Built with ❤️ by Airspot

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

krules_framework-3.1.1.tar.gz (33.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

krules_framework-3.1.1-py3-none-any.whl (57.2 kB view details)

Uploaded Python 3

File details

Details for the file krules_framework-3.1.1.tar.gz.

File metadata

  • Download URL: krules_framework-3.1.1.tar.gz
  • Upload date:
  • Size: 33.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.5

File hashes

Hashes for krules_framework-3.1.1.tar.gz
Algorithm Hash digest
SHA256 3a027f452e21efbd7a5c1d3d592180f130433ed1578793f1f8bda98ab5215fdc
MD5 847256f3f42d109e952eba0fea12f76a
BLAKE2b-256 79d2380a3a8fb789228751b23fe19f093c7d8dc0b7bbf85be34fa214a1f0bf71

See more details on using hashes here.

File details

Details for the file krules_framework-3.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for krules_framework-3.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 4c8ed3567456b703477c19c97403e9ffb8c2f5f0bd736d31e6b3f6cae78ebd77
MD5 6b09abc6e8605c5639b47bb99ca5b789
BLAKE2b-256 7e453f83d8385d1745bd6071609d8d3dd1ef91eccc5f4127b3c4cedf4873170e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page