Data Flow Tools - flexible ETL pipeline framework

DFT - Data Flow Tools

Requires Python 3.9+

Flexible ETL pipeline framework designed for data analysts and engineers. Build, orchestrate, and monitor data pipelines with YAML configurations.

✨ Key Features

  • 🔧 Component-Based: Modular sources, processors, and endpoints
  • 🔌 Plugin System: Add custom components directly to your project
  • 📋 YAML Configuration: Simple, readable pipeline definitions
  • 🔗 Dependency Management: Automatic pipeline ordering and validation
  • 📊 Interactive Documentation: Web-based pipeline exploration
  • 💾 Database Support: PostgreSQL, MySQL, ClickHouse with upsert capabilities
  • 🔄 Incremental Processing: Smart data loading with state management
  • ⏱️ Microbatch Processing: Time-based data windows with lookback support
  • ⚙️ Data Validation: Built-in quality checks and constraints
  • 🎯 Analyst-Friendly: Rich CLI tools and component discovery
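
To make the microbatch feature concrete, here is a minimal sketch of how time-based windows with lookback can be generated. It is an illustration in plain Python, not DFT's implementation: the function name `microbatch_windows` and its parameters are assumptions introduced for this example.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def microbatch_windows(
    start: datetime,
    end: datetime,
    batch_size: timedelta,
    lookback: int = 0,
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield half-open [window_start, window_end) intervals up to `end`.

    `lookback` (hypothetical parameter) re-processes that many extra batches
    before `start`, a common way to pick up late-arriving rows.
    """
    if batch_size <= timedelta(0):
        raise ValueError("batch_size must be positive")
    cursor = start - lookback * batch_size
    while cursor < end:
        # Clamp the last window so it never runs past `end`.
        yield cursor, min(cursor + batch_size, end)
        cursor += batch_size


windows = list(
    microbatch_windows(
        start=datetime(2024, 1, 15),
        end=datetime(2024, 1, 17),
        batch_size=timedelta(days=1),
        lookback=1,
    )
)
for window_start, window_end in windows:
    print(window_start.date(), "->", window_end.date())
```

With a lookback of one daily batch, the first window starts one day before the requested start, so the previous day is processed again.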

📚 Documentation

🚀 Quick Start

1. Installation

Option A: Install from PyPI (Recommended)

# Install directly from PyPI
pip install dft-pipeline

Option B: Install from Source (For Development)

# Clone repository
git clone <repository-url>
cd dft

# Install package with dependencies
pip install -e .

2. Create Project

# Initialize new project
dft init my_analytics_project
cd my_analytics_project

3. Explore Examples

# Run these from inside the project created in step 2 (dft init adds the examples)

# View interactive documentation
dft docs --serve
# Opens at http://localhost:8080

# Discover available components
dft components list

# Run a simple pipeline (uses sample data)
dft run --select simple_csv_example

# Try the custom components example  
dft run --select custom_example_pipeline

📦 Built-in Components

DFT includes pre-built components for common data operations:

  • Sources: CSV, PostgreSQL, MySQL, ClickHouse, Google Play, JSON
  • Processors: Data validation, anomaly detection
  • Endpoints: CSV, PostgreSQL, MySQL, ClickHouse, JSON output

# Discover available components
dft components list

# Get component details and examples
dft components describe postgresql --format yaml

# Interactive component browser
dft docs --serve

📋 Pipeline Configuration

Basic Pipeline

pipeline_name: simple_etl
description: Extract, validate, and load user data

connections:
  analytics_db:
    type: postgresql
    host: analytics.company.com
    database: warehouse
    user: analyst
    password: "${POSTGRES_PASSWORD}"

steps:
  - id: load_user_data
    type: source
    source_type: csv
    config:
      file_path: "data/users.csv"

  - id: validate_users
    type: processor
    processor_type: validator
    depends_on: [load_user_data]
    config:
      required_columns: [id, email, created_at]

  - id: save_clean_users
    type: endpoint
    endpoint_type: postgresql
    connection: analytics_db
    depends_on: [validate_users]
    config:
      table: users_clean
      mode: replace
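
The `depends_on` entries above define a dependency graph between steps. The sketch below shows one way such a graph can be validated and turned into an execution order using Python's standard `graphlib` module. It is not taken from DFT's source: the `execution_order` helper and the plain-dict step format are assumptions made for illustration.

```python
from graphlib import CycleError, TopologicalSorter

# The three steps from the pipeline above, reduced to id and depends_on.
steps = [
    {"id": "load_user_data", "depends_on": []},
    {"id": "validate_users", "depends_on": ["load_user_data"]},
    {"id": "save_clean_users", "depends_on": ["validate_users"]},
]


def execution_order(steps):
    """Return step ids so that every step comes after its dependencies."""
    known = {step["id"] for step in steps}
    graph = {}
    for step in steps:
        deps = step.get("depends_on", [])
        missing = [dep for dep in deps if dep not in known]
        if missing:
            raise ValueError(
                f"step {step['id']!r} depends on unknown steps: {missing}"
            )
        graph[step["id"]] = set(deps)
    # static_order() raises graphlib.CycleError on circular dependencies.
    return list(TopologicalSorter(graph).static_order())


print(execution_order(steps))
```

A circular `depends_on` chain surfaces as `CycleError`, and a reference to a step that does not exist surfaces as `ValueError`. Both are the kind of problem a validation pass can report before anything runs.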

Advanced Features

  • Pipeline Dependencies: depends_on: [other_pipeline]
  • Variables: {{ var("date") }} and {{ env_var("API_KEY") }}
  • Named Connections: Reusable database configurations
  • Tags: Organize pipelines with tags: [daily, analytics]
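
As a rough illustration of the variable syntax, the sketch below substitutes `{{ var("...") }}` and `{{ env_var("...") }}` placeholders in a string using only the standard library. DFT's own rendering may work differently (for example through a template engine); the `render` function here is a hypothetical stand-in.

```python
import os
import re

# Matches {{ var("name") }} and {{ env_var("NAME") }} with either quote style.
_CALL = re.compile(
    r"""\{\{\s*(var|env_var)\(\s*["']([^"']+)["']\s*\)\s*\}\}"""
)


def render(template: str, variables: dict, environ=os.environ) -> str:
    """Replace var()/env_var() placeholders; fail loudly on unknown names."""

    def substitute(match: re.Match) -> str:
        kind, name = match.group(1), match.group(2)
        source = variables if kind == "var" else environ
        if name not in source:
            raise KeyError(f"{kind}({name!r}) is not defined")
        return str(source[name])

    return _CALL.sub(substitute, template)


template = 'day = {{ var("date") }}, key = {{ env_var("API_KEY") }}'
rendered = render(template, {"date": "2024-01-15"}, {"API_KEY": "secret"})
print(rendered)
```

Raising on an undefined name, rather than substituting an empty string, keeps a missing `--vars` value from silently changing what a pipeline loads.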

🔄 Pipeline Execution

# Run all pipelines
dft run

# Run specific pipeline
dft run --select customer_analytics

# Run by tags
dft run --select tag:daily

# Run with dependencies (dbt-style)
dft run --select +customer_analytics  # Include upstream dependencies
dft run --select customer_analytics+  # Include downstream dependencies

# Override variables
dft run --select analytics --vars date=2024-01-15,min_amount=5.00
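
The selector forms above (`name`, `+name`, `name+`, `tag:...`) can be read as queries over the pipeline dependency graph. The following sketch resolves them against a small, made-up project. The `PIPELINES` data and the `select` function are hypothetical and only illustrate the semantics described in the comments above.

```python
from typing import Dict, List, Set

# Hypothetical project: pipeline name -> upstream pipelines and tags.
PIPELINES: Dict[str, dict] = {
    "ingestion": {"depends_on": [], "tags": ["daily"]},
    "customer_analytics": {
        "depends_on": ["ingestion"],
        "tags": ["daily", "analytics"],
    },
    "reporting": {"depends_on": ["customer_analytics"], "tags": ["weekly"]},
}


def _walk(start: str, edges: Dict[str, List[str]]) -> Set[str]:
    """Collect every node reachable from `start` by following `edges`."""
    seen: Set[str] = set()
    stack = [start]
    while stack:
        for neighbour in edges.get(stack.pop(), []):
            if neighbour not in seen:
                seen.add(neighbour)
                stack.append(neighbour)
    return seen


def select(selector: str, pipelines: Dict[str, dict] = PIPELINES) -> Set[str]:
    """Resolve one selector string to a set of pipeline names."""
    if selector.startswith("tag:"):
        tag = selector[len("tag:"):]
        return {n for n, spec in pipelines.items() if tag in spec["tags"]}

    upstream = selector.startswith("+")
    downstream = selector.endswith("+")
    name = selector.strip("+")
    if name not in pipelines:
        raise KeyError(f"unknown pipeline: {name!r}")

    parents = {n: spec["depends_on"] for n, spec in pipelines.items()}
    children: Dict[str, List[str]] = {n: [] for n in pipelines}
    for n, deps in parents.items():
        for dep in deps:
            children[dep].append(n)

    selected = {name}
    if upstream:
        selected |= _walk(name, parents)
    if downstream:
        selected |= _walk(name, children)
    return selected


print(sorted(select("+customer_analytics")))
print(sorted(select("customer_analytics+")))
print(sorted(select("tag:daily")))
```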

📊 Documentation & Monitoring

# Interactive web documentation
dft docs --serve

# Validate pipeline configurations
dft validate

# Check pipeline dependencies
dft deps

# List available components
dft components list

Advanced Features

  • Environment Configuration: export DFT_ENV=prod
  • State Management: Automatic incremental processing with {{ state.get() }}
  • Custom Components: Plugin system for extending functionality
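
Incremental processing usually comes down to remembering a high-water mark between runs and loading only rows beyond it. The sketch below shows that pattern with a JSON file as the state store. The `JsonState` class and `load_incrementally` function are hypothetical, and the real signature of `state.get()` in DFT is not documented here, so treat this as an assumption-laden illustration of the idea only.

```python
import json
import tempfile
from pathlib import Path


class JsonState:
    """Tiny key/value store persisted to a JSON file (hypothetical)."""

    def __init__(self, path):
        self.path = Path(path)
        self._data = (
            json.loads(self.path.read_text()) if self.path.exists() else {}
        )

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value
        self.path.write_text(json.dumps(self._data))


def load_incrementally(rows, state, key="last_created_at"):
    """Return only rows newer than the stored watermark, then advance it."""
    # ISO 8601 date strings sort correctly as plain strings.
    watermark = state.get(key, "")
    new_rows = [row for row in rows if row["created_at"] > watermark]
    if new_rows:
        state.set(key, max(row["created_at"] for row in new_rows))
    return new_rows


state_file = Path(tempfile.mkdtemp()) / "state.json"
rows = [
    {"id": 1, "created_at": "2024-01-14"},
    {"id": 2, "created_at": "2024-01-15"},
]
first_run = load_incrementally(rows, JsonState(state_file))
second_run = load_incrementally(rows, JsonState(state_file))
print(len(first_run), len(second_run))
```

The second run creates a fresh `JsonState` on purpose: the watermark has to survive in the file, not in memory, for a later process to skip rows that were already loaded.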

Custom Components

Add custom sources, processors, and endpoints in your project's dft/ directory:

# dft/sources/api_source.py
from dft.core.base import DataSource
from dft.core.data_packet import DataPacket

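class ApiSource(DataSource):
    def extract(self, variables=None) -> DataPacket:
        # Your API extraction logic goes here. `data` is a placeholder for
        # the records your API returns; the empty list is only an assumption.
        data = []
        return DataPacket(data=data, metadata={})

    def test_connection(self) -> bool:
        # Return True when the API is reachable.
        return True

# Use in pipelines with snake_case naming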
steps:
  - id: fetch_data
    type: source
    source_type: api  # Uses ApiSource class
    config:
      api_url: "https://api.example.com/data"

See Custom Components Guide for detailed examples.
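
The example above maps the class `ApiSource` to `source_type: api`. One plausible reading of that convention is "drop the component suffix, then convert CamelCase to snake_case". The helper below sketches that rule. The exact rule DFT applies is an assumption, and `component_type_name` is a hypothetical name.

```python
import re


def component_type_name(class_name: str, suffix: str) -> str:
    """Derive a snake_case type name from a component class name.

    Example: "ApiSource" with suffix "Source" becomes "api".
    """
    if suffix and class_name.endswith(suffix):
        base = class_name[: -len(suffix)]
    else:
        base = class_name
    # Insert "_" at each lowercase-or-digit to uppercase boundary.
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", base).lower()


for name, suffix in [
    ("ApiSource", "Source"),
    ("GooglePlaySource", "Source"),
    ("CsvEndpoint", "Endpoint"),
]:
    print(name, "->", component_type_name(name, suffix))
```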

📁 Project Structure

my_project/
├── dft_project.yml          # Project configuration
├── .env                     # Environment variables
├── pipelines/               # Pipeline definitions
│   ├── ingestion.yml
│   ├── analytics.yml
│   └── reporting.yml
├── dft/                     # Custom components (auto-created)
│   ├── sources/            # Custom data sources
│   ├── processors/         # Custom data processors
│   └── endpoints/          # Custom data endpoints
├── data/                    # Input data files
├── output/                  # Generated outputs
└── .dft/                    # DFT metadata and logs

🚀 Get Started

# Install DFT
pip install dft-pipeline

# Create new project
dft init my_project && cd my_project

# Explore components and documentation
dft docs --serve

# Run example pipeline
dft run --select custom_example_pipeline

📄 License

MIT License - see LICENSE file for details.
