Fluent reliability primitives for Python functions: safe attempts, retries, timeouts, fallbacks, and explicit Result values.

Fluentity

Fluentity helps Python code keep moving: safe attempts, retries, timeouts, fallbacks, explicit Result values, and async control chains.

It replaces fragile, hand-rolled try/except blocks, retry loops, result checks, fallbacks, logging hooks, and async timeouts with readable execution chains.

It is not another fluent collection wrapper. Libraries such as flupy and pyfluent-iterables are great when your main problem is transforming iterables with .map(), .filter(), .chunk(), .to_list(), and similar operations.

Fluentity solves a different problem: running unsafe operations safely.

from fluentity import attempt

orders = (
    attempt(load_user, user_id)
    .retry(times=3, delay=1)
    .timeout(10)
    .ensure(lambda user: user.is_active, "User is inactive")
    .then(load_orders)
    .tap(lambda orders: logger.info(f"Loaded {len(orders)} orders"))
    .tap_error(lambda error: logger.exception(f"Failed to load orders: {error}"))
    .recover_value([])
    .run()
    .unwrap()
)

For async code, you can skip the explicit .arun() and await the configured attempt directly:

result = await (
    attempt(fetch_user, user_id)
    .retry(times=3, delay=1)
    .timeout(10)
    .ensure(lambda user: user.is_active, "Inactive user")
    .then(fetch_orders)
    .recover_value([])
)

Why Fluentity?

Use Fluentity when an operation can fail, return invalid data, need retries or a fallback, or feed into further steps.

Good fit:

  • HTTP/API calls;
  • Telegram bots and background handlers;
  • parsers and scraping scripts;
  • local automation scripts;
  • ETL steps;
  • LLM calls;
  • code where you want explicit Ok(...) / Err(...) instead of uncontrolled exceptions.

Not the main goal:

  • replacing list comprehensions;
  • replacing itertools;
  • becoming a DataFrame library;
  • cloning fluent iterable libraries.

Installation

pip install fluentity

For the optional SQLite task manager:

pip install "fluentity[tasks]"

For development:

git clone https://github.com/megamen32/fluentity.git
cd fluentity
pip install -e ".[dev]"
pytest

Build and check the package locally before publishing:

python -m build
python -m twine check dist/*

Core API

from fluentity import (
    attempt,
    policy,
    try_catch,
    chain,
    choose,
    get_path,
    safe,
    retryable,
    timeoutable,
    Ok,
    Err,
    Result,
)

attempt(...) builds one reliable operation.

policy(...) builds reusable reliability settings.

try_catch(...) gives you a fluent explicit try/except/else/finally block.

chain() builds a readable async-first pipeline with steps, delays, waits, and hooks.

choose() builds a compact conditional branch tree.

get_path(...) safely reads nested dictionaries, lists, and objects.

Result makes success and failure explicit.

Safe function execution

from fluentity import attempt

result = attempt(int, "123").run()

assert result.unwrap() == 123

If the function raises, the program does not crash:

result = attempt(int, "abc").run()

assert result.is_err
assert result.unwrap_or(0) == 0
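
To make the idea concrete, here is a minimal stdlib sketch of capturing a failure as a value instead of letting it propagate. It is not Fluentity's implementation: the names `Outcome` and `safe_call` are hypothetical, and the real Result type may differ.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Outcome:
    """Hypothetical stand-in for a Result: holds a value or an error."""

    value: Any = None
    error: Optional[BaseException] = None

    @property
    def is_err(self) -> bool:
        return self.error is not None

    def unwrap_or(self, default: Any) -> Any:
        # Fall back to the default only when the call failed.
        return default if self.is_err else self.value


def safe_call(func: Callable[..., Any], *args: Any) -> Outcome:
    """Run func and capture any Exception instead of raising it."""
    try:
        return Outcome(value=func(*args))
    except Exception as exc:
        return Outcome(error=exc)


good = safe_call(int, "123")
bad = safe_call(int, "abc")
```

The caller decides what a failure means at the point of use, for example with `bad.unwrap_or(0)`.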

Retry

result = (
    attempt(fetch_json, "https://example.com/api/user")
    .retry(times=3, delay=1, backoff=2)
    .run()
)

Retries apply to the initial unsafe call. Later steps are only executed if the call succeeds.
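
As a rough mental model, the retry settings above can be sketched with a plain loop. This sketch assumes that `times` counts total attempts and that the wait is multiplied by `backoff` after each failure; Fluentity's exact semantics may differ. `retry_call` and its `sleep` parameter are hypothetical names used only for illustration.

```python
import time


def retry_call(func, *args, times=3, delay=1.0, backoff=1.0, sleep=time.sleep):
    """Call func up to `times` times, sleeping between failed attempts."""
    wait = delay
    for attempt_no in range(1, times + 1):
        try:
            return func(*args)
        except Exception:
            if attempt_no == times:
                raise  # out of attempts: surface the last error
            sleep(wait)
            wait *= backoff  # assumed: multiplicative backoff


calls = []
waits = []


def flaky():
    """Fail twice, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# Record the waits instead of actually sleeping.
value = retry_call(flaky, times=3, delay=1, backoff=2, sleep=waits.append)
```

Under these assumptions, `delay=1, backoff=2` waits 1 second after the first failure and 2 seconds after the second.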

Retry by result with retry_if

Sometimes the function succeeds technically, but returns a temporary bad value such as "pending", None, or an incomplete response.

result = (
    attempt(fetch_status)
    .retry_if(lambda status: status == "pending", times=5, delay=1)
    .run()
)

If every attempt still matches the predicate, the result becomes Err(RetryConditionError(...)).
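
The difference from exception-based retry is that the function returns normally and the predicate decides whether to try again. A minimal sketch, assuming `times` counts total attempts; `retry_while` and `RetryExhausted` are hypothetical names, not part of Fluentity:

```python
import time


class RetryExhausted(Exception):
    """Hypothetical error raised when every attempt was rejected."""


def retry_while(func, should_retry, times=5, delay=1.0, sleep=time.sleep):
    """Call func until should_retry(result) is false or attempts run out."""
    last = None
    for attempt_no in range(1, times + 1):
        last = func()
        if not should_retry(last):
            return last
        if attempt_no < times:
            sleep(delay)
    raise RetryExhausted(f"still rejected after {times} attempts: {last!r}")


statuses = iter(["pending", "pending", "done"])
final = retry_while(
    lambda: next(statuses),
    lambda status: status == "pending",
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
```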

Validation with ensure and ensure_not_none

result = (
    attempt(load_user, user_id)
    .ensure(lambda user: user.is_active, "User is inactive")
    .ensure(lambda user: user.email, "User has no email")
    .run()
)

ensure is not collection filtering. It is a contract check for a successful value.

For the common None case:

result = (
    attempt(find_user, user_id)
    .ensure_not_none("User not found")
    .then(send_message)
    .run()
)
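
The contrast with filtering can be shown in a few lines of plain Python. This is only a sketch: the helper names are hypothetical, and raising `ValueError` is an assumption made here for illustration, not necessarily the error type Fluentity produces.

```python
def ensure(value, predicate, message):
    """Return value unchanged if the predicate holds, else raise."""
    if not predicate(value):
        raise ValueError(message)  # assumed error type for this sketch
    return value


def ensure_not_none(value, message):
    """Contract check for the common 'lookup returned None' case."""
    return ensure(value, lambda v: v is not None, message)


users = [
    {"name": "Ada", "active": True},
    {"name": "Bob", "active": False},
]

# Filtering silently drops items that do not match.
active_only = [user for user in users if user["active"]]

# A contract check passes the single value through or fails loudly.
checked = ensure(users[0], lambda user: user["active"], "User is inactive")
```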

Step composition with then

result = (
    attempt(load_user, user_id)
    .then(load_orders)
    .then(build_report)
    .run()
)

If any step raises, the chain stops and returns Err(exception).
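
The short-circuit behavior can be pictured with a small loop. This is an illustrative sketch, not the library's code: `run_steps` is a hypothetical helper that returns a `(value, error)` tuple rather than a Result.

```python
def run_steps(initial, *steps):
    """Feed initial through steps in order; stop at the first exception."""
    value = initial
    for step in steps:
        try:
            value = step(value)
        except Exception as exc:
            return None, exc  # later steps are never called
    return value, None


calls = []


def parse(text):
    calls.append("parse")
    return int(text)


def double(number):
    calls.append("double")
    return number * 2


good_value, good_error = run_steps("21", parse, double)
calls_after_success = list(calls)

calls.clear()
bad_value, bad_error = run_steps("abc", parse, double)
calls_after_failure = list(calls)
```

In the failing run only `parse` is recorded, because `double` never receives a value.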

Side effects with tap, tap_error, and finally_

result = (
    attempt(load_user, user_id)
    .tap(lambda user: logger.info(f"Loaded user {user.id}"))
    .then(load_orders)
    .tap_error(lambda error: logger.warning(f"Operation failed: {error}"))
    .finally_(lambda: logger.info("operation finished"))
    .run()
)

tap observes successful values without changing them.

tap_error observes final errors.

finally_ always runs after success, error, or recovery.

Fallbacks with recover and recover_value

config = (
    attempt(load_config, "config.json")
    .recover(lambda error: {"debug": False})
    .run()
    .unwrap()
)

For a fixed fallback value, use recover_value:

config = (
    attempt(load_config, "config.json")
    .recover_value({"debug": False})
    .run()
    .unwrap()
)

You can limit fallback handling to selected exception types:

number = (
    attempt(int, user_input)
    .recover_value(0, exceptions=(ValueError,))
    .run()
    .unwrap()
)
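
Limiting the fallback to selected exception types works like a narrow `except` clause. A minimal sketch with a hypothetical helper `call_with_fallback`; unlike Fluentity, which would keep an unmatched error inside a Result, this sketch simply lets it propagate.

```python
def call_with_fallback(func, *args, fallback, exceptions=(Exception,)):
    """Return func(*args), or fallback if one of `exceptions` is raised."""
    try:
        return func(*args)
    except exceptions:
        return fallback


# ValueError is listed, so the fallback applies.
parsed = call_with_fallback(int, "abc", fallback=0, exceptions=(ValueError,))

# int(None) raises TypeError, which is not listed, so it is not swallowed.
try:
    call_with_fallback(int, None, fallback=0, exceptions=(ValueError,))
    unmatched = None
except TypeError as exc:
    unmatched = exc
```

Keeping the exception tuple narrow means unexpected bugs are not masked by the fallback value.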

Async support

Use .arun() when you prefer explicit async execution:

result = await (
    attempt(fetch_user, user_id)
    .retry(times=3, delay=1)
    .timeout(10)
    .then(fetch_orders)
    .arun()
)

Or await the configured attempt directly:

result = await (
    attempt(fetch_user, user_id)
    .retry(times=3, delay=1)
    .timeout(10)
    .then(fetch_orders)
)

You can also pass timeout to execution instead of configuring it fluently:

result = await attempt(fetch_user, user_id).then(fetch_orders).arun(timeout=10)

timeout(...) and arun(timeout=...) are designed for async execution. Synchronous run(timeout=...) is accepted for API symmetry, but returns an error because arbitrary synchronous Python code cannot be safely cancelled without threads or processes.
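
The reason timeouts are async-only can be seen with the standard library alone. `asyncio.wait_for` cancels the awaited coroutine at its next suspension point, and plain synchronous code has no such point. The sketch below uses only `asyncio`; `slow_fetch` and `fetch_with_timeout` are hypothetical names, and returning `None` on timeout is a choice made for this example.

```python
import asyncio


async def slow_fetch(seconds):
    """Stand-in for a network call that takes `seconds` to finish."""
    await asyncio.sleep(seconds)
    return "payload"


async def fetch_with_timeout(seconds, timeout):
    """Return the payload, or None if the call exceeds the timeout."""
    try:
        return await asyncio.wait_for(slow_fetch(seconds), timeout=timeout)
    except asyncio.TimeoutError:
        # The inner coroutine was cancelled at its await point.
        return None


fast = asyncio.run(fetch_with_timeout(0.01, timeout=1.0))
slow = asyncio.run(fetch_with_timeout(1.0, timeout=0.01))
```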

Reusable policies

from fluentity import policy

network = (
    policy()
    .retry(times=3, delay=1, backoff=2)
    .timeout(10)
    .tap_error(lambda error: logger.warning(f"Network failed: {error}"))
    .recover_value(None)
)

user = await network.arun(fetch_user, user_id)
orders = await network.arun(fetch_orders, user_id)

Policies are useful when many calls should share the same retry, timeout, logging, and fallback behavior.
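
Conceptually, a policy is reliability configuration kept separate from the call it protects. The following stdlib sketch shows that separation with a hypothetical `SimplePolicy` class that covers only retries and a fixed fallback; it is far simpler than the real policy(...) builder.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SimplePolicy:
    """Hypothetical reusable settings: attempt count and fallback value."""

    times: int = 3
    fallback: Any = None

    def run(self, func: Callable[..., Any], *args: Any) -> Any:
        # Apply the same settings to whatever callable is passed in.
        for _ in range(self.times):
            try:
                return func(*args)
            except Exception:
                continue
        return self.fallback


lenient = SimplePolicy(times=2, fallback=-1)

parsed_ok = lenient.run(int, "7")
parsed_bad = lenient.run(int, "seven")
```

Defining the settings once and reusing them keeps individual call sites short and consistent.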

Explicit try/except/else/finally

try_catch keeps the useful idea of a fluent try block, but returns Result instead of hiding errors.

from fluentity import try_catch

result = (
    try_catch(lambda: int(user_input))
    .except_(lambda error: 0, ValueError)
    .else_(lambda value: logger.info(f"parsed={value}"))
    .finally_(lambda: logger.info("parse attempt finished"))
    .run()
)

number = result.unwrap()

Use attempt(...) for retry/timeout/validation/recovery policies. Use try_catch(...) when you specifically want readable try/except/else/finally semantics.

Async pipelines with chain()

attempt(...) is for reliable operations. chain() is for readable async-first pipelines where the workflow itself matters: steps, delays, waits, and completion/error hooks.

from fluentity import chain

result = await (
    chain()
    .then(lambda value: value + 1)
    .delay(0.1)
    .then(load_user)
    .wait_until(lambda: cache_is_ready(), timeout=5)
    .then(build_response)
    .on_complete(lambda value: logger.info(f"done={value}"))
    .on_error(lambda error: logger.warning(f"failed={error}"))
    .run(41, timeout=10)
)
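
A step such as wait_until can be thought of as polling a condition under a deadline. The sketch below shows one way to do that with `asyncio` alone; the helper name, the `interval` parameter, and the polling approach are assumptions for illustration, not a description of how chain() is implemented.

```python
import asyncio


async def wait_until(condition, timeout, interval=0.01):
    """Poll condition() until it is true or the timeout expires."""

    async def poll():
        while not condition():
            await asyncio.sleep(interval)

    # Raises asyncio.TimeoutError if the condition never becomes true.
    await asyncio.wait_for(poll(), timeout=timeout)


async def main():
    state = {"ready": False}
    loop = asyncio.get_running_loop()
    # Flip the flag shortly after we start waiting.
    loop.call_later(0.05, state.update, {"ready": True})
    await wait_until(lambda: state["ready"], timeout=1.0)
    return state["ready"]


became_ready = asyncio.run(main())
```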

Compact conditional branches with choose()

from fluentity import choose

message = (
    choose()
    .when(lambda: status == "admin", lambda: "Full access")
    .when(lambda: status == "premium", lambda: "Premium access")
    .otherwise(lambda: "Basic access")
    .run()
    .unwrap()
)

Async predicates and actions are supported through await choose(...).arun() or simply await choose(...).
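
The branch tree behaves like an if/elif/else ladder with lazy actions: predicates are checked in order and only the selected action runs. A plain-Python sketch of that evaluation order, using a hypothetical `first_match` helper:

```python
def first_match(branches, otherwise):
    """Run the action of the first branch whose predicate is true."""
    for predicate, action in branches:
        if predicate():
            return action()
    return otherwise()


status = "premium"
evaluated = []


def label(text):
    """Build an action that records when it is actually executed."""

    def action():
        evaluated.append(text)
        return text

    return action


message = first_match(
    [
        (lambda: status == "admin", label("Full access")),
        (lambda: status == "premium", label("Premium access")),
    ],
    otherwise=label("Basic access"),
)
```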

Safe nested access with get_path

from fluentity import get_path

username = get_path(payload, "user.profile.username", default="anonymous")
first_title = get_path(payload, ["items", 0, "title"], default="untitled")

get_path works with dictionaries, lists/tuples, and ordinary object attributes.
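
For readers curious how such a lookup can work, here is a minimal sketch that walks dictionaries, sequences, and attributes. `read_path` is a hypothetical helper written for this example; the real get_path may handle more cases or resolve keys differently.

```python
from types import SimpleNamespace

_MISSING = object()


def read_path(root, path, default=None):
    """Follow a dotted string or a list of keys, returning default on a miss."""
    keys = path.split(".") if isinstance(path, str) else list(path)
    current = root
    for key in keys:
        if isinstance(current, dict):
            current = current.get(key, _MISSING)
        elif isinstance(current, (list, tuple)):
            try:
                current = current[int(key)]
            except (ValueError, IndexError, TypeError):
                current = _MISSING
        else:
            # Fall back to attribute access for ordinary objects.
            current = getattr(current, str(key), _MISSING)
        if current is _MISSING:
            return default
    return current


payload = {
    "user": {"profile": {"username": "ada"}},
    "items": [{"title": "First"}],
    "meta": SimpleNamespace(version=2),
}

username = read_path(payload, "user.profile.username", default="anonymous")
first_title = read_path(payload, ["items", 0, "title"], default="untitled")
missing_title = read_path(payload, ["items", 5, "title"], default="untitled")
version = read_path(payload, "meta.version")
```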

Classic safe getters

The older compact helpers are still available because they are useful in small scripts:

from fluentity import try_get, try_gete, try_geta, try_get_attrs, apply_to_list

username = try_get(lambda: payload["user"]["profile"]["username"], default="anonymous")
value, error = try_gete(lambda: int(user_input), default=0)

Decorators

from fluentity import safe, retryable, timeoutable

@safe
def parse_int(text: str) -> int:
    return int(text)

@retryable(times=3, delay=1)
def load_config() -> dict:
    return read_config_from_disk()

@timeoutable(10)
async def fetch_json(url: str) -> dict:
    return await client.get_json(url)

Decorated functions return Result.
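
A decorator of this kind is a thin wrapper around the original function. The sketch below shows the general pattern with `functools.wraps`; `with_retries` is a hypothetical decorator that returns a `(value, error)` tuple, whereas Fluentity's decorators return Result.

```python
import functools


def with_retries(times):
    """Decorator factory: retry the function and never raise."""

    def decorator(func):
        @functools.wraps(func)  # keep the original name and docstring
        def wrapper(*args, **kwargs):
            last_error = None
            for _ in range(times):
                try:
                    return func(*args, **kwargs), None
                except Exception as exc:
                    last_error = exc
            return None, last_error

        return wrapper

    return decorator


attempts = []


@with_retries(times=3)
def always_fails():
    attempts.append(1)
    raise RuntimeError("boom")


value, error = always_fails()
```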

Optional local task manager

The task manager is intentionally optional so that users who do not need it avoid installing database-related dependencies.

pip install "fluentity[tasks]"

from fluentity import TaskManager

manager = TaskManager.instance("demo")

@manager.task
def heavy_square(value: int) -> int:
    return value * value

This is useful for local bots, prototypes, and automation scripts. It is not a replacement for Celery, RQ, Airflow, or distributed workers.

Project structure

fluentity/
  __init__.py
  attempt.py
  chain.py
  choose.py
  path.py
  policy.py
  result.py
  safe_getter.py
  task_manager_db.py
  try_catch.py
examples/
  async_reliability_usage.py
  reliability_usage.py
  task_manager_usage.py
tests/
  test_core.py
pyproject.toml
README.md

Continuous integration

The repository includes a GitHub Actions workflow at .github/workflows/tests.yml. It runs the test suite on Python 3.10, 3.11, 3.12, and 3.13 for pushes and pull requests to main or master.

Design principles

  • Prefer practical reliability over fluent style for its own sake.
  • Keep the core dependency-free.
  • Make failures explicit with Result.
  • Keep collection processing out of scope.
  • Support async workflows without forcing verbose .arun() everywhere.
  • Keep heavier features, such as the SQLite task manager, optional.

When not to use this library

Use a larger framework if you need distributed workers, scheduled DAGs, complex observability, transactional queues, horizontal scaling, or durable cross-machine orchestration. Fluentity is intentionally small and local-first.

License

MIT
