
Workflow-native database framework for Kailash SDK


Kailash DataFlow

Zero-Config Database Framework - Django simplicity meets enterprise-grade production quality.

🚀 Quick Start (60 seconds)

from kailash_dataflow import DataFlow

# That's it! No configuration needed
db = DataFlow()

# Define your model
@db.model
class User:
    id: int
    name: str
    email: str

# DataFlow automatically creates:
# ✅ Database schema (PostgreSQL, MySQL, SQLite)
# ✅ 9 workflow nodes per model (CRUD + bulk ops)
# ✅ Real SQL operations with security
# ✅ Connection pooling and transaction management
# ✅ MongoDB-style query builder (implemented!)
# ⚠️ Redis query cache (planned)
# ⚠️ Multi-database runtime (PostgreSQL only)

You now have a production-ready database layer!
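
To make the "nodes per model" idea concrete, here is a minimal sketch of how a model decorator could record annotated fields and derive node names. MiniRegistry and KNOWN_SUFFIXES are hypothetical names for illustration, not DataFlow's API, and only the suffixes that appear in this README are used, so the list is deliberately shorter than nine.

```python
from typing import get_type_hints

# Suffixes taken from node names shown in this README (UserCreateNode,
# ProductGetNode, ProductUpdateNode, UserListNode, UserBulkCreateNode).
# The full set of nine is not listed here, so the rest is not guessed.
KNOWN_SUFFIXES = ["Create", "Get", "Update", "List", "BulkCreate"]


class MiniRegistry:
    """Hypothetical stand-in for DataFlow, for illustration only."""

    def __init__(self):
        self.models = {}

    def model(self, cls):
        # Record the annotated fields and derive node names from the class name
        self.models[cls.__name__] = {
            "fields": get_type_hints(cls),
            "nodes": [f"{cls.__name__}{suffix}Node" for suffix in KNOWN_SUFFIXES],
        }
        return cls


registry = MiniRegistry()


@registry.model
class User:
    id: int
    name: str
    email: str


print(registry.models["User"]["nodes"])
```

The decorator returns the class unchanged, so the model stays an ordinary Python class while the registry holds everything needed to generate schema and nodes.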

🎯 What Makes DataFlow Different?

Zero Configuration That Actually Works

# Development? Defaults to SQLite (execution currently requires PostgreSQL; see Database Requirements)
db = DataFlow()  # Just works!

# Production? Reads from environment
# DATABASE_URL=postgresql://...
db = DataFlow()  # Still just works!

# Need control? Progressive enhancement
db = DataFlow(
    pool_size=50,
    read_replicas=['replica1', 'replica2'],
    monitoring=True
)
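
The lookup order described above (explicit argument, then DATABASE_URL, then a SQLite default) can be sketched as follows. resolve_database_url and DEFAULT_SQLITE_URL are hypothetical names, and the default URL is an assumption; DataFlow's own resolution logic may differ.

```python
import os

DEFAULT_SQLITE_URL = "sqlite:///dataflow_dev.db"  # hypothetical development default


def resolve_database_url(explicit_url=None, environ=None):
    """Pick a database URL: explicit argument, then DATABASE_URL, then SQLite."""
    environ = os.environ if environ is None else environ
    if explicit_url:
        return explicit_url
    # An empty DATABASE_URL is treated the same as an unset one
    return environ.get("DATABASE_URL") or DEFAULT_SQLITE_URL


print(resolve_database_url(environ={}))
print(resolve_database_url(environ={"DATABASE_URL": "postgresql://user:pass@localhost/db"}))
```

Passing the environment in as a parameter keeps the function easy to test without touching the real process environment.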

Real Database Operations (Currently Available)

# Traditional ORMs: Imperative code
User.objects.create(name="Alice")  # Django
user = User(name="Alice"); session.add(user)  # SQLAlchemy

# DataFlow: Workflow-native database operations
from kailash.workflow.builder import WorkflowBuilder

workflow = WorkflowBuilder()
workflow.add_node("UserCreateNode", "create_user", {
    "name": "Alice",
    "email": "alice@example.com"
})
workflow.add_node("UserListNode", "find_users", {
    "limit": 10,
    "offset": 0
})

# Real SQL is executed: INSERT INTO users (name, email) VALUES ($1, $2)

MongoDB-Style Query Builder (NEW!)

# Get QueryBuilder from any model
builder = User.query_builder()

# MongoDB-style operators
builder.where("age", "$gte", 18)
builder.where("status", "$in", ["active", "premium"])
builder.where("email", "$regex", r"^[a-z]+@company\.com$")  # raw string keeps the backslash intact
builder.order_by("created_at", "DESC")
builder.limit(10)

# Generates optimized SQL for your database
sql, params = builder.build_select()
# PostgreSQL: SELECT * FROM "users" WHERE "age" >= $1 AND "status" IN ($2, $3) AND "email" ~ $4 ORDER BY "created_at" DESC LIMIT 10

# Works seamlessly with ListNode
workflow.add_node("UserListNode", "search", {
    "filter": {
        "age": {"$gte": 18},
        "status": {"$in": ["active", "premium"]},
        "email": {"$regex": "^admin"}
    }
})
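
For readers curious how a filter dict like the one above can become parameterized SQL, here is a minimal sketch. build_where is a hypothetical helper, not DataFlow's QueryBuilder; it covers only a handful of operators and emits PostgreSQL-style $n placeholders.

```python
def build_where(filters):
    """Translate a MongoDB-style filter dict into a WHERE clause and parameters."""
    comparison = {
        "$eq": "=", "$gt": ">", "$gte": ">=",
        "$lt": "<", "$lte": "<=", "$regex": "~",
    }
    clauses, params = [], []
    for column, condition in filters.items():
        # A bare value is shorthand for equality
        if not isinstance(condition, dict):
            condition = {"$eq": condition}
        quoted = '"' + column.replace('"', '""') + '"'
        for op, value in condition.items():
            if op == "$in":
                if not value:
                    raise ValueError("$in needs at least one value")
                placeholders = []
                for item in value:
                    params.append(item)
                    placeholders.append(f"${len(params)}")
                clauses.append(f"{quoted} IN ({', '.join(placeholders)})")
            elif op in comparison:
                params.append(value)
                clauses.append(f"{quoted} {comparison[op]} ${len(params)}")
            else:
                raise ValueError(f"unsupported operator: {op}")
    return " AND ".join(clauses), params


where, params = build_where({
    "age": {"$gte": 18},
    "status": {"$in": ["active", "premium"]},
    "email": {"$regex": "^admin"},
})
print(where)
print(params)
```

Values never enter the SQL string; only placeholder numbers do, which is what keeps the generated query safe to hand to a driver.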

Database Requirements

# Current limitation: PostgreSQL only for execution
db = DataFlow(database_url="postgresql://user:pass@localhost/db")

# Schema generation works for all databases
schema_sql = db.generate_complete_schema_sql("sqlite")  # ✅ Works
schema_sql = db.generate_complete_schema_sql("mysql")   # ✅ Works
schema_sql = db.generate_complete_schema_sql("postgresql")  # ✅ Works

# But execution currently requires PostgreSQL
from kailash.runtime.local import LocalRuntime

runtime = LocalRuntime()
results, run_id = runtime.execute(workflow.build())  # ✅ PostgreSQL only
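
Schema generation for several dialects mostly comes down to a per-dialect type map and quoting rule. The sketch below assumes a hypothetical create_table_sql helper and a simplified TYPE_MAP; it is not the output of generate_complete_schema_sql, and real schemas would also need constraints, defaults, and auto-increment handling.

```python
import sqlite3

# Simplified, illustrative type mapping per dialect
TYPE_MAP = {
    "postgresql": {int: "INTEGER", str: "TEXT", float: "DOUBLE PRECISION", bool: "BOOLEAN"},
    "mysql": {int: "INT", str: "VARCHAR(255)", float: "DOUBLE", bool: "TINYINT(1)"},
    "sqlite": {int: "INTEGER", str: "TEXT", float: "REAL", bool: "INTEGER"},
}


def create_table_sql(table, fields, dialect):
    """Build a CREATE TABLE statement for one dialect from a name-to-type mapping."""
    types = TYPE_MAP[dialect]
    quote = "`" if dialect == "mysql" else '"'
    columns = []
    for name, py_type in fields.items():
        column = f"{quote}{name}{quote} {types[py_type]}"
        if name == "id":
            column += " PRIMARY KEY"
        columns.append(column)
    return f"CREATE TABLE {quote}{table}{quote} ({', '.join(columns)})"


user_fields = {"id": int, "name": str, "email": str}
for dialect in ("postgresql", "mysql", "sqlite"):
    print(create_table_sql("users", user_fields, dialect))

# The SQLite variant can be checked locally without a database server
conn = sqlite3.connect(":memory:")
conn.execute(create_table_sql("users", user_fields, "sqlite"))
```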

Database Operations as Workflow Nodes

# Traditional ORMs: Imperative code
user = User.objects.create(name="Alice")  # Django
user = User(name="Alice"); session.add(user)  # SQLAlchemy

# DataFlow: Workflow-native (9 nodes per model!)
workflow = WorkflowBuilder()
workflow.add_node("UserCreateNode", "create_user", {
    "name": "Alice",
    "email": "alice@example.com"
})
workflow.add_node("UserListNode", "find_users", {
    "filter": {"name": {"$like": "A%"}}
})

Enterprise Configuration

# Multi-tenancy configuration (query modification planned)
db = DataFlow(multi_tenant=True)

# Real SQL generation with security
db = DataFlow(
    database_url="postgresql://user:pass@localhost/db",
    pool_size=20,
    pool_max_overflow=30,
    monitoring=True,
    echo=False  # No SQL logging in production
)

# All generated nodes use parameterized queries for security
# INSERT INTO users (name, email) VALUES ($1, $2)  -- Safe from SQL injection
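
Why parameterized queries matter can be shown with the standard library alone. This sketch uses sqlite3, whose placeholder is ? rather than PostgreSQL's $1, and the hostile input is made up for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")

# Input that would break out of a string-concatenated statement
hostile_name = "Alice'); DROP TABLE users; --"

# The driver sends the value separately from the SQL text
conn.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    (hostile_name, "alice@example.com"),
)

stored = conn.execute("SELECT name FROM users").fetchone()[0]
table_count = conn.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'users'"
).fetchone()[0]
print(stored)
print(table_count)
```

The hostile string is stored verbatim as data and the table survives, because the value is never parsed as SQL.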

🚦 Implementation Status

✅ Currently Available (Production-Ready)

  • Database Schema Generation: Complete CREATE TABLE for PostgreSQL, MySQL, SQLite
  • Real Database Operations: All 9 CRUD + bulk nodes execute actual SQL
  • SQL Security: Parameterized queries prevent SQL injection
  • Connection Management: Connection pooling, DDL execution, error handling
  • Workflow Integration: Full compatibility with WorkflowBuilder/LocalRuntime
  • Configuration System: Zero-config to enterprise patterns
  • MongoDB-Style Query Builder: Complete with all operators ($eq, $gt, $in, $regex, etc.)

⚠️ Limitations

  • Database Runtime: PostgreSQL execution only (schema generation works for all)
  • AsyncSQLDatabaseNode: Currently requires a PostgreSQL connection string

🔄 Planned Features (Roadmap)

  • Redis Query Caching: User.cached_query() with automatic invalidation
  • Multi-Database Runtime: SQLite/MySQL execution support
  • Advanced Multi-Tenancy: Automatic query modification for tenant isolation

📚 Documentation

Getting Started

Development

Production

💡 Real-World Examples

E-Commerce Platform

# Define your models
@db.model
class Product:
    id: int
    name: str
    price: float
    stock: int

@db.model
class Order:
    id: int
    user_id: int
    total: float
    status: str

# Use in workflows
workflow = WorkflowBuilder()

# Check inventory
workflow.add_node("ProductGetNode", "check_stock", {
    "id": "{product_id}"
})

# Create order with transaction
workflow.add_node("TransactionContextNode", "tx_start")
workflow.add_node("OrderCreateNode", "create_order", {
    "user_id": "{user_id}",
    "total": "{total}"
})
workflow.add_node("ProductUpdateNode", "update_stock", {
    "id": "{product_id}",
    "stock": "{new_stock}"
})
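
The point of wrapping order creation and the stock update in a transaction is that both succeed or neither does. Here is a minimal sketch of that guarantee using sqlite3 from the standard library; place_order and the table layout are assumptions for illustration and say nothing about how TransactionContextNode is implemented.

```python
import sqlite3


def place_order(conn, user_id, product_id, quantity):
    """Insert an order and decrement stock as one unit of work."""
    with conn:  # commits on success, rolls back if an exception escapes
        row = conn.execute(
            "SELECT price, stock FROM products WHERE id = ?", (product_id,)
        ).fetchone()
        if row is None:
            raise LookupError(f"unknown product {product_id}")
        price, stock = row
        cursor = conn.execute(
            "INSERT INTO orders (user_id, total, status) VALUES (?, ?, ?)",
            (user_id, price * quantity, "pending"),
        )
        if stock < quantity:
            # Raising here undoes the INSERT above
            raise ValueError("insufficient stock")
        conn.execute(
            "UPDATE products SET stock = ? WHERE id = ?",
            (stock - quantity, product_id),
        )
        return cursor.lastrowid


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL, stock INTEGER)")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, total REAL, status TEXT)")
conn.execute("INSERT INTO products (name, price, stock) VALUES (?, ?, ?)", ("Widget", 10.0, 5))
conn.commit()

order_id = place_order(conn, user_id=1, product_id=1, quantity=2)
try:
    place_order(conn, user_id=1, product_id=1, quantity=10)
except ValueError:
    pass  # the second order was rolled back

order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
stock_left = conn.execute("SELECT stock FROM products WHERE id = 1").fetchone()[0]
print(order_count, stock_left)
```

The failed second call leaves no orphaned order row, which is exactly the inconsistency a transaction is meant to prevent.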

Multi-Tenant SaaS (Current Implementation)

# Enable multi-tenancy configuration
db = DataFlow(
    database_url="postgresql://user:pass@localhost/db",
    multi_tenant=True
)

# Multi-tenant models get tenant_id field automatically
@db.model
class User:
    name: str
    email: str
    # tenant_id: str automatically added

# Use in workflows with real database operations
workflow.add_node("UserCreateNode", "create_user", {
    "name": "Alice",
    "email": "alice@acme-corp.com"
})
workflow.add_node("UserListNode", "list_users", {
    "limit": 10,
    "filter": {}
})
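
Automatic tenant isolation is listed as planned, so until then a caller would have to scope filters itself. The sketch below shows one way that scoping could look; scope_to_tenant is a hypothetical helper and not part of DataFlow.

```python
def scope_to_tenant(filters, tenant_id):
    """Return a copy of the filter that only matches rows owned by tenant_id."""
    if "tenant_id" in filters and filters["tenant_id"] != tenant_id:
        # Refuse rather than silently overwrite a cross-tenant request
        raise PermissionError("filter targets a different tenant")
    scoped = dict(filters)
    scoped["tenant_id"] = tenant_id
    return scoped


list_config = {
    "limit": 10,
    "filter": scope_to_tenant({"active": True}, "acme-corp"),
}
print(list_config)
```

Returning a copy keeps the caller's original filter untouched, so the same filter can be reused for another tenant.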

High-Performance ETL (Current Implementation)

# Bulk operations with real database execution
workflow.add_node("UserBulkCreateNode", "import_users", {
    "data": users_data,  # List of user records
    "batch_size": 1000,
    "conflict_resolution": "skip"
})

# Real bulk INSERT operations executed
# Uses parameterized queries for security
# Processes data in configurable batches

# List operations with filters
workflow.add_node("UserListNode", "active_users", {
    "limit": 1000,
    "offset": 0,
    "order_by": ["created_at"],
    "filter": {"active": True}
})
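
Batched inserts with a "skip" conflict policy can be illustrated with sqlite3. In this sketch bulk_create_users and batched are hypothetical helpers, and INSERT OR IGNORE is SQLite's way of skipping conflicting rows (PostgreSQL would use ON CONFLICT DO NOTHING); it is not the code behind UserBulkCreateNode.

```python
import sqlite3
from itertools import islice


def batched(records, batch_size):
    """Yield successive lists of at most batch_size records."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def bulk_create_users(conn, records, batch_size=1000):
    """Insert records batch by batch, skipping rows that hit a unique conflict."""
    batches = 0
    for batch in batched(records, batch_size):
        with conn:  # one transaction per batch
            conn.executemany(
                "INSERT OR IGNORE INTO users (name, email) VALUES (:name, :email)",
                batch,
            )
        batches += 1
    return batches


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT UNIQUE)")

users_data = [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
    {"name": "Alice again", "email": "alice@example.com"},  # duplicate email, skipped
    {"name": "Carol", "email": "carol@example.com"},
    {"name": "Dave", "email": "dave@example.com"},
]
batches = bulk_create_users(conn, users_data, batch_size=2)
row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(batches, row_count)
```

Committing per batch bounds the size of each transaction, so a failure late in a large import does not discard everything already written.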

🏗️ Architecture

DataFlow seamlessly integrates with Kailash's workflow architecture:

┌───────────────────────────────────────────────────────┐
│                   Your Application                    │
├───────────────────────────────────────────────────────┤
│                       DataFlow                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │  Models  │  │  Nodes   │  │Migrations│             │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘             │
│       └─────────────┴─────────────┘                   │
│                     Core Features                     │
│ QueryBuilder │ QueryCache │ Monitoring │ Multi-tenant │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │MongoDB-  │  │Redis     │  │Pattern   │             │
│  │style     │  │Caching   │  │Invalidate│             │
│  └──────────┘  └──────────┘  └──────────┘             │
├───────────────────────────────────────────────────────┤
│                      Kailash SDK                      │
│              Workflows │ Nodes │ Runtime              │
└───────────────────────────────────────────────────────┘

🧪 Testing

DataFlow includes comprehensive testing support:

# Test with in-memory database
def test_user_creation():
    db = DataFlow(testing=True)

    @db.model
    class User:
        id: int
        name: str

    # Automatic test isolation
    user = db.test_create(User, name="Test User")
    assert user.name == "Test User"
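
Test isolation with an in-memory database can be sketched with the standard library alone. isolated_database below is a hypothetical helper that gives every test its own fresh SQLite database; it stands in for the idea behind testing=True rather than reproducing DataFlow's test API.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def isolated_database():
    """Yield a fresh in-memory database that disappears when the block exits."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
        yield conn
    finally:
        conn.close()


def test_user_creation():
    with isolated_database() as conn:
        conn.execute("INSERT INTO users (name) VALUES (?)", ("Test User",))
        name = conn.execute("SELECT name FROM users").fetchone()[0]
        assert name == "Test User"


def test_starts_empty():
    # Rows written by other tests are not visible here
    with isolated_database() as conn:
        assert conn.execute("SELECT COUNT(*) FROM users").fetchone()[0] == 0


test_user_creation()
test_starts_empty()
```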

🤝 Contributing

We welcome contributions! DataFlow follows Kailash SDK patterns:

  1. Use SDK components and patterns
  2. Maintain zero-config philosophy
  3. Write comprehensive tests
  4. Update documentation

See CONTRIBUTING.md for details.

📊 Performance

With PostgreSQL, DataFlow's performance comes from real database execution:

  • Real SQL execution with parameterized queries
  • Connection pooling with configurable pool sizes
  • Bulk operations with batching for large datasets
  • Production-ready database operations

Performance testing requires a PostgreSQL database. Advanced caching and query-optimization features are planned.
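
Connection pooling with a configurable pool size can be sketched in a few lines. ConnectionPool below is a hypothetical, simplified pool built on queue.Queue and sqlite3; it illustrates the borrow-and-return pattern only and is not DataFlow's pooling implementation.

```python
import queue
import sqlite3
from contextlib import contextmanager


class ConnectionPool:
    """Fixed-size pool: connections are created up front and reused."""

    def __init__(self, factory, pool_size):
        self._idle = queue.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            self._idle.put(factory())

    @contextmanager
    def connection(self, timeout=None):
        # Blocks until a connection is free; raises queue.Empty on timeout
        conn = self._idle.get(timeout=timeout)
        try:
            yield conn
        finally:
            self._idle.put(conn)  # always hand the connection back


pool = ConnectionPool(lambda: sqlite3.connect(":memory:"), pool_size=2)

with pool.connection() as first, pool.connection() as second:
    exhausted = False
    try:
        with pool.connection(timeout=0.01):
            pass
    except queue.Empty:
        exhausted = True  # both connections are in use

with pool.connection() as conn:
    value = conn.execute("SELECT 1").fetchone()[0]
print(exhausted, value)
```

Bounding the pool caps the number of open connections, so a burst of work waits for a free connection instead of overwhelming the database.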

⚡ Why DataFlow?

  • Real Database Operations: Actual SQL execution, not mocks
  • Workflow-Native: Database ops as first-class nodes
  • Production-Ready: PostgreSQL support with connection pooling
  • Progressive: Simple to start, enterprise features available
  • 100% Kailash: Built on proven SDK components

Built with Kailash SDK | Parent Project | SDK Docs
