DataFrame API with SQL pushdown execution and real SQL CRUD - the missing layer for SQL in Python
Moltres
The Missing DataFrame Layer for SQL in Python
MOLTRES: Modern Operations Layer for Transformations, Relational Execution, and SQL
Moltres fills a major gap in the Python data ecosystem: it's the only library that combines a DataFrame API (like Pandas/Polars), SQL pushdown execution (no data loading into memory), and real SQL CRUD operations (INSERT, UPDATE, DELETE) in one unified interface.
Transform millions of rows using familiar DataFrame operations—all executed directly in SQL without materializing data. Update, insert, and delete with column-aware, type-safe operations. No juggling between Pandas, SQLAlchemy, and raw SQL. Just one library that does it all.
🎉 Version 0.9.0 Milestone: Moltres now achieves ~98% API compatibility with PySpark for core DataFrame operations, making migration from PySpark seamless while maintaining SQL-first design principles.
📑 Table of Contents
- Features
- What Makes Moltres Unique
- Installation
- Quick Start
- Why Moltres?
- Core Concepts
- Reading Data
- Writing Data
- Streaming for Large Datasets
- Table Management
- Data Mutations
- Result Formats
- Configuration
- Performance Monitoring
- Security
- Advanced Examples
- Supported Operations
- Development
- Documentation
- Contributing
✨ Features
Core Capabilities
- 🚀 DataFrame API - Familiar operations (select, filter, join, groupBy, etc.) like Pandas/Polars/PySpark
- 🎯 98% PySpark API Compatibility - Near-complete compatibility with PySpark's DataFrame API for seamless migration
- 🗄️ SQL Pushdown Execution - All operations compile to SQL and run on your database—no data loading into memory
- 📊 Operates Directly on SQL Tables - Transform tables without materialization
SQL & Query Features
- 🔤 Raw SQL Support - Execute raw SQL queries with db.sql() and get back lazy DataFrames (similar to PySpark's spark.sql())
- 📝 SQL Expression Selection - Write SQL expressions directly with selectExpr() (similar to PySpark's selectExpr())
- ✏️ Real SQL CRUD - INSERT, UPDATE, DELETE operations with DataFrame-style syntax
- 🔄 Advanced Operations - Pivot, window functions, semi-joins, anti-joins, and more
Data I/O
- 📊 Multiple Formats - Read/write CSV, JSON, JSONL, Parquet, and more
- 🌊 Streaming Support - Handle datasets larger than memory with chunked processing
- 📥 Flexible Reading - Read from files, databases, or raw SQL queries
Developer Experience
- 🔧 Type Safe - Full type hints with strict mypy checking and custom type stubs for dependencies
- 🎯 Minimal Dependencies - Requires only SQLAlchemy (pandas and polars are optional)
- ⚡ Async Support - Full async/await support for all operations (optional dependency)
- 🔒 Security First - Built-in SQL injection prevention and validation
- ⚡ Performance Monitoring - Optional hooks for query performance tracking
- 🌍 Environment Config - Configure via environment variables for 12-factor apps
🔥 What Makes Moltres Unique
Moltres is the only Python library that provides:
| Feature | Pandas/Polars | Ibis | SQLAlchemy | SQLModel | Moltres |
|---|---|---|---|---|---|
| DataFrame API | ✔ | ✔ | ❌ | ❌ | ✔ |
| SQL Pushdown Execution | ❌ | ✔ | ✔ | ✔ | ✔ |
| Row-Level INSERT/UPDATE/DELETE | ❌ | ❌ | ✔ | ✔ | ✔ |
| Lazy query building | ✔ (Polars) | ✔ | ⚠️ | ⚠️ | ✔ |
| Operates directly on SQL tables | ⚠️ limited | ✔ | ✔ | ✔ | ✔ |
| Column-oriented transformations | ✔ | ✔ | ❌ | ❌ | ✔ |
The combination of DataFrame API + SQL pushdown + CRUD does not exist anywhere else in Python.
Key Differentiators
- Only library with DataFrame API + SQL pushdown + CRUD - No other Python library offers this combination
- No data loading into memory for transformations - All DataFrame operations execute directly in SQL
- Works with existing SQL infrastructure - No cluster required, works with SQLite, PostgreSQL, MySQL, and more
- Type-safe CRUD operations - Validated, column-aware INSERT, UPDATE, DELETE with DataFrame-style syntax
- SQL-first design - Focuses on providing full SQL feature support through a DataFrame API, not replicating every PySpark feature. Features are included only if they map to SQL/SQLAlchemy capabilities and align with SQL pushdown execution.
🆕 What's New in 0.9.0
🎉 Major Milestone: 98% PySpark API Compatibility!
Moltres now achieves ~98% API compatibility with PySpark for core DataFrame operations, making migration from PySpark seamless while maintaining SQL-first design principles.
🚀 New Features
Raw SQL & Expression Support
# Raw SQL queries (PySpark-style)
df = db.sql("SELECT * FROM users WHERE id = :id", id=1)
df = db.sql("SELECT * FROM orders").where(col("amount") > 100)
# SQL expression selection
df.selectExpr("amount * 1.1 as with_tax", "UPPER(name) as name_upper")
df.selectExpr("(amount + tax) * 1.1 as total")
Enhanced DataFrame API
# Select all columns
df.select("*") # Explicitly select all
df.select("*", col("new_col")) # All columns plus new ones
# SQL string predicates
df.filter("age > 18")
df.where("amount >= 100 AND status = 'active'")
# String and dictionary aggregations
df.group_by("category").agg("amount") # String (defaults to sum)
df.group_by("category").agg({"amount": "sum", "price": "avg"}) # Dict
# Pivot on GroupBy (PySpark-style)
df.group_by("category").pivot("status").agg("amount") # Auto-infers values
# Explode function
df.select(explode(col("array_col")).alias("value"))
# PySpark-style aliases
df.orderBy("name") # Alias for order_by()
df.sort("name") # Alias for order_by()
df.write.saveAsTable("table") # Alias for save_as_table()
# Improved withColumn
df.withColumn("new_col", col("amount") * 1.1) # Add or replace
All major DataFrame methods now match PySpark's API!
Version 0.8.0
- PySpark-style db.read.table() API - New db.read.table() and db.read.* methods that match PySpark's spark.read.table() pattern. Read database tables and files with a familiar API: df = db.read.table("customers") or df = db.read.csv("data.csv"). Returns lazy DataFrame objects for consistency with PySpark.
- LazyRecords for db.read.records.* - The db.read.records.* methods now return lazy LazyRecords/AsyncLazyRecords objects that materialize on demand. Records automatically materialize when you use Sequence operations (len(), [], iteration), call insert_into(), or use them with createDataFrame(). Explicitly materialize with .collect() when needed. This provides better performance by deferring file reads until necessary.
- Lazy CRUD and DDL Operations - All DataFrame CRUD and DDL operations are now lazy, requiring an explicit .collect() call for execution. This improves composability and aligns with PySpark's lazy evaluation model.
- Transaction Management - All operations within a single .collect() call are part of a single session that rolls back all changes if any failure occurs.
- Batch Operation API - New db.batch() context manager to queue multiple lazy operations and execute them together within a single transaction.
- Type Checking Improvements - Added pandas-stubs for proper mypy type checking and fixed type compatibility issues.
Version 0.7.0
- PostgreSQL and MySQL Testing Infrastructure - Comprehensive test support for multiple database backends with ephemeral database instances, database fixtures, and extensive test coverage
- Enhanced Type Safety - Type overloads for collect() methods with improved type inference and better IDE support
- Improved Code Quality - Fixed all mypy type checking errors and ruff linting issues, with comprehensive type annotations throughout
- Database Connection Management - Added close() methods to Database and AsyncDatabase classes for proper resource cleanup
- Cross-Database Compatibility - Fixed PostgreSQL and MySQL-specific issues with JSON extraction, array functions, and async DSN parsing
- Test Coverage - 301 passing tests across SQLite, PostgreSQL, and MySQL with async support
Version 0.6.0
- Null Handling Convenience Methods - New na property on DataFrame: df.na.drop() and df.na.fill(value) for convenient null handling
- Random Sampling - New sample(fraction, seed=None) method for random row sampling with dialect-specific SQL compilation
- Enhanced Type System - New data types: decimal(), uuid(), json(), and interval() helpers with full SQL support and dialect-specific compilation
- Interval Arithmetic - New date_add() and date_sub() functions for date/time interval operations
- Join Hints - New hints parameter for join() method to provide query optimization hints
- Complex Join Conditions - Enhanced join() method to support arbitrary Column expressions in join conditions
- Query Plan Analysis - New explain(analyze=False) method to return query execution plans
- Pivot Operations - New pivot() method for data reshaping with cross-dialect compatibility
Version 0.5.0
- Compressed File Reading - Automatic detection and support for gzip, bz2, and xz compression in CSV, JSON, JSONL, and text file readers (both sync and async)
- Array/JSON Functions - New functions for working with JSON and array data: json_extract(), array(), array_length(), array_contains(), array_position() with dialect-specific SQL compilation
- Collect Aggregations - New aggregation functions collect_list() and collect_set() for array aggregation (uses ARRAY_AGG in PostgreSQL, group_concat in SQLite/MySQL)
- Semi-Join and Anti-Join - New semi_join() and anti_join() methods that compile to efficient EXISTS/NOT EXISTS subqueries
- MERGE/UPSERT Operations - New merge() method on tables for upsert operations with dialect-specific support (SQLite ON CONFLICT, PostgreSQL MERGE, MySQL ON DUPLICATE KEY)
- Comprehensive Test Coverage - All new features include full test coverage with execution tests
Previous Releases
Version 0.4.0
- Strict Type Checking - Full mypy strict mode compliance with comprehensive type annotations
- Type Stubs for PyArrow - Custom type stubs to provide type information for pyarrow library
- PEP 561 Compliance - Added py.typed marker file
- Enhanced Type Safety - Complete type annotations with improved type inference
Version 0.3.0
- Separation of File Reads and SQL Operations - File readers return Records instead of DataFrame
- Records Class - New Records and AsyncRecords classes for file data
- Full Async/Await Support - Complete async API for all operations
- Async Streaming - Process large datasets asynchronously
Version 0.2.0
- Environment Variable Support - Configure via environment variables
- Performance Monitoring Hooks - Track query execution time
- Enhanced Security - Comprehensive SQL injection prevention
- Modular Architecture - Refactored file readers
📦 Installation
Requirements
- Python 3.9+
- SQLAlchemy 2.0+ (for database connectivity)
- A supported SQLAlchemy driver (SQLite, PostgreSQL, MySQL, etc.)
Install Moltres
pip install moltres
For optional dependencies:
# For pandas support
pip install moltres[pandas]
# For polars support
pip install moltres[polars]
# For async support (requires async database drivers)
pip install moltres[async] # Core async support (aiofiles)
pip install moltres[async-postgresql] # PostgreSQL async (includes async + asyncpg)
pip install moltres[async-mysql] # MySQL async (includes async + aiomysql)
pip install moltres[async-sqlite] # SQLite async (includes async + aiosqlite)
# For both pandas and polars
pip install moltres[pandas,polars]
🚀 Quick Start
Basic DataFrame Operations
from moltres import col, connect
from moltres.expressions.functions import sum
# Connect to your database
db = connect("sqlite:///example.db")
# DataFrame operations with SQL pushdown (no data loading into memory)
df = (
    db.table("orders")
    .select()
    .join(db.table("customers").select(), on=[("customer_id", "id")])
    .where(col("active") == True)  # noqa: E712
    .group_by("country")
    .agg(sum(col("amount")).alias("total_amount"))
)
# Execute and get results (SQL is compiled and executed here)
results = df.collect() # Returns list of dicts by default
Raw SQL & SQL Expressions
# Raw SQL queries (PySpark-style)
df = db.sql("SELECT * FROM users WHERE age > 18")
df = db.sql("SELECT * FROM orders WHERE id = :id", id=1).where(col("amount") > 100)
# SQL expression selection
df.selectExpr("amount * 1.1 as with_tax", "UPPER(name) as name_upper")
CRUD Operations
from moltres.io.records import Records
# Insert rows
records = Records(
    _data=[
        {"id": 1, "name": "Alice", "email": "alice@example.com", "active": 1},
        {"id": 2, "name": "Bob", "email": "bob@example.com", "active": 0},
    ],
    _database=db,
)
records.insert_into("customers") # Executes immediately
# Update rows
df = db.table("customers").select()
df.write.update(
    "customers",
    where=col("active") == 0,
    set={"active": 1, "updated_at": "2024-01-01"},
)  # Executes immediately
# Delete rows
df.write.delete("customers", where=col("email").is_null()) # Executes immediately
Async Support
Moltres also supports async/await for all database operations:
import asyncio
from moltres import async_connect, col
async def main():
    # Connect asynchronously
    db = async_connect("postgresql+asyncpg://user:pass@localhost/db")
    # All operations are async
    # For SQL operations, use db.table().select()
    df = db.table("orders").select()
    results = await df.collect()
    # Streaming support
    async for chunk in await df.collect(stream=True):
        process_chunk(chunk)
    await db.close()
asyncio.run(main())
Note: Async support requires async database drivers. Install with:
- pip install moltres[async-postgresql] for PostgreSQL (includes async + asyncpg)
- pip install moltres[async-mysql] for MySQL (includes async + aiomysql)
- pip install moltres[async-sqlite] for SQLite (includes async + aiosqlite)
💡 Why Moltres?
The Gap in Python's Ecosystem
Python has powerful DataFrame tools (Pandas, Polars) and powerful SQL tools (SQLAlchemy, SQLModel), but no library connects them in a unified, ergonomic way.
The problem: Developers must juggle:
- Pandas or Polars for DataFrame transformations (but data must be loaded into memory)
- SQLAlchemy/ORMs for persistence (but not DataFrame-style)
- Raw SQL for updates/deletes (but not type-safe or composable)
Moltres fixes this by providing:
- ✅ DataFrame API - Transform data with familiar operations (select, filter, join, groupBy)
- ✅ SQL Pushdown Execution - All operations compile to SQL and run on your database—no data loading into memory
- ✅ Real SQL CRUD - INSERT, UPDATE, DELETE with DataFrame-style syntax
- ✅ 98% PySpark API Compatibility - Seamless migration from PySpark while maintaining SQL-first design
- ✅ Works with Existing SQL Infrastructure - No cluster required, works with SQLite, PostgreSQL, MySQL, and more
- ✅ Type Safe - Full type hints for better IDE support and fewer bugs
- ✅ Production Ready - Environment variables, connection pooling, monitoring hooks
- ✅ Secure by Default - SQL injection prevention built-in
Who Needs Moltres?
- Data Engineers - Avoid loading millions of rows into memory just to update a subset
- Backend Developers - Replace many ORM operations with cleaner, column-aware DataFrame syntax
- Analytics Engineers / dbt Users - Express SQL models in Python code with DataFrame chaining
- Product Engineers - Validated, type-safe CRUD without hand-writing SQL
- Teams migrating off Spark - Familiar DataFrame API style for traditional SQL databases—no cluster required. Note: Moltres focuses on SQL features, not PySpark feature parity. Features are included only if they map to SQL capabilities.
📖 Core Concepts
Design Philosophy: Moltres provides a DataFrame API that compiles to SQL. We focus on supporting SQL features (standard SQL and common dialect extensions) rather than replicating every PySpark feature. If a feature doesn't map to SQL/SQLAlchemy or doesn't align with SQL pushdown execution, it's not included. However, we maintain ~98% API compatibility with PySpark for core DataFrame operations to enable seamless migration.
Key Principles
- SQL-First Design - All operations compile to SQL and execute on the database
- No Data Materialization - Transformations happen in SQL, not in memory
- PySpark API Compatibility - Familiar API for teams migrating from PySpark
- Type Safety - Full type hints and validation throughout
- Security by Default - Built-in SQL injection prevention
Lazy Evaluation
All DataFrame query operations are lazy: they build a logical plan that is compiled to SQL and executed on your database only when you call collect(). DataFrame write operations (insertInto, update, delete) execute eagerly (immediately), matching PySpark's behavior. DDL operations (create_table, drop_table) are lazy and require .collect() to execute. For a query, nothing runs until collect():
# This doesn't execute any SQL yet
df = db.table("users").select().where(col("age") > 18)
# SQL is compiled and executed here
results = df.collect()
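The lazy model can be illustrated without Moltres itself. The following is a minimal sketch using only the standard library's sqlite3 module; LazyQuery and its methods are hypothetical names invented for this illustration and say nothing about how Moltres is implemented internally. Each chained call returns a new plan object, and SQL is sent to the database only inside collect().

```python
import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class LazyQuery:
    """Hypothetical stand-in for a lazy DataFrame: it stores a plan and runs nothing."""

    conn: sqlite3.Connection
    table: str
    columns: tuple = ("*",)
    predicates: tuple = ()  # pairs of (sql_fragment, bound_value)

    def select(self, *columns):
        # Return a new plan; the database is not touched.
        return LazyQuery(self.conn, self.table, columns or ("*",), self.predicates)

    def where(self, fragment, value):
        # Record the predicate and its bound parameter for later compilation.
        return LazyQuery(
            self.conn, self.table, self.columns, self.predicates + ((fragment, value),)
        )

    def to_sql(self):
        # Compile the stored plan into a SQL string plus positional parameters.
        sql = f"SELECT {', '.join(self.columns)} FROM {self.table}"
        if self.predicates:
            sql += " WHERE " + " AND ".join(fragment for fragment, _ in self.predicates)
        return sql, [value for _, value in self.predicates]

    def collect(self):
        # The only place where SQL is actually executed.
        sql, params = self.to_sql()
        cursor = self.conn.execute(sql, params)
        names = [description[0] for description in cursor.description]
        return [dict(zip(names, row)) for row in cursor]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)", [(1, "Alice", 30), (2, "Bob", 15)]
)

query = LazyQuery(conn, "users").select("id", "name").where("age > ?", 18)
sql_text, sql_params = query.to_sql()  # inspect the compiled SQL without running it
rows = query.collect()                 # execution happens here
```

Because every step returns a new immutable plan, intermediate queries can be reused and extended without side effects, which is the property that makes chaining safe.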
Column Expressions
Moltres supports multiple ways to reference columns, giving you flexibility:
from moltres import col, lit
from moltres.expressions.functions import sum, avg, count, concat
# String names (traditional)
df.select("id", "name", "age")
df.select("*") # Select all columns
df.select("*", col("new_col")) # All columns plus new ones
df.where(col("age") > 18)
df.filter("age > 18") # SQL string predicate (PySpark-compatible)
df.where("age >= 18 AND status = 'active'") # Complex SQL strings
# Dot notation (PySpark-style)
df.select(df.id, df.name, df.age)
df.where(df.age > 18)
df.order_by(df.name)
df.group_by(df.category).agg(sum(df.amount))
# col() function
df.select(col("id"), col("name"))
df.where(col("age") > 18)
# Mix and match all three methods
df = (
    db.table("sales")
    .select(
        df.product,  # Dot notation
        (col("price") * col("quantity")).alias("revenue"),  # col() function
        concat(df.first_name, lit(" "), df.last_name).alias("full_name"),  # Mixed
    )
    .where(df.date >= "2024-01-01")  # Dot notation in filters
    .group_by("product")  # String name
    .agg(
        sum(df.revenue).alias("total_revenue"),  # Dot notation
        avg(col("price")).alias("avg_price"),  # col() function
        count("*").alias("order_count"),
        "quantity",  # String column name (defaults to sum)
        {"price": "avg", "amount": "sum"},  # Dictionary syntax
    )
)
Note: Dot notation works by accessing attributes on the DataFrame. If an attribute doesn't exist as a method or property, it's treated as a column name. Existing methods like select, where, limit, and properties like na, write continue to work as before.
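The attribute fallback described in the note matches how Python's __getattr__ hook behaves: it is consulted only after normal attribute lookup fails. The sketch below is an illustration under that assumption; Frame and Column are hypothetical classes, not Moltres types.

```python
class Column:
    """Hypothetical column reference used only for this illustration."""

    def __init__(self, name):
        self.name = name


class Frame:
    """Hypothetical frame whose unknown attributes resolve to columns."""

    def limit(self, n):
        # A real method: found by normal lookup, so __getattr__ is never consulted.
        return f"LIMIT {n}"

    def __getattr__(self, name):
        # Python calls this only when normal attribute lookup has failed.
        if name.startswith("_"):
            # Keep private and dunder lookups failing normally (copy, pickle, etc.).
            raise AttributeError(name)
        return Column(name)


frame = Frame()
age_column = frame.age         # no such attribute, so it becomes a column reference
limit_clause = frame.limit(5)  # existing method keeps working
```

One consequence of this design is that a column whose name collides with a method (for example a column called limit) cannot be reached through dot notation and needs col("limit") or a string name instead.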
📥 Reading Data
From Database Tables
# Option 1: Use db.table().select() (original API)
df = db.table("customers").select()
df = db.table("customers").select("id", "name", "email")
# Option 2: Use db.read.table() (PySpark-style API)
df = db.read.table("customers")
df = db.read.table("customers").where(col("active") == True).select("id", "name")
Both APIs return lazy DataFrame objects that can be transformed before execution. The db.read.table() API matches PySpark's spark.read.table() pattern for consistency.
SQL Expression Selection
Use selectExpr() to write SQL expressions directly, similar to PySpark's selectExpr():
# Basic column selection
df.selectExpr("id", "name", "email")
# With expressions and aliases
df.selectExpr("id", "amount * 1.1 as with_tax", "UPPER(name) as name_upper")
# Complex expressions
df.selectExpr(
    "(amount + tax) * 1.1 as total",
    "CASE WHEN status = 'active' THEN 1 ELSE 0 END as is_active",
)
# Chaining with other operations
df.selectExpr("id", "amount").where(col("amount") > 100)
Key Features:
- Write SQL expressions directly as strings
- Supports arithmetic, functions, comparisons, and aliases
- Returns lazy DataFrame objects that can be chained
- Works with both synchronous and asynchronous DataFrames
Raw SQL Queries
Execute raw SQL queries and get back a lazy DataFrame, similar to PySpark's spark.sql():
# Basic SQL query
df = db.sql("SELECT * FROM users WHERE age > 18")
results = df.collect()
# Parameterized queries (use :param_name syntax)
df = db.sql("SELECT * FROM users WHERE id = :id AND status = :status", id=1, status="active")
results = df.collect()
# Chain DataFrame operations on SQL results
df = db.sql("SELECT * FROM orders").where(col("amount") > 100).limit(10)
results = df.collect()
# Use with aggregations and joins
df = (
    db.sql("SELECT product, region, SUM(amount) as total FROM sales GROUP BY product, region")
    .where(col("total") > 100)
    .order_by(col("total").desc())
)
results = df.collect()
Key Features:
- Returns lazy DataFrame objects that can be chained with other operations
- Supports parameterized queries using named parameters (:param_name)
- SQL dialect is determined by the database connection
- Raw SQL is wrapped in a subquery when chained, enabling full DataFrame API compatibility
- Works with both synchronous and asynchronous databases
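To make the named-parameter and subquery-wrapping points concrete, here is a small sketch using the standard library's sqlite3 module, which also accepts :name placeholders. The table, the raw_query alias, and the exact shape of the wrapped statement are assumptions for illustration; the SQL Moltres actually generates may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "active", 50.0), (2, "active", 150.0), (3, "closed", 300.0)],
)

# The user's raw SQL, with a named parameter instead of string interpolation.
raw_sql = "SELECT * FROM orders WHERE status = :status"

# Chaining a filter onto raw SQL can be expressed by wrapping it in a subquery,
# so the outer query can add its own WHERE clause without parsing the inner text.
wrapped_sql = (
    f"SELECT id, amount FROM ({raw_sql}) AS raw_query "
    "WHERE amount > :min_amount"
)

# Values are bound by the driver, never spliced into the SQL string.
rows = conn.execute(wrapped_sql, {"status": "active", "min_amount": 100}).fetchall()
```

Binding values through the driver is what keeps user-supplied input from being interpreted as SQL.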
From Files
Moltres supports loading data from various file formats. File readers (db.load.* and db.read.*) return lazy DataFrame objects - matching PySpark's API. Files are materialized into temporary tables when .collect() is called, enabling SQL pushdown for subsequent operations.
# Option 1: Use db.load.* (original API)
df = db.load.csv("data.csv")
df = db.load.option("delimiter", "|").csv("pipe_delimited.csv")
df = db.load.option("header", False).schema([...]).csv("no_header.csv")
# Option 2: Use db.read.* (PySpark-style API)
df = db.read.csv("data.csv")
df = db.read.json("data.json") # Array of objects
df = db.read.jsonl("data.jsonl") # One JSON object per line
df = db.read.parquet("data.parquet") # Requires pandas and pyarrow
df = db.read.text("log.txt", column_name="line")
df = db.read.format("csv").option("header", True).load("data.csv")
# Both APIs work the same way
df = db.load.csv("data.csv") # Same as db.read.csv("data.csv")
df = db.read.csv("data.csv") # Same as db.load.csv("data.csv")
# Transform before materialization
df = db.read.csv("data.csv").where(col("score") > 90).select("name", "score")
rows = df.collect() # Materializes file and executes SQL operations
# Use db.read.records.* to get LazyRecords (lazy materialization)
lazy_records = db.read.records.csv("data.csv") # Returns LazyRecords (lazy, not materialized yet)
# LazyRecords automatically materialize when used:
len(lazy_records) # Auto-materializes when you check length
for row in lazy_records:  # Auto-materializes when you iterate
    process(row)
lazy_records.insert_into("table_name") # Auto-materializes and executes immediately
rows = lazy_records.rows() # Auto-materializes when you get rows
# Explicitly materialize with .collect() if needed
records = lazy_records.collect() # Returns materialized Records object
# For already-materialized data, use dicts() which returns Records directly
records = db.read.records.dicts([{"id": 1, "name": "Alice"}]) # Returns Records (already materialized)
Schema Inference and Explicit Schemas
Schema is automatically inferred from data, but you can provide explicit schemas:
from moltres.table.schema import ColumnDef
schema = [
    ColumnDef(name="id", type_name="INTEGER"),
    ColumnDef(name="name", type_name="TEXT"),
    ColumnDef(name="score", type_name="REAL"),
]
df = db.load.schema(schema).csv("data.csv")
# Or for LazyRecords:
lazy_records = db.read.records.schema(schema).csv("data.csv")
File Format Options:
- CSV: header (default: True), delimiter (default: ","), inferSchema (default: True)
- JSON: multiline (default: False) - if True, reads as JSONL
- Parquet: Requires pandas and pyarrow
Important:
- db.load.* and db.read.* methods return lazy DataFrame objects - files are materialized into temporary tables when .collect() is called, enabling SQL pushdown for subsequent operations. The db.read.* API matches PySpark's spark.read.* pattern for consistency.
- db.read.table() - PySpark-style API for reading database tables: df = db.read.table("customers")
- db.read.records.* methods return lazy LazyRecords/AsyncLazyRecords objects - Records materialize on-demand when you use Sequence operations (len(), indexing, iteration), call insert_into(), or use them with createDataFrame(). Explicitly materialize with .collect() when needed. This provides better performance by deferring file reads until necessary.
  - LazyRecords automatically materialize when: using len(), indexing (records[0]), iteration (for row in records), calling insert_into(), or using with createDataFrame()
  - Explicitly materialize: records = lazy_records.collect() to get a materialized Records object
  - For already-materialized data, use db.read.records.dicts([...]) which returns Records directly
When to use each:
- Use db.load.* or db.read.* (DataFrames) when you want to transform data before materialization or need SQL pushdown
- Use db.read.table() (PySpark-style) for consistency with PySpark's spark.read.table() API
- Use db.read.records.* (LazyRecords) when you want lazy materialization and Records-style operations (insert_into, direct iteration, etc.)
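The on-demand materialization described above can be pictured as a Sequence that postpones reading its source until the first len(), index, or iteration. This is a minimal sketch using only the standard library; LazyRows, its open_source argument, and the read_count counter are hypothetical names for illustration and do not describe the real LazyRecords class.

```python
import csv
import io
from collections.abc import Sequence


class LazyRows(Sequence):
    """Hypothetical illustration of deferred file reading."""

    def __init__(self, open_source):
        self._open_source = open_source  # zero-argument callable returning a text stream
        self._rows = None                # nothing has been read yet
        self.read_count = 0              # counts how often the source was actually read

    def _materialize(self):
        # Read and cache the rows the first time any Sequence operation needs them.
        if self._rows is None:
            self.read_count += 1
            with self._open_source() as handle:
                self._rows = list(csv.DictReader(handle))
        return self._rows

    def __len__(self):
        return len(self._materialize())

    def __getitem__(self, index):
        return self._materialize()[index]


lazy_rows = LazyRows(lambda: io.StringIO("id,name\n1,Alice\n2,Bob\n"))
reads_before_use = lazy_rows.read_count  # constructing the object reads nothing
row_count = len(lazy_rows)               # first use triggers the single read
first_name = lazy_rows[0]["name"]        # served from the cached rows
ids = [row["id"] for row in lazy_rows]   # iteration comes from the Sequence mixin
reads_after_use = lazy_rows.read_count
```

Caching after the first read means repeated len() or indexing calls do not reopen the file, at the cost of holding all rows in memory once materialized.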
📤 Writing Data
To Database Tables
# Write with automatic schema inference and table creation
df.write.save_as_table("target_table")
# Write modes
df.write.mode("append").save_as_table("target") # Add to existing (default)
df.write.mode("overwrite").save_as_table("target") # Replace contents
df.write.mode("error_if_exists").save_as_table("target") # Fail if exists
# Insert into existing table (table must exist)
df.write.insertInto("existing_table")
# Update rows in a table (eager execution)
df.write.update("table_name", where=col("id") == 1, set={"name": "Updated"})
# Delete rows from a table (eager execution)
df.write.delete("table_name", where=col("id") == 1)
# With explicit schema
from moltres.table.schema import ColumnDef
schema = [
    ColumnDef(name="id", type_name="INTEGER"),
    ColumnDef(name="name", type_name="TEXT"),
]
df.write.schema(schema).save_as_table("target")
To Files
# Various formats
df.write.csv("output.csv")
df.write.json("output.json")
df.write.jsonl("output.jsonl")
df.write.parquet("output.parquet") # Requires pandas and pyarrow
# Generic save
df.write.save("output.csv") # Infers format from extension
df.write.save("data.txt", format="csv") # Explicit format
# With options
df.write.option("header", True).option("delimiter", "|").csv("output.csv")
df.write.option("compression", "gzip").parquet("output.parquet")
# Partitioned writes
df.write.partitionBy("country", "year").csv("partitioned_data")
File Formats:
- CSV: Standard comma-separated values (options: header, delimiter)
- JSON: Array of objects (options: indent)
- JSONL: One JSON object per line
- Parquet: Columnar format (requires pandas and pyarrow, options: compression)
🌊 Streaming for Large Datasets
Moltres supports streaming operations for datasets larger than available memory:
# Enable streaming mode for LazyRecords (using read.records)
lazy_records = db.read.records.stream().option("chunk_size", 10000).csv("large_file.csv")
# Process records (LazyRecords auto-materialize when iterated, streaming processes in chunks)
for row in lazy_records:
    process(row)
# Or materialize all at once
all_rows = lazy_records.rows() # Auto-materializes all data
# Explicitly materialize to get Records object
records = lazy_records.collect() # Returns materialized Records
# For DataFrames, use collect(stream=True)
df = db.load.csv("large_file.csv")
for chunk in df.collect(stream=True):  # Returns iterator of row chunks
    process_chunk(chunk)
# Streaming writes
df.write.stream().mode("overwrite").save_as_table("large_table")
df.write.stream().csv("output.csv")
# Streaming SQL queries
df = db.table("large_table").select()
for chunk in df.collect(stream=True):
    process_chunk(chunk)
Streaming Options:
- .stream(): Enable streaming mode (default: False for backward compatibility)
- .option("chunk_size", N): Set chunk size for reads (default: 10000)
- .option("batch_size", N): Set batch size for SQL inserts (default: 10000)
- collect(stream=True): Return iterator of row chunks instead of materializing
When to Use Streaming:
- Files or tables larger than available RAM
- Processing data in batches for memory efficiency
- Incremental processing pipelines
- Large data transformations
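Chunked reading of a query result can be sketched with a plain database cursor. The example below uses sqlite3 and cursor.fetchmany() from the standard library; iter_chunks is a hypothetical helper written for this illustration and is not how Moltres necessarily implements collect(stream=True).

```python
import sqlite3


def iter_chunks(conn, sql, chunk_size=10000):
    """Yield lists of row dicts, holding at most chunk_size rows in memory at once."""
    cursor = conn.execute(sql)
    names = [description[0] for description in cursor.description]
    while True:
        batch = cursor.fetchmany(chunk_size)
        if not batch:
            break  # cursor exhausted
        yield [dict(zip(names, row)) for row in batch]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER)")
conn.executemany("INSERT INTO events VALUES (?)", [(i,) for i in range(25)])

# 25 rows read 10 at a time arrive as three chunks.
chunk_sizes = [
    len(chunk)
    for chunk in iter_chunks(conn, "SELECT id FROM events ORDER BY id", chunk_size=10)
]
```

Because the generator pulls from the cursor only when the consumer asks for the next chunk, peak memory is bounded by the chunk size rather than by the size of the table.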
🗄️ Table Management
Creating Tables
from moltres import column, connect
db = connect("sqlite:///example.db")
# Create a table with schema definition (lazy operation)
customers = db.create_table(
    "customers",
    [
        column("id", "INTEGER", nullable=False, primary_key=True),
        column("name", "TEXT", nullable=False),
        column("email", "TEXT", nullable=True),
        column("active", "INTEGER", default=1),
    ],
).collect()  # Execute the create_table operation
# Insert data using Records
from moltres.io.records import Records
records = Records(
    _data=[
        {"id": 1, "name": "Alice", "email": "alice@example.com"},
        {"id": 2, "name": "Bob", "email": "bob@example.com"},
    ],
    _database=db,
)
records.insert_into("customers") # Executes immediately
# Drop tables (lazy operation)
db.drop_table("customers").collect() # Execute the drop_table operation
The column() helper accepts:
- name: Column name
- type_name: SQL type (e.g., "INTEGER", "TEXT", "REAL", "VARCHAR(255)")
- nullable: Whether the column allows NULL (default: True)
- default: Default value for the column (optional)
- primary_key: Whether this is a primary key column (default: False)
✏️ Data Mutations
DataFrame write operations (insert, update, delete) execute eagerly (immediately), matching PySpark's behavior. DDL operations (create_table, drop_table) are lazy and require .collect() to execute:
from moltres import col
from moltres.io.records import Records
# Insert rows using Records (for raw data)
records = Records(
    _data=[
        {"id": 1, "name": "Alice", "active": 1},
        {"id": 2, "name": "Bob", "active": 0},
    ],
    _database=db,
)
records.insert_into("customers") # Executes immediately
# Or insert from DataFrame
df = db.table("source").select()
df.write.insertInto("customers") # Executes immediately
# Update rows using DataFrame write API (eager execution)
df = db.table("customers").select()
df.write.update("customers", where=col("id") == 2, set={"active": 1}) # Executes immediately
# Delete rows using DataFrame write API (eager execution)
df.write.delete("customers", where=col("active") == 0) # Executes immediately
Note: DDL operations (create_table, drop_table) remain lazy and require .collect() to execute.
Transaction Support
All operations within a transaction context share the same database connection and transaction. If any operation fails, all changes are automatically rolled back:
# Execute multiple operations in a single transaction
from moltres.io.records import Records
with db.transaction() as txn:
    # All operations share the same transaction
    records = Records(_data=[{"id": 1, "name": "Alice"}], _database=db)
    records.insert_into("customers")
    records2 = Records(_data=[{"id": 1, "customer_id": 1, "amount": 100.0}], _database=db)
    records2.insert_into("orders")
    # DataFrame write operations also participate in the transaction
    df = db.table("customers").select()
    df.write.update("customers", where=col("id") == 1, set={"name": "Alice Updated"})
# If any operation fails, all changes are rolled back
# If all succeed, transaction commits automatically
You can also explicitly control the transaction:
with db.transaction() as txn:
    records = Records(_data=[{"id": 1, "name": "Alice"}], _database=db)
    records.insert_into("customers")
    txn.commit()  # Explicit commit
    # Or txn.rollback() to rollback
Note: By default, each write operation executes in its own auto-commit transaction. Use db.transaction() to group multiple operations into a single atomic transaction.
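The all-or-nothing behavior of a transaction context can be demonstrated with the standard library alone. This sketch uses a sqlite3 connection as a context manager, which commits when the block succeeds and rolls back when it raises; it illustrates the semantics described above and is not Moltres's db.transaction() implementation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.commit()

try:
    with conn:  # commits on success, rolls back if the block raises
        conn.execute("INSERT INTO customers VALUES (1, 'Alice')")
        conn.execute("INSERT INTO customers VALUES (1, 'Duplicate')")  # violates PRIMARY KEY
except sqlite3.IntegrityError:
    pass  # the first insert was rolled back together with the failed one

count_after_failure = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]

with conn:  # both inserts succeed, so the block commits them together
    conn.execute("INSERT INTO customers VALUES (1, 'Alice')")
    conn.execute("INSERT INTO customers VALUES (2, 'Bob')")

count_after_success = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
```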
Batch Operations
The batch API allows you to queue multiple lazy operations and execute them together in a single transaction:
from moltres.table.schema import ColumnDef
# Queue multiple DDL operations and execute them atomically
# Note: DataFrame write operations (insertInto, update, delete) are eager and execute immediately
# Only lazy DDL operations (create_table, drop_table) can be batched
with db.batch():
    # All DDL operations are queued and executed together on exit
    db.create_table("users", [
        ColumnDef(name="id", type_name="INTEGER"),
        ColumnDef(name="name", type_name="TEXT"),
    ])
    # After table is created, use Records or DataFrame writes for inserts
    # (These execute immediately, not in the batch)
# All DDL operations execute together in a single transaction
# If any operation fails, all changes are rolled back
The batch context manager automatically:
- Queues all lazy operations created within the context
- Executes them together in a single transaction when exiting the context
- Rolls back all changes if any operation fails
- Supports both synchronous and asynchronous operations
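The queue-then-execute pattern described above can be sketched with the standard-library sqlite3 module. The Batch class and table_names helper below are hypothetical stand-ins written for this example; they are not the Moltres implementation and only illustrate deferring work until the context exits and rolling back if any queued statement fails.

```python
import sqlite3


class Batch:
    """Hypothetical batch context: queue SQL now, run it all on exit."""

    def __init__(self, conn):
        self.conn = conn
        self.queued = []

    def add(self, sql):
        self.queued.append(sql)  # nothing touches the database yet

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            self.queued.clear()  # the body raised: run nothing
            return False
        try:
            self.conn.execute("BEGIN")
            for sql in self.queued:
                self.conn.execute(sql)
            self.conn.execute("COMMIT")
        except sqlite3.Error:
            if self.conn.in_transaction:
                self.conn.execute("ROLLBACK")
            raise
        finally:
            self.queued.clear()
        return False


def table_names(conn):
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")
    return [row[0] for row in rows]


# isolation_level=None lets this sketch issue BEGIN/COMMIT/ROLLBACK itself.
conn = sqlite3.connect(":memory:", isolation_level=None)

with Batch(conn) as batch:
    batch.add("CREATE TABLE users (id INTEGER, name TEXT)")
    batch.add("CREATE TABLE events (id INTEGER, user_id INTEGER)")
    tables_during = table_names(conn)  # still empty: statements are only queued

tables_after = table_names(conn)

try:
    with Batch(conn) as batch:
        batch.add("CREATE TABLE audit (id INTEGER)")
        batch.add("CREATE TABLE users (id INTEGER)")  # fails: table already exists
except sqlite3.OperationalError:
    pass

tables_final = table_names(conn)  # "audit" was rolled back with the failed batch
```

SQLite treats DDL as transactional, which is why the audit table disappears on rollback; other databases may commit DDL implicitly, so the guarantee depends on the backend.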
📊 Result Formats
By default, collect() returns a list of dictionaries (fetch_format="records"), so Moltres works even when pandas/polars are unavailable. You can configure the result format when connecting:
# Default: list of dicts
db = connect("sqlite:///example.db")
results = df.collect() # List[Dict[str, Any]]
# Pandas DataFrame (requires pandas)
db = connect("sqlite:///example.db", fetch_format="pandas")
results = df.collect() # pandas.DataFrame
# Polars DataFrame (requires polars)
db = connect("sqlite:///example.db", fetch_format="polars")
results = df.collect() # polars.DataFrame
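For readers unfamiliar with the "records" shape, the sketch below builds the same kind of list of dictionaries from a plain sqlite3 cursor. The fetch_records helper is hypothetical and is not how Moltres produces its results; it only shows what the default format looks like without pandas or polars installed.

```python
import sqlite3


def fetch_records(conn, sql, params=()):
    """Return query results as a list of dicts keyed by column name.

    Hypothetical helper for illustration; not part of Moltres.
    """
    cursor = conn.execute(sql, params)
    columns = [description[0] for description in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER, name TEXT)")
conn.execute("INSERT INTO customers VALUES (?, ?)", (1, "Alice"))

records = fetch_records(conn, "SELECT id, name FROM customers")
```

A list of dictionaries like this can be passed straight to pandas.DataFrame(records) when pandas is available, which is one reason it is a convenient dependency-free default.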
⚙️ Configuration
Programmatic Configuration
db = connect(
    "postgresql://user:pass@host/dbname",
    echo=True,  # Enable SQL logging
    fetch_format="pandas",
    pool_size=10,
    max_overflow=5,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,  # Connection health checks
)
Environment Variables
Moltres supports configuration via environment variables for easier deployment (12-factor app friendly):
export MOLTRES_DSN="postgresql://user:pass@host/dbname"
export MOLTRES_POOL_SIZE=10
export MOLTRES_POOL_PRE_PING=true
export MOLTRES_FETCH_FORMAT="pandas"
Then in your code:
from moltres import connect
# Uses MOLTRES_DSN from environment
db = connect()
Supported environment variables:
- MOLTRES_DSN: Database connection string
- MOLTRES_ECHO: Enable SQL logging (true/false)
- MOLTRES_FETCH_FORMAT: "records", "pandas", or "polars"
- MOLTRES_DIALECT: Override SQL dialect
- MOLTRES_POOL_SIZE: Connection pool size
- MOLTRES_MAX_OVERFLOW: Maximum pool overflow
- MOLTRES_POOL_TIMEOUT: Pool timeout in seconds
- MOLTRES_POOL_RECYCLE: Connection recycle time
- MOLTRES_POOL_PRE_PING: Enable connection health checks (true/false)
Configuration Precedence: Programmatic arguments > Environment variables > Defaults
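The precedence rule can be sketched as a small resolver. Everything below is illustrative: resolve_config is a hypothetical function, and the values in DEFAULTS are assumptions made for the example, not Moltres's actual defaults. Only the MOLTRES_* variable names come from the list above.

```python
import os

# Assumed defaults for illustration only; the real defaults may differ.
DEFAULTS = {"fetch_format": "records", "pool_size": 5, "pool_pre_ping": False}


def resolve_config(environ=None, **overrides):
    """Merge settings so that arguments beat environment variables beat defaults."""
    environ = os.environ if environ is None else environ
    config = dict(DEFAULTS)
    for key, default in DEFAULTS.items():
        raw = environ.get(f"MOLTRES_{key.upper()}")
        if raw is None:
            continue
        # Check bool before int, because bool is a subclass of int in Python.
        if isinstance(default, bool):
            config[key] = raw.strip().lower() in ("true", "1", "yes")
        elif isinstance(default, int):
            config[key] = int(raw)
        else:
            config[key] = raw
    # Explicit arguments win over anything read from the environment.
    config.update({k: v for k, v in overrides.items() if v is not None})
    return config


env = {
    "MOLTRES_POOL_SIZE": "10",
    "MOLTRES_FETCH_FORMAT": "pandas",
    "MOLTRES_POOL_PRE_PING": "true",
}
resolved = resolve_config(env, fetch_format="polars")
```

Here fetch_format comes from the explicit argument, while pool_size and pool_pre_ping come from the environment mapping because no argument overrides them.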
📈 Performance Monitoring
Moltres provides optional performance monitoring hooks for tracking query execution:
from moltres.engine import register_performance_hook
def log_slow_queries(sql: str, elapsed: float, metadata: dict):
    if elapsed > 1.0:
        print(f"Slow query ({elapsed:.2f}s): {sql[:100]}")
        print(f"  Rows affected: {metadata.get('rowcount', 'N/A')}")
register_performance_hook("query_end", log_slow_queries)
# Now all queries will be monitored
db = connect("sqlite:///example.db")
df.collect() # Slow queries will be logged
# Unregister when done
from moltres.engine import unregister_performance_hook
unregister_performance_hook("query_end", log_slow_queries)
Available Events:
- query_start: Fired when a query begins execution
- query_end: Fired when a query completes (includes elapsed time and metadata)
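As a rough mental model of how event hooks like these work, the sketch below implements a tiny registry in plain Python. The names register_hook, unregister_hook, and run_with_hooks are hypothetical and are not the Moltres API; passing the same (sql, elapsed, metadata) arguments to query_start callbacks is also an assumption made to keep the example short.

```python
import time
from collections import defaultdict

_hooks = defaultdict(list)  # event name -> list of callbacks


def register_hook(event, callback):
    _hooks[event].append(callback)


def unregister_hook(event, callback):
    if callback in _hooks[event]:
        _hooks[event].remove(callback)


def run_with_hooks(sql, execute):
    """Call execute(sql), firing query_start before and query_end after."""
    for hook in list(_hooks["query_start"]):
        hook(sql, 0.0, {})
    started = time.perf_counter()
    rowcount = execute(sql)
    elapsed = time.perf_counter() - started
    for hook in list(_hooks["query_end"]):
        hook(sql, elapsed, {"rowcount": rowcount})
    return rowcount


seen = []


def record(sql, elapsed, metadata):
    seen.append((sql, metadata["rowcount"]))


register_hook("query_end", record)
run_with_hooks("SELECT 1", lambda sql: 1)  # stand-in for a real query executor

unregister_hook("query_end", record)
run_with_hooks("SELECT 2", lambda sql: 1)  # no longer observed
```

Only the first query is recorded, because the callback was unregistered before the second one ran.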
🔒 Security
Moltres includes built-in security features to prevent SQL injection:
- SQL Identifier Validation - All table and column names are validated
- Parameterized Queries - All user data is passed as bound parameters, never concatenated into SQL strings
- Input Sanitization - Comprehensive validation of identifiers and inputs
See docs/SECURITY.md for security best practices and guidelines.
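The two main techniques listed above, identifier validation and parameterized queries, can be illustrated with the standard-library sqlite3 module. The quote_identifier and select_where_equals helpers are hypothetical and the allowed-identifier pattern is an assumption chosen for this sketch; Moltres's own validation rules may be stricter or looser.

```python
import re
import sqlite3

# Assumed rule for this sketch: letters, digits, underscores, not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier(name):
    """Reject anything that is not a plain identifier, then quote it."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"invalid SQL identifier: {name!r}")
    return f'"{name}"'


def select_where_equals(conn, table, column, value):
    # Identifiers are validated; the value travels as a bound parameter.
    sql = f"SELECT * FROM {quote_identifier(table)} WHERE {quote_identifier(column)} = ?"
    return conn.execute(sql, (value,)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER, name TEXT)")
conn.execute("INSERT INTO customers VALUES (?, ?)", (1, "Alice"))

rows = select_where_equals(conn, "customers", "name", "Alice")

# A classic injection payload is treated as an ordinary string and matches nothing.
injected = select_where_equals(conn, "customers", "name", "x' OR '1'='1")

# A malicious table name never reaches the database.
try:
    select_where_equals(conn, "customers; DROP TABLE customers", "name", "Alice")
    rejected = False
except ValueError:
    rejected = True
```

Values and identifiers need different defences: placeholders cannot stand in for table or column names, so identifiers have to be validated separately.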
📚 Advanced Examples
Complex Joins and Aggregations
from moltres import col
from moltres.expressions.functions import sum, avg, count
# Multi-table join with aggregations
df = (
    db.table("orders")
    .select()
    .join(db.table("customers").select(), on=[("customer_id", "id")])
    .join(db.table("products").select(), on=[("product_id", "id")])
    .where(col("date") >= "2024-01-01")
    .group_by("country", "category")
    .agg(
        sum(col("amount")).alias("total_revenue"),
        avg(col("amount")).alias("avg_order_value"),
        count("*").alias("order_count"),
    )
    .order_by(col("total_revenue").desc())
    .limit(10)
)
results = df.collect()
Window Functions
# Complex expressions with window functions
df = (
    db.table("sales")
    .select(
        col("product"),
        col("amount"),
        col("date"),
        (col("amount") - avg(col("amount")).over()).alias("deviation_from_avg"),
    )
    .where(col("date") >= "2024-01-01")
)
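To show what an empty over() window computes, the sketch below runs an equivalent expression as hand-written SQL through the standard-library sqlite3 module (window functions need SQLite 3.25 or newer). The SQL that Moltres actually generates may differ; the table contents here are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (product TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("a", 10.0), ("b", 20.0), ("c", 30.0)],
)

# AVG(amount) OVER () is the average across all rows, repeated on every row,
# so each row can be compared with the overall mean without a GROUP BY.
deviations = conn.execute(
    """
    SELECT product, amount - AVG(amount) OVER () AS deviation_from_avg
    FROM sales
    ORDER BY amount
    """
).fetchall()
```

Unlike a grouped aggregate, the window version keeps one output row per input row, which is what makes per-row deviations possible.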
Complete ETL Pipeline
# Complete ETL pipeline
db = connect("postgresql://user:pass@localhost/warehouse")
# Extract: Load from CSV (returns DataFrame, or use read.records for LazyRecords)
raw_df = db.load.csv("raw_sales.csv")
# Or for LazyRecords (lazy materialization):
raw_records = db.read.records.csv("raw_sales.csv")
# Load raw data into staging table (lazy operation)
db.create_table("staging_sales", [
    column("order_id", "INTEGER"),
    column("product", "TEXT"),
    column("amount", "REAL"),
    column("date", "DATE"),
]).collect()  # Execute the create_table operation
# LazyRecords auto-materialize when insert_into() is called
raw_records.insert_into("staging_sales") # Auto-materializes and executes immediately
# Transform: Clean and aggregate using SQL operations
cleaned = (
    db.table("staging_sales")
    .select(
        col("order_id"),
        col("product").upper().alias("product"),
        col("amount").cast("REAL"),
        col("date"),
    )
    .where(col("amount") > 0)
    .group_by("product", "date")
    .agg(sum(col("amount")).alias("daily_revenue"))
)
# Load: Write to database
cleaned.write.mode("overwrite").save_as_table("daily_sales_summary")
🛠️ Supported Operations
DataFrame Operations (PySpark-Compatible)
- select() / selectExpr() - Project columns or SQL expressions
- where() / filter() - Filter rows (supports SQL strings)
- join() - Join with other DataFrames
- group_by() / groupBy() - Group rows
- agg() - Aggregate functions (supports strings and dictionaries)
- order_by() / orderBy() / sort() - Sort rows
- limit() - Limit number of rows
- distinct() - Remove duplicate rows
- withColumn() / withColumnRenamed() - Add or rename columns
- pivot() - Pivot operations (including groupBy().pivot())
- explode() - Explode array/JSON columns
- db.sql() - Execute raw SQL queries
DataFrame Write Operations
- df.write.insertInto("table") - Insert DataFrame into existing table (eager execution)
- df.write.update("table", where=..., set={...}) - Update rows in table (eager execution)
- df.write.delete("table", where=...) - Delete rows from table (eager execution)
- df.write.save_as_table("table") / saveAsTable() - Write DataFrame to table (eager execution)
Column Expressions
- Arithmetic: +, -, *, /, %
- Comparisons: ==, !=, <, >, <=, >=
- Boolean: &, |, ~
- Functions: sum(), avg(), count(), concat(), coalesce(), upper(), lower(), explode(), etc.
- Window Functions: over(), partition_by(), order_by()
Supported SQL Dialects
- ✅ SQLite - Full support
- ✅ PostgreSQL - Full support with dialect-specific optimizations
- ✅ MySQL - Full support with dialect-specific optimizations
- ✅ Other SQLAlchemy-supported databases - ANSI SQL fallback
🧪 Development
Setup
# Clone the repository
git clone https://github.com/eddiethedean/moltres.git
cd moltres
# Install in development mode
pip install -e ".[dev]"
# Install pre-commit hooks
pre-commit install
Running Tests
# Run all tests
pytest
# Run tests in parallel
pytest -n 9
# Run with coverage
pytest --cov=src/moltres --cov-report=html
Code Quality
# Linting
ruff check .
# Formatting
ruff format .
# Type checking (strict mode enabled)
mypy src
📖 Documentation
Additional documentation is available in the docs/ directory:
- Why Moltres? - Understanding the gap Moltres fills and who needs it
- Examples - Common patterns, use cases, and examples for each audience
- Security Guide - Security best practices and SQL injection prevention
- Troubleshooting - Common issues and solutions
- Design Notes - High-level architecture and design decisions
- Advocacy Document - Detailed positioning and comparison with alternatives
🤝 Contributing
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
Quick Start:
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Before submitting:
- Run tests: pytest
- Check code quality: ruff check . && mypy src
- Update documentation if needed
👤 Author
Odos Matthews
- GitHub: @eddiethedean
- Email: odosmatthews@gmail.com
🙏 Acknowledgments
- Inspired by PySpark's DataFrame API style, but focused on SQL feature support rather than PySpark feature parity
- Built on SQLAlchemy for database connectivity and SQL compilation
- Thanks to all contributors and users
📄 License
MIT License - see LICENSE file for details.
Made with ❤️ for the Python data community