KRules Framework - Async event-driven framework
KRules Framework 2.0
KRules Framework is a modern, async-first event-driven application framework for Python.
What's New in 2.0
🎉 Complete rewrite with a focus on simplicity and developer experience:
- ✨ Decorator-based API - Clean, intuitive syntax
- ⚡ Async/await native - Built for modern Python
- 🎯 Type hints - Full IDE autocomplete support
- 🪶 Lightweight - Minimal dependencies (removed ReactiveX, Pydantic, CEL, JSONPath)
- 🧪 Easy testing - Simple, fast unit tests
- 📦 Same subject system - Dynamic properties, persistent state, storage backends
Note: 2.0 introduces breaking changes. See MIGRATION.md for the upgrade guide.
Quick Start
Installation
pip install krules-framework
With optional features:
# Redis storage backend
pip install "krules-framework[redis]"
# Google Cloud Pub/Sub
pip install "krules-framework[pubsub]"
# FastAPI integration
pip install "krules-framework[fastapi]"
Basic Example
import asyncio
from datetime import datetime

from krules_core import on, when, emit, subject_factory

# Define event handlers with decorators
@on("user.login")
@when(lambda ctx: ctx.subject.get("status") == "active")
async def handle_user_login(ctx):
    """Handle active user login"""
    user = ctx.subject
    # Update subject properties
    user.set("last_login", datetime.now())
    user.set("login_count", lambda count: count + 1)
    # Emit new events
    await ctx.emit("user.logged-in", {
        "user_id": user.name,
        "count": user.get("login_count")
    })

# React to property changes
@on("subject-property-changed")
@when(lambda ctx: ctx.property_name == "temperature")
@when(lambda ctx: ctx.new_value > 80)
async def alert_on_overheat(ctx):
    """Alert when temperature exceeds threshold"""
    await ctx.emit("alert.overheat", {
        "device": ctx.subject.name,
        "temperature": ctx.new_value
    })

async def main():
    # Use subjects
    user = subject_factory("user-123")
    user.set("status", "active")
    user.set("login_count", 0)
    # Emit events
    await emit("user.login", user, {"ip": "192.168.1.1"})

# `await` is only valid inside a coroutine, so run the example from an async entry point
asyncio.run(main())
Core Concepts
Subjects - Dynamic Entities with State
Subjects are entities with persistent, reactive properties:
from krules_core import subject_factory
# Create or load subject
device = subject_factory("device-456")
# Set properties (fully dynamic - no schema required!)
device.set("temperature", 75.5)
device.set("status", "online")
device.set("metadata", {"location": "room-1", "floor": 2})
# Get properties
temp = device.get("temperature")
status = device.get("status", default="offline") # With default
# Lambda values (computed from previous value)
device.set("count", 0)
device.set("count", lambda c: c + 1) # Increment
# Extended properties (metadata, not part of main state)
device.set_ext("tags", ["production", "critical"])
# Iteration
for prop_name in device:
    print(f"{prop_name}: {device.get(prop_name)}")
# Check existence
if "temperature" in device:
    print(device.get("temperature"))
# Persist to storage
device.store()
# Export to dict
data = device.dict() # {"name": "device-456", "temperature": 75.5, ...}
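To make the callable-value behavior above concrete, here is a minimal, self-contained sketch of a `set` that accepts either a plain value or a function of the previous value. `TinySubject` is a hypothetical stand-in written for illustration only. It is not the KRules implementation, and it ignores storage backends and change events.

```python
from typing import Any


class TinySubject:
    """Hypothetical dict-backed stand-in for a subject (illustration only)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._props: dict[str, Any] = {}

    def set(self, prop: str, value: Any) -> Any:
        # A callable is treated as "compute the new value from the old one".
        if callable(value):
            value = value(self._props.get(prop))
        self._props[prop] = value
        return value

    def get(self, prop: str, default: Any = None) -> Any:
        return self._props.get(prop, default)

    def __contains__(self, prop: str) -> bool:
        # Enables `"count" in subject`.
        return prop in self._props

    def __iter__(self):
        # Enables `for prop_name in subject`.
        return iter(self._props)


device = TinySubject("device-456")
device.set("count", 0)
device.set("count", lambda c: c + 1)  # increments the stored value
```

In this sketch, the read and the write of an increment happen in a single call, which is why the callable form reads more cleanly than a separate `get` followed by `set`.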
Event Handlers - Decorators
Define handlers using clean decorator syntax:
import logging

from krules_core import on, when, EventContext

logger = logging.getLogger(__name__)

# Simple handler
@on("order.created")
async def process_order(ctx: EventContext):
    order = ctx.subject
    order.set("status", "processing")
    await ctx.emit("order.processing")

# Multiple events
@on("user.created", "user.updated", "user.deleted")
async def log_user_change(ctx: EventContext):
    logger.info(f"User event: {ctx.event_type}")

# Glob patterns
@on("device.*")  # Matches device.created, device.updated, etc.
async def handle_device(ctx: EventContext):
    process_device_event(ctx)  # Application-defined helper, not part of KRules

# Wildcard
@on("*")
async def log_all(ctx: EventContext):
    logger.debug(f"Event: {ctx.event_type} on {ctx.subject.name}")
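As a rough illustration of how glob patterns such as `device.*` and `*` select event types, the sketch below uses the standard library's `fnmatch` module. This is an assumption made for illustration: the README does not say which matching rules KRules applies, and the `matches` helper is hypothetical.

```python
from fnmatch import fnmatchcase


def matches(event_type: str, *patterns: str) -> bool:
    """Return True if the event type matches any shell-style pattern."""
    # fnmatchcase is case-sensitive on every platform, unlike fnmatch.fnmatch.
    return any(fnmatchcase(event_type, pattern) for pattern in patterns)


device_event = matches("device.created", "device.*")
user_event = matches("user.created", "device.*")
any_event = matches("order.created", "*")
multi = matches("user.deleted", "user.created", "user.deleted")
```

Note that with `fnmatch` semantics `*` also matches dots, so `device.*` would match `device.sensor.updated` as well. Whether KRules behaves the same way should be checked against its own documentation.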
Filters - Conditional Execution
Use @when to add conditions:
from krules_core import on, when

# Single filter
@on("payment.process")
@when(lambda ctx: ctx.payload.get("amount", 0) > 0)  # Default avoids comparing None with 0
async def process_payment(ctx):
    # Only processes payments with amount > 0
    pass

# Multiple filters (ALL must pass)
@on("admin.action")
@when(lambda ctx: ctx.payload.get("role") == "admin")
@when(lambda ctx: ctx.subject.get("verified") is True)
@when(lambda ctx: not ctx.subject.get("suspended", False))
async def admin_action(ctx):
    # Only for verified, non-suspended admins
    pass

# Reusable filters
def is_premium(ctx):
    return ctx.subject.get("tier") == "premium"

def has_credits(ctx):
    return ctx.subject.get("credits", 0) > 0

@on("feature.use")
@when(is_premium)
@when(has_credits)
async def use_premium_feature(ctx):
    ctx.subject.set("credits", lambda c: c - 1)
    # Use feature...
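The "ALL must pass" rule for stacked filters can be pictured as a logical AND over predicate functions. The following self-contained sketch shows that idea with a hypothetical `passes_all` helper and a `SimpleNamespace` standing in for the event context. It does not reflect how KRules evaluates `@when` internally.

```python
from types import SimpleNamespace
from typing import Callable, Iterable

Filter = Callable[[SimpleNamespace], bool]


def passes_all(ctx: SimpleNamespace, filters: Iterable[Filter]) -> bool:
    """Return True only if every filter accepts the context."""
    # all() short-circuits: filters after the first failure are not called.
    return all(f(ctx) for f in filters)


def is_premium(ctx: SimpleNamespace) -> bool:
    return ctx.subject.get("tier") == "premium"


def has_credits(ctx: SimpleNamespace) -> bool:
    return ctx.subject.get("credits", 0) > 0


# Plain dicts stand in for subjects here because they also offer .get().
premium_ctx = SimpleNamespace(subject={"tier": "premium", "credits": 3})
no_credit_ctx = SimpleNamespace(subject={"tier": "premium", "credits": 0})

allowed = passes_all(premium_ctx, [is_premium, has_credits])
denied = passes_all(no_credit_ctx, [is_premium, has_credits])
```

Keeping filters as small named functions, as in the reusable-filter example above, makes each condition testable on its own.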
Property Change Events
Subject properties emit change events automatically:
from krules_core import on, when, subject_factory

@on("subject-property-changed")
@when(lambda ctx: ctx.property_name == "status")
async def on_status_change(ctx):
    device = ctx.subject
    print(f"Status changed: {ctx.old_value} → {ctx.new_value}")
    if ctx.new_value == "error":
        await ctx.emit("alert.device_error", {
            "device_id": device.name
        })

# Use it
device = subject_factory("device-123")
device.set("status", "ok")       # Emits subject-property-changed
device.set("status", "warning")  # Emits subject-property-changed
device.set("status", "error")    # Emits subject-property-changed → triggers alert
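The mechanism behind property change events can be sketched as a property store that calls listeners with the property name, old value, and new value. `ObservableProps` below is hypothetical and synchronous. It also assumes that setting a property to its current value produces no notification, which the README does not state for KRules.

```python
from typing import Any, Callable

ChangeListener = Callable[[str, Any, Any], None]


class ObservableProps:
    """Hypothetical property store that reports changes (illustration only)."""

    def __init__(self) -> None:
        self._props: dict[str, Any] = {}
        self._listeners: list[ChangeListener] = []

    def subscribe(self, listener: ChangeListener) -> None:
        self._listeners.append(listener)

    def set(self, name: str, value: Any) -> None:
        old = self._props.get(name)
        self._props[name] = value
        # Assumption: notify only when the value actually changes.
        if old != value:
            for listener in self._listeners:
                listener(name, old, value)


changes: list[tuple[str, Any, Any]] = []
props = ObservableProps()
props.subscribe(lambda name, old, new: changes.append((name, old, new)))

props.set("status", "ok")
props.set("status", "ok")     # same value, so no notification in this sketch
props.set("status", "error")
```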
Middleware
Run logic for all events:
import logging
import time

from krules_core import middleware

logger = logging.getLogger(__name__)

@middleware
async def timing_middleware(ctx, next):
    """Measure handler execution time"""
    start = time.perf_counter()  # Monotonic clock, suited to measuring durations
    await next()
    duration = time.perf_counter() - start
    print(f"{ctx.event_type} took {duration:.3f}s")

@middleware
async def error_handling(ctx, next):
    """Global error handler"""
    try:
        await next()
    except Exception as e:
        logger.error(f"Handler error: {e}")
        await ctx.emit("error.handler_failed", {"error": str(e)})
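The `(ctx, next)` signature follows the common "onion" middleware pattern, where each middleware runs code before and after the rest of the chain. The sketch below shows one way such a chain could be built with plain `asyncio`. `build_chain` and the dict-based context are hypothetical and are not taken from the KRules source.

```python
import asyncio
from typing import Awaitable, Callable

Next = Callable[[], Awaitable[None]]
Handler = Callable[[dict], Awaitable[None]]
Middleware = Callable[[dict, Next], Awaitable[None]]


def build_chain(middlewares: list[Middleware], handler: Handler) -> Handler:
    """Wrap handler so middlewares run around it, first one outermost."""

    async def run(ctx: dict) -> None:
        async def call(index: int) -> None:
            if index == len(middlewares):
                # End of the chain: invoke the actual handler.
                await handler(ctx)
                return
            # Each middleware gets a zero-argument `next` that continues the chain.
            await middlewares[index](ctx, lambda: call(index + 1))

        await call(0)

    return run


trace: list[str] = []


async def outer(ctx, next):
    trace.append("outer:before")
    await next()
    trace.append("outer:after")


async def inner(ctx, next):
    trace.append("inner:before")
    await next()
    trace.append("inner:after")


async def handler(ctx):
    trace.append("handler")


asyncio.run(build_chain([outer, inner], handler)({"event_type": "demo"}))
```

A middleware that never calls `next()` would stop the chain in this sketch, which is how short-circuiting (for example, rejecting an event early) is usually expressed in this pattern.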
Advanced Features
Storage Backends
import redis
from dependency_injector import providers

from krules_core import subject_factory
from krules_core.providers import subject_storage_factory
# Redis storage
from redis_subjects_storage import RedisSubjectStorage

redis_client = redis.Redis(host='localhost', port=6379)
subject_storage_factory.override(
    providers.Factory(
        lambda name, **kwargs: RedisSubjectStorage(name, redis_client)
    )
)

# Now all subjects use Redis
user = subject_factory("user-123")
user.set("name", "John")  # Stored in Redis
Async Context
import httpx

from krules_core import on, emit

# In async context (FastAPI, async main, etc.)
@on("data.fetch")
async def fetch_data(ctx):
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        ctx.subject.set("external_data", response.json())

# Events emit asynchronously; `subject` is any subject created with subject_factory(...)
await emit("data.fetch", subject)
Testing
import pytest
from krules_core import on, when, emit, subject_factory, reset_event_bus

@pytest.fixture(autouse=True)
def reset():
    """Reset event bus before each test"""
    reset_event_bus()

@pytest.mark.asyncio
async def test_user_login():
    """Test user login handler"""
    results = []

    # Register the handler inside the test so the fixture's reset keeps tests isolated
    @on("user.login")
    async def handler(ctx):
        results.append(ctx.event_type)
        ctx.subject.set("logged_in", True)

    user = subject_factory("test-user")
    await emit("user.login", user)
    assert len(results) == 1
    assert user.get("logged_in") is True
Requirements
- Python 3.11 or later (async support relies on the standard-library asyncio)
Upgrading from 1.x
See MIGRATION.md for a detailed migration guide.
TL;DR:
- Rules (RuleFactory.create(...)) → Handlers (@on, @when)
- event_router_factory().route() → await emit()
- Filter, Process classes → Python functions
- Subject API unchanged ✅
License
Apache License 2.0
Contributing
This package is maintained by Airspot for internal use, but contributions are welcome.
Developed and maintained by Airspot