DFT - Data Flow Tools
Flexible ETL pipeline framework designed for data analysts and engineers. Build, orchestrate, and monitor data pipelines with YAML configurations.
Key Features
- Component-Based: Modular sources, processors, and endpoints
- Plugin System: Add custom components directly to your project
- YAML Configuration: Simple, readable pipeline definitions
- Dependency Management: Automatic pipeline ordering and validation
- Interactive Documentation: Web-based pipeline exploration
- Database Support: PostgreSQL, MySQL, ClickHouse with upsert capabilities
- Incremental Processing: Smart data loading with state management
- Microbatch Processing: Time-based data windows with lookback support
- Data Validation: Built-in quality checks and constraints
- Analyst-Friendly: Rich CLI tools and component discovery
Documentation
- Custom Components Guide - Develop custom sources, processors, and endpoints
- Database Integration Guide - Database connections, upsert operations, and incremental processing
- Pipeline Dependencies Guide - Inter-pipeline dependencies and execution order
- Microbatch Processing Guide - Time-based data windows, lookback strategies, and ETL optimization
- A/B Testing Guide - Statistical hypothesis testing for experiments with T-test, Z-test, CUPED, and Bootstrap methods
Quick Start
1. Installation
Option A: Install from PyPI (Recommended)
# Install directly from PyPI
pip install dft-pipeline
Option B: Install from Source (For Development)
# Clone repository
git clone <repository-url>
cd dft
# Install package with dependencies
pip install -e .
2. Create Project
# Initialize new project
dft init my_analytics_project
cd my_analytics_project
3. Explore Examples
From inside the project created in step 2:
# View interactive documentation
dft docs --serve
# Opens at http://localhost:8080
# Discover available components
dft components list
# Run a simple pipeline (uses sample data)
dft run --select simple_csv_example
# Try the custom components example
dft run --select custom_example_pipeline
Built-in Components
DFT includes pre-built components for common data operations:
- Sources: CSV, PostgreSQL, MySQL, ClickHouse, Google Play, JSON
- Processors: Data validation, anomaly detection
- Endpoints: CSV, PostgreSQL, MySQL, ClickHouse, JSON output
# Discover available components
dft components list
# Get component details and examples
dft components describe postgresql --format yaml
# Interactive component browser
dft docs --serve
Pipeline Configuration
Basic Pipeline
pipeline_name: simple_etl
description: Extract, validate, and load user data

connections:
  analytics_db:
    type: postgresql
    host: analytics.company.com
    database: warehouse
    user: analyst
    password: "${POSTGRES_PASSWORD}"

steps:
  - id: load_user_data
    type: source
    source_type: csv
    config:
      file_path: "data/users.csv"

  - id: validate_users
    type: processor
    processor_type: validator
    depends_on: [load_user_data]
    config:
      required_columns: [id, email, created_at]

  - id: save_clean_users
    type: endpoint
    endpoint_type: postgresql
    connection: analytics_db
    depends_on: [validate_users]
    config:
      table: users_clean
      mode: replace
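Once saved under pipelines/ (for example as pipelines/simple_etl.yml), the pipeline can be run by its pipeline_name:
# Run the basic pipeline by name
dft run --select simple_etl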
Advanced Features
- Pipeline Dependencies: depends_on: [other_pipeline]
- Variables: {{ var("date") }} and {{ env_var("API_KEY") }} (see the sketch after this list)
- Named Connections: Reusable database configurations
- Tags: Organize pipelines with tags: [daily, analytics]
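To illustrate the variable syntax, here is a hypothetical source step whose input file is parameterized at run time (the step id and file path are made up for this example):
# Illustrative only: the date placeholder is filled from --vars or variable defaults
- id: load_daily_events
  type: source
  source_type: csv
  config:
    # e.g. dft run --select analytics --vars date=2024-01-15
    file_path: data/events_{{ var("date") }}.csv
env_var("API_KEY") works the same way, reading the value from the environment rather than from --vars.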
Pipeline Execution
# Run all pipelines
dft run
# Run specific pipeline
dft run --select customer_analytics
# Run by tags
dft run --select tag:daily
# Run with dependencies (dbt-style)
dft run --select +customer_analytics # Include upstream dependencies
dft run --select customer_analytics+ # Include downstream dependencies
# Override variables
dft run --select analytics --vars date=2024-01-15,min_amount=5.00
Documentation & Monitoring
# Interactive web documentation
dft docs --serve
# Validate pipeline configurations
dft validate
# Check pipeline dependencies
dft deps
# List available components
dft components list
Advanced Features
- Environment Configuration: export DFT_ENV=prod
- State Management: Automatic incremental processing with {{ state.get() }} (sketched below)
- Custom Components: Plugin system for extending functionality
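As a rough sketch of incremental processing (hypothetical: the query key and the use of state.get() inside SQL are assumptions here; see the Database Integration Guide for the actual interface):
# Hypothetical incremental source: fetch only rows added since the last successful run
- id: load_new_orders
  type: source
  source_type: postgresql
  connection: analytics_db
  config:
    query: "SELECT * FROM orders WHERE created_at > '{{ state.get() }}'"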
Custom Components
Add custom sources, processors, and endpoints in your project's dft/ directory:
# dft/sources/api_source.py
from dft.core.base import DataSource
from dft.core.data_packet import DataPacket

class ApiSource(DataSource):
    def extract(self, variables=None) -> DataPacket:
        # Your API extraction logic goes here; `data` stands in for the fetched payload
        data = ...
        return DataPacket(data=data, metadata={})

    def test_connection(self) -> bool:
        return True
# Use in pipelines with snake_case naming
steps:
  - id: fetch_data
    type: source
    source_type: api  # Uses ApiSource class
    config:
      api_url: "https://api.example.com/data"
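Processors follow the same pattern. A minimal sketch, assuming a DataProcessor base class in dft.core.base analogous to DataSource, with a process() hook (both names are assumptions; the guide documents the actual interface):
# dft/processors/dedup.py (hypothetical example)
from dft.core.base import DataProcessor  # assumed name, by analogy with DataSource
from dft.core.data_packet import DataPacket

class DedupProcessor(DataProcessor):
    def process(self, packet: DataPacket, variables=None) -> DataPacket:
        # Drop duplicate rows; assumes the packet carries a pandas-like DataFrame
        return DataPacket(data=packet.data.drop_duplicates(), metadata=packet.metadata)
Following the snake_case convention above, this would be referenced as processor_type: dedup.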
See Custom Components Guide for detailed examples.
Project Structure
my_project/
├── dft_project.yml        # Project configuration
├── .env                   # Environment variables
├── pipelines/             # Pipeline definitions
│   ├── ingestion.yml
│   ├── analytics.yml
│   └── reporting.yml
├── dft/                   # Custom components (auto-created)
│   ├── sources/           # Custom data sources
│   ├── processors/        # Custom data processors
│   └── endpoints/         # Custom data endpoints
├── data/                  # Input data files
├── output/                # Generated outputs
└── .dft/                  # DFT metadata and logs
Get Started
# Install DFT
pip install dft-pipeline
# Create new project
dft init my_project && cd my_project
# Explore components and documentation
dft docs --serve
# Run example pipeline
dft run --select custom_example_pipeline
License
MIT License - see LICENSE file for details.