A Python utility library for Redshift data operations within SageMaker, enabling seamless UNLOAD and COPY operations between Redshift and S3
Project description
sagemaker-redshift
A Python utility library for Redshift data operations within SageMaker, enabling seamless UNLOAD and COPY operations between Redshift and S3.
Features
- UNLOAD data from Redshift to S3 - Export query results to various file formats from within SageMaker
- COPY data to Redshift from DataFrame - Fast data loading using S3 as intermediary
- COPY data from S3 to Redshift - Direct loading of existing S3 files
- SageMaker optimized - Designed for use within SageMaker notebooks and processing jobs
- Supports multiple file formats: CSV, JSON, Parquet
- Built-in retry logic and error handling
- Progress tracking with configurable verbosity
Installation
pip install sagemaker-redshift
Requirements
- boto3
- botocore
- polars
- sagemaker
Usage
Important: SageMaker Integration
This library is designed to work seamlessly within Amazon SageMaker environments. All functions require explicit credential parameters for security. Previously hardcoded credentials have been removed.
Required parameters:
db: Redshift database namecluster_id: Redshift cluster identifierdb_user: Database usernamerole: IAM role ARN with appropriate permissions
1. UNLOAD data from Redshift to S3
from redshift_utils import unload_redshift
# Basic usage within SageMaker
unload_redshift(
query="SELECT * FROM sales.transactions WHERE date >= '2024-01-01'",
destination="s3://my-bucket/exports/sales/",
db="prod",
cluster_id="my-redshift-cluster",
db_user="myuser",
role="arn:aws:iam::123456789012:role/RedshiftS3Role",
file_format="parquet",
partition_by="date",
gzip=True
)
# CSV export with custom delimiter
unload_redshift(
query="SELECT customer_id, total FROM sales.summary",
destination="s3://my-bucket/exports/summary.csv",
db="analytics",
cluster_id="analytics-cluster",
db_user="analyst",
role="arn:aws:iam::123456789012:role/RedshiftS3Role",
file_format="csv",
delimiter="|",
header=True
)
2. COPY DataFrame to Redshift
import polars as pl
from redshift_utils import copy_to_redshift
# Create a sample DataFrame in SageMaker
df = pl.DataFrame({
"product_id": [1, 2, 3, 4, 5],
"product_name": ["Widget A", "Widget B", "Gadget C", "Gadget D", "Tool E"],
"price": [19.99, 29.99, 39.99, 49.99, 59.99],
"quantity": [100, 150, 75, 200, 50]
})
# Copy to Redshift from SageMaker
copy_to_redshift(
df=df,
table_name="products",
schema="inventory",
s3_bucket="my-temp-bucket",
db="warehouse",
cluster_id="warehouse-cluster",
db_user="etl_user",
role="arn:aws:iam::123456789012:role/RedshiftS3Role",
if_exists="truncate" # Options: "append", "truncate", "replace"
)
3. COPY from S3 to Redshift
from redshift_utils import copy_s3_to_redshift
# Copy existing S3 file to Redshift from SageMaker
copy_s3_to_redshift(
s3_uri="s3://my-data-bucket/raw/customers_2024.parquet",
table_name="customers",
schema="staging",
db="analytics",
cluster_id="analytics-cluster",
db_user="loader",
role="arn:aws:iam::123456789012:role/RedshiftS3Role",
file_format="parquet",
if_exists="append"
)
Function Parameters
Common Parameters
db(str): Redshift database namecluster_id(str): Redshift cluster identifierdb_user(str): Database usernamerole(str): IAM role ARN with appropriate permissions (can use SageMaker execution role)verbose(int): Output verbosity (0=silent, 1=minimal, 2=detailed)max_wait_minutes(int): Maximum time to wait for operation completion
unload_redshift
query(str): SQL query to executedestination(str): S3 URI for output filesfile_format(str): Output format - "csv", "json", or "parquet"header(bool): Include column headers (CSV only)delimiter(str): Field delimiter (CSV only)allow_overwrite(bool): Overwrite existing S3 filesparallel(bool): Enable parallel unloadpartition_by(str): Column name for partitioning outputgzip(bool): Compress output files
copy_to_redshift
df(pl.DataFrame): Polars DataFrame to uploadtable_name(str): Target table nameschema(str): Target schema names3_bucket(str): S3 bucket for temporary storages3_prefix(str): S3 key prefix for temporary filesif_exists(str): Action if table exists - "append", "truncate", or "replace"cleanup_s3(bool): Delete temporary S3 file after load
copy_s3_to_redshift
s3_uri(str): Full S3 URI of source filetable_name(str): Target table nameschema(str): Target schema namefile_format(str): Source file format - "csv", "json", or "parquet"if_exists(str): Action if table exists - "append", "truncate", or "replace"
SageMaker Integration
Using SageMaker Execution Role
import sagemaker
# Get the SageMaker execution role
role = sagemaker.get_execution_role()
# Use it in your operations
unload_redshift(
query="SELECT * FROM my_table",
destination="s3://my-bucket/data/",
role=role, # SageMaker execution role
# ... other parameters
)
Within SageMaker Processing Jobs
This library works seamlessly within SageMaker Processing jobs for large-scale data operations.
IAM Role Requirements
The IAM role specified in role must have:
- Read/write access to the S3 buckets used
- Permission to assume the role from Redshift
- Appropriate Redshift permissions
Example IAM policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::my-bucket/*",
"arn:aws:s3:::my-bucket"
]
},
{
"Effect": "Allow",
"Action": [
"redshift-data:*"
],
"Resource": "*"
}
]
}
Error Handling
All functions include comprehensive error handling:
- Validation of required parameters
- Timeout handling with configurable wait times
- Detailed error messages for troubleshooting
- Automatic retry logic for transient failures
Best Practices
- Use appropriate file formats: Parquet for large datasets, CSV for compatibility
- Enable compression: Use
gzip=Truefor UNLOAD to reduce S3 storage costs - Partition large exports: Use
partition_byto split large datasets - Clean up temporary files: Keep
cleanup_s3=Truefor copy operations - Set reasonable timeouts: Adjust
max_wait_minutesbased on data volume - Use SageMaker execution role: Leverage
sagemaker.get_execution_role()for permissions
Testing
Run the test suite:
pytest test_redshift_utils.py -v
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file sagemaker-redshift-0.2.0.tar.gz.
File metadata
- Download URL: sagemaker-redshift-0.2.0.tar.gz
- Upload date:
- Size: 11.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.8.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
011e441da09030ede5b75708634b3d04f534a51613f0e6d5a51fa799de1f5269
|
|
| MD5 |
563d6c00eddc370cbc93389bf3245077
|
|
| BLAKE2b-256 |
a86288df0eb9ac671872430c3080301e1fd46918386bdfa19208ab3543581da1
|
File details
Details for the file sagemaker_redshift-0.2.0-py3-none-any.whl.
File metadata
- Download URL: sagemaker_redshift-0.2.0-py3-none-any.whl
- Upload date:
- Size: 9.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.8.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
72ea4e910787f29399f30e8db05ae6db6f9a48a0863bdb883d6d0bfc091041ec
|
|
| MD5 |
276a5573e165f7cbf627a779bb21b01e
|
|
| BLAKE2b-256 |
207ff17d4fa6e6d7219b6f61d2aa90eebb90786961b8ecb193a5d23b696a29d2
|