Skip to main content

Data Flow Tools - flexible ETL pipeline framework

Project description

DFT - Data Flow Tools

PyPI version Python 3.9+

Flexible ETL pipeline framework designed for data analysts and engineers. Build, orchestrate, and monitor data pipelines with YAML configurations.

โœจ Key Features

  • ๐Ÿ”ง Component-Based: Modular sources, processors, and endpoints
  • ๐Ÿ”Œ Plugin System: Add custom components directly to your project
  • ๐Ÿ“‹ YAML Configuration: Simple, readable pipeline definitions
  • ๐Ÿ”— Dependency Management: Automatic pipeline ordering and validation
  • ๐Ÿ“Š Interactive Documentation: Web-based pipeline exploration
  • ๐Ÿ’พ Database Support: PostgreSQL, MySQL, ClickHouse with upsert capabilities
  • ๐Ÿ”„ Incremental Processing: Smart data loading with state management
  • โฑ๏ธ Microbatch Processing: Time-based data windows with lookback support
  • โš™๏ธ Data Validation: Built-in quality checks and constraints
  • ๐ŸŽฏ Analyst-Friendly: Rich CLI tools and component discovery

๐Ÿ“š Documentation

๐Ÿš€ Quick Start

1. Installation

Option A: Install from PyPI (Recommended)

# Install directly from PyPI
pip install dft-pipeline

Option B: Install from Source (For Development)

# Clone repository
git clone <repository-url>
cd dft

# Install package with dependencies
pip install -e .

2. Create Project

# Initialize new project
dft init my_analytics_project
cd my_analytics_project

3. Explore Examples

# Initialize a new project with examples
dft init my_analytics_project
cd my_analytics_project

# View interactive documentation
dft docs --serve
# Opens at http://localhost:8080

# Discover available components
dft components list

# Run a simple pipeline (uses sample data)
dft run --select simple_csv_example

# Try the custom components example  
dft run --select custom_example_pipeline

๐Ÿ“ฆ Built-in Components

DFT includes pre-built components for common data operations:

  • Sources: CSV, PostgreSQL, MySQL, ClickHouse, Google Play, JSON
  • Processors: Data validation, anomaly detection
  • Endpoints: CSV, PostgreSQL, MySQL, ClickHouse, JSON output
# Discover available components
dft components list

# Get component details and examples
dft components describe postgresql --format yaml

# Interactive component browser
dft docs --serve

๐Ÿ“‹ Pipeline Configuration

Basic Pipeline

pipeline_name: simple_etl
description: Extract, validate, and load user data

connections:
  analytics_db:
    type: postgresql
    host: analytics.company.com
    database: warehouse
    user: analyst
    password: "${POSTGRES_PASSWORD}"

steps:
  - id: load_user_data
    type: source
    source_type: csv
    config:
      file_path: "data/users.csv"

  - id: validate_users
    type: processor
    processor_type: validator
    depends_on: [load_user_data]
    config:
      required_columns: [id, email, created_at]

  - id: save_clean_users
    type: endpoint
    endpoint_type: postgresql
    connection: analytics_db
    depends_on: [validate_users]
    config:
      table: users_clean
      mode: replace

Advanced Features

  • Pipeline Dependencies: depends_on: [other_pipeline]
  • Variables: {{ var("date") }} and {{ env_var("API_KEY") }}
  • Named Connections: Reusable database configurations
  • Tags: Organize pipelines with tags: [daily, analytics]

๐Ÿ”„ Pipeline Execution

# Run all pipelines
dft run

# Run specific pipeline
dft run --select customer_analytics

# Run by tags
dft run --select tag:daily

# Run with dependencies (dbt-style)
dft run --select +customer_analytics  # Include upstream dependencies
dft run --select customer_analytics+  # Include downstream dependencies

# Override variables
dft run --select analytics --vars date=2024-01-15,min_amount=5.00

๐Ÿ“Š Documentation & Monitoring

# Interactive web documentation
dft docs --serve

# Validate pipeline configurations
dft validate

# Check pipeline dependencies
dft deps

# List available components
dft components list

๐Ÿ” Advanced Features

  • Environment Configuration: export DFT_ENV=prod
  • State Management: Automatic incremental processing with {{ state.get() }}
  • Custom Components: Plugin system for extending functionality

Custom Components

Add custom sources, processors, and endpoints in your project's dft/ directory:

# dft/sources/api_source.py
from dft.core.base import DataSource
from dft.core.data_packet import DataPacket

class ApiSource(DataSource):
    def extract(self, variables=None) -> DataPacket:
        # Your API extraction logic
        return DataPacket(data=data, metadata={})
    
    def test_connection(self) -> bool:
        return True
# Use in pipelines with snake_case naming
steps:
  - id: fetch_data
    type: source
    source_type: api  # Uses ApiSource class
    config:
      api_url: "https://api.example.com/data"

See Custom Components Guide for detailed examples.

๐Ÿ“ Project Structure

my_project/
โ”œโ”€โ”€ dft_project.yml          # Project configuration
โ”œโ”€โ”€ .env                     # Environment variables
โ”œโ”€โ”€ pipelines/               # Pipeline definitions
โ”‚   โ”œโ”€โ”€ ingestion.yml
โ”‚   โ”œโ”€โ”€ analytics.yml
โ”‚   โ””โ”€โ”€ reporting.yml
โ”œโ”€โ”€ dft/                     # Custom components (auto-created)
โ”‚   โ”œโ”€โ”€ sources/            # Custom data sources
โ”‚   โ”œโ”€โ”€ processors/         # Custom data processors
โ”‚   โ””โ”€โ”€ endpoints/          # Custom data endpoints
โ”œโ”€โ”€ data/                    # Input data files
โ”œโ”€โ”€ output/                  # Generated outputs
โ””โ”€โ”€ .dft/                    # DFT metadata and logs

๐Ÿš€ Get Started

# Install DFT
pip install dft-pipeline

# Create new project
dft init my_project && cd my_project

# Explore components and documentation
dft docs --serve

# Run example pipeline
dft run --select custom_example_pipeline

๐Ÿ“„ License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dft_pipeline-0.3.23.tar.gz (64.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dft_pipeline-0.3.23-py3-none-any.whl (86.0 kB view details)

Uploaded Python 3

File details

Details for the file dft_pipeline-0.3.23.tar.gz.

File metadata

  • Download URL: dft_pipeline-0.3.23.tar.gz
  • Upload date:
  • Size: 64.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.12

File hashes

Hashes for dft_pipeline-0.3.23.tar.gz
Algorithm Hash digest
SHA256 9f93b54d5693f7339dc961909f6ac964f4cde9cc2de94a4be830013fa97d85a4
MD5 861cfa9383fb9d65c0b056c32f9ba35d
BLAKE2b-256 1a2255f27066f2815d6bba49ffb427c7d465b781a24f0436f2939d2ab1ee6e14

See more details on using hashes here.

File details

Details for the file dft_pipeline-0.3.23-py3-none-any.whl.

File metadata

  • Download URL: dft_pipeline-0.3.23-py3-none-any.whl
  • Upload date:
  • Size: 86.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.12

File hashes

Hashes for dft_pipeline-0.3.23-py3-none-any.whl
Algorithm Hash digest
SHA256 ef9fccc774ab527fd02a66c435677b3baec19baea75366758bf127c98a6f1f72
MD5 0dc4d4b5c0ed47cc536b9ae924a10f00
BLAKE2b-256 3fba33eb59da62a3f2afdf66dde29ac7d52d0afabbaba503828e790551f58c59

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page