A functional pipeline library for Python with support for synchronous and asynchronous operations

pyperly

pyperly provides a clear and expressive way to build data processing pipelines. It allows you to chain a series of functions (transformations) in a fluent, readable manner, inspired by functional programming concepts. Whether you're working with synchronous code or complex async workflows, pyperly helps you write cleaner, more maintainable data-centric logic.

Key Features

  • Fluent Interface: Chain operations together in a natural and readable way.
  • Sync & Async Support: Seamlessly mix and match synchronous and asynchronous functions in your pipelines.
  • Error Handling: Capture exceptions in a Result object instead of letting them crash your program.
  • Side Effects: Perform actions like logging or database writes without interrupting the main data flow using apply.
  • Conditional Logic: Use ensure to validate data at any stage of the pipeline, then stop early or fall back to a default value.

Installation

System Requirements:

  • Python 3.10+

To install the library, run the following command:

pip install pyperly

Getting Started

Here's a simple example to get you started. Let's create a pipeline that takes a number, adds 10, and then doubles the result.

from pyperly import let

# Define the processing functions
def add_ten(n):
    return n + 10

def double(n):
    return n * 2

# Create and run the pipeline
initial_value = 5
result = let(initial_value).bind(add_ten).bind(double).run()

print(f"The result is: {result}")
# Output: The result is: 30
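
For comparison, the chain above computes the same thing as the nested call double(add_ten(5)), only written left to right. A minimal plain-Python sketch of that idea follows. It uses functools.reduce rather than pyperly, and the helper name run_steps is hypothetical, not part of the library:

```python
from functools import reduce

def add_ten(n):
    return n + 10

def double(n):
    return n * 2

def run_steps(value, *steps):
    """Feed value through each step in order, left to right."""
    return reduce(lambda acc, step: step(acc), steps, value)

nested = double(add_ten(5))               # reads inside-out
chained = run_steps(5, add_ten, double)   # reads left to right
print(nested, chained)
```

Both forms give the same number; the pipeline style mainly helps readability once the chain grows beyond two or three steps.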

Usage Examples

This section provides more detailed examples demonstrating various features of pyperly.

1. Basic Synchronous Pipeline

This example shows a simple chain of synchronous functions. The pipeline starts with a string, processes it, and returns the length.

from pyperly import let

def to_uppercase(text: str) -> str:
    print(f"Uppercasing: '{text}'")
    return text.upper()

def add_exclamation(text: str) -> str:
    print(f"Adding exclamation to: '{text}'")
    return f"{text}!"

def get_length(text: str) -> int:
    return len(text)

# Start with an initial value and chain the functions
pipeline = let("hello world").bind(to_uppercase).bind(add_exclamation).bind(get_length)

# The .run() method executes all steps in order
result = pipeline.run()

print(f"\nFinal length: {result}")
# Output:
# Uppercasing: 'hello world'
# Adding exclamation to: 'HELLO WORLD'
#
# Final length: 12

2. Asynchronous Pipeline

pyperly handles async functions just as easily. Here's a pipeline that simulates fetching data from a web API and then processing it.

import asyncio
from pyperly import alet

# Simulate an async API call
async def fetch_user_data(user_id: int) -> dict:
    print(f"Fetching data for user {user_id}...")
    await asyncio.sleep(0.1)  # Simulate network latency
    return {"id": user_id, "name": "John Doe", "email": "john.doe@example.com"}

# An async function to extract a specific field
async def get_field(data: dict, field: str) -> str:
    print(f"Extracting field '{field}'...")
    await asyncio.sleep(0.1)
    return data.get(field, "N/A")

async def main():
    # Use alet() to start an async pipeline
    # Note: the first function is passed directly to alet()
    user_email = await alet(fetch_user_data, 101).bind(get_field, field="email").arun()

    print(f"\nUser email: {user_email}")

asyncio.run(main())
# Output:
# Fetching data for user 101...
# Extracting field 'email'...
#
# User email: john.doe@example.com
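
One way to picture how synchronous and asynchronous steps can share a chain is to await a step's result only when it is awaitable. The sketch below uses only the standard library. run_mixed, fetch_user, and get_email are hypothetical names for illustration, and the sketch makes no claim about how pyperly is implemented internally:

```python
import asyncio
import inspect

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0)  # stand-in for network I/O
    return {"id": user_id, "email": "john.doe@example.com"}

def get_email(user: dict) -> str:
    # A plain synchronous step
    return user.get("email", "N/A")

async def run_mixed(value, *steps):
    """Apply each step in order, awaiting results that are awaitable."""
    for step in steps:
        value = step(value)
        if inspect.isawaitable(value):
            value = await value
    return value

email = asyncio.run(run_mixed(101, fetch_user, get_email))
print(email)
```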

3. Handling Side Effects with apply

Sometimes you need to perform an action that doesn't change the data, like logging. Use the apply method for this. The value passed to the next step remains unchanged.

from pyperly import let

def log_value(value):
    # This function is for a side effect (printing)
    print(f"[LOG] Current value: {value}")
    # It doesn't need to return anything meaningful

def multiply_by_three(n):
    return n * 3

result = (
    let(10)
    .apply(log_value)
    .bind(multiply_by_three)
    .apply(log_value)
    .run()
)

print(f"\nFinal result: {result}")
# Output:
# [LOG] Current value: 10
# [LOG] Current value: 30
#
# Final result: 30
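
The pass-through behavior described above can be sketched in plain Python as a small wrapper that calls a function and then returns its input unchanged. The name tap is hypothetical and only illustrates the semantics; it is not a pyperly function:

```python
def tap(fn):
    """Wrap fn so it runs for its side effect and the input passes through."""
    def step(value):
        fn(value)       # the return value of fn is deliberately ignored
        return value
    return step

seen = []
log_step = tap(seen.append)   # list.append returns None

value = log_step(10)          # records 10, returns 10
value = value * 3
value = log_step(value)       # records 30, returns 30
print(value, seen)
```

Because the wrapper discards whatever the side-effect function returns, a logger that returns None cannot accidentally replace the pipeline's value.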

4. Validation with ensure

You can validate the data at any point in the pipeline using ensure. If the predicate fails, the pipeline stops and returns None; if a default is provided, the pipeline continues with that default value instead.

from pyperly import let

def is_positive(n):
    return n > 0

# Scenario 1: Validation passes
pipeline_success = (
    let(20)
    .ensure(is_positive)
    .bind(lambda x: x - 5)
)
result_success = pipeline_success.run()
print(f"Success case result: {result_success}") # Output: Success case result: 15

# Scenario 2: Validation fails, pipeline stops and returns None
pipeline_fail = (
    let(-10)
    .ensure(is_positive)
    .bind(lambda x: x - 5) # This step is never reached
)
result_fail = pipeline_fail.run()
print(f"Failure case result: {result_fail}") # Output: Failure case result: None

# Scenario 3: Validation fails, but a default value is provided
pipeline_default = (
    let(-10)
    .ensure(is_positive, default=0) # If ensure fails, the pipeline continues with 0
    .bind(lambda x: x + 100)
)
result_default = pipeline_default.run()
print(f"Default case result: {result_default}") # Output: Default case result: 100
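
The three scenarios above boil down to one decision rule. Here is a minimal plain-Python sketch of it, assuming a sentinel object distinguishes "no default supplied" from a default of None. The names check and _STOP are hypothetical and not part of pyperly:

```python
_STOP = object()  # sentinel meaning "no default was supplied"

def check(value, predicate, default=_STOP):
    """Return (value, keep_going) following the three scenarios above."""
    if predicate(value):
        return value, True
    if default is _STOP:
        return None, False      # no default: stop and yield None
    return default, True        # default supplied: continue with it

def is_positive(n):
    return n > 0

print(check(20, is_positive))
print(check(-10, is_positive))
print(check(-10, is_positive, default=0))
```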

5. Using the & Operator for Concise Pipelines

For a more compact style, you can use the & operator as an alternative to method chaining. The right-hand operand can be a plain callable such as a lambda, which is treated like bind, or a step built with the standalone bind, apply, and ensure functions imported from pyperly.

from pyperly import let, bind, apply, ensure

# A pipeline to process a list of numbers
# 1. Start with a list.
# 2. Ensure the list is not empty, otherwise default to [0].
# 3. Log the initial list (side effect).
# 4. Sum the numbers in the list.
# 5. Add 100 to the sum.
pipeline = (
    let([1, 2, 3])
    & ensure(lambda x: len(x) > 0, default=[0])
    & apply(lambda x: print(f"Processing list: {x}"))
    & bind(sum)
    & (lambda total: total + 100)
)

result = pipeline.run()
print(f"Final result: {result}")
# Output:
# Processing list: [1, 2, 3]
# Final result: 106

# Example with a failing validation
empty_list_result = (let([]) & ensure(lambda x: len(x) > 0, default=[0]) & bind(sum)).run()
print(f"\nResult with empty list: {empty_list_result}")
# Output:
# Result with empty list: 0
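
Operator-based chaining of this kind is typically built on Python's __and__ special method. The following is a minimal sketch under that assumption; MiniPipeline is a hypothetical stand-in written for illustration and is not pyperly's Pipeline class:

```python
class MiniPipeline:
    """Hypothetical stand-in for illustration; not pyperly's Pipeline class."""

    def __init__(self, value, steps=()):
        self._value = value
        self._steps = tuple(steps)

    def __and__(self, step):
        # `pipeline & step` returns a new pipeline with the step appended
        return MiniPipeline(self._value, self._steps + (step,))

    def run(self):
        value = self._value
        for step in self._steps:
            value = step(value)
        return value

pipeline = MiniPipeline([1, 2, 3]) & sum & (lambda total: total + 100)
print(pipeline.run())
```

Note that the parentheses around the lambda are required by Python's grammar: a bare lambda cannot appear as the operand of a binary operator.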

6. Error Handling with result

To safely execute a pipeline that might raise an exception, use .result() (or .aresult() for async pipelines) instead of .run(). It returns a Result object: when ok is true, value holds the final value; otherwise error holds the captured exception.

from pyperly import let

def divide(a, b):
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

# Successful execution
result_ok = let(10).bind(divide, 2).result()
if result_ok.ok:
    print(f"Success: {result_ok.value}")
else:
    print(f"Error: {result_ok.error}")
# Output: Success: 5.0

# Execution with an error
result_err = let(10).bind(divide, 0).result()
if result_err.ok:
    print(f"Success: {result_err.value}")
else:
    print(f"Error: {result_err.error}")
# Output: Error: Cannot divide by zero
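
The pattern behind a result object is a try/except that stores the outcome instead of raising. A minimal sketch, assuming only the ok, value, and error attributes used above; MiniResult and safe_call are hypothetical names, not pyperly's own classes or functions:

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

@dataclass(frozen=True)
class MiniResult:
    """Hypothetical container mirroring the ok / value / error attributes."""
    ok: bool
    value: Any = None
    error: Optional[Exception] = None

def safe_call(fn: Callable[..., Any], *args: Any) -> MiniResult:
    """Run fn and capture any exception instead of raising it."""
    try:
        return MiniResult(ok=True, value=fn(*args))
    except Exception as exc:
        return MiniResult(ok=False, error=exc)

def divide(a, b):
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

good = safe_call(divide, 10, 2)
bad = safe_call(divide, 10, 0)
print(good.ok, good.value)
print(bad.ok, bad.error)
```

Keeping the exception object itself, rather than only its message, lets the caller inspect its type or re-raise it later.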

API Overview

The core of the library revolves around the Pipeline object and a few key functions. Most functions that accept a callable (like bind, apply, and ensure) also accept a common set of keyword arguments to control their behavior.

Core Functions

  • let(value) or let(fn, *args, **kwargs): Creates a new synchronous pipeline.
  • alet(coro, *args, **kwargs): Creates a new asynchronous pipeline.
  • pipeline.bind(fn, *args, **kwargs): Chains a transformation. The return value of fn becomes the new value in the pipeline.
  • pipeline.abind(coro, *args, **kwargs): Chains an asynchronous transformation.
  • pipeline.apply(fn, *args, **kwargs): Executes a function for side effects. The pipeline's value is not modified.
  • pipeline.aapply(coro, *args, **kwargs): Executes an async function for side effects.
  • pipeline.ensure(predicate, **kwargs): Validates the pipeline's current value. If the predicate is false, the pipeline stops (returning None) or continues with the default value if provided.
  • pipeline.aensure(coro, **kwargs): Asynchronous validation.
  • pipeline.run(is_async: bool = False, allow_none: bool = False): Executes a synchronous pipeline and returns the final value.
  • pipeline.arun(allow_none: bool = False): Executes an asynchronous pipeline and returns the final value.
  • pipeline.result(is_async: bool = False, allow_none: bool = False): Executes the pipeline and returns a Result object, capturing any exceptions.
  • pipeline.aresult(allow_none: bool = False): Executes an async pipeline and returns a Result object.

Execution Parameters

The execution methods accept the following parameters. Per the signatures above, run and result take both, while arun and aresult take only allow_none:

  • is_async: bool: When set to True in run() or result(), it forces the pipeline to execute asynchronously, even if it only contains synchronous steps. This is useful for consistent execution in mixed environments.
  • allow_none: bool: Sets the pipeline-wide default for how None is handled during execution. A step that specifies its own allow_none always overrides this setting for that step; steps without one inherit it. Defaults to False.
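
The override rule for allow_none can be stated as a one-line resolution function. This sketch assumes an unset step-level value is represented as None; that representation and the name effective_allow_none are illustrative assumptions, not pyperly's API:

```python
from typing import Optional

def effective_allow_none(step_setting: Optional[bool], run_setting: bool = False) -> bool:
    """A step-level setting wins; otherwise fall back to the run-level one."""
    return run_setting if step_setting is None else step_setting

print(effective_allow_none(None, run_setting=True))    # step inherits the run-level value
print(effective_allow_none(False, run_setting=True))   # step-level value overrides
print(effective_allow_none(None))                      # nothing set anywhere
```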

Common Parameters

These parameters can be used with bind, abind, apply, aapply, ensure, and aensure to customize their behavior:

  • result_kw: str: Instead of passing the pipeline's current value as the first positional argument to the function, this parameter allows you to pass it as a keyword argument. This is useful for functions where you can't or don't want to change the signature.

    def process_data(data, config):
        return data + config
    
    # The pipeline value (10) will be passed as the 'data' argument.
    let(10).bind(process_data, result_kw="data", config=5).run() # Result: 15
    
  • is_async: bool: Explicitly tells the pipeline to treat a function as asynchronous, even if it's not a coroutine function defined with async def. This is an advanced use case, typically for functions that return an awaitable. Defaults to False.

  • allow_none: bool: Controls how None values are handled.

    • If False (the default), the pipeline will stop execution if a step returns None.
    • If True, None is treated as a valid value and is passed to the next step in the pipeline.
  • default: Any: Provides a fallback value.

    • In bind: If the function's result is None, the pipeline will continue with this default value instead of stopping.
    • In ensure: If the predicate returns False, the pipeline will continue with this default value.
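
To make the result_kw behavior concrete, here is a plain-Python sketch of the dispatch it describes: the current value goes in either as the first positional argument or under the named keyword. call_step is a hypothetical helper for illustration, not a pyperly function:

```python
def call_step(fn, value, *args, result_kw=None, **kwargs):
    """Pass value positionally, or under the keyword named by result_kw."""
    if result_kw is None:
        return fn(value, *args, **kwargs)
    return fn(*args, **{result_kw: value}, **kwargs)

def process_data(data, config):
    return data + config

positional = call_step(process_data, 10, 5)
by_keyword = call_step(process_data, 10, result_kw="data", config=5)
print(positional, by_keyword)
```

Both calls reach process_data with data=10 and config=5; the keyword form matters when the value is not the function's first parameter.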

Contributing

Contributions are welcome! If you have a suggestion or find a bug, please open an issue or submit a pull request.

  1. Fork the repository.
  2. Create a new branch (git checkout -b feature/my-new-feature).
  3. Make your changes.
  4. Commit your changes (git commit -am 'Add some feature').
  5. Push to the branch (git push origin feature/my-new-feature).
  6. Create a new Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
