Project description
PipeX v2.0
A powerful, enterprise-grade CLI-based ETL pipeline automation tool for modern data engineering workflows.
PipeX simplifies complex data pipeline tasks with multi-cloud support, intelligent error handling, and industry-specific transformations. Built for scalability, reliability, and ease of use.
Key Features
Comprehensive ETL Operations
- Extract from APIs, databases (SQL/NoSQL), and files (CSV, JSON, Excel, Parquet, XML)
- Transform with custom Python scripts, default templates, and industry-specific functions
- Load to multi-cloud storage, databases, and local files with automatic organization
Multi-Cloud Storage Support
- AWS S3 - Full S3 API compatibility with IAM roles
- Google Cloud Storage - Service account and project-based authentication
- Azure Blob Storage - Connection string and account key authentication
- DigitalOcean Spaces - S3-compatible API with regional deployment
Intelligent Error Handling
- User-friendly error messages with clear problem descriptions
- Actionable solutions for every error scenario
- Context-aware guidance based on error category and environment
- Technical details for debugging without overwhelming users
Advanced Transformations
- Multiple script execution in sequence with fail-safe options
- Default transformation library with data cleaning, feature engineering, and validation
- Industry-specific templates for Finance, Retail, Healthcare, and Manufacturing
- Configuration-based transformations for common operations
Multi-Format File Support
- Excel files with sheet selection, range options, and formula support
- Parquet format for high-performance columnar storage and analytics
- XML parsing with XPath support for structured data extraction
- Enhanced CSV/JSON with encoding, delimiter, and orientation options
Quick Start
Installation
Basic Installation
pip install pipex
With Cloud Provider Support
pip install pipex[aws] # AWS S3 support
pip install pipex[gcp] # Google Cloud Storage
pip install pipex[azure] # Azure Blob Storage
pip install pipex[all] # All cloud providers + file formats
Development Installation
git clone https://github.com/yourusername/pipex.git
cd pipex
pip install -e .[all]
Basic Usage
1. Create a configuration file (`config.yaml`):
```yaml
extract:
  source: "api"
  connection_details:
    headers:
      Authorization: "Bearer ${API_TOKEN}"
    query_or_endpoint: "${API_ENDPOINT}"

transform:
  scripts:
    - "app/default_transforms.py"
  config:
    use_default_transforms: true
    default_config:
      clean_data: true
      feature_engineering: true

load:
  target: "Cloud Storage"
  config:
    provider: "aws"
    bucket_name: "${AWS_BUCKET_NAME}"
    file_name: "processed_data.csv"
```
2. Set environment variables (`.env` file):
```
API_TOKEN=your-api-token
API_ENDPOINT=https://api.example.com/data
AWS_BUCKET_NAME=your-bucket-name
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key
```
3. Run the pipeline:
```bash
pipex run --config config.yaml
```
Commands
Pipeline Operations
# Run complete ETL pipeline
pipex run --config config.yaml
# Validate configuration before execution
pipex validate config.yaml
# Dry run (validate without executing)
pipex run --config config.yaml --dry-run
# Verbose logging for debugging
pipex run --config config.yaml --verbose
Individual Stage Operations
# Extract data only
pipex extract api config.yaml --output extracted_data.csv
# Transform data with custom scripts
pipex transform scripts/clean.py config.yaml data.csv --output clean_data.csv
# Load data to target
pipex load "Cloud Storage" config.yaml processed_data.csv
System Information
# Display system status and configuration
pipex info
# Get help for any command
pipex --help
pipex run --help
Configuration Examples
Multi-Cloud Storage
AWS S3
```yaml
load:
  target: "Cloud Storage"
  config:
    provider: "aws"
    bucket_name: "${AWS_BUCKET_NAME}"
    file_name: "data.parquet"
    format: "parquet"
    aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
    aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
    region_name: "${AWS_REGION}"
```
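If the S3 load fails with credential or bucket errors, the same environment variables can be sanity-checked outside PipeX with a few lines of boto3 (the AWS SDK listed in the acknowledgments; the aws extra presumably pulls it in, otherwise `pip install boto3`). This is a standalone check, not PipeX code:

```python
# Standalone sanity check for the AWS credentials and bucket referenced above.
import os
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    region_name=os.environ.get("AWS_REGION", "us-east-1"),
)

# head_bucket fails fast if the bucket does not exist or is not accessible.
s3.head_bucket(Bucket=os.environ["AWS_BUCKET_NAME"])
print("Bucket reachable with the configured credentials.")
```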
Google Cloud Storage
```yaml
load:
  target: "Cloud Storage"
  config:
    provider: "gcp"
    bucket_name: "${GCP_BUCKET_NAME}"
    file_name: "data.json"
    format: "json"
    project_id: "${GOOGLE_CLOUD_PROJECT}"
    credentials_path: "${GOOGLE_APPLICATION_CREDENTIALS}"
```
Azure Blob Storage
```yaml
load:
  target: "Cloud Storage"
  config:
    provider: "azure"
    bucket_name: "${AZURE_CONTAINER_NAME}"
    file_name: "data.csv"
    format: "csv"
    connection_string: "${AZURE_STORAGE_CONNECTION_STRING}"
```
Advanced File Processing
Excel to Parquet Pipeline
```yaml
extract:
  source: "file"
  connection_details:
    file_type: "excel"
    sheet_name: "Sheet1"
    skiprows: 2
    nrows: 10000
    engine: "openpyxl"
    query_or_endpoint: "data/input.xlsx"

load:
  target: "Local File"
  config:
    file_type: "parquet"
    file_path: "output/processed_data.parquet"
    compression: "snappy"
```
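For orientation, the extract and load options above map onto standard pandas calls. A rough standalone equivalent of the same Excel-to-Parquet conversion (assuming openpyxl and a Parquet engine such as pyarrow are installed) looks like this:

```python
# Standalone pandas equivalent of the Excel-to-Parquet pipeline above.
from pathlib import Path
import pandas as pd

df = pd.read_excel(
    "data/input.xlsx",
    sheet_name="Sheet1",
    skiprows=2,         # skip leading banner/header rows
    nrows=10000,        # cap the number of data rows read
    engine="openpyxl",
)

Path("output").mkdir(parents=True, exist_ok=True)
df.to_parquet("output/processed_data.parquet", compression="snappy")
```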
API to Multi-Format
```yaml
extract:
  source: "api"
  connection_details:
    headers:
      X-API-Key: "${API_KEY}"
    timeout: 60
    query_or_endpoint: "${API_ENDPOINT}"

load:
  target: "Local File"
  config:
    file_type: "excel"
    file_path: "output/api_data.xlsx"
    sheet_name: "APIData"
    add_timestamp: true
```
Advanced Transformations
Multiple Scripts with Industry Templates
```yaml
transform:
  scripts:
    - "app/default_transforms.py"
    - "transforms/industry_specific.py"
    - "transforms/custom_business_rules.py"
  config:
    use_default_transforms: true
    fail_on_script_error: false
    default_config:
      clean_data: true
      feature_engineering: true
      add_metadata: true
    script_config:
      industry: "finance"
      large_transaction_threshold: 10000
```
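This page does not spell out the contract a custom script must follow, so the sketch below is an assumption: a module that exposes a transform function taking the current DataFrame and the script_config mapping and returning the transformed DataFrame. Check the project documentation for the exact hook name and signature. A hypothetical `transforms/custom_business_rules.py` might look like:

```python
# transforms/custom_business_rules.py -- hypothetical example script.
# Assumption: PipeX passes each script the current DataFrame plus script_config
# and expects the transformed DataFrame back; verify the real hook signature.
import pandas as pd


def transform(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    df = df.copy()

    # Normalize column names so later scripts see a consistent schema.
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]

    # Drop exact duplicate rows introduced by upstream retries.
    df = df.drop_duplicates()

    # Respect a script_config switch, mirroring how industry-specific values
    # such as large_transaction_threshold are passed in the config above.
    if config.get("drop_null_ids", True) and "id" in df.columns:
        df = df.dropna(subset=["id"])

    return df
```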
Configuration-Based Transformations
```yaml
transform:
  config:
    drop_columns: ["temp_id", "debug_info"]
    rename_columns:
      customer_name: "client_name"
      order_date: "purchase_date"
    filter_rows: "amount > 0 & status == 'active'"
    add_columns:
      total_value: "price * quantity"
      processed_date: "pd.Timestamp.now()"
      is_high_value: "total_value > 1000"
```
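Each key above corresponds to an ordinary pandas operation; the mapping below is inferred from the key names rather than from PipeX internals, but it is a reasonable mental model of what the pipeline does with this config:

```python
# Inferred pandas equivalent of the configuration-based transformations above.
import pandas as pd

df = pd.read_csv("data.csv")

df = df.drop(columns=["temp_id", "debug_info"])                   # drop_columns
df = df.rename(columns={"customer_name": "client_name",
                        "order_date": "purchase_date"})           # rename_columns
df = df.query("amount > 0 & status == 'active'")                  # filter_rows

df["total_value"] = df["price"] * df["quantity"]                  # add_columns
df["processed_date"] = pd.Timestamp.now()
df["is_high_value"] = df["total_value"] > 1000
```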
Data Sources & Targets
Supported Data Sources
| Source Type | Formats | Authentication | Features |
|---|---|---|---|
| APIs | JSON, XML | Bearer Token, API Key, Basic Auth | Retry logic, caching, timeout |
| Databases | SQL, NoSQL | Connection strings, credentials | MySQL, PostgreSQL, MongoDB |
| Files | CSV, JSON, Excel, Parquet, XML | File system access | Encoding, delimiters, sheets |
Supported Targets
| Target Type | Providers | Formats | Features |
|---|---|---|---|
| Cloud Storage | AWS S3, GCP, Azure, DigitalOcean | CSV, JSON, Parquet | Multi-region, encryption |
| Databases | MySQL, PostgreSQL, MongoDB | Native formats | Batch loading, upserts |
| Local Files | File system | CSV, JSON, Excel, Parquet | Directory creation, timestamps |
Industry-Specific Templates
Financial Services
```yaml
# Automatic risk scoring, compliance checks, transaction analysis
transform:
  scripts: ["transforms/industry_specific.py"]
  config:
    script_config:
      industry: "finance"
      large_transaction_threshold: 10000
      compliance_checks: true
```
Features:
- Transaction risk scoring (see the sketch after this list)
- Money laundering detection
- Regulatory compliance flags
- Fiscal year calculations
- Business day analysis
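As a concrete illustration of the list above, the large_transaction_threshold parameter corresponds to a per-row flag along these lines (a sketch only; the shipped finance template may score risk quite differently, and the "amount" column name is an assumption about the input data):

```python
# Sketch of a large-transaction flag driven by large_transaction_threshold.
import pandas as pd

def flag_large_transactions(df: pd.DataFrame, threshold: float = 10_000) -> pd.DataFrame:
    df = df.copy()
    df["is_large_transaction"] = df["amount"] >= threshold
    return df
```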
Retail & E-commerce
```yaml
# Customer segmentation, lifetime value, seasonal analysis
transform:
  scripts: ["transforms/industry_specific.py"]
  config:
    script_config:
      industry: "retail"
      customer_segmentation: true
```
Features:
- Customer lifetime value (CLV) estimation
- RFM analysis (Recency, Frequency, Monetary) - see the sketch after this list
- Seasonal trend detection
- Product category analysis
- Customer tier classification
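RFM analysis in particular is easy to reason about with plain pandas; the sketch below shows the idea (the customer_id, order_date, and amount column names are assumptions about the input data, not PipeX requirements):

```python
# Minimal RFM (Recency, Frequency, Monetary) sketch with pandas.
import pandas as pd

orders = pd.read_csv("orders.csv", parse_dates=["order_date"])
now = orders["order_date"].max()

rfm = orders.groupby("customer_id").agg(
    recency_days=("order_date", lambda d: (now - d.max()).days),
    frequency=("order_date", "count"),
    monetary=("amount", "sum"),
)

# Quartile scores 1-4 per dimension; recency labels are reversed because
# more recent buyers should score higher.
rfm["r_score"] = pd.qcut(rfm["recency_days"], 4, labels=[4, 3, 2, 1])
rfm["f_score"] = pd.qcut(rfm["frequency"].rank(method="first"), 4, labels=[1, 2, 3, 4])
rfm["m_score"] = pd.qcut(rfm["monetary"], 4, labels=[1, 2, 3, 4])
```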
Healthcare
```yaml
# Patient demographics, risk stratification, medical coding
transform:
  scripts: ["transforms/industry_specific.py"]
  config:
    script_config:
      industry: "healthcare"
      risk_stratification: true
```
Features:
- Age group classification
- Risk score calculation
- ICD-10 code processing
- Length of stay analysis
- Chronic condition flagging
Manufacturing
```yaml
# Quality metrics, equipment efficiency, cost analysis
transform:
  scripts: ["transforms/industry_specific.py"]
  config:
    script_config:
      industry: "manufacturing"
      efficiency_threshold: 0.85
```
Features:
- Equipment efficiency tracking
- Quality grade classification
- Shift analysis
- Defect rate calculation
- Maintenance scheduling
Error Handling & Troubleshooting
Intelligent Error Messages
PipeX provides context-aware error messages with actionable solutions:
Configuration Error: Environment variable placeholders are not resolved
📋 Context:
• config_file: config.yaml
• missing_variables: API_TOKEN, AWS_BUCKET_NAME
Suggested Solutions:
1. Create a .env file in your project root
2. Set the required environment variables (check .env.example)
3. Ensure environment variable names match the placeholders in config
4. Use format ${VARIABLE_NAME} for placeholders in config file
Technical Details: Unresolved placeholders: ${API_TOKEN}, ${AWS_BUCKET_NAME}
Common Issues & Solutions
Authentication Errors
# Check credentials
pipex info
# Set AWS credentials
export AWS_ACCESS_KEY_ID=your-key
export AWS_SECRET_ACCESS_KEY=your-secret
# Validate configuration
pipex validate config.yaml
File Format Issues
# Test with verbose logging
pipex run --config config.yaml --verbose
# Check file encoding
pipex extract file config.yaml --output test.csv
Network Issues
# Test API connectivity
curl -H "Authorization: Bearer $API_TOKEN" $API_ENDPOINT
# Check timeout settings in config
pipex run --config config.yaml --dry-run
Environment Variables
Multi-Cloud Credentials
# AWS
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key
AWS_REGION=us-east-1
# Google Cloud
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
GOOGLE_CLOUD_PROJECT=your-project-id
# Azure
AZURE_STORAGE_CONNECTION_STRING=your-connection-string
# OR
AZURE_STORAGE_ACCOUNT_NAME=your-account
AZURE_STORAGE_ACCOUNT_KEY=your-key
# DigitalOcean Spaces
DO_SPACES_ACCESS_KEY_ID=your-key
DO_SPACES_SECRET_ACCESS_KEY=your-secret
DO_SPACES_REGION=nyc3
API & Database Credentials
# API Authentication
API_TOKEN=your-bearer-token
API_KEY=your-api-key
API_ENDPOINT=https://api.example.com/data
# Database Connections
DB_HOST=localhost
DB_USER=username
DB_PASSWORD=password
DB_NAME=database_name
# MongoDB
MONGO_HOST=localhost
MONGO_USER=username
MONGO_PASSWORD=password
MONGO_DATABASE=database_name
Performance & Scalability
Optimization Features
- Chunked processing for large datasets
- Memory-efficient transformations with pandas
- Parallel script execution for complex pipelines
- Connection pooling for database operations
- Streaming uploads for cloud storage
- Compression support for all file formats
Benchmarks
| Dataset Size | Processing Time | Memory Usage | Throughput |
|---|---|---|---|
| 10K records | 2-5 seconds | 15-25 MB | 2K-5K records/sec |
| 100K records | 15-30 seconds | 50-100 MB | 3K-7K records/sec |
| 1M records | 2-5 minutes | 200-500 MB | 3K-8K records/sec |
Scaling Recommendations
- Use Parquet format for large datasets (10x faster than CSV)
- Enable chunked processing for files > 1 GB (see the sketch after this list)
- Use cloud storage for distributed processing
- Implement data partitioning for very large datasets
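Chunked processing simply means never holding the whole file in memory. Whether done by PipeX or by hand, with pandas it boils down to a chunksize read loop like the sketch below (file names and the per-chunk filter are illustrative, not PipeX internals):

```python
# Chunked CSV processing: transform and write one chunk at a time so memory
# stays bounded even for multi-gigabyte inputs.
import pandas as pd

first = True
for chunk in pd.read_csv("large_input.csv", chunksize=100_000):
    chunk = chunk[chunk["amount"] > 0]           # example per-chunk transform
    chunk.to_csv(
        "output/processed.csv",
        mode="w" if first else "a",              # write header once, then append
        header=first,
        index=False,
    )
    first = False
```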
Integration Examples
CI/CD Pipeline
```yaml
# GitHub Actions
name: Data Pipeline

on:
  schedule:
    - cron: "0 2 * * *" # Daily at 2 AM

jobs:
  etl:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: "3.11"
      - name: Install PipeX
        run: pip install pipex[all]
      - name: Validate Configuration
        run: pipex validate config.yaml
      - name: Run ETL Pipeline
        run: pipex run --config config.yaml
        env:
          API_TOKEN: ${{ secrets.API_TOKEN }}
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
```
Docker Deployment
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
RUN pip install -e .[all]
CMD ["pipex", "run", "--config", "config.yaml"]
```
Kubernetes CronJob
```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: pipex-etl
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: pipex
              image: your-registry/pipex:latest
              command: ["pipex", "run", "--config", "config.yaml"]
              env:
                - name: API_TOKEN
                  valueFrom:
                    secretKeyRef:
                      name: pipex-secrets
                      key: api-token
          restartPolicy: OnFailure
```
🧪 Testing & Development
Running Tests
# Install development dependencies
pip install -e .[dev]
# Run all tests
pytest
# Run with coverage
pytest --cov=app --cov-report=html
# Run specific test categories
pytest tests/test_extract.py -v
pytest tests/test_transform.py -v
pytest tests/test_load.py -v
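Because transformation scripts are plain functions over DataFrames, they are straightforward to unit test. The sketch below shows the shape such a test might take; the imported module and function are hypothetical, matching the custom-script example earlier on this page rather than anything shipped with PipeX:

```python
# tests/test_custom_transforms.py -- illustrative sketch, not a shipped test.
import pandas as pd

# Hypothetical import; substitute your actual transform entry point.
from transforms.custom_business_rules import transform


def test_transform_normalizes_columns_and_drops_duplicates():
    df = pd.DataFrame({"ID ": [1, 1, 2], "Amount": [10, 10, 20]})

    result = transform(df, config={"drop_null_ids": False})

    assert list(result.columns) == ["id", "amount"]
    assert len(result) == 2
```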
Code Quality
# Format code
black app/ tests/
# Sort imports
isort app/ tests/
# Type checking
mypy app/
# Linting
flake8 app/ tests/
Development Setup
# Clone repository
git clone https://github.com/yourusername/pipex.git
cd pipex
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install in development mode
pip install -e .[all,dev]
# Run tests
pytest
📚 Documentation & Examples
Configuration Templates
- `examples/multi_cloud_config.yaml` - Multi-cloud storage examples
- `examples/transforms/` - Custom transformation scripts
- `.env.example` - Environment variable template
Transformation Scripts
- `app/default_transforms.py` - Default transformation library
- `examples/transforms/industry_specific.py` - Industry templates
Additional Resources
- Usage Examples - Comprehensive usage guide
- New Features - v2.0 feature overview
- API Documentation - Detailed API reference
- Troubleshooting Guide - Common issues and solutions
Contributing
We welcome contributions! Please see our Contributing Guide for details.
Development Process
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Make your changes with tests
- Run the test suite (`pytest`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
Contribution Areas
- New data sources (databases, APIs, file formats)
- Additional cloud providers (IBM Cloud, Oracle Cloud)
- Industry-specific transformations
- Testing and quality assurance
- Documentation and examples
- Bug fixes and performance improvements
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support & Community
Getting Help
- Email: agniveshkumar15@gmail.com
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Community
- 🌟 Star the project if you find it useful
- 🐦 Follow updates on Twitter @PipeXETL
- 📢 Share your use cases in GitHub Discussions
- 🤝 Contribute to make PipeX even better
Acknowledgments
PipeX is built with and inspired by amazing open-source projects:
- Typer - Modern CLI framework
- Pandas - Powerful data manipulation library
- Boto3 - AWS SDK for Python
- SQLAlchemy - Database toolkit
- Requests - HTTP library
Made with ❤️ for the data engineering community
⭐ Star on GitHub • 📚 Documentation • 🐛 Report Bug • 💡 Request Feature
Download files
File details
Details for the file pipex-2.0.0.tar.gz.
File metadata
- Download URL: pipex-2.0.0.tar.gz
- Upload date:
- Size: 44.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8e4954d2ee8057c965d7d32f5168c156977b79569a2622a36aacc63798b0736f |
| MD5 | 6ee8c821943c7ca54c3a8f0c3b1284e3 |
| BLAKE2b-256 | f635dc9dfc57fc784b2f62065cf5c788d59c364bc11b35ee407bd4fb8b5acb99 |
Provenance
The following attestation bundles were made for pipex-2.0.0.tar.gz:
Publisher: release.yml on ImCYMBIOT/PipeX

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pipex-2.0.0.tar.gz
- Subject digest: 8e4954d2ee8057c965d7d32f5168c156977b79569a2622a36aacc63798b0736f
- Sigstore transparency entry: 799304753
- Sigstore integration time:
- Permalink: ImCYMBIOT/PipeX@7e3a331f2c512de4291ceb1955fdf934ad53e24c
- Branch / Tag: refs/tags/v2.0.0
- Owner: https://github.com/ImCYMBIOT
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@7e3a331f2c512de4291ceb1955fdf934ad53e24c
- Trigger Event: release
File details
Details for the file pipex-2.0.0-py3-none-any.whl.
File metadata
- Download URL: pipex-2.0.0-py3-none-any.whl
- Upload date:
- Size: 44.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | cf417ad24801b9b3e436d8c2f4e9aab2c458e64801541d8159515944ee0c6d89 |
| MD5 | 45b57750679b4b82f5bbd932cd3d8689 |
| BLAKE2b-256 | a038f12f472eea3fc54e80a32fb49b5e88719b222ff4135f25f04a02a0881a27 |
Provenance
The following attestation bundles were made for pipex-2.0.0-py3-none-any.whl:
Publisher: release.yml on ImCYMBIOT/PipeX

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pipex-2.0.0-py3-none-any.whl
- Subject digest: cf417ad24801b9b3e436d8c2f4e9aab2c458e64801541d8159515944ee0c6d89
- Sigstore transparency entry: 799304765
- Sigstore integration time:
- Permalink: ImCYMBIOT/PipeX@7e3a331f2c512de4291ceb1955fdf934ad53e24c
- Branch / Tag: refs/tags/v2.0.0
- Owner: https://github.com/ImCYMBIOT
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@7e3a331f2c512de4291ceb1955fdf934ad53e24c
- Trigger Event: release