Build and run queries against data

PathsData - Enhanced DataFusion with Iceberg Support

Python 3.9+ | Apache License

PathsData provides a simplified, enhanced interface for working with Apache Iceberg tables using DataFusion, specifically optimized for modern data workflows.

🚀 Key Features

  • Zero Configuration: Automatic Iceberg support with no manual setup
  • Enhanced S3 Credentials: Comprehensive AWS credential discovery
  • Simplified API: Clean, intuitive interface compared to raw DataFusion
  • Multi-Region Support: Easy AWS region configuration
  • Multiple Catalog Types: Support for AWS Glue, File-based, and REST catalogs
  • Full Compatibility: All DataFusion functionality remains available

📦 Installation

pip install pathsdata

🏃‍♂️ Quick Start

Basic Usage

from pathsdata import SessionContext

# Create context (automatically Iceberg-enabled)
ctx = SessionContext()

# Create and register catalog in one step (Default: Iceberg Glue Catalog)
catalog = ctx.create_catalog("warehouse", region="us-east-1")

# Create table (DDL)
catalog.execute_ddl(ctx, """
    CREATE EXTERNAL TABLE warehouse.default.products (
        id BIGINT,
        name VARCHAR,
        price DECIMAL(10,2),
        category VARCHAR
    ) STORED AS ICEBERG
    LOCATION 's3://your-bucket/warehouse/products'
""")

# Insert data (DML)
ctx.sql("""
    INSERT INTO warehouse.default.products VALUES
    (1, 'Laptop', 999.99, 'Electronics'),
    (2, 'Book', 29.99, 'Education')
""").collect()

# Query data
df = ctx.sql("SELECT * FROM warehouse.default.products WHERE price > 100")
results = df.collect()  # list of Arrow RecordBatches
total_rows = sum(batch.num_rows for batch in results)
print(f"Found {total_rows} products")

🔧 Core Components

SessionContext

The enhanced SessionContext is automatically configured with Iceberg support:

from pathsdata import SessionContext

# Pre-configured with IcebergQueryPlanner and enhanced S3 credentials
ctx = SessionContext()

# Create catalog with simplified API
catalog = ctx.create_catalog("warehouse", region="us-east-1")

Catalog Types

GlueCatalog (Default)

from pathsdata import GlueCatalog

# AWS Glue-based catalog
catalog = GlueCatalog("warehouse", region="us-east-1")

FileCatalog

from pathsdata import FileCatalog

# File-based catalog for local/distributed filesystems
catalog = FileCatalog("/path/to/catalog")

RestCatalog

from pathsdata import RestCatalog

# REST API-based catalog
catalog = RestCatalog("https://catalog.example.com", access_token="token123")

🔐 AWS Credential Discovery

PathsData automatically discovers AWS credentials from multiple sources:

  1. Environment Variables

    export AWS_ACCESS_KEY_ID=your_access_key
    export AWS_SECRET_ACCESS_KEY=your_secret_key
    export AWS_SESSION_TOKEN=your_session_token  # Optional
    
  2. AWS Credentials File (~/.aws/credentials)

    [default]
    aws_access_key_id = your_access_key
    aws_secret_access_key = your_secret_key
    
  3. EC2 Instance Profile - Automatic when running on EC2

  4. ECS Task Role - Automatic when running in ECS containers

  5. Anonymous Access - Fallback for public buckets
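The precedence of the first two sources can be sketched with the standard library alone. This is an illustration of the discovery order described above, not PathsData's internal implementation; `discover_credentials` is a hypothetical helper:

```python
import configparser
import os
from pathlib import Path

def discover_credentials(env=os.environ,
                         credentials_path=Path.home() / ".aws" / "credentials"):
    """Illustrate the first two steps of the discovery chain:
    environment variables take precedence over the shared credentials file."""
    # 1. Environment variables
    key = env.get("AWS_ACCESS_KEY_ID")
    secret = env.get("AWS_SECRET_ACCESS_KEY")
    if key and secret:
        return {"source": "environment", "access_key_id": key}
    # 2. Shared credentials file (~/.aws/credentials), [default] profile
    parser = configparser.ConfigParser()
    if parser.read(credentials_path) and parser.has_option("default", "aws_access_key_id"):
        return {"source": "credentials_file",
                "access_key_id": parser.get("default", "aws_access_key_id")}
    # 3-5. Instance profile, task role, and anonymous access would follow here
    return {"source": "anonymous", "access_key_id": None}
```

In practice PathsData walks the full five-step chain automatically; this sketch only shows why an exported `AWS_ACCESS_KEY_ID` wins over the credentials file.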

📚 Complete Examples

E-commerce Analytics Pipeline

from pathsdata import SessionContext

def analyze_sales():
    # Setup
    ctx = SessionContext()
    catalog = ctx.create_catalog("analytics", region="us-east-1")

    # Create sales table
    catalog.execute_ddl(ctx, """
        CREATE EXTERNAL TABLE analytics.default.sales (
            order_id BIGINT,
            customer_id BIGINT,
            product_id BIGINT,
            amount DECIMAL(10,2),
            order_date DATE
        ) STORED AS ICEBERG
        LOCATION 's3://analytics-bucket/sales'
    """)

    # Load sample data
    ctx.sql("""
        INSERT INTO analytics.default.sales VALUES
        (1001, 501, 101, 299.99, '2024-01-15'),
        (1002, 502, 102, 599.99, '2024-01-16'),
        (1003, 503, 103, 149.99, '2024-02-01')
    """).collect()

    # Monthly sales analysis
    monthly_sales = ctx.sql("""
        SELECT
            DATE_TRUNC('month', order_date) as month,
            COUNT(*) as total_orders,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM analytics.default.sales
        GROUP BY DATE_TRUNC('month', order_date)
        ORDER BY month
    """)

    return monthly_sales.collect()

# Execute analysis; collect() returns a list of Arrow RecordBatches
results = analyze_sales()
for batch in results:
    for row in batch.to_pylist():
        print(f"Month: {row['month']}, Orders: {row['total_orders']}, Revenue: ${row['total_revenue']}")

Multi-Catalog Data Pipeline

from pathsdata import SessionContext, GlueCatalog, FileCatalog

def setup_multi_catalog_pipeline():
    ctx = SessionContext()

    # Production data in AWS Glue
    prod_catalog = GlueCatalog("production", "us-east-1")
    ctx.register_catalog_provider("prod", prod_catalog._internal)

    # Development data in local files
    dev_catalog = FileCatalog("/path/to/dev/catalog")
    ctx.register_catalog_provider("dev", dev_catalog._internal)

    # Cross-catalog query
    comparison = ctx.sql("""
        SELECT
            'production' as environment,
            COUNT(*) as record_count
        FROM prod.default.events

        UNION ALL

        SELECT
            'development' as environment,
            COUNT(*) as record_count
        FROM dev.default.events
    """)

    return comparison.collect()

# Run pipeline; collect() returns a list of Arrow RecordBatches
results = setup_multi_catalog_pipeline()
for batch in results:
    for row in batch.to_pylist():
        print(f"{row['environment']}: {row['record_count']} records")

Multi-Region Data Access

from pathsdata import SessionContext

def setup_global_data_access():
    ctx = SessionContext()

    # US East data
    us_catalog = ctx.create_catalog("us_data", region="us-east-1")

    # EU West data
    eu_catalog = ctx.create_catalog("eu_data", region="eu-west-1")

    # Query both regions
    global_summary = ctx.sql("""
        SELECT 'US' as region, COUNT(*) as user_count
        FROM us_data.default.users

        UNION ALL

        SELECT 'EU' as region, COUNT(*) as user_count
        FROM eu_data.default.users
    """)

    return global_summary.collect()

🛠️ Advanced Usage

Custom Session Configuration

from pathsdata import SessionContext
from datafusion import RuntimeEnvBuilder, SessionConfig

# Custom configuration (optional)
config = SessionConfig()
runtime = RuntimeEnvBuilder()

ctx = SessionContext(config=config, runtime=runtime)

Error Handling

from pathsdata import SessionContext

ctx = SessionContext()

try:
    catalog = ctx.create_catalog("warehouse", region="us-east-1")

    # DDL operations
    catalog.execute_ddl(ctx, create_table_sql)
    print("✓ Table created successfully")

    # DML operations
    result = ctx.sql(insert_sql)
    batches = result.collect()
    print(f"✓ Inserted data in {len(batches)} batches")

except Exception as e:
    error_str = str(e).lower()
    if "credentials" in error_str or "service error" in error_str:
        print("⚠️ AWS credentials/permissions issue - check your AWS setup")
    else:
        print(f"❌ Error: {e}")

🆘 Troubleshooting

Common Issues

1. AWS Credential Errors

Error: service error: ... credentials

Solution: Configure AWS credentials using one of the supported methods above.

2. Region Mismatch

Error: ... region 'us-east-1' ... bucket in 'us-west-2'

Solution: Ensure catalog region matches your S3 bucket region.

3. Import Errors

ImportError: No module named 'pathsdata'

Solution: Install the package with pip install pathsdata.

Performance Tips

  1. Use appropriate regions: Create catalogs in the same region as your data
  2. Batch operations: Combine multiple INSERT statements when possible
  3. Optimize queries: Use column pruning and predicate pushdown
  4. Connection pooling: Reuse SessionContext instances when possible
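Tip 2 (batch operations) can be illustrated with plain SQL string building. `batch_insert_sql` is a hypothetical helper, and real pipelines should prefer parameterized or DataFrame-based writes over hand-formatted literals:

```python
def batch_insert_sql(table, rows):
    """Build a single multi-row INSERT statement instead of one statement
    per row, so the engine performs one write instead of many."""
    def fmt(value):
        # Naive literal formatting, for illustration only
        return f"'{value}'" if isinstance(value, str) else str(value)
    values = ",\n".join(
        "(" + ", ".join(fmt(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {table} VALUES\n{values}"

sql = batch_insert_sql(
    "warehouse.default.products",
    [(1, "Laptop", 999.99, "Electronics"), (2, "Book", 29.99, "Education")],
)
```

Executing the result once with `ctx.sql(sql).collect()` issues a single write rather than one per row.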

🔗 API Reference

Core Classes

  • SessionContext: Main entry point with automatic Iceberg support

    • __init__(config=None, runtime=None)
    • create_catalog(name, region="us-east-1", catalog_type="glue", **kwargs)
    • All standard DataFusion methods (sql, read_csv, read_parquet, etc.)
  • GlueCatalog: AWS Glue-based catalog

    • __init__(warehouse_name, region="us-east-1")
    • execute_ddl(ctx, sql)
  • FileCatalog: File-based catalog

    • __init__(path)
    • execute_ddl(ctx, sql)
  • RestCatalog: REST API-based catalog

    • __init__(endpoint, access_token=None)
    • execute_ddl(ctx, sql)

Good news: all existing DataFusion code continues to work. See datafusion-python for the full API.

📞 Support & Resources

  • Issues: Report bugs and feature requests on GitHub
  • Migration: PathsData is API-compatible with DataFusion, so existing code runs unchanged
  • Community: Join the DataFusion community for discussions and support
