an ergonomic wrapper around the expression library

FP-Ops: Functional Programming Operations for Python

FP-Ops is a functional programming library for Python that lets you convert your functions into composable operations.

Features

  • Composition as a First-class Citizen: Build complex pipelines using simple operators like >>, &, and |
  • Context Awareness: Pass context through operation chains with automatic validation
  • Async-First: Designed for asynchronous operations from the ground up
  • Type Safety: Comprehensive type hints for better IDE support and code safety
  • Functional Patterns: Implements common functional programming patterns like map, filter, and reduce
  • Lazy Execution: Only execute operations when the result is needed
  • Composition is associative: (a >> b) >> c == a >> (b >> c)
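
The associativity claim can be checked with a small plain-Python sketch. The `compose` helper below is hypothetical and not part of FP-Ops; it only models `>>` as left-to-right composition of async functions.

```python
import asyncio

def compose(f, g):
    """Hypothetical stand-in for `f >> g`: run f, then feed its result to g."""
    async def composed(x):
        return await g(await f(x))
    return composed

async def inc(x):
    return x + 1

async def double(x):
    return x * 2

async def square(x):
    return x * x

left = compose(compose(inc, double), square)   # models (a >> b) >> c
right = compose(inc, compose(double, square))  # models a >> (b >> c)

async def check(x):
    return await left(x), await right(x)

results = asyncio.run(check(3))
print(results)  # (64, 64)
```

Both groupings apply the same three steps in the same order, which is why the grouping does not affect the result.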

Installation

pip install fp-ops

Getting Started

Here's a simple example to get you started:

import asyncio

from fp_ops.operator import operation

# Define some operations
@operation
async def get_user(user_id: int) -> dict:
    # Simulate API call
    return {"id": user_id, "name": "John Doe", "age": 30}

@operation
async def format_user(user: dict) -> str:
    return f"User {user['name']} is {user['age']} years old"

# Compose operations
get_and_format = get_user >> format_user

# Operations are async, so await the pipeline inside an event loop
async def main():
    result = await get_and_format(1)
    print(result)

asyncio.run(main())

Key Concepts

Operations

The core concept in FP-Ops is the Operation class. An operation wraps an async function and can be composed with other operations using three operators:

  • >> (pipeline): Passes the result of one operation to the next
  • & (parallel): Executes operations in parallel and returns all results
  • | (alternative): Tries the first operation and falls back to the second if it fails
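
To make the three operators concrete, here is a toy sketch of how such semantics could be built on `asyncio`. `ToyOp` is a hypothetical class written for illustration, not the FP-Ops `Operation` class, and it assumes failure is signalled by a raised exception.

```python
import asyncio

class ToyOp:
    """Illustrative stand-in for an operation; NOT the FP-Ops Operation class."""

    def __init__(self, fn):
        self.fn = fn

    async def run(self, value):
        return await self.fn(value)

    def __rshift__(self, other):
        # Pipeline: feed this operation's result into the next one.
        async def piped(value):
            return await other.run(await self.run(value))
        return ToyOp(piped)

    def __and__(self, other):
        # Parallel: run both on the same input and return both results.
        async def both(value):
            return tuple(await asyncio.gather(self.run(value), other.run(value)))
        return ToyOp(both)

    def __or__(self, other):
        # Alternative: fall back to the second operation if the first raises.
        async def either(value):
            try:
                return await self.run(value)
            except Exception:
                return await other.run(value)
        return ToyOp(either)

async def _double(x):
    return x * 2

async def _negate(x):
    return -x

async def _fail(x):
    raise ValueError("always fails")

double, negate, fail = ToyOp(_double), ToyOp(_negate), ToyOp(_fail)

piped = asyncio.run((double >> negate).run(5))   # -10
paired = asyncio.run((double & negate).run(5))   # (10, -5)
recovered = asyncio.run((fail | double).run(5))  # 10
```

The real library may represent failures and parallel results differently; the sketch only shows the control flow each operator implies.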

Placeholders

You can use the placeholder _ to specify where the result of a previous operation should be inserted:

from fp_ops.operator import operation
from fp_ops.placeholder import _

# Define operations
@operation
async def double(x: int) -> int:
    return x * 2

@operation
async def add(x: int, y: int) -> int:
    return x + y

# These are equivalent:
pipeline1 = double >> (lambda x: add(x, 10))
pipeline2 = double >> add(_, 10)
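
One way to picture placeholder substitution is as a sentinel that gets swapped for the previous result just before the call. The sketch below uses hypothetical names (`PLACEHOLDER`, `fill`) and plain synchronous functions; it does not show how FP-Ops implements `_`.

```python
class _Placeholder:
    """Hypothetical sentinel marking where the previous result should go."""

PLACEHOLDER = _Placeholder()

def fill(args, kwargs, value):
    """Return (args, kwargs) with every placeholder replaced by `value`."""
    new_args = tuple(value if a is PLACEHOLDER else a for a in args)
    new_kwargs = {k: (value if v is PLACEHOLDER else v) for k, v in kwargs.items()}
    return new_args, new_kwargs

def add(x, y):
    return x + y

# Mimics `double >> add(_, 10)` when the previous step produced 6.
args, kwargs = fill((PLACEHOLDER, 10), {}, 6)
total = add(*args, **kwargs)
print(total)  # 16
```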

Context Awareness

Operations can be context-aware, allowing you to pass contextual information through the pipeline:

from fp_ops.operator import operation
from fp_ops.context import BaseContext

class UserContext(BaseContext):
    auth_token: str
    user_id: int

@operation(context=True, context_type=UserContext)
async def get_user_data(context: UserContext) -> dict:
    return {"id": context.user_id, "name": "Jane Doe"}

# Initialize context
context = UserContext(auth_token="abc123", user_id=42)

# Execute with context (run this inside an async function)
result = await get_user_data(context=context)
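
As a rough illustration of what context validation could look like, the sketch below rejects calls whose `context` argument has the wrong type. `ToyContext` and `requires_context` are hypothetical names; FP-Ops' own validation may work differently.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class ToyContext:
    auth_token: str
    user_id: int

def requires_context(context_type):
    """Hypothetical decorator: reject calls whose `context` has the wrong type."""
    def decorator(fn):
        async def wrapper(*args, context=None, **kwargs):
            if not isinstance(context, context_type):
                raise TypeError(
                    f"expected {context_type.__name__}, got {type(context).__name__}"
                )
            return await fn(*args, context=context, **kwargs)
        return wrapper
    return decorator

@requires_context(ToyContext)
async def get_user_data(context):
    return {"id": context.user_id, "name": "Jane Doe"}

user = asyncio.run(get_user_data(context=ToyContext(auth_token="abc123", user_id=42)))
print(user)  # {'id': 42, 'name': 'Jane Doe'}
```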

Advanced Usage

Error Handling

FP-Ops uses the Result type for robust error handling:

@operation
async def divide(a: int, b: int) -> float:
    if b == 0:
        raise ValueError("Division by zero")
    return a / b  # true division returns a float

# Handle errors with default values
safe_divide = divide.default_value(0)

# Or with custom error handling
safe_divide = divide.catch(lambda e: 0 if isinstance(e, ValueError) else -1)
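
The behaviour of a default value versus a custom handler can be sketched without the library. The helpers `with_default` and `with_handler` below are hypothetical and model errors as raised exceptions, which may differ from how FP-Ops wraps results.

```python
import asyncio

def with_default(fn, default):
    """Hypothetical helper: return `default` if `fn` raises."""
    async def wrapper(*args, **kwargs):
        try:
            return await fn(*args, **kwargs)
        except Exception:
            return default
    return wrapper

def with_handler(fn, handler):
    """Hypothetical helper: pass any raised exception to `handler`."""
    async def wrapper(*args, **kwargs):
        try:
            return await fn(*args, **kwargs)
        except Exception as exc:
            return handler(exc)
    return wrapper

async def divide(a, b):
    if b == 0:
        raise ValueError("Division by zero")
    return a / b

safe = with_default(divide, 0)
handled = with_handler(divide, lambda e: 0 if isinstance(e, ValueError) else -1)

ok = asyncio.run(safe(6, 3))             # 2.0
defaulted = asyncio.run(safe(1, 0))      # 0, the ValueError is swallowed
other = asyncio.run(handled(1, "x"))     # -1, a TypeError reaches the handler
```

A handler sees the exception itself, so it can return different values for different failure types, while a default value treats every failure the same way.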

Composition Functions

Besides operators, FP-Ops provides various composition functions:

from fp_ops.composition import sequence, pipe, parallel, fallback, map, transform

# Run operations in sequence and collect all results
results = await sequence(op1, op2, op3)

# Complex pipelines with conditional logic
pipeline = pipe(
    op1,
    lambda x: op2 if x > 10 else op3,
    op4
)

# Run operations in parallel
combined = await parallel(op1, op2, op3)

# Try operations until one succeeds
result = await fallback(op1, op2, op3)

# Apply an operation to each item in an iterable
# (e.g., transforming [1, 2, 3] to [2, 3, 4] if item_op increments by 1)
mapped_results = await map(item_op, max_concurrency=5)([item1, item2, item3]) 

# Transform the output of a single operation
transformed_result = await transform(op1, lambda x: x * 2)
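
A `max_concurrency` limit like the one passed to `map` above is commonly implemented with a semaphore. The following sketch shows that general technique in plain `asyncio`; `bounded_map` is a hypothetical helper and not the FP-Ops implementation.

```python
import asyncio

async def bounded_map(fn, items, max_concurrency):
    """Apply async `fn` to each item, running at most `max_concurrency` at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item):
        async with semaphore:
            return await fn(item)

    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(*(run_one(item) for item in items))

async def increment(x):
    await asyncio.sleep(0)  # yield control to simulate I/O
    return x + 1

mapped = asyncio.run(bounded_map(increment, [1, 2, 3], max_concurrency=2))
print(mapped)  # [2, 3, 4]
```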

Higher-Order Flow Operations

FP-Ops provides utilities for creating higher-order operations:

from fp_ops.flow import branch, attempt, retry, wait, loop_until

# Conditional branching
conditional = branch(
    lambda x: x > 0,
    positive_op,
    negative_op
)

# Retry an operation
resilient_op = retry(flaky_operation, max_retries=3, delay=0.5)

# Loop until a condition is met
counter = loop_until(
    lambda x: x >= 10,
    lambda x: x + 1,
    max_iterations=20
)
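
The control flow behind retrying and looping can be sketched in plain `asyncio`. The helpers `retry_call` and `loop_until_value` are hypothetical, and the sketch assumes `max_retries` counts retries after the first attempt, which may not match the library's definition.

```python
import asyncio

async def retry_call(fn, max_retries=3, delay=0.0):
    """Call async `fn`, retrying up to `max_retries` times after the first attempt."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            await asyncio.sleep(delay)

async def loop_until_value(condition, body, value, max_iterations=20):
    """Apply `body` repeatedly until `condition` holds or the iteration cap is hit."""
    for _ in range(max_iterations):
        if condition(value):
            break
        value = body(value)
    return value

calls = {"count": 0}

async def flaky():
    # Fails twice, then succeeds on the third call.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

outcome = asyncio.run(retry_call(flaky, max_retries=3))
counted = asyncio.run(loop_until_value(lambda x: x >= 10, lambda x: x + 1, 0))
print(outcome, counted)  # ok 10
```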

Data Operations

FP-Ops provides a rich set of data manipulation operations that make it easy to work with nested data structures, collections, and transformations:

from fp_ops.data import (
    get, pick, pluck, build, merge, update,
    filter_by, group_by, sort_by, unique_by,
    map_values, map_keys, rename, omit,
    count_by, sum_by,
    to_lower, to_upper, strip, split, join
)

# Path-based access
user_data = {
    "user": {
        "profile": {"name": "John", "email": "john@example.com"},
        "orders": [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}]
    }
}

# Get nested values
name = await get("user.profile.name").execute(user_data)
email = await get("user.profile.email", "no-email@example.com").execute(user_data)

# Pick specific fields
user_info = await pick("user.profile.name", "user.profile.email").execute(user_data)

# Extract values from a list
order_amounts = await pluck("amount").execute(user_data["user"]["orders"])

# Build new objects
user_summary = await build({
    "name": get("user.profile.name"),
    "email": get("user.profile.email"),
    "order_count": lambda d: len(d["user"]["orders"]),
    # Fetch the orders first, then sum their amounts
    "total_spent": get("user.orders") >> sum_by("amount")
}).execute(user_data)

# Collection operations
users = [
    {"name": "Alice", "role": "admin", "score": 90},
    {"name": "Bob", "role": "user", "score": 85},
    {"name": "Charlie", "role": "user", "score": 75}
]

# Filter and group
regular_users = await filter_by({"role": "user"}).execute(users)
by_role = await group_by("role").execute(users)

# Sort and get unique values
sorted_users = await sort_by("score", reverse=True).execute(users)
unique_names = await unique_by("name").execute(users)

# Transform dictionaries
data = {"user_name": "John Doe", "user_email": "john@example.com"}
cleaned = await (
    map_keys(lambda k: k.replace("user_", "")) >>
    map_values(strip) >>
    rename({"name": "full_name"})
).execute(data)

# Aggregate data
# (`orders` is assumed to be a list of dicts with "status" and "amount" keys)
status_counts = await count_by("status").execute(orders)
total_amount = await sum_by("amount").execute(orders)

These operations can be composed together to create powerful data transformation pipelines:

# Complex pipeline example (assumes each order also carries a "status" field)
pipeline = (
    get("user.orders") >>
    filter_by({"status": "completed"}) >>
    sort_by("amount", reverse=True) >>
    pluck("amount") >>
    sum_by(lambda x: x)
)

total_completed = await pipeline.execute(user_data)
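
For readers unfamiliar with dotted-path access, the idea behind `get("user.profile.name")` can be sketched in a few lines of plain Python. `get_path` is a hypothetical helper that handles nested dicts only; it is not the FP-Ops `get` operation.

```python
def get_path(data, path, default=None):
    """Follow a dotted path such as "user.profile.name" through nested dicts."""
    current = data
    for key in path.split("."):
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return default  # any missing segment yields the default
    return current

user_data = {
    "user": {
        "profile": {"name": "John", "email": "john@example.com"},
        "orders": [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}],
    }
}

name = get_path(user_data, "user.profile.name")
phone = get_path(user_data, "user.profile.phone", "n/a")
total = sum(order["amount"] for order in get_path(user_data, "user.orders", []))
print(name, phone, total)  # John n/a 300
```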

API Reference

Core Classes

  • Operation: The main class representing a composable asynchronous operation
  • BaseContext: Base class for all operation contexts
  • Placeholder: Used to represent where a previous result should be inserted

Decorators

  • @operation: Convert a function to an Operation
  • @operation(context=True, context_type=MyContext): Create a context-aware operation

Operators

  • op1 >> op2: Pipeline composition
  • op1 & op2: Parallel execution
  • op1 | op2: Alternative execution

Methods

  • operation.transform(func): Apply a transformation to the output
  • operation.filter(predicate): Filter the result using a predicate
  • operation.bind(binder): Bind to another operation
  • operation.catch(handler): Add error handling
  • operation.default_value(default): Provide a default value for errors
  • operation.retry(attempts, delay): Retry the operation
  • operation.tap(side_effect): Apply a side effect without changing the value

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
