Fluent reliability primitives for Python functions: safe attempts, retries, timeouts, fallbacks, and explicit Result values.
Project description
Fluentity
Fluent reliability primitives for Python functions.
Fluentity helps Python code keep moving: safe attempts, retries, timeouts, fallbacks, explicit Result values, and async control chains.
It turns fragile code with try/except blocks, retry loops, result checks, fallbacks, logging hooks, and async timeouts into readable execution chains.
It is not another fluent collection wrapper. Libraries such as flupy and pyfluent-iterables are great when your main problem is transforming iterables with .map(), .filter(), .chunk(), .to_list(), and similar operations.
Fluentity solves a different problem: running unsafe operations safely.
from fluentity import attempt
orders = (
attempt(load_user, user_id)
.retry(times=3, delay=1)
.timeout(10)
.ensure(lambda user: user.is_active, "User is inactive")
.then(load_orders)
.tap(lambda orders: logger.info(f"Loaded {len(orders)} orders"))
.tap_error(lambda error: logger.exception(f"Failed to load orders: {error}"))
.recover_value([])
.run()
.unwrap()
)
For async code, you can skip the explicit .arun() and await the configured attempt directly:
result = await (
attempt(fetch_user, user_id)
.retry(times=3, delay=1)
.timeout(10)
.ensure(lambda user: user.is_active, "Inactive user")
.then(fetch_orders)
.recover_value([])
)
Why Fluentity?
Use Fluentity when an operation can fail, return invalid data, need a retry, require a fallback, or be composed with more steps.
Good fit:
- HTTP/API calls;
- Telegram bots and background handlers;
- parsers and scraping scripts;
- local automation scripts;
- ETL steps;
- LLM calls;
- code where you want explicit
Ok(...)/Err(...)instead of uncontrolled exceptions.
Not the main goal:
- replacing list comprehensions;
- replacing
itertools; - becoming a DataFrame library;
- cloning fluent iterable libraries.
Installation
pip install fluentity
For the optional SQLite task manager:
pip install "fluentity[tasks]"
For development:
git clone https://github.com/megamen32/fluentity.git
cd fluentity
pip install -e .[dev]
pytest
Build and check the package locally before publishing:
python -m build
python -m twine check dist/*
Core API
from fluentity import (
attempt,
policy,
try_catch,
chain,
choose,
get_path,
safe,
retryable,
timeoutable,
Ok,
Err,
Result,
)
attempt(...) builds one reliable operation.
policy(...) builds reusable reliability settings.
try_catch(...) gives you a fluent explicit try/except/else/finally block.
chain() builds a readable async-first pipeline with steps, delays, waits, and hooks.
choose() builds a compact conditional branch tree.
get_path(...) safely reads nested dictionaries, lists, and objects.
Result makes success and failure explicit.
Safe function execution
from fluentity import attempt
result = attempt(int, "123").run()
assert result.unwrap() == 123
If the function raises, the program does not crash:
result = attempt(int, "abc").run()
assert result.is_err
assert result.unwrap_or(0) == 0
Retry
result = (
attempt(fetch_json, "https://example.com/api/user")
.retry(times=3, delay=1, backoff=2)
.run()
)
Retries apply to the initial unsafe call. Later steps are only executed if the call succeeds.
Retry by result with retry_if
Sometimes the function succeeds technically, but returns a temporary bad value such as "pending", None, or an incomplete response.
result = (
attempt(fetch_status)
.retry_if(lambda status: status == "pending", times=5, delay=1)
.run()
)
If every attempt still matches the predicate, the result becomes Err(RetryConditionError(...)).
Validation with ensure and ensure_not_none
result = (
attempt(load_user, user_id)
.ensure(lambda user: user.is_active, "User is inactive")
.ensure(lambda user: user.email, "User has no email")
.run()
)
ensure is not collection filtering. It is a contract check for a successful value.
For the common None case:
result = (
attempt(find_user, user_id)
.ensure_not_none("User not found")
.then(send_message)
.run()
)
Step composition with then
result = (
attempt(load_user, user_id)
.then(load_orders)
.then(build_report)
.run()
)
If any step raises, the chain stops and returns Err(exception).
Side effects with tap, tap_error, and finally_
result = (
attempt(load_user, user_id)
.tap(lambda user: logger.info(f"Loaded user {user.id}"))
.then(load_orders)
.tap_error(lambda error: logger.warning(f"Operation failed: {error}"))
.finally_(lambda: logger.info("operation finished"))
.run()
)
tap observes successful values without changing them.
tap_error observes final errors.
finally_ always runs after success, error, or recovery.
Fallbacks with recover and recover_value
config = (
attempt(load_config, "config.json")
.recover(lambda error: {"debug": False})
.run()
.unwrap()
)
For a fixed fallback value, use recover_value:
config = (
attempt(load_config, "config.json")
.recover_value({"debug": False})
.run()
.unwrap()
)
You can limit fallback handling to selected exception types:
number = (
attempt(int, user_input)
.recover_value(0, exceptions=(ValueError,))
.run()
.unwrap()
)
Async support
Use .arun() when you prefer explicit async execution:
result = await (
attempt(fetch_user, user_id)
.retry(times=3, delay=1)
.timeout(10)
.then(fetch_orders)
.arun()
)
Or await the configured attempt directly:
result = await (
attempt(fetch_user, user_id)
.retry(times=3, delay=1)
.timeout(10)
.then(fetch_orders)
)
You can also pass timeout to execution instead of configuring it fluently:
result = await attempt(fetch_user, user_id).then(fetch_orders).arun(timeout=10)
timeout(...) and arun(timeout=...) are designed for async execution. Synchronous run(timeout=...) is accepted for API symmetry, but returns an error because arbitrary synchronous Python code cannot be safely cancelled without threads or processes.
Reusable policies
from fluentity import policy
network = (
policy()
.retry(times=3, delay=1, backoff=2)
.timeout(10)
.tap_error(lambda error: logger.warning(f"Network failed: {error}"))
.recover_value(None)
)
user = await network.arun(fetch_user, user_id)
orders = await network.arun(fetch_orders, user_id)
Policies are useful when many calls should share the same retry, timeout, logging, and fallback behavior.
Explicit try/except/else/finally
try_catch keeps the useful idea of a fluent try block, but returns Result instead of hiding errors.
from fluentity import try_catch
result = (
try_catch(lambda: int(user_input))
.except_(lambda error: 0, ValueError)
.else_(lambda value: logger.info(f"parsed={value}"))
.finally_(lambda: logger.info("parse attempt finished"))
.run()
)
number = result.unwrap()
Use attempt(...) for retry/timeout/validation/recovery policies. Use try_catch(...) when you specifically want readable try/except/else/finally semantics.
Async pipelines with chain()
attempt(...) is for reliable operations. chain() is for readable async-first pipelines where the workflow itself matters: steps, delays, waits, and completion/error hooks.
from fluentity import chain
result = await (
chain()
.then(lambda value: value + 1)
.delay(0.1)
.then(load_user)
.wait_until(lambda: cache_is_ready(), timeout=5)
.then(build_response)
.on_complete(lambda value: logger.info(f"done={value}"))
.on_error(lambda error: logger.warning(f"failed={error}"))
.run(41, timeout=10)
)
Compact conditional branches with choose()
from fluentity import choose
message = (
choose()
.when(lambda: status == "admin", lambda: "Full access")
.when(lambda: status == "premium", lambda: "Premium access")
.otherwise(lambda: "Basic access")
.run()
.unwrap()
)
Async predicates and actions are supported through await choose(...).arun() or simply await choose(...).
Safe nested access with get_path
from fluentity import get_path
username = get_path(payload, "user.profile.username", default="anonymous")
first_title = get_path(payload, ["items", 0, "title"], default="untitled")
get_path works with dictionaries, lists/tuples, and ordinary object attributes.
Classic safe getters
The older compact helpers are still available because they are useful in small scripts:
from fluentity import try_get, try_gete, try_geta, try_get_attrs, apply_to_list
username = try_get(lambda: payload["user"]["profile"]["username"], default="anonymous")
value, error = try_gete(lambda: int(user_input), default=0)
Decorators
from fluentity import safe, retryable, timeoutable
@safe
def parse_int(text: str) -> int:
return int(text)
@retryable(times=3, delay=1)
def load_config() -> dict:
return read_config_from_disk()
@timeoutable(10)
async def fetch_json(url: str) -> dict:
return await client.get_json(url)
Decorated functions return Result.
Optional local task manager
The task manager is intentionally optional because most users should not install database-related dependencies unless they need them.
pip install "fluentity[tasks]"
from fluentity import TaskManager
manager = TaskManager.instance("demo")
@manager.task
def heavy_square(value: int) -> int:
return value * value
This is useful for local bots, prototypes, and automation scripts. It is not a replacement for Celery, RQ, Airflow, or distributed workers.
Project structure
fluentity/
__init__.py
attempt.py
chain.py
choose.py
path.py
policy.py
result.py
safe_getter.py
task_manager_db.py
try_catch.py
examples/
async_reliability_usage.py
reliability_usage.py
task_manager_usage.py
tests/
test_core.py
pyproject.toml
README.md
Continuous integration
The repository includes a GitHub Actions workflow at .github/workflows/tests.yml. It runs the test suite on Python 3.10, 3.11, 3.12, and 3.13 for pushes and pull requests to main or master.
Design principles
- Prefer practical reliability over fluent style for its own sake.
- Keep the core dependency-free.
- Make failures explicit with
Result. - Keep collection processing out of scope.
- Support async workflows without forcing verbose
.arun()everywhere. - Keep heavier features, such as the SQLite task manager, optional.
When not to use this library
Use a larger framework if you need distributed workers, scheduled DAGs, complex observability, transactional queues, horizontal scaling, or durable cross-machine orchestration. Fluentity is intentionally small and local-first.
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fluentity-0.1.0.tar.gz.
File metadata
- Download URL: fluentity-0.1.0.tar.gz
- Upload date:
- Size: 21.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
49513be0ba6cea8d06a790a1510d62eada6c778e6dcfdd0f95910e5ca8e2f436
|
|
| MD5 |
7015a78fb464c00a98202a3a245dff09
|
|
| BLAKE2b-256 |
3b82164584323bed828c8cc34618df4ef5ce014219c4aee716704019e1f35681
|
File details
Details for the file fluentity-0.1.0-py3-none-any.whl.
File metadata
- Download URL: fluentity-0.1.0-py3-none-any.whl
- Upload date:
- Size: 20.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b461c18180183f6aac74d8011428f5de073dbd3a1fc47545ff54199601f804c9
|
|
| MD5 |
2956d6aea5318f7e23015b97e1c31003
|
|
| BLAKE2b-256 |
4277fb1a673b669bc064e01e2928464896bec779ea0e5424b5283a1dab4a6771
|