Data Flow Tools - flexible ETL pipeline framework
DFT - Data Flow Tools
Flexible ETL pipeline framework designed for data analysts and engineers. Build, orchestrate, and monitor data pipelines with YAML configurations.
Key Features
- Component-Based: Modular sources, processors, and endpoints
- Plugin System: Add custom components directly to your project
- YAML Configuration: Simple, readable pipeline definitions
- Dependency Management: Automatic pipeline ordering and validation
- Interactive Documentation: Web-based pipeline exploration with component library
- Database Support: PostgreSQL, MySQL, ClickHouse with upsert capabilities
- Incremental Processing: Smart data loading with state management
- Data Validation: Built-in quality checks and constraints
- Analyst-Friendly: Rich CLI tools and component discovery
Quick Start
1. Installation
# Clone repository
git clone <repository-url>
cd dft
# Install package with dependencies
pip install -e .
2. Create Project
# Initialize new project
dft init my_analytics_project
cd my_analytics_project
3. Explore Examples
# Try the example project
cd example_project
# View interactive documentation
dft docs --serve
# Opens at http://localhost:8080
# Discover available components
dft components list
# Run a simple pipeline
dft run --select simple_csv_example
# Try the custom components example
dft run --select custom_example_pipeline
Component Library
DFT provides a rich library of pre-built components:
Sources
- CSV: Read data from CSV files with configurable delimiters and encoding (see the sketch after this list)
- PostgreSQL: Extract data with SQL queries and named connections
- MySQL: Database source with connection pooling
- ClickHouse: High-performance analytics database source
- Google Play: Specialized financial data extraction
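For example, a CSV source with an explicit delimiter and encoding might be declared as follows. This is a minimal sketch: file_path is taken from the Basic Pipeline example below, while the delimiter and encoding option names are assumptions based on the description above (run dft components describe csv to see the actual keys).
sources:
  - name: raw_events
    type: csv
    config:
      file_path: "data/events.csv"
      delimiter: ";"       # assumed option name
      encoding: "utf-8"    # assumed option name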
Processors
- Validator: Data quality checks with custom rules and constraints
- MAD Anomaly Detector: Statistical anomaly detection for data monitoring
Endpoints
- CSV: Write processed data to CSV files
- PostgreSQL: Load data with append/replace/upsert modes
- MySQL: Advanced upsert operations with conflict resolution
- ClickHouse: Optimized bulk loading for analytics workloads
- JSON: Export data in JSON format
Component Discovery
CLI Commands
# List all components
dft components list
# Filter by type
dft components list --type endpoint
# Get detailed information
dft components describe mysql
# View configuration examples
dft components describe validator --format yaml
Web Interface
Access the interactive component library by running dft docs --serve:
- Browse components by category
- View configuration requirements
- Copy-paste ready YAML examples
- Interactive component details
Database Features
Named Connections
Define reusable database connections:
# dft_project.yml
connections:
  analytics_db:
    type: postgresql
    host: analytics.company.com
    database: warehouse
    user: analyst
    password: "${POSTGRES_PASSWORD}"

  main_mysql:
    type: mysql
    host: mysql.company.com
    database: production
    user: readonly
    password: "${MYSQL_PASSWORD}"
Upsert Operations
Intelligent insert-or-update operations:
# MySQL upsert example
- id: upsert_users
  type: endpoint
  endpoint_type: mysql
  connection: main_mysql
  config:
    table: users
    mode: upsert
    upsert_keys: [id]   # Conflict resolution on 'id' column
    auto_create: true
    schema:
      id: "INT PRIMARY KEY"
      name: "VARCHAR(100)"
      email: "VARCHAR(100)"
      updated_at: "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
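With mode: upsert, rows whose upsert_keys already match an existing record are updated in place and new rows are inserted; auto_create together with the declared schema presumably lets DFT create the target table on the first run if it does not exist yet.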
Pipeline Configuration
Basic Pipeline
name: simple_etl
description: Extract, validate, and load user data

sources:
  - name: user_data
    type: csv
    config:
      file_path: "data/users.csv"

endpoints:
  - name: clean_users
    type: postgresql
    connection: analytics_db
    config:
      table: users_clean
      mode: replace

pipelines:
  - name: process_users
    source: user_data
    processors:
      - type: validator
        config:
          required_columns: [id, email, created_at]
          row_count_min: 1
    endpoints: [clean_users]
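Saved under pipelines/, this file can then be executed with dft run --select simple_etl, selecting by the top-level name just as the execution examples below select customer_analytics.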
Advanced Pipeline with Dependencies
name: customer_analytics
tags: [analytics, daily]
depends_on: [data_ingestion]   # Run after data_ingestion pipeline

variables:
  analysis_date: "{{ yesterday() }}"
  min_transaction_amount: 10.00

pipelines:
  - name: customer_metrics
    source: transaction_data
    processors:
      - type: validator
        config:
          checks:
            - column: amount
              min_value: "{{ var('min_transaction_amount') }}"
              not_null: true
    endpoints: [metrics_warehouse]
Pipeline Execution
Basic Execution
# Run all pipelines
dft run
# Run specific pipeline
dft run --select customer_analytics
# Run by tags
dft run --select tag:daily
Dependency-Aware Execution
# Run pipeline and all dependencies
dft run --select +customer_analytics
# Run pipeline and all dependents
dft run --select customer_analytics+
# Run full dependency tree
dft run --select +customer_analytics+
Pipeline Variables
# Override pipeline variables
dft run --select customer_analytics --vars analysis_date=2024-01-15,min_transaction_amount=5.00
Documentation & Monitoring
Interactive Documentation
Generate comprehensive project documentation:
dft docs --serve
Features:
- Pipeline Overview: Visual dependency graphs with filtering
- Component Library: Interactive component browser with examples
- Configuration Details: Collapsible component configurations
- YAML Examples: Copy-paste ready configurations
Pipeline Validation
# Validate all pipeline configurations
dft validate
# Validate specific pipelines
dft validate --select customer_analytics
# Check dependencies
dft deps
For Data Analysts
DFT is designed with analysts in mind:
1. Discover Components
dft components list --type source
dft components describe postgresql
2. Build Pipelines
- Copy YAML examples from documentation
- Use named connections for database access
- Apply data validation rules
3. Monitor & Debug
- Interactive web documentation
- Pipeline dependency visualization
- Built-in configuration validation
4. Scale Operations
- Incremental data processing
- Dependency-aware execution
- Environment-specific configurations
Advanced Features
Environment Configuration
# Development environment
export DFT_ENV=dev
dft run --select customer_analytics
# Production environment
export DFT_ENV=prod
dft run --select customer_analytics
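How the configuration differs between environments is up to your project. One possible pattern, shown here only as a sketch (it assumes the ${...} substitution used for passwords above also works for other connection fields, and the ANALYTICS_DB_* variable names are made up), is to point a named connection at environment variables that change along with DFT_ENV:
# dft_project.yml
connections:
  analytics_db:
    type: postgresql
    host: "${ANALYTICS_DB_HOST}"       # e.g. localhost in dev, analytics.company.com in prod
    database: warehouse
    user: "${ANALYTICS_DB_USER}"
    password: "${POSTGRES_PASSWORD}"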
State Management
DFT automatically tracks pipeline execution state for incremental processing:
variables:
  # Start from last processed date or 7 days ago
  start_date: "{{ state.get('last_processed_date', days_ago(7)) }}"
  end_date: "{{ yesterday() }}"
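These variables can then bound what a source reads on each run. Below is a sketch of an incremental PostgreSQL source: the connection name and the var() templating follow the examples above, but the query option name and the transactions table are assumptions for illustration.
sources:
  - name: transaction_data
    type: postgresql
    connection: analytics_db
    config:
      query: >                         # option name assumed
        SELECT * FROM transactions
        WHERE created_at >= '{{ var('start_date') }}'
          AND created_at <  '{{ var('end_date') }}'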
Custom Components Plugin System
DFT supports adding custom components directly to your project, similar to dbt macros. When you run dft init, a complete plugin structure is created:
my_project/
├── dft/                      # Custom components directory
│   ├── sources/              # Custom data sources
│   ├── processors/           # Custom data processors
│   └── endpoints/            # Custom data endpoints
└── pipelines/
    └── custom_example_pipeline.yml   # Example using custom components
Creating Custom Components
1. Custom Source Example:
# dft/sources/api_source.py
from typing import Any, Dict, Optional

from dft.core.base import DataSource
from dft.core.data_packet import DataPacket


class ApiSource(DataSource):
    """Custom API data source"""

    def extract(self, variables: Optional[Dict[str, Any]] = None) -> DataPacket:
        api_url = self.get_config('api_url')
        data = []  # Your API extraction logic, e.g. fetch and parse api_url
        return DataPacket(data=data, metadata={})

    def test_connection(self) -> bool:
        return True
2. Custom Processor Example:
# dft/processors/data_cleaner.py
from dft.core.base import DataProcessor
from dft.core.data_packet import DataPacket


class DataCleaner(DataProcessor):
    """Custom data cleaning processor"""

    def process(self, packet, variables=None):
        # Your cleaning logic
        cleaned_data = self.clean_data(packet.data)
        return DataPacket(data=cleaned_data, metadata=packet.metadata)

    def clean_data(self, data):
        return data  # placeholder: replace with real cleaning
3. Custom Endpoint Example:
# dft/endpoints/webhook.py
from dft.core.base import DataEndpoint


class WebhookEndpoint(DataEndpoint):
    """Custom webhook endpoint"""

    def load(self, packet, variables=None) -> bool:
        webhook_url = self.get_config('webhook_url')
        # Your webhook logic
        return True
Using Custom Components
Components are automatically discovered and can be used by their snake_case name:
# pipelines/my_pipeline.yml
steps:
  - id: fetch_data
    type: source
    source_type: api              # Uses ApiSource class
    config:
      api_url: "https://api.example.com/data"

  - id: clean_data
    type: processor
    processor_type: data_cleaner  # Uses DataCleaner class
    depends_on: [fetch_data]

  - id: send_webhook
    type: endpoint
    endpoint_type: webhook        # Uses WebhookEndpoint class
    depends_on: [clean_data]
    config:
      webhook_url: "https://hooks.slack.com/..."
Plugin Features
- Auto-Discovery: Components are automatically loaded from the dft/ directories
- Snake Case Naming: MyCustomSource → my_custom in pipelines
- No Registration: Just add .py files and they become available
- Examples Included: Working examples created with every dft init
- Flexible: Supports both pandas and plain Python data structures
Project Structure
my_project/
├── dft_project.yml                  # Project configuration
├── .env                             # Environment variables
├── pipelines/                       # Pipeline definitions
│   ├── ingestion.yml
│   ├── analytics.yml
│   ├── reporting.yml
│   └── custom_example_pipeline.yml  # Example with custom components
├── dft/                             # Custom components (auto-created)
│   ├── sources/                     # Custom data sources
│   │   ├── __init__.py
│   │   └── my_custom_source.py      # Example custom source
│   ├── processors/                  # Custom data processors
│   │   ├── __init__.py
│   │   └── my_custom_processor.py   # Example custom processor
│   └── endpoints/                   # Custom data endpoints
│       ├── __init__.py
│       └── my_custom_endpoint.py    # Example custom endpoint
├── data/                            # Input data files
├── output/                          # Generated outputs
└── .dft/                            # DFT metadata
    ├── docs/                        # Generated documentation
    ├── state/                       # Pipeline state files
    └── logs/                        # Execution logs
Contributing
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Submit a pull request
License
MIT License - see LICENSE file for details.
Get started with the example project: cd example_project && dft docs --serve
Try the plugin system: dft init my_project && cd my_project && dft run --select custom_example_pipeline