CafeDB: A lightweight, schema-flexible, Python-native JSONL database with set-indexed querying, native type support, and safe append-only storage.

CafeDB - Enhanced JSON Database

A simple, lightweight, and powerful Python database that stores data in human-readable JSON format with advanced querying capabilities.

Python 3.6+ · License: MIT · Pure Python

🚀 Quick Start

from cafedb import CafeDB

# Create or open database
db = CafeDB("my_database.json", verbose=True)

# Create table
db.create_table("users")

# Insert data
db.insert("users", {
    "name": "Alice Johnson",
    "age": 28,
    "city": "Paris",
    "email": "alice@gmail.com",
    "skills": ["Python", "JavaScript", "SQL"]
})

# Simple queries
all_users = db.select("users")
paris_users = db.select("users", {"city": "Paris"})

# Advanced queries with wildcards and comparisons
gmail_users = db.select("users", {"email": "*@gmail.com"})
young_adults = db.select("users", {"age": {"$between": [18, 30]}})
developers = db.select("users", {"skills": {"$contains": "Python"}})

💾 Installation

Option 1: Copy the Module

Download cafedb.py and place it in your project directory:

from cafedb import CafeDB

Option 2: Install as Package

# Clone repository
git clone https://github.com/yourusername/cafedb.git
cd cafedb

# Install in development mode
pip install -e .

Requirements

  • Python 3.6+
  • No external dependencies (pure Python!)

✨ Core Features

🎯 Simple & Intuitive

  • Human-readable JSON storage format
  • Pythonic API that feels natural
  • Zero configuration required
  • Single file database

🔍 Advanced Querying

  • Wildcard pattern matching (*, ?)
  • Comparison operators ($gt, $lt, $between, etc.)
  • String operations ($contains, $startswith, $endswith)
  • Regular expression matching
  • List membership testing ($in, $nin)

🔧 Developer Friendly

  • Verbose mode for debugging
  • Atomic file operations (crash-safe)
  • Detailed error messages
  • Built-in statistics and introspection

📈 Production Ready

  • Automatic backup creation during writes
  • Table metadata management
  • Data validation and type checking
  • Thread-safe operations
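Crash-safe writes of this kind are commonly implemented with a write-to-temp-then-rename pattern. The helper below is a minimal sketch of that technique, assuming a single-file JSON layout; it is not CafeDB's actual implementation:

```python
import json
import os
import tempfile

def atomic_write_json(path, data):
    """Write JSON to `path` atomically: dump to a temp file in the same
    directory, then rename over the target. os.replace is atomic on both
    POSIX and Windows, so readers never observe a half-written file."""
    dir_name = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)  # atomic swap into place
    except BaseException:
        os.unlink(tmp_path)  # don't leave temp files behind on failure
        raise
```

Writing to a temp file in the *same* directory matters: a rename is only atomic within one filesystem.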

🛠️ Basic Operations

Database Management

from cafedb import CafeDB

# Initialize database
db = CafeDB("myapp.json", verbose=True)

# List all tables
tables = db.list_tables()
print(f"Tables: {tables}")

# Check if table exists
if db.exists_table("users"):
    print("Users table exists")

# Get database statistics
stats = db.stats("users")
print(f"Total users: {stats['total_rows']}")

Table Operations

# Create table
db.create_table("products")

# Drop table (be careful!)
db.drop_table("old_table")

# Create table only if it doesn't exist
if not db.exists_table("logs"):
    db.create_table("logs")

CRUD Operations

Insert Data

# Insert single record
db.insert("users", {
    "id": 1,
    "name": "Alice Johnson",
    "age": 28,
    "city": "Paris",
    "email": "alice@example.com",
    "active": True,
    "tags": ["developer", "python", "remote"]
})

# Insert multiple records
users_data = [
    {"name": "Bob Smith", "age": 34, "city": "London"},
    {"name": "Carol Davis", "age": 29, "city": "Berlin"},
    {"name": "David Wilson", "age": 31, "city": "Paris"}
]

for user in users_data:
    db.insert("users", user)

Select Data

# Get all records
all_users = db.select("users")

# Get specific records with filters
paris_users = db.select("users", {"city": "Paris"})

# Count records
total_users = db.count("users")
active_users = db.count("users", {"active": True})

Update Data

# Update with dictionary filters and updates
db.update("users", 
    {"city": "Paris"}, 
    {"timezone": "CET", "updated_at": "2024-01-01"}
)

# Update with custom function
db.update("users",
    {"age": {"$gte": 30}},
    lambda user: {**user, "category": "senior", "discount": 0.1}
)

# Get update count
updated = db.update("users", {"active": False}, {"status": "inactive"})
print(f"Updated {updated} inactive users")

Delete Data

# Delete with filters
deleted = db.delete("users", {"active": False})
print(f"Deleted {deleted} inactive users")

# Delete with complex conditions
db.delete("users", {
    "age": {"$lt": 18},
    "verified": False
})

🔍 Advanced Querying

Wildcard Pattern Matching

# Names starting with 'A'
a_names = db.select("users", {"name": "A*"})

# Gmail users
gmail_users = db.select("users", {"email": "*@gmail.com"})

# Files with specific extension
pdf_files = db.select("documents", {"filename": "*.pdf"})

# Single character wildcard
codes = db.select("products", {"code": "US?"})  # US1, US2, USA, etc.

# Multiple wildcards
patterns = db.select("logs", {"message": "*error*database*"})

Comparison Operators

# Age ranges
adults = db.select("users", {"age": {"$gte": 18}})
seniors = db.select("users", {"age": {"$between": [65, 100]}})
young_adults = db.select("users", {"age": {"$gte": 18, "$lt": 35}})

# Price comparisons
expensive = db.select("products", {"price": {"$gt": 100}})
on_sale = db.select("products", {"discount": {"$ne": 0}})

# Date ranges (works with ISO date strings)
recent = db.select("orders", {
    "created_at": {"$gte": "2024-01-01T00:00:00Z"}
})

String Operations

# Case-insensitive contains
python_devs = db.select("users", {"bio": {"$contains": "python"}})
remote_workers = db.select("users", {"description": {"$contains": "remote"}})

# String starts/ends with
mr_users = db.select("users", {"name": {"$startswith": "Mr."}})
com_emails = db.select("users", {"email": {"$endswith": ".com"}})

# Regular expressions
phone_pattern = db.select("users", {
    "phone": {"$regex": r"\(\d{3}\) \d{3}-\d{4}"}
})

us_phones = db.select("contacts", {
    "phone": {"$regex": r"^\+1"}
})

List Operations

# Value in list
major_cities = db.select("users", {
    "city": {"$in": ["New York", "London", "Paris", "Tokyo"]}
})

# Value not in list
active_users = db.select("users", {
    "status": {"$nin": ["banned", "suspended", "deleted"]}
})

# Multiple list conditions
qualified = db.select("candidates", {
    "skills": {"$contains": "Python"},
    "location": {"$in": ["Remote", "New York", "San Francisco"]},
    "experience": {"$gte": 2}
})

Complex Multi-Field Queries

# Combine multiple condition types
premium_users = db.select("users", {
    "name": "A*",                              # Wildcard
    "age": {"$between": [25, 45]},            # Range
    "city": {"$in": ["Paris", "London"]},     # List membership
    "email": "*@gmail.com",                   # Wildcard
    "bio": {"$contains": "developer"},        # String contains
    "score": {"$gte": 80},                    # Comparison
    "active": True                            # Exact match
})

# Complex business logic
target_customers = db.select("customers", {
    "age": {"$between": [25, 55]},
    "income": {"$gte": 50000},
    "location": {"$nin": ["Rural"]},
    "last_purchase": {"$gte": "2023-01-01"},
    "email": {"$endswith": ".com"},
    "preferences": {"$contains": "premium"}
})
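All conditions in a filter dictionary combine with AND. If you need OR semantics, one workaround (a sketch, assuming every record carries a unique `id` field) is to run one select per branch and merge the results:

```python
def select_or(db, table, filter_list, key="id"):
    """Emulate OR by taking the union of several AND-filters,
    de-duplicating on a unique key field."""
    seen, merged = set(), []
    for filters in filter_list:
        for row in db.select(table, filters):
            if row[key] not in seen:
                seen.add(row[key])
                merged.append(row)
    return merged

# Users in Paris OR users aged 60 and over:
# select_or(db, "users", [{"city": "Paris"}, {"age": {"$gte": 60}}])
```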

📊 Query Operators Reference

Comparison Operators

| Operator   | Description           | Example                           | SQL Equivalent          |
|------------|-----------------------|-----------------------------------|-------------------------|
| `$eq`      | Equal to              | `{"age": {"$eq": 30}}`            | `age = 30`              |
| `$ne`      | Not equal to          | `{"status": {"$ne": "inactive"}}` | `status != 'inactive'`  |
| `$gt`      | Greater than          | `{"score": {"$gt": 80}}`          | `score > 80`            |
| `$gte`     | Greater than or equal | `{"age": {"$gte": 18}}`           | `age >= 18`             |
| `$lt`      | Less than             | `{"price": {"$lt": 100}}`         | `price < 100`           |
| `$lte`     | Less than or equal    | `{"discount": {"$lte": 0.5}}`     | `discount <= 0.5`       |
| `$between` | Between (inclusive)   | `{"age": {"$between": [18, 65]}}` | `age BETWEEN 18 AND 65` |

List Operators

| Operator | Description       | Example                                  | SQL Equivalent                |
|----------|-------------------|------------------------------------------|-------------------------------|
| `$in`    | Value in list     | `{"city": {"$in": ["Paris", "London"]}}` | `city IN ('Paris', 'London')` |
| `$nin`   | Value not in list | `{"status": {"$nin": ["banned"]}}`       | `status NOT IN ('banned')`    |

String Operators

| Operator      | Description                        | Example                                   | SQL Equivalent                 |
|---------------|------------------------------------|-------------------------------------------|--------------------------------|
| `$like`       | Wildcard matching                  | `{"name": {"$like": "A*"}}`               | `name LIKE 'A%'`               |
| `$regex`      | Regular expression                 | `{"email": {"$regex": ".*@gmail\\.com"}}` | `email REGEXP '.*@gmail\.com'` |
| `$contains`   | String contains (case-insensitive) | `{"bio": {"$contains": "python"}}`        | `LOWER(bio) LIKE '%python%'`   |
| `$startswith` | String starts with                 | `{"name": {"$startswith": "Dr."}}`        | `name LIKE 'Dr.%'`             |
| `$endswith`   | String ends with                   | `{"email": {"$endswith": ".edu"}}`        | `email LIKE '%.edu'`           |

Wildcard Patterns

| Pattern   | Matches                    | Example                                      |
|-----------|----------------------------|----------------------------------------------|
| `*`       | Any sequence of characters | `"A*"` matches "Alice", "Andrew", "A"        |
| `?`       | Single character           | `"A?"` matches "Al", "A1", but not "Alice"   |
| `*word*`  | Contains word              | `"*dev*"` matches "developer", "development" |
| `prefix*` | Starts with prefix         | `"Dr.*"` matches "Dr. Smith", "Dr. Jones"    |
| `*suffix` | Ends with suffix           | `"*.pdf"` matches "report.pdf", "doc.pdf"    |
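The semantics in the tables above can be summarized in a single dispatch function. The following is an illustrative re-implementation (not CafeDB's own code, and list handling for `$contains` is an assumption based on the skills examples earlier); Python's standard `fnmatch` module handles the `*`/`?` wildcards:

```python
import fnmatch
import re

def matches(value, condition):
    """Evaluate one field condition: dicts dispatch on $-operators,
    plain strings containing * or ? are wildcards, anything else is
    an exact match."""
    ops = {
        "$eq": lambda v, a: v == a,
        "$ne": lambda v, a: v != a,
        "$gt": lambda v, a: v > a,
        "$gte": lambda v, a: v >= a,
        "$lt": lambda v, a: v < a,
        "$lte": lambda v, a: v <= a,
        "$between": lambda v, a: a[0] <= v <= a[1],
        "$in": lambda v, a: v in a,
        "$nin": lambda v, a: v not in a,
        # membership for lists, case-insensitive substring for strings
        "$contains": lambda v, a: a in v if isinstance(v, list)
                     else str(a).lower() in str(v).lower(),
        "$startswith": lambda v, a: str(v).startswith(a),
        "$endswith": lambda v, a: str(v).endswith(a),
        "$regex": lambda v, a: re.search(a, str(v)) is not None,
        "$like": lambda v, a: fnmatch.fnmatch(str(v), a),
    }
    if isinstance(condition, dict):
        # several operators on one field combine with AND
        return all(ops[op](value, arg) for op, arg in condition.items())
    if isinstance(condition, str) and any(c in condition for c in "*?"):
        return fnmatch.fnmatch(str(value), condition)
    return value == condition
```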

🗃️ Data Types Support

CafeDB supports all JSON-serializable Python data types:

Primitive Types

db.insert("mixed_data", {
    "string_field": "Hello World",
    "integer_field": 42,
    "float_field": 3.14159,
    "boolean_field": True,
    "null_field": None
})

Collections

db.insert("collections", {
    "list_field": [1, 2, 3, "mixed", True],
    "dict_field": {
        "nested_string": "value",
        "nested_number": 100,
        "deep_nest": {
            "level2": "deep value"
        }
    }
})

Date and Time

from datetime import datetime

db.insert("events", {
    "name": "Conference",
    "start_date": "2024-03-15",
    "start_time": "09:00:00",
    "created_at": datetime.now().isoformat(),
    "timestamp": 1640995200  # Unix timestamp
})

# Query dates (as strings)
recent_events = db.select("events", {
    "start_date": {"$gte": "2024-01-01"}
})
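String comparison works here because ISO-8601 timestamps sort lexicographically in chronological order, which is what makes operators like `$gte` valid date filters:

```python
from datetime import datetime, timedelta

# ISO-8601 strings compare the most significant fields first
# (year, month, day, ...), so string order equals time order
now = datetime(2024, 3, 15, 9, 0, 0)
earlier = now - timedelta(days=40)
assert earlier.isoformat() < now.isoformat()
```

This only holds if all records use the same format and timezone convention; mixing naive and UTC-suffixed timestamps breaks the ordering.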

Complex Nested Data

db.insert("users", {
    "name": "Alice Johnson",
    "profile": {
        "personal": {
            "age": 28,
            "location": "Paris"
        },
        "professional": {
            "title": "Senior Developer",
            "skills": ["Python", "JavaScript", "Docker"],
            "experience": 5
        }
    },
    "preferences": {
        "notifications": True,
        "theme": "dark",
        "languages": ["en", "fr"]
    }
})

# Note: Queries work on top-level fields only
# For nested data, store flattened versions or use custom logic
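One way to make nested data queryable is to flatten it into dotted top-level keys before inserting. The helper below is a sketch of that workaround (the dotted-key convention is an assumption, not a CafeDB feature):

```python
def flatten(record, parent="", sep="."):
    """Flatten nested dicts into dotted top-level keys so they become
    queryable, e.g. {"profile": {"age": 28}} -> {"profile.age": 28}.
    Lists are left as-is so $contains still works on them."""
    flat = {}
    for key, value in record.items():
        full = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, full, sep))
        else:
            flat[full] = value
    return flat

# db.insert("users", flatten(user))
# db.select("users", {"profile.personal.age": {"$gte": 25}})
```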

🚀 Performance Guidelines

Optimization Tips

  1. Keep Tables Reasonably Sized

    # Good: < 10,000 records per table
    # Acceptable: 10,000 - 100,000 records
    # Consider alternatives: > 100,000 records
    
  2. Use Appropriate Data Types

    # Good: Use numbers for numeric comparisons
    {"age": 25, "score": 85.5}
    
    # Less efficient: String numbers
    {"age": "25", "score": "85.5"}
    
  3. Optimize Query Patterns

    # Efficient: Exact matches and simple conditions
    db.select("users", {"city": "Paris", "active": True})
    
    # Less efficient: Complex regex on large datasets
    db.select("users", {"bio": {"$regex": ".*complex.*pattern.*"}})
    
  4. Batch Operations

    # Efficient: Batch inserts
    for record in large_dataset:
        db.insert("table", record)
    
    # Consider: Disable verbose mode for bulk operations
    db.verbose = False
    # ... bulk operations ...
    db.verbose = True
    

Performance Benchmarks

Typical performance on modern hardware:

| Operation     | Small Table (<1K) | Medium Table (10K) | Large Table (100K) |
|---------------|-------------------|--------------------|--------------------|
| Insert        | <1ms              | <1ms               | 1-5ms              |
| Simple Select | <1ms              | 10-50ms            | 100-500ms          |
| Complex Query | 1-5ms             | 50-200ms           | 500ms-2s           |
| Update        | 1-10ms            | 50-200ms           | 500ms-2s           |
| Delete        | 1-10ms            | 50-200ms           | 500ms-2s           |
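These figures vary with hardware, record size, and filter complexity, so it is worth measuring your own workload. A quick harness using `time.perf_counter` (a generic sketch, not part of CafeDB):

```python
import time

def timed(fn):
    """Wall-clock one call of fn, in seconds."""
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start

def benchmark(label, fn, repeat=3):
    """Report the best of `repeat` runs; the minimum is the least
    noisy estimate for a quick micro-benchmark."""
    best = min(timed(fn) for _ in range(repeat))
    print(f"{label}: {best * 1000:.2f} ms")
    return best

# benchmark("select Paris", lambda: db.select("users", {"city": "Paris"}))
```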

💡 Best Practices

Database Design

  1. Table Structure

    # Good: Logical table separation
    db.create_table("users")
    db.create_table("orders") 
    db.create_table("products")
    
    # Avoid: Everything in one table
    db.create_table("everything")
    
  2. Field Naming

    # Good: Consistent, descriptive names
    {
        "user_id": 123,
        "created_at": "2024-01-01T00:00:00Z",
        "is_active": True
    }
    
    # Avoid: Inconsistent naming
    {
        "id": 123,
        "CreatedDate": "2024-01-01",
        "active": 1
    }
    
  3. Data Consistency

    # Good: Consistent data types
    {"age": 25, "score": 85}
    
    # Avoid: Mixed types for same field
    {"age": "25", "score": 85}  # age as string, score as number
    

Query Optimization

  1. Use Specific Filters

    # Good: Specific conditions
    db.select("users", {"city": "Paris", "active": True})
    
    # Less efficient: Broad patterns
    db.select("users", {"name": "*"})  # Returns everyone
    
  2. Combine Conditions Effectively

    # Good: Multiple specific conditions (AND logic)
    db.select("products", {
        "category": "Electronics",
        "price": {"$between": [100, 500]},
        "in_stock": True
    })
    
  3. Use Appropriate Operators

    # Good: Use $between for ranges
    {"age": {"$between": [18, 65]}}
    
    # Less efficient: Multiple conditions
    {"age": {"$gte": 18, "$lte": 65}}
    

Error Prevention

  1. Always Check Table Existence

    if not db.exists_table("users"):
        db.create_table("users")
    
    # Or use try/except
    try:
        db.create_table("users")
    except ValueError:
        pass  # Table already exists
    
  2. Validate Data Before Insert

    def validate_user(user_data):
        required_fields = ["name", "email"]
        for field in required_fields:
            if field not in user_data:
                raise ValueError(f"Missing required field: {field}")
        return user_data
    
    # Use validation
    user = validate_user({"name": "Alice", "email": "alice@example.com"})
    db.insert("users", user)
    
  3. Handle Exceptions Gracefully

    try:
        results = db.select("users", {"invalid_field": {"$unknown": "value"}})
    except ValueError as e:
        print(f"Query error: {e}")
        # Handle error appropriately
    

⚠️ Error Handling

Common Exceptions

from cafedb import CafeDB

db = CafeDB("test.json")

# Table doesn't exist
try:
    db.select("nonexistent_table")
except ValueError as e:
    print(f"Error: {e}")  # "Table 'nonexistent_table' does not exist."

# Table already exists
try:
    db.create_table("users")
    db.create_table("users")  # Will fail
except ValueError as e:
    print(f"Error: {e}")  # "Table 'users' already exists."

# Invalid operator
try:
    db.select("users", {"age": {"$invalid": 25}})
except ValueError as e:
    print(f"Error: {e}")  # "Unknown operator: $invalid"

# Invalid $between format
try:
    db.select("users", {"age": {"$between": [25]}})  # Need 2 values
except ValueError as e:
    print(f"Error: {e}")  # "$between requires array of exactly 2 values"

Error Recovery

# Database file corruption recovery
import json
import shutil
from pathlib import Path

try:
    db = CafeDB("corrupted.json")
except json.JSONDecodeError:
    print("Database file corrupted, creating new one")
    # Back up the corrupted file before discarding it
    shutil.copy("corrupted.json", "corrupted.json.backup")
    
    # Create fresh database
    Path("corrupted.json").unlink()
    db = CafeDB("corrupted.json")

# Graceful degradation
def safe_query(db, table, filters):
    try:
        return db.select(table, filters)
    except ValueError as e:
        print(f"Query failed: {e}")
        return []  # Return empty results instead of crashing

results = safe_query(db, "users", {"age": {"$gte": 18}})

📚 Examples

Example 1: User Management System

from cafedb import CafeDB
from datetime import datetime

# Initialize database
db = CafeDB("user_management.json", verbose=True)
db.create_table("users")
db.create_table("sessions")

# User registration
def register_user(name, email, age, city):
    # Check if user already exists
    existing = db.select("users", {"email": email})
    if existing:
        raise ValueError("User with this email already exists")
    
    user = {
        "name": name,
        "email": email,
        "age": age,
        "city": city,
        "created_at": datetime.now().isoformat(),
        "active": True,
        "login_count": 0
    }
    
    db.insert("users", user)
    return user

# User authentication simulation
def login_user(email):
    users = db.select("users", {"email": email, "active": True})
    if not users:
        raise ValueError("User not found or inactive")
    
    user = users[0]
    
    # Update login count
    db.update("users", 
        {"email": email},
        lambda u: {**u, "login_count": u.get("login_count", 0) + 1, 
                  "last_login": datetime.now().isoformat()}
    )
    
    # Create session
    session = {
        "email": email,
        "login_time": datetime.now().isoformat(),
        "active": True
    }
    db.insert("sessions", session)
    
    return user

# Analytics
def get_user_analytics():
    total_users = db.count("users")
    active_users = db.count("users", {"active": True})
    recent_users = db.count("users", {
        "created_at": {"$gte": "2024-01-01T00:00:00Z"}
    })
    
    # Most common cities
    all_users = db.select("users")
    city_count = {}
    for user in all_users:
        city = user.get("city", "Unknown")
        city_count[city] = city_count.get(city, 0) + 1
    
    return {
        "total_users": total_users,
        "active_users": active_users,
        "recent_users": recent_users,
        "cities": dict(sorted(city_count.items(), key=lambda x: x[1], reverse=True))
    }

# Usage
register_user("Alice Johnson", "alice@example.com", 28, "Paris")
register_user("Bob Smith", "bob@example.com", 34, "London")

user = login_user("alice@example.com")
print(f"Welcome back, {user['name']}!")

analytics = get_user_analytics()
print(f"Analytics: {analytics}")

Example 2: E-commerce Product Catalog

from cafedb import CafeDB
from datetime import datetime
import uuid

db = CafeDB("ecommerce.json", verbose=True)
db.create_table("products")
db.create_table("orders")

# Product management
def add_product(name, category, price, description, tags):
    product = {
        "id": str(uuid.uuid4()),
        "name": name,
        "category": category,
        "price": price,
        "description": description,
        "tags": tags,
        "in_stock": True,
        "created_at": datetime.now().isoformat()
    }
    
    db.insert("products", product)
    return product

# Advanced product search
def search_products(query=None, category=None, min_price=None, max_price=None, tags=None):
    filters = {}
    
    # Text search in name and description
    if query:
        # This is a limitation - we'd need to search both fields
        # For now, search in name only
        filters["name"] = {"$contains": query}
    
    if category:
        filters["category"] = category
    
    if min_price is not None and max_price is not None:
        filters["price"] = {"$between": [min_price, max_price]}
    elif min_price is not None:
        filters["price"] = {"$gte": min_price}
    elif max_price is not None:
        filters["price"] = {"$lte": max_price}
    
    if tags:
        # Limitation: one condition per field, so only a single tag can
        # be filtered here; matching several tags would need one query
        # per tag with the results merged afterwards
        filters["tags"] = {"$contains": tags[0]}
    
    filters["in_stock"] = True
    
    return db.select("products", filters)

# Order management
def create_order(customer_email, product_ids):
    # Validate products exist
    order_items = []
    total_amount = 0
    
    for product_id in product_ids:
        products = db.select("products", {"id": product_id, "in_stock": True})
        if not products:
            raise ValueError(f"Product {product_id} not found or out of stock")
        
        product = products[0]
        order_items.append({
            "product_id": product_id,
            "name": product["name"],
            "price": product["price"]
        })
        total_amount += product["price"]
    
    order = {
        "id": str(uuid.uuid4()),
        "customer_email": customer_email,
        "items": order_items,
        "total_amount": total_amount,
        "status": "pending",
        "created_at": datetime.now().isoformat()
    }
    
    db.insert("orders", order)
    return order

# Analytics
def get_sales_report():
    orders = db.select("orders", {"status": {"$ne": "cancelled"}})
    
    total_orders = len(orders)
    total_revenue = sum(order["total_amount"] for order in orders)
    avg_order_value = total_revenue / total_orders if total_orders > 0 else 0
    
    return {
        "total_orders": total_orders,
        "total_revenue": total_revenue,
        "average_order_value": avg_order_value
    }

# Usage
add_product("Laptop Pro", "Electronics", 1299.99, "High-performance laptop", ["laptop", "computer", "electronics"])
add_product("Wireless Mouse", "Electronics", 29.99, "Ergonomic wireless mouse", ["mouse", "wireless", "accessories"])
add_product("Coffee Mug", "Home", 12.99, "Ceramic coffee mug", ["mug", "coffee", "ceramic"])

# Search examples
electronics = search_products(category="Electronics")
affordable = search_products(max_price=50)
laptops = search_products(query="laptop")

print(f"Found {len(electronics)} electronics products")
print(f"Found {len(affordable)} affordable products")

Example 3: Log Analysis System

from cafedb import CafeDB
import re
from datetime import datetime, timedelta

db = CafeDB("logs.json", verbose=True)
db.create_table("access_logs")
db.create_table("error_logs")

# Log parsing and storage
def parse_and_store_log_line(log_line, log_type="access"):
    # Example log line: "2024-01-01 10:30:45 [INFO] User alice@example.com accessed /dashboard"
    pattern = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+)"
    match = re.match(pattern, log_line)
    
    if not match:
        return None
    
    timestamp, level, message = match.groups()
    
    log_entry = {
        "timestamp": timestamp,
        "level": level,
        "message": message,
        "parsed_at": datetime.now().isoformat()
    }
    
    # Extract additional info based on log type
    if log_type == "access":
        # Try to extract user and endpoint
        user_match = re.search(r"User (\S+) accessed (\S+)", message)
        if user_match:
            log_entry["user"] = user_match.group(1)
            log_entry["endpoint"] = user_match.group(2)
        
        db.insert("access_logs", log_entry)
    
    elif log_type == "error":
        # Try to extract error type
        if "database" in message.lower():
            log_entry["error_type"] = "database"
        elif "network" in message.lower():
            log_entry["error_type"] = "network"
        else:
            log_entry["error_type"] = "general"
        
        db.insert("error_logs", log_entry)
    
    return log_entry

# Log analysis functions
def analyze_access_patterns():
    # Most active users
    logs = db.select("access_logs")
    user_counts = {}
    endpoint_counts = {}
    
    for log in logs:
        user = log.get("user", "unknown")
        endpoint = log.get("endpoint", "unknown")
        
        user_counts[user] = user_counts.get(user, 0) + 1
        endpoint_counts[endpoint] = endpoint_counts.get(endpoint, 0) + 1
    
    return {
        "top_users": sorted(user_counts.items(), key=lambda x: x[1], reverse=True)[:10],
        "top_endpoints": sorted(endpoint_counts.items(), key=lambda x: x[1], reverse=True)[:10]
    }

def find_errors_in_timeframe(start_time, end_time):
    # Find errors in specific time range
    return db.select("error_logs", {
        "timestamp": {"$gte": start_time, "$lte": end_time}
    })

def get_error_summary():
    # Get error breakdown by type
    errors = db.select("error_logs")
    error_types = {}
    levels = {}
    
    for error in errors:
        error_type = error.get("error_type", "unknown")
        level = error.get("level", "unknown")
        
        error_types[error_type] = error_types.get(error_type, 0) + 1
        levels[level] = levels.get(level, 0) + 1
    
    return {
        "total_errors": len(errors),
        "by_type": error_types,
        "by_level": levels
    }

# Alert system
def check_for_critical_errors():
    # Find critical errors in last hour
    one_hour_ago = (datetime.now() - timedelta(hours=1)).isoformat()
    
    critical_errors = db.select("error_logs", {
        "level": {"$in": ["CRITICAL", "ERROR"]},
        "timestamp": {"$gte": one_hour_ago}
    })
    
    if len(critical_errors) > 10:
        return {
            "alert": True,
            "message": f"High number of critical errors: {len(critical_errors)}",
            "errors": critical_errors[:5]  # Return first 5 for review
        }
    
    return {"alert": False}

# Usage
sample_logs = [
    "2024-01-01 10:30:45 [INFO] User alice@example.com accessed /dashboard",
    "2024-01-01 10:31:12 [ERROR] Database connection failed",
    "2024-01-01 10:32:00 [INFO] User bob@example.com accessed /profile",
    "2024-01-01 10:35:23 [ERROR] Network timeout on external API call",
]

for log_line in sample_logs:
    if "ERROR" in log_line or "CRITICAL" in log_line:
        parse_and_store_log_line(log_line, "error")
    else:
        parse_and_store_log_line(log_line, "access")

# Analysis
access_patterns = analyze_access_patterns()
print("Access patterns:", access_patterns)

error_summary = get_error_summary()
print("Error summary:", error_summary)

alerts = check_for_critical_errors()
if alerts["alert"]:
    print(f"ALERT: {alerts['message']}")

Example 4: Content Management System

from cafedb import CafeDB
from datetime import datetime
import hashlib

db = CafeDB("cms.json", verbose=True)
db.create_table("articles")
db.create_table("authors")
db.create_table("categories")

# Author management
def create_author(name, email, bio):
    # Check if author exists
    existing = db.select("authors", {"email": email})
    if existing:
        return existing[0]
    
    author = {
        "id": hashlib.md5(email.encode()).hexdigest()[:8],
        "name": name,
        "email": email,
        "bio": bio,
        "created_at": datetime.now().isoformat(),
        "article_count": 0
    }
    
    db.insert("authors", author)
    return author

# Article management
def publish_article(title, content, author_email, category, tags):
    # Get author
    authors = db.select("authors", {"email": author_email})
    if not authors:
        raise ValueError("Author not found")
    
    author = authors[0]
    
    article = {
        "id": hashlib.md5(f"{title}{author_email}".encode()).hexdigest()[:12],
        "title": title,
        "content": content,
        "author_id": author["id"],
        "author_name": author["name"],
        "category": category,
        "tags": tags,
        "status": "published",
        "views": 0,
        "likes": 0,
        "published_at": datetime.now().isoformat(),
        "updated_at": datetime.now().isoformat()
    }
    
    db.insert("articles", article)
    
    # Update author article count
    db.update("authors", 
        {"email": author_email},
        lambda a: {**a, "article_count": a.get("article_count", 0) + 1}
    )
    
    return article

# Content discovery
def search_articles(keyword=None, category=None, author=None, tag=None, status="published"):
    filters = {"status": status}
    
    if keyword:
        # Search in title and content (limitation: can only search one field)
        # Workaround: search title OR create a full-text search field
        filters["title"] = {"$contains": keyword}
    
    if category:
        filters["category"] = category
    
    if author:
        filters["author_name"] = {"$contains": author}
    
    if tag:
        # Assuming tags is a comma-separated string or list stored as string
        filters["tags"] = {"$contains": tag}
    
    return db.select("articles", filters)

# Article interactions
def increment_views(article_id):
    articles = db.select("articles", {"id": article_id})
    if not articles:
        return False
    
    db.update("articles",
        {"id": article_id},
        lambda a: {**a, "views": a.get("views", 0) + 1}
    )
    return True

def like_article(article_id):
    articles = db.select("articles", {"id": article_id})
    if not articles:
        return False
    
    db.update("articles",
        {"id": article_id},
        lambda a: {**a, "likes": a.get("likes", 0) + 1}
    )
    return True

# Analytics
def get_content_stats():
    all_articles = db.select("articles", {"status": "published"})
    
    total_articles = len(all_articles)
    total_views = sum(article.get("views", 0) for article in all_articles)
    total_likes = sum(article.get("likes", 0) for article in all_articles)
    
    # Most popular articles
    popular = sorted(all_articles, key=lambda x: x.get("views", 0), reverse=True)[:5]
    
    # Category distribution
    categories = {}
    for article in all_articles:
        cat = article.get("category", "Uncategorized")
        categories[cat] = categories.get(cat, 0) + 1
    
    return {
        "total_articles": total_articles,
        "total_views": total_views,
        "total_likes": total_likes,
        "avg_views": total_views / total_articles if total_articles > 0 else 0,
        "popular_articles": [{"title": a["title"], "views": a["views"]} for a in popular],
        "categories": categories
    }

# Usage
author1 = create_author("Alice Johnson", "alice@example.com", "Tech writer and developer")
author2 = create_author("Bob Smith", "bob@example.com", "Data scientist and ML expert")

article1 = publish_article(
    "Introduction to Python",
    "Python is a versatile programming language...",
    "alice@example.com",
    "Programming",
    ["python", "tutorial", "beginner"]
)

article2 = publish_article(
    "Machine Learning Basics",
    "Machine learning is transforming how we...",
    "bob@example.com",
    "Data Science",
    ["machine-learning", "ai", "data-science"]
)

# Simulate interactions
increment_views(article1["id"])
increment_views(article1["id"])
like_article(article1["id"])

# Search and discover
python_articles = search_articles(keyword="Python")
ml_articles = search_articles(tag="machine-learning")

stats = get_content_stats()
print("Content stats:", stats)

Example 5: Task Management System

from cafedb import CafeDB
from datetime import datetime, timedelta
import uuid

db = CafeDB("tasks.json", verbose=True)
db.create_table("tasks")
db.create_table("projects")
db.create_table("comments")

# Project management
def create_project(name, description, owner):
    project = {
        "id": str(uuid.uuid4()),
        "name": name,
        "description": description,
        "owner": owner,
        "status": "active",
        "created_at": datetime.now().isoformat(),
        "task_count": 0,
        "completed_tasks": 0
    }
    
    db.insert("projects", project)
    return project

# Task management
def create_task(title, description, project_id, assignee, priority="medium", due_date=None):
    task = {
        "id": str(uuid.uuid4()),
        "title": title,
        "description": description,
        "project_id": project_id,
        "assignee": assignee,
        "priority": priority,
        "status": "todo",
        "due_date": due_date,
        "created_at": datetime.now().isoformat(),
        "updated_at": datetime.now().isoformat(),
        "completed_at": None
    }
    
    db.insert("tasks", task)
    
    # Update project task count
    db.update("projects",
        {"id": project_id},
        lambda p: {**p, "task_count": p.get("task_count", 0) + 1}
    )
    
    return task

# Task updates
def update_task_status(task_id, new_status):
    tasks = db.select("tasks", {"id": task_id})
    if not tasks:
        return False
    
    task = tasks[0]
    updates = {
        "status": new_status,
        "updated_at": datetime.now().isoformat()
    }
    
    if new_status == "completed":
        updates["completed_at"] = datetime.now().isoformat()
        
        # Update project completed count
        db.update("projects",
            {"id": task["project_id"]},
            lambda p: {**p, "completed_tasks": p.get("completed_tasks", 0) + 1}
        )
    
    db.update("tasks", {"id": task_id}, updates)
    return True

# Query functions
def get_my_tasks(assignee, status=None):
    filters = {"assignee": assignee}
    if status:
        filters["status"] = status
    return db.select("tasks", filters)

def get_overdue_tasks():
    today = datetime.now().date().isoformat()
    
    # Get all non-completed tasks
    tasks = db.select("tasks", {
        "status": {"$nin": ["completed", "cancelled"]}
    })
    
    # Filter overdue (due_date exists and is in the past)
    overdue = []
    for task in tasks:
        due_date = task.get("due_date")
        if due_date and due_date < today:
            overdue.append(task)
    
    return overdue

def get_high_priority_tasks():
    return db.select("tasks", {
        "priority": "high",
        "status": {"$nin": ["completed", "cancelled"]}
    })

def get_project_progress(project_id):
    projects = db.select("projects", {"id": project_id})
    if not projects:
        return None
    
    project = projects[0]
    tasks = db.select("tasks", {"project_id": project_id})
    
    total_tasks = len(tasks)
    completed_tasks = len([t for t in tasks if t["status"] == "completed"])
    in_progress = len([t for t in tasks if t["status"] == "in_progress"])
    todo = len([t for t in tasks if t["status"] == "todo"])
    
    progress_percent = (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0
    
    return {
        "project": project["name"],
        "total_tasks": total_tasks,
        "completed": completed_tasks,
        "in_progress": in_progress,
        "todo": todo,
        "progress_percent": round(progress_percent, 1)
    }

# Team analytics
def get_team_workload():
    tasks = db.select("tasks", {
        "status": {"$nin": ["completed", "cancelled"]}
    })
    
    workload = {}
    for task in tasks:
        assignee = task.get("assignee", "Unassigned")
        if assignee not in workload:
            workload[assignee] = {
                "total": 0,
                "high_priority": 0,
                "overdue": 0
            }
        
        workload[assignee]["total"] += 1
        
        if task.get("priority") == "high":
            workload[assignee]["high_priority"] += 1
        
        due_date = task.get("due_date")
        if due_date and due_date < datetime.now().date().isoformat():
            workload[assignee]["overdue"] += 1
    
    return workload

# Usage
project = create_project(
    "Website Redesign",
    "Complete redesign of company website",
    "alice@example.com"
)

task1 = create_task(
    "Design homepage mockup",
    "Create initial design concepts",
    project["id"],
    "bob@example.com",
    priority="high",
    due_date=(datetime.now() + timedelta(days=3)).date().isoformat()
)

task2 = create_task(
    "Review color scheme",
    "Finalize brand colors",
    project["id"],
    "alice@example.com",
    priority="medium",
    due_date=(datetime.now() + timedelta(days=5)).date().isoformat()
)

# Update task status
update_task_status(task1["id"], "in_progress")

# Get task lists
my_tasks = get_my_tasks("alice@example.com")
high_priority = get_high_priority_tasks()
overdue = get_overdue_tasks()

# Project tracking
progress = get_project_progress(project["id"])
print("Project progress:", progress)

workload = get_team_workload()
print("Team workload:", workload)

🔧 API Reference

Database Class

__init__(db_path: str, verbose: bool = False)

Initialize or open a database.

Parameters:

  • db_path (str): Path to JSON database file
  • verbose (bool): Enable detailed logging

Example:

db = CafeDB("myapp.json", verbose=True)

create_table(table_name: str)

Create a new table.

Parameters:

  • table_name (str): Name of the table to create

Raises:

  • ValueError: If table already exists

Example:

db.create_table("users")

drop_table(table_name: str)

Delete a table and all its data.

Parameters:

  • table_name (str): Name of the table to drop

Raises:

  • ValueError: If table doesn't exist

Example:

db.drop_table("old_table")

insert(table_name: str, row: dict)

Insert a new record into a table.

Parameters:

  • table_name (str): Name of the table
  • row (dict): Data to insert

Raises:

  • ValueError: If table doesn't exist

Example:

db.insert("users", {"name": "Alice", "age": 30})

select(table_name: str, filters: Union[Dict, Callable] = None) -> List[dict]

Query records from a table.

Parameters:

  • table_name (str): Name of the table
  • filters (dict or callable): Filter conditions or function

Returns:

  • List of matching records

Raises:

  • ValueError: If the table doesn't exist or an invalid operator is used

Examples:

# All records
all_users = db.select("users")

# With filters
adults = db.select("users", {"age": {"$gte": 18}})

# With custom function
young = db.select("users", lambda u: u.get("age", 0) < 25)

update(table_name: str, filters: Union[Dict, Callable], updater: Union[Dict, Callable]) -> int

Update records matching filters.

Parameters:

  • table_name (str): Name of the table
  • filters (dict or callable): Filter conditions
  • updater (dict or callable): Updates to apply

Returns:

  • Number of records updated

Examples:

# Update with dict
count = db.update("users", {"city": "Paris"}, {"timezone": "CET"})

# Update with function
count = db.update("users", 
    {"age": {"$gte": 65}},
    lambda u: {**u, "category": "senior"}
)

delete(table_name: str, filters: Union[Dict, Callable]) -> int

Delete records matching filters.

Parameters:

  • table_name (str): Name of the table
  • filters (dict or callable): Filter conditions

Returns:

  • Number of records deleted

Example:

count = db.delete("users", {"active": False})

count(table_name: str, filters: Union[Dict, Callable] = None) -> int

Count records matching filters.

Parameters:

  • table_name (str): Name of the table
  • filters (dict or callable, optional): Filter conditions

Returns:

  • Number of matching records

Example:

total = db.count("users")
active = db.count("users", {"active": True})

list_tables() -> List[str]

Get a list of all tables in the database.

Returns:

  • List of table names

Example:

tables = db.list_tables()
print(f"Tables: {', '.join(tables)}")

exists_table(table_name: str) -> bool

Check if a table exists.

Parameters:

  • table_name (str): Name of the table

Returns:

  • True if table exists, False otherwise

Example:

if db.exists_table("users"):
    print("Users table exists")

stats(table_name: str) -> dict

Get detailed statistics about a table.

Parameters:

  • table_name (str): Name of the table

Returns:

  • Dictionary containing table statistics

Example:

stats = db.stats("users")
print(f"Total rows: {stats['total_rows']}")
print(f"Fields: {stats['fields']}")

🔄 Migration Guide

From Legacy Function-Based Queries

If you're using the old callable filter style:

# Old style (still supported)
results = db.select("users", lambda u: u.get("age", 0) > 25)

# New style (recommended)
results = db.select("users", {"age": {"$gt": 25}})

From SQL Databases

Common SQL patterns translated to CafeDB:

-- SQL: SELECT * FROM users WHERE age >= 18
# CafeDB
db.select("users", {"age": {"$gte": 18}})

-- SQL: SELECT * FROM users WHERE city IN ('Paris', 'London')
# CafeDB
db.select("users", {"city": {"$in": ["Paris", "London"]}})

-- SQL: SELECT * FROM users WHERE name LIKE 'A%'
# CafeDB
db.select("users", {"name": "A*"})

-- SQL: SELECT * FROM users WHERE age BETWEEN 18 AND 65
# CafeDB
db.select("users", {"age": {"$between": [18, 65]}})

🤝 Contributing

Contributions are welcome! Here's how you can help:

  1. Report Bugs: Open an issue with detailed reproduction steps
  2. Suggest Features: Describe your use case and proposed solution
  3. Submit Pull Requests: Follow the coding style and add tests
  4. Improve Documentation: Fix typos, add examples, clarify instructions

📝 License

MIT License - see LICENSE file for details.

🙋 FAQ

Q: Can CafeDB handle millions of records?
A: CafeDB is optimized for small to medium datasets (< 100K records). For larger datasets, consider a traditional database.

Q: Is CafeDB thread-safe?
A: The current implementation uses file-level locking, which provides basic thread safety. For high-concurrency scenarios, use a proper database.
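If several threads share one database instance, a conservative extra guard is to funnel all writes through a single lock so no two write calls interleave. This is a minimal sketch, not part of CafeDB itself: `LockedWriter` and the `FakeDB` stand-in (which just collects rows) are hypothetical names for illustration.

```python
import threading

class LockedWriter:
    """Serialize all writes through one lock.

    A sketch only: wraps any object exposing an ``insert`` method,
    such as a shared CafeDB instance.
    """
    def __init__(self, db):
        self._db = db
        self._lock = threading.Lock()

    def insert(self, table, row):
        with self._lock:
            self._db.insert(table, row)

# Stand-in writer so the demo is self-contained.
class FakeDB:
    def __init__(self):
        self.rows = []
    def insert(self, table, row):
        self.rows.append((table, row))

writer = LockedWriter(FakeDB())

# 8 threads, 100 inserts each: all 800 rows land exactly once.
threads = [
    threading.Thread(
        target=lambda: [writer.insert("t", {"i": i}) for i in range(100)]
    )
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The same pattern extends to `update` and `delete`; for real concurrency needs, the advice above stands: use a proper database.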

Q: Can I use CafeDB in production?
A: CafeDB is great for prototypes, small applications, and internal tools. For mission-critical production systems with high load, use PostgreSQL, MongoDB, etc.

Q: How do I backup my database?
A: Simply copy the JSON file: cp mydb.json mydb.backup.json
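For scripted backups, a timestamped copy keeps a history instead of overwriting one backup file. A small sketch using only the standard library (the `backups` directory name is just an example):

```python
import shutil
from datetime import datetime
from pathlib import Path

def backup_database(db_path, backup_dir="backups"):
    """Copy the JSON file to backup_dir/<name>.<timestamp>.json."""
    src = Path(db_path)
    dest_dir = Path(backup_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    dest = dest_dir / f"{src.stem}.{stamp}{src.suffix}"
    shutil.copy2(src, dest)  # copy2 also preserves file metadata
    return dest

# e.g. backup_database("my_database.json")
```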

Q: Can I query nested fields?
A: Currently, queries work on top-level fields only. For nested data, flatten it or use custom logic.
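Because `select` also accepts a callable (see the API reference), one workaround is a filter function that walks into the nested dict itself. A sketch with illustrative records; the same predicate can be passed straight to `db.select("users", in_city("Paris"))`:

```python
# Records as select() would return them: plain dicts,
# here with a nested "address" object.
records = [
    {"name": "Alice", "address": {"city": "Paris", "zip": "75001"}},
    {"name": "Bob", "address": {"city": "London", "zip": "SW1"}},
]

def in_city(city):
    # Reaches one level down; missing "address" keys match nothing.
    return lambda row: row.get("address", {}).get("city") == city

paris = [r for r in records if in_city("Paris")(r)]
```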

Q: What about JOIN operations?
A: CafeDB doesn't support JOINs. You'll need to query multiple tables and combine results in your application code.
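Since `select` returns plain lists of dicts, a manual join is just a dictionary lookup. In this sketch the two sample lists stand in for the results of two `select` calls (table and field names are illustrative):

```python
# Stand-ins for db.select("users") and db.select("orders").
users = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
]
orders = [
    {"order_id": "A1", "user_id": 1, "total": 40},
    {"order_id": "A2", "user_id": 1, "total": 15},
    {"order_id": "B1", "user_id": 2, "total": 99},
]

# Index one side by its key, then attach matching rows: an inner join.
users_by_id = {u["id"]: u for u in users}
joined = [
    {**order, "user_name": users_by_id[order["user_id"]]["name"]}
    for order in orders
    if order["user_id"] in users_by_id
]
```

Indexing the smaller table first keeps the join linear in the number of rows instead of quadratic.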

Q: How do I implement full-text search?
A: Use the $contains operator for simple text search, or $regex for pattern matching. For advanced full-text search, integrate with a dedicated search engine.
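For case-insensitive search across several fields at once, a callable filter again helps. A sketch with hypothetical field names; the predicate can be handed to `db.select("articles", matches("python"))`:

```python
def matches(term, fields=("title", "body")):
    # Case-insensitive substring match over the given fields.
    needle = term.lower()
    return lambda row: any(
        needle in str(row.get(f, "")).lower() for f in fields
    )

# Illustrative records, as select() would return them.
articles = [
    {"title": "Intro to Python", "body": "Snakes and scripts."},
    {"title": "Coffee brewing", "body": "Python-free content."},
    {"title": "Rust basics", "body": "Memory safety."},
]
hits = [a for a in articles if matches("python")(a)]
```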


Made with ☕ by the CafeDB Team

Simple databases for simple needs
