# Kailash DataFlow

Workflow-native database framework for the Kailash SDK.

**Zero-Config Database Framework** - Django simplicity meets enterprise-grade production quality.
## 🚀 Quick Start (60 seconds)

```python
from kailash_dataflow import DataFlow

# That's it! No configuration needed
db = DataFlow()

# Define your model
@db.model
class User:
    id: int
    name: str
    email: str

# DataFlow automatically creates:
# ✅ Database schema (PostgreSQL, MySQL, SQLite)
# ✅ Auto-migration system (PostgreSQL-only, production-ready)
# ✅ 9 workflow nodes per model (CRUD + bulk ops)
# ✅ Real SQL operations with security
# ✅ Connection pooling and transaction management
# ✅ MongoDB-style query builder (implemented!)
# ✅ Concurrent access protection with locking
# ✅ Schema state management with rollback
# ⚠️ Redis query cache (planned)
# ⚠️ Multi-database runtime (PostgreSQL only)
```

You now have a production-ready database layer!
## 🎯 What Makes DataFlow Different?

### Zero Configuration That Actually Works

```python
# Development? Uses SQLite automatically
db = DataFlow()  # Just works!

# Production? Reads from environment
# DATABASE_URL=postgresql://...
db = DataFlow()  # Still just works!

# Need control? Progressive enhancement
db = DataFlow(
    pool_size=50,
    read_replicas=['replica1', 'replica2'],
    monitoring=True
)
```
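Under the hood, zero-config resolution amounts to "use the environment variable when present, fall back to local SQLite otherwise." A standalone sketch of that pattern (illustrative only; `resolve_database_url` is a hypothetical helper, not DataFlow's API):

```python
import os

def resolve_database_url() -> str:
    """Hypothetical sketch of zero-config resolution: use DATABASE_URL
    from the environment when set, otherwise fall back to local SQLite."""
    return os.environ.get("DATABASE_URL", "sqlite:///dataflow_dev.db")

# Development machine with no environment configured
os.environ.pop("DATABASE_URL", None)
print(resolve_database_url())  # sqlite:///dataflow_dev.db

# Production host with DATABASE_URL exported
os.environ["DATABASE_URL"] = "postgresql://user:pass@db-host/app"
print(resolve_database_url())  # postgresql://user:pass@db-host/app
```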
### Real Database Operations (Currently Available)

```python
# Traditional ORMs: imperative code
User.objects.create(name="Alice")             # Django
user = User(name="Alice"); session.add(user)  # SQLAlchemy

# DataFlow: workflow-native database operations
workflow = WorkflowBuilder()
workflow.add_node("UserCreateNode", "create_user", {
    "name": "Alice",
    "email": "alice@example.com"
})
workflow.add_node("UserListNode", "find_users", {
    "limit": 10,
    "offset": 0
})

# Real SQL is executed: INSERT INTO users (name, email) VALUES ($1, $2)
```
### MongoDB-Style Query Builder (NEW!)

```python
# Get a QueryBuilder from any model
builder = User.query_builder()

# MongoDB-style operators
builder.where("age", "$gte", 18)
builder.where("status", "$in", ["active", "premium"])
builder.where("email", "$regex", r"^[a-z]+@company\.com$")
builder.order_by("created_at", "DESC")
builder.limit(10)

# Generates optimized SQL for your database
sql, params = builder.build_select()
# PostgreSQL: SELECT * FROM "users" WHERE "age" >= $1 AND "status" IN ($2, $3)
#             AND "email" ~ $4 ORDER BY "created_at" DESC LIMIT 10

# Works seamlessly with ListNode
workflow.add_node("UserListNode", "search", {
    "filter": {
        "age": {"$gte": 18},
        "status": {"$in": ["active", "premium"]},
        "email": {"$regex": "^admin"}
    }
})
```
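As a sketch of how `$`-operator filters map onto parameterized SQL (illustrative only; `build_where` is a hypothetical function, not DataFlow's actual translator), a minimal version covering a few operators might look like:

```python
# Hypothetical sketch: translate a MongoDB-style filter dict into a
# parameterized SQL WHERE clause with $N placeholders. Not DataFlow's code.

def build_where(filter_dict):
    """Turn {"age": {"$gte": 18}, ...} into (where_sql, params)."""
    ops = {"$eq": "=", "$gt": ">", "$gte": ">=", "$lt": "<",
           "$lte": "<=", "$like": "LIKE"}
    clauses, params = [], []
    for field, cond in filter_dict.items():
        for op, value in cond.items():
            if op == "$in":
                # One placeholder per list element
                placeholders = ", ".join(
                    f"${len(params) + i + 1}" for i in range(len(value)))
                clauses.append(f'"{field}" IN ({placeholders})')
                params.extend(value)
            else:
                params.append(value)
                clauses.append(f'"{field}" {ops[op]} ${len(params)}')
    return " AND ".join(clauses), params

sql, params = build_where({"age": {"$gte": 18},
                           "status": {"$in": ["active", "premium"]}})
print(sql)     # "age" >= $1 AND "status" IN ($2, $3)
print(params)  # [18, 'active', 'premium']
```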
### Database Requirements

```python
# Current limitation: PostgreSQL only for execution
db = DataFlow(database_url="postgresql://user:pass@localhost/db")

# Schema generation works for all databases
schema_sql = db.generate_complete_schema_sql("sqlite")      # ✅ Works
schema_sql = db.generate_complete_schema_sql("mysql")       # ✅ Works
schema_sql = db.generate_complete_schema_sql("postgresql")  # ✅ Works

# But execution currently requires PostgreSQL
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow.build())  # ✅ PostgreSQL only
```
### Database Operations as Workflow Nodes

```python
# Traditional ORMs: imperative code
user = User.objects.create(name="Alice")      # Django
user = User(name="Alice"); session.add(user)  # SQLAlchemy

# DataFlow: workflow-native (9 nodes per model!)
workflow = WorkflowBuilder()
workflow.add_node("UserCreateNode", "create_user", {
    "name": "Alice",
    "email": "alice@example.com"
})
workflow.add_node("UserListNode", "find_users", {
    "filter": {"name": {"$like": "A%"}}
})
```
### Enterprise Configuration

```python
# Multi-tenancy configuration (query modification planned)
db = DataFlow(multi_tenant=True)

# Real SQL generation with security
db = DataFlow(
    database_url="postgresql://user:pass@localhost/db",
    pool_size=20,
    pool_max_overflow=30,
    monitoring=True,
    echo=False  # No SQL logging in production
)

# All generated nodes use parameterized queries for security
# INSERT INTO users (name, email) VALUES ($1, $2) -- safe from SQL injection
```
## 📦 Implementation Status

### ✅ Currently Available (Production-Ready)

- Database Schema Generation: Complete CREATE TABLE support for PostgreSQL, MySQL, SQLite
- Auto-Migration System: PostgreSQL-only, production-ready automatic schema synchronization
- Real Database Operations: All 9 CRUD + bulk nodes execute actual SQL
- SQL Security: Parameterized queries prevent SQL injection
- Connection Management: Connection pooling, DDL execution, error handling
- Workflow Integration: Full compatibility with WorkflowBuilder/LocalRuntime
- Configuration System: Zero-config to enterprise patterns
- MongoDB-Style Query Builder: Complete with all operators ($eq, $gt, $in, $regex, etc.)
- Concurrent Access Protection: Migration locking and atomic operations
- Schema State Management: Change detection, caching, and rollback capabilities
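The SQL-injection protection above comes from parameterization, which is easy to demonstrate in isolation. The following uses plain `sqlite3` from the standard library (not DataFlow code) to show why a classic injection payload ends up stored as an inert string:

```python
import sqlite3

# Standalone demonstration of parameterized queries: the value travels
# separately from the SQL text, so a payload cannot alter the statement.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT)")

# Attacker-controlled input containing an injection attempt
malicious = "Alice'); DROP TABLE users; --"
conn.execute("INSERT INTO users (name, email) VALUES (?, ?)",
             (malicious, "alice@example.com"))

rows = conn.execute("SELECT name FROM users").fetchall()
print(rows)  # [("Alice'); DROP TABLE users; --",)]
```

The table survives and the payload is stored verbatim as data; string-concatenated SQL would instead have executed the `DROP TABLE`.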
### ⚠️ Limitations

- Database Runtime: PostgreSQL execution only (schema generation works for all databases)
- AsyncSQLDatabaseNode: Currently requires a PostgreSQL connection string
### 🚀 Planned Features (Roadmap)

- Redis Query Caching: `User.cached_query()` with automatic invalidation
- Multi-Database Runtime: SQLite/MySQL execution support
- Advanced Multi-Tenancy: Automatic query modification for tenant isolation
## 📚 Documentation

### Getting Started

- 5-Minute Tutorial - Build your first app
- Core Concepts - Understand DataFlow
- Examples - Complete applications

### Development

- Models - Define your schema
- CRUD Operations - Basic operations
- Relationships - Model associations

### Production

- Deployment - Go to production
- Performance - Optimization guide
- Monitoring - Observability
## 💡 Real-World Examples

### E-Commerce Platform

```python
# Define your models
@db.model
class Product:
    id: int
    name: str
    price: float
    stock: int

@db.model
class Order:
    id: int
    user_id: int
    total: float
    status: str

# Use in workflows
workflow = WorkflowBuilder()

# Check inventory
workflow.add_node("ProductGetNode", "check_stock", {
    "id": "{product_id}"
})

# Create order with transaction
workflow.add_node("TransactionContextNode", "tx_start")
workflow.add_node("OrderCreateNode", "create_order", {
    "user_id": "{user_id}",
    "total": "{total}"
})
workflow.add_node("ProductUpdateNode", "update_stock", {
    "id": "{product_id}",
    "stock": "{new_stock}"
})
```
### Multi-Tenant SaaS (Current Implementation)

```python
# Enable multi-tenancy configuration
db = DataFlow(
    database_url="postgresql://user:pass@localhost/db",
    multi_tenant=True
)

# Multi-tenant models get a tenant_id field automatically
@db.model
class User:
    name: str
    email: str
    # tenant_id: str is added automatically

# Use in workflows with real database operations
workflow.add_node("UserCreateNode", "create_user", {
    "name": "Alice",
    "email": "alice@acme-corp.com"
})
workflow.add_node("UserListNode", "list_users", {
    "limit": 10,
    "filter": {}
})
```
### High-Performance ETL (Current Implementation)

```python
# Bulk operations with real database execution
workflow.add_node("UserBulkCreateNode", "import_users", {
    "data": users_data,  # List of user records
    "batch_size": 1000,
    "conflict_resolution": "skip"
})
# Real bulk INSERT operations are executed:
# - parameterized queries for security
# - data processed in configurable batches

# List operations with filters
workflow.add_node("UserListNode", "active_users", {
    "limit": 1000,
    "offset": 0,
    "order_by": ["created_at"],
    "filter": {"active": True}
})
```
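The `batch_size` option boils down to slicing the input into fixed-size chunks and issuing one bulk INSERT per chunk. A plain-Python sketch of that chunking (an assumption about the general technique, not DataFlow's internals):

```python
def batches(records, batch_size):
    """Yield successive fixed-size chunks of records, one per bulk INSERT."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]

# 2500 records with batch_size=1000 -> three INSERT statements
users_data = [{"name": f"user{i}"} for i in range(2500)]
sizes = [len(chunk) for chunk in batches(users_data, 1000)]
print(sizes)  # [1000, 1000, 500]
```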
## 🏗️ Architecture

DataFlow integrates seamlessly with Kailash's workflow architecture:

```
┌───────────────────────────────────────────────────────┐
│                   Your Application                    │
├───────────────────────────────────────────────────────┤
│                       DataFlow                        │
│   ┌──────────┐   ┌──────────┐   ┌────────────┐        │
│   │  Models  │   │  Nodes   │   │ Migrations │        │
│   └────┬─────┘   └────┬─────┘   └─────┬──────┘        │
│        └──────────────┴───────────────┘               │
│                     Core Features                     │
│ QueryBuilder │ QueryCache │ Monitoring │ Multi-tenant │
│   ┌──────────┐   ┌──────────┐   ┌────────────┐        │
│   │ MongoDB- │   │ Redis    │   │ Pattern    │        │
│   │ style    │   │ Caching  │   │ Invalidate │        │
│   └──────────┘   └──────────┘   └────────────┘        │
├───────────────────────────────────────────────────────┤
│                      Kailash SDK                      │
│             Workflows │ Nodes │ Runtime               │
└───────────────────────────────────────────────────────┘
```
## 🧪 Testing

DataFlow includes comprehensive testing support:

```python
# Test with in-memory database
def test_user_creation():
    db = DataFlow(testing=True)

    @db.model
    class User:
        id: int
        name: str

    # Automatic test isolation
    user = db.test_create(User, name="Test User")
    assert user.name == "Test User"
```
## 🤝 Contributing

We welcome contributions! DataFlow follows Kailash SDK patterns:

- Use SDK components and patterns
- Maintain the zero-config philosophy
- Write comprehensive tests
- Update documentation

See CONTRIBUTING.md for details.
## 📊 Performance

DataFlow provides real database performance with PostgreSQL:

- Real SQL execution with parameterized queries
- Connection pooling with configurable pool sizes
- Bulk operations with batching for large datasets
- Production-ready database operations

Performance testing requires a PostgreSQL database setup. Advanced caching and query optimization features are planned.
## ⚡ Why DataFlow?

- Real Database Operations: Actual SQL execution, not mocks
- Workflow-Native: Database ops as first-class nodes
- Production-Ready: PostgreSQL support with connection pooling
- Progressive: Simple to start, enterprise features available
- 100% Kailash: Built on proven SDK components

Built with Kailash SDK | Parent Project | SDK Docs