A flexible data pipeline library for custom data processing workflows
PipelineHub
A flexible Python library for creating custom data processing workflows with ease.
✨ Features
- 🔧 Flexible: Add any callable function as a processing step
- 🔗 Chainable: Fluent method chaining for clean, readable code
- 🐛 Debuggable: Verbose mode shows data flow between steps
- 🧪 Testable: Clear error handling with step identification
- 📦 Lightweight: Zero external dependencies
- 🎯 Type-friendly: Full type hints for better IDE support
- 🚀 Performance: Minimal overhead for maximum speed
- 🔄 Reusable: Create pipelines once, use with different datasets
Installation
pip install pipelinehub
📖 Quick Start
from pipelinehub import DataPipeline, normalize_data, square_numbers
# Create a pipeline with multiple steps
pipeline = DataPipeline()
pipeline.add_step(lambda x: [i for i in x if i > 0], "filter_positive")
pipeline.add_step(square_numbers, "square")
pipeline.add_step(normalize_data, "normalize")
# Execute with sample data
data = [-2, -1, 0, 1, 2, 3, 4, 5]
result = pipeline.execute(data, verbose=True)
print(result)
🔗 Method Chaining
Create pipelines fluently with method chaining:
from pipelinehub import DataPipeline, add_constant
# Chain operations together
result = (DataPipeline()
    .add_step(lambda x: [i for i in x if i % 2 == 0], "filter_even")
    .add_step(add_constant(10), "add_10")
    .add_step(lambda x: sorted(x, reverse=True), "sort_desc")
    .execute([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
print(result)
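`add_constant(10)` is called before it is handed to `add_step`, which suggests it is a factory that returns a one-argument callable. The sketch below illustrates that pattern with a hypothetical `make_adder` stand-in (not the library's code) and applies the same three steps with `functools.reduce` instead of `DataPipeline`, so it runs without the package installed:

```python
from functools import reduce

def make_adder(constant):
    """Hypothetical stand-in for a step factory such as add_constant."""
    def add(values):
        return [v + constant for v in values]
    return add

# The same three steps as the chained example, as plain callables.
steps = [
    lambda x: [i for i in x if i % 2 == 0],  # filter_even
    make_adder(10),                          # add_10
    lambda x: sorted(x, reverse=True),       # sort_desc
]

# Feed each step's output into the next one.
result = reduce(lambda data, step: step(data), steps, list(range(1, 11)))
print(result)  # [20, 18, 16, 14, 12]
```

If `add_constant` behaves like this stand-in, the chained pipeline should return the same list.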
📚 Comprehensive Examples
Data Cleaning Pipeline
from pipelinehub import DataPipeline, outlier_removal, normalize_data, calculate_stats
# Create a data cleaning pipeline
cleaning_pipeline = (DataPipeline()
    .add_step(lambda x: [float(i) for i in x if i is not None], "convert_and_filter")
    .add_step(lambda x: outlier_removal(x, threshold=2.5), "remove_outliers")
    .add_step(normalize_data, "normalize")
    .add_step(calculate_stats, "final_stats"))
# Process messy data
messy_data = [1, 2, 3, None, 100, 4, 5, 6, 7, 8, 9]
stats = cleaning_pipeline.execute(messy_data, verbose=True)
print(stats)
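The README does not say which rule `outlier_removal` applies or what `threshold` measures. One common reading is a z-score cutoff, sketched below with a hypothetical `zscore_filter` helper. This is an assumption for illustration only; the library's function may behave differently.

```python
from statistics import mean, pstdev

def zscore_filter(values, threshold=2.5):
    """Keep values whose absolute z-score is at most `threshold`.

    Hypothetical stand-in; the library's outlier_removal may use another rule.
    """
    mu = mean(values)
    sigma = pstdev(values)  # population standard deviation
    if sigma == 0:
        return list(values)  # all values identical, nothing to remove
    return [v for v in values if abs(v - mu) / sigma <= threshold]

messy_data = [1, 2, 3, None, 100, 4, 5, 6, 7, 8, 9]
cleaned = [float(v) for v in messy_data if v is not None]
filtered = zscore_filter(cleaned, threshold=2.5)
print(filtered)  # 100.0 is dropped, the values 1.0 through 9.0 remain
```

Under this reading, the value 100 has a z-score of roughly 2.99 and is removed, while every other value stays well below the cutoff.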
Text Processing Pipeline
import re
from pipelinehub import DataPipeline
def clean_text(text):
    """Remove special characters and extra whitespace."""
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return ' '.join(text.split())

def extract_keywords(words, min_length=4):
    """Keep words that are at least min_length characters long."""
    return [word for word in words if len(word) >= min_length]
# Build text processing pipeline
text_pipeline = (DataPipeline()
    .add_step(str.lower, "lowercase")
    .add_step(clean_text, "clean")
    .add_step(str.split, "tokenize")
    .add_step(lambda words: extract_keywords(words, min_length=4), "extract_keywords")
    .add_step(lambda words: sorted(set(words)), "unique_and_sort"))
# Process text
text = "Hello World! This is a Sample Text for Processing... With special chars!!!"
keywords = text_pipeline.execute(text, verbose=True)
print(keywords)
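To see what each step contributes, the same five transformations can be applied by hand in plain Python. This sketch assumes only that the pipeline passes each step's return value to the next step; it does not import `pipelinehub`.

```python
import re

def clean_text(text):
    """Remove special characters and collapse whitespace."""
    return " ".join(re.sub(r"[^a-zA-Z0-9\s]", "", text).split())

text = "Hello World! This is a Sample Text for Processing... With special chars!!!"

lowered = text.lower()                                # lowercase
cleaned = clean_text(lowered)                         # clean
tokens = cleaned.split()                              # tokenize
long_words = [w for w in tokens if len(w) >= 4]       # extract_keywords
keywords = sorted(set(long_words))                    # unique_and_sort
print(keywords)
# ['chars', 'hello', 'processing', 'sample', 'special', 'text', 'this', 'with', 'world']
```

Short words such as "is", "a", and "for" fall below the four-character minimum and are dropped.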
Pipeline Management
from pipelinehub import DataPipeline
pipeline = DataPipeline()
pipeline.add_step(lambda x: [i*2 for i in x], "double")
pipeline.add_step(lambda x: [i+1 for i in x], "add_one")
# Inspect pipeline
print(len(pipeline)) # 2
print(pipeline.get_steps()) # ['double', 'add_one']
print(pipeline) # DataPipeline(2 steps: double, add_one)
# Remove steps
pipeline.remove_step(0) # Remove first step
print(pipeline.get_steps()) # ['add_one']
# Clear all steps
pipeline.clear_steps()
print(len(pipeline)) # 0
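For readers curious how an interface like this can be built, here is a minimal sketch of a chainable pipeline with the same method names. `MiniPipeline` is a hypothetical class written for illustration; it is not pipelinehub's implementation, and details such as the error type and the verbose output format are assumptions.

```python
class MiniPipeline:
    """Illustrative sketch only; not pipelinehub's implementation."""

    def __init__(self):
        self._steps = []  # list of (name, callable) pairs

    def add_step(self, func, name=None):
        if not callable(func):
            raise TypeError("step must be callable")
        self._steps.append((name or getattr(func, "__name__", "step"), func))
        return self  # returning self is what enables chaining

    def remove_step(self, index):
        del self._steps[index]
        return self

    def clear_steps(self):
        self._steps.clear()
        return self

    def get_steps(self):
        return [name for name, _ in self._steps]

    def execute(self, data, verbose=False):
        for name, func in self._steps:
            try:
                data = func(data)
            except Exception as exc:
                # Name the failing step so errors are easy to locate.
                raise RuntimeError(f"step '{name}' failed: {exc}") from exc
            if verbose:
                print(f"{name} -> {data!r}")
        return data

    def __len__(self):
        return len(self._steps)

    def __repr__(self):
        names = ", ".join(self.get_steps())
        return f"MiniPipeline({len(self)} steps: {names})"


p = MiniPipeline().add_step(lambda x: [i * 2 for i in x], "double")
p.add_step(lambda x: [i + 1 for i in x], "add_one")
print(p.execute([1, 2, 3]))  # [3, 5, 7]
p.remove_step(0)
print(p.get_steps())         # ['add_one']
```

Storing steps as `(name, callable)` pairs keeps inspection, removal by index, and step-specific error messages simple.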
🚀 Performance Tips
- Use built-in functions when possible - they're optimized
- Avoid creating large intermediate data structures
- Consider using generators for large datasets:
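from pipelinehub import DataPipeline

def generator_step(data):
    """Use generator for memory efficiency."""
    for item in data:
        if item > 0:
            yield item * 2

# list() materializes the generator so the next step receives a list
pipeline = DataPipeline().add_step(lambda x: list(generator_step(x)), "process")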
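The tip above wraps the generator in `list(...)`, which builds the full result in memory before the next step runs. When every step accepts and returns an iterator, items can flow through one at a time instead. The sketch below shows this with plain function calls and hypothetical step names; whether `DataPipeline` passes iterators between steps untouched (especially in verbose mode) is not stated in this README, so treat that as something to verify.

```python
from itertools import islice

def keep_positive(items):
    """Lazily drop non-positive items."""
    for item in items:
        if item > 0:
            yield item

def double(items):
    """Lazily double each item."""
    for item in items:
        yield item * 2

def numbers():
    """Endless source: 1, -1, 2, -2, ..."""
    n = 1
    while True:
        yield n
        yield -n
        n += 1

# Nothing is computed until the result is consumed.
lazy = double(keep_positive(numbers()))
first_five = list(islice(lazy, 5))
print(first_five)  # [2, 4, 6, 8, 10]
```

Because the source here is infinite, the example only works if no step materializes its input, which is exactly the property that keeps memory use flat on large datasets.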
🤝 Contributing
Contributions are welcome! Here's how to get started:
- Fork the repository
- Create a feature branch: git checkout -b feature/amazing-feature
- Make your changes and add tests
- Run tests: pytest tests/
- Commit your changes: git commit -m 'Add amazing feature'
- Push to branch: git push origin feature/amazing-feature
- Open a Pull Request
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🙋‍♂️ Support
Discussions: GitHub Discussions
🎉 Acknowledgments
- Inspired by functional programming and Unix pipes philosophy
- Built with ❤️ for the Python community
- Thanks to all contributors and users!
File details
Details for the file pipelinehub-0.1.2.tar.gz.
File metadata
- Download URL: pipelinehub-0.1.2.tar.gz
- Upload date:
- Size: 22.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 205ad8fe75faf4a035f2a37eada027b9ee006887d7caa495156abcb4b779aa11 |
| MD5 | 73b3880bbeb43a3910b690f9c07167a3 |
| BLAKE2b-256 | d5fe59da7cbe5af39047d274dc092afa3ff90ef59bb4adedb7bb0c15e77ec215 |
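A downloaded archive can be checked against the SHA256 digest listed above using only the standard library. This is a minimal sketch; the local file path is an assumption about where the download was saved.

```python
import hashlib
from pathlib import Path

def sha256_of(path, chunk_size=65536):
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

EXPECTED = "205ad8fe75faf4a035f2a37eada027b9ee006887d7caa495156abcb4b779aa11"
archive = Path("pipelinehub-0.1.2.tar.gz")  # assumed local download location

if archive.exists():
    print("match" if sha256_of(archive) == EXPECTED else "MISMATCH")
else:
    print(f"{archive} not found; download it first")
```

Reading in chunks keeps memory use constant regardless of file size.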
Provenance
The following attestation bundles were made for pipelinehub-0.1.2.tar.gz:
Publisher: publish.yml on rahulxj100/pipelinehub
- Statement:
  - Statement type: https://in-toto.io/Statement/v1
  - Predicate type: https://docs.pypi.org/attestations/publish/v1
  - Subject name: pipelinehub-0.1.2.tar.gz
  - Subject digest: 205ad8fe75faf4a035f2a37eada027b9ee006887d7caa495156abcb4b779aa11
- Sigstore transparency entry: 1912943748
- Sigstore integration time:
- Permalink: rahulxj100/pipelinehub@555b3ae77b94d30a12960e3a0bdd08ea1bf4977c
- Branch / Tag: refs/heads/main
- Owner: https://github.com/rahulxj100
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@555b3ae77b94d30a12960e3a0bdd08ea1bf4977c
- Trigger Event: push
File details
Details for the file pipelinehub-0.1.2-py3-none-any.whl.
File metadata
- Download URL: pipelinehub-0.1.2-py3-none-any.whl
- Upload date:
- Size: 16.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 78aebf245000598480694e577d33bc7036ea89b30f2dd38ca5908d1c2031f456 |
| MD5 | b023a90f77b116646a4b206122d60248 |
| BLAKE2b-256 | 53b0bf8ca47b2dfc86ed8e92bccf8ad799f4aca66ee21668d809069ef555d074 |
Provenance
The following attestation bundles were made for pipelinehub-0.1.2-py3-none-any.whl:
Publisher: publish.yml on rahulxj100/pipelinehub
- Statement:
  - Statement type: https://in-toto.io/Statement/v1
  - Predicate type: https://docs.pypi.org/attestations/publish/v1
  - Subject name: pipelinehub-0.1.2-py3-none-any.whl
  - Subject digest: 78aebf245000598480694e577d33bc7036ea89b30f2dd38ca5908d1c2031f456
- Sigstore transparency entry: 1912944128
- Sigstore integration time:
- Permalink: rahulxj100/pipelinehub@555b3ae77b94d30a12960e3a0bdd08ea1bf4977c
- Branch / Tag: refs/heads/main
- Owner: https://github.com/rahulxj100
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@555b3ae77b94d30a12960e3a0bdd08ea1bf4977c
- Trigger Event: push