A practical tool for managing async pipelines.
relais
A high-performance async streaming pipeline library for Python.
Key Features:
- 🚀 True Streaming: Process data as it flows through the pipeline
- ⚡ Directional Cancellation: Early termination optimizations (e.g., take(5) stops upstream processing)
- 🔄 Concurrent Processing: All operations run concurrently with proper backpressure
- 🛡️ Memory Efficient: Bounded memory usage with configurable stream buffers
- 🎯 Flexible Ordering: Choose between ordered and unordered processing for optimal performance
Perfect for:
- LLM evaluation pipelines
- API data processing
- Real-time data transformation
- I/O-bound concurrent operations
Usage
import relais as r
# Simple pipeline
pipeline = range(10) | r.Map(lambda x: x * 2) | r.Take(5)
result = await pipeline.collect() # [0, 2, 4, 6, 8]
# Streaming processing
async for item in (range(100) | r.Map(lambda x: x * 2) | r.Take(5)).stream():
    print(item)  # Prints results as they become available
Installation
pip install relais
Requirements
- Python 3.11+
Quick Start
import asyncio
import relais as r
async def main():
    # Transform and filter data
    result = await (
        range(20)
        | r.Map(lambda x: x * 2)
        | r.Filter(lambda x: x > 10)
        | r.Take(5)
    ).collect()
    print(result)  # [12, 14, 16, 18, 20]
    # Stream processing with async operations
    async def slow_square(x):
        await asyncio.sleep(0.1)  # Simulate I/O
        return x * x
    # Process items as they complete
    async for item in (range(5) | r.Map(slow_square)).stream():
        print(f"Completed: {item}")
asyncio.run(main())
API Reference
Core Operations
Transform Operations
- r.Map(fn) - Apply function to each item (supports async functions)
- r.Filter(fn) - Filter items based on condition
- r.FlatMap(fn) - Flatten iterables returned by function
Collection Operations
- r.Take(n, ordered=False) - Take first N items (with early cancellation)
- r.Skip(n, ordered=False) - Skip first N items
- r.Distinct(key_fn=None) - Remove duplicates
- r.Sort(key_fn=None) - Sort items (stateful operation)
- r.Batch(size) - Group items into batches
- r.Reduce(fn, initial) - Accumulate values
Processing Modes
- Unordered (default): Maximum performance, items processed as available
- Ordered: Preserves input order, may be slower for some operations
All operations support async functions and run concurrently by default.
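The difference between the two modes can be illustrated with plain asyncio. This is a minimal sketch of the general idea, not relais's implementation; `slow_double`, `unordered`, and `ordered` are hypothetical helpers introduced only for this example.

```python
import asyncio


async def slow_double(x: int) -> int:
    # Larger inputs sleep less, so they tend to finish before smaller ones.
    await asyncio.sleep(0.01 * (5 - x))
    return x * 2


async def unordered(items):
    """Collect results in completion order."""
    tasks = [asyncio.create_task(slow_double(x)) for x in items]
    return [await finished for finished in asyncio.as_completed(tasks)]


async def ordered(items):
    """Collect results in input order; early slow items delay the output."""
    return await asyncio.gather(*(slow_double(x) for x in items))


async def main():
    items = range(5)
    return await unordered(items), await ordered(items)


unordered_result, ordered_result = asyncio.run(main())
print(unordered_result)  # Same values as below, in whatever order they finished
print(ordered_result)    # [0, 2, 4, 6, 8]
```

Both runs do the same work. The unordered version can hand each result downstream as soon as it is ready, while the ordered version has to hold later results until earlier ones complete.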
Performance Features
Directional Cancellation
# Only processes first 5 items, cancels upstream automatically
result = await (large_data_source | r.Map(expensive_operation) | r.Take(5)).collect()
Memory Efficiency
# Streams through millions of items with bounded memory
async for batch in (huge_dataset | r.Map(transform) | r.Batch(100)).stream():
    process_batch(batch)  # Constant memory usage
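To see why batching a stream needs only one batch in memory at a time, here is a minimal sketch using a plain async generator. It is not relais's `Batch` implementation; `numbers` and `batched` are hypothetical helpers, and emitting the final partial batch is an assumption about the desired behavior.

```python
import asyncio
from typing import AsyncIterator


async def numbers(n: int) -> AsyncIterator[int]:
    # Stand-in for a large async data source.
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def batched(source: AsyncIterator[int], size: int) -> AsyncIterator[list[int]]:
    # Only the batch being filled is held in memory.
    batch: list[int] = []
    async for item in source:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # Assumption: the final partial batch is emitted too


async def main() -> list[list[int]]:
    return [b async for b in batched(numbers(7), 3)]


batches = asyncio.run(main())
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```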
Concurrent Processing
# All async operations run concurrently
pipeline = (
    data_source
    | r.Map(async_api_call)  # Multiple concurrent API calls
    | r.Filter(validate)     # Filters results as they arrive
    | r.Take(10)             # Stops processing after 10 valid results
)
Pipeline Composition
Pipelines are built using the intuitive | operator for chaining operations:
Basic Usage
# Data source | operations | collection
result = await (range(5) | r.Map(lambda x: x * 2) | r.Filter(lambda x: x > 4)).collect()
# [6, 8]
Runtime Input
# Define pipeline without input data
pipeline = r.Map(lambda x: x * 2) | r.Take(3)
# Apply to different data sources
result1 = await pipeline.collect(range(10)) # [0, 2, 4]
result2 = await pipeline.collect([5, 6, 7, 8]) # [10, 12, 14]
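For readers curious how `|` chaining with a deferred data source can work in Python, the following is a simplified, synchronous sketch built on `__or__` and `__ror__`. It is an illustration only, not relais's code: the `Step` class and the local `Map` and `Take` functions are hypothetical stand-ins.

```python
class Step:
    """Hypothetical pipeline step wrapping a function over an iterable."""

    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # step | step: build a new step that runs both in sequence.
        return Step(lambda items: other.fn(self.fn(items)))

    def __ror__(self, source):
        # iterable | step: the left operand has no matching __or__,
        # so Python falls back to the step's __ror__.
        return self.fn(source)


def Map(f):
    return Step(lambda items: (f(x) for x in items))


def Take(n):
    # zip stops after n items without pulling an extra one from upstream.
    return Step(lambda items: (x for _, x in zip(range(n), items)))


pipeline = Map(lambda x: x * 2) | Take(3)    # No data bound yet
print(list(range(10) | pipeline))            # [0, 2, 4]
print(list([5, 6, 7, 8] | pipeline))         # [10, 12, 14]
```

Because a step holds only functions, the same pipeline object can be reused with any number of data sources.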
Streaming Results
import asyncio
import random
async def slow_process(x):
    await asyncio.sleep(random.uniform(0.1, 0.5))
    return x * x
pipeline = range(10) | r.Map(slow_process) | r.Filter(lambda x: x % 2 == 0)
# Process results as they become available
async for result in pipeline.stream():
    print(f"Got result: {result}")
# Results appear in completion order, not input order
Error Handling
from relais.errors import ErrorPolicy
# Fail fast (default) - stops on first error
pipeline = r.Pipeline(
    [r.Map(might_fail), r.Filter(lambda x: x > 0)],
    error_policy=ErrorPolicy.FAIL_FAST
)
# Collect errors for later inspection
pipeline = r.Pipeline(
    [r.Map(might_fail), r.Take(10)],
    error_policy=ErrorPolicy.COLLECT
)
combined = await pipeline.collect(data, error_policy=ErrorPolicy.COLLECT)
results = [x for x in combined if not isinstance(x, Exception)]
errors = [x for x in combined if isinstance(x, Exception)]
# Ignore errors and continue processing
pipeline = r.Pipeline(
    [r.Map(might_fail), r.Take(10)],
    error_policy=ErrorPolicy.IGNORE
)
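The three policies can be compared side by side with plain asyncio. This is a sketch of the semantics described above, assuming nothing about relais internals: `Policy` is a hypothetical local enum (not `relais.errors.ErrorPolicy`), and `might_fail` and `run` are illustrative helpers.

```python
import asyncio
from enum import Enum


class Policy(Enum):  # Hypothetical stand-in, not relais.errors.ErrorPolicy
    FAIL_FAST = "fail_fast"
    COLLECT = "collect"
    IGNORE = "ignore"


async def might_fail(x: int) -> int:
    await asyncio.sleep(0)
    if x % 3 == 0:
        raise ValueError(f"bad input: {x}")
    return x * 10


async def run(items, policy: Policy):
    if policy is Policy.FAIL_FAST:
        # The first exception propagates to the caller.
        return await asyncio.gather(*(might_fail(x) for x in items))
    outcomes = await asyncio.gather(
        *(might_fail(x) for x in items), return_exceptions=True
    )
    if policy is Policy.COLLECT:
        return outcomes  # Values and exceptions, mixed, in input order
    # IGNORE: drop the failures and keep the successful values.
    return [o for o in outcomes if not isinstance(o, Exception)]


collected = asyncio.run(run(range(1, 5), Policy.COLLECT))
ignored = asyncio.run(run(range(1, 5), Policy.IGNORE))
print(collected)  # Three values plus one ValueError instance
print(ignored)    # [10, 20, 40]
```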
Advanced: Context Manager Usage
# For fine-grained control over pipeline execution
async with await pipeline.open(data_source) as stream:
    async for event in stream:
        if isinstance(event, StreamItemEvent):
            print(f"Item: {event.item}")
        elif isinstance(event, StreamErrorEvent):
            print(f"Error: {event.error}")
        # Early termination
        if some_condition:
            break
# Pipeline automatically cleans up resources
Architecture & Performance
Streaming Architecture
Relais uses a true streaming architecture where:
- Data flows through bounded queues between pipeline steps
- Operations run concurrently - each step processes items as they arrive
- Memory usage is bounded - configurable queue sizes prevent memory explosions
- Backpressure handling - upstream producers slow down when downstream is busy
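The bounded-queue and backpressure ideas in the list above can be sketched with a single `asyncio.Queue`. This is a generic illustration, not relais's internals; `producer`, `consumer`, `run`, and the `high_water` counter are hypothetical names, and the queue size of 4 is arbitrary.

```python
import asyncio


async def producer(queue: asyncio.Queue, n: int, high_water: list) -> None:
    for i in range(n):
        await queue.put(i)  # Suspends while the queue is full (backpressure)
        high_water[0] = max(high_water[0], queue.qsize())
    await queue.put(None)   # Sentinel: no more items


async def consumer(queue: asyncio.Queue, out: list) -> None:
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.001)  # Simulate a slow downstream step
        out.append(item * 2)


async def run(n: int = 50, maxsize: int = 4):
    queue = asyncio.Queue(maxsize=maxsize)
    out, high_water = [], [0]
    await asyncio.gather(producer(queue, n, high_water), consumer(queue, out))
    return out, high_water[0]


results, peak = asyncio.run(run())
print(len(results), peak)  # Item count and the largest queue size observed
```

However fast the producer is, the number of buffered items never exceeds `maxsize`, which is what keeps memory bounded when the input is large.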
Directional Cancellation
Optimizations flow backwards through the pipeline:
# take(5) signals upstream to stop after 5 items
# This prevents processing millions of unnecessary items
huge_dataset | expensive_computation | r.Take(5)
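A minimal sketch of the same idea with plain asyncio: the consumer cancels the upstream task once it has enough items. This shows the general technique, not relais's actual mechanism; `produce`, `take`, and the `produced` counter are hypothetical names.

```python
import asyncio
import itertools


async def produce(queue: asyncio.Queue, produced: list) -> None:
    # An endless upstream source; it only stops when cancelled.
    for i in itertools.count():
        await queue.put(i * i)
        produced.append(i)


async def take(n: int, maxsize: int = 2):
    queue = asyncio.Queue(maxsize=maxsize)
    produced = []
    upstream = asyncio.create_task(produce(queue, produced))
    try:
        taken = [await queue.get() for _ in range(n)]
    finally:
        upstream.cancel()  # Signal upstream to stop
        await asyncio.gather(upstream, return_exceptions=True)
    return taken, len(produced)


taken, produced_count = asyncio.run(take(5))
print(taken)           # [0, 1, 4, 9, 16]
print(produced_count)  # At most n + maxsize items were ever produced
```

The bounded queue limits how far the producer can run ahead, and the cancellation stops it for good, so an infinite source costs only a handful of extra items.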
Memory Efficiency
- Bounded queues: Default 1000 items per stream (configurable)
- Streaming processing: Items are processed and released immediately
- Resource cleanup: Automatic cleanup via context managers
Performance Characteristics
- Best for: I/O-bound operations with 100-100K items
- Concurrent: All async operations run concurrently
- Memory bounded: Constant memory usage regardless of input size
- Early termination: Operations like take() provide significant optimizations
Use Cases
LLM Evaluation Pipeline
# Generate test cases → Run model → Evaluate results
test_cases | r.Map(run_llm_async) | r.Map(evaluate_response) | r.Take(100)
API Data Processing
# Fetch → Transform → Validate → Store
api_endpoints | r.Map(fetch_async) | r.Map(transform) | r.Filter(validate) | r.Batch(10)
Real-time Stream Processing
# Process events as they arrive
event_stream | r.Filter(important) | r.Map(enrich) | r.Batch(5)