Skip to main content

Build and run queries against data

Project description

PathsData - Enhanced DataFusion with Iceberg Support

Python 3.9+ Apache License

PathsData provides a simplified, enhanced interface for working with Apache Iceberg tables using DataFusion, specifically optimized for modern data workflows.

🚀 Key Features

  • Zero Configuration: Automatic Iceberg support with no manual setup
  • Enhanced S3 Credentials: Comprehensive AWS credential discovery
  • Simplified API: Clean, intuitive interface compared to raw DataFusion
  • Multi-Region Support: Easy AWS region configuration
  • Multiple Catalog Types: Support for AWS Glue, File-based, and REST catalogs
  • Full Compatibility: All DataFusion functionality remains available

📦 Installation

pip install pathsdata

🏃‍♂️ Quick Start

Basic Usage

from pathsdata import SessionContext

# Create context (automatically Iceberg-enabled)
ctx = SessionContext()

# Create and register catalog in one step (Default: Iceberg Glue Catalog)
catalog = ctx.create_catalog("warehouse", region="us-east-1")

# Create table (DDL)
catalog.execute_ddl(ctx, """
    CREATE EXTERNAL TABLE warehouse.default.products (
        id BIGINT,
        name VARCHAR,
        price DECIMAL(10,2),
        category VARCHAR
    ) STORED AS ICEBERG
    LOCATION 's3://your-bucket/warehouse/products'
""")

# Insert data (DML)
ctx.sql("""
    INSERT INTO warehouse.default.products VALUES
    (1, 'Laptop', 999.99, 'Electronics'),
    (2, 'Book', 29.99, 'Education')
""").collect()

# Query data
df = ctx.sql("SELECT * FROM warehouse.default.products WHERE price > 100")
results = df.collect()
print(f"Found {len(results)} products")

🔧 Core Components

SessionContext

The enhanced SessionContext is automatically configured with Iceberg support:

from pathsdata import SessionContext

# Pre-configured with IcebergQueryPlanner and enhanced S3 credentials
ctx = SessionContext()

# Create catalog with simplified API
catalog = ctx.create_catalog("warehouse", region="us-east-1")

Catalog Types

GlueCatalog (Default)

from pathsdata import GlueCatalog

# AWS Glue-based catalog
catalog = GlueCatalog("warehouse", region="us-east-1")

FileCatalog

from pathsdata import FileCatalog

# File-based catalog for local/distributed filesystems
catalog = FileCatalog("/path/to/catalog")

RestCatalog

from pathsdata import RestCatalog

# REST API-based catalog
catalog = RestCatalog("https://catalog.example.com", access_token="token123")

🔐 AWS Credential Discovery

PathsData automatically discovers AWS credentials from multiple sources:

  1. Environment Variables

    export AWS_ACCESS_KEY_ID=your_access_key
    export AWS_SECRET_ACCESS_KEY=your_secret_key
    export AWS_SESSION_TOKEN=your_session_token  # Optional
    
  2. AWS Credentials File (~/.aws/credentials)

    [default]
    aws_access_key_id = your_access_key
    aws_secret_access_key = your_secret_key
    
  3. EC2 Instance Profile - Automatic when running on EC2

  4. ECS Task Role - Automatic when running in ECS containers

  5. Anonymous Access - Fallback for public buckets

📚 Complete Examples

E-commerce Analytics Pipeline

from pathsdata import SessionContext

def analyze_sales():
    # Setup
    ctx = SessionContext()
    catalog = ctx.create_catalog("analytics", region="us-east-1")

    # Create sales table
    catalog.execute_ddl(ctx, """
        CREATE EXTERNAL TABLE analytics.default.sales (
            order_id BIGINT,
            customer_id BIGINT,
            product_id BIGINT,
            amount DECIMAL(10,2),
            order_date DATE
        ) STORED AS ICEBERG
        LOCATION 's3://analytics-bucket/sales'
    """)

    # Load sample data
    ctx.sql("""
        INSERT INTO analytics.default.sales VALUES
        (1001, 501, 101, 299.99, '2024-01-15'),
        (1002, 502, 102, 599.99, '2024-01-16'),
        (1003, 503, 103, 149.99, '2024-02-01')
    """).collect()

    # Monthly sales analysis
    monthly_sales = ctx.sql("""
        SELECT
            DATE_TRUNC('month', order_date) as month,
            COUNT(*) as total_orders,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM analytics.default.sales
        GROUP BY DATE_TRUNC('month', order_date)
        ORDER BY month
    """)

    return monthly_sales.collect()

# Execute analysis
results = analyze_sales()
for row in results:
    print(f"Month: {row['month']}, Orders: {row['total_orders']}, Revenue: ${row['total_revenue']}")

Multi-Catalog Data Pipeline

from pathsdata import SessionContext, GlueCatalog, FileCatalog

def setup_multi_catalog_pipeline():
    ctx = SessionContext()

    # Production data in AWS Glue
    prod_catalog = GlueCatalog("production", "us-east-1")
    ctx.register_catalog_provider("prod", prod_catalog._internal)

    # Development data in local files
    dev_catalog = FileCatalog("/path/to/dev/catalog")
    ctx.register_catalog_provider("dev", dev_catalog._internal)

    # Cross-catalog query
    comparison = ctx.sql("""
        SELECT
            'production' as environment,
            COUNT(*) as record_count
        FROM prod.default.events

        UNION ALL

        SELECT
            'development' as environment,
            COUNT(*) as record_count
        FROM dev.default.events
    """)

    return comparison.collect()

# Run pipeline
results = setup_multi_catalog_pipeline()
for row in results:
    print(f"{row['environment']}: {row['record_count']} records")

Multi-Region Data Access

from pathsdata import SessionContext

def setup_global_data_access():
    ctx = SessionContext()

    # US East data
    us_catalog = ctx.create_catalog("us_data", region="us-east-1")

    # EU West data
    eu_catalog = ctx.create_catalog("eu_data", region="eu-west-1")

    # Query both regions
    global_summary = ctx.sql("""
        SELECT 'US' as region, COUNT(*) as user_count
        FROM us_data.default.users

        UNION ALL

        SELECT 'EU' as region, COUNT(*) as user_count
        FROM eu_data.default.users
    """)

    return global_summary.collect()

🛠️ Advanced Usage

Custom Session Configuration

from pathsdata import SessionContext
from datafusion import RuntimeEnvBuilder, SessionConfig

# Custom configuration (optional)
config = SessionConfig()
runtime = RuntimeEnvBuilder()

ctx = SessionContext(config=config, runtime=runtime)

Error Handling

from pathsdata import SessionContext

ctx = SessionContext()

try:
    catalog = ctx.create_catalog("warehouse", region="us-east-1")

    # DDL operations
    catalog.execute_ddl(ctx, create_table_sql)
    print("✓ Table created successfully")

    # DML operations
    result = ctx.sql(insert_sql)
    batches = result.collect()
    print(f"✓ Inserted data in {len(batches)} batches")

except Exception as e:
    error_str = str(e).lower()
    if "credentials" in error_str or "service error" in error_str:
        print("⚠️ AWS credentials/permissions issue - check your AWS setup")
    else:
        print(f"❌ Error: {e}")

🆘 Troubleshooting

Common Issues

1. AWS Credential Errors

Error: service error: ... credentials

Solution: Configure AWS credentials using one of the supported methods above.

2. Region Mismatch

Error: ... region 'us-east-1' ... bucket in 'us-west-2'

Solution: Ensure catalog region matches your S3 bucket region.

3. Import Errors

ImportError: No module named 'pathsdata'

Solution: Install the package with pip install pathsdata.

Performance Tips

  1. Use appropriate regions: Create catalogs in the same region as your data
  2. Batch operations: Combine multiple INSERT statements when possible
  3. Optimize queries: Use column pruning and predicate pushdown
  4. Connection pooling: Reuse SessionContext instances when possible

🔗 API Reference

Core Classes

  • SessionContext: Main entry point with automatic Iceberg support

    • __init__(config=None, runtime=None)
    • create_catalog(name, region="us-east-1", catalog_type="glue", **kwargs)
    • All standard DataFusion methods (sql, read_csv, read_parquet, etc.)
  • GlueCatalog: AWS Glue-based catalog

    • __init__(warehouse_name, region="us-east-1")
    • execute_ddl(ctx, sql)
  • FileCatalog: File-based catalog

    • __init__(path)
    • execute_ddl(ctx, sql)
  • RestCatalog: REST API-based catalog

    • __init__(endpoint, access_token=None)
    • execute_ddl(ctx, sql)

Good news: All existing DataFusion code continues to work! Refer to datafusion-python for more functionalities

📞 Support & Resources

  • Issues: Report bugs and feature requests on GitHub
  • Migration: Both PathsData and DataFusion APIs provide identical functionality
  • Community: Join the DataFusion community for discussions and support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

pathsdata-48.0.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (37.9 MB view details)

Uploaded CPython 3.9+manylinux: glibc 2.17+ x86-64

pathsdata-48.0.4-cp39-abi3-macosx_11_0_arm64.whl (34.7 MB view details)

Uploaded CPython 3.9+macOS 11.0+ ARM64

File details

Details for the file pathsdata-48.0.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File metadata

File hashes

Hashes for pathsdata-48.0.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm Hash digest
SHA256 78b6d4273cc26944b4c0aad156e9f5ddb9689f3fff13beb3f906fcb7f53cd547
MD5 5fec5c20bc7dfdf079aa4f73e88d108f
BLAKE2b-256 c23c3e9371344cafaccd0eb30e8deb8fc87d9a219e7ffe638567b91ff3ef3337

See more details on using hashes here.

File details

Details for the file pathsdata-48.0.4-cp39-abi3-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for pathsdata-48.0.4-cp39-abi3-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 f1db205ece250072aa3c2294ce6ffc8dfb9f3f3b8221a2e8a479d379188717a7
MD5 2c8f0511212d2cfa4dc8f74c9a6cdeb2
BLAKE2b-256 9a962f52cd7a290ed36038b8e095aa35807897bc5eebd84691e8de5ea0b40a43

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page