
A data processing and analysis pipeline that handles jobs for data transformation, quality assessment, deduplication, and formatting. Pipelines are configured and executed using YAML configuration files.

Project description

🚀 DatasetPipeline

PyPI version · Python 3.10+ · License: MIT · Downloads

Transform messy datasets into ML-ready gold. A powerful, configurable pipeline for dataset processing, quality assessment, and standardization, built by an ML practitioner, for ML practitioners.


🎯 Why DatasetPipeline?

The Problem: You're drowning in data preprocessing chaos. Multiple formats, inconsistent schemas, duplicate records, quality issues, and you're spending more time wrangling data than training models.

The Solution: DatasetPipeline automates the entire journey from raw data to model-ready datasets with reproducible, configurable workflows.

Born from Real-World Pain 🔥

This project emerged from my experience as a data engineer and MLOps practitioner. I was constantly:

  • Ingesting diverse datasets for LLM fine-tuning
  • Converting everything to OpenAI-compatible formats
  • Writing repetitive preprocessing scripts
  • Juggling deduplication, quality checks, and format conversions
  • Maintaining brittle pipelines across multiple projects

What started as manageable became overwhelming. DatasetPipeline was built to solve these exact pain points, turning hours of manual work into minutes of configuration.


✨ Features

  • 🔌 Multi-Source Loading: Hugging Face datasets, local files, cloud storage
  • 🔄 Format Flexibility: SFT, DPO, conversational, text; convert between any format
  • 🧹 Smart Deduplication: Semantic similarity using embeddings, not just exact matches
  • 📊 Quality Analysis: Automated categorization and quality scoring
  • ⚙️ YAML Configuration: Reproducible workflows, version-controlled pipelines
  • 🖥️ CLI Interface: Simple commands, powerful automation
  • 🚀 GPU Acceleration: Optional GPU support for heavy processing

🚀 Quick Start

Installation

# Recommended: Use as isolated tool
uv tool install datasetpipeline

# Or with pip
pip install datasetpipeline

# For full features (embeddings, GPU support)
pip install "datasetpipeline[all]"

Your First Pipeline

# Generate a minimal sample configuration with comments
datasetpipeline sample my-first-job.yml --template minimal

# Or generate a full sample with all options and comments
datasetpipeline sample my-first-job.yml --template full

# Run the pipeline
datasetpipeline run my-first-job.yml

# That's it! 🎉

⚙️ Configuration Guidelines

🚨 Important Configuration Rule

When disabling pipeline components, you must keep the section keys present with null values. Never completely remove the top-level keys.

✅ Correct Way to Disable Components

load:
  huggingface:
    path: "teknium/OpenHermes-2.5"
    split: "train"

format:
  sft:
    use_openai: true

# Disable deduplication - keep the key with null
deduplicate: null

# Disable analysis - keep the key with null  
analyze: null

save:
  local:
    directory: "output"
    filetype: "jsonl"

โŒ Wrong Way (Will Cause Errors)

load:
  huggingface:
    path: "teknium/OpenHermes-2.5"
    split: "train"

format:
  sft:
    use_openai: true

# DON'T DO THIS - completely removing keys
# deduplicate: <-- missing entirely
# analyze: <-- missing entirely

save:
  local:
    directory: "output"
    filetype: "jsonl"

💡 Alternative: Comment Out Values, Keep Keys

load:
  huggingface:
    path: "teknium/OpenHermes-2.5"
    split: "train"

format:
  sft:
    use_openai: true

# Temporarily disable deduplication
deduplicate:
  # semantic:
  #   threshold: 0.85
  #   column: "messages"

# Disable analysis for now
analyze:
  # quality:
  #   column_name: "messages"
  #   categories: ["code", "reasoning"]

save:
  local:
    directory: "output"
    filetype: "jsonl"

Why This Matters

DatasetPipeline expects all major pipeline sections (load, format, deduplicate, analyze, save) to be present in the configuration. This design ensures:

  • Consistent pipeline structure across all jobs
  • Clear intent - you explicitly choose to skip steps rather than forgetting them
  • Easy re-enablement - uncomment values instead of rewriting sections
  • Better error messages - the pipeline knows what you intended

🎛️ Managing Configuration Complexity

Problem: The full sample configuration can be overwhelming, since it includes every option and comment.

Solutions:

  1. Start minimal - Use --template minimal as a starting point for clean, simple configs
  2. Use templates - Pre-built configurations for common use cases (--template sft, --template dpo, --template analysis)
  3. Progressive enhancement - Start simple, add complexity as needed
  4. Reference mode - Use --template full when you need to see all available options

📖 Real-World Example

Transform a Hugging Face dataset into training-ready format:

# jobs/sft-training.yml
load:
  huggingface:
    path: "teknium/OpenHermes-2.5"
    split: "train"
    take_rows: 10000

format:
  sft:
    use_openai: true
    column_role_map:
      system: "system"
      human: "user" 
      gpt: "assistant"

deduplicate:
  semantic:
    threshold: 0.85
    column: "messages"

analyze:
  quality:
    column_name: "messages"
    categories: ["code", "reasoning", "creative", "factual"]

save:
  local:
    directory: "training_data"
    filetype: "jsonl"

Then run it:

datasetpipeline run jobs/sft-training.yml

Result: Clean, deduplicated, standardized training data ready for your LLM fine-tuning pipeline.


🛠️ Core Commands & Sample Generation

Command Reference

  • list: Preview available jobs. Example: datasetpipeline list jobs/
  • run: Execute pipeline(s). Example: datasetpipeline run jobs/my-job.yml
  • sample: Generate template configs. Example: datasetpipeline sample new-job.yml --template=minimal

Batch Processing

# Process all jobs in a directory
datasetpipeline run jobs/

# Preview what will run
datasetpipeline list jobs/

🏗️ Pipeline Components

📥 Data Loading

  • Hugging Face: Direct dataset integration
  • Local Files: JSON, CSV, Parquet, JSONL
  • Cloud Storage: S3, GCS (coming soon)

🔧 Data Formatting

  • SFT (Supervised Fine-Tuning): OpenAI chat format
  • DPO (Direct Preference Optimization): Preference pairs
  • Conversational: Multi-turn dialogue format
  • Text: Simple text processing
  • Custom Merging: Combine multiple fields intelligently

🧹 Deduplication

  • Semantic: Embedding-based similarity detection (see the sketch after this list)
  • Exact: Hash-based duplicate removal
  • Fuzzy: Near-duplicate detection
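
The semantic strategy above boils down to embedding each record and dropping rows that are too similar to one already kept. The sketch below illustrates that general technique with sentence-transformers (the same model family used in the configuration examples further down) and a cosine-similarity threshold; it is a conceptual example, not DatasetPipeline's internal implementation.

# Conceptual sketch of embedding-based deduplication, not DatasetPipeline internals.
# Requires: pip install sentence-transformers scikit-learn
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

def semantic_dedup(texts: list[str], threshold: float = 0.85) -> list[str]:
    """Keep the first occurrence from each group of semantically similar texts."""
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    embeddings = model.encode(texts, normalize_embeddings=True)
    similarity = cosine_similarity(embeddings)
    kept: list[int] = []
    for i in range(len(texts)):
        # Keep row i only if it stays below the threshold against every kept row.
        if all(similarity[i][j] < threshold for j in kept):
            kept.append(i)
    return [texts[i] for i in kept]

print(semantic_dedup([
    "How do I sort a list in Python?",
    "What's the easiest way to sort a Python list?",
    "Explain gradient descent in simple terms.",
]))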

📊 Quality Analysis

  • Automated Categorization: Code, math, reasoning, creative writing
  • Quality Scoring: Length, complexity, coherence metrics (illustrated in the sketch below)
  • Custom Categories: Define your own quality dimensions
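
As a rough illustration of the length- and complexity-style metrics mentioned above, here is a toy scoring function. It only sketches the general idea; DatasetPipeline's actual scoring and categorization logic may rely on entirely different signals.

# Toy illustration of length/diversity-based quality scoring, not the library's actual metrics.
import re

def quality_score(text: str) -> float:
    """Return a 0..1 score from simple length and lexical-diversity heuristics."""
    words = re.findall(r"\w+", text.lower())
    if not words:
        return 0.0
    length_score = min(len(words) / 200, 1.0)       # saturates around ~200 words
    diversity_score = len(set(words)) / len(words)  # unique-to-total word ratio
    return round(0.5 * length_score + 0.5 * diversity_score, 3)

print(quality_score("Short answer."))
print(quality_score("A longer response that walks through the reasoning step by step, with varied vocabulary and concrete examples."))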

💾 Data Saving

  • Multiple Formats: Parquet, JSONL, CSV
  • Flexible Output: Local files, structured directories
  • Metadata: Pipeline provenance and statistics

📁 Project Structure

datasetpipeline/
├── 📦 app/
│   ├── 🔬 analyzer/       # Quality analysis & categorization
│   ├── 🧹 dedup/          # Deduplication algorithms
│   ├── 🔄 format/         # Data format transformations
│   ├── 📥 loader/         # Multi-source data loading
│   ├── 💾 saver/          # Output handling
│   └── 🛠️ helpers/        # Utilities & common functions
├── ⚙️ jobs/               # YAML configurations
├── 📊 processed/          # Pipeline outputs
└── 📜 scripts/            # Additional utilities

🎨 Advanced Configuration

Conditional Processing

load:
  huggingface:
    path: "my-dataset"
    filters:
      quality_score: ">= 0.8"
      language: "en"

format:
  sft:
    use_openai: true
    min_message_length: 10
    max_conversation_turns: 20

# Skip deduplication for this job
deduplicate: null

analyze:
  quality:
    column_name: "text"
    min_score: 0.7
    categories: ["technical", "creative"]
    save_analysis: true

save:
  local:
    directory: "filtered_data"
    filetype: "parquet"

Quality-Based Filtering

load:
  local:
    path: "raw_data.jsonl"

# Skip formatting - data is already in correct format
format: null

deduplicate:
  exact:
    column: "content"

analyze:
  quality:
    column_name: "text"
    min_score: 0.7
    categories: ["technical", "creative"]
    save_analysis: true

save:
  local:
    directory: "cleaned_data"
    filetype: "jsonl"

Custom Deduplication

load:
  huggingface:
    path: "my-dataset"

format:
  text:
    column: "content"

deduplicate:
  semantic:
    threshold: 0.9
    model: "sentence-transformers/all-MiniLM-L6-v2"
    batch_size: 32
    use_gpu: true

# Skip analysis for faster processing
analyze: null

save:
  local:
    directory: "deduped_data"
    filetype: "parquet"

🏗️ Extensible Architecture

DatasetPipeline is built with extensibility at its core. Each major component uses Abstract Base Classes (ABC), making it incredibly easy to add new functionality:

# Want a new data loader? Just extend BaseLoader
class MyCustomLoader(BaseLoader):
    def load(self) -> Dataset:
        # Your custom loading logic
        pass

# Need a specialized formatter? Extend BaseFormatter  
class MyFormatter(BaseFormatter):
    def format(self, dataset: Dataset) -> Dataset:
        # Your formatting logic
        pass

🔌 Pluggable Components

  • 📥 Loaders (BaseLoader): New data sources (APIs, databases, cloud storage)
  • 🔄 Formatters (BaseFormatter): Custom data transformations and schemas
  • 🧹 Deduplicators (BaseDeduplicator): Novel similarity algorithms
  • 📊 Analyzers (BaseAnalyzer): Domain-specific quality metrics
  • 💾 Savers (BaseSaver): New output formats and destinations

🚀 Contribution-Friendly

This architecture means:

  • Low barrier to entry: Add one component without touching others
  • Clean interfaces: Well-defined contracts between components
  • Easy testing: Mock and test components in isolation
  • Community growth: Contributors can focus on their expertise area

Example: Want to add PostgreSQL loading? Just implement BaseLoader and you're done!
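
A hypothetical PostgreSQL loader might look roughly like the sketch below. The BaseLoader import path and the constructor arguments are assumptions made for illustration; check the loader module in the repository for the real interface.

# Hypothetical sketch of a PostgreSQL loader built on the BaseLoader pattern shown above.
import pandas as pd
from datasets import Dataset
from sqlalchemy import create_engine

from datasetpipeline.app.loader import BaseLoader  # assumed import path, adjust to the real one


class PostgresLoader(BaseLoader):
    def __init__(self, connection_url: str, query: str):
        self.connection_url = connection_url
        self.query = query

    def load(self) -> Dataset:
        # Pull rows into a DataFrame, then wrap them as a Hugging Face Dataset.
        engine = create_engine(self.connection_url)
        df = pd.read_sql(self.query, engine)
        return Dataset.from_pandas(df)


# Example usage with a made-up connection string and query:
loader = PostgresLoader(
    "postgresql://user:password@localhost:5432/mydb",
    "SELECT prompt, response FROM training_samples",
)
dataset = loader.load()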


๐Ÿƒโ€โ™‚๏ธ Performance Tips

  • GPU Acceleration: Install with [gpu] extras for faster embeddings
  • Batch Processing: Use larger batch sizes for better throughput
  • Memory Management: Process large datasets in chunks
  • Caching: Embeddings are cached automatically
# High-performance setup
pip install "datasetpipeline[gpu]"
export PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:512

🤝 Contributing

We welcome contributions! Whether you're fixing bugs, adding features, or improving documentation:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Test your changes thoroughly
  4. Submit a pull request

Development Setup

git clone https://github.com/subhayu99/datasetpipeline
cd datasetpipeline
uv pip install -e ".[dev]"
pre-commit install

Areas Where We Need Help

  • 🌐 Cloud storage integrations (S3, GCS, Azure)
  • 🔍 Advanced quality metrics
  • 📈 Performance optimizations
  • 📚 Documentation and examples
  • 🧪 Test coverage improvements

📄 License

MIT License - see LICENSE for details.


🙏 Acknowledgments

Built with love by the ML community, for the ML community. Special thanks to all contributors and users who help make dataset preparation less painful.

Star the repo if DatasetPipeline saves you time! ⭐


Made with ❤️ by Subhayu Kumar Bala



Download files


Source Distribution

datasetpipeline-0.2.0.tar.gz (281.9 kB)


Built Distribution


datasetpipeline-0.2.0-py3-none-any.whl (80.0 kB)


File details

Details for the file datasetpipeline-0.2.0.tar.gz.

File metadata

  • Download URL: datasetpipeline-0.2.0.tar.gz
  • Upload date:
  • Size: 281.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.12

File hashes

Hashes for datasetpipeline-0.2.0.tar.gz

  • SHA256: fd9c70028182298c558c7d767b293a97734ed4dcb38544fa05f1a6748c2685ce
  • MD5: 87d0edcb7b3e6f3e8106ffaf5963adbb
  • BLAKE2b-256: 8ef9f726990eb91031a2039a58ba70f4eb18cb516b5617cbba26ff8b0b63e660


File details

Details for the file datasetpipeline-0.2.0-py3-none-any.whl.


File hashes

Hashes for datasetpipeline-0.2.0-py3-none-any.whl

  • SHA256: 504a5c6e08d20e3c4ebddb9dc83fc462118c8b8f239cab6583dc0a01e44fbfb5
  • MD5: 10af0245b48d02934a0b5ab70f68bb70
  • BLAKE2b-256: 191ff6dba91fb22c906347e6c25008afdf33bd0b1f20040c3d6d1d1ba5b9672f

