
A practical tool for managing async pipelines.


relais


A high-performance async streaming pipeline library for Python.

Key Features:

  • 🚀 True Streaming: Process data as it flows through the pipeline
  • Directional Cancellation: Early termination propagates upstream (e.g., r.Take(5) stops upstream processing once it has 5 items)
  • 🔄 Concurrent Processing: All operations run concurrently with proper backpressure
  • 🛡️ Memory Efficient: Bounded memory usage with configurable stream buffers
  • 🎯 Flexible Ordering: Choose between ordered and unordered processing for optimal performance

Perfect for:

  • LLM evaluation pipelines
  • API data processing
  • Real-time data transformation
  • I/O-bound concurrent operations

Usage

import relais as r

# Simple pipeline (await and async for must run inside an async function)
pipeline = range(10) | r.Map(lambda x: x * 2) | r.Take(5)
result = await pipeline.collect()  # [0, 2, 4, 6, 8]

# Streaming processing
async for item in (range(100) | r.Map(lambda x: x * 2) | r.Take(5)).stream():
    print(item)  # Prints results as they become available

Installation

pip install relais

Requirements

  • Python 3.10+

Quick Start

import asyncio
import relais as r

async def main():
    # Transform and filter data
    result = await (
        range(20)
        | r.Map(lambda x: x * 2)
        | r.Filter(lambda x: x > 10)
        | r.Take(5)
    ).collect()
    print(result)  # [12, 14, 16, 18, 20]

    # Stream processing with async operations
    async def slow_square(x):
        await asyncio.sleep(0.1)  # Simulate I/O
        return x * x

    # Process items as they complete
    async for item in (range(5) | r.Map(slow_square)).stream():
        print(f"Completed: {item}")

asyncio.run(main())

API Reference

Core Operations

Transform Operations

  • r.Map(fn) - Apply function to each item (supports async functions)
  • r.Filter(fn) - Filter items based on condition
  • r.FlatMap(fn) - Flatten iterables returned by function

Collection Operations

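  • r.Take(n, ordered=False) - Take the first N items, cancelling upstream work early (with ordered=False, "first" means first to arrive)
  • r.Skip(n, ordered=False) - Skip the first N items (same ordering semantics as Take)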
  • r.Distinct(key_fn=None) - Remove duplicates
  • r.Sort(key_fn=None) - Sort items (stateful: buffers the entire input before emitting anything)
  • r.Batch(size) - Group items into batches
  • r.Reduce(fn, initial) - Accumulate values
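Distinct can work in a streaming fashion, while Sort must see every item before it can emit the first one. The sketch below illustrates that difference with plain asyncio async generators; it is not relais's implementation, and the helper names aiter_from, distinct and sort are hypothetical.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Hashable, Iterable, Optional, TypeVar

T = TypeVar("T")


async def aiter_from(items: Iterable[T]) -> AsyncIterator[T]:
    # Hypothetical helper: expose a plain iterable as an async stream.
    for item in items:
        yield item


async def distinct(
    stream: AsyncIterator[T], key_fn: Optional[Callable[[T], Hashable]] = None
) -> AsyncIterator[T]:
    # Streaming: an item is emitted the first time its key is seen;
    # only the set of keys stays in memory.
    seen = set()
    async for item in stream:
        key = key_fn(item) if key_fn is not None else item
        if key not in seen:
            seen.add(key)
            yield item


async def sort(
    stream: AsyncIterator[T], key_fn: Optional[Callable[[T], Any]] = None
) -> AsyncIterator[T]:
    # Stateful: nothing can be emitted until the whole input is buffered.
    buffered = [item async for item in stream]
    for item in sorted(buffered, key=key_fn):
        yield item


async def demo():
    words = ["pear", "Apple", "apple", "fig", "Pear"]
    unique = [w async for w in distinct(aiter_from(words), key_fn=str.lower)]
    ordered = [w async for w in sort(aiter_from(words), key_fn=str.lower)]
    return unique, ordered


unique, ordered = asyncio.run(demo())
print(unique)   # ['pear', 'Apple', 'fig']
print(ordered)  # ['Apple', 'apple', 'fig', 'pear', 'Pear']
```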

Processing Modes

  • Unordered (default): Maximum performance, items processed as available
  • Ordered: Preserves input order, may be slower for some operations

All operations support async functions and run concurrently by default.
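The two modes map naturally onto two standard asyncio primitives. As an illustration only (relais is not necessarily implemented this way), asyncio.gather returns results in input order, while asyncio.as_completed hands them back in completion order. slow_echo, map_ordered and map_unordered are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


async def slow_echo(x: int) -> int:
    # Later inputs finish sooner, so completion order is the reverse of input order.
    await asyncio.sleep(0.01 * (5 - x))
    return x


async def map_ordered(fn: Callable[[int], Awaitable[int]], items: Iterable[int]) -> List[int]:
    # All calls run concurrently; gather returns results in input order.
    return list(await asyncio.gather(*(fn(x) for x in items)))


async def map_unordered(fn: Callable[[int], Awaitable[int]], items: Iterable[int]) -> List[int]:
    # All calls run concurrently; as_completed yields each result as soon as it is ready.
    results = []
    for next_done in asyncio.as_completed([fn(x) for x in items]):
        results.append(await next_done)
    return results


async def demo():
    items = [0, 1, 2, 3, 4]
    return await map_ordered(slow_echo, items), await map_unordered(slow_echo, items)


ordered, unordered = asyncio.run(demo())
print(ordered)    # [0, 1, 2, 3, 4]
print(unordered)  # [4, 3, 2, 1, 0]
```

Both versions take about as long as the slowest call; the ordered one simply cannot hand out a result until everything before it has finished.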

Performance Features

Directional Cancellation

# Only the first 5 results are produced; upstream processing is cancelled automatically
result = await (large_data_source | r.Map(expensive_operation) | r.Take(5)).collect()

Memory Efficiency

# Streams through millions of items with bounded memory
async for batch in (huge_dataset | r.Map(transform) | r.Batch(100)).stream():
    process_batch(batch)  # Constant memory usage
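Batching keeps memory bounded because only the current batch is held at any time. A minimal sketch of a streaming batcher built from plain async generators follows; numbers and batch are hypothetical helpers, not relais APIs.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def numbers(n: int) -> AsyncIterator[int]:
    # Hypothetical large source: items are produced lazily, one at a time.
    for i in range(n):
        yield i


async def batch(stream: AsyncIterator[T], size: int) -> AsyncIterator[List[T]]:
    # At most `size` items are held at once, however long the input is.
    current: List[T] = []
    async for item in stream:
        current.append(item)
        if len(current) == size:
            yield current
            current = []
    if current:  # flush the final, possibly shorter, batch
        yield current


async def demo() -> List[int]:
    sizes: List[int] = []
    async for chunk in batch(numbers(1_050), 100):
        sizes.append(len(chunk))  # stand-in for process_batch(chunk)
    return sizes


sizes = asyncio.run(demo())
print(sizes)  # [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 50]
```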

Concurrent Processing

# All async operations run concurrently
pipeline = (
    data_source
    | r.Map(async_api_call)  # Multiple concurrent API calls
    | r.Filter(validate)     # Filters results as they arrive
    | r.Take(10)             # Stops processing after 10 valid results
)
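To see why concurrency matters for I/O-bound steps, here is a stdlib-only sketch of the same map, filter, take shape: twenty simulated 100 ms calls run at once, results are filtered as they complete, and the remaining tasks are cancelled once enough results have arrived. fake_api_call and concurrent_pipeline are hypothetical, and unlike relais (which streams through bounded queues) the sketch launches every call up front.

```python
import asyncio
import time
from typing import Iterable, List


async def fake_api_call(x: int) -> int:
    # Hypothetical stand-in for an I/O-bound call that takes about 100 ms.
    await asyncio.sleep(0.1)
    return x


async def concurrent_pipeline(items: Iterable[int], limit: int) -> List[int]:
    # Map: start every call at once so their waits overlap.
    tasks = [asyncio.create_task(fake_api_call(x)) for x in items]
    results: List[int] = []
    try:
        for next_done in asyncio.as_completed(tasks):
            value = await next_done
            if value % 2 == 0:  # Filter: check each result as it arrives
                results.append(value)
                if len(results) == limit:  # Take: stop once we have enough
                    break
    finally:
        for task in tasks:  # cancel any calls still in flight
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    return results


start = time.perf_counter()
results = asyncio.run(concurrent_pipeline(range(20), limit=5))
elapsed = time.perf_counter() - start
print(f"{len(results)} results in {elapsed:.2f} s")  # roughly 0.1 s; sequential calls would need about 2 s
```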

Pipeline Composition

Pipelines are built using the intuitive | operator for chaining operations:

Basic Usage

# Data source | operations | collection
result = await (range(5) | r.Map(lambda x: x * 2) | r.Filter(lambda x: x > 4)).collect()
# [6, 8]

Runtime Input

# Define pipeline without input data
pipeline = r.Map(lambda x: x * 2) | r.Take(3)

# Apply to different data sources
result1 = await pipeline.collect(range(10))  # [0, 2, 4]
result2 = await pipeline.collect([5, 6, 7, 8])  # [10, 12, 14]
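Both forms above rely on Python's operator overloading: range and list do not define `|` for a step, so Python falls back to the step's reflected __ror__ method, while step | step builds a pipeline with no input bound yet. The following is a simplified, eager, list-based sketch of that mechanism only; Step, MapStep, TakeStep and Pipeline are hypothetical classes, not relais's real ones.

```python
import asyncio
from typing import Any, Callable, Iterable, List, Optional


class Step:
    """Hypothetical pipeline step (not relais's real class)."""

    def apply(self, items: List[Any]) -> List[Any]:
        raise NotImplementedError

    def __or__(self, other: "Step") -> "Pipeline":
        # step | step: a reusable pipeline with no input bound yet
        return Pipeline(None, [self, other])

    def __ror__(self, source: Iterable[Any]) -> "Pipeline":
        # iterable | step: range/list have no `|` for Step, so Python
        # falls back to this reflected method on the right operand
        return Pipeline(source, [self])


class MapStep(Step):
    def __init__(self, fn: Callable[[Any], Any]):
        self.fn = fn

    def apply(self, items: List[Any]) -> List[Any]:
        return [self.fn(x) for x in items]


class TakeStep(Step):
    def __init__(self, n: int):
        self.n = n

    def apply(self, items: List[Any]) -> List[Any]:
        return items[: self.n]


class Pipeline:
    def __init__(self, source: Optional[Iterable[Any]], steps: List[Step]):
        self.source = source
        self.steps = steps

    def __or__(self, other: Step) -> "Pipeline":
        return Pipeline(self.source, [*self.steps, other])

    async def collect(self, source: Optional[Iterable[Any]] = None) -> List[Any]:
        # Input supplied at call time takes precedence over the bound source.
        data = source if source is not None else self.source
        if data is None:
            raise ValueError("pipeline has no input data")
        items = list(data)
        for step in self.steps:
            items = step.apply(items)
        return items


bound = range(5) | MapStep(lambda x: x * 2) | TakeStep(3)
reusable = MapStep(lambda x: x * 2) | TakeStep(3)


async def demo():
    return await bound.collect(), await reusable.collect([5, 6, 7, 8])


first, second = asyncio.run(demo())
print(first)   # [0, 2, 4]
print(second)  # [10, 12, 14]
```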

Streaming Results

import asyncio
import random

import relais as r

async def slow_process(x):
    await asyncio.sleep(random.uniform(0.1, 0.5))  # simulate variable I/O latency
    return x * x

pipeline = range(10) | r.Map(slow_process) | r.Filter(lambda x: x % 2 == 0)

async def main():
    # Process results as they become available
    async for result in pipeline.stream():
        print(f"Got result: {result}")
        # Results appear in completion order, not input order

asyncio.run(main())

Error Handling

from relais.errors import ErrorPolicy

# Fail fast (default) - stops on first error
pipeline = r.Pipeline(
    [r.Map(might_fail), r.Filter(lambda x: x > 0)],
    error_policy=ErrorPolicy.FAIL_FAST
)

# Collect errors for later inspection
pipeline = r.Pipeline(
    [r.Map(might_fail), r.Take(10)],
    error_policy=ErrorPolicy.COLLECT
)
results, errors = await pipeline.collect_with_errors(data)

# Ignore errors and continue processing
pipeline = r.Pipeline(
    [r.Map(might_fail), r.Take(10)],
    error_policy=ErrorPolicy.IGNORE
)
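The three policies differ only in what happens when a call raises. Here is a sequential sketch of that decision that assumes nothing about relais internals; the Policy enum, run_map and might_fail are hypothetical stand-ins for ErrorPolicy and a real pipeline step.

```python
import asyncio
from enum import Enum
from typing import Any, Awaitable, Callable, Iterable, List, Tuple


class Policy(Enum):
    # Hypothetical mirror of the three ErrorPolicy values described above.
    FAIL_FAST = "fail_fast"
    COLLECT = "collect"
    IGNORE = "ignore"


async def run_map(
    fn: Callable[[Any], Awaitable[Any]], items: Iterable[Any], policy: Policy
) -> Tuple[List[Any], List[Exception]]:
    results: List[Any] = []
    errors: List[Exception] = []
    for item in items:  # sequential for clarity; relais runs steps concurrently
        try:
            results.append(await fn(item))
        except Exception as exc:
            if policy is Policy.FAIL_FAST:
                raise  # stop everything on the first error
            if policy is Policy.COLLECT:
                errors.append(exc)  # keep the error for later inspection
            # IGNORE: drop the failing item and keep going
    return results, errors


async def might_fail(x: int) -> int:
    # Hypothetical step that fails on multiples of 3.
    if x % 3 == 0:
        raise ValueError(f"bad item {x}")
    return x * 10


async def demo():
    items = range(1, 7)
    collected = await run_map(might_fail, items, Policy.COLLECT)
    ignored = await run_map(might_fail, items, Policy.IGNORE)
    try:
        await run_map(might_fail, items, Policy.FAIL_FAST)
        failed_fast = False
    except ValueError:
        failed_fast = True
    return collected, ignored, failed_fast


collected, ignored, failed_fast = asyncio.run(demo())
print(collected[0], len(collected[1]))  # [10, 20, 40, 50] 2
print(ignored)                          # ([10, 20, 40, 50], [])
print(failed_fast)                      # True
```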

Advanced: Context Manager Usage

# For fine-grained control over pipeline execution
async with await pipeline.run(data_source) as stream:
    async for event in stream:
        if isinstance(event, StreamItemEvent):
            print(f"Item: {event.item}")
        elif isinstance(event, StreamErrorEvent):
            print(f"Error: {event.error}")

        # Early termination
        if some_condition:
            break
# Pipeline automatically cleans up resources
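The cleanup guarantee comes from the async context manager protocol: its exit code runs whether the block finishes, breaks early or raises. The following is a hypothetical sketch using contextlib.asynccontextmanager, where ItemEvent and ErrorEvent stand in for StreamItemEvent and StreamErrorEvent, and run is not relais's pipeline.run.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, AsyncIterator, List, Union


@dataclass
class ItemEvent:  # hypothetical stand-in for StreamItemEvent
    item: Any


@dataclass
class ErrorEvent:  # hypothetical stand-in for StreamErrorEvent
    error: Exception


Event = Union[ItemEvent, ErrorEvent]
cleanup_log: List[str] = []


async def producer(queue: "asyncio.Queue[Event]") -> None:
    # Upstream work that would emit 1,000 events; item 2 fails.
    for i in range(1_000):
        event = ErrorEvent(ValueError(f"item {i} failed")) if i == 2 else ItemEvent(i)
        await queue.put(event)


async def events(queue: "asyncio.Queue[Event]") -> AsyncIterator[Event]:
    while True:
        yield await queue.get()


@asynccontextmanager
async def run(maxsize: int = 10) -> AsyncIterator[AsyncIterator[Event]]:
    queue: "asyncio.Queue[Event]" = asyncio.Queue(maxsize=maxsize)
    task = asyncio.create_task(producer(queue))
    stream = events(queue)
    try:
        yield stream
    finally:
        # Runs on normal exit, on `break`, and when an exception escapes.
        await stream.aclose()
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)
        cleanup_log.append("producer cancelled" if task.cancelled() else "producer finished")


async def demo() -> List[Any]:
    seen: List[Any] = []
    async with run() as stream:
        async for event in stream:
            if isinstance(event, ItemEvent):
                seen.append(event.item)
            elif isinstance(event, ErrorEvent):
                seen.append(f"error: {event.error}")
            if len(seen) == 5:  # early termination
                break
    return seen


seen = asyncio.run(demo())
print(seen)         # [0, 1, 'error: item 2 failed', 3, 4]
print(cleanup_log)  # ['producer cancelled']
```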

Architecture & Performance

Streaming Architecture

Relais uses a true streaming architecture where:

  • Data flows through bounded queues between pipeline steps
  • Operations run concurrently - each step processes items as they arrive
  • Memory usage is bounded - configurable queue sizes prevent memory explosions
  • Backpressure handling - upstream producers slow down when downstream is busy
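Bounded queues and backpressure can be illustrated with asyncio.Queue(maxsize=...): put() suspends the producer while the queue is full, so a slow consumer automatically throttles a fast producer. This is a generic asyncio sketch, not relais code; the queue size of 5 is arbitrary (the default documented for relais is 1000 items per stream).

```python
import asyncio
from typing import List, Optional


async def producer(queue: "asyncio.Queue[Optional[int]]", n: int, peak: List[int]) -> None:
    for i in range(n):
        await queue.put(i)  # suspends while the queue is full: backpressure
        peak[0] = max(peak[0], queue.qsize())
    await queue.put(None)  # sentinel marking the end of the stream


async def slow_consumer(queue: "asyncio.Queue[Optional[int]]") -> List[int]:
    out: List[int] = []
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.001)  # downstream is slower than upstream
        out.append(item * 2)
    return out


async def demo(maxsize: int = 5):
    queue: "asyncio.Queue[Optional[int]]" = asyncio.Queue(maxsize=maxsize)
    peak = [0]
    _, out = await asyncio.gather(producer(queue, 50, peak), slow_consumer(queue))
    return out, peak[0]


out, peak = asyncio.run(demo())
print(peak)  # highest queue depth observed, never above maxsize
```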

Directional Cancellation

Optimizations flow backwards through the pipeline:

# r.Take(5) signals upstream to stop after 5 items,
# which prevents processing millions of unnecessary items
huge_dataset | r.Map(expensive_computation) | r.Take(5)
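One way to implement this backwards signal is to run the upstream stage as a task feeding a bounded queue and cancel that task once the downstream Take has what it needs. The sketch below assumes that design for illustration; expensive_stage and take are hypothetical, and relais's actual cancellation mechanism may differ.

```python
import asyncio
from typing import List

processed: List[int] = []


async def expensive_stage(out: "asyncio.Queue[int]") -> None:
    # Hypothetical upstream stage: would process a million items if left alone.
    for i in range(1_000_000):
        await asyncio.sleep(0)  # stand-in for real async work
        processed.append(i)
        await out.put(i * i)  # bounded queue: blocks when downstream lags


async def take(n: int) -> List[int]:
    queue: "asyncio.Queue[int]" = asyncio.Queue(maxsize=10)
    upstream = asyncio.create_task(expensive_stage(queue))
    results: List[int] = []
    try:
        while len(results) < n:
            results.append(await queue.get())
    finally:
        upstream.cancel()  # the signal flows backwards: stop producing
        await asyncio.gather(upstream, return_exceptions=True)
    return results


result = asyncio.run(take(5))
print(result)          # [0, 1, 4, 9, 16]
print(len(processed))  # a handful of items (at most n + queue size + 1), not 1,000,000
```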

Memory Efficiency

  • Bounded queues: Default 1000 items per stream (configurable)
  • Streaming processing: Items are processed and released immediately
  • Resource cleanup: Automatic cleanup via context managers

Performance Characteristics

  • Best for: I/O-bound operations with 100-100K items
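  • Concurrent: All async operations run concurrently, so their I/O waits overlap
  • Memory bounded: Memory usage does not grow with input size for streaming operations (stateful ones such as r.Sort must buffer their input)
  • Early termination: Operations like r.Take() avoid unnecessary upstream work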

Use Cases

LLM Evaluation Pipeline

# Generate test cases → Run model → Evaluate results
test_cases | r.Map(run_llm_async) | r.Map(evaluate_response) | r.Take(100)

API Data Processing

# Fetch → Transform → Validate → Store
api_endpoints | r.Map(fetch_async) | r.Map(transform) | r.Filter(validate) | r.Batch(10)

Real-time Stream Processing

# Process events as they arrive
event_stream | r.Filter(important) | r.Map(enrich) | r.Batch(5)




Download files


Source Distribution

relais-0.2.0.tar.gz (133.8 kB)


Built Distribution


relais-0.2.0-py3-none-any.whl (35.2 kB)


File details

Details for the file relais-0.2.0.tar.gz.

File metadata

  • Download URL: relais-0.2.0.tar.gz
  • Upload date:
  • Size: 133.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for relais-0.2.0.tar.gz

  • SHA256: 134901b45f00102bab669ad6e2c89d16b316f299dc35327f857c21a17f70868b
  • MD5: a2f610f822f0a64be2aa57f251e3206e
  • BLAKE2b-256: e0aef1f4b02a31b971a6a89eb7efd4a3e53491c0037872897c110be2b09b6911


Provenance

The following attestation bundles were made for relais-0.2.0.tar.gz:

Publisher: create-release.yml on Giskard-AI/relais


File details

Details for the file relais-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: relais-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 35.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for relais-0.2.0-py3-none-any.whl

  • SHA256: 7ef4bf242a01a7dcffe6190aaadcfcedab7fbe7a772491fcee70cd0a042b59b8
  • MD5: 6d047b8263b10ba23a6e89adf2e715b8
  • BLAKE2b-256: 13359be85793a7ae5b68701c3b6fa77917386a461b7cd9c409dbbf1103eea9d1


Provenance

The following attestation bundles were made for relais-0.2.0-py3-none-any.whl:

Publisher: create-release.yml on Giskard-AI/relais

