Skip to main content

Data Flow Tools - flexible ETL pipeline framework

Project description

DFT - Data Flow Tools

PyPI version Python 3.9+

Flexible ETL pipeline framework designed for data analysts and engineers. Build, orchestrate, and monitor data pipelines with YAML configurations.

โœจ Key Features

  • ๐Ÿ”ง Component-Based: Modular sources, processors, and endpoints
  • ๐Ÿ”Œ Plugin System: Add custom components directly to your project
  • ๐Ÿ“‹ YAML Configuration: Simple, readable pipeline definitions
  • ๐Ÿ”— Dependency Management: Automatic pipeline ordering and validation
  • ๐Ÿ“Š Interactive Documentation: Web-based pipeline exploration
  • ๐Ÿ’พ Database Support: PostgreSQL, MySQL, ClickHouse with upsert capabilities
  • ๐Ÿ”„ Incremental Processing: Smart data loading with state management
  • โฑ๏ธ Microbatch Processing: Time-based data windows with lookback support
  • โš™๏ธ Data Validation: Built-in quality checks and constraints
  • ๐ŸŽฏ Analyst-Friendly: Rich CLI tools and component discovery

๐Ÿ“š Documentation

๐Ÿš€ Quick Start

1. Installation

Option A: Install from PyPI (Recommended)

# Install directly from PyPI
pip install dft-pipeline

Option B: Install from Source (For Development)

# Clone repository
git clone <repository-url>
cd dft

# Install package with dependencies
pip install -e .

2. Create Project

# Initialize new project
dft init my_analytics_project
cd my_analytics_project

3. Explore Examples

# Initialize a new project with examples
dft init my_analytics_project
cd my_analytics_project

# View interactive documentation
dft docs --serve
# Opens at http://localhost:8080

# Discover available components
dft components list

# Run a simple pipeline (uses sample data)
dft run --select simple_csv_example

# Try the custom components example  
dft run --select custom_example_pipeline

๐Ÿ“ฆ Built-in Components

DFT includes pre-built components for common data operations:

  • Sources: CSV, PostgreSQL, MySQL, ClickHouse, Google Play, JSON
  • Processors: Data validation, anomaly detection
  • Endpoints: CSV, PostgreSQL, MySQL, ClickHouse, JSON output
# Discover available components
dft components list

# Get component details and examples
dft components describe postgresql --format yaml

# Interactive component browser
dft docs --serve

๐Ÿ“‹ Pipeline Configuration

Basic Pipeline

pipeline_name: simple_etl
description: Extract, validate, and load user data

connections:
  analytics_db:
    type: postgresql
    host: analytics.company.com
    database: warehouse
    user: analyst
    password: "${POSTGRES_PASSWORD}"

steps:
  - id: load_user_data
    type: source
    source_type: csv
    config:
      file_path: "data/users.csv"

  - id: validate_users
    type: processor
    processor_type: validator
    depends_on: [load_user_data]
    config:
      required_columns: [id, email, created_at]

  - id: save_clean_users
    type: endpoint
    endpoint_type: postgresql
    connection: analytics_db
    depends_on: [validate_users]
    config:
      table: users_clean
      mode: replace

Advanced Features

  • Pipeline Dependencies: depends_on: [other_pipeline]
  • Variables: {{ var("date") }} and {{ env_var("API_KEY") }}
  • Named Connections: Reusable database configurations
  • Tags: Organize pipelines with tags: [daily, analytics]

๐Ÿ”„ Pipeline Execution

# Run all pipelines
dft run

# Run specific pipeline
dft run --select customer_analytics

# Run by tags
dft run --select tag:daily

# Run with dependencies (dbt-style)
dft run --select +customer_analytics  # Include upstream dependencies
dft run --select customer_analytics+  # Include downstream dependencies

# Override variables
dft run --select analytics --vars date=2024-01-15,min_amount=5.00

๐Ÿ“Š Documentation & Monitoring

# Interactive web documentation
dft docs --serve

# Validate pipeline configurations
dft validate

# Check pipeline dependencies
dft deps

# List available components
dft components list

๐Ÿ” Advanced Features

  • Environment Configuration: export DFT_ENV=prod
  • State Management: Automatic incremental processing with {{ state.get() }}
  • Custom Components: Plugin system for extending functionality

Custom Components

Add custom sources, processors, and endpoints in your project's dft/ directory:

# dft/sources/api_source.py
from dft.core.base import DataSource
from dft.core.data_packet import DataPacket

class ApiSource(DataSource):
    def extract(self, variables=None) -> DataPacket:
        # Your API extraction logic
        return DataPacket(data=data, metadata={})
    
    def test_connection(self) -> bool:
        return True
# Use in pipelines with snake_case naming
steps:
  - id: fetch_data
    type: source
    source_type: api  # Uses ApiSource class
    config:
      api_url: "https://api.example.com/data"

See Custom Components Guide for detailed examples.

๐Ÿ“ Project Structure

my_project/
โ”œโ”€โ”€ dft_project.yml          # Project configuration
โ”œโ”€โ”€ .env                     # Environment variables
โ”œโ”€โ”€ pipelines/               # Pipeline definitions
โ”‚   โ”œโ”€โ”€ ingestion.yml
โ”‚   โ”œโ”€โ”€ analytics.yml
โ”‚   โ””โ”€โ”€ reporting.yml
โ”œโ”€โ”€ dft/                     # Custom components (auto-created)
โ”‚   โ”œโ”€โ”€ sources/            # Custom data sources
โ”‚   โ”œโ”€โ”€ processors/         # Custom data processors
โ”‚   โ””โ”€โ”€ endpoints/          # Custom data endpoints
โ”œโ”€โ”€ data/                    # Input data files
โ”œโ”€โ”€ output/                  # Generated outputs
โ””โ”€โ”€ .dft/                    # DFT metadata and logs

๐Ÿš€ Get Started

# Install DFT
pip install dft-pipeline

# Create new project
dft init my_project && cd my_project

# Explore components and documentation
dft docs --serve

# Run example pipeline
dft run --select custom_example_pipeline

๐Ÿ“„ License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dft_pipeline-0.3.1.tar.gz (72.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dft_pipeline-0.3.1-py3-none-any.whl (83.7 kB view details)

Uploaded Python 3

File details

Details for the file dft_pipeline-0.3.1.tar.gz.

File metadata

  • Download URL: dft_pipeline-0.3.1.tar.gz
  • Upload date:
  • Size: 72.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.12

File hashes

Hashes for dft_pipeline-0.3.1.tar.gz
Algorithm Hash digest
SHA256 f20e5f9e45a317a39457e6d89b4beeaaabf63cd2cb7f484625eece6a8bb045b5
MD5 670e11020a31a9c6c6825ad77850ed9f
BLAKE2b-256 24d2d2d89c0460ab6b4b58c83845848185f4208dd4da4ffa18c0d62f60b9eaff

See more details on using hashes here.

File details

Details for the file dft_pipeline-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: dft_pipeline-0.3.1-py3-none-any.whl
  • Upload date:
  • Size: 83.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.12

File hashes

Hashes for dft_pipeline-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 316b9de6b437fb2276403438686939de548ab0845f7b5410af6423dfbc7b5c9f
MD5 b90057ae3005f4ec2ddbf8d428dab2ad
BLAKE2b-256 f8dfe2d3c455c6f47f3539f1a85f331ea3f3f40a66920d67af0f6abfbf48c8fd

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page