A lightweight, wrapper around Google Cloud BigQuery with Polars integration. Simplifies querying and data ingestion with a unified interface, supporting read, write, insert, and delete operations on BigQuery tables.
Project description
zbq
A lightweight, enhanced wrapper around Google Cloud BigQuery and Storage with Polars integration. Simplifies querying and data operations with a unified interface, supporting read, write, insert, delete operations on BigQuery tables, and advanced file upload/download with pattern matching, parallel processing, and comprehensive error handling.
Features
BigQuery Operations
- Parameter substitution - Secure
@param_namesyntax with automatic type handling and SQL injection protection - Conditional queries - Optional
@conditionparameters for dynamic WHERE clauses - List parameters - Support for arrays/lists in IN clauses and BigQuery arrays
- Dry run mode - Preview final queries with parameter substitution before execution
- Transparent BigQuery client initialization with automatic project and credentials detection
- Use Polars DataFrames seamlessly for input/output
- Unified methods for CRUD operations with SQL and DataFrame inputs
- Supports table creation, overwrite warnings, and write mode control
- Context manager support for client lifecycle management
- Enhanced error handling with custom exceptions and retry logic
Storage Operations
- Advanced pattern matching - Multiple include/exclude patterns, regex support, case-insensitive matching
- Parallel uploads/downloads - Configurable thread pool for better performance
- Built-in progress bars - Automatic visual progress tracking with tqdm
- Progress tracking - Built-in callbacks and detailed operation statistics
- Dry-run mode - Preview operations without executing
- Retry logic - Automatic retry with exponential backoff for failed operations
- Comprehensive logging - Structured logging with configurable levels
- Operation results - Detailed statistics including file counts, bytes transferred, duration, and errors
Installation
pip install zbq
Quick Start
BigQuery Operations
from zbq import zclient
# Simple query
df = zclient.read("SELECT * FROM `project.dataset.table` LIMIT 1000")
print(df)
# Parameterized query (recommended)
df = zclient.read(
"SELECT * FROM @table_name WHERE region = @region LIMIT @limit",
parameters={
"table_name": "project.dataset.table",
"region": "US",
"limit": 1000
}
)
# Write DataFrame to BigQuery
result = zclient.write(
df=df,
full_table_path="project.dataset.new_table",
write_type="truncate", # or "append"
warning=True
)
# CRUD operations with parameters
zclient.insert(
"INSERT INTO @table_name (col1, col2) VALUES (@val1, @val2)",
parameters={"table_name": "project.dataset.table", "val1": "data", "val2": 123}
)
zclient.update(
"UPDATE @table_name SET col = @new_value WHERE id = @target_id",
parameters={"table_name": "project.dataset.table", "new_value": "updated", "target_id": 1}
)
zclient.delete(
"DELETE FROM @table_name WHERE id IN (@ids)",
parameters={"table_name": "project.dataset.table", "ids": [1, 2, 3]}
)
# Context manager support - automatic cleanup
with zclient as client:
# Test query with dry_run first
client.read(
"SELECT * FROM @table WHERE date >= @start_date",
parameters={"table": "project.dataset.table1", "start_date": "2024-01-01"},
dry_run=True
)
# Execute actual queries
df1 = client.read(
"SELECT * FROM @table WHERE date >= @start_date",
parameters={"table": "project.dataset.table1", "start_date": "2024-01-01"}
)
df2 = client.read("SELECT * FROM table2")
result = client.write(df1, "project.dataset.output_table")
# Client automatically cleaned up after context
Parameter Substitution & Advanced Features
Parameter Substitution
zbq supports parameterized queries using @param_name syntax for secure and flexible SQL execution:
from zbq import zclient
# Basic parameter substitution
df = zclient.read(
"""SELECT * FROM `project.dataset.table`
WHERE region = @region AND status = @status""",
parameters={
"region": "US", # Strings are automatically quoted
"status": "active"
}
)
# Multiple data types
df = zclient.read(
"""SELECT * FROM hotels
WHERE hotel_id = @id
AND price > @min_price
AND is_available = @available
AND last_updated > @date""",
parameters={
"id": 123, # Numbers: no quotes
"min_price": 99.99, # Floats: no quotes
"available": True, # Booleans: TRUE/FALSE
"date": None # None: NULL
}
)
# Table identifiers (automatic backticks)
df = zclient.read(
"SELECT * FROM @table_name WHERE region = @region",
parameters={
"table_name": "project.dataset.hotels", # Becomes `project.dataset.hotels`
"region": "US" # Becomes 'US'
}
)
List Parameters for IN Clauses
Use lists and tuples for IN clauses and array operations:
# IN clause with list
df = zclient.read(
"SELECT * FROM hotels WHERE hotel_code IN (@codes)",
parameters={
"codes": ['ABC', 'DEF', 'GHI'] # Becomes ('ABC', 'DEF', 'GHI')
}
)
# Numeric lists
df = zclient.read(
"SELECT * FROM bookings WHERE booking_id IN (@ids)",
parameters={
"ids": [123, 456, 789] # Becomes (123, 456, 789)
}
)
# Array syntax (BigQuery)
df = zclient.read(
"SELECT * FROM table WHERE col = ANY([@values])",
parameters={
"values": ['a', 'b', 'c'] # Becomes ['a', 'b', 'c']
}
)
# Mixed data types in lists
df = zclient.read(
"SELECT * FROM table WHERE col IN (@mixed)",
parameters={
"mixed": ['text', 123, None, True] # Becomes ('text', 123, NULL, TRUE)
}
)
Conditional Parameters
Use @condition parameters for optional WHERE clauses:
# With condition
df = zclient.read(
"""SELECT * FROM hotels
WHERE region = @region
@condition""",
parameters={
"region": "US",
"condition": "AND hotel_rating >= 4" # Raw SQL inserted
}
)
# Result: WHERE region = 'US' AND hotel_rating >= 4
# Without condition (automatically removed)
df = zclient.read(
"""SELECT * FROM hotels
WHERE region = @region
@condition""",
parameters={
"region": "US"
# No condition parameter - @condition is removed entirely
}
)
# Result: WHERE region = 'US'
Dry Run Mode
Preview your final query with parameter substitution without executing:
# Preview query with dry_run
result = zclient.read(
"""SELECT * FROM `project.dataset.hotels`
WHERE region = @region
AND hotel_codes IN (@codes)
@condition""",
parameters={
"region": "US",
"codes": ['ABC', 'DEF'],
"condition": "AND rating >= 4"
},
dry_run=True # Prints query, returns None
)
# Output:
# DRY RUN - Query that would be executed:
# --------------------------------------------------
# SELECT * FROM `project.dataset.hotels`
# WHERE region = 'US'
# AND hotel_codes IN ('ABC', 'DEF')
# AND rating >= 4
# --------------------------------------------------
Storage Operations
Basic Upload/Download
from zbq import zstorage
# Simple upload with pattern - files go to bucket root
result = zstorage.upload(
local_dir="./data",
bucket_path="my-bucket",
include_patterns="*.xlsx" # Upload only Excel files
)
# Upload to specific folder in bucket
result = zstorage.upload(
local_dir="./data",
bucket_path="my-bucket/reports/2024", # Upload to reports/2024/ folder
include_patterns="*.xlsx"
)
print(f"Uploaded {result.uploaded_files}/{result.total_files} files")
print(f"Total size: {result.total_bytes:,} bytes in {result.duration:.2f}s")
# Context manager support for batch operations
with zstorage as storage:
# Upload multiple directories in sequence
result1 = storage.upload("./data1", "my-bucket/folder1", include_patterns="*.csv")
result2 = storage.upload("./data2", "my-bucket/folder2", include_patterns="*.json")
result3 = storage.download("my-bucket/archive", "./downloads", include_patterns="*.parquet")
# Storage client automatically cleaned up
# Simple download from bucket root
result = zstorage.download(
bucket_path="my-bucket",
local_dir="./downloads",
include_patterns="*.csv" # Download only CSV files
)
# Download from specific folder in bucket
result = zstorage.download(
bucket_path="my-bucket/data/exports", # Download from data/exports/ folder
local_dir="./downloads",
include_patterns="*.csv"
)
Advanced Pattern Matching
# Multiple include patterns
result = zstorage.upload(
local_dir="./reports",
bucket_path="my-bucket/reports",
include_patterns=["*.xlsx", "*.csv", "*.json"], # Multiple file types
exclude_patterns=["temp_*", "*_backup.*"], # Exclude temporary/backup files
case_sensitive=False # Case-insensitive matching
)
# Regex patterns for complex matching
result = zstorage.upload(
local_dir="./logs",
bucket_path="my-bucket/logs",
include_patterns=r"log_\d{4}-\d{2}-\d{2}\.txt", # Match log_YYYY-MM-DD.txt
use_regex=True
)
Parallel Processing & Progress Tracking
# Automatic progress bar (shows for multiple files)
result = zstorage.upload(
local_dir="./large-dataset",
bucket_path="my-bucket",
include_patterns="*.xlsx",
parallel=True, # Enable parallel uploads
max_retries=5 # Retry failed uploads
)
# Shows: "Uploading: 75%|███████▌ | 15/20 [00:30<00:10, 0.5files/s]"
# Custom progress callback (optional)
def progress_callback(completed, total):
percentage = (completed / total) * 100
print(f"Custom progress: {completed}/{total} files ({percentage:.1f}%)")
result = zstorage.upload(
local_dir="./large-dataset",
bucket_path="my-bucket",
progress_callback=progress_callback,
show_progress=False # Disable built-in progress bar
)
# Handle results
if result.failed_files > 0:
print(f"WARNING: {result.failed_files} files failed to upload:")
for error in result.errors:
print(f" - {error}")
print(f"Successfully uploaded {result.uploaded_files} files")
print(f"Total: {result.total_bytes:,} bytes in {result.duration:.2f}s")
Dry Run & Preview
# Preview what would be uploaded without actually uploading
result = zstorage.upload(
local_dir="./data",
bucket_path="my-bucket",
include_patterns="*.parquet",
dry_run=True # Preview only
)
print(f"Would upload {result.total_files} files ({result.total_bytes:,} bytes)")
# Progress bar control
result = zstorage.upload(
local_dir="./data",
bucket_path="my-bucket",
include_patterns="*.xlsx",
show_progress=True # Force show progress bar even for single files
)
result = zstorage.upload(
local_dir="./data",
bucket_path="my-bucket",
include_patterns="*.xlsx",
show_progress=False # Never show progress bar
)
Advanced Download with Filtering
# Download with path filtering and patterns
result = zstorage.download(
bucket_path="my-data-bucket/reports/2024", # Only files under this path
local_dir="./downloaded-reports",
include_patterns=["*.xlsx", "*.pdf"],
exclude_patterns="*_draft.*", # Skip draft files
parallel=True,
max_results=500 # Limit number of files to list
)
Advanced Configuration
Custom Logging
from zbq import setup_logging, StorageHandler, BigQueryHandler
# Configure logging
logger = setup_logging("DEBUG") # DEBUG, INFO, WARNING, ERROR
# Create handlers with custom settings
storage = StorageHandler(
project_id="my-project",
log_level="INFO",
max_workers=8 # More parallel workers
)
bq = BigQueryHandler(
project_id="my-project",
default_timeout=600, # 10 minute timeout
log_level="DEBUG"
)
Error Handling
from zbq import ZbqAuthenticationError, ZbqOperationError, ZbqConfigurationError
try:
result = zstorage.upload("./data", "my-bucket", include_patterns="*.csv")
except ZbqAuthenticationError:
print("Authentication failed. Run: gcloud auth application-default login")
except ZbqConfigurationError:
print("Configuration error. Check your project settings.")
except ZbqOperationError as e:
print(f"Operation failed: {e}")
Working with Results
from zbq import UploadResult, DownloadResult
# Upload with detailed result handling
result: UploadResult = zstorage.upload(
local_dir="./data",
bucket_name="my-bucket",
include_patterns=["*.json", "*.csv"]
)
# Detailed statistics
print(f"""
Upload Summary:
Total files: {result.total_files}
Uploaded: {result.uploaded_files}
Skipped: {result.skipped_files}
Failed: {result.failed_files}
Total size: {result.total_bytes:,} bytes
Duration: {result.duration:.2f} seconds
""")
# Handle errors
if result.errors:
print("Errors encountered:")
for error in result.errors[:5]: # Show first 5 errors
print(f" - {error}")
if len(result.errors) > 5:
print(f" ... and {len(result.errors) - 5} more errors")
Pattern Matching Guide
Glob Patterns (Default)
*.xlsx- All Excel filesdata_*.csv- CSV files starting with "data_"report_????_??.pdf- Reports with specific naming pattern**/*.json- All JSON files in subdirectories (recursive)[!.]*.txt- Text files not starting with dot
Regex Patterns
# Enable regex with use_regex=True
zstorage.upload(
local_dir="./logs",
bucket_name="my-bucket",
include_patterns=[
r"access_log_\d{4}-\d{2}-\d{2}\.log", # access_log_2024-01-01.log
r"error_log_\d{8}\.log" # error_log_20240101.log
],
use_regex=True
)
Complex Filtering
# Include multiple types, exclude temp files
zstorage.upload(
local_dir="./workspace",
bucket_name="my-bucket",
include_patterns=["*.py", "*.json", "*.md", "*.yml"],
exclude_patterns=["__pycache__/*", "*.pyc", "temp_*", ".git/*"],
case_sensitive=False
)
Authentication & Setup
-
Install Google Cloud SDK:
gcloud auth application-default login gcloud config set project YOUR_PROJECT_ID
-
Or set environment variables:
export GOOGLE_APPLICATION_CREDENTIALS="path/to/service-account.json" export GOOGLE_CLOUD_PROJECT="your-project-id"
Requirements
- Python ≥ 3.11
- Google Cloud project with BigQuery and/or Storage APIs enabled
- Appropriate IAM permissions for your operations
Performance Tips
- Use parallel processing for multiple files:
parallel=True - Adjust thread count based on your system:
max_workers=8 - Use dry-run to preview large operations first
- Filter early with specific patterns to avoid processing unwanted files
- Monitor progress with callback functions for long operations
Contributing
Issues and pull requests welcome at the project repository.
License
See LICENSE file for details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file zbq-0.0.1.tar.gz.
File metadata
- Download URL: zbq-0.0.1.tar.gz
- Upload date:
- Size: 34.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b79d42b504814497de9df5f1e75947f0262736914c8ea2466a370feb3ac698a8
|
|
| MD5 |
90094a718c667e2679101f0897a08150
|
|
| BLAKE2b-256 |
ff1ed0416021894ad28079eba6450f7eb40d0f3c81cadd3ce38bc0495e47ba7b
|
File details
Details for the file zbq-0.0.1-py3-none-any.whl.
File metadata
- Download URL: zbq-0.0.1-py3-none-any.whl
- Upload date:
- Size: 16.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2195e1d7234a45c4c18ca1d4f5e939ee1742042c4112357ff38fd137fb36dd23
|
|
| MD5 |
4603f818e6b6dc71993924bf6e8fe5ef
|
|
| BLAKE2b-256 |
fe9809bb42763cf031dcf2433af0d8eb98644f784de89e322bd9cffa09aa0cae
|