pyperly
A functional pipeline library for Python with support for synchronous and asynchronous operations.
pyperly provides a clear and expressive way to build data processing pipelines. It allows you to chain a series of functions (transformations) in a fluent, readable manner, inspired by functional programming concepts. Whether you're working with synchronous code or complex async workflows, pyperly helps you write cleaner, more maintainable data-centric logic.
Key Features
- Fluent Interface: Chain operations together in a natural and readable way.
- Sync & Async Support: Seamlessly mix and match synchronous and asynchronous functions in your pipelines.
- Error Handling: Gracefully handle exceptions using the Result object, preventing crashes.
- Side Effects: Perform actions like logging or database writes without interrupting the main data flow using apply.
- Conditional Logic: Use ensure to validate data at any stage of the pipeline and branch logic accordingly.
Installation
System Requirements:
- Python 3.10+
To install the library, run the following command:
pip install pyperly
Getting Started
Here's a simple example to get you started. Let's create a pipeline that takes a number, adds 10, and then doubles the result.
from pyperly import let

# Define the processing functions
def add_ten(n):
    return n + 10

def double(n):
    return n * 2

# Create and run the pipeline
initial_value = 5
result = let(initial_value).bind(add_ten).bind(double).run()
print(f"The result is: {result}")
# Output: The result is: 30
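For comparison, the chain above computes the same value as nesting the two calls by hand. The sketch below uses only the standard library to show that equivalence; the helper name run_steps is hypothetical and is not part of pyperly.

```python
from functools import reduce

def add_ten(n):
    return n + 10

def double(n):
    return n * 2

def run_steps(value, *steps):
    """Feed value through each step in order (hypothetical helper, not pyperly API)."""
    return reduce(lambda acc, step: step(acc), steps, value)

nested = double(add_ten(5))               # inside-out: read right to left
chained = run_steps(5, add_ten, double)   # left to right, like the pipeline

print(nested, chained)
```

The left-to-right form keeps the order of the steps the same as the order in which they run, which is the main readability argument for a pipeline style.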
Usage Examples
This section provides more detailed examples demonstrating various features of pyperly.
1. Basic Synchronous Pipeline
This example shows a simple chain of synchronous functions. The pipeline starts with a string, processes it, and returns the length.
from pyperly import let

def to_uppercase(text: str) -> str:
    print(f"Uppercasing: '{text}'")
    return text.upper()

def add_exclamation(text: str) -> str:
    print(f"Adding exclamation to: '{text}'")
    return f"{text}!"

def get_length(text: str) -> int:
    return len(text)

# Start with an initial value and chain the functions
pipeline = let("hello world").bind(to_uppercase).bind(add_exclamation).bind(get_length)

# The .run() method executes all steps in order
result = pipeline.run()
print(f"\nFinal length: {result}")
# Output:
# Uppercasing: 'hello world'
# Adding exclamation to: 'HELLO WORLD'
#
# Final length: 12
2. Asynchronous Pipeline
pyperly handles async functions just as easily. Here's a pipeline that simulates fetching data from a web API and then processing it.
import asyncio
from pyperly import alet

# Simulate an async API call
async def fetch_user_data(user_id: int) -> dict:
    print(f"Fetching data for user {user_id}...")
    await asyncio.sleep(0.1)  # Simulate network latency
    return {"id": user_id, "name": "John Doe", "email": "john.doe@example.com"}

# An async function to extract a specific field
async def get_field(data: dict, field: str) -> str:
    print(f"Extracting field '{field}'...")
    await asyncio.sleep(0.1)
    return data.get(field, "N/A")

async def main():
    # Use alet() to start an async pipeline
    # Note: the first function is passed directly to alet()
    user_email = await alet(fetch_user_data, 101).bind(get_field, field="email").arun()
    print(f"\nUser email: {user_email}")

asyncio.run(main())
# Output:
# Fetching data for user 101...
# Extracting field 'email'...
#
# User email: john.doe@example.com
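One common way to let sync and async steps share a chain is to call each step and await the result only when it is awaitable. The sketch below illustrates that general technique with the standard library alone. It is an assumption about how such mixing can work, not a description of pyperly's internals, and run_mixed, fetch_user, and get_email are hypothetical names.

```python
import asyncio
import inspect

async def run_mixed(value, *steps):
    """Run steps in order, awaiting any result that is awaitable (hypothetical helper)."""
    for step in steps:
        value = step(value)
        if inspect.isawaitable(value):
            value = await value
    return value

async def fetch_user(user_id):
    await asyncio.sleep(0)  # stand-in for real I/O
    return {"id": user_id, "email": "john.doe@example.com"}

def get_email(user):
    # A plain synchronous step sitting in the same chain
    return user["email"]

email = asyncio.run(run_mixed(101, fetch_user, get_email))
print(email)
```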
3. Handling Side Effects with apply
Sometimes you need to perform an action that doesn't change the data, like logging. Use the apply method for this. The value passed to the next step remains unchanged.
from pyperly import let

def log_value(value):
    # This function is for a side effect (printing)
    print(f"[LOG] Current value: {value}")
    # It doesn't need to return anything meaningful

def multiply_by_three(n):
    return n * 3

result = (
    let(10)
    .apply(log_value)
    .bind(multiply_by_three)
    .apply(log_value)
    .run()
)
print(f"\nFinal result: {result}")
# Output:
# [LOG] Current value: 10
# [LOG] Current value: 30
#
# Final result: 30
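The pass-through behavior described above can be modeled in a few lines of plain Python. This is a minimal sketch of the idea only, assuming nothing about how pyperly implements apply; the tap helper is hypothetical.

```python
def tap(fn):
    """Wrap fn so it runs for its side effect and the input passes through unchanged.

    Hypothetical helper illustrating the idea behind apply; not pyperly's implementation.
    """
    def wrapper(value):
        fn(value)      # the return value of fn is deliberately ignored
        return value
    return wrapper

seen = []
log_value = tap(seen.append)   # list.append returns None, yet the value survives

value = log_value(10)
value = value * 3
value = log_value(value)

print(seen, value)
```

Because the wrapper discards whatever the side-effect function returns, a logging function that returns None cannot accidentally replace the value flowing through the chain.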
4. Validation with ensure
You can validate the data at any point in the pipeline using ensure. If the condition fails, the pipeline stops and returns None. If a default value is provided, the pipeline continues with that default instead.
from pyperly import let

def is_positive(n):
    return n > 0

# Scenario 1: Validation passes
pipeline_success = (
    let(20)
    .ensure(is_positive)
    .bind(lambda x: x - 5)
)
result_success = pipeline_success.run()
print(f"Success case result: {result_success}")  # Output: 15

# Scenario 2: Validation fails, pipeline stops and returns None
pipeline_fail = (
    let(-10)
    .ensure(is_positive)
    .bind(lambda x: x - 5)  # This step is never reached
)
result_fail = pipeline_fail.run()
print(f"Failure case result: {result_fail}")  # Output: None

# Scenario 3: Validation fails, but a default value is provided
pipeline_default = (
    let(-10)
    .ensure(is_positive, default=0)  # If ensure fails, the pipeline continues with 0
    .bind(lambda x: x + 100)
)
result_default = pipeline_default.run()
print(f"Default case result: {result_default}")  # Output: 100
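The three scenarios above reduce to one small decision rule. The sketch below models that rule in plain Python under the behavior described in this section; ensure_step and the sentinel are hypothetical and are not pyperly's code.

```python
_MISSING = object()  # sentinel: distinguishes "no default given" from default=None

def ensure_step(value, predicate, default=_MISSING):
    """Return (keep_going, value); a hypothetical model of the ensure semantics."""
    if predicate(value):
        return True, value       # validation passed: value flows on unchanged
    if default is _MISSING:
        return False, None       # validation failed, no fallback: stop
    return True, default         # validation failed: continue with the fallback

def is_positive(n):
    return n > 0

print(ensure_step(20, is_positive))              # (True, 20)
print(ensure_step(-10, is_positive))             # (False, None)
print(ensure_step(-10, is_positive, default=0))  # (True, 0)
```

A sentinel object is used rather than None so that a caller could, in this model, pass None itself as a legitimate default.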
5. Using the & Operator for Concise Pipelines
For a more compact and expressive style, you can use the & operator as an alternative to calling .bind() on the pipeline. The right-hand operand can be a plain callable such as a lambda, or a step built with the module-level bind, apply, and ensure functions, so you can build sophisticated pipelines with minimal boilerplate.
from pyperly import let, bind, apply, ensure

# A pipeline to process a list of numbers
# 1. Start with a list.
# 2. Ensure the list is not empty, otherwise default to [0].
# 3. Log the initial list (side effect).
# 4. Sum the numbers in the list.
# 5. Add 100 to the sum.
pipeline = (
    let([1, 2, 3])
    & ensure(lambda x: len(x) > 0, default=[0])
    & apply(lambda x: print(f"Processing list: {x}"))
    & bind(sum)
    & (lambda total: total + 100)
)
result = pipeline.run()
print(f"Final result: {result}")
# Output:
# Processing list: [1, 2, 3]
# Final result: 106

# Example with a failing validation
empty_list_result = (let([]) & ensure(lambda x: len(x) > 0, default=[0]) & bind(sum)).run()
print(f"\nResult with empty list: {empty_list_result}")
# Output:
# Result with empty list: 0
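Operator-based chaining of this kind is usually built on Python's `__and__` special method. The sketch below shows the general mechanism with a tiny hypothetical Chain class. It is not pyperly's Pipeline and supports only plain callables.

```python
class Chain:
    """Tiny illustration of operator-based chaining; hypothetical, not pyperly's Pipeline."""

    def __init__(self, value, steps=()):
        self._value = value
        self._steps = tuple(steps)

    def __and__(self, step):
        # Return a new Chain so existing chains are never mutated.
        return Chain(self._value, self._steps + (step,))

    def run(self):
        value = self._value
        for step in self._steps:
            value = step(value)
        return value

total = (Chain([1, 2, 3]) & sum & (lambda t: t + 100)).run()
print(total)
```

Note that a lambda on the right-hand side needs its own parentheses, as in the example above, because a lambda body extends as far to the right as it can.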
6. Error Handling with result
To safely execute a pipeline that might raise an exception, use the .result() or .aresult() method instead of .run() or .arun(). Each returns a Result object: when its ok attribute is true, value holds the final value; otherwise, error holds the captured exception.
from pyperly import let

def divide(a, b):
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

# Successful execution
result_ok = let(10).bind(divide, 2).result()
if result_ok.ok:
    print(f"Success: {result_ok.value}")
else:
    print(f"Error: {result_ok.error}")
# Output: Success: 5.0

# Execution with an error
result_err = let(10).bind(divide, 0).result()
if result_err.ok:
    print(f"Success: {result_err.value}")
else:
    print(f"Error: {result_err.error}")
# Output: Error: Cannot divide by zero
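The shape of such an object is easy to picture with a small standalone model. The sketch below is an assumption for illustration: the Outcome class and capture helper are hypothetical names, and pyperly's actual Result class may differ beyond the ok, value, and error attributes used above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

@dataclass(frozen=True)
class Outcome:
    """Hypothetical stand-in for a Result-like object; not pyperly's class."""
    value: Any = None
    error: Optional[Exception] = None

    @property
    def ok(self) -> bool:
        return self.error is None

def capture(fn: Callable[..., Any], *args: Any) -> Outcome:
    """Call fn and convert a raised exception into a returned Outcome."""
    try:
        return Outcome(value=fn(*args))
    except Exception as exc:
        return Outcome(error=exc)

def divide(a, b):
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

good = capture(divide, 10, 2)
bad = capture(divide, 10, 0)
print(good.ok, good.value)
print(bad.ok, bad.error)
```

Returning the exception as data lets the caller decide how to react at a single point, rather than wrapping every pipeline call in its own try block.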
API Overview
The core of the library revolves around the Pipeline object and a few key functions. Most functions that accept a callable (like bind, apply, and ensure) also accept a common set of keyword arguments to control their behavior.
Core Functions
- let(value) or let(fn, *args, **kwargs): Creates a new synchronous pipeline.
- alet(coro, *args, **kwargs): Creates a new asynchronous pipeline.
- pipeline.bind(fn, *args, **kwargs): Chains a transformation. The return value of fn becomes the new value in the pipeline.
- pipeline.abind(coro, *args, **kwargs): Chains an asynchronous transformation.
- pipeline.apply(fn, *args, **kwargs): Executes a function for side effects. The pipeline's value is not modified.
- pipeline.aapply(coro, *args, **kwargs): Executes an async function for side effects.
- pipeline.ensure(predicate, **kwargs): Validates the pipeline's current value. If the predicate is false, the pipeline stops (returning None) or continues with the default value if provided.
- pipeline.aensure(coro, **kwargs): Asynchronous validation.
- pipeline.run(is_async: bool = False, allow_none: bool = False): Executes a synchronous pipeline and returns the final value.
- pipeline.arun(allow_none: bool = False): Executes an asynchronous pipeline and returns the final value.
- pipeline.result(is_async: bool = False, allow_none: bool = False): Executes the pipeline and returns a Result object, capturing any exceptions.
- pipeline.aresult(allow_none: bool = False): Executes an async pipeline and returns a Result object.
Execution Parameters
The run, arun, result, and aresult methods accept the following parameters to control pipeline execution:
- is_async: bool: When set to True in run() or result(), it forces the pipeline to execute asynchronously, even if it only contains synchronous steps. This is useful for consistent execution in mixed environments.
- allow_none: bool: Sets the default behavior for handling None for the entire pipeline execution. If a step does not have its own allow_none setting, this value is used. However, a step-specific allow_none parameter will always override this global setting for that particular step. Defaults to False.
Common Parameters
These parameters can be used with bind, abind, apply, aapply, ensure, and aensure to customize their behavior:
- result_kw: str: Instead of passing the pipeline's current value as the first positional argument to the function, this parameter allows you to pass it as a keyword argument. This is useful for functions where you can't or don't want to change the signature.

    def process_data(data, config):
        return data + config

    # The pipeline value (10) will be passed as the 'data' argument.
    let(10).bind(process_data, result_kw="data", config=5).run()
    # Result: 15

- is_async: bool: Explicitly tells the pipeline to treat a function as asynchronous, even if it's not a coroutine function defined with async def. This is an advanced use case, typically for functions that return an awaitable. Defaults to False.
- allow_none: bool: Controls how None values are handled.
  - If False (the default), the pipeline will stop execution if a step returns None.
  - If True, None is treated as a valid value and is passed to the next step in the pipeline.
- default: Any: Provides a fallback value.
  - In bind: If the function's result is None, the pipeline will continue with this default value instead of stopping.
  - In ensure: If the predicate returns False, the pipeline will continue with this default value.
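To make the interaction of result_kw, allow_none, and default concrete, the sketch below models a single bind-style step in plain Python. It is a hypothetical model built from the descriptions above, not pyperly's implementation. In particular, letting allow_none=True take precedence over default is an assumption, since the text does not say which one wins when both are set.

```python
_MISSING = object()  # sentinel meaning "no default supplied"

def call_step(fn, value, *args, result_kw=None, allow_none=False,
              default=_MISSING, **kwargs):
    """Hypothetical model of one bind step; returns (keep_going, new_value)."""
    if result_kw is not None:
        # Pass the pipeline value under the requested keyword name.
        out = fn(*args, **{result_kw: value}, **kwargs)
    else:
        # Default: the pipeline value is the first positional argument.
        out = fn(value, *args, **kwargs)
    if out is None and not allow_none:
        if default is _MISSING:
            return False, None   # stop the pipeline
        return True, default     # continue with the fallback
    return True, out

def process_data(data, config):
    return data + config

print(call_step(process_data, 10, result_kw="data", config=5))  # (True, 15)
print(call_step(lambda x: None, 1))                             # (False, None)
print(call_step(lambda x: None, 1, allow_none=True))            # (True, None)
print(call_step(lambda x: None, 1, default=0))                  # (True, 0)
```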
Contributing
Contributions are welcome! If you have a suggestion or find a bug, please open an issue or submit a pull request.
- Fork the repository.
- Create a new branch (git checkout -b feature/my-new-feature).
- Make your changes.
- Commit your changes (git commit -am 'Add some feature').
- Push to the branch (git push origin feature/my-new-feature).
- Create a new Pull Request.
License
This project is licensed under the MIT License - see the LICENSE file for details.