
Workflow-native database framework for Kailash SDK


Kailash DataFlow

Multi-Database Alpha Framework - Django simplicity meets enterprise-grade production quality with PostgreSQL and SQLite support.

⚠️ ALPHA RELEASE: DataFlow supports PostgreSQL (full features) and SQLite (near-complete parity). MySQL support coming in beta.

✅ v0.4.7+ Context-Aware Improvements: String IDs preserved, multi-instance isolation, deferred schema operations

🚀 Quick Start

Prerequisites

  • PostgreSQL 12+ (recommended for production) OR SQLite 3.x (development/testing)
  • Python 3.8+

Installation

pip install kailash-dataflow
# or
pip install kailash[dataflow]

Basic Usage

from dataflow import DataFlow

# PostgreSQL (production) or SQLite (development)
db = DataFlow("postgresql://user:pass@localhost/dbname")
# db = DataFlow("sqlite:///app.db")  # SQLite alternative

# Define your model
@db.model
class User:
    id: str  # String IDs now preserved! (v0.4.7+)
    name: str
    email: str

# DataFlow automatically creates:
# ✅ Database schema with migrations (PostgreSQL)
# ✅ 9 workflow nodes per model (CRUD + bulk ops)
# ✅ Real SQL operations with injection protection
# ✅ Connection pooling and transaction management
# ✅ MongoDB-style query builder
# ✅ Concurrent access protection with locking
# ✅ Schema state management with rollback
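To make the schema-generation point more concrete, here is a minimal sketch of how a CREATE TABLE statement could be derived from a class's type annotations. It uses only the standard library and SQLite. `TYPE_MAP` and `create_table_sql` are hypothetical names for illustration, not DataFlow APIs, and DataFlow's actual generator may work differently.

```python
import sqlite3
from typing import get_type_hints

# Hypothetical mapping from Python annotations to SQLite column types.
TYPE_MAP = {str: "TEXT", int: "INTEGER", float: "REAL", bool: "INTEGER"}


def create_table_sql(model: type) -> str:
    """Build a CREATE TABLE statement from a class's annotations."""
    columns = []
    for name, py_type in get_type_hints(model).items():
        column = f'"{name}" {TYPE_MAP[py_type]}'
        if name == "id":
            column += " PRIMARY KEY"
        columns.append(column)
    table = model.__name__.lower() + "s"  # assumed naming rule: User -> users
    return f'CREATE TABLE IF NOT EXISTS "{table}" ({", ".join(columns)})'


class User:
    id: str  # stays a TEXT column, no integer coercion
    name: str
    email: str


ddl = create_table_sql(User)
conn = sqlite3.connect(":memory:")
conn.execute(ddl)
conn.execute(
    'INSERT INTO "users" VALUES (?, ?, ?)',
    ("user-uuid-1", "Alice", "alice@example.com"),
)
row = conn.execute('SELECT id FROM "users"').fetchone()
```

Because `id` is annotated as `str`, the column is created as TEXT and the value comes back as the same string that was inserted.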

🎯 What Makes DataFlow Different?

Multi-Database Support

# Production PostgreSQL
db = DataFlow("postgresql://user:pass@localhost/dbname")

# Development SQLite
db = DataFlow("sqlite:///app.db")

# Environment-based configuration
# DATABASE_URL=postgresql://... or sqlite:///...
db = DataFlow()  # Reads from DATABASE_URL

# Advanced features (both databases)
db = DataFlow(
    "postgresql://...",  # or "sqlite:///..."
    pool_size=50,
    auto_migrate=True,
    monitoring=True
)
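The environment-based pattern above can be sketched with the standard library alone. The helper names `resolve_database_url` and `detect_dialect`, and the SQLite fallback default, are assumptions made for this example; they are not taken from DataFlow's source.

```python
import os
from urllib.parse import urlparse


def resolve_database_url(explicit_url=None, default="sqlite:///app.db"):
    """Prefer an explicit URL, then DATABASE_URL, then a default (assumed)."""
    return explicit_url or os.environ.get("DATABASE_URL") or default


def detect_dialect(url):
    """Return 'postgresql' or 'sqlite' based on the URL scheme."""
    # Strip an optional driver suffix such as "postgresql+asyncpg".
    scheme = urlparse(url).scheme.split("+")[0]
    if scheme in ("postgresql", "postgres"):
        return "postgresql"
    if scheme == "sqlite":
        return "sqlite"
    raise ValueError(f"Unsupported database scheme: {scheme!r}")
```

A call such as `detect_dialect(resolve_database_url())` then lets the rest of an application branch on the dialect without hard-coding a connection string.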

Real Database Operations (Currently Available)

# Traditional ORMs: Imperative code
User.objects.create(name="Alice")  # Django
user = User(name="Alice"); session.add(user)  # SQLAlchemy

# DataFlow: Workflow-native database operations
from kailash.workflow.builder import WorkflowBuilder

workflow = WorkflowBuilder()
workflow.add_node("UserCreateNode", "create_user", {
    "name": "Alice",
    "email": "alice@example.com"
})
workflow.add_node("UserListNode", "find_users", {
    "limit": 10,
    "offset": 0
})

# Real SQL is executed: INSERT INTO users (name, email) VALUES ($1, $2)

MongoDB-Style Query Builder (NEW!)

# Get QueryBuilder from any model
builder = User.query_builder()

# MongoDB-style operators
builder.where("age", "$gte", 18)
builder.where("status", "$in", ["active", "premium"])
builder.where("email", "$regex", r"^[a-z]+@company\.com$")  # raw string keeps the \. escape intact
builder.order_by("created_at", "DESC")
builder.limit(10)

# Generates optimized SQL for your database
sql, params = builder.build_select()
# PostgreSQL: SELECT * FROM "users" WHERE "age" >= $1 AND "status" IN ($2, $3) AND "email" ~ $4 ORDER BY "created_at" DESC LIMIT 10

# Works seamlessly with ListNode
workflow.add_node("UserListNode", "search", {
    "filter": {
        "age": {"$gte": 18},
        "status": {"$in": ["active", "premium"]},
        "email": {"$regex": "^admin"}
    }
})
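For readers curious how a MongoDB-style filter can map onto parameterized SQL, the following is a simplified sketch that reproduces the PostgreSQL-style output shown above. `filter_to_sql` and `OPERATORS` are hypothetical names and cover only a few operators; this is not DataFlow's QueryBuilder implementation.

```python
# Hypothetical operator table; the real QueryBuilder supports more operators.
OPERATORS = {"$eq": "=", "$gt": ">", "$gte": ">=", "$lt": "<", "$lte": "<=", "$regex": "~"}


def filter_to_sql(table, filters):
    """Translate a MongoDB-style filter dict into SQL text plus a parameter list."""
    clauses, params = [], []
    for field, condition in filters.items():
        if not isinstance(condition, dict):
            condition = {"$eq": condition}  # bare value means equality
        for op, value in condition.items():
            if op == "$in":
                placeholders = []
                for item in value:
                    params.append(item)
                    placeholders.append(f"${len(params)}")
                clauses.append(f'"{field}" IN ({", ".join(placeholders)})')
            elif op in OPERATORS:
                params.append(value)
                clauses.append(f'"{field}" {OPERATORS[op]} ${len(params)}')
            else:
                raise ValueError(f"Unsupported operator: {op}")
    # Note: field and table names are interpolated here, so a real
    # implementation must validate them against the model definition.
    sql = f'SELECT * FROM "{table}"'
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    return sql, params


sql, params = filter_to_sql("users", {
    "age": {"$gte": 18},
    "status": {"$in": ["active", "premium"]},
    "email": {"$regex": "^admin"},
})
```

Values never appear in the SQL text; they travel in `params` and are bound by the driver, which is what keeps the generated query safe from injection.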

Database Support Status

# PostgreSQL: Full feature support
db = DataFlow(database_url="postgresql://user:pass@localhost/db")

# SQLite: Near-complete parity (missing only schema discovery)
db = DataFlow(database_url="sqlite:///app.db")

# Both support full workflow execution
from kailash.runtime.local import LocalRuntime

runtime = LocalRuntime()
results, run_id = runtime.execute(workflow.build())  # ✅ Works with both databases

# Only limitation: Real schema discovery (PostgreSQL only)
schema = db.discover_schema(use_real_inspection=True)  # PostgreSQL only

Database Operations as Workflow Nodes

# Traditional ORMs: Imperative code
user = User.objects.create(name="Alice")  # Django
user = User(name="Alice"); session.add(user)  # SQLAlchemy

# DataFlow: Workflow-native (9 nodes per model!)
workflow = WorkflowBuilder()
workflow.add_node("UserCreateNode", "create_user", {
    "name": "Alice",
    "email": "alice@example.com"
})
workflow.add_node("UserListNode", "find_users", {
    "filter": {"name": {"$like": "A%"}}
})

Enterprise Configuration

# Multi-tenancy configuration (query modification planned)
db = DataFlow(multi_tenant=True)

# Real SQL generation with security
db = DataFlow(
    database_url="postgresql://user:pass@localhost/db",
    pool_size=20,
    pool_max_overflow=30,
    monitoring=True,
    echo=False  # No SQL logging in production
)

# All generated nodes use parameterized queries for security
# INSERT INTO users (name, email) VALUES ($1, $2)  -- Safe from SQL injection
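The effect of parameterized queries can be demonstrated with Python's built-in sqlite3 module. This is an independent illustration, not DataFlow code; note that sqlite3 uses `?` placeholders where PostgreSQL drivers use `$1`, `$2`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT)")

# A hostile value that would break a query built by string concatenation.
hostile_name = "Alice'); DROP TABLE users; --"

# The driver binds the values separately from the SQL text, so the
# payload is stored as plain data and never parsed as SQL.
conn.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    (hostile_name, "alice@example.com"),
)

stored = conn.execute("SELECT name FROM users").fetchone()[0]
table_count = conn.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'users'"
).fetchone()[0]
```

After the insert, the `users` table still exists and the hostile string is stored verbatim as the user's name.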

🔧 Context-Aware Improvements (v0.4.7+)

String ID Preservation

# String IDs are now preserved without forced integer conversion
@db.model
class Session:
    id: str  # Explicitly string - no more PostgreSQL type errors!
    user_id: str
    token: str

# String IDs work correctly in all operations
workflow.add_node("SessionCreateNode", "create", {
    "id": "sess-uuid-12345",  # Preserved as string
    "user_id": "user-uuid-67890",
    "token": "token-abc-def"
})

Multi-Instance Isolation

# Each DataFlow instance maintains separate context
dev_db = DataFlow("sqlite:///dev.db")
prod_db = DataFlow("postgresql://prod...")

# Models registered to specific instances
@dev_db.model
class User:
    name: str

@prod_db.model
class User:  # Same name, different instance - works!
    name: str
    email: str

# Nodes bound to correct instance automatically
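One way to picture this isolation is a registry stored on each instance rather than on the class or module. The toy `MiniDataFlow` class below is a hypothetical stand-in written for this example (as is the production URL); it does not reflect DataFlow's internals.

```python
class MiniDataFlow:
    """Toy stand-in for a DataFlow instance with its own model registry."""

    def __init__(self, url):
        self.url = url
        self.models = {}  # per-instance, not a shared class attribute

    def model(self, cls):
        """Decorator that records the class's fields for this instance only."""
        self.models[cls.__name__] = dict(cls.__annotations__)
        return cls


dev_db = MiniDataFlow("sqlite:///dev.db")
prod_db = MiniDataFlow("postgresql://prod-host/app")  # hypothetical URL


@dev_db.model
class User:
    name: str


@prod_db.model
class User:  # same class name, registered on a different instance
    name: str
    email: str
```

Each instance sees only its own definition of `User`, so the second registration does not overwrite the first.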

Deferred Schema Operations

  • Synchronous registration: Models register immediately with @db.model
  • Async table creation: Tables created on first use, not registration
  • Better performance: No blocking during model definition phase
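The register-now, create-later idea in the list above can be sketched as follows. For simplicity this version is synchronous and SQLite-only, whereas DataFlow describes its table creation as async; `LazySchema` and `table_exists` are hypothetical helpers, not part of the library.

```python
import sqlite3


class LazySchema:
    """Registers DDL immediately, runs it only when a table is first used."""

    def __init__(self, conn):
        self.conn = conn
        self.pending = {}     # table name -> CREATE TABLE statement
        self.created = set()  # tables that already exist

    def register(self, table, ddl):
        # Cheap and synchronous: no database round trip here.
        self.pending[table] = ddl

    def ensure_table(self, table):
        # Called by every operation before it touches the table.
        if table not in self.created:
            self.conn.execute(self.pending[table])
            self.created.add(table)

    def insert_user(self, name):
        self.ensure_table("users")
        self.conn.execute("INSERT INTO users (name) VALUES (?)", (name,))


def table_exists(conn, table):
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)
    ).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")
schema = LazySchema(conn)
schema.register("users", "CREATE TABLE users (name TEXT)")
exists_after_register = table_exists(conn, "users")   # table not created yet
schema.insert_user("Alice")
exists_after_first_use = table_exists(conn, "users")  # created on first use
```

Registration costs only a dictionary write, so defining many models at import time does not block on the database.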

🚦 Implementation Status

✅ Currently Available (Production-Ready)

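  • Database Schema Generation: Complete CREATE TABLE generation for PostgreSQL, MySQL, and SQLite (for MySQL this covers DDL generation only; runtime support is listed under Alpha Limitations)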
  • Auto-Migration System: PostgreSQL-only, production-ready automatic schema synchronization
  • Real Database Operations: All 9 CRUD + bulk nodes execute actual SQL
  • SQL Security: Parameterized queries prevent SQL injection
  • Connection Management: Connection pooling, DDL execution, error handling
  • Workflow Integration: Full compatibility with WorkflowBuilder/LocalRuntime
  • Configuration System: Zero-config to enterprise patterns
  • MongoDB-Style Query Builder: Complete with all operators ($eq, $gt, $in, $regex, etc.)
  • Concurrent Access Protection: Migration locking and atomic operations
  • Schema State Management: Change detection, caching, and rollback capabilities
  • String ID Preservation: String/UUID IDs preserved without forced conversion (v0.4.7+)
  • Multi-Instance Isolation: Separate contexts for different DataFlow instances (v0.4.7+)
  • Deferred Schema Operations: Better performance with lazy table creation (v0.4.7+)

⚠️ ALPHA LIMITATIONS

  • Schema Discovery: Real database introspection (discover_schema(use_real_inspection=True)) is PostgreSQL-only
  • MySQL Support: Not available in alpha release
  • Complex Migrations: Some SQLite migration operations limited by ALTER TABLE syntax
  • Production Use: Alpha software - thorough testing recommended for production deployments

🔄 Planned Features (Roadmap)

  • Redis Query Caching: User.cached_query() with automatic invalidation
  • Multi-Database Runtime: MySQL execution support (SQLite execution is already available in alpha)
  • Advanced Multi-Tenancy: Automatic query modification for tenant isolation

📚 Documentation

Getting Started

Development

Production

💡 Real-World Examples

E-Commerce Platform

# Define your models
@db.model
class Product:
    id: int
    name: str
    price: float
    stock: int

@db.model
class Order:
    id: int
    user_id: int
    total: float
    status: str

# Use in workflows
workflow = WorkflowBuilder()

# Check inventory
workflow.add_node("ProductGetNode", "check_stock", {
    "id": "{product_id}"
})

# Create order with transaction
workflow.add_node("TransactionContextNode", "tx_start")
workflow.add_node("OrderCreateNode", "create_order", {
    "user_id": "{user_id}",
    "total": "{total}"
})
workflow.add_node("ProductUpdateNode", "update_stock", {
    "id": "{product_id}",
    "stock": "{new_stock}"
})
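The order-plus-stock-update step relies on both writes succeeding or failing together. The sketch below shows that guarantee using plain sqlite3 rather than DataFlow nodes; `place_order` is a hypothetical helper, and how TransactionContextNode achieves the same effect is not shown in this document.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, stock INTEGER)")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, total REAL)")
conn.execute("INSERT INTO products (id, stock) VALUES (1, 1)")
conn.commit()


def place_order(conn, user_id, product_id, total):
    """Create an order and decrement stock atomically; return True on success."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute(
                "INSERT INTO orders (user_id, total) VALUES (?, ?)", (user_id, total)
            )
            updated = conn.execute(
                "UPDATE products SET stock = stock - 1 WHERE id = ? AND stock > 0",
                (product_id,),
            ).rowcount
            if updated == 0:
                raise ValueError("out of stock")
        return True
    except ValueError:
        return False


first = place_order(conn, user_id=7, product_id=1, total=19.99)
second = place_order(conn, user_id=8, product_id=1, total=19.99)
order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

The second call finds no stock, raises inside the transaction, and its order row is rolled back, so only one order is recorded.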

Multi-Tenant SaaS (Current Implementation)

# Enable multi-tenancy configuration
db = DataFlow(
    database_url="postgresql://user:pass@localhost/db",
    multi_tenant=True
)

# Multi-tenant models get tenant_id field automatically
@db.model
class User:
    name: str
    email: str
    # tenant_id: str automatically added

# Use in workflows with real database operations
workflow.add_node("UserCreateNode", "create_user", {
    "name": "Alice",
    "email": "alice@acme-corp.com"
})
workflow.add_node("UserListNode", "list_users", {
    "limit": 10,
    "filter": {}
})
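Since automatic query modification for tenant isolation is still on the roadmap, an application may want to scope filters itself in the meantime. The helper below is one possible approach, written as an assumption for this example; `scope_to_tenant` is a hypothetical name, not a DataFlow function.

```python
def scope_to_tenant(filters, tenant_id):
    """Return a copy of a filter dict restricted to one tenant."""
    requested = filters.get("tenant_id", tenant_id)
    if requested != tenant_id:
        # Refuse attempts to read another tenant's rows.
        raise PermissionError("filter targets a different tenant")
    scoped = dict(filters)  # copy so the caller's dict is left untouched
    scoped["tenant_id"] = tenant_id
    return scoped


scoped = scope_to_tenant({"active": True}, tenant_id="acme-corp")
```

The resulting dict could then be passed as the "filter" parameter of a list node, assuming the model exposes the automatically added tenant_id field to filters.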

High-Performance ETL (Current Implementation)

# Bulk operations with real database execution
workflow.add_node("UserBulkCreateNode", "import_users", {
    "data": users_data,  # List of user records
    "batch_size": 1000,
    "conflict_resolution": "skip"
})

# Real bulk INSERT operations executed
# Uses parameterized queries for security
# Processes data in configurable batches

# List operations with filters
workflow.add_node("UserListNode", "active_users", {
    "limit": 1000,
    "offset": 0,
    "order_by": ["created_at"],
    "filter": {"active": True}
})
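As a rough picture of batched bulk inserts with skip-on-conflict behaviour, the sketch below uses sqlite3 with INSERT OR IGNORE (the PostgreSQL counterpart would be ON CONFLICT DO NOTHING). Mapping "conflict_resolution": "skip" onto these statements is an assumption, and `batched` and `bulk_create_users` are hypothetical helpers.

```python
import sqlite3
from itertools import islice


def batched(records, size):
    """Yield successive lists of at most `size` records."""
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def bulk_create_users(conn, records, batch_size=1000):
    """Insert records in batches, skipping rows that hit a unique constraint."""
    before = conn.total_changes
    for chunk in batched(records, batch_size):
        with conn:  # one transaction per batch
            conn.executemany(
                "INSERT OR IGNORE INTO users (email, name) VALUES (?, ?)",
                [(r["email"], r["name"]) for r in chunk],
            )
    # total_changes counts only rows that were actually written.
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT PRIMARY KEY, name TEXT)")
users_data = [
    {"email": "a@example.com", "name": "Alice"},
    {"email": "b@example.com", "name": "Bob"},
    {"email": "a@example.com", "name": "Alice again"},  # duplicate, skipped
]
inserted = bulk_create_users(conn, users_data, batch_size=2)
```

Committing once per batch bounds the size of each transaction while still avoiding a round trip per row.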

🏗️ Architecture

DataFlow seamlessly integrates with Kailash's workflow architecture:

┌──────────────────────────────────────────────────────┐
│                 Your Application                     │
├──────────────────────────────────────────────────────┤
│                    DataFlow                          │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐            │
│  │  Models  │  │   Nodes  │  │Migrations│            │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘            │
│       └─────────────┴─────────────┘                  │
│                Core Features                         │
│ QueryBuilder │ QueryCache │ Monitoring │ Multi-tenant│
│  ┌──────────┐  ┌──────────┐  ┌──────────┐            │
│  │MongoDB-  │  │Redis     │  │Pattern   │            │
│  │style     │  │Caching   │  │Invalidate│            │
│  └──────────┘  └──────────┘  └──────────┘            │
├──────────────────────────────────────────────────────┤
│               Kailash SDK                            │
│         Workflows │ Nodes │ Runtime                  │
└──────────────────────────────────────────────────────┘

🧪 Testing

DataFlow includes comprehensive testing support:

# Test with in-memory database
def test_user_creation():
    db = DataFlow(testing=True)

    @db.model
    class User:
        id: int
        name: str

    # Automatic test isolation
    user = db.test_create(User, name="Test User")
    assert user.name == "Test User"

🤝 Contributing

We welcome contributions! DataFlow follows Kailash SDK patterns:

  1. Use SDK components and patterns
  2. Maintain zero-config philosophy
  3. Write comprehensive tests
  4. Update documentation

See CONTRIBUTING.md for details.

📊 Performance & Testing Status

Current Performance (PostgreSQL Alpha)

  • Real SQL execution with parameterized queries
  • Connection pooling with configurable pool sizes
  • Bulk operations with batching for large datasets
  • 95% unit test pass rate (615/648 tests passing)

Recent Test Improvements

  • 100% NO MOCKING compliance in Tier 2-3 tests
  • Real infrastructure testing with PostgreSQL
  • 167 test files covering all scenarios
  • 3-tier testing strategy (Unit/Integration/E2E)
  • Fixed critical bugs: checksum tracking, field type serialization

Alpha Testing Requirements

  • PostgreSQL 12+ required for all testing
  • Performance benchmarks available for PostgreSQL only
  • Advanced caching and query optimization features planned for beta

⚡ Why DataFlow?

  • Real Database Operations: Actual SQL execution, not mocks
  • Workflow-Native: Database ops as first-class nodes
  • Production-Ready: PostgreSQL support with connection pooling
  • Progressive: Simple to start, enterprise features available
  • 100% Kailash: Built on proven SDK components

Built with Kailash SDK | Parent Project | SDK Docs


This version

0.5.1
