Skip to main content

Data Flow Tools - flexible ETL pipeline framework

Project description

DFT - Data Flow Tools

PyPI version Python 3.9+

Flexible ETL pipeline framework designed for data analysts and engineers. Build, orchestrate, and monitor data pipelines with YAML configurations.

โœจ Key Features

  • ๐Ÿ”ง Component-Based: Modular sources, processors, and endpoints
  • ๐Ÿ”Œ Plugin System: Add custom components directly to your project
  • ๐Ÿ“‹ YAML Configuration: Simple, readable pipeline definitions
  • ๐Ÿ”— Dependency Management: Automatic pipeline ordering and validation
  • ๐Ÿ“Š Interactive Documentation: Web-based pipeline exploration
  • ๐Ÿ’พ Database Support: PostgreSQL, MySQL, ClickHouse with upsert capabilities
  • ๐Ÿ”„ Incremental Processing: Smart data loading with state management
  • โฑ๏ธ Microbatch Processing: Time-based data windows with lookback support
  • โš™๏ธ Data Validation: Built-in quality checks and constraints
  • ๐ŸŽฏ Analyst-Friendly: Rich CLI tools and component discovery

๐Ÿ“š Documentation

๐Ÿš€ Quick Start

1. Installation

Option A: Install from PyPI (Recommended)

# Install directly from PyPI
pip install dft-pipeline

Option B: Install from Source (For Development)

# Clone repository
git clone <repository-url>
cd dft

# Install package with dependencies
pip install -e .

2. Create Project

# Initialize new project
dft init my_analytics_project
cd my_analytics_project

3. Explore Examples

# Initialize a new project with examples
dft init my_analytics_project
cd my_analytics_project

# View interactive documentation
dft docs --serve
# Opens at http://localhost:8080

# Discover available components
dft components list

# Run a simple pipeline (uses sample data)
dft run --select simple_csv_example

# Try the custom components example  
dft run --select custom_example_pipeline

๐Ÿ“ฆ Built-in Components

DFT includes pre-built components for common data operations:

  • Sources: CSV, PostgreSQL, MySQL, ClickHouse, Google Play, JSON
  • Processors: Data validation, anomaly detection
  • Endpoints: CSV, PostgreSQL, MySQL, ClickHouse, JSON output
# Discover available components
dft components list

# Get component details and examples
dft components describe postgresql --format yaml

# Interactive component browser
dft docs --serve

๐Ÿ“‹ Pipeline Configuration

Basic Pipeline

pipeline_name: simple_etl
description: Extract, validate, and load user data

connections:
  analytics_db:
    type: postgresql
    host: analytics.company.com
    database: warehouse
    user: analyst
    password: "${POSTGRES_PASSWORD}"

steps:
  - id: load_user_data
    type: source
    source_type: csv
    config:
      file_path: "data/users.csv"

  - id: validate_users
    type: processor
    processor_type: validator
    depends_on: [load_user_data]
    config:
      required_columns: [id, email, created_at]

  - id: save_clean_users
    type: endpoint
    endpoint_type: postgresql
    connection: analytics_db
    depends_on: [validate_users]
    config:
      table: users_clean
      mode: replace

Advanced Features

  • Pipeline Dependencies: depends_on: [other_pipeline]
  • Variables: {{ var("date") }} and {{ env_var("API_KEY") }}
  • Named Connections: Reusable database configurations
  • Tags: Organize pipelines with tags: [daily, analytics]

๐Ÿ”„ Pipeline Execution

# Run all pipelines
dft run

# Run specific pipeline
dft run --select customer_analytics

# Run by tags
dft run --select tag:daily

# Run with dependencies (dbt-style)
dft run --select +customer_analytics  # Include upstream dependencies
dft run --select customer_analytics+  # Include downstream dependencies

# Override variables
dft run --select analytics --vars date=2024-01-15,min_amount=5.00

๐Ÿ“Š Documentation & Monitoring

# Interactive web documentation
dft docs --serve

# Validate pipeline configurations
dft validate

# Check pipeline dependencies
dft deps

# List available components
dft components list

๐Ÿ” Advanced Features

  • Environment Configuration: export DFT_ENV=prod
  • State Management: Automatic incremental processing with {{ state.get() }}
  • Custom Components: Plugin system for extending functionality

Custom Components

Add custom sources, processors, and endpoints in your project's dft/ directory:

# dft/sources/api_source.py
from dft.core.base import DataSource
from dft.core.data_packet import DataPacket

class ApiSource(DataSource):
    def extract(self, variables=None) -> DataPacket:
        # Your API extraction logic
        return DataPacket(data=data, metadata={})
    
    def test_connection(self) -> bool:
        return True
# Use in pipelines with snake_case naming
steps:
  - id: fetch_data
    type: source
    source_type: api  # Uses ApiSource class
    config:
      api_url: "https://api.example.com/data"

See Custom Components Guide for detailed examples.

๐Ÿ“ Project Structure

my_project/
โ”œโ”€โ”€ dft_project.yml          # Project configuration
โ”œโ”€โ”€ .env                     # Environment variables
โ”œโ”€โ”€ pipelines/               # Pipeline definitions
โ”‚   โ”œโ”€โ”€ ingestion.yml
โ”‚   โ”œโ”€โ”€ analytics.yml
โ”‚   โ””โ”€โ”€ reporting.yml
โ”œโ”€โ”€ dft/                     # Custom components (auto-created)
โ”‚   โ”œโ”€โ”€ sources/            # Custom data sources
โ”‚   โ”œโ”€โ”€ processors/         # Custom data processors
โ”‚   โ””โ”€โ”€ endpoints/          # Custom data endpoints
โ”œโ”€โ”€ data/                    # Input data files
โ”œโ”€โ”€ output/                  # Generated outputs
โ””โ”€โ”€ .dft/                    # DFT metadata and logs

๐Ÿš€ Get Started

# Install DFT
pip install dft-pipeline

# Create new project
dft init my_project && cd my_project

# Explore components and documentation
dft docs --serve

# Run example pipeline
dft run --select custom_example_pipeline

๐Ÿ“„ License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dft_pipeline-0.3.16.tar.gz (64.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dft_pipeline-0.3.16-py3-none-any.whl (85.5 kB view details)

Uploaded Python 3

File details

Details for the file dft_pipeline-0.3.16.tar.gz.

File metadata

  • Download URL: dft_pipeline-0.3.16.tar.gz
  • Upload date:
  • Size: 64.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.12

File hashes

Hashes for dft_pipeline-0.3.16.tar.gz
Algorithm Hash digest
SHA256 0b62827982375c27c8caddc0e85888b75fd2296b7387388146aab3f81b9d7f2b
MD5 fb2a7c6a84eb0a63808f7c83849f14f8
BLAKE2b-256 bb4c6e4f1a76e680dc1787ea48a903e74e16e4a98836f86368e8b42716046e67

See more details on using hashes here.

File details

Details for the file dft_pipeline-0.3.16-py3-none-any.whl.

File metadata

  • Download URL: dft_pipeline-0.3.16-py3-none-any.whl
  • Upload date:
  • Size: 85.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.12

File hashes

Hashes for dft_pipeline-0.3.16-py3-none-any.whl
Algorithm Hash digest
SHA256 5f6af654559690adb665ef3aa56a4ce96d5aac309ea266962ae8c69c0e0aaa68
MD5 0a991bd0b86864048feec888db26a0ac
BLAKE2b-256 ea50a519ff0091a8d0e77531faf5c96237dd74bfe27cea5c0fa9b66d4af9af9a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page