Skip to main content

PySpark-like DataFrame API that compiles to SQL

Project description

Moltres

CI Python 3.9+ License: MIT Code style: ruff

A PySpark-inspired DataFrame API that compiles to SQL and runs on your existing database.

Moltres provides a familiar DataFrame API similar to PySpark, but instead of running on a Spark cluster, it compiles your operations into ANSI SQL and executes them directly against your database through SQLAlchemy. Compose column expressions, joins, aggregates, and mutations with the same ergonomics you'd expect from Spark—all while leveraging your existing SQL infrastructure.

✨ Features

  • 🚀 PySpark-like API - Familiar DataFrame operations (select, filter, join, groupBy, etc.)
  • 🗄️ SQL Compilation - All operations compile to ANSI SQL and run on your database
  • 📊 Multiple Formats - Read/write CSV, JSON, JSONL, Parquet, and more
  • 🌊 Streaming Support - Handle datasets larger than memory with chunked processing
  • 🔧 Type Safe - Full type hints and mypy support
  • 🎯 Zero Dependencies - Works with just SQLAlchemy (pandas/polars optional)
  • 🔒 Security First - Built-in SQL injection prevention and validation
  • Performance Monitoring - Optional hooks for query performance tracking
  • 🌍 Environment Config - Configure via environment variables for 12-factor apps

🆕 What's New in 0.2.0

  • Environment Variable Support - Configure Moltres via MOLTRES_DSN, MOLTRES_POOL_SIZE, etc.
  • Performance Monitoring Hooks - Track query execution time with register_performance_hook()
  • Enhanced Security - Comprehensive SQL injection prevention and security documentation
  • Modular Architecture - Refactored file readers into organized, maintainable modules
  • Comprehensive Testing - 113 test cases covering edge cases, security, and error handling
  • Better Documentation - Security guide, troubleshooting, and examples documentation

📋 Requirements

  • Python 3.9+
  • SQLAlchemy 2.0+ (for database connectivity)
  • A supported SQLAlchemy driver (SQLite, PostgreSQL, MySQL, etc.)

📦 Installation

pip install moltres

For optional dependencies:

# For pandas support
pip install moltres[pandas]

# For polars support
pip install moltres[polars]

# For both
pip install moltres[pandas,polars]

🚀 Quick Start

from moltres import col, connect
from moltres.expressions.functions import sum

# Connect to your database
db = connect("sqlite:///example.db")

# Compose queries lazily
df = (
    db.table("orders")
    .select()
    .join(db.table("customers").select(), on=[("customer_id", "id")])
    .where(col("customers.active") == True)  # noqa: E712
    .group_by("customers.country")
    .agg(sum(col("orders.amount")).alias("total_amount"))
)

# Execute and get results
results = df.collect()  # Returns list of dicts by default

💡 Why Moltres?

Moltres bridges the gap between PySpark's elegant DataFrame API and traditional SQL databases:

  • No Spark Cluster Required - Run on your existing SQL database
  • Familiar API - PySpark developers feel at home immediately
  • Type Safe - Full type hints for better IDE support and fewer bugs
  • Production Ready - Environment variables, connection pooling, monitoring hooks
  • Secure by Default - SQL injection prevention built-in
  • Flexible - Works with SQLite, PostgreSQL, MySQL, and more

📖 Core Concepts

Lazy Evaluation

All DataFrame operations are lazy—they build a logical plan that only executes when you call collect():

# This doesn't execute any SQL yet
df = db.table("users").select().where(col("age") > 18)

# SQL is compiled and executed here
results = df.collect()

Column Expressions

Build complex expressions using column operations:

from moltres.expressions.functions import sum, avg, count, concat

df = (
    db.table("sales")
    .select(
        col("product"),
        (col("price") * col("quantity")).alias("revenue"),
        concat(col("first_name"), lit(" "), col("last_name")).alias("full_name"),
    )
    .where(col("date") >= "2024-01-01")
    .group_by("product")
    .agg(
        sum(col("revenue")).alias("total_revenue"),
        avg(col("price")).alias("avg_price"),
        count("*").alias("order_count"),
    )
)

📥 Reading Data

From Database Tables

# Simple table read
df = db.read.table("customers")

# With column selection
df = db.table("customers").select("id", "name", "email")

From Files

Moltres supports reading from various file formats:

# CSV files
df = db.read.csv("data.csv")
df = db.read.option("delimiter", "|").csv("pipe_delimited.csv")
df = db.read.option("header", False).schema([...]).csv("no_header.csv")

# JSON files
df = db.read.json("data.json")  # Array of objects
df = db.read.jsonl("data.jsonl")  # One JSON object per line

# Parquet files (requires pandas and pyarrow)
df = db.read.parquet("data.parquet")

# Text files (one line per row)
df = db.read.text("log.txt", column_name="line")

# Generic format reader
df = db.read.format("csv").option("header", True).load("data.csv")

Schema Inference and Explicit Schemas

Schema is automatically inferred from data, but you can provide explicit schemas:

from moltres.table.schema import ColumnDef

schema = [
    ColumnDef(name="id", type_name="INTEGER"),
    ColumnDef(name="name", type_name="TEXT"),
    ColumnDef(name="score", type_name="REAL"),
]

df = db.read.schema(schema).csv("data.csv")

File Format Options:

  • CSV: header (default: True), delimiter (default: ","), inferSchema (default: True)
  • JSON: multiline (default: False) - if True, reads as JSONL
  • Parquet: Requires pandas and pyarrow

Note: File-based readers materialize data into memory (not lazy) since files aren't in the SQL database.

📤 Writing Data

To Database Tables

# Write with automatic schema inference and table creation
df.write.save_as_table("target_table")

# Write modes
df.write.mode("append").save_as_table("target")  # Add to existing (default)
df.write.mode("overwrite").save_as_table("target")  # Replace contents
df.write.mode("error_if_exists").save_as_table("target")  # Fail if exists

# Insert into existing table (table must exist)
df.write.insertInto("existing_table")

# With explicit schema
from moltres.table.schema import ColumnDef
schema = [
    ColumnDef(name="id", type_name="INTEGER"),
    ColumnDef(name="name", type_name="TEXT"),
]
df.write.schema(schema).save_as_table("target")

To Files

# Various formats
df.write.csv("output.csv")
df.write.json("output.json")
df.write.jsonl("output.jsonl")
df.write.parquet("output.parquet")  # Requires pandas and pyarrow

# Generic save
df.write.save("output.csv")  # Infers format from extension
df.write.save("data.txt", format="csv")  # Explicit format

# With options
df.write.option("header", True).option("delimiter", "|").csv("output.csv")
df.write.option("compression", "gzip").parquet("output.parquet")

# Partitioned writes
df.write.partitionBy("country", "year").csv("partitioned_data")

File Formats:

  • CSV: Standard comma-separated values (options: header, delimiter)
  • JSON: Array of objects (options: indent)
  • JSONL: One JSON object per line
  • Parquet: Columnar format (requires pandas and pyarrow, options: compression)

🌊 Streaming for Large Datasets

Moltres supports streaming operations for datasets larger than available memory:

# Enable streaming mode
df = db.read.stream().option("chunk_size", 10000).csv("large_file.csv")

# Process chunks one at a time
for chunk in df.collect(stream=True):
    process_chunk(chunk)

# Or materialize all data (backward compatible)
all_rows = df.collect()  # Still works, materializes all chunks

# Streaming writes
df.write.stream().mode("overwrite").save_as_table("large_table")
df.write.stream().csv("output.csv")

# Streaming SQL queries
df = db.table("large_table").select()
for chunk in df.collect(stream=True):
    process_chunk(chunk)

Streaming Options:

  • .stream(): Enable streaming mode (default: False for backward compatibility)
  • .option("chunk_size", N): Set chunk size for reads (default: 10000)
  • .option("batch_size", N): Set batch size for SQL inserts (default: 10000)
  • collect(stream=True): Return iterator of row chunks instead of materializing

When to Use Streaming:

  • Files or tables larger than available RAM
  • Processing data in batches for memory efficiency
  • Incremental processing pipelines
  • Large data transformations

🗄️ Table Management

Creating Tables

from moltres import column, connect

db = connect("sqlite:///example.db")

# Create a table with schema definition
customers = db.create_table(
    "customers",
    [
        column("id", "INTEGER", nullable=False, primary_key=True),
        column("name", "TEXT", nullable=False),
        column("email", "TEXT", nullable=True),
        column("active", "INTEGER", default=1),
    ],
)

# Insert data
customers.insert([
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
])

# Drop tables
db.drop_table("customers")

The column() helper accepts:

  • name: Column name
  • type_name: SQL type (e.g., "INTEGER", "TEXT", "REAL", "VARCHAR(255)")
  • nullable: Whether the column allows NULL (default: True)
  • default: Default value for the column (optional)
  • primary_key: Whether this is a primary key column (default: False)

✏️ Data Mutations

Insert, update, and delete operations run eagerly:

from moltres import col

customers = db.table("customers")

# Insert rows (batch optimized)
customers.insert([
    {"id": 1, "name": "Alice", "active": 1},
    {"id": 2, "name": "Bob", "active": 0},
])

# Update rows
customers.update(where=col("id") == 2, set={"active": 1})

# Delete rows
customers.delete(where=col("active") == 0)

📊 Result Formats

By default, collect() returns a list of dictionaries (fetch_format="records"), so Moltres works even when pandas/polars are unavailable. You can configure the result format when connecting:

# Default: list of dicts
db = connect("sqlite:///example.db")
results = df.collect()  # List[Dict[str, Any]]

# Pandas DataFrame (requires pandas)
db = connect("sqlite:///example.db", fetch_format="pandas")
results = df.collect()  # pandas.DataFrame

# Polars DataFrame (requires polars)
db = connect("sqlite:///example.db", fetch_format="polars")
results = df.collect()  # polars.DataFrame

⚙️ Configuration

Programmatic Configuration

db = connect(
    "postgresql://user:pass@host/dbname",
    echo=True,  # Enable SQL logging
    fetch_format="pandas",
    pool_size=10,
    max_overflow=5,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,  # Connection health checks
)

Environment Variables

Moltres supports configuration via environment variables for easier deployment (12-factor app friendly):

export MOLTRES_DSN="postgresql://user:pass@host/dbname"
export MOLTRES_POOL_SIZE=10
export MOLTRES_POOL_PRE_PING=true
export MOLTRES_FETCH_FORMAT="pandas"

Then in your code:

from moltres import connect

# Uses MOLTRES_DSN from environment
db = connect()

Supported environment variables:

  • MOLTRES_DSN: Database connection string
  • MOLTRES_ECHO: Enable SQL logging (true/false)
  • MOLTRES_FETCH_FORMAT: "records", "pandas", or "polars"
  • MOLTRES_DIALECT: Override SQL dialect
  • MOLTRES_POOL_SIZE: Connection pool size
  • MOLTRES_MAX_OVERFLOW: Maximum pool overflow
  • MOLTRES_POOL_TIMEOUT: Pool timeout in seconds
  • MOLTRES_POOL_RECYCLE: Connection recycle time
  • MOLTRES_POOL_PRE_PING: Enable connection health checks (true/false)

Configuration Precedence: Programmatic arguments > Environment variables > Defaults

📈 Performance Monitoring

Moltres provides optional performance monitoring hooks for tracking query execution:

from moltres.engine import register_performance_hook

def log_slow_queries(sql: str, elapsed: float, metadata: dict):
    if elapsed > 1.0:
        print(f"Slow query ({elapsed:.2f}s): {sql[:100]}")
        print(f"  Rows affected: {metadata.get('rowcount', 'N/A')}")

register_performance_hook("query_end", log_slow_queries)

# Now all queries will be monitored
db = connect("sqlite:///example.db")
df.collect()  # Slow queries will be logged

# Unregister when done
from moltres.engine import unregister_performance_hook
unregister_performance_hook("query_end", log_slow_queries)

Available Events:

  • query_start: Fired when a query begins execution
  • query_end: Fired when a query completes (includes elapsed time and metadata)

🔒 Security

Moltres includes built-in security features to prevent SQL injection:

  • SQL Identifier Validation - All table and column names are validated
  • Parameterized Queries - All user data is passed as parameters, never string concatenation
  • Input Sanitization - Comprehensive validation of identifiers and inputs

See docs/SECURITY.md for security best practices and guidelines.

📚 Advanced Examples

Complex Joins and Aggregations

from moltres import col
from moltres.expressions.functions import sum, avg, count

# Multi-table join with aggregations
df = (
    db.table("orders")
    .select()
    .join(db.table("customers").select(), on=[("customer_id", "id")])
    .join(db.table("products").select(), on=[("product_id", "id")])
    .where(col("orders.date") >= "2024-01-01")
    .group_by("customers.country", "products.category")
    .agg(
        sum(col("orders.amount")).alias("total_revenue"),
        avg(col("orders.amount")).alias("avg_order_value"),
        count("*").alias("order_count"),
    )
    .order_by(col("total_revenue").desc())
    .limit(10)
)

results = df.collect()

Window Functions

# Complex expressions with window functions
df = (
    db.table("sales")
    .select(
        col("product"),
        col("amount"),
        col("date"),
        (col("amount") - avg(col("amount")).over()).alias("deviation_from_avg"),
    )
    .where(col("date") >= "2024-01-01")
)

Complete ETL Pipeline

# Complete ETL pipeline
db = connect("postgresql://user:pass@localhost/warehouse")

# Extract: Read from CSV
raw_data = db.read.csv("raw_sales.csv")

# Transform: Clean and aggregate
cleaned = (
    raw_data
    .select(
        col("order_id"),
        col("product").upper().alias("product"),
        col("amount").cast("REAL"),
        col("date"),
    )
    .where(col("amount") > 0)
    .group_by("product", "date")
    .agg(sum(col("amount")).alias("daily_revenue"))
)

# Load: Write to database
cleaned.write.mode("overwrite").save_as_table("daily_sales_summary")

🛠️ Supported Operations

DataFrame Operations

  • select() - Project columns
  • where() / filter() - Filter rows
  • join() - Join with other DataFrames
  • group_by() / groupBy() - Group rows
  • agg() - Aggregate functions
  • order_by() - Sort rows
  • limit() - Limit number of rows

Column Expressions

  • Arithmetic: +, -, *, /, %
  • Comparisons: ==, !=, <, >, <=, >=
  • Boolean: &, |, ~
  • Functions: sum(), avg(), count(), concat(), coalesce(), upper(), lower(), etc.

Supported SQL Dialects

  • ✅ SQLite
  • ✅ PostgreSQL
  • ✅ MySQL (basic support)
  • ✅ Other SQLAlchemy-supported databases (with ANSI SQL fallback)

🧪 Development

Setup

# Clone the repository
git clone https://github.com/eddiethedean/moltres.git
cd moltres

# Install in development mode
pip install -e ".[dev]"

# Install pre-commit hooks
pre-commit install

Running Tests

# Run all tests
pytest

# Run tests in parallel
pytest -n 9

# Run with coverage
pytest --cov=src/moltres --cov-report=html

Code Quality

# Linting
ruff check .

# Formatting
ruff format .

# Type checking
mypy src

📖 Documentation

Additional documentation is available in the docs/ directory:

🤝 Contributing

Contributions are welcome! Please see CONTRIBUTING.md for guidelines.

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

MIT License - see LICENSE file for details.

👤 Author

Odos Matthews

🙏 Acknowledgments

  • Inspired by PySpark's DataFrame API
  • Built on SQLAlchemy for database connectivity
  • Thanks to all contributors and users

Made with ❤️ for the Python data community

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

moltres-0.2.0.tar.gz (42.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

moltres-0.2.0-py3-none-any.whl (49.7 kB view details)

Uploaded Python 3

File details

Details for the file moltres-0.2.0.tar.gz.

File metadata

  • Download URL: moltres-0.2.0.tar.gz
  • Upload date:
  • Size: 42.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.8.18

File hashes

Hashes for moltres-0.2.0.tar.gz
Algorithm Hash digest
SHA256 f894cd13f3356772a2b72c29381db2f224513cd9feb013aad7f48347a3a36c7f
MD5 d672d5b18e6430f658474e02e837763b
BLAKE2b-256 d082e73c8dd79a3d414c18dbfa73a38fd4ee01482b468dcc87e03883a58b4a07

See more details on using hashes here.

File details

Details for the file moltres-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: moltres-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 49.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.8.18

File hashes

Hashes for moltres-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5eae49f413a637188c8693bd0efd78baac16442e1992c75a5d6e774d47bd1a1a
MD5 4ef923a256c16c5aad666453584dcb28
BLAKE2b-256 20f0c8bf719a7d010e1fa03a44231ac7aac59831827ed186e7de39839fcfe760

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page