Skip to main content

A lightweight, wrapper around Google Cloud BigQuery with Polars integration. Simplifies querying and data ingestion with a unified interface, supporting read, write, insert, and delete operations on BigQuery tables.

Project description

zbq

A lightweight, enhanced wrapper around Google Cloud BigQuery and Storage with Polars integration. Simplifies querying and data operations with a unified interface, supporting read, write, insert, delete operations on BigQuery tables, and advanced file upload/download with pattern matching, parallel processing, and comprehensive error handling.

Features

BigQuery Operations

  • Parameter substitution - Secure @param_name syntax with automatic type handling and SQL injection protection
  • Conditional queries - Optional @condition parameters for dynamic WHERE clauses
  • List parameters - Support for arrays/lists in IN clauses and BigQuery arrays
  • Dry run mode - Preview final queries with parameter substitution before execution
  • Transparent BigQuery client initialization with automatic project and credentials detection
  • Use Polars DataFrames seamlessly for input/output
  • Unified methods for CRUD operations with SQL and DataFrame inputs
  • Supports table creation, overwrite warnings, and write mode control
  • Context manager support for client lifecycle management
  • Enhanced error handling with custom exceptions and retry logic

Storage Operations

  • Advanced pattern matching - Multiple include/exclude patterns, regex support, case-insensitive matching
  • Parallel uploads/downloads - Configurable thread pool for better performance
  • Built-in progress bars - Automatic visual progress tracking with tqdm
  • Progress tracking - Built-in callbacks and detailed operation statistics
  • Dry-run mode - Preview operations without executing
  • Retry logic - Automatic retry with exponential backoff for failed operations
  • Comprehensive logging - Structured logging with configurable levels
  • Operation results - Detailed statistics including file counts, bytes transferred, duration, and errors

Installation

pip install zbq

Quick Start

BigQuery Operations

from zbq import zclient

# Simple query
df = zclient.read("SELECT * FROM `project.dataset.table` LIMIT 1000")
print(df)

# Parameterized query (recommended)
df = zclient.read(
    "SELECT * FROM @table_name WHERE region = @region LIMIT @limit",
    parameters={
        "table_name": "project.dataset.table",
        "region": "US", 
        "limit": 1000
    }
)

# Write DataFrame to BigQuery
result = zclient.write(
    df=df,
    full_table_path="project.dataset.new_table", 
    write_type="truncate",  # or "append"
    warning=True
)

# CRUD operations with parameters
zclient.insert(
    "INSERT INTO @table_name (col1, col2) VALUES (@val1, @val2)",
    parameters={"table_name": "project.dataset.table", "val1": "data", "val2": 123}
)
zclient.update(
    "UPDATE @table_name SET col = @new_value WHERE id = @target_id",
    parameters={"table_name": "project.dataset.table", "new_value": "updated", "target_id": 1}
)
zclient.delete(
    "DELETE FROM @table_name WHERE id IN (@ids)",
    parameters={"table_name": "project.dataset.table", "ids": [1, 2, 3]}
)

# Context manager support - automatic cleanup
with zclient as client:
    # Test query with dry_run first
    client.read(
        "SELECT * FROM @table WHERE date >= @start_date",
        parameters={"table": "project.dataset.table1", "start_date": "2024-01-01"},
        dry_run=True
    )
    
    # Execute actual queries
    df1 = client.read(
        "SELECT * FROM @table WHERE date >= @start_date",
        parameters={"table": "project.dataset.table1", "start_date": "2024-01-01"}
    )
    df2 = client.read("SELECT * FROM table2") 
    result = client.write(df1, "project.dataset.output_table")
# Client automatically cleaned up after context

Parameter Substitution & Advanced Features

Parameter Substitution

zbq supports parameterized queries using @param_name syntax for secure and flexible SQL execution:

from zbq import zclient

# Basic parameter substitution
df = zclient.read(
    """SELECT * FROM `project.dataset.table` 
    WHERE region = @region AND status = @status""",
    parameters={
        "region": "US",           # Strings are automatically quoted
        "status": "active"
    }
)

# Multiple data types
df = zclient.read(
    """SELECT * FROM hotels 
    WHERE hotel_id = @id 
    AND price > @min_price 
    AND is_available = @available
    AND last_updated > @date""",
    parameters={
        "id": 123,                    # Numbers: no quotes
        "min_price": 99.99,          # Floats: no quotes  
        "available": True,            # Booleans: TRUE/FALSE
        "date": None                 # None: NULL
    }
)

# Table identifiers (automatic backticks)
df = zclient.read(
    "SELECT * FROM @table_name WHERE region = @region",
    parameters={
        "table_name": "project.dataset.hotels",  # Becomes `project.dataset.hotels`
        "region": "US"                           # Becomes 'US'
    }
)

List Parameters for IN Clauses

Use lists and tuples for IN clauses and array operations:

# IN clause with list
df = zclient.read(
    "SELECT * FROM hotels WHERE hotel_code IN (@codes)",
    parameters={
        "codes": ['ABC', 'DEF', 'GHI']  # Becomes ('ABC', 'DEF', 'GHI')
    }
)

# Numeric lists
df = zclient.read(
    "SELECT * FROM bookings WHERE booking_id IN (@ids)", 
    parameters={
        "ids": [123, 456, 789]  # Becomes (123, 456, 789)
    }
)

# Array syntax (BigQuery)
df = zclient.read(
    "SELECT * FROM table WHERE col = ANY([@values])",
    parameters={
        "values": ['a', 'b', 'c']  # Becomes ['a', 'b', 'c']
    }
)

# Mixed data types in lists
df = zclient.read(
    "SELECT * FROM table WHERE col IN (@mixed)",
    parameters={
        "mixed": ['text', 123, None, True]  # Becomes ('text', 123, NULL, TRUE)
    }
)

Conditional Parameters

Use @condition parameters for optional WHERE clauses:

# With condition
df = zclient.read(
    """SELECT * FROM hotels 
    WHERE region = @region 
    @condition""",
    parameters={
        "region": "US",
        "condition": "AND hotel_rating >= 4"  # Raw SQL inserted
    }
)
# Result: WHERE region = 'US' AND hotel_rating >= 4

# Without condition (automatically removed)
df = zclient.read(
    """SELECT * FROM hotels 
    WHERE region = @region 
    @condition""",
    parameters={
        "region": "US"
        # No condition parameter - @condition is removed entirely
    }
)
# Result: WHERE region = 'US'

Dry Run Mode

Preview your final query with parameter substitution without executing:

# Preview query with dry_run
result = zclient.read(
    """SELECT * FROM `project.dataset.hotels`
    WHERE region = @region 
    AND hotel_codes IN (@codes)
    @condition""",
    parameters={
        "region": "US",
        "codes": ['ABC', 'DEF'], 
        "condition": "AND rating >= 4"
    },
    dry_run=True  # Prints query, returns None
)

# Output:
# DRY RUN - Query that would be executed:
# --------------------------------------------------
# SELECT * FROM `project.dataset.hotels`
# WHERE region = 'US' 
# AND hotel_codes IN ('ABC', 'DEF')
# AND rating >= 4
# --------------------------------------------------

Storage Operations

Basic Upload/Download

from zbq import zstorage

# Simple upload with pattern - files go to bucket root
result = zstorage.upload(
    local_dir="./data",
    bucket_path="my-bucket",
    include_patterns="*.xlsx"  # Upload only Excel files
)

# Upload to specific folder in bucket
result = zstorage.upload(
    local_dir="./data", 
    bucket_path="my-bucket/reports/2024",  # Upload to reports/2024/ folder
    include_patterns="*.xlsx"
)

print(f"Uploaded {result.uploaded_files}/{result.total_files} files")
print(f"Total size: {result.total_bytes:,} bytes in {result.duration:.2f}s")

# Context manager support for batch operations
with zstorage as storage:
    # Upload multiple directories in sequence
    result1 = storage.upload("./data1", "my-bucket/folder1", include_patterns="*.csv")
    result2 = storage.upload("./data2", "my-bucket/folder2", include_patterns="*.json")
    result3 = storage.download("my-bucket/archive", "./downloads", include_patterns="*.parquet")
# Storage client automatically cleaned up

# Simple download from bucket root
result = zstorage.download(
    bucket_path="my-bucket", 
    local_dir="./downloads",
    include_patterns="*.csv"  # Download only CSV files
)

# Download from specific folder in bucket
result = zstorage.download(
    bucket_path="my-bucket/data/exports",  # Download from data/exports/ folder
    local_dir="./downloads",
    include_patterns="*.csv"
)

Advanced Pattern Matching

# Multiple include patterns
result = zstorage.upload(
    local_dir="./reports",
    bucket_path="my-bucket/reports", 
    include_patterns=["*.xlsx", "*.csv", "*.json"],  # Multiple file types
    exclude_patterns=["temp_*", "*_backup.*"],       # Exclude temporary/backup files
    case_sensitive=False  # Case-insensitive matching
)

# Regex patterns for complex matching
result = zstorage.upload(
    local_dir="./logs",
    bucket_path="my-bucket/logs",
    include_patterns=r"log_\d{4}-\d{2}-\d{2}\.txt",  # Match log_YYYY-MM-DD.txt
    use_regex=True
)

Parallel Processing & Progress Tracking

# Automatic progress bar (shows for multiple files)
result = zstorage.upload(
    local_dir="./large-dataset", 
    bucket_path="my-bucket",
    include_patterns="*.xlsx",
    parallel=True,                    # Enable parallel uploads
    max_retries=5                     # Retry failed uploads
)
# Shows: "Uploading: 75%|███████▌  | 15/20 [00:30<00:10, 0.5files/s]"

# Custom progress callback (optional)
def progress_callback(completed, total):
    percentage = (completed / total) * 100
    print(f"Custom progress: {completed}/{total} files ({percentage:.1f}%)")

result = zstorage.upload(
    local_dir="./large-dataset", 
    bucket_path="my-bucket",
    progress_callback=progress_callback,
    show_progress=False               # Disable built-in progress bar
)

# Handle results
if result.failed_files > 0:
    print(f"WARNING: {result.failed_files} files failed to upload:")
    for error in result.errors:
        print(f"  - {error}")

print(f"Successfully uploaded {result.uploaded_files} files")
print(f"Total: {result.total_bytes:,} bytes in {result.duration:.2f}s")

Dry Run & Preview

# Preview what would be uploaded without actually uploading
result = zstorage.upload(
    local_dir="./data",
    bucket_path="my-bucket", 
    include_patterns="*.parquet",
    dry_run=True  # Preview only
)

print(f"Would upload {result.total_files} files ({result.total_bytes:,} bytes)")

# Progress bar control
result = zstorage.upload(
    local_dir="./data",
    bucket_path="my-bucket",
    include_patterns="*.xlsx", 
    show_progress=True     # Force show progress bar even for single files
)

result = zstorage.upload(
    local_dir="./data",
    bucket_path="my-bucket", 
    include_patterns="*.xlsx",
    show_progress=False    # Never show progress bar
)

Advanced Download with Filtering

# Download with path filtering and patterns
result = zstorage.download(
    bucket_path="my-data-bucket/reports/2024",  # Only files under this path
    local_dir="./downloaded-reports", 
    include_patterns=["*.xlsx", "*.pdf"],
    exclude_patterns="*_draft.*",     # Skip draft files
    parallel=True,
    max_results=500                   # Limit number of files to list
)

Advanced Configuration

Custom Logging

from zbq import setup_logging, StorageHandler, BigQueryHandler

# Configure logging
logger = setup_logging("DEBUG")  # DEBUG, INFO, WARNING, ERROR

# Create handlers with custom settings
storage = StorageHandler(
    project_id="my-project",
    log_level="INFO", 
    max_workers=8  # More parallel workers
)

bq = BigQueryHandler(
    project_id="my-project",
    default_timeout=600,  # 10 minute timeout
    log_level="DEBUG"
)

Error Handling

from zbq import ZbqAuthenticationError, ZbqOperationError, ZbqConfigurationError

try:
    result = zstorage.upload("./data", "my-bucket", include_patterns="*.csv")
except ZbqAuthenticationError:
    print("Authentication failed. Run: gcloud auth application-default login")
except ZbqConfigurationError:
    print("Configuration error. Check your project settings.")
except ZbqOperationError as e:
    print(f"Operation failed: {e}")

Working with Results

from zbq import UploadResult, DownloadResult

# Upload with detailed result handling
result: UploadResult = zstorage.upload(
    local_dir="./data",
    bucket_name="my-bucket",
    include_patterns=["*.json", "*.csv"]
)

# Detailed statistics
print(f"""
Upload Summary:
Total files: {result.total_files}
Uploaded: {result.uploaded_files} 
Skipped: {result.skipped_files}
Failed: {result.failed_files}
Total size: {result.total_bytes:,} bytes
Duration: {result.duration:.2f} seconds
""")

# Handle errors
if result.errors:
    print("Errors encountered:")
    for error in result.errors[:5]:  # Show first 5 errors
        print(f"  - {error}")
    if len(result.errors) > 5:
        print(f"  ... and {len(result.errors) - 5} more errors")

Pattern Matching Guide

Glob Patterns (Default)

  • *.xlsx - All Excel files
  • data_*.csv - CSV files starting with "data_"
  • report_????_??.pdf - Reports with specific naming pattern
  • **/*.json - All JSON files in subdirectories (recursive)
  • [!.]*.txt - Text files not starting with dot

Regex Patterns

# Enable regex with use_regex=True
zstorage.upload(
    local_dir="./logs",
    bucket_name="my-bucket", 
    include_patterns=[
        r"access_log_\d{4}-\d{2}-\d{2}\.log",  # access_log_2024-01-01.log
        r"error_log_\d{8}\.log"                # error_log_20240101.log  
    ],
    use_regex=True
)

Complex Filtering

# Include multiple types, exclude temp files
zstorage.upload(
    local_dir="./workspace",
    bucket_name="my-bucket",
    include_patterns=["*.py", "*.json", "*.md", "*.yml"],
    exclude_patterns=["__pycache__/*", "*.pyc", "temp_*", ".git/*"],
    case_sensitive=False
)

Authentication & Setup

  1. Install Google Cloud SDK:

    gcloud auth application-default login
    gcloud config set project YOUR_PROJECT_ID
    
  2. Or set environment variables:

    export GOOGLE_APPLICATION_CREDENTIALS="path/to/service-account.json" 
    export GOOGLE_CLOUD_PROJECT="your-project-id"
    

Requirements

  • Python ≥ 3.11
  • Google Cloud project with BigQuery and/or Storage APIs enabled
  • Appropriate IAM permissions for your operations

Performance Tips

  1. Use parallel processing for multiple files: parallel=True
  2. Adjust thread count based on your system: max_workers=8
  3. Use dry-run to preview large operations first
  4. Filter early with specific patterns to avoid processing unwanted files
  5. Monitor progress with callback functions for long operations

Contributing

Issues and pull requests welcome at the project repository.

License

See LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zbq-0.0.1.tar.gz (34.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zbq-0.0.1-py3-none-any.whl (16.3 kB view details)

Uploaded Python 3

File details

Details for the file zbq-0.0.1.tar.gz.

File metadata

  • Download URL: zbq-0.0.1.tar.gz
  • Upload date:
  • Size: 34.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.3

File hashes

Hashes for zbq-0.0.1.tar.gz
Algorithm Hash digest
SHA256 b79d42b504814497de9df5f1e75947f0262736914c8ea2466a370feb3ac698a8
MD5 90094a718c667e2679101f0897a08150
BLAKE2b-256 ff1ed0416021894ad28079eba6450f7eb40d0f3c81cadd3ce38bc0495e47ba7b

See more details on using hashes here.

File details

Details for the file zbq-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: zbq-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 16.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.3

File hashes

Hashes for zbq-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 2195e1d7234a45c4c18ca1d4f5e939ee1742042c4112357ff38fd137fb36dd23
MD5 4603f818e6b6dc71993924bf6e8fe5ef
BLAKE2b-256 fe9809bb42763cf031dcf2433af0d8eb98644f784de89e322bd9cffa09aa0cae

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page