
DFT - Data Flow Tools

Flexible ETL pipeline framework designed for data analysts and engineers. Build, orchestrate, and monitor data pipelines with YAML configurations.

✨ Key Features

  • 🔧 Component-Based: Modular sources, processors, and endpoints
  • 🔌 Plugin System: Add custom components directly to your project
  • 📋 YAML Configuration: Simple, readable pipeline definitions
  • 🔗 Dependency Management: Automatic pipeline ordering and validation
  • 📊 Interactive Documentation: Web-based pipeline exploration with component library
  • 💾 Database Support: PostgreSQL, MySQL, ClickHouse with upsert capabilities
  • 🔄 Incremental Processing: Smart data loading with state management
  • ⚙️ Data Validation: Built-in quality checks and constraints
  • 🎯 Analyst-Friendly: Rich CLI tools and component discovery

🚀 Quick Start

1. Installation

# Clone repository
git clone <repository-url>
cd dft

# Install package with dependencies
pip install -e .

2. Create Project

# Initialize new project
dft init my_analytics_project
cd my_analytics_project

3. Explore Examples

# Try the example project
cd example_project

# View interactive documentation
dft docs --serve
# Opens at http://localhost:8080

# Discover available components
dft components list

# Run a simple pipeline
dft run --select simple_csv_example

# Try the custom components example  
dft run --select custom_example_pipeline

📦 Component Library

DFT provides a rich library of pre-built components:

📥 Sources

  • CSV: Read data from CSV files with configurable delimiters and encoding
  • PostgreSQL: Extract data with SQL queries and named connections
  • MySQL: Database source with connection pooling
  • ClickHouse: High-performance analytics database source
  • Google Play: Specialized financial data extraction

โš™๏ธ Processors

  • Validator: Data quality checks with custom rules and constraints
  • MAD Anomaly Detector: Statistical anomaly detection for data monitoring

📤 Endpoints

  • CSV: Write processed data to CSV files
  • PostgreSQL: Load data with append/replace/upsert modes
  • MySQL: Advanced upsert operations with conflict resolution
  • ClickHouse: Optimized bulk loading for analytics workloads
  • JSON: Export data in JSON format

🔧 Component Discovery

CLI Commands

# List all components
dft components list

# Filter by type
dft components list --type endpoint

# Get detailed information
dft components describe mysql

# View configuration examples
dft components describe validator --format yaml

Web Interface

Run dft docs --serve to access the interactive component library:

  • Browse components by category
  • View configuration requirements
  • Copy-paste ready YAML examples
  • Interactive component details

💾 Database Features

Named Connections

Define reusable database connections:

# dft_project.yml
connections:
  analytics_db:
    type: postgresql
    host: analytics.company.com
    database: warehouse
    user: analyst
    password: "${POSTGRES_PASSWORD}"
  
  main_mysql:
    type: mysql
    host: mysql.company.com
    database: production
    user: readonly
    password: "${MYSQL_PASSWORD}"
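The "${POSTGRES_PASSWORD}" and "${MYSQL_PASSWORD}" placeholders above are filled in from environment variables, so secrets stay out of version control. DFT's internal implementation isn't shown on this page; the following is only a minimal sketch of how such ${VAR} interpolation can work (the expand_env_vars helper is illustrative, not part of DFT's API):

```python
import os
import re

def expand_env_vars(value: str) -> str:
    """Replace ${VAR} placeholders with values from the environment."""
    return re.sub(r"\$\{(\w+)\}", lambda m: os.environ.get(m.group(1), ""), value)

# Example: resolve a connection password from the environment
os.environ["POSTGRES_PASSWORD"] = "s3cret"
print(expand_env_vars("${POSTGRES_PASSWORD}"))  # s3cret
```

Unset variables resolve to an empty string in this sketch; a real loader would more likely raise an error so misconfigured connections fail fast.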

Upsert Operations

Intelligent insert-or-update operations:

# MySQL upsert example
- id: upsert_users
  type: endpoint
  endpoint_type: mysql
  connection: main_mysql
  config:
    table: users
    mode: upsert
    upsert_keys: [id]  # Conflict resolution on 'id' column
    auto_create: true
    schema:
      id: "INT PRIMARY KEY"
      name: "VARCHAR(100)"
      email: "VARCHAR(100)"
      updated_at: "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
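On MySQL, an upsert like the one above maps naturally onto INSERT ... ON DUPLICATE KEY UPDATE, with the upsert_keys column(s) acting as the conflict target. As a rough sketch of the SQL such a mode generates (the build_mysql_upsert helper is hypothetical, not DFT's actual code):

```python
def build_mysql_upsert(table, columns, upsert_keys):
    """Build an INSERT ... ON DUPLICATE KEY UPDATE statement.

    Non-key columns are overwritten on conflict; key columns are left alone.
    """
    cols = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(
        f"{c} = VALUES({c})" for c in columns if c not in upsert_keys
    )
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {updates}"
    )

sql = build_mysql_upsert("users", ["id", "name", "email"], ["id"])
print(sql)
```

PostgreSQL endpoints would use the analogous INSERT ... ON CONFLICT (id) DO UPDATE form.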

📋 Pipeline Configuration

Basic Pipeline

name: simple_etl
description: Extract, validate, and load user data

sources:
  - name: user_data
    type: csv
    config:
      file_path: "data/users.csv"

endpoints:
  - name: clean_users
    type: postgresql
    connection: analytics_db
    config:
      table: users_clean
      mode: replace

pipelines:
  - name: process_users
    source: user_data
    processors:
      - type: validator
        config:
          required_columns: [id, email, created_at]
          row_count_min: 1
    endpoints: [clean_users]

Advanced Pipeline with Dependencies

name: customer_analytics
tags: [analytics, daily]
depends_on: [data_ingestion]  # Run after data_ingestion pipeline

variables:
  analysis_date: "{{ yesterday() }}"
  min_transaction_amount: 10.00

pipelines:
  - name: customer_metrics
    source: transaction_data
    processors:
      - type: validator
        config:
          checks:
            - column: amount
              min_value: "{{ var('min_transaction_amount') }}"
              not_null: true
    endpoints: [metrics_warehouse]
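The validator checks above (min_value, not_null) each name a column and a constraint, and a failing row produces a violation. A minimal sketch of how such checks could be evaluated over rows, assuming rows are plain dicts (the run_checks helper is illustrative, not DFT's validator implementation):

```python
def run_checks(rows, checks):
    """Apply simple column checks (min_value, not_null) and collect violations."""
    errors = []
    for check in checks:
        col = check["column"]
        for i, row in enumerate(rows):
            value = row.get(col)
            if check.get("not_null") and value is None:
                errors.append(f"row {i}: {col} is null")
            elif value is not None and "min_value" in check and value < check["min_value"]:
                errors.append(f"row {i}: {col}={value} below {check['min_value']}")
    return errors

rows = [{"amount": 25.0}, {"amount": 3.0}, {"amount": None}]
checks = [{"column": "amount", "min_value": 10.0, "not_null": True}]
print(run_checks(rows, checks))
```

Here the second and third rows fail (below the minimum, and null), while the first passes both checks.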

🔄 Pipeline Execution

Basic Execution

# Run all pipelines
dft run

# Run specific pipeline
dft run --select customer_analytics

# Run by tags
dft run --select tag:daily

Dependency-Aware Execution

# Run pipeline and all dependencies
dft run --select +customer_analytics

# Run pipeline and all dependents
dft run --select customer_analytics+

# Run full dependency tree
dft run --select +customer_analytics+
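The + selectors follow dbt-style graph selection: a leading + pulls in everything upstream of the named pipeline, a trailing + everything downstream. Conceptually, resolving +customer_analytics is a depth-first walk of the depends_on graph that emits dependencies before dependents; a minimal sketch (not DFT's actual resolver):

```python
def upstream(pipeline, deps):
    """Return a pipeline plus all transitive dependencies, dependencies first.

    `deps` maps each pipeline name to the list from its depends_on field.
    """
    order = []

    def visit(name):
        for dep in deps.get(name, []):
            visit(dep)
        if name not in order:
            order.append(name)

    visit(pipeline)
    return order

deps = {"customer_analytics": ["data_ingestion"], "data_ingestion": []}
print(upstream("customer_analytics", deps))  # ['data_ingestion', 'customer_analytics']
```

The resulting order is a valid execution order: data_ingestion runs before customer_analytics, matching the depends_on example earlier.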

Pipeline Variables

# Override pipeline variables
dft run --select customer_analytics --vars analysis_date=2024-01-15,min_amount=5.00

📊 Documentation & Monitoring

Interactive Documentation

Generate comprehensive project documentation:

dft docs --serve

Features:

  • Pipeline Overview: Visual dependency graphs with filtering
  • Component Library: Interactive component browser with examples
  • Configuration Details: Collapsible component configurations
  • YAML Examples: Copy-paste ready configurations

Pipeline Validation

# Validate all pipeline configurations
dft validate

# Validate specific pipelines
dft validate --select customer_analytics

# Check dependencies
dft deps

🎯 For Data Analysts

DFT is designed with analysts in mind:

1. Discover Components

dft components list --type source
dft components describe postgresql

2. Build Pipelines

  • Copy YAML examples from documentation
  • Use named connections for database access
  • Apply data validation rules

3. Monitor & Debug

  • Interactive web documentation
  • Pipeline dependency visualization
  • Built-in configuration validation

4. Scale Operations

  • Incremental data processing
  • Dependency-aware execution
  • Environment-specific configurations

๐Ÿ” Advanced Features

Environment Configuration

# Development environment
export DFT_ENV=dev
dft run --select customer_analytics

# Production environment  
export DFT_ENV=prod
dft run --select customer_analytics

State Management

DFT automatically tracks pipeline execution state for incremental processing:

variables:
  # Start from last processed date or 7 days ago
  start_date: "{{ state.get('last_processed_date', days_ago(7)) }}"
  end_date: "{{ yesterday() }}"
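The state.get('last_processed_date', ...) call above reads a value persisted from the previous run, falling back to a default on the first run. The project structure below shows state files living under .dft/state/; the exact format isn't documented here, so the following is only a sketch of a JSON-backed store with the same get/set shape (hypothetical, not DFT's implementation):

```python
import json
import tempfile
from pathlib import Path

class PipelineState:
    """Tiny JSON-backed state store mimicking state.get(key, default)."""

    def __init__(self, path):
        self.path = Path(path)
        self.data = json.loads(self.path.read_text()) if self.path.exists() else {}

    def get(self, key, default=None):
        return self.data.get(key, default)

    def set(self, key, value):
        self.data[key] = value
        self.path.write_text(json.dumps(self.data))  # persist for the next run

# First run: key absent, fall back to the default
state = PipelineState(Path(tempfile.gettempdir()) / "dft_state_demo.json")
state.set("last_processed_date", "2024-01-14")
print(state.get("last_processed_date", "7-days-ago"))  # 2024-01-14
```

Persisting state this way is what makes incremental loads restartable: each run picks up exactly where the last successful one stopped.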

Custom Components Plugin System

DFT supports adding custom components directly to your project, similar to dbt macros. When you run dft init, a complete plugin structure is created:

my_project/
├── dft/                    # Custom components directory
│   ├── sources/           # Custom data sources
│   ├── processors/        # Custom data processors
│   └── endpoints/         # Custom data endpoints
└── pipelines/
    └── custom_example_pipeline.yml  # Example using custom components

Creating Custom Components

1. Custom Source Example:

# dft/sources/api_source.py
from typing import Any, Dict, Optional
from dft.core.base import DataSource
from dft.core.data_packet import DataPacket

class ApiSource(DataSource):
    """Custom API data source"""
    
    def extract(self, variables: Optional[Dict[str, Any]] = None) -> DataPacket:
        api_url = self.get_config('api_url')
        data = []  # Your API extraction logic (e.g. fetch and parse api_url)
        return DataPacket(data=data, metadata={})
    
    def test_connection(self) -> bool:
        return True

2. Custom Processor Example:

# dft/processors/data_cleaner.py
from dft.core.base import DataProcessor
from dft.core.data_packet import DataPacket

class DataCleaner(DataProcessor):
    """Custom data cleaning processor"""
    
    def process(self, packet, variables=None):
        # Your cleaning logic
        cleaned_data = self.clean_data(packet.data)
        return DataPacket(data=cleaned_data, metadata=packet.metadata)

3. Custom Endpoint Example:

# dft/endpoints/webhook.py
from dft.core.base import DataEndpoint

class WebhookEndpoint(DataEndpoint):
    """Custom webhook endpoint"""
    
    def load(self, packet, variables=None) -> bool:
        webhook_url = self.get_config('webhook_url')
        # Your webhook logic
        return True

Using Custom Components

Components are automatically discovered and can be used by their snake_case name:

# pipelines/my_pipeline.yml
steps:
  - id: fetch_data
    type: source
    source_type: api  # Uses ApiSource class
    config:
      api_url: "https://api.example.com/data"
  
  - id: clean_data  
    type: processor
    processor_type: data_cleaner  # Uses DataCleaner class
    depends_on: [fetch_data]
    
  - id: send_webhook
    type: endpoint  
    endpoint_type: webhook  # Uses WebhookEndpoint class
    depends_on: [clean_data]
    config:
      webhook_url: "https://hooks.slack.com/..."

Plugin Features

  • Auto-Discovery: Components are automatically loaded from dft/ directories
  • Snake Case Naming: MyCustomSource → my_custom in pipelines
  • No Registration: Just add .py files and they become available
  • Examples Included: Working examples created with every dft init
  • Flexible: Supports both pandas and plain Python data structures
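The naming convention above (MyCustomSource → my_custom, DataCleaner → data_cleaner) is a CamelCase-to-snake_case conversion with the role suffix stripped. A sketch of that mapping, assuming the three suffixes Source, Processor, and Endpoint (the component_name helper is illustrative, not DFT's loader code):

```python
import re

def component_name(class_name: str) -> str:
    """MyCustomSource -> my_custom: strip the role suffix, then snake_case."""
    for suffix in ("Source", "Processor", "Endpoint"):
        if class_name.endswith(suffix):
            class_name = class_name[: -len(suffix)]
            break
    # Insert an underscore before each interior capital, then lowercase
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()

print(component_name("MyCustomSource"))   # my_custom
print(component_name("DataCleaner"))      # data_cleaner
print(component_name("WebhookEndpoint"))  # webhook
```

This is why the earlier pipeline referenced source_type: api for the ApiSource class.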

๐Ÿ“ Project Structure

my_project/
├── dft_project.yml          # Project configuration
├── .env                     # Environment variables
├── pipelines/               # Pipeline definitions
│   ├── ingestion.yml
│   ├── analytics.yml
│   ├── reporting.yml
│   └── custom_example_pipeline.yml  # Example with custom components
├── dft/                     # Custom components (auto-created)
│   ├── sources/            # Custom data sources
│   │   ├── __init__.py
│   │   └── my_custom_source.py     # Example custom source
│   ├── processors/         # Custom data processors
│   │   ├── __init__.py
│   │   └── my_custom_processor.py  # Example custom processor
│   └── endpoints/          # Custom data endpoints
│       ├── __init__.py
│       └── my_custom_endpoint.py   # Example custom endpoint
├── data/                    # Input data files
├── output/                  # Generated outputs
└── .dft/                    # DFT metadata
    ├── docs/                # Generated documentation
    ├── state/               # Pipeline state files
    └── logs/                # Execution logs

๐Ÿค Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Submit a pull request

📄 License

MIT License - see LICENSE file for details.


Get started with the example project: cd example_project && dft docs --serve

Try the plugin system: dft init my_project && cd my_project && dft run --select custom_example_pipeline
