Skip to main content

A type-safe, async-first data processing pipeline framework

Project description

Rivusio

codecov PyPI version Documentation Status Python versions License Tests Downloads Ruff Type Checking: mypy Security: bandit

A blazing-fast, type-safe data processing pipeline framework in Python, designed for seamless integration of both synchronous and asynchronous operations, complete with robust parallel execution for ultimate performance and scalability.

Features

  • Generic, type-safe data processing pipeline framework
  • Support for both sync and async operations
  • Parallel execution strategies for improved performance
  • Built-in configuration management using Pydantic
  • Clean, modular architecture that's easy to extend
  • Comprehensive error handling and monitoring
  • Stream processing capabilities

Installation

pip install rivusio

Quick Start

from typing import Dict, List
from rivusio import AsyncBasePipe, SyncBasePipe, AsyncPipeline
from rivusio.core.parallel import ExecutionStrategy

# Async pipe example
class FilterPipe(AsyncBasePipe[Dict, Dict]):
    async def process(self, data: Dict) -> Dict:
        return {k: v for k, v in data.items() if v > 100}

# Sync pipe example
class TransformPipe(SyncBasePipe[Dict, List]):
    def process(self, data: Dict) -> List:
        return list(data.values())

# Create pipeline using >> operator
pipeline = AsyncPipeline([FilterPipe(), TransformPipe()])

# Process single item
data = {"a": 150, "b": 50, "c": 200}
result = await pipeline(data)  # [150, 200]

# Process multiple items in parallel
async with pipeline:
    pipeline.configure_parallel(ExecutionStrategy.THREAD_POOL)
    batch = [
        {"a": 150, "b": 50, "c": 200},
        {"x": 300, "y": 75, "z": 400}
    ]
    results = await pipeline.execute_parallel(batch)

Documentation

For complete documentation, visit https://rivusio.readthedocs.io

License

MIT License - see LICENSE file for details

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rivusio-0.1.0.tar.gz (21.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rivusio-0.1.0-py3-none-any.whl (26.3 kB view details)

Uploaded Python 3

File details

Details for the file rivusio-0.1.0.tar.gz.

File metadata

  • Download URL: rivusio-0.1.0.tar.gz
  • Upload date:
  • Size: 21.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.10.16 Linux/6.8.0-1020-azure

File hashes

Hashes for rivusio-0.1.0.tar.gz
Algorithm Hash digest
SHA256 41a81c635ab1f906e61e741373c43f264ae785215b730e63b1ab9e5ea8f351f0
MD5 fe7be08de4c448074983794ad8d23e87
BLAKE2b-256 4898399d0e61f87963564ebfd0abe6b6d5b5fc433cba93d0bc17e030bb822c1f

See more details on using hashes here.

File details

Details for the file rivusio-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: rivusio-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 26.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.10.16 Linux/6.8.0-1020-azure

File hashes

Hashes for rivusio-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cbb34a595612392d3222120a03619417c42674c9e491ac37632eb7cba1936bfc
MD5 676e4c085ae1583e3cd4d259c734aeb3
BLAKE2b-256 1c7f17289265bff8f3e9543353a24e0780506904c89e4d1317e98f1b7b97ff7f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page