A flexible data pipeline library for custom data processing workflows

PipelineHub

License: MIT

A flexible Python library for creating custom data processing workflows with ease.

✨ Features

  • 🔧 Flexible: Add any callable function as a processing step
  • 🔗 Chainable: Fluent method chaining for clean, readable code
  • 🐛 Debuggable: Verbose mode shows data flow between steps
  • 🧪 Testable: Clear error handling with step identification
  • 📦 Lightweight: Zero external dependencies
  • 🎯 Type-friendly: Full type hints for better IDE support
  • 🚀 Performance: Adds minimal overhead on top of your own step functions
  • 🔄 Reusable: Create pipelines once, use with different datasets
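
To make the features above concrete, here is a minimal sketch of how a chainable pipeline with verbose output and step-identifying errors could be built. It is not PipelineHub's actual implementation: the class `MiniPipeline`, the exception `StepError`, and the message formats are hypothetical names chosen only for illustration.

```python
from typing import Any, Callable, List, Optional, Tuple


class StepError(RuntimeError):
    """Hypothetical error type that records which step failed."""


class MiniPipeline:
    """Illustrative stand-in, not the real DataPipeline class."""

    def __init__(self) -> None:
        self._steps: List[Tuple[str, Callable[[Any], Any]]] = []

    def add_step(self, func: Callable[[Any], Any],
                 name: Optional[str] = None) -> "MiniPipeline":
        if not callable(func):
            raise TypeError("step must be callable")
        # Fall back to the function's own name when none is given.
        self._steps.append((name or getattr(func, "__name__", "step"), func))
        return self  # returning self is what enables method chaining

    def execute(self, data: Any, verbose: bool = False) -> Any:
        for index, (name, func) in enumerate(self._steps):
            try:
                result = func(data)
            except Exception as exc:
                # Wrap the failure so the caller can see which step raised it.
                raise StepError(f"step {index} ({name!r}) failed: {exc}") from exc
            if verbose:
                print(f"[{name}] {data!r} -> {result!r}")
            data = result
        return data


demo = (MiniPipeline()
        .add_step(lambda x: [i for i in x if i > 0], "filter_positive")
        .add_step(lambda x: [i * i for i in x], "square"))
print(demo.execute([-1, 2, 3], verbose=True))  # prints two trace lines, then [4, 9]
```

Because the pipeline only stores callables and does not hold data, the same `demo` object can be executed again with a different dataset.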

Installation

pip install pipelinehub

📖 Quick Start

from pipelinehub import DataPipeline, normalize_data, square_numbers

# Create a pipeline with multiple steps
pipeline = DataPipeline()
pipeline.add_step(lambda x: [i for i in x if i > 0], "filter_positive")
pipeline.add_step(square_numbers, "square")
pipeline.add_step(normalize_data, "normalize")

# Execute with sample data
data = [-2, -1, 0, 1, 2, 3, 4, 5]
result = pipeline.execute(data, verbose=True)

print(result)
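
The README does not show the output of this example or define the two helpers. As a rough guide, the sketch below reproduces the three steps in plain Python under two assumptions that the source does not confirm: that `square_numbers` squares each element, and that `normalize_data` applies min-max scaling to the range [0, 1]. The names `square_numbers_sketch` and `normalize_sketch` are hypothetical stand-ins, not library functions.

```python
from typing import List


def square_numbers_sketch(values: List[float]) -> List[float]:
    """Assumed behaviour of square_numbers: square each element."""
    return [v * v for v in values]


def normalize_sketch(values: List[float]) -> List[float]:
    """Assumed behaviour of normalize_data: min-max scaling to [0, 1]."""
    if not values:
        return []
    lo, hi = min(values), max(values)
    if hi == lo:
        # All values equal: avoid dividing by zero.
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


data = [-2, -1, 0, 1, 2, 3, 4, 5]
positive = [i for i in data if i > 0]        # [1, 2, 3, 4, 5]
squared = square_numbers_sketch(positive)    # [1, 4, 9, 16, 25]
normalized = normalize_sketch(squared)
print(normalized)
```

If the real `normalize_data` uses a different scheme, such as z-scores, the final values will differ even though the first two steps behave the same.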

🔗 Method Chaining

Create pipelines fluently with method chaining:

from pipelinehub import DataPipeline, add_constant

# Chain operations together
result = (DataPipeline()
          .add_step(lambda x: [i for i in x if i % 2 == 0], "filter_even")
          .add_step(add_constant(10), "add_10")  
          .add_step(lambda x: sorted(x, reverse=True), "sort_desc")
          .execute([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))

print(result) 
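
Note that `add_constant(10)` is called when the step is added, so it must return a callable rather than a value. The sketch below shows one way such a step factory could be written, assuming it adds the constant to every element; `add_constant_sketch` is a hypothetical stand-in, and the real helper may behave differently.

```python
from typing import Callable, List


def add_constant_sketch(constant: float) -> Callable[[List[float]], List[float]]:
    """Hypothetical stand-in: returns a step that adds `constant` to each item."""
    def step(values: List[float]) -> List[float]:
        return [v + constant for v in values]
    return step


add_10 = add_constant_sketch(10)
evens = [i for i in range(1, 11) if i % 2 == 0]   # [2, 4, 6, 8, 10]
shifted = add_10(evens)                            # [12, 14, 16, 18, 20]
result = sorted(shifted, reverse=True)
print(result)  # [20, 18, 16, 14, 12]
```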

📚 Comprehensive Examples

Data Cleaning Pipeline

from pipelinehub import DataPipeline, outlier_removal, normalize_data, calculate_stats

# Create a data cleaning pipeline
cleaning_pipeline = (DataPipeline()
    .add_step(lambda x: [float(i) for i in x if i is not None], "convert_and_filter")
    .add_step(lambda x: outlier_removal(x, threshold=2.5), "remove_outliers") 
    .add_step(normalize_data, "normalize")
    .add_step(calculate_stats, "final_stats"))

# Process messy data
messy_data = [1, 2, 3, None, 100, 4, 5, 6, 7, 8, 9]
stats = cleaning_pipeline.execute(messy_data, verbose=True)
print(stats)
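
The source does not say how `outlier_removal` interprets `threshold`. One common reading is a z-score cutoff, sketched below with the standard library only. The function `outlier_removal_sketch` and its rule (drop values more than `threshold` population standard deviations from the mean) are assumptions for illustration, not the library's documented behaviour.

```python
from statistics import mean, pstdev
from typing import List


def outlier_removal_sketch(values: List[float], threshold: float = 2.5) -> List[float]:
    """Assumed rule: drop values more than `threshold` std devs from the mean."""
    if len(values) < 2:
        return list(values)
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        # Every value is identical, so nothing can be an outlier.
        return list(values)
    return [v for v in values if abs(v - mu) / sigma <= threshold]


messy_data = [1, 2, 3, None, 100, 4, 5, 6, 7, 8, 9]
converted = [float(i) for i in messy_data if i is not None]
cleaned = outlier_removal_sketch(converted, threshold=2.5)
print(cleaned)  # 100.0 is dropped; the values 1.0 through 9.0 remain
```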

Text Processing Pipeline

import re
from pipelinehub import DataPipeline

def clean_text(text):
    """Remove special characters and extra whitespace."""
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return ' '.join(text.split())

def extract_keywords(words, min_length=4):
    """Extract words with at least min_length characters."""
    return [word for word in words if len(word) >= min_length]

# Build text processing pipeline
text_pipeline = (DataPipeline()
    .add_step(str.lower, "lowercase")
    .add_step(clean_text, "clean")
    .add_step(str.split, "tokenize") 
    .add_step(lambda words: extract_keywords(words, min_length=4), "extract_keywords")
    .add_step(lambda words: sorted(set(words)), "unique_and_sort"))

# Process text
text = "Hello World! This is a Sample Text for Processing... With special chars!!!"
keywords = text_pipeline.execute(text, verbose=True)
print(keywords)
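
To see what the text pipeline computes without installing anything, the same callables can be applied in order with `functools.reduce`. This is a plain-Python equivalent of feeding each step's output into the next, not a description of how `DataPipeline.execute` is implemented.

```python
import re
from functools import reduce


def clean_text(text):
    """Remove special characters and extra whitespace."""
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return ' '.join(text.split())


steps = [
    str.lower,
    clean_text,
    str.split,
    lambda words: [w for w in words if len(w) >= 4],
    lambda words: sorted(set(words)),
]

text = "Hello World! This is a Sample Text for Processing... With special chars!!!"
# Feed each step's output into the next, as a pipeline would.
keywords = reduce(lambda value, step: step(value), steps, text)
print(keywords)
# ['chars', 'hello', 'processing', 'sample', 'special', 'text', 'this', 'with', 'world']
```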

Pipeline Management

from pipelinehub import DataPipeline

pipeline = DataPipeline()
pipeline.add_step(lambda x: [i*2 for i in x], "double")
pipeline.add_step(lambda x: [i+1 for i in x], "add_one")

# Inspect pipeline
print(len(pipeline))  # 2
print(pipeline.get_steps())  # ['double', 'add_one']
print(pipeline)  # DataPipeline(2 steps: double, add_one)

# Remove steps
pipeline.remove_step(0)  # Remove first step
print(pipeline.get_steps())  # ['add_one']

# Clear all steps
pipeline.clear_steps()
print(len(pipeline))  # 0
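
The inspection calls above map naturally onto Python's `__len__` and `__repr__` protocols. The sketch below shows how a class could support them; `InspectablePipeline` is a hypothetical stand-in that mirrors the method names used in this README, and the real class may be implemented differently.

```python
from typing import Any, Callable, List, Tuple


class InspectablePipeline:
    """Illustrative stand-in showing only the inspection methods."""

    def __init__(self) -> None:
        self._steps: List[Tuple[str, Callable[[Any], Any]]] = []

    def add_step(self, func: Callable[[Any], Any], name: str) -> "InspectablePipeline":
        self._steps.append((name, func))
        return self

    def get_steps(self) -> List[str]:
        return [name for name, _ in self._steps]

    def remove_step(self, index: int) -> "InspectablePipeline":
        del self._steps[index]  # raises IndexError for an invalid index
        return self

    def clear_steps(self) -> "InspectablePipeline":
        self._steps.clear()
        return self

    def __len__(self) -> int:
        return len(self._steps)

    def __repr__(self) -> str:
        names = ", ".join(self.get_steps())
        return f"{type(self).__name__}({len(self)} steps: {names})"


p = InspectablePipeline()
p.add_step(lambda x: [i * 2 for i in x], "double")
p.add_step(lambda x: [i + 1 for i in x], "add_one")
print(p)  # InspectablePipeline(2 steps: double, add_one)
```

One consequence of defining `__len__` without `__bool__` is that an empty pipeline is falsy, so `if pipeline:` doubles as a check for "has at least one step".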

🚀 Performance Tips

  • Prefer built-in functions such as sorted, sum, and min where possible; in CPython they are implemented in C
  • Avoid creating large intermediate data structures
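  • Consider using generators for large datasets:

from pipelinehub import DataPipeline

def generator_step(data):
    """Lazily yield doubled positive items without building an intermediate list."""
    for item in data:
        if item > 0:
            yield item * 2

# list() materializes only the final result of this step
pipeline = DataPipeline().add_step(lambda x: list(generator_step(x)), "process")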
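
The memory benefit of generators is largest when several lazy steps are chained and only part of the output is consumed. The sketch below demonstrates this in plain Python; the helper names `positives` and `doubled` are hypothetical, and whether `DataPipeline.execute` passes generators between steps unchanged is not stated in the source, so test that before relying on it.

```python
from itertools import islice
from typing import Iterable, Iterator


def positives(data: Iterable[int]) -> Iterator[int]:
    """Yield only the items greater than zero."""
    for item in data:
        if item > 0:
            yield item


def doubled(data: Iterable[int]) -> Iterator[int]:
    """Yield each item multiplied by two."""
    for item in data:
        yield item * 2


# range() is lazy too, so no ten-million-element list is ever built.
stream = doubled(positives(range(-5, 10_000_000)))
first_five = list(islice(stream, 5))
print(first_five)  # [2, 4, 6, 8, 10]
```

Only the first few inputs are ever processed here, because each generator pulls items from the previous one on demand.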

🤝 Contributing

Contributions are welcome! Here's how to get started:

  • Fork the repository
  • Create a feature branch: git checkout -b feature/amazing-feature
  • Make your changes and add tests
  • Run tests: pytest tests/
  • Commit your changes: git commit -m 'Add amazing feature'
  • Push to branch: git push origin feature/amazing-feature
  • Open a Pull Request
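
For the "add tests" step, a test module can be as small as the sketch below. The file name `tests/test_steps.py` and the `double` step are hypothetical examples rather than part of the repository; pytest collects any function whose name starts with `test_` and treats a failing `assert` as a test failure.

```python
# tests/test_steps.py (hypothetical file name)

def double(values):
    """Example step under test; stands in for whatever you contribute."""
    return [v * 2 for v in values]


def test_double_scales_each_value():
    assert double([1, 2, 3]) == [2, 4, 6]


def test_double_handles_empty_input():
    assert double([]) == []
```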

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙋‍♂️ Support

Discussions: GitHub Discussions

🎉 Acknowledgments

  • Inspired by functional programming and Unix pipes philosophy
  • Built with ❤️ for the Python community
  • Thanks to all contributors and users!

Source Distribution

pipelinehub-0.1.2.tar.gz (22.5 kB)

Uploaded Source

Built Distribution

pipelinehub-0.1.2-py3-none-any.whl (16.4 kB)

Uploaded Python 3

File details

Details for the file pipelinehub-0.1.2.tar.gz.

File metadata

  • Download URL: pipelinehub-0.1.2.tar.gz
  • Size: 22.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pipelinehub-0.1.2.tar.gz
Algorithm Hash digest
SHA256 205ad8fe75faf4a035f2a37eada027b9ee006887d7caa495156abcb4b779aa11
MD5 73b3880bbeb43a3910b690f9c07167a3
BLAKE2b-256 d5fe59da7cbe5af39047d274dc092afa3ff90ef59bb4adedb7bb0c15e77ec215
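
A downloaded archive can be checked against the SHA256 digest listed above with the standard library. The sketch below is one way to do it; the helper name `sha256_of` and the assumption that the file sits in the current directory are illustrative only.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Hash a file in chunks so large archives need not fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


EXPECTED = "205ad8fe75faf4a035f2a37eada027b9ee006887d7caa495156abcb4b779aa11"
archive = Path("pipelinehub-0.1.2.tar.gz")  # assumed download location
if archive.exists():
    print("match" if sha256_of(archive) == EXPECTED else "MISMATCH")
```

pip can also enforce digests at install time through a requirements file combined with its `--require-hashes` option.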

Provenance

The following attestation bundles were made for pipelinehub-0.1.2.tar.gz:

Publisher: publish.yml on rahulxj100/pipelinehub

File details

Details for the file pipelinehub-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: pipelinehub-0.1.2-py3-none-any.whl
  • Size: 16.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pipelinehub-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 78aebf245000598480694e577d33bc7036ea89b30f2dd38ca5908d1c2031f456
MD5 b023a90f77b116646a4b206122d60248
BLAKE2b-256 53b0bf8ca47b2dfc86ed8e92bccf8ad799f4aca66ee21668d809069ef555d074

Provenance

The following attestation bundles were made for pipelinehub-0.1.2-py3-none-any.whl:

Publisher: publish.yml on rahulxj100/pipelinehub
