An ergonomic wrapper around the expression library

Project description

FP-Ops: Functional Programming Operations for Python

FP-Ops is a functional programming library for Python that lets you convert your functions into composable operations.

Features

  • Composition as a First-class Citizen: Build complex pipelines using simple operators like >>, &, and |
  • Context Awareness: Pass context through operation chains with automatic validation
  • Async-First: Designed for asynchronous operations from the ground up
  • Type Safety: Comprehensive type hints for better IDE support and code safety
  • Functional Patterns: Implements common functional programming patterns like map, filter, and reduce
  • Lazy Execution: Only execute operations when the result is needed
  • Composition is associative: (a >> b) >> c == a >> (b >> c)
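Associativity means that how you group compositions does not change the result. The property is easy to see with plain function composition (a pure-Python illustration, not FP-Ops itself):

```python
# Illustration of associativity using plain function composition
# (standard Python, not FP-Ops itself).
def compose(f, g):
    """Return a function equivalent to g(f(x)) -- i.e. f then g."""
    return lambda x: g(f(x))

a = lambda x: x + 1
b = lambda x: x * 2
c = lambda x: x - 3

left = compose(compose(a, b), c)   # (a >> b) >> c
right = compose(a, compose(b, c))  # a >> (b >> c)

print(left(10), right(10))  # both print 19
```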

Installation

pip install fp-ops

Getting Started

Here's a simple example to get you started:

from fp_ops.operator import operation
import asyncio

# Define some operations
@operation
async def get_user(user_id: int) -> dict:
    # Simulate API call
    return {"id": user_id, "name": "John Doe", "age": 30}

@operation
async def format_user(user: dict) -> str:
    return f"User {user['name']} is {user['age']} years old"

# Compose operations
get_and_format = get_user >> format_user

# Run the composed pipeline
async def main():
    result = await get_and_format(1)
    print(result)

asyncio.run(main())

Key Concepts

Operations

The core concept in FP-Ops is the Operation class. An operation wraps an async function and provides methods for composition using operators:

  • >> (pipeline): Passes the result of one operation to the next
  • & (parallel): Executes operations in parallel and returns all results
  • | (alternative): Tries the first operation and falls back to the second if it fails
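The semantics of the three operators can be sketched with a minimal toy class. This is not the FP-Ops implementation, just an illustration of what pipeline, parallel, and alternative composition mean for async operations:

```python
import asyncio

# Toy sketch of the >>, &, and | operator semantics described above.
# NOT the FP-Ops implementation -- an illustration only.
class ToyOp:
    def __init__(self, fn):
        self.fn = fn

    def __rshift__(self, other):  # pipeline: feed the result forward
        async def piped(x):
            return await other.fn(await self.fn(x))
        return ToyOp(piped)

    def __and__(self, other):  # parallel: run both, return all results
        async def both(x):
            return tuple(await asyncio.gather(self.fn(x), other.fn(x)))
        return ToyOp(both)

    def __or__(self, other):  # alternative: fall back if the first fails
        async def either(x):
            try:
                return await self.fn(x)
            except Exception:
                return await other.fn(x)
        return ToyOp(either)

async def _inc(x): return x + 1
async def _dbl(x): return x * 2
async def _boom(x): raise ValueError("fail")

inc, dbl, boom = ToyOp(_inc), ToyOp(_dbl), ToyOp(_boom)

print(asyncio.run((inc >> dbl).fn(3)))  # 8
print(asyncio.run((inc & dbl).fn(3)))   # (4, 6)
print(asyncio.run((boom | inc).fn(3)))  # 4
```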

Placeholders

You can use the placeholder _ to specify where the result of a previous operation should be inserted:

from fp_ops.placeholder import _

# Define operations
@operation
async def double(x: int) -> int:
    return x * 2

@operation
async def add(x: int, y: int) -> int:
    return x + y

# These are equivalent:
pipeline1 = double >> (lambda x: add(x, 10))
pipeline2 = double >> add(_, 10)

Context Awareness

Operations can be context-aware, allowing you to pass contextual information through the pipeline:

from fp_ops.operator import operation
from fp_ops.context import BaseContext
from pydantic import BaseModel

class UserContext(BaseContext):
    auth_token: str
    user_id: int

@operation(context=True, context_type=UserContext)
async def get_user_data(context: UserContext) -> dict:
    return {"id": context.user_id, "name": "Jane Doe"}

# Initialize context
context = UserContext(auth_token="abc123", user_id=42)

# Execute with context
result = await get_user_data(context=context)

Advanced Usage

Error Handling

FP-Ops uses the Result type for robust error handling:

@operation
async def divide(a: int, b: int) -> float:
    if b == 0:
        raise ValueError("Division by zero")
    return a / b

# Handle errors with default values
safe_divide = divide.default_value(0)

# Or with custom error handling
safe_divide = divide.catch(lambda e: 0 if isinstance(e, ValueError) else -1)
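The idea behind a Result type is that errors become values instead of raised exceptions, so handlers like default_value can act on them. A toy sketch of that idea (not FP-Ops' actual Result class; the Result dataclass and helper names here are illustrative):

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Optional

# Toy sketch of Result-style error handling -- illustrates the idea
# behind default_value/catch, not FP-Ops' actual Result type.
@dataclass
class Result:
    value: Any = None
    error: Optional[Exception] = None

    @property
    def is_ok(self) -> bool:
        return self.error is None

async def run_safely(fn: Callable, *args) -> Result:
    """Run an async function, capturing any exception as a Result."""
    try:
        return Result(value=await fn(*args))
    except Exception as e:
        return Result(error=e)

def with_default(result: Result, default):
    """Like .default_value(default): unwrap, or use the default on error."""
    return result.value if result.is_ok else default

async def divide(a, b):
    if b == 0:
        raise ValueError("Division by zero")
    return a / b

ok = asyncio.run(run_safely(divide, 10, 2))
bad = asyncio.run(run_safely(divide, 10, 0))
print(with_default(ok, 0))   # 5.0
print(with_default(bad, 0))  # 0
```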

Composition Functions

Besides operators, FP-Ops provides various composition functions:

from fp_ops.composition import sequence, pipe, parallel, fallback, map, transform

# Run operations in sequence and collect all results
results = await sequence(op1, op2, op3)

# Complex pipelines with conditional logic
pipeline = pipe(
    op1,
    lambda x: op2 if x > 10 else op3,
    op4
)

# Run operations in parallel
combined = await parallel(op1, op2, op3)

# Try operations until one succeeds
result = await fallback(op1, op2, op3)

# Apply an operation to each item in an iterable
# (e.g., transforming [1, 2, 3] to [2, 3, 4] if item_op increments by 1)
mapped_results = await map(item_op, max_concurrency=5)([item1, item2, item3]) 

# Transform the output of a single operation
transformed_result = await transform(op1, lambda x: x * 2)

Higher-Order Flow Operations

FP-Ops provides utilities for creating higher-order operations:

from fp_ops.flow import branch, attempt, retry, wait, loop_until

# Conditional branching
conditional = branch(
    lambda x: x > 0,
    positive_op,
    negative_op
)

# Retry an operation
resilient_op = retry(flaky_operation, max_retries=3, delay=0.5)

# Loop until a condition is met
counter = loop_until(
    lambda x: x >= 10,
    lambda x: x + 1,
    max_iterations=20
)
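The loop_until pattern above can be sketched in plain Python: keep applying a step function until the predicate holds, bounded by max_iterations. A synchronous illustration of the semantics, not FP-Ops' implementation:

```python
# Toy sketch of loop_until semantics (synchronous, NOT the FP-Ops version):
# repeatedly apply `step` until `predicate` holds, up to max_iterations.
def loop_until(predicate, step, max_iterations=100):
    def run(x):
        for _ in range(max_iterations):
            if predicate(x):
                return x
            x = step(x)
        raise RuntimeError("max_iterations exceeded")
    return run

counter = loop_until(lambda x: x >= 10, lambda x: x + 1, max_iterations=20)
print(counter(0))  # 10
```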

Data Operations

FP-Ops provides a rich set of data manipulation operations that make it easy to work with nested data structures, collections, and transformations:

from fp_ops.data import (
    get, pick, pluck, build, merge, update,
    filter_by, group_by, sort_by, unique_by,
    map_values, map_keys, rename, omit,
    count_by, sum_by,
    to_lower, to_upper, strip, split, join
)

# Path-based access
user_data = {
    "user": {
        "profile": {"name": "John", "email": "john@example.com"},
        "orders": [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}]
    }
}

# Get nested values
name = await get("user.profile.name").execute(user_data)
email = await get("user.profile.email", "no-email@example.com").execute(user_data)

# Pick specific fields
user_info = await pick("user.profile.name", "user.profile.email").execute(user_data)

# Extract values from a list
order_amounts = await pluck("amount").execute(user_data["user"]["orders"])

# Build new objects
user_summary = await build({
    "name": get("user.profile.name"),
    "email": get("user.profile.email"),
    "order_count": lambda d: len(d["user"]["orders"]),
    "total_spent": get("user.orders") >> sum_by("amount")
}).execute(user_data)

# Collection operations
users = [
    {"name": "Alice", "role": "admin", "score": 90},
    {"name": "Bob", "role": "user", "score": 85},
    {"name": "Charlie", "role": "user", "score": 75}
]

# Filter and group
active_users = await filter_by({"role": "user"}).execute(users)
by_role = await group_by("role").execute(users)

# Sort and get unique values
sorted_users = await sort_by("score", reverse=True).execute(users)
unique_names = await unique_by("name").execute(users)

# Transform dictionaries
data = {"user_name": "John Doe", "user_email": "john@example.com"}
cleaned = await (
    map_keys(lambda k: k.replace("user_", "")) >>
    map_values(strip) >>
    rename({"name": "full_name"})
).execute(data)

# Aggregate data (assuming `orders` is a list of dicts with "status" and "amount" keys)
orders = [
    {"status": "completed", "amount": 100},
    {"status": "pending", "amount": 50},
]
status_counts = await count_by("status").execute(orders)
total_amount = await sum_by("amount").execute(orders)

These operations can be composed together to create powerful data transformation pipelines:

# Complex pipeline example
pipeline = (
    get("user.orders") >>
    filter_by({"status": "completed"}) >>
    sort_by("amount", reverse=True) >>
    pluck("amount") >>
    sum_by(lambda x: x)
)

total_completed = await pipeline.execute(user_data)

API Reference

Core Classes

  • Operation: The main class representing a composable asynchronous operation
  • BaseContext: Base class for all operation contexts
  • Placeholder: Used to represent where a previous result should be inserted

Decorators

  • @operation: Convert a function to an Operation
  • @operation(context=True, context_type=MyContext): Create a context-aware operation

Operators

  • op1 >> op2: Pipeline composition
  • op1 & op2: Parallel execution
  • op1 | op2: Alternative execution

Methods

  • operation.transform(func): Apply a transformation to the output
  • operation.filter(predicate): Filter the result using a predicate
  • operation.bind(binder): Bind to another operation
  • operation.catch(handler): Add error handling
  • operation.default_value(default): Provide a default value for errors
  • operation.retry(attempts, delay): Retry the operation
  • operation.tap(side_effect): Apply a side effect without changing the value

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform.

Source Distribution

fp_ops-0.2.10.tar.gz (34.7 kB)

Uploaded Source

Built Distribution

fp_ops-0.2.10-py3-none-any.whl (37.4 kB)

Uploaded Python 3

File details

Details for the file fp_ops-0.2.10.tar.gz.

File metadata

  • Download URL: fp_ops-0.2.10.tar.gz
  • Upload date:
  • Size: 34.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for fp_ops-0.2.10.tar.gz

  • SHA256: 1668b242769babc7ffc22f73c9ccbbb133f2169df1b836d29db62d923bdb9a8e
  • MD5: 711e1e59e7647e50b86c5f83caa11621
  • BLAKE2b-256: 810f44effaf194ca4027e1434e522e59d9194bb9dbdce1afc2325f78ee7c1a7f

File details

Details for the file fp_ops-0.2.10-py3-none-any.whl.

File metadata

  • Download URL: fp_ops-0.2.10-py3-none-any.whl
  • Upload date:
  • Size: 37.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for fp_ops-0.2.10-py3-none-any.whl

  • SHA256: 5e4448cb85d180c22ba28d30e6735e27e837b77a8b9a9bf28afbcbeab4c09263
  • MD5: f01afc1c75c09078bb3182e5627b8f1b
  • BLAKE2b-256: f9cf367973b7c76c2371ef43e70ada376a627a925c84dcb4c4a629b1ecc4847d
