# DatasetPipeline

Transform messy datasets into ML-ready gold. A powerful, configurable pipeline for dataset processing, quality assessment, and standardization, built by ML practitioners, for ML practitioners.
## Why DatasetPipeline?

**The Problem:** You're drowning in data preprocessing chaos. Multiple formats, inconsistent schemas, duplicate records, quality issues, and you're spending more time wrangling data than training models.

**The Solution:** DatasetPipeline automates the entire journey from raw data to model-ready datasets with reproducible, configurable workflows.
### Born from Real-World Pain
This project emerged from my experience as a data engineer and MLOps practitioner. I was constantly:
- Ingesting diverse datasets for LLM fine-tuning
- Converting everything to OpenAI-compatible formats
- Writing repetitive preprocessing scripts
- Juggling deduplication, quality checks, and format conversions
- Maintaining brittle pipelines across multiple projects
What started as manageable became overwhelming. DatasetPipeline was built to solve these exact pain points, turning hours of manual work into minutes of configuration.
## Features

| Feature | Description |
|---|---|
| Multi-Source Loading | Hugging Face datasets, local files, cloud storage |
| Format Flexibility | SFT, DPO, conversational, text; convert between any format |
| Smart Deduplication | Semantic similarity using embeddings, not just exact matches |
| Quality Analysis | Automated categorization and quality scoring |
| YAML Configuration | Reproducible workflows, version-controlled pipelines |
| CLI Interface | Simple commands, powerful automation |
| GPU Acceleration | Optional GPU support for heavy processing |
## Quick Start

### Installation

```bash
# Recommended: install as an isolated tool
uv tool install datasetpipeline

# Or with pip
pip install datasetpipeline

# For full features (embeddings, GPU support)
pip install "datasetpipeline[all]"
```
### Your First Pipeline

```bash
# Generate a sample configuration
datasetpipeline sample my-first-job.yml

# Run the pipeline
datasetpipeline run my-first-job.yml

# That's it!
```
## Real-World Example
Transform a Hugging Face dataset into training-ready format:
```yaml
# jobs/sft-training.yml
load:
  huggingface:
    path: "teknium/OpenHermes-2.5"
    split: "train"
    take_rows: 10000

format:
  sft:
    use_openai: true
    column_role_map:
      system: "system"
      human: "user"
      gpt: "assistant"

deduplicate:
  semantic:
    threshold: 0.85
    column: "messages"

analyze:
  quality:
    column_name: "messages"
    categories: ["code", "reasoning", "creative", "factual"]

save:
  local:
    directory: "training_data"
    filetype: "jsonl"
```

```bash
datasetpipeline run jobs/sft-training.yml
```
Result: Clean, deduplicated, standardized training data ready for your LLM fine-tuning pipeline.
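Once the job finishes, you can sanity-check the output with the Hugging Face `datasets` library. A minimal sketch, assuming the saver writes one or more `.jsonl` files into `training_data/` and that the chat column is named `messages` (as in the config above):

```python
# Minimal sketch: inspect the saved JSONL output.
# Assumptions: files land in training_data/ as *.jsonl, and the
# OpenAI-format conversations live in a "messages" column.
from datasets import load_dataset

ds = load_dataset("json", data_files="training_data/*.jsonl", split="train")
print(ds)                 # schema and row count
print(ds[0]["messages"])  # first conversation in OpenAI chat format
```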
## Core Commands

| Command | Purpose | Example |
|---|---|---|
| `list` | Preview available jobs | `datasetpipeline list jobs/` |
| `run` | Execute pipeline(s) | `datasetpipeline run jobs/my-job.yml` |
| `sample` | Generate template configs | `datasetpipeline sample new-job.yml` |
### Batch Processing

```bash
# Process all jobs in a directory
datasetpipeline run jobs/

# Preview what will run
datasetpipeline list jobs/
```
## Pipeline Components

### Data Loading

- **Hugging Face**: Direct dataset integration
- **Local Files**: JSON, CSV, Parquet, JSONL (see the sketch below)
- **Cloud Storage**: S3, GCS (coming soon)
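For local files, a `load` block might mirror the Hugging Face loader shown earlier. This is a hypothetical sketch: the `local` key and `path` field are assumptions, not documented options; run `datasetpipeline sample` to see the real schema:

```yaml
# Hypothetical sketch: loading a local JSONL file.
# The `local` key and `path` field are assumptions; only `take_rows`
# is documented (in the huggingface loader above).
load:
  local:
    path: "data/raw/conversations.jsonl"
    take_rows: 5000
```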
### Data Formatting

- **SFT (Supervised Fine-Tuning)**: OpenAI chat format
- **DPO (Direct Preference Optimization)**: Preference pairs (rough sketch below)
- **Conversational**: Multi-turn dialogue format
- **Text**: Simple text processing
- **Custom Merging**: Combine multiple fields intelligently
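As a rough illustration, a DPO step might be configured like this. Every key below is an assumption; only the `sft` formatter options appear in the documented examples:

```yaml
# Hypothetical sketch: DPO preference-pair formatting.
# The `dpo` key and all field names are assumptions, not documented options.
format:
  dpo:
    prompt_column: "prompt"
    chosen_column: "chosen"
    rejected_column: "rejected"
```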
### Deduplication

- **Semantic**: Embedding-based similarity detection
- **Exact**: Hash-based duplicate removal (sketch below)
- **Fuzzy**: Near-duplicate detection
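Only the `semantic` strategy appears in the documented examples; an exact-match step might plausibly look like this (the `exact` key is an assumption):

```yaml
# Hypothetical sketch: hash-based exact deduplication.
# The `exact` key is an assumption; `column` mirrors the semantic options.
deduplicate:
  exact:
    column: "messages"
```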
### Quality Analysis

- **Automated Categorization**: Code, math, reasoning, creative writing
- **Quality Scoring**: Length, complexity, coherence metrics
- **Custom Categories**: Define your own quality dimensions
### Data Saving

- **Multiple Formats**: Parquet, JSONL, CSV (Parquet example below)
- **Flexible Output**: Local files, structured directories
- **Metadata**: Pipeline provenance and statistics
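Switching output formats is a one-line change. For example, reusing the documented `local` saver keys to write Parquet instead of JSONL (the lowercase `"parquet"` value is assumed by analogy with `"jsonl"`):

```yaml
# Same save block as the main example, with Parquet output.
save:
  local:
    directory: "processed/openhermes"
    filetype: "parquet"  # assumed value, by analogy with "jsonl"
```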
## Project Structure

```
datasetpipeline/
├── app/
│   ├── analyzer/   # Quality analysis & categorization
│   ├── dedup/      # Deduplication algorithms
│   ├── format/     # Data format transformations
│   ├── loader/     # Multi-source data loading
│   ├── saver/      # Output handling
│   └── helpers/    # Utilities & common functions
├── jobs/           # YAML configurations
├── processed/      # Pipeline outputs
└── scripts/        # Additional utilities
```
## Advanced Configuration

### Conditional Processing

```yaml
load:
  huggingface:
    path: "my-dataset"
    filters:
      quality_score: ">= 0.8"
      language: "en"

format:
  sft:
    use_openai: true
    min_message_length: 10
    max_conversation_turns: 20
```
### Quality-Based Filtering

```yaml
analyze:
  quality:
    column_name: "text"
    min_score: 0.7
    categories: ["technical", "creative"]
    save_analysis: true
```
### Custom Deduplication

```yaml
deduplicate:
  semantic:
    threshold: 0.9
    model: "sentence-transformers/all-MiniLM-L6-v2"
    batch_size: 32
    use_gpu: true
```
## Extensible Architecture

DatasetPipeline is built with extensibility at its core. Each major component is defined by an abstract base class (ABC), making it straightforward to add new functionality:

```python
# Want a new data loader? Just extend BaseLoader
class MyCustomLoader(BaseLoader):
    def load(self) -> Dataset:
        # Your custom loading logic
        pass

# Need a specialized formatter? Extend BaseFormatter
class MyFormatter(BaseFormatter):
    def format(self, dataset: Dataset) -> Dataset:
        # Your formatting logic
        pass
```
### Pluggable Components

| Component | ABC Base Class | Easy to Add |
|---|---|---|
| Loaders | `BaseLoader` | New data sources (APIs, databases, cloud storage) |
| Formatters | `BaseFormatter` | Custom data transformations and schemas |
| Deduplicators | `BaseDeduplicator` | Novel similarity algorithms |
| Analyzers | `BaseAnalyzer` | Domain-specific quality metrics |
| Savers | `BaseSaver` | New output formats and destinations |
### Contribution-Friendly

This architecture means:

- **Low barrier to entry**: Add one component without touching others
- **Clean interfaces**: Well-defined contracts between components
- **Easy testing**: Mock and test components in isolation
- **Community growth**: Contributors can focus on their area of expertise

Example: Want to add PostgreSQL loading? Just implement `BaseLoader` and you're done!
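As a concrete illustration, a minimal PostgreSQL loader might look like the sketch below. Treat it as hedged pseudocode made runnable: the `BaseLoader` import path is an assumption, `PostgresLoader` and its constructor are hypothetical, and only the `load(self) -> Dataset` contract comes from the example above. It also assumes `psycopg2` and the Hugging Face `datasets` package are installed:

```python
# Hypothetical sketch of a PostgreSQL loader.
import psycopg2               # assumes psycopg2 is installed
from datasets import Dataset  # Hugging Face datasets

from datasetpipeline.app.loader import BaseLoader  # assumed import path


class PostgresLoader(BaseLoader):
    """Load rows from a PostgreSQL query into a Hugging Face Dataset."""

    def __init__(self, dsn: str, query: str):
        self.dsn = dsn      # e.g. "postgresql://user:pass@host/db"
        self.query = query  # e.g. "SELECT prompt, response FROM samples"

    def load(self) -> Dataset:
        # Run the query and collect column names and row tuples.
        with psycopg2.connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute(self.query)
                columns = [desc[0] for desc in cur.description]
                rows = cur.fetchall()
        # Pivot row tuples into a column-oriented dict for Dataset.from_dict.
        records = {col: [row[i] for row in rows] for i, col in enumerate(columns)}
        return Dataset.from_dict(records)
```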
## Performance Tips

- **GPU Acceleration**: Install with `[gpu]` extras for faster embeddings
- **Batch Processing**: Use larger batch sizes for better throughput
- **Memory Management**: Process large datasets in chunks
- **Caching**: Embeddings are cached automatically

```bash
# High-performance setup
pip install "datasetpipeline[gpu]"
export PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:512
```
## Contributing

We welcome contributions! Whether you're fixing bugs, adding features, or improving documentation:

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Test your changes thoroughly
4. Submit a pull request

### Development Setup

```bash
git clone https://github.com/subhayu99/datasetpipeline
cd datasetpipeline
uv pip install -e ".[dev]"
pre-commit install
```
### Areas We Need Help

- Cloud storage integrations (S3, GCS, Azure)
- Advanced quality metrics
- Performance optimizations
- Documentation and examples
- Test coverage improvements
## License

MIT License - see `LICENSE` for details.
## Acknowledgments

Built with love by the ML community, for the ML community. Special thanks to all contributors and users who help make dataset preparation less painful.

Star the repo if DatasetPipeline saves you time! ⭐

Made with ❤️ by Subhayu Kumar Bala