High-performance Delta Lake database with POSIX interface and Python bindings

posixlake Python Bindings

High-performance Delta Lake database with Python API and POSIX interface

Python API for posixlake (File Store Database) - Access Delta Lake operations, SQL queries, time travel, and use Unix commands (`cat`, `grep`, `awk`, `wc`, `head`, `tail`, `sort`, `cut`, `echo >>`, `sed -i`, `vim`, `mkdir`, `mv`, `cp`, `rmdir`, `rm`) to query and trigger Delta Lake transactions. Mount databases as POSIX filesystems where standard Unix tools execute ACID operations. Works with local filesystem directories, S3/MinIO, Azure Blob Storage, and Microsoft Fabric OneLake. Built on Rust for maximum performance.

Key Features:

  • Delta Lake Native: Full ACID transactions with native _delta_log/ format
  • SQL Queries: DataFusion-powered SQL engine embedded in Python
  • Time Travel: Query historical versions and timestamps
  • CSV/Parquet Import: Create databases from CSV (auto schema inference) or Parquet files
  • Buffered Inserts: 10x performance improvement for small batch writes
  • NFS Server: Mount Delta Lake as POSIX filesystem - standard Unix tools work directly
  • Storage Backends: Works with local filesystem, S3/MinIO, and Azure Blob Storage (Azurite) - same unified API
  • Performance: Rust-powered engine with buffered inserts (~10x faster for small batches)
  • No Special Drivers: Uses OS built-in NFS client - zero installation
  • Delta Lake Compatible: Tables readable by Spark, Databricks, and Athena immediately

Installation

From PyPI (Recommended)

pip install posixlake

Requirements:

  • Python 3.11+ (required for prebuilt wheels with native library)
  • x86_64 and ARM64 are both supported on the supported platforms
  • As with any native extension, the Python interpreter and the native library/wheel must share an architecture (x86_64 with x86_64, arm64 with arm64)
  • For other Python versions, install from source (see below)

PyPI Package: https://pypi.org/project/posixlake/

From Source

# 1. Clone the repository
git clone https://github.com/npiesco/posixlake.git
cd posixlake

# 2. Build Rust library
cargo build --release

# 3. Generate Python API
cargo run --bin uniffi-bindgen -- generate \
    --library target/release/<platform-library> \
    --language python \
    --out-dir bindings/python/posixlake

# 4. Copy library
cp target/release/<platform-library> bindings/python/posixlake/

# 5. Install Python package
cd bindings/python
pip install -e .

Use the correct library name for your OS:

  • Linux: libposixlake.so
  • macOS: libposixlake.dylib
  • Windows: posixlake.dll

If you want to build for a specific architecture, set the Rust target explicitly before generating bindings:

  • Windows x86_64 Python: --target x86_64-pc-windows-msvc
  • Windows ARM64 Python: --target aarch64-pc-windows-msvc

Prerequisites:

  • Python 3.8+ (3.11+ recommended for prebuilt wheels)
  • Rust 1.70+ (for building from source)
  • NFS client (built-in on macOS/Linux/Windows Pro)

Quick Start

Example 1: Basic Database Operations

from posixlake import DatabaseOps, Schema, Field, PosixLakeError

# Create a schema
schema = Schema(fields=[
    Field(name="id", data_type="Int32", nullable=False),
    Field(name="name", data_type="String", nullable=False),
    Field(name="age", data_type="Int32", nullable=True),
    Field(name="salary", data_type="Float64", nullable=True),
], primary_key="id")

# Create database on local filesystem
try:
    db = DatabaseOps.create("/path/to/db", schema)
    print("✓ Database created")
except PosixLakeError as e:
    print(f"✗ Error: {e}")
    raise  # db is undefined past this point, so stop instead of continuing

# Insert data (JSON format)
data = '[{"id": 1, "name": "Alice", "age": 30, "salary": 75000.0}]'
db.insert_json(data)

# Query with SQL
results = db.query_json("SELECT * FROM data WHERE age > 25")
print(results)
# [{"id": 1, "name": "Alice", "age": 30, "salary": 75000.0}]

# Delete rows
db.delete_rows_where("id = 1")
print("✓ Row deleted")

Example 2: Buffered Insert (High Performance)

from posixlake import DatabaseOps, Schema, Field
import json

schema = Schema(fields=[
    Field(name="id", data_type="Int32", nullable=False),
    Field(name="name", data_type="String", nullable=False),
    Field(name="email", data_type="String", nullable=False),
], primary_key="id")

db = DatabaseOps.create("/path/to/db", schema)

# Insert many small batches efficiently (buffers up to 1000 rows)
print("Inserting 100 small batches using buffered insert...")
for i in range(100):
    db.insert_buffered_json(json.dumps([{
        "id": i,
        "name": f"User_{i}",
        "email": f"user{i}@example.com"
    }]))
    if (i + 1) % 20 == 0:
        print(f"  Buffered {i + 1}/100 batches...")

# Flush buffer to commit all data
print("\nFlushing write buffer...")
db.flush_write_buffer()
print("✓ All buffered data committed to Delta Lake")

# Result: ~1-2 Delta Lake transactions instead of 100!
# Performance improvement: ~10x faster for small batches

Example 3: S3 / Object Storage Backend

from posixlake import DatabaseOps, Schema, Field, S3Config

schema = Schema(fields=[
    Field(name="id", data_type="Int32", nullable=False),
    Field(name="name", data_type="String", nullable=False),
    Field(name="value", data_type="Float64", nullable=True),
], primary_key="id")

# Create database on S3/MinIO
s3_config = S3Config(
    endpoint="http://localhost:9000",  # MinIO or AWS S3 endpoint
    access_key_id="minioadmin",
    secret_access_key="minioadmin",
    region="us-east-1"
)

db = DatabaseOps.create_with_s3("s3://bucket-name/db-path", schema, s3_config)

# Same API works with S3!
db.insert_json('[{"id": 1, "name": "Alice", "value": 123.45}]')
results = db.query_json("SELECT * FROM data WHERE value > 100")
print(results)

# All data stored in S3 with Delta Lake ACID transactions

Example 3b: Azure Blob Storage / Azurite Backend

from posixlake import DatabaseOps, Schema, Field, AzureConfig

schema = Schema(fields=[
    Field(name="id", data_type="Int32", nullable=False),
    Field(name="name", data_type="String", nullable=False),
    Field(name="value", data_type="Float64", nullable=True),
], primary_key="id")

# Create database on Azure (Azurite for local testing)
azure_config = AzureConfig(
    account_name="devstoreaccount1",
    account_key="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
    endpoint="http://127.0.0.1:10000"
)

db = DatabaseOps.create_with_azure("az://posixlake-test", schema, azure_config)

# Same API works with Azure!
db.insert_json('[{"id": 1, "name": "Alice", "value": 123.45}]')
results = db.query_json("SELECT * FROM data WHERE value > 100")
print(results)

# All data stored in Azure Blob Storage with Delta Lake ACID transactions

Running the Python Azure Integration Test

The repository includes a real Azure/Azurite integration script at scripts/test_python_azure.py. It exercises the UniFFI-exposed Python surface for:

  • DatabaseOps.create_with_azure()
  • DatabaseOps.open_with_azure()
  • DatabaseOps.health_check()
  • insert_json() / query_json() persistence across reopen
  • merge_json() against an Azure-backed table

It expects Azurite on http://127.0.0.1:10000; the script auto-creates a unique Azure container per test database.

From the repo root:

python scripts/test_python_azure.py

Running the Python S3 Integration Test

The repository includes a real S3/MinIO integration script at scripts/test_python_s3.py. It exercises the UniFFI-exposed Python surface for:

  • DatabaseOps.create_with_s3()
  • DatabaseOps.open_with_s3()
  • DatabaseOps.health_check()
  • insert_json() / query_json() persistence across reopen
  • merge_json() against an S3-backed table

It expects MinIO on http://localhost:9000 with bucket posixlake-test.

From the repo root:

python scripts/test_python_s3.py

If you are using the repo-managed Python environment, run:

bindings/python/.venv/Scripts/python.exe scripts/test_python_s3.py

The larger manual integration harness at scripts/full_test.py also invokes this S3 script as part of its run.

Example 4: POSIX Access via NFS Server

from posixlake import DatabaseOps, Schema, Field, NfsServer
import time
import subprocess

# Create database
schema = Schema(fields=[
    Field(name="id", data_type="Int32", nullable=False),
    Field(name="name", data_type="String", nullable=False),
    Field(name="age", data_type="Int32", nullable=True),
], primary_key="id")
db = DatabaseOps.create("/path/to/db", schema)

# Insert data
db.insert_json('[{"id": 1, "name": "Alice", "age": 30}, {"id": 2, "name": "Bob", "age": 25}]')

# Start NFS server on port 12049
nfs_port = 12049
nfs_server = NfsServer(db, nfs_port)
print(f"✓ NFS server started on port {nfs_port}")

# Wait for server to be ready
time.sleep(0.5)
if nfs_server.is_ready():
    print("✓ NFS server is ready!")
else:
    print("⚠ NFS server not ready, POSIX operations may fail")

# Mount filesystem (requires sudo - run this in terminal)
# sudo mount_nfs -o nolocks,vers=3,tcp,port=12049,mountport=12049 localhost:/share /mnt/posixlake

# Now use standard Unix tools to query and trigger Delta Lake operations:
# $ cat /mnt/posixlake/data/data.csv  # Queries Parquet data, converts to CSV
# id,name,age
# 1,Alice,30
# 2,Bob,25
#
# $ grep "Alice" /mnt/posixlake/data/data.csv | awk -F',' '{print $2}'  # Search and process
# Alice
#
# $ wc -l /mnt/posixlake/data/data.csv  # Count records
# 3 /mnt/posixlake/data/data.csv
#
# $ echo "3,Charlie,28" >> /mnt/posixlake/data/data.csv  # Triggers Delta Lake INSERT transaction!
#
# $ sed -i 's/Alice,30/Alice,31/' /mnt/posixlake/data/data.csv  # Triggers Delta Lake MERGE (UPDATE) transaction!
#
# $ grep -v "Bob" /mnt/posixlake/data/data.csv > /tmp/temp && cat /tmp/temp > /mnt/posixlake/data/data.csv  # Triggers MERGE (DELETE) transaction!

# Shutdown NFS server when done
# nfs_server.shutdown()

Example 5: Time Travel Queries

from posixlake import DatabaseOps, Schema, Field

schema = Schema(fields=[
    Field(name="id", data_type="Int32", nullable=False),
    Field(name="name", data_type="String", nullable=False),
], primary_key="id")

db = DatabaseOps.create("/path/to/db", schema)

# Insert initial data
db.insert_json('[{"id": 1, "name": "Alice"}]')

# Insert more data
db.insert_json('[{"id": 2, "name": "Bob"}]')

# Query by version (creation is version 0, first insert is version 1, second insert is version 2)
results_v1 = db.query_version_json("SELECT * FROM data", 1)
print(f"Data at version 1: {results_v1}")
# [{"id": 1, "name": "Alice"}]

results_v2 = db.query_version_json("SELECT * FROM data", 2)
print(f"Data at version 2: {results_v2}")
# [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

# Query by timestamp (milliseconds since epoch)
import time
timestamp = int(time.time() * 1000)
results = db.query_timestamp_json("SELECT * FROM data", timestamp)
print(f"Data at timestamp {timestamp}: {results}")

Example 6: Import from CSV (Auto Schema Inference)

from posixlake import DatabaseOps
import json

# Create database by importing CSV - schema is automatically inferred!
# Column types detected: Int64, Float64, Boolean, String
db = DatabaseOps.create_from_csv("/path/to/new_db", "/path/to/data.csv")

# Query the imported data
results = db.query_json("SELECT * FROM data LIMIT 5")
print(json.loads(results))

# Check inferred schema
schema = db.get_schema()
for field in schema.fields:
    print(f"  {field.name}: {field.data_type} (nullable={field.nullable})")
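
The inference step above can be pictured with a small standard-library sketch. This is not posixlake's actual algorithm: the precedence (Int64, then Float64, then Boolean, then String), the treatment of empty cells as nulls, and the helper names infer_type and infer_schema are all assumptions made for illustration.

```python
import csv
import io


def infer_type(values):
    """Pick the first of Int64, Float64, Boolean, String that fits every non-empty value."""
    present = [v for v in values if v != ""]
    if not present:
        return "String"

    def all_parse(parse):
        # True when every non-empty value is accepted by the given parser.
        try:
            for v in present:
                parse(v)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "Int64"
    if all_parse(float):
        return "Float64"
    if all(v.lower() in ("true", "false") for v in present):
        return "Boolean"
    return "String"


def infer_schema(csv_text):
    """Map each column name to (type name, nullable) based on the sampled rows."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, body = rows[0], rows[1:]
    schema = {}
    for position, name in enumerate(header):
        column = [row[position] for row in body]
        schema[name] = (infer_type(column), "" in column)  # empty cell => nullable
    return schema


sample = "id,score,active,name\n1,9.5,true,Alice\n2,,false,Bob\n"
schema = infer_schema(sample)
for name, (data_type, nullable) in schema.items():
    print(f"{name}: {data_type} (nullable={nullable})")
# id: Int64 (nullable=False)
# score: Float64 (nullable=True)
# active: Boolean (nullable=False)
# name: String (nullable=False)
```

Comparing this kind of output with db.get_schema() on a real import is a quick way to confirm that the inferred types match what you expect before loading more data.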

Example 7: Import from Parquet

from posixlake import DatabaseOps
import json

# Create database from existing Parquet file(s)
# Schema is read directly from Parquet metadata
db = DatabaseOps.create_from_parquet("/path/to/new_db", "/path/to/data.parquet")

# Supports glob patterns for multiple files
db = DatabaseOps.create_from_parquet("/path/to/db", "/data/*.parquet")

# Query the imported data
results = db.query_json("SELECT COUNT(*) as total FROM data")
print(json.loads(results))

Example 8: Delta Lake Operations

from posixlake import DatabaseOps, Schema, Field

db = DatabaseOps.open("/path/to/db")

# OPTIMIZE: Compact small Parquet files into larger ones
optimize_result = db.optimize()
print(f"✓ OPTIMIZE completed: {optimize_result}")

# VACUUM: Remove old files (retention period in hours)
vacuum_result = db.vacuum(retention_hours=168)  # 7 days
print(f"✓ VACUUM completed: {vacuum_result}")

# Z-ORDER: Multi-dimensional clustering for better query performance
zorder_result = db.zorder(columns=["id", "name"])
print(f"✓ Z-ORDER completed: {zorder_result}")

# Get data skipping statistics
stats = db.get_data_skipping_stats()
print(f"Data skipping stats: {stats}")

Core Features

Database Operations

Creating and Opening Databases

from posixlake import DatabaseOps, Schema, Field, S3Config, AzureConfig

# Local filesystem with explicit schema
schema = Schema(fields=[
    Field(name="id", data_type="Int32", nullable=False),
    Field(name="name", data_type="String", nullable=False),
], primary_key="id")
db = DatabaseOps.create("/path/to/db", schema)
db = DatabaseOps.open("/path/to/db")

# Import from CSV (auto schema inference)
db = DatabaseOps.create_from_csv("/path/to/db", "/path/to/data.csv")

# Import from Parquet (schema from metadata)
db = DatabaseOps.create_from_parquet("/path/to/db", "/path/to/data.parquet")
db = DatabaseOps.create_from_parquet("/path/to/db", "/data/*.parquet")  # glob pattern

# With authentication
db = DatabaseOps.create_with_auth("/path/to/db", schema, auth_enabled=True)
db = DatabaseOps.open_with_credentials("/path/to/db", "admin", "secret")

# S3 backend
s3_config = S3Config(
    endpoint="http://localhost:9000",
    access_key_id="minioadmin",
    secret_access_key="minioadmin",
    region="us-east-1"
)
db = DatabaseOps.create_with_s3("s3://bucket/db-path", schema, s3_config)
db = DatabaseOps.open_with_s3("s3://bucket/db-path", s3_config)

# Azure Blob Storage backend
azure_config = AzureConfig(
    account_name="devstoreaccount1",
    account_key="...",
    endpoint="http://127.0.0.1:10000"
)
db = DatabaseOps.create_with_azure("az://container-name", schema, azure_config)
db = DatabaseOps.open_with_azure("az://container-name", azure_config)

Data Insertion

# Regular insert (one transaction per call)
db.insert_json('[{"id": 1, "name": "Alice"}]')

# Buffered insert (batches multiple writes)
db.insert_buffered_json('[{"id": 2, "name": "Bob"}]')
db.insert_buffered_json('[{"id": 3, "name": "Charlie"}]')
db.flush_write_buffer()  # Commit all buffered data

# MERGE (UPSERT) operation
import json

merge_data = [
    {"id": 1, "name": "Alice Updated", "_op": "UPDATE"},
    {"id": 4, "name": "David", "_op": "INSERT"},
    {"id": 2, "_op": "DELETE"}
]
result = db.merge_json(json.dumps(merge_data), "id")
# Returns: {"rows_inserted": 1, "rows_updated": 1, "rows_deleted": 1}
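
To make the `_op` convention concrete, here is a toy in-memory model of a keyed merge. It only mirrors the payload shape and the result counters shown above; the function apply_merge, the dict-based table, and the behaviour for missing keys are assumptions for illustration, not posixlake's implementation.

```python
import json


def apply_merge(table, payload_json, key):
    """Apply _op-tagged rows to a dict keyed by `key` and count each operation."""
    counts = {"rows_inserted": 0, "rows_updated": 0, "rows_deleted": 0}
    for row in json.loads(payload_json):
        op = row.pop("_op")          # the operation tag is not part of the stored row
        row_key = row[key]
        if op == "INSERT":
            table[row_key] = row
            counts["rows_inserted"] += 1
        elif op == "UPDATE":
            table[row_key] = {**table[row_key], **row}  # overwrite supplied columns
            counts["rows_updated"] += 1
        elif op == "DELETE":
            del table[row_key]
            counts["rows_deleted"] += 1
        else:
            raise ValueError(f"unknown _op: {op}")
    return counts


table = {1: {"id": 1, "name": "Alice"}, 2: {"id": 2, "name": "Bob"}}
payload = json.dumps([
    {"id": 1, "name": "Alice Updated", "_op": "UPDATE"},
    {"id": 4, "name": "David", "_op": "INSERT"},
    {"id": 2, "_op": "DELETE"},
])
counts = apply_merge(table, payload, key="id")
print(counts)
# {'rows_inserted': 1, 'rows_updated': 1, 'rows_deleted': 1}
print(sorted(table))
# [1, 4]
```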

SQL Queries

# Basic query
results = db.query_json("SELECT * FROM data WHERE id > 0")

# Aggregations
results = db.query_json("SELECT COUNT(*) as count, AVG(age) as avg_age FROM data")

# Joins (if multiple tables)
results = db.query_json("""
    SELECT a.id, a.name, b.value 
    FROM data a 
    JOIN other_table b ON a.id = b.id
""")

# Time travel queries
results = db.query_version_json("SELECT * FROM data", version=5)
results = db.query_timestamp_json("SELECT * FROM data", timestamp_ms=1234567890000)

Row Deletion

# Delete by condition
db.delete_rows_where("id = 5")
db.delete_rows_where("age < 18")
db.delete_rows_where("name LIKE '%test%'")

# Delete all rows (truncate)
db.delete_rows_where("1=1")

Time Travel

posixlake supports Delta Lake's time travel feature, allowing you to query historical versions of your data:

# Query by version
results = db.query_version_json("SELECT * FROM data", version=10)

# Query by timestamp (milliseconds since epoch)
import time
timestamp_ms = int(time.time() * 1000) - 3_600_000  # 1 hour ago
results = db.query_timestamp_json("SELECT * FROM data", timestamp_ms)
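
When the point in time comes from a calendar date rather than "now", the millisecond value can be derived from a timezone-aware datetime. The helper name to_epoch_ms and the example date are illustrative assumptions; only the milliseconds-since-epoch unit comes from the examples above.

```python
from datetime import datetime, timedelta, timezone


def to_epoch_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    return int(moment.timestamp() * 1000)


cutoff = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
timestamp_ms = to_epoch_ms(cutoff)
one_hour_earlier_ms = to_epoch_ms(cutoff - timedelta(hours=1))
print(timestamp_ms - one_hour_earlier_ms)
# 3600000
```

The resulting integer can be passed wherever the examples above pass timestamp_ms.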

Delta Lake Operations

OPTIMIZE (File Compaction)

# Compact small Parquet files into larger ones for better query performance
result = db.optimize()
print(f"Files compacted: {result}")

VACUUM (Cleanup Old Files)

# Remove old files (retention period in hours)
# Default: 168 hours (7 days)
result = db.vacuum(retention_hours=168)
print(f"Files removed: {result}")

Z-ORDER (Multi-dimensional Clustering)

# Cluster data by multiple columns for better query performance
result = db.zorder(columns=["id", "name", "age"])
print(f"Z-ORDER completed: {result}")
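
For intuition about what multi-dimensional clustering means, the sketch below computes a textbook Z-order (Morton) code by interleaving the bits of two integers and sorts points by it. It is a generic illustration of the curve, not a description of how posixlake's zorder() is implemented; interleave_bits is a hypothetical helper.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Return the Morton (Z-order) code of two non-negative integers."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)      # bits of x land on even positions
        code |= ((y >> i) & 1) << (2 * i + 1)  # bits of y land on odd positions
    return code


points = [(3, 3), (0, 1), (2, 0), (1, 1), (0, 0), (1, 0)]
ordered = sorted(points, key=lambda p: interleave_bits(*p))
print(ordered)
# [(0, 0), (1, 0), (0, 1), (1, 1), (2, 0), (3, 3)]
```

Rows that are close in both columns end up close in the sort order, which is why files written in this order tend to have tight min/max ranges on every clustered column.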

Data Skipping Statistics

# Get statistics for query optimization
stats = db.get_data_skipping_stats()
print(f"Data skipping stats: {stats}")
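
Data skipping generally relies on per-file min/max statistics: a file whose value range cannot overlap the query predicate is never read. The following is a minimal sketch of that idea with made-up file names and ranges; FileStats and files_for_range are hypothetical and do not reflect posixlake's internal structures.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    path: str
    min_id: int
    max_id: int


def files_for_range(files, low, high):
    """Keep only files whose [min_id, max_id] range overlaps [low, high]."""
    return [f for f in files if f.max_id >= low and f.min_id <= high]


files = [
    FileStats("part-0.parquet", 1, 100),
    FileStats("part-1.parquet", 101, 200),
    FileStats("part-2.parquet", 201, 300),
]
# Equivalent to a predicate such as: WHERE id BETWEEN 150 AND 220
to_read = files_for_range(files, low=150, high=220)
skipped = len(files) - len(to_read)
print([f.path for f in to_read], skipped)
# ['part-1.parquet', 'part-2.parquet'] 1
```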

NFS Server (POSIX Filesystem Access)

The NFS server allows you to mount your Delta Lake database as a standard POSIX filesystem. Unix commands don't just read data - they trigger Delta Lake operations: cat queries Parquet data, grep searches, echo >> triggers INSERT transactions, sed -i triggers MERGE (UPDATE/DELETE) transactions. All operations are ACID-compliant Delta Lake transactions.

Starting the NFS Server

from posixlake import DatabaseOps, Schema, Field, NfsServer
import time

# Create/open database
db = DatabaseOps.open("/path/to/db")

# Start NFS server on port 12049
nfs = NfsServer(db, 12049)

# Wait for server to be ready
time.sleep(0.5)
if nfs.is_ready():
    print("✓ NFS server ready")
else:
    print("⚠ NFS server not ready")
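
A fixed sleep can be too short on a slow machine. One alternative is to poll the readiness check until a deadline. The helper below is a generic sketch: wait_until_ready and FakeServer are hypothetical names, and the timeout and interval values are arbitrary.

```python
import time


def wait_until_ready(is_ready, timeout_s=5.0, interval_s=0.1):
    """Poll is_ready() until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if is_ready():
            return True
        time.sleep(interval_s)
    return is_ready()  # one last check at the deadline


class FakeServer:
    """Stand-in for a server object that becomes ready on the third check."""

    def __init__(self):
        self.checks = 0

    def is_ready(self):
        self.checks += 1
        return self.checks >= 3


server = FakeServer()
ready = wait_until_ready(server.is_ready, timeout_s=2.0, interval_s=0.01)
print(ready, server.checks)
# True 3
```

With a real server object, the same call would take the form wait_until_ready(nfs.is_ready).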

Mounting the Filesystem

# Mount command (requires sudo)
sudo mount_nfs -o nolocks,vers=3,tcp,port=12049,mountport=12049 localhost:/share /mnt/posixlake

# Verify mount
ls -la /mnt/posixlake/
# data/
# schema.sql
# .query

Using POSIX Commands

Once mounted, your Delta Lake table is accessible like any other directory:

# 1. List directory contents
ls -la /mnt/posixlake/data/

# 2. Read all data as CSV
cat /mnt/posixlake/data/data.csv
# id,name,age
# 1,Alice,30
# 2,Bob,25

# 3. Search for specific records with grep
grep "Alice" /mnt/posixlake/data/data.csv
# 1,Alice,30

# 4. Process columns with awk
awk -F',' '{print $2, $3}' /mnt/posixlake/data/data.csv
# name age
# Alice 30
# Bob 25

# 5. Count lines/records with wc
wc -l /mnt/posixlake/data/data.csv
# 3 /mnt/posixlake/data/data.csv (includes header)

# 6. Sort data by a column
sort -t',' -k2 /mnt/posixlake/data/data.csv  # Sort by name

# 7. Append new data (triggers Delta Lake INSERT transaction!)
echo "3,Charlie,28" >> /mnt/posixlake/data/data.csv
# → Executes: Delta Lake INSERT transaction with ACID guarantees
cat /mnt/posixlake/data/data.csv
# id,name,age
# 1,Alice,30
# 2,Bob,25
# 3,Charlie,28

# 8. Edit data (triggers Delta Lake MERGE transaction - atomic INSERT/UPDATE/DELETE!)
# Example: Update Alice's age to 31
sed -i 's/Alice,30/Alice,31/' /mnt/posixlake/data/data.csv
# → Executes: Delta Lake MERGE transaction (UPDATE operation)
cat /mnt/posixlake/data/data.csv
# id,name,age
# 1,Alice,31
# 2,Bob,25
# 3,Charlie,28

# Example: Delete Bob (id=2)
grep -v "2,Bob" /mnt/posixlake/data/data.csv > /tmp/temp_data.csv
cat /tmp/temp_data.csv > /mnt/posixlake/data/data.csv
# → Executes: Delta Lake MERGE transaction (DELETE operation)
cat /mnt/posixlake/data/data.csv
# id,name,age
# 1,Alice,31
# 3,Charlie,28

# 9. Truncate table (triggers Delta Lake DELETE ALL transaction!)
rm /mnt/posixlake/data/data.csv
# → Executes: Delta Lake DELETE ALL transaction
cat /mnt/posixlake/data/data.csv
# id,name,age

Unmounting and Shutdown

# Unmount filesystem
sudo umount /mnt/posixlake
# Shutdown NFS server
nfs.shutdown()

How It Works:

  • Read Operations (cat, grep, awk, wc): NFS server queries Parquet files → converts to CSV on-demand → caches result
  • Append Operations (echo >>): NFS server parses CSV → converts to RecordBatch → Delta Lake INSERT transaction
  • Overwrite Operations (sed -i, cat > file): Detects INSERT/UPDATE/DELETE by comparing old vs new CSV → executes MERGE transaction (atomic INSERT/UPDATE/DELETE)
  • Delete Operations (rm file): Triggers Delta Lake DELETE ALL transaction
  • No Special Drivers: Uses OS built-in NFS client - works everywhere
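
The overwrite path described above, comparing the old and new CSV to decide which rows were inserted, updated, or deleted, can be sketched with the standard library. This is an illustration of the idea only: diff_csv is a hypothetical helper, and keying rows by a single primary-key column is an assumption.

```python
import csv
import io


def diff_csv(old_text: str, new_text: str, key: str) -> dict:
    """Classify row-level changes between two CSV snapshots by primary key."""

    def index(text):
        # Map each primary-key value to its full row.
        return {row[key]: row for row in csv.DictReader(io.StringIO(text))}

    old_rows, new_rows = index(old_text), index(new_text)
    return {
        "insert": [new_rows[k] for k in new_rows if k not in old_rows],
        "update": [new_rows[k] for k in new_rows
                   if k in old_rows and new_rows[k] != old_rows[k]],
        "delete": [old_rows[k] for k in old_rows if k not in new_rows],
    }


old = "id,name,age\n1,Alice,30\n2,Bob,25\n"
new = "id,name,age\n1,Alice,31\n3,Charlie,28\n"
changes = diff_csv(old, new, key="id")
for op, rows in changes.items():
    print(op, [row["id"] for row in rows])
# insert ['3']
# update ['1']
# delete ['2']
```

Applying all three change sets in one transaction is what makes an in-place edit atomic from the reader's point of view.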

Authentication & Security

from posixlake import DatabaseOps, Schema, Field

# Create database with authentication enabled
schema = Schema(fields=[...], primary_key=None)
db = DatabaseOps.create_with_auth("/path/to/db", schema, auth_enabled=True)
db.create_user("admin", "secret", ["admin"])

# Open with credentials
db = DatabaseOps.open_with_credentials("/path/to/db", "admin", "secret")

# User management
db.create_user("alice", "password123", ["read", "write"])

# Role-based access control
# Permissions checked automatically on all operations

Backup & Restore

from posixlake import (
    get_backup_metadata,
    get_backup_metadata_with_credentials,
    restore,
    restore_to_transaction,
    restore_to_transaction_with_credentials,
    restore_with_credentials,
    verify_backup,
    verify_backup_with_credentials,
)

# Full backup
db.backup("/path/to/backup")

# Incremental backup
db.backup_incremental("/path/to/base_backup", "/path/to/incremental_backup")

# Backup inspection
metadata = get_backup_metadata("/path/to/backup")
report = verify_backup("/path/to/backup")

# Restore helpers
restore("/path/to/backup", "/path/to/restore")
restore_to_transaction("/path/to/backup", "/path/to/restore_txn", metadata.timestamp)

# Auth-enabled backup helpers
metadata = get_backup_metadata_with_credentials("/path/to/auth_backup", "admin", "secret")
report = verify_backup_with_credentials("/path/to/auth_backup", "admin", "secret")
restore_with_credentials("/path/to/auth_backup", "/path/to/auth_restore", "admin", "secret")
restore_to_transaction_with_credentials(
    "/path/to/auth_backup",
    "/path/to/auth_restore_txn",
    metadata.timestamp,
    "admin",
    "secret",
)

Monitoring

# Get real-time metrics
metrics = db.get_metrics()
print(metrics.total_queries, metrics.total_inserts, metrics.uptime_seconds)

# Health check
health = db.health_check()
print(health.status, health.total_files, health.total_size_bytes)

# Data skipping statistics
stats = db.get_data_skipping_stats()
print(stats.files_read, stats.files_skipped, stats.bytes_scanned)

API Reference

DatabaseOps

Main class for database operations.

Methods

| Method | Description | Returns |
|---|---|---|
| create(path, schema) | Create new database | DatabaseOps |
| create_from_csv(db_path, csv_path) | Create from CSV (auto schema) | DatabaseOps |
| create_from_parquet(db_path, parquet_path) | Create from Parquet | DatabaseOps |
| open(path) | Open existing database | DatabaseOps |
| create_with_auth(path, schema, auth_enabled) | Create with authentication | DatabaseOps |
| open_with_credentials(path, username, password) | Open with credentials | DatabaseOps |
| create_with_s3(s3_path, schema, s3_config) | Create on S3 | DatabaseOps |
| open_with_s3(s3_path, s3_config) | Open from S3 | DatabaseOps |
| insert_json(json_data) | Insert data from JSON | u64 (rows inserted) |
| insert_buffered_json(json_data) | Buffered insert | u64 (rows inserted) |
| flush_write_buffer() | Flush buffered writes | None |
| merge_json(json_data, key_column) | MERGE (UPSERT) operation | str (JSON metrics) |
| query(sql) | Execute SQL query | list[Row] |
| query_json(sql) | Execute SQL query | str (JSON results) |
| query_version(sql, version) | Time travel query by version | list[Row] |
| query_version_json(sql, version) | Time travel query by version | str (JSON results) |
| query_timestamp(sql, timestamp_ms) | Time travel query by timestamp | list[Row] |
| query_timestamp_json(sql, timestamp_ms) | Time travel query by timestamp | str (JSON results) |
| delete_rows_where(condition) | Delete rows by condition | u64 (rows deleted) |
| optimize() | Compact Parquet files | None |
| optimize_with_target_size(target_size_bytes) | Compact with target size | None |
| optimize_with_filter(filter) | Compact with filter | None |
| vacuum(retention_hours) | Remove old files | None |
| vacuum_dry_run(retention_hours) | Preview files to remove | list[str] |
| zorder(columns) | Multi-dimensional clustering | None |
| get_data_skipping_stats() | Get skipping statistics | DataSkippingStats |
| get_metrics() | Get real-time metrics | DatabaseMetrics |
| health_check() | Health check | HealthStatus |
| get_schema() | Get database schema | Schema |
| primary_key() | Get primary key column | str |
| set_primary_key(column_name) | Persist primary key metadata | None |
| backup(path) | Full backup | None |
| backup_incremental(base_backup_path, incremental_path) | Incremental backup | None |

Schema

Database schema definition.

from posixlake import Schema, Field

schema = Schema(fields=[
    Field(name="id", data_type="Int32", nullable=False),
    Field(name="name", data_type="String", nullable=False),
    Field(name="age", data_type="Int32", nullable=True),
    Field(name="salary", data_type="Float64", nullable=True),
], primary_key="id")

Supported Data Types

Primitive Types:

  • Int8, Int16, Int32, Int64
  • UInt8, UInt16, UInt32, UInt64
  • Float32, Float64
  • String, LargeUtf8, Binary, LargeBinary
  • Boolean
  • Date32, Date64
  • Timestamp

Complex Types:

  • Decimal128(precision,scale) - e.g., Decimal128(10,2) for currency
  • List<ElementType> - e.g., List<Int32>, List<String>
  • Map<KeyType,ValueType> - e.g., Map<String,Int64>
  • Struct<field1:Type1,field2:Type2> - e.g., Struct<x:Int32,y:Int32>

Field

Schema field definition.

# Simple types
Field(name="id", data_type="Int32", nullable=False)
Field(name="price", data_type="Decimal128(10,2)", nullable=False)

# Complex types
Field(name="tags", data_type="List<String>", nullable=True)
Field(name="metadata", data_type="Map<String,String>", nullable=True)
Field(name="address", data_type="Struct<city:String,zip:Int32>", nullable=True)

NfsServer

NFS server for POSIX filesystem access.

nfs = NfsServer(db, port=12049)
nfs.is_ready()  # Check if server is ready
nfs.shutdown()  # Shutdown server

S3Config

S3 configuration for object storage backend.

s3_config = S3Config(
    endpoint="http://localhost:9000",
    access_key_id="minioadmin",
    secret_access_key="minioadmin",
    region="us-east-1"
)

PosixLakeError

Exception class for all posixlake errors.

from posixlake import PosixLakeError

try:
    db.insert_json(data)
except PosixLakeError as e:
    print(f"Error: {e}")

Error Types

  • PosixLakeError.IoError - I/O operations
  • PosixLakeError.SerializationError - JSON/Arrow serialization
  • PosixLakeError.DeltaLakeError - Delta Lake operations
  • PosixLakeError.InvalidOperation - Invalid operations
  • PosixLakeError.QueryError - SQL query errors
  • PosixLakeError.AuthenticationError - Authentication failures
  • PosixLakeError.PermissionDenied - Permission errors
  • PosixLakeError.SchemaError - Schema-related errors
  • PosixLakeError.VersionError - Version conflicts
  • PosixLakeError.StorageError - Storage backend errors
  • PosixLakeError.NetworkError - Network operations
  • PosixLakeError.TimeoutError - Operation timeouts
  • PosixLakeError.NotFound - Resource not found
  • PosixLakeError.AlreadyExists - Resource already exists

Performance

Buffered Inserts

10x performance improvement for small batch writes:

# Regular insert: 100 separate Delta Lake transactions
for i in range(100):
    db.insert_json(f'[{{"id": {i}, "name": "User_{i}"}}]')
# Time: ~5-10 seconds (50-100ms per transaction)

# Buffered insert: ~1-2 batched transactions
for i in range(100):
    db.insert_buffered_json(f'[{{"id": {i}, "name": "User_{i}"}}]')
db.flush_write_buffer()
# Time: ~0.5-1 second (10x faster!)

How It Works:

  • Buffers multiple small writes in memory
  • Auto-flushes at 1000 rows (configurable in Rust)
  • Batches all buffered data into fewer Delta Lake transactions
  • Reduces transaction overhead significantly
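
The buffering behaviour in the list above can be modelled in a few lines. This toy class is only a sketch of the pattern (accumulate rows, commit when a threshold is reached, commit the remainder on an explicit flush); WriteBuffer and its commit callback are hypothetical and stand in for the Rust implementation.

```python
class WriteBuffer:
    """Toy buffer: collects rows in memory and commits them in batches."""

    def __init__(self, commit, max_rows=1000):
        self._commit = commit        # callable that receives a list of rows
        self._max_rows = max_rows
        self._rows = []

    def insert(self, rows):
        self._rows.extend(rows)
        if len(self._rows) >= self._max_rows:
            self.flush()             # auto-flush once the threshold is reached

    def flush(self):
        if self._rows:
            self._commit(self._rows)
            self._rows = []


commits = []  # records the size of each simulated transaction
buffer = WriteBuffer(commit=lambda rows: commits.append(len(rows)), max_rows=1000)
for i in range(2500):
    buffer.insert([{"id": i}])
buffer.flush()
print(commits)
# [1000, 1000, 500]
```

In this model, 2500 single-row inserts produce three commits instead of 2500, which is where the reduction in per-transaction overhead comes from.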

Efficient Operations

  • Optimized data transfer between Rust and Python
  • Arrow RecordBatches shared efficiently
  • Minimal memory copying for large datasets

Async Operations

  • Operations run on async runtime
  • Synchronous Python API for ease of use
  • Optimal concurrency for I/O-bound workloads

Error Handling

All Rust errors are properly mapped to Python exceptions:

from posixlake import PosixLakeError

try:
    db = DatabaseOps.create("/path/to/db", schema)
    db.insert_json(data)
    results = db.query_json("SELECT * FROM data")
except PosixLakeError.IoError as e:
    print(f"I/O error: {e}")
except PosixLakeError.SerializationError as e:
    print(f"Serialization error: {e}")
except PosixLakeError.DeltaLakeError as e:
    print(f"Delta Lake error: {e}")
except PosixLakeError.InvalidOperation as e:
    print(f"Invalid operation: {e}")
except PosixLakeError as e:
    print(f"posixlake error: {e}")

Error Types:

  • All errors inherit from PosixLakeError
  • Specific error types for different failure modes
  • Comprehensive error messages with context
  • Stack traces preserved from Rust

Architecture

System Overview

┌─────────────────────────────────────────┐
│  Python Application                     │
│  from posixlake import DatabaseOps      │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│  Python API Layer                       │
│  • Type conversion                      │
│  • Error handling                       │
│  • Async runtime bridge                 │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│  Rust Library (.so/.dylib/.dll)         │
│  • DatabaseOps                          │
│  • Delta Lake operations                │
│  • DataFusion SQL engine                │
│  • NFS server                           │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│  Delta Lake Protocol                    │
│  • ACID transactions                    │
│  • Time travel                          │
│  • Parquet storage                      │
└─────────────────────────────────────────┘

Key Features:

  • Type Safety: Automatic type conversion between Rust and Python
  • Error Handling: Comprehensive error mapping to Python exceptions
  • Efficient Data Transfer: Optimized data sharing via Arrow
  • Async Support: Async runtime for optimal performance
  • Memory Safety: Rust's memory safety guarantees

Storage Backends

posixlake Python bindings support multiple storage backends:

  • Local Filesystem: Standard directory paths
  • S3/MinIO: Object storage with S3-compatible API
  • Azure Blob Storage: Azure containers, or Azurite for local testing
  • Unified API: Same Python code works with every backend

What Makes This Awesome

  1. Performance: Rust-powered engine with buffered inserts (~10x faster for small batches)
  2. No Special Drivers: NFS server uses OS built-in NFS client - zero installation
  3. Unix Commands Trigger Delta Operations: cat queries data, grep searches, echo >> triggers INSERT, sed -i triggers MERGE (UPDATE/DELETE) - all as ACID transactions
  4. Standard Tools: grep, awk, sed, wc, sort work on your data lake and trigger Delta Lake operations - no special libraries needed
  5. Smart Batching: Auto-flushes at 1000 rows, reducing transaction overhead
  6. Delta Lake Compatible: Tables readable by Spark, Databricks, and Athena immediately
  7. Robust: Comprehensive error handling, async support, and testing
  8. Type Safety: Complete type hints and comprehensive error handling
  9. Efficient: Optimized data transfer with minimal overhead
  10. Unified Storage: Same API works with local filesystem, S3/MinIO, and Azure Blob Storage

Use Unix commands to query and trigger Delta Lake operations - cat queries Parquet data, grep searches, echo >> triggers INSERT transactions, sed -i triggers MERGE (UPDATE/DELETE) transactions. No special libraries, no drivers, just mount and use standard Unix tools. Plus buffered inserts for 10x performance when loading many small batches.


License

Apache License 2.0

Copyright 2025 posixlake Contributors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.

See LICENSE.md for the full license text.


Contributing

Contributions welcome! Please follow these guidelines:

  1. Write tests first - TDD approach for all features
  2. Run full suite - Ensure all tests pass
  3. Update documentation - Keep README and docs up to date
  4. Commit messages - Use conventional commits

Acknowledgments

Built with:


Questions? Open an issue

Like this project? Star the repo and share with your data engineering team!

