Production-ready toolkit for exception handling, structured logging, database ops, HTTP responses, retry, circuit breaking, and dict comparison.
aniket_tools
A reusable Python library for:
- building consistent API responses (create_response, value_correction)
- translating any exception into a safe, structured JSON payload (ErrorHandler, ApiError)
- structured logging with levels, context, redaction, and formatting (logs, get_logger)
- timing helpers for function/block duration (log_timing, log_timing_block)
- high-throughput SQLAlchemy batch inserts with conflict handling and retries (smart_insert, smart_insert_sync)
- high-throughput SQLAlchemy batch updates with key-based WHERE clauses and retries (smart_update, smart_update_sync)
- per-field model-validated dict coercion for Pydantic and SQLAlchemy (coerce_model_data)
- deep value-by-value dict comparison with type-aware comparators (compare_dicts)
Install
pip install aniket_tools
Import
from aniket_tools import (
# HTTP responses
ApiError,
ErrorHandler,
ExceptionHandler,
PaginationRes,
create_response,
explain_error,
get_status_code,
handle_exception,
unified_exception_handler,
value_correction,
# Logging & timing
configure_logging,
reset_logging_config,
get_logger,
log_timing,
log_timing_block,
logs,
SUPPORTED_LOG_TYPES,
# Database
smart_insert,
smart_insert_sync,
smart_update,
smart_update_sync,
smart_upsert,
smart_upsert_sync,
InsertMetrics,
UpdateMetrics,
UpsertMetrics,
# Serializer
serialize_data,
register_serializer,
clear_serializers,
# Reliability
retry,
with_retry,
async_with_retry,
RetryConfig,
CircuitBreaker,
CircuitBreakerOpen,
# Model coercion
coerce_model_data,
ModelCoercionResult,
to_dict,
# Dict comparison
compare_dicts,
compare_values,
align_types_by_expected,
coerce_actual_to_expected_type,
DictDiffResult,
register_comparator,
clear_comparators,
)
Quick Start — FastAPI
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from aniket_tools import create_response, unified_exception_handler
app = FastAPI()
app.add_exception_handler(HTTPException, unified_exception_handler)
app.add_exception_handler(Exception, unified_exception_handler)
app.add_exception_handler(RequestValidationError, unified_exception_handler)
@app.get("/health")
async def health():
    return create_response(200, data={"status": "ok"})
Quick Start — Standalone (no FastAPI)
from aniket_tools import logs, get_logger
# Structured logging in any Python script or service
logs("Starting job", type="info")
logs("DB connected", type="success")
logs("Low disk space", type="warning", context={"free_gb": 1.2})
# JSON logger for production — outputs one JSON line per call
lg = get_logger("my_service", json=True)
logs("Request received", context={"user_id": 42, "path": "/api/users"}, logger=lg)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from aniket_tools import smart_insert_sync, smart_upsert_sync
engine = create_engine("postgresql+psycopg2://user:pass@localhost/mydb")
Session = sessionmaker(bind=engine)
records = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
# Batch insert — skip duplicates by id
metrics = smart_insert_sync(Session, User, records, conflict_columns=["id"])
print(metrics["inserted_count"], metrics["skipped_count"])
# Batch upsert — update name on conflict
metrics = smart_upsert_sync(Session, User, records, conflict_columns=["id"])
print(metrics["upserted_count"])
from aniket_tools import retry, CircuitBreaker, CircuitBreakerOpen
# Retry any flaky call
@retry(max_retries=3, on=(IOError,), base_delay=0.1)
def fetch_report(report_id: int):
    return external_api.get(f"/reports/{report_id}")
# Protect an upstream dependency with a circuit breaker
cb = CircuitBreaker(threshold=5, cooldown=30.0)
try:
    data = cb.call(lambda: fetch_report(report_id=42))
except CircuitBreakerOpen:
    data = {"error": "service_unavailable"}
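The breaker pattern used in the quick start can be pictured as a small state machine. The sketch below is not the library's CircuitBreaker: TinyBreaker, BreakerOpen, and the injectable clock argument are hypothetical names, used only to illustrate how a failure threshold and a cooldown typically interact.

```python
import time


class BreakerOpen(Exception):
    """Raised when a call is rejected because the breaker is open (hypothetical)."""


class TinyBreaker:
    """Illustrative breaker: opens after `threshold` consecutive failures."""

    def __init__(self, threshold=5, cooldown=30.0, clock=time.monotonic):
        self.threshold = threshold
        self.cooldown = cooldown
        self.clock = clock        # injectable so the behaviour can be tested
        self.failures = 0
        self.opened_at = None     # None means the breaker is closed

    def call(self, fn):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.cooldown:
                raise BreakerOpen("cooldown has not elapsed")
            # Cooldown elapsed: let one probe call through.
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = self.clock()  # open (or re-open) the breaker
            raise
        # A success closes the breaker and resets the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

While the breaker is open, callers fail fast instead of waiting on a dependency that is already struggling. After the cooldown, a single successful probe closes it again.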
create_response
Builds the standard success or error payload and returns a JSONResponse (or a plain dict with as_json_response=False).
Parameters:
| Param | Type | Default | Purpose |
|---|---|---|---|
| response_code | int | required | HTTP status code |
| data | Any | None | Response body data |
| schema | Pydantic model class | None | Validates data before returning |
| pagination | dict \| PaginationRes | None | Pagination metadata |
| error_message | str | None | Error description |
| error_code | str | None | Machine-readable error code |
| details | list[dict] | None | Field-level error details |
| meta | dict \| model | None | Request metadata (request_id, trace_id, …) |
| as_json_response | bool | True | Return JSONResponse vs plain dict |
Success responses
# Minimal
create_response(200, data={"status": "ok"})
# With meta
create_response(200, data={"name": "Aniket"}, meta={"request_id": "req-1", "trace_id": "t-2"})
# With pagination — dict form
create_response(200, data=rows, pagination={"page": 1, "rows": 25, "total_rows": 250})
# With pagination — typed model
create_response(200, data=rows, pagination=PaginationRes(page=1, rows=25, total_rows=250))
# Schema validation — Pydantic model class validates each item
create_response(200, data=raw_rows, schema=UserSchema)
# No content
create_response(204) # returns FastAPI Response(status_code=204) — no body
# Return plain dict instead of JSONResponse (useful in tests or non-FastAPI code)
payload = create_response(200, data={"ok": True}, as_json_response=False)
Error responses
# Simple 404
create_response(404, error_message="Report not found.", error_code="report_not_found")
# 422 with field-level details
create_response(
    422,
    error_message="One or more fields are invalid.",
    error_code="validation_error",
    details=[
        {"field": "email", "message": "field required"},
        {"field": "age", "message": "must be a positive integer"},
    ],
)
Success response shape
{
  "success": true,
  "response_code": 200,
  "meta": {"request_id": "req-1"},
  "data": {"name": "Aniket"},
  "pagination": {"page": 1, "rows": 25, "total_rows": 250}
}
Error response shape
{
  "success": false,
  "response_code": 422,
  "error_message": "One or more fields are invalid.",
  "meta": {},
  "error": {
    "code": "validation_error",
    "message": "One or more fields are invalid.",
    "details": [{"field": "email", "message": "field required"}]
  },
  "errors": [{"field": "email", "message": "field required"}]
}
error_message and error.message are aliases. errors and error.details are aliases. Both pairs exist for backward compatibility.
PaginationRes
Typed dataclass for pagination metadata.
from aniket_tools import PaginationRes
p = PaginationRes(page=1, rows=25, total_rows=250)
create_response(200, data=rows, pagination=p)
Validation rules inside create_response:
- page ≥ 1
- rows ≥ 0
- total_rows ≥ 0
- All three are required integers
- Extra keys on the dict form are preserved
If any rule fails, create_response returns a 422 validation error instead of a broken payload.
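The rules above can be sketched as a small standalone check. This is a hypothetical helper written for illustration, not the library's internal validator: the name validate_pagination, the shape of the returned problem list, and the choice to reject bool values are assumptions of this sketch.

```python
def validate_pagination(pagination):
    """Return a list of problems; an empty list means the dict passes.

    Hypothetical helper mirroring the documented rules, not library code.
    """
    minimums = {"page": 1, "rows": 0, "total_rows": 0}
    problems = []
    for key, minimum in minimums.items():
        value = pagination.get(key)
        # bool is a subclass of int; this sketch chooses to reject it.
        if not isinstance(value, int) or isinstance(value, bool):
            problems.append({"field": key, "message": "must be an integer"})
        elif value < minimum:
            problems.append({"field": key, "message": f"must be >= {minimum}"})
    return problems


# Extra keys such as "cursor" are ignored by the check, so they survive.
print(validate_pagination({"page": 1, "rows": 25, "total_rows": 250, "cursor": "abc"}))
# A page below 1 and a missing total_rows are both reported.
print(validate_pagination({"page": 0, "rows": 25}))
```

A non-empty problem list is the kind of input that would be turned into a 422 payload rather than passed through.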
value_correction
Recursively normalizes Python values into JSON-safe types. Called automatically by create_response on all data.
Parameters:
| Param | Type | Default | Purpose |
|---|---|---|---|
| data | Any | required | Value to normalize |
| mode | str | "response" | "response" enables all conversions; "storage" disables most |
| float_precision | int \| None | 2 in response mode | Decimal places to round floats to |
| datetime_format | str | "%Y-%m-%d %H:%M:%S" | Output format for datetime values |
| date_format | str | "%Y-%m-%d" | Output format for date values |
| strip_strings | bool | True | Strip whitespace from strings |
| convert_decimal | bool \| None | None | Override Decimal conversion (None follows mode) |
| convert_datetime | bool \| None | None | Override datetime conversion (None follows mode) |
| convert_date | bool \| None | None | Override date conversion (None follows mode) |
| convert_timedelta | bool \| None | None | Override timedelta conversion (None follows mode) |
| nan_to_none | bool \| None | None | Convert NaN/Inf to None (None follows mode) |
| convert_uuid | bool \| None | None | Override UUID conversion (None follows mode) |
| convert_enum | bool \| None | None | Override Enum conversion (None follows mode) |
| convert_bytes | bool \| None | None | Override bytes decoding (None follows mode) |
| convert_objects | bool \| None | None | Override dataclass/pydantic object conversion (None follows mode) |
What it converts by default:
| Input type | Output |
|---|---|
| str | stripped string |
| bytes | UTF-8 decoded string |
| Decimal | float (rounded to 2 dp) |
| datetime | "2024-01-15 09:30:00" |
| date | "2024-01-15" |
| timedelta | "0:01:30" |
| float NaN / Inf | None |
| float | rounded to 2 dp |
| UUID | "550e8400-..." |
| Enum | enum value (recursed) |
| dataclass | dict (recursed) |
| Pydantic model | dict (recursed) |
| numpy.integer | int |
| numpy.floating | float (recursed) |
| numpy.ndarray | list (recursed) |
| dict | keys and values recursed |
| list | recursed to list |
| tuple | recursed to tuple |
| set / frozenset | recursed to sorted list |
| None, bool, int | unchanged |
from decimal import Decimal
from datetime import datetime
from uuid import UUID
from aniket_tools import value_correction
value_correction({
"amount": Decimal("10.567"),
"created": datetime(2024, 1, 15, 9, 30),
"name": " Aniket ",
"rate": float("nan"),
"id": UUID("550e8400-e29b-41d4-a716-446655440000"),
})
# → {
# "amount": 10.57,
# "created": "2024-01-15 09:30:00",
# "name": "Aniket",
# "rate": None,
# "id": "550e8400-e29b-41d4-a716-446655440000",
# }
# Custom float precision
value_correction(3.14159, float_precision=4) # → 3.1416
# Storage mode — most conversions disabled
value_correction(Decimal("10.5"), mode="storage") # → Decimal("10.5") unchanged
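To make the recursion concrete, here is a deliberately simplified, stdlib-only sketch of a normalizer in the same spirit. It is not the library's value_correction: the function name normalize is hypothetical, it handles only a handful of the types listed above, and it ignores the mode and convert_* switches.

```python
import math
from datetime import date, datetime
from decimal import Decimal
from uuid import UUID


def normalize(value, float_precision=2):
    """Hypothetical, simplified stand-in for a recursive JSON-safe normalizer."""
    if isinstance(value, dict):
        return {
            normalize(k, float_precision): normalize(v, float_precision)
            for k, v in value.items()
        }
    if isinstance(value, (list, tuple)):
        # Rebuild the same container type from the normalized items.
        return type(value)(normalize(v, float_precision) for v in value)
    if isinstance(value, Decimal):
        value = float(value)          # fall through to the float handling
    if isinstance(value, float):
        if math.isnan(value) or math.isinf(value):
            return None
        return round(value, float_precision)
    if isinstance(value, datetime):   # datetime first: it subclasses date
        return value.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(value, date):
        return value.strftime("%Y-%m-%d")
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, str):
        return value.strip()
    return value                      # None, bool, int pass through unchanged
```

The order of the isinstance checks matters: datetime must be tested before date, and Decimal is converted before the NaN/Inf check so that Decimal("NaN") also ends up as None.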
logs
Unified logging function. Handles plain messages, structured context, redaction, SQL queries, ASCII tables, JSON pretty-print, and file output.
Parameters:
| Param | Type | Default | Purpose |
|---|---|---|---|
| msg | object | "" | Message, data structure, or SQL statement |
| type | str | "info" | Log level / mode (see table below) |
| file_name | str \| Path | None | Also write to this file (auto-creates dirs) |
| logger | Logger | None | Use a specific logger instead of the default |
| dialect | object | None | SQLAlchemy dialect for type="query" |
| context | dict | None | Key-value fields appended to the log line |
| exc_info | bool \| Exception | False | Attach exception traceback |
| redact | list[str] | None | Context keys to mask as *** |
| sample_rate | float | None | 0.0-1.0; emit about this fraction of calls |
| indent | int | 4 | JSON indent size for type="json" |
| log_format | str | '%(asctime)s \| %(name)s \| %(levelname)s \| %(message)s' | Text pattern used when logs() creates the logger |
| date_format | str \| None | None | Timestamp pattern passed to logging.Formatter |
Log types / levels
| type | Level | Color | Use for |
|---|---|---|---|
| "trace" | 5 | dim | Very fine-grained internal tracing |
| "debug" | 10 | cyan | Developer debug info |
| "info" | 20 | default | General status messages |
| "success" | 25 | green | Positive confirmations |
| "warning" | 30 | yellow | Non-critical concerns |
| "error" | 40 | red | Errors and failures |
| "critical" | 50 | bold red | System-level failures |
| "audit" | 45 | magenta | Compliance / security events |
| "exception" | 40 | red | Same as error + auto-attaches traceback |
| "query" | 20 | default | SQL statements (auto-compiles with literals) |
| "table" | 20 | default | list[dict] → ASCII table |
| "divider" | 20 | default | Section separator line |
| "json" | 20 | default | Pretty-prints any JSON-serializable object |
Examples
from aniket_tools import logs, get_logger
# Standard levels
logs("Server started")
logs("Connecting to DB", type="debug")
logs("Disk above 80%", type="warning")
logs("Save failed", type="error")
logs("Out of memory", type="critical")
# Custom levels
logs("Entering resolve_user", type="trace")
logs("Payment processed", type="success")
logs("User admin deleted record #42", type="audit")
# Exception with traceback
try:
    raise ValueError("bad input")
except Exception as e:
    logs("Caught error", type="error", exc_info=e)
# Or the shorthand
logs("DB failed", type="exception") # auto-attaches current exception
# Context fields
logs("User logged in", context={"user_id": 42, "ip": "10.0.0.1"})
# → ... | INFO | User logged in | user_id=42 ip=10.0.0.1
# Redaction
logs("API call", context={"api_key": "secret123", "endpoint": "/v1"}, redact=["api_key"])
# → ... | INFO | API call | api_key=*** endpoint=/v1
# SQL query (plain string)
logs("SELECT * FROM users WHERE id = 1", type="query")
# SQL query (SQLAlchemy statement with bound params)
from sqlalchemy import select
stmt = select(User).where(User.id == 7)
logs(stmt, type="query", dialect="postgresql")
logs(stmt, type="query", dialect=session) # session / engine also accepted
# ASCII table
logs([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], type="table")
# Divider
logs("Auth Section", type="divider") # ──── Auth Section ─────────────────────
# JSON pretty-print
logs({"user": "alice", "roles": ["admin", "editor"]}, type="json")
# Write to file (also still logs to console)
logs("Report generated", file_name="logs/app.log")
# Sampling — only ~10% of calls produce output
logs("high-frequency event", sample_rate=0.1)
# One-off custom text pattern
logs(
    "Report generated",
    file_name="logs/app.log",
    log_format="%(asctime)s | %(filename)s | %(funcName)s | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)
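The context and redaction examples above show key=value pairs appended to the message, with redacted keys masked. A minimal sketch of that rendering step follows. The helper name format_context is hypothetical and this is not the library's code; it only reproduces the output shape shown in the comments above.

```python
def format_context(context, redact=()):
    """Hypothetical helper: render context as key=value pairs, masking redacted keys."""
    masked = set(redact)
    parts = []
    for key, value in context.items():
        shown = "***" if key in masked else value
        parts.append(f"{key}={shown}")
    return " ".join(parts)


line = "API call | " + format_context(
    {"api_key": "secret123", "endpoint": "/v1"}, redact=["api_key"]
)
print(line)  # API call | api_key=*** endpoint=/v1
```

Masking at render time means the secret never reaches the formatter or any file handler, which is the property redaction is meant to provide.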
get_logger
Returns a configured logging.Logger.
Parameters:
| Param | Type | Default | Purpose |
|---|---|---|---|
| name | str | "aniket" | Logger name |
| file_name | str \| Path | None | Log file path |
| json | bool | False | Emit structured JSON lines instead of plain text |
| rotate | bool | False | Rotate at 10 MB, keep 5 backups |
| sample_rate | float | None | Emit about this fraction of all logs from this logger |
| log_format | str | '%(asctime)s \| %(name)s \| %(levelname)s \| %(message)s' | Custom text log pattern |
| date_format | str \| None | None | Custom timestamp format passed to logging.Formatter |
from aniket_tools import get_logger, logs
# Plain logger
lg = get_logger("my_app")
logs("started", logger=lg)
# File + rotation
lg = get_logger("my_app", file_name="logs/app.log", rotate=True)
logs("started", logger=lg)
# JSON output — ready for Datadog, Loki, ELK
lg = get_logger(json=True)
logs("User created", context={"user_id": 99, "env": "prod"}, logger=lg)
# → {"time": "...", "name": "aniket", "level": "INFO", "msg": "User created", "user_id": 99, "env": "prod"}
# JSON + redaction
logs("Login", context={"user": "admin", "password": "hunter2"}, redact=["password"], logger=lg)
# → {"time": "...", ..., "user": "admin", "password": "***"}
# 50% sampling on the logger level
lg = get_logger(sample_rate=0.5)
logs("background event", logger=lg)
# Custom text pattern
lg = get_logger(
    "my_app",
    file_name="logs/app.log",
    log_format="%(asctime)s | %(filename)s | %(funcName)s | %(lineno)d | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)
logs("started", logger=lg)
# → 2026-04-28 16:35:00 | service.py | run | 42 | INFO | started
log_format uses standard Python logging.Formatter fields. Useful built-ins include:
| Field | Meaning |
|---|---|
| %(asctime)s | Formatted timestamp |
| %(name)s | Logger name |
| %(levelname)s | Log level |
| %(filename)s | Source file name |
| %(pathname)s | Full source file path |
| %(module)s | Source module |
| %(funcName)s | Calling function name |
| %(lineno)d | Calling line number |
| %(process)d | Process ID |
| %(thread)d | Thread ID |
| %(message)s | The message passed to logs() |
Formatter fields such as time, file, function, line number, level, and logger name are configured once. File, function, and line number point to the user code that called logs(...). Runtime values such as request method, URL path, status code, username, tenant, or client IP still need to be included in the log message or context from middleware.
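Because these are ordinary logging.Formatter fields, their behaviour can be checked with the standard library alone. The snippet below does not use aniket_tools. The logger name formatter_demo is arbitrary, and the timestamp field is left out so the output is deterministic.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter("%(name)s | %(funcName)s | %(levelname)s | %(message)s")
)

demo = logging.getLogger("formatter_demo")  # arbitrary name for this demo
demo.setLevel(logging.INFO)
demo.addHandler(handler)
demo.propagate = False  # keep the demo output out of the root logger


def run():
    # funcName resolves to the function that made the logging call.
    demo.info("started")


run()
print(stream.getvalue().strip())  # formatter_demo | run | INFO | started
```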
configure_logging
Sets process-wide logging defaults applied to all subsequent get_logger() and logs() calls. Any setting not passed is left unchanged. Existing managed loggers are reconfigured immediately.
Parameters:
| Param | Type | Default | Purpose |
|---|---|---|---|
| file_name | str \| Path \| None | None | Default log file path for all loggers |
| json | bool \| None | None | Default JSON output mode |
| rotate | bool \| None | None | Default rotating-file mode (10 MB, 5 backups) |
| sample_rate | float \| None | None | Default sampling rate (0.0–1.0) |
| colored | bool \| None | None | Default colorized console output |
| debug_log | bool \| None | None | Default enable/disable switch for logs() |
| log_format | str \| None | None | Default text formatter string |
| date_format | str \| None | None | Default datetime formatter string |
from aniket_tools import configure_logging, reset_logging_config, logs, get_logger
# Disable color globally (useful in CI or when piping to files)
configure_logging(colored=False)
# Switch all loggers to JSON output (Datadog, Loki, ELK)
configure_logging(json=True)
# Write all logs to a rotating file by default
configure_logging(file_name="logs/app.log", rotate=True)
# Apply a uniform format across all loggers
configure_logging(
    log_format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)
# High-traffic service: JSON + 10% sampling
configure_logging(json=True, sample_rate=0.1)
# All logs() and get_logger() calls after this inherit the updated defaults
logs("server started")
reset_logging_config
Restores all module-wide logging defaults to their factory values. Intended for test teardown after configure_logging changes defaults — prevents settings from leaking between tests.
from aniket_tools import configure_logging, reset_logging_config
def teardown():
    reset_logging_config()
log_timing and log_timing_block
Timing is intentionally separated from logs(...).
- log_timing(...): decorator for function execution time
- log_timing_block(...): context manager for inline block/query timing
Parameters (both APIs):
| Param | Type | Default | Purpose |
|---|---|---|---|
| label | str \| None (log_timing) / str (log_timing_block) | function name / required | Human-readable timing label |
| file_name | str \| Path | None | Also write timing log to this file |
| colored | bool \| None | None | Override console coloring for the timing log |
| log_format | str \| None | None | Override message pattern for timing log |
| date_format | str \| None | None | Override timestamp format for timing log |
| debug_log | bool \| None | None | False suppresses timing output |
from aniket_tools import log_timing, log_timing_block
# Decorator: function duration
@log_timing("fetch users")
def fetch_users():
    ...
# Context manager: inline block/query duration
def list_users():
    with log_timing_block("simple query"):
        ...
Output format:
TIMING[fetch users] took 0.0421s
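For readers curious how a decorator and a context manager can share one timing path, here is a stdlib-only sketch that prints lines in the format shown above. The names timing_block and timing and the emit parameter are hypothetical. This is not the library's implementation.

```python
import time
from contextlib import contextmanager
from functools import wraps


@contextmanager
def timing_block(label, emit=print):
    """Hypothetical context manager that reports elapsed wall-clock time."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failures are timed too.
        elapsed = time.perf_counter() - start
        emit(f"TIMING[{label}] took {elapsed:.4f}s")


def timing(label=None, emit=print):
    """Hypothetical decorator built on the context manager above."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            # Fall back to the function name when no label is given.
            with timing_block(label or fn.__name__, emit=emit):
                return fn(*args, **kwargs)
        return wrapper
    return decorator


@timing("fetch users")
def fetch_users():
    return ["alice", "bob"]


fetch_users()
```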
ApiError
Raise a controlled API error from anywhere in your code.
Parameters:
| Param | Type | Default | Purpose |
|---|---|---|---|
| message | str | required | User-facing error text |
| status_code | int | 400 | HTTP status code |
| code | str | "api_error" | Machine-readable error code |
| details | list[dict] | None | Field-level details |
| log_message | str | None | Extra developer context (logged, not returned) |
from aniket_tools import ApiError
# Simple
raise ApiError("Report not found.", status_code=404, code="report_not_found")
# With field-level details
raise ApiError(
    "Validation failed.",
    status_code=422,
    code="validation_error",
    details=[{"field": "email", "message": "already registered"}],
)
# With a private log message (not sent to the client)
raise ApiError(
    "Something went wrong.",
    status_code=500,
    code="internal_error",
    log_message=f"DB query failed on table=billing sql={raw_sql}",
)
When unified_exception_handler catches an ApiError, the message, status_code, code, and details are returned exactly as provided. The log_message is written to the error log but never included in the response.
ErrorHandler
Core exception classifier. Understands 60+ exception types across all major Python libraries.
Parameters:
| Param | Type | Default | Purpose |
|---|---|---|---|
| logger_name | str | "aniket_tools.errors" | Name of the logger used for log_exception |
| logger | Logger | None | Preconfigured logger to use for exception logs |
| file_name | str \| Path | None | Error log file path when creating the logger |
| json | bool | False | Emit JSON error log lines |
| rotate | bool | False | Rotate error log file at 10 MB, keep 5 backups |
| sample_rate | float | None | Emit about this fraction of error logs |
| log_format | str | default logger pattern | Custom text pattern for exception logs |
| date_format | str \| None | None | Custom timestamp format for exception logs |
| use_default_message_for_long_errors | bool | True | Replace long raw messages with safe defaults |
from aniket_tools import ErrorHandler
handler = ErrorHandler()
# Classify any exception into a structured ErrorInfo
info = handler.describe(some_exception)
print(info.status_code) # e.g. 409
print(info.code) # e.g. "duplicate_resource"
print(info.message) # e.g. "A record with this email already exists."
print(info.retryable) # True / False / None
# Build the full JSON payload
payload = handler.build_payload(some_exception, meta={"trace_id": "t-1"})
# Log the raw exception and return JSONResponse
response = handler.handle_exception(some_exception, request=request)
# Log only (no response)
handler.log_exception(some_exception, request=request)
# Configured exception log format
handler = ErrorHandler(
    file_name="logs/errors.log",
    log_format="%(asctime)s | %(filename)s | %(funcName)s | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)
handler.log_exception(ValueError("bad input"))
# → 2026-04-28 16:35:00 | service.py | create_report | ERROR | bad input
log_exception() writes only str(exc) by default. For ApiError, it writes log_message when provided. It does not attach a traceback unless you call logs(..., type="exception") yourself.
Exception error response shape
{
  "success": false,
  "response_code": 409,
  "error_message": "A record with this email already exists.",
  "error_type": "IntegrityError",
  "meta": {"request_id": "req-1", "path": "/users"},
  "error": {
    "code": "duplicate_resource",
    "type": "IntegrityError",
    "message": "A record with this email already exists.",
    "retryable": false,
    "details": [
      {"type": "duplicate_resource", "field": "email", "value": "a@b.com", "constraint": "users_email_key"}
    ]
  },
  "errors": [
    {"type": "duplicate_resource", "field": "email", "value": "a@b.com", "constraint": "users_email_key"}
  ]
}
retryable field:
- true: client should retry (timeouts, deadlocks, transient unavailability, cache/queue conflicts)
- false: retrying will not help (duplicate key, bad input, auth failure, SSL error)
- absent: not determined for this error type
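On the consuming side, a client can use this flag to decide whether another attempt is worthwhile. The sketch below is a hypothetical client helper, not part of aniket_tools: send stands for any callable that returns the parsed response dict, and treating an absent flag as "do not retry" is a conservative choice made for this example.

```python
import time


def call_with_retryable_hint(send, max_attempts=3, base_delay=0.1, sleep=time.sleep):
    """Retry only when the error payload explicitly says retryable is true.

    `send` is a hypothetical callable returning a parsed response dict.
    """
    payload = {}
    for attempt in range(1, max_attempts + 1):
        payload = send()
        if payload.get("success"):
            return payload
        retryable = payload.get("error", {}).get("retryable")
        # false or absent: hand the error back without retrying.
        if retryable is not True or attempt == max_attempts:
            return payload
        sleep(base_delay * 2 ** (attempt - 1))  # simple exponential backoff
    return payload
```

A deadlock response (retryable true) would be retried with a growing delay, while a duplicate-key response (retryable false) is returned to the caller after the first attempt.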
Exception families covered
| Family | Libraries | Example codes |
|---|---|---|
| HTTP / Validation | FastAPI, Starlette, Pydantic | validation_error, http_404 |
| Database | SQLAlchemy, psycopg2, psycopg3, asyncpg, MySQL Connector, PyMySQL, MySQLdb, sqlite3, PyMongo | duplicate_resource, invalid_reference, database_timeout, database_unavailable, database_retryable_conflict |
| Upstream HTTP | requests, httpx, aiohttp, urllib3 | upstream_timeout, upstream_unavailable, upstream_ssl_error, upstream_bad_response |
| Auth | PyJWT | token_expired, invalid_token, invalid_token_claim |
| Cloud | botocore / boto3 | cloud_timeout, cloud_not_found, cloud_rate_limited, cloud_forbidden |
| Cache | Redis | cache_timeout, cache_conflict, cache_unavailable, cache_auth_failed |
| Queue / Tasks | kafka-python, confluent_kafka, Celery, Kombu | queue_timeout, queue_unavailable, task_timeout, invalid_queue_payload |
| Data tools | Pandas, NumPy, PyArrow, Polars, SciPy | invalid_data, data_backend_unavailable |
| Python builtins | stdlib | invalid_json, invalid_yaml, resource_not_found, bad_request, undefined_reference, internal_error |
Common error codes and status codes
| Code | Status | Retryable | Cause |
|---|---|---|---|
| duplicate_resource | 409 | false | Unique constraint violation |
| invalid_reference | 422 | false | Foreign key violation |
| missing_required_field | 422 | false | NOT NULL violation |
| constraint_violation | 422 | false | CHECK constraint |
| database_retryable_conflict | 409 | true | Deadlock / serialization failure |
| database_timeout | 504 | true | Statement / network timeout |
| database_unavailable | 503 | true | Cannot connect to DB server |
| database_programming_error | 500 | false | Undefined table / SQL syntax bug |
| validation_error | 422 | false | Request field validation |
| upstream_timeout | 504 | true | HTTP client timeout |
| upstream_unavailable | 503 | true | Cannot reach upstream service |
| upstream_ssl_error | 502 | false | TLS / certificate failure |
| token_expired | 401 | false | JWT expired |
| invalid_token | 401 | false | JWT invalid signature / decode |
| cloud_rate_limited | 429 | true | Cloud SDK throttle |
| cache_timeout | 504 | true | Redis timeout |
| cache_conflict | 409 | true | Redis WATCH / lock conflict |
| queue_timeout | 504 | true | Kafka / Kombu timeout |
| task_timeout | 504 | false | Celery time limit hit |
| invalid_json | 400 | false | Malformed JSON body |
| bad_request | 400 | false | ValueError, TypeError, etc. |
| undefined_reference | 500 | false | NameError / UnboundLocalError in server code |
| internal_error | 500 | false | Uncaught programming bug |
unified_exception_handler
FastAPI exception handler. Logs the exception text and returns the standard error JSON.
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from aniket_tools import unified_exception_handler
app = FastAPI()
app.add_exception_handler(HTTPException, unified_exception_handler)
app.add_exception_handler(Exception, unified_exception_handler)
app.add_exception_handler(RequestValidationError, unified_exception_handler)
Use a configured ErrorHandler instance when exception logs need a custom file, rotation, date format, or log pattern:
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from aniket_tools import ErrorHandler
app = FastAPI()
error_handler = ErrorHandler(
    file_name="logs/errors.log",
    rotate=True,
    log_format="%(asctime)s | %(filename)s | %(funcName)s | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)
app.add_exception_handler(HTTPException, error_handler.unified_exception_handler)
app.add_exception_handler(Exception, error_handler.unified_exception_handler)
app.add_exception_handler(RequestValidationError, error_handler.unified_exception_handler)
ExceptionHandler
Route-level helper. Converts any exception into a FastAPI HTTPException so FastAPI's own handler picks it up.
from aniket_tools import ExceptionHandler
try:
    result = db.query(...)
except Exception as exc:
    ExceptionHandler(exc)
handle_exception
Returns the standard error payload directly (as JSONResponse or dict).
from aniket_tools import handle_exception
response = handle_exception(ValueError("bad id"))
payload = handle_exception(ValueError("bad id"), as_json_response=False) # plain dict
# With request context (extracts request_id and path automatically)
response = handle_exception(exc, request=request, meta={"trace_id": "t-1"})
explain_error / get_status_code
Quick one-liners when you only need the message or the status code.
from aniket_tools import explain_error, get_status_code
msg = explain_error(ValueError("bad input")) # "The request data is invalid."
status = get_status_code(ValueError("bad input")) # 400
Standard Response Shapes
Success with pagination
{
  "success": true,
  "response_code": 200,
  "meta": {"request_id": "req-1"},
  "data": [{"id": 1, "name": "Alice"}],
  "pagination": {"page": 1, "rows": 25, "total_rows": 250}
}
Validation error (422)
{
  "success": false,
  "response_code": 422,
  "error_message": "One or more fields are invalid.",
  "error_type": "RequestValidationError",
  "meta": {"request_id": "req-1", "path": "/users"},
  "error": {
    "code": "validation_error",
    "type": "RequestValidationError",
    "message": "One or more fields are invalid.",
    "details": [
      {"type": "missing", "field": "email", "message": "field required", "source": "body"},
      {"type": "missing", "field": "page", "message": "field required", "source": "query"}
    ]
  },
  "errors": [
    {"type": "missing", "field": "email", "message": "field required", "source": "body"},
    {"type": "missing", "field": "page", "message": "field required", "source": "query"}
  ]
}
Retryable error (deadlock / timeout)
{
  "success": false,
  "response_code": 409,
  "error_message": "The database could not complete the operation because of a temporary concurrency conflict.",
  "error_type": "OperationalError",
  "meta": {},
  "error": {
    "code": "database_retryable_conflict",
    "type": "OperationalError",
    "message": "The database could not complete the operation because of a temporary concurrency conflict.",
    "retryable": true,
    "details": [{"type": "database_retryable_conflict", "message": "...", "retryable": true}]
  }
}
smart_insert and smart_insert_sync
High-throughput SQLAlchemy batch inserts with dialect-aware conflict handling, retries, and circuit breaking.
- smart_insert(...): async, uses asyncio + thread-pool workers
- smart_insert_sync(...): synchronous, uses ThreadPoolExecutor directly
Parameters (both APIs):
| Param | Type | Default | Purpose |
|---|---|---|---|
| session_factory | any | required | Synchronous or async sessionmaker bound to the target database |
| model | SA mapped class | required | Target table model class |
| records | list[dict] | required | Rows to insert; plain dicts only |
| batch_size | int | 1000 | Rows per batch; auto-capped for PostgreSQL bind-parameter limits |
| num_workers | int | 1 | Concurrent DB worker threads; keep <= pool_size - 2 |
| queue_maxsize | int | 0 | Backpressure queue limit; 0 sets it to num_workers × 3 automatically |
| max_retries | int | 2 | Retry attempts for transient errors before binary-splitting the batch |
| conflict_columns | list[str] \| None | None | Column names for native conflict detection (ON CONFLICT DO NOTHING); None disables conflict handling |
| batch_timeout | float | 30.0 | Seconds before a batch is considered hung |
| circuit_breaker_threshold | int | 10 | Consecutive failures required to open the circuit breaker |
| circuit_breaker_cooldown | float | 30.0 | Seconds the circuit breaker waits before probing for recovery |
| log_failed_records | bool | True | Log each permanently failed record |
| max_logged_failures | int | 100 | Maximum failure messages to emit; additional failures are summarised |
| debug_log | bool | True | False suppresses all insert log output |
Returns: InsertMetrics dict.
| Key | Type | Meaning |
|---|---|---|
| total | int | Total records submitted |
| inserted_count | int | Rows written |
| skipped_count | int | Rows skipped due to conflict resolution |
| failed_count | int | Rows permanently rejected |
| retries | int | Total retry attempts |
| batches | int | Batches committed successfully |
| total_insert_time_sec | float | Wall-clock seconds |
| inserted_data_size_bytes | int | Estimated byte size of inserted rows |
from aniket_tools import smart_insert_sync, smart_insert
# Synchronous
metrics = smart_insert_sync(
    session_factory,
    MyModel,
    records,
    batch_size=500,
    conflict_columns=["id"],
)
print(metrics["inserted_count"], metrics["skipped_count"])
# Async
metrics = await smart_insert(
    async_session_factory,
    MyModel,
    records,
    batch_size=500,
    conflict_columns=["id"],
)
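The max_retries description mentions binary-splitting a batch once retries are exhausted. One way to picture that strategy is the recursive sketch below. It is an illustration under stated assumptions, not the library's code: insert_with_split and insert_fn are hypothetical names, every exception is treated the same (a real implementation would likely retry only transient errors), and there is no backoff, timeout, or circuit breaker.

```python
def insert_with_split(batch, insert_fn, max_retries=2):
    """Return (inserted, failed) lists.

    `insert_fn` is a hypothetical callable that writes a whole batch or raises.
    """
    for _ in range(max_retries + 1):
        try:
            insert_fn(batch)
            return list(batch), []
        except Exception:
            continue  # try the same batch again until retries run out
    if len(batch) <= 1:
        # A single row that keeps failing is permanently rejected.
        return [], list(batch)
    # Split in half so one bad row cannot sink the rest of the batch.
    mid = len(batch) // 2
    ok_left, bad_left = insert_with_split(batch[:mid], insert_fn, max_retries)
    ok_right, bad_right = insert_with_split(batch[mid:], insert_fn, max_retries)
    return ok_left + ok_right, bad_left + bad_right
```

Splitting isolates a bad row in roughly log2(n) rounds, so the healthy rows in the same batch are still written and only the offending record ends up in the failed list.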
smart_update and smart_update_sync
High-throughput SQLAlchemy batch updates with key-based WHERE clauses, retries, and circuit breaking.
- smart_update(...): async
- smart_update_sync(...): synchronous
Parameters (both APIs):
| Param | Type | Default | Purpose |
|---|---|---|---|
| session_factory | any | required | Synchronous or async sessionmaker bound to the target database |
| model | SA mapped class | required | Target table model class |
| records | list[dict] | required | Rows to update; each must contain all conflict_columns plus at least one other field |
| conflict_columns | list[str] | required | Column names forming the WHERE clause |
| batch_size | int | 1000 | Rows per batch |
| num_workers | int | 1 | Concurrent DB worker threads |
| queue_maxsize | int | 0 | Backpressure queue limit; 0 sets it to num_workers × 3 |
| max_retries | int | 2 | Retry attempts for transient errors before binary-splitting |
| batch_timeout | float | 30.0 | Seconds before a batch is considered hung |
| circuit_breaker_threshold | int | 10 | Consecutive failures to open circuit breaker |
| circuit_breaker_cooldown | float | 30.0 | Seconds the circuit breaker waits before recovery probe |
| log_failed_records | bool | True | Log each permanently failed record |
| max_logged_failures | int | 100 | Maximum failure messages to emit |
| debug_log | bool | True | False suppresses all update log output |
Returns: UpdateMetrics dict.
| Key | Type | Meaning |
|---|---|---|
| total | int | Total records submitted |
| updated_count | int | Rows matched and updated |
| failed_count | int | Rows permanently rejected |
| retries | int | Total retry attempts |
| batches | int | Batches committed successfully |
| total_update_time_sec | float | Wall-clock seconds |
| updated_data_size_bytes | int | Estimated byte size sent to the database |
from aniket_tools import smart_update_sync, smart_update
# Synchronous
metrics = smart_update_sync(
session_factory,
MyModel,
records,
conflict_columns=["id"],
batch_size=500,
)
print(metrics["updated_count"], metrics["failed_count"])
# Async
metrics = await smart_update(
async_session_factory,
MyModel,
records,
conflict_columns=["id"],
)
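The max_retries parameter mentions binary-splitting once retries are exhausted. The library's internals are not shown here, so the following is only a sketch of the general technique: when a batch fails, split it in half and retry each half until the bad rows are isolated. The names `write_with_binary_split` and `fake_write` are hypothetical and no database is involved.

```python
from typing import Callable, Dict, List, Tuple

Record = Dict[str, object]


def write_with_binary_split(
    batch: List[Record],
    write_batch: Callable[[List[Record]], None],
) -> Tuple[List[Record], List[Record]]:
    """Return (written, failed), isolating bad rows by halving failing batches."""
    if not batch:
        return [], []
    try:
        write_batch(batch)
        return list(batch), []
    except Exception:
        if len(batch) == 1:
            # A single row that still fails is permanently rejected.
            return [], list(batch)
        mid = len(batch) // 2
        ok_left, bad_left = write_with_binary_split(batch[:mid], write_batch)
        ok_right, bad_right = write_with_binary_split(batch[mid:], write_batch)
        return ok_left + ok_right, bad_left + bad_right


def fake_write(rows: List[Record]) -> None:
    """Stand-in for a DB write: rejects any batch containing a None value."""
    if any(row["value"] is None for row in rows):
        raise ValueError("NOT NULL violation")


records = [{"id": i, "value": None if i == 3 else i} for i in range(8)]
written, failed = write_with_binary_split(records, fake_write)
print(len(written), failed)
```

With one bad row in a batch of n, this approach needs on the order of log2(n) extra write attempts instead of n single-row writes.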
smart_upsert and smart_upsert_sync
High-throughput SQLAlchemy batch upserts with dialect-aware conflict resolution, retries, and circuit breaking. On conflict, updates all non-conflict columns (or a custom set).
Supported dialects: PostgreSQL, MySQL, MariaDB, SQLite.
- smart_upsert(...): async, uses asyncio + async session factory
- smart_upsert_sync(...): synchronous, uses ThreadPoolExecutor
Parameters (both APIs):
| Param | Type | Default | Purpose |
|---|---|---|---|
| session_factory | any | required | Sync or async sessionmaker bound to the target database |
| model | SA mapped class | required | Target table model class |
| records | list[dict] | required | Rows to upsert; each must contain all conflict_columns |
| conflict_columns | list[str] | required | Column names identifying the unique constraint |
| update_columns | list[str] \| None | None | Columns to update on conflict; None = all non-conflict columns from first record |
| batch_size | int | 1000 | Rows per batch; auto-capped for PostgreSQL bind-parameter limits |
| num_workers | int | 1 | Concurrent DB worker threads; keep <= pool_size - 2 |
| queue_maxsize | int | 0 | Backpressure queue limit; 0 sets it to max(num_workers × 3, 10) |
| max_retries | int | 2 | Retry attempts for transient errors before failing the batch |
| batch_timeout | float | 30.0 | Seconds before a batch is considered hung |
| circuit_breaker_threshold | int | 10 | Consecutive failures required to open the circuit breaker |
| circuit_breaker_cooldown | float | 30.0 | Seconds the circuit breaker waits before probing for recovery |
| log_failed_records | bool | True | Log each permanently failed batch |
| max_logged_failures | int | 100 | Maximum failure messages to emit |
| debug_log | bool | True | False suppresses all upsert log output |
Returns: UpsertMetrics dict.
| Key | Type | Meaning |
|---|---|---|
| total | int | Total records submitted |
| upserted_count | int | Rows inserted or updated |
| failed_count | int | Rows permanently rejected |
| retries | int | Total retry attempts |
| batches | int | Batches committed successfully |
| total_upsert_time_sec | float | Wall-clock seconds |
| upserted_data_size_bytes | int | Estimated byte size of rows sent to the database |
from aniket_tools import smart_upsert_sync, smart_upsert
# Synchronous — insert or update on conflict
metrics = smart_upsert_sync(
session_factory,
Product,
records,
conflict_columns=["sku"],
)
print(metrics["upserted_count"], metrics["failed_count"])
# Async
metrics = await smart_upsert(
async_session_factory,
Product,
records,
conflict_columns=["sku"],
)
# Only update specific columns on conflict
metrics = smart_upsert_sync(
session_factory,
Product,
records,
conflict_columns=["sku"],
update_columns=["price", "stock"],
)
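The batch_size parameter is described as auto-capped for PostgreSQL bind-parameter limits. How the library computes its cap is not documented here; the sketch below only shows the arithmetic behind such a cap, given that a single PostgreSQL statement can carry at most 65535 bind parameters. The function name `cap_batch_size` is hypothetical.

```python
POSTGRES_MAX_BIND_PARAMS = 65535  # per-statement limit of the wire protocol


def cap_batch_size(
    requested: int,
    columns_per_row: int,
    max_params: int = POSTGRES_MAX_BIND_PARAMS,
) -> int:
    """Largest batch size <= requested whose rows fit in one statement."""
    if requested < 1 or columns_per_row < 1:
        raise ValueError("requested and columns_per_row must be >= 1")
    # Each row contributes one bind parameter per column.
    rows_that_fit = max_params // columns_per_row
    return max(1, min(requested, rows_that_fit))


print(cap_batch_size(1000, 12))   # small request is left alone
print(cap_batch_size(10000, 12))  # capped to 65535 // 12
print(cap_batch_size(5000, 40))   # capped to 65535 // 40
```

Wide tables hit the limit sooner, which is why a batch size that works for a narrow model can fail for a model with many columns.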
serialize_data, register_serializer, clear_serializers
Convert SQLAlchemy query results and custom objects into plain Python dicts and lists for JSON-safe output.
serialize_data
Detects the shape of any SQLAlchemy query result and converts it to a dict or list[dict]. Passes through plain dicts unchanged.
from sqlalchemy import select
from aniket_tools import serialize_data
# Single ORM instance
user = session.get(User, 1)
data = serialize_data(user) # {"id": 1, "name": "Alice", ...}
# List of ORM instances
users = session.execute(select(User)).scalars().all()
data = serialize_data(users) # [{"id": 1, ...}, {"id": 2, ...}]
# Raw Row from session.execute()
row = session.execute(select(User.id, User.name)).first()
data = serialize_data(row) # {"id": 1, "name": "Alice"}
# namedtuple
from collections import namedtuple
Point = namedtuple("Point", ["x", "y"])
serialize_data(Point(3, 4)) # {"x": 3, "y": 4}
# None / empty — safe sentinels
serialize_data(None) # {}
serialize_data([]) # []
Recognized shapes:
| Input | Output |
|---|---|
| None | {} |
| [] | [] |
| Single Row | dict |
| list[Row] | list[dict] |
| RowMapping | dict |
| list[RowMapping] | list[dict] |
| ORM instance | dict |
| list of ORM instances | list[dict] |
| namedtuple | dict |
| Single-element tuple | scalar value |
| Multi-element tuple | list |
| Unrecognized shape | returned unchanged |
register_serializer
Register a custom handler that runs before all built-in detection. The first matching predicate wins.
from aniket_tools import register_serializer, serialize_data
import dataclasses
@dataclasses.dataclass
class GeoPoint:
    lat: float
    lon: float
register_serializer(
    lambda d: isinstance(d, GeoPoint),
    lambda d: {"lat": d.lat, "lon": d.lon},
)
serialize_data(GeoPoint(37.7, -122.4)) # {"lat": 37.7, "lon": -122.4}
clear_serializers
Remove all registered custom serializers. Use in test teardown to prevent state leaking between tests.
from aniket_tools import clear_serializers
def teardown():
    clear_serializers()
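To make the "first matching predicate wins" rule concrete, here is a minimal stdlib-only sketch of a predicate-based registry. It is not the library's implementation; `register`, `clear`, and `serialize` are hypothetical stand-ins for the three functions described above.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple

Predicate = Callable[[Any], bool]
Handler = Callable[[Any], Any]

_registry: List[Tuple[Predicate, Handler]] = []


def register(predicate: Predicate, handler: Handler) -> None:
    """Append a (predicate, handler) pair; earlier pairs take priority."""
    _registry.append((predicate, handler))


def clear() -> None:
    """Drop every registered pair, e.g. in test teardown."""
    _registry.clear()


def serialize(value: Any) -> Any:
    """Run the first handler whose predicate matches, else pass through."""
    for predicate, handler in _registry:
        if predicate(value):
            return handler(value)
    return value


@dataclass
class GeoPoint:
    lat: float
    lon: float


register(lambda d: isinstance(d, GeoPoint), lambda d: {"lat": d.lat, "lon": d.lon})
# This second handler also matches GeoPoint but is never reached.
register(lambda d: isinstance(d, GeoPoint), lambda d: "shadowed")

first_match = serialize(GeoPoint(37.7, -122.4))
clear()
after_clear = serialize(GeoPoint(1.0, 2.0))
print(first_match, after_clear)
```

Because registration order decides priority, module-level state like this leaks between tests unless it is cleared, which is the reason a teardown hook is recommended.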
coerce_model_data
Coerces a raw dict to match a Pydantic model's or SQLAlchemy mapped class's declared types. Returns a per-field result table describing what changed, what failed, and what was already correct.
Parameters:
| Param | Type | Purpose |
|---|---|---|
| model | Pydantic BaseModel subclass or SA mapped class/instance | Target type contract; pass the class, not an instance |
| data | dict[str, Any] | Raw dict to coerce |
Returns: ModelCoercionResult
| Field | Type | Meaning |
|---|---|---|
| success | bool | True when all fields coerced without error |
| converted | dict[str, Any] | Coerced output dict; empty when success is False |
| table | list[dict] | One FieldRow dict per input key |
FieldRow status values:
| Status | Meaning |
|---|---|
| "coerced" | Type changed during coercion |
| "unchanged" | Already the correct type |
| "failed" | Coercion raised an error |
| "pending" | Could not be re-validated after a batch ValidationError |
| "unknown_field" | Key absent from the model |
| "skipped" | SA column exposes no python_type |
from pydantic import BaseModel
from aniket_tools import coerce_model_data
class User(BaseModel):
    id: int
    active: bool
result = coerce_model_data(User, {"id": "42", "active": "true"})
result.success # True
result.converted # {"id": 42, "active": True}
# Per-field breakdown
for row in result.table:
    print(row["field"], row["from_type"], "→", row["to_type"], row["status"])
# id str → int coerced
# active str → bool coerced
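The per-field result table can be pictured with a small stdlib-only sketch. It uses a plain `{field: type}` schema instead of a Pydantic or SQLAlchemy model, covers only four of the documented status values, and assumes that unknown fields do not count as failures. The helper names `coerce_fields` and `_to_bool` are hypothetical, and the library's actual coercion rules may differ.

```python
from typing import Any, Dict, List, Tuple

_TRUE = {"true", "1", "yes"}
_FALSE = {"false", "0", "no"}


def _to_bool(value: Any) -> bool:
    """Parse common boolean strings instead of relying on truthiness."""
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in _TRUE:
            return True
        if lowered in _FALSE:
            return False
        raise ValueError(f"not a boolean string: {value!r}")
    return bool(value)


def coerce_fields(
    schema: Dict[str, type], data: Dict[str, Any]
) -> Tuple[bool, Dict[str, Any], List[Dict[str, Any]]]:
    """Return (success, converted, table) with one table row per input key."""
    converted: Dict[str, Any] = {}
    table: List[Dict[str, Any]] = []
    for field, value in data.items():
        target = schema.get(field)
        row = {"field": field, "from_type": type(value).__name__,
               "to_type": None, "status": "unknown_field"}
        if target is not None:
            row["to_type"] = target.__name__
            if type(value) is target:
                converted[field] = value
                row["status"] = "unchanged"
            else:
                try:
                    converted[field] = _to_bool(value) if target is bool else target(value)
                    row["status"] = "coerced"
                except (TypeError, ValueError):
                    row["status"] = "failed"
        table.append(row)
    success = all(r["status"] != "failed" for r in table)
    # Mirror the documented contract: no partial output on failure.
    return success, (converted if success else {}), table


schema = {"id": int, "active": bool}
ok, values, rows = coerce_fields(schema, {"id": "42", "active": "true"})
bad_ok, bad_values, bad_rows = coerce_fields(schema, {"id": "abc"})
print(ok, values, [r["status"] for r in rows])
```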
to_dict
Converts a Pydantic model instance, SQLAlchemy ORM instance, (ModelClass, dict) tuple, or plain dict into a serialization-ready plain Python dict. Handles nested objects, Enum values, UUIDs, datetimes, Decimal, bytes, Pydantic secrets, and SQLAlchemy relationships.
Parameters (key options):
| Param | Type | Default | Purpose |
|---|---|---|---|
| data | Pydantic instance, ORM instance, (cls, dict) tuple, or plain dict | required | Object to convert |
| model_class | Pydantic BaseModel subclass \| None | None | Validate data dict through this class first (alternative to tuple form) |
| enum_mode | "value" \| "name" \| "keep" | "value" | Enum output format |
| uuid_mode | "str" \| "keep" | "str" | UUID output format |
| datetime_mode | "iso" \| "keep" | "iso" | datetime output format |
| decimal_mode | "keep" \| "str" \| "float" | "keep" | Decimal output format |
| handle_secrets | "redact" \| "expose" \| "raise" | "redact" | Pydantic SecretStr/SecretBytes handling |
| include_relationships | bool | False | Include SQLAlchemy relationship attributes |
| fallback_handler | Callable \| None | None | Custom handler for unrecognized types |
| max_depth | int | 25 | Maximum nesting depth |
| on_cycle | "raise" \| "skip" \| "placeholder" | "raise" | Circular reference handling |
| unknown_types | "raise" \| "passthrough" | "raise" | Behaviour for unrecognized types |
from pydantic import BaseModel
from aniket_tools import to_dict
class Order(BaseModel):
    id: int
    product: str
result = to_dict(Order(id=1, product="widget"))
# {"id": 1, "product": "widget"}
# Validate a raw dict through a model first
result = to_dict({"id": "42", "product": "bolt"}, model_class=Order)
# {"id": 42, "product": "bolt"}
compare_dicts
Deep value-by-value comparison of two dicts with type-aware comparators. Nested dicts recurse and emit flat dot-notation field paths. Lists of dicts use greedy unordered matching.
Parameters:
| Param | Type | Default | Purpose |
|---|---|---|---|
| main | dict | required | Reference dict (expected baseline) |
| comparing | dict | required | Dict to compare against main |
| numeric_tolerance | float \| None | 0.001 | Max absolute difference for numeric comparisons; None means exact |
| nan_equal | bool | False | Treat two NaN values as equal |
| string_case_sensitive | bool | True | False uses casefold comparison |
| whitespace_normalize | bool | False | Collapse internal whitespace runs before comparing strings |
| datetime_date_only | bool | False | Compare only the date portion of datetime values |
| datetime_precision | "second" \| "minute" \| "hour" \| "day" \| None | None | Truncate datetimes to this precision before comparing |
| enum_by | "value" \| "name" | "value" | Compare Enum members by value or name |
| same_type | bool | False | Require Enum members and objects to be the same class |
Returns: DictDiffResult (CompareResult)
| Field | Type | Meaning |
|---|---|---|
| match | bool | True when every field comparison matched |
| fields | list[FieldResult] | One entry per compared key; nested diffs use dot-notation paths |
| summary | CompareSummary | Aggregated field counts |
from aniket_tools import compare_dicts
result = compare_dicts(
{"id": 1, "price": 10.001, "name": "Alice"},
{"id": 1, "price": 10.002, "name": "alice"},
numeric_tolerance=0.01,
string_case_sensitive=False,
)
result.match # True — both differences fall within tolerance/case rules
# Nested dict — diff flattened to dot-notation
result = compare_dicts(
{"user": {"id": 1, "city": "NYC"}},
{"user": {"id": 1, "city": "LA"}},
)
result.match # False
result.fields[0].field # "user.city"
# Summary counts
result.summary.mismatched_fields # 1
result.summary.matched_fields # 1
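The dot-notation flattening described above can be sketched without the library. The function below recurses into nested dicts, applies an absolute tolerance to numbers, and falls back to plain equality for everything else. The name `diff_dicts`, the sorted key order, and the treatment of missing keys as mismatches are assumptions made for this sketch, which also assumes string keys.

```python
from typing import Any, Dict, List, Tuple


def _is_number(value: Any) -> bool:
    """True for int/float but not bool, which is a subclass of int."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def diff_dicts(
    main: Dict[str, Any],
    comparing: Dict[str, Any],
    tolerance: float = 0.001,
    prefix: str = "",
) -> List[Tuple[str, bool]]:
    """Return (dot.path, matched) pairs for every leaf key in either dict."""
    results: List[Tuple[str, bool]] = []
    for key in sorted(set(main) | set(comparing)):
        path = f"{prefix}{key}"
        left, right = main.get(key), comparing.get(key)
        if isinstance(left, dict) and isinstance(right, dict):
            # Recurse and extend the path with a trailing dot.
            results.extend(diff_dicts(left, right, tolerance, prefix=f"{path}."))
        elif _is_number(left) and _is_number(right):
            results.append((path, abs(left - right) <= tolerance))
        else:
            results.append((path, left == right))
    return results


pairs = diff_dicts(
    {"user": {"id": 1, "city": "NYC"}, "price": 10.0},
    {"user": {"id": 1, "city": "LA"}, "price": 10.0004},
)
mismatched = [path for path, matched in pairs if not matched]
print(pairs)
print(mismatched)
```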
compare_values
Single-value comparison using the same type-aware rules as compare_dicts. Accepts the same tolerance and mode parameters.
from aniket_tools import compare_values
compare_values(1, 1.0005) # True (within default tolerance 0.001)
compare_values("Alice", "alice", string_case_sensitive=False) # True
compare_values(float("nan"), float("nan"), nan_equal=True) # True
register_comparator and clear_comparators
Register a custom comparison function for a specific Python type. The registered function runs before all built-in dispatch when both values are instances of the registered type.
from aniket_tools import register_comparator, clear_comparators, compare_dicts
from decimal import Decimal
# Custom comparator: Decimal exact match only
register_comparator(Decimal, lambda a, b: a == b)
result = compare_dicts({"price": Decimal("10.0")}, {"price": Decimal("10.0005")})
result.match # False: the custom comparator bypasses the default numeric_tolerance of 0.001
# Test teardown: prevent state leaking between tests
def teardown():
    clear_comparators()
align_types_by_expected
Recursively aligns the types of actual to match expected, logging every conversion applied. Useful before calling compare_dicts when input data may have string-encoded numerics or mixed date/datetime values.
from decimal import Decimal
from aniket_tools import align_types_by_expected
expected = {"id": 1, "price": Decimal("10.5")}
actual = {"id": "1", "price": "10.5"}
_, aligned, conversions = align_types_by_expected(expected, actual)
aligned # {"id": 1, "price": Decimal("10.5")}
conversions # [TypeConversionResult(path="id", from_type="str", to_type="int", ...), ...]
coerce_actual_to_expected_type
Single-value coercion — converts actual to the same type as expected and returns (converted, changed_flag, note).
from aniket_tools import coerce_actual_to_expected_type
converted, changed, note = coerce_actual_to_expected_type(1, "42")
# (42, True, "Coerced actual value to int using expected type.")
retry, with_retry, async_with_retry, RetryConfig
Exponential-backoff retry with full jitter for any sync or async callable. Supports decorator, callable-wrapper, and pre-built config forms.
RetryConfig
Reusable retry policy dataclass.
| Field | Type | Default | Purpose |
|---|---|---|---|
| max_retries | int | 3 | Max retry attempts after the first failure; 0 = call once, raise on failure |
| on | tuple[type[Exception], ...] | (Exception,) | Exception types that trigger a retry |
| base_delay | float | 0.1 | Starting sleep duration in seconds |
| backoff_factor | float | 2.0 | Multiplier applied to delay on each attempt |
| max_delay | float | 30.0 | Upper bound on computed delay before jitter |
| jitter | bool | True | Full jitter: random.uniform(0, computed_cap); prevents thundering herd |
| predicate | Callable \| None | None | Extra callable returning True when retryable; both type check and predicate must pass |
| on_retry | Callable \| None | None | Called before each sleep with (attempt, exc, delay); may raise to abort |
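The interaction of base_delay, backoff_factor, max_delay, and jitter can be sketched as a single delay function. This is an illustration of exponential backoff with full jitter under the defaults listed above, not the library's code; the name `backoff_delay` and the zero-based `attempt` index are assumptions.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 0.1,
    backoff_factor: float = 2.0,
    max_delay: float = 30.0,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> float:
    """Sleep duration before retry number `attempt` (0 = first retry)."""
    # Exponential growth, capped before jitter is applied.
    capped = min(max_delay, base_delay * (backoff_factor ** attempt))
    if not jitter:
        return capped
    # Full jitter: pick uniformly between 0 and the capped delay.
    source = rng if rng is not None else random
    return source.uniform(0.0, capped)


plain = [backoff_delay(n, jitter=False) for n in range(4)]
capped_late = backoff_delay(10, jitter=False)
jittered = [backoff_delay(n, rng=random.Random(0)) for n in range(4)]
print(plain, capped_late)
print(jittered)
```

Without jitter, clients that failed at the same moment would all retry at the same moment; drawing the sleep from the whole interval spreads them out.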
retry — decorator
Works on both sync and async functions. Detected automatically.
from aniket_tools import retry
# Sync
@retry(max_retries=3, on=(IOError,), base_delay=0.1)
def read_file(path: str) -> str:
    with open(path) as fh:  # close the handle even if read() raises
        return fh.read()
# Async (same decorator, auto-detected)
@retry(max_retries=3, on=(ConnectionError,))
async def fetch(url: str) -> dict:
    return await http_client.get(url)
# No-parens form
@retry
def connect():
    ...
with_retry — sync callable wrapper
from aniket_tools import with_retry
result = with_retry(
lambda: redis_client.get("key"),
max_retries=3,
on=(ConnectionError,),
base_delay=0.05,
)
async_with_retry — async callable wrapper
from aniket_tools import async_with_retry
result = await async_with_retry(
lambda: producer.send(msg),
max_retries=2,
on=(KafkaError,),
)
Pre-built RetryConfig
from sqlalchemy.exc import OperationalError  # assuming SQLAlchemy's OperationalError is meant
from aniket_tools import RetryConfig, logs, with_retry
# Reuse the same policy across multiple call sites
db_retry = RetryConfig(
    max_retries=3,
    on=(OperationalError,),
    predicate=lambda exc: "deadlock" in str(exc).lower(),
    base_delay=0.1,
    backoff_factor=2.0,
)
result = with_retry(lambda: session.execute(stmt), config=db_retry)
# on_retry hook: log before sleeping
def log_retry(attempt: int, exc: Exception, delay: float) -> None:
    logs(f"Retry {attempt + 1}", context={"error": str(exc), "delay": delay}, type="warning")
cfg = RetryConfig(max_retries=3, on=(IOError,), on_retry=log_retry)
with_retry(lambda: risky_call(), config=cfg)
CircuitBreaker and CircuitBreakerOpen
Three-state circuit breaker that stops cascading failures by fast-failing calls when an upstream dependency is consistently failing.
States: closed (normal) → open (fast-failing) → half-open (probing recovery) → closed
Constructor parameters:
| Param | Type | Default | Purpose |
|---|---|---|---|
| threshold | int | 10 | Consecutive failures required to open the circuit |
| cooldown | float | 30.0 | Seconds to wait before allowing a probe; 0.0 = probe immediately |
| on_state_change | Callable[[str, str], None] \| None | None | Callback fired with (old_state, new_state) on every transition |
import requests
from aniket_tools import CircuitBreaker, CircuitBreakerOpen, logs
cb = CircuitBreaker(threshold=5, cooldown=10.0)
# Sync call
try:
    result = cb.call(lambda: requests.get("https://api.example.com/data"))
except CircuitBreakerOpen:
    result = fallback_response()  # circuit is open, so skip the call entirely
# Async call
try:
    result = await cb.async_call(lambda: fetch_from_upstream())
except CircuitBreakerOpen:
    result = cached_value
# State inspection
print(cb.state)  # "closed" | "open" | "half_open"
print(cb.consecutive_failures)  # int, resets to 0 on any success
# State-change callback (useful for alerting / metrics)
def on_change(old: str, new: str) -> None:
    logs(f"Circuit: {old} → {new}", type="warning")
cb = CircuitBreaker(threshold=5, cooldown=30.0, on_state_change=on_change)
# Reset: discards all failure history (intended for test isolation)
cb.reset()
CircuitBreakerOpen is raised when a call is rejected because the circuit is open and the cooldown has not elapsed. Catch it to return a fallback or cached response.
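The closed, open, and half-open transitions can be illustrated with a stripped-down state machine. This sketch is not the library's CircuitBreaker: `MiniBreaker` and `BreakerOpen` are hypothetical names, there is no locking or callback support, and the injectable `clock` exists only so the example can advance time without sleeping.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class BreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""


class MiniBreaker:
    def __init__(self, threshold: int = 10, cooldown: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.cooldown = cooldown
        self._clock = clock
        self.state = "closed"
        self.consecutive_failures = 0
        self._opened_at = 0.0

    def call(self, fn: Callable[[], T]) -> T:
        if self.state == "open":
            if self._clock() - self._opened_at < self.cooldown:
                raise BreakerOpen("circuit is open")
            self.state = "half_open"  # cooldown elapsed: allow one probe
        try:
            result = fn()
        except Exception:
            self.consecutive_failures += 1
            # A failed probe, or too many failures in a row, opens the circuit.
            if self.state == "half_open" or self.consecutive_failures >= self.threshold:
                self.state = "open"
                self._opened_at = self._clock()
            raise
        self.state = "closed"
        self.consecutive_failures = 0
        return result


def failing() -> str:
    raise ConnectionError("upstream down")


now = [0.0]  # fake clock so the example needs no real waiting
breaker = MiniBreaker(threshold=2, cooldown=10.0, clock=lambda: now[0])
for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
state_after_failures = breaker.state
try:
    breaker.call(lambda: "ok")
    rejected = False
except BreakerOpen:
    rejected = True
now[0] = 11.0  # move past the cooldown
recovered = breaker.call(lambda: "ok")
print(state_after_failures, rejected, recovered, breaker.state)
```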
Code Structure
src/aniket_tools/
  __init__.py ← public exports
  http/
    responses.py ← create_response, value_correction, PaginationRes
  observability/
    logging.py ← logs, get_logger, configure_logging, reset_logging_config, SUPPORTED_LOG_TYPES
    timing.py ← log_timing, log_timing_block
  exceptions/
    core.py ← ApiError, ErrorHandler, unified_exception_handler
    handlers/
      base.py ← ErrorInfo dataclass, message helpers
      api_http_validation.py ← FastAPI/Starlette/Pydantic
      database_family.py ← all SQL and MongoDB drivers
      http_auth_cloud_family.py ← requests, httpx, aiohttp, urllib3, PyJWT, botocore
      cache_queue_family.py ← Redis, Kafka, Celery, Kombu
      data_tool_family.py ← Pandas, NumPy, PyArrow, Polars, SciPy
      python_family.py ← stdlib builtins, asyncio, ssl, socket
  database/
    insertion.py ← smart_insert, smart_insert_sync, InsertMetrics
    updatetion.py ← smart_update, smart_update_sync, UpdateMetrics
    upsert.py ← smart_upsert, smart_upsert_sync, UpsertMetrics
    serializer.py ← serialize_data, register_serializer, clear_serializers
  reliability/
    retry.py ← retry, with_retry, async_with_retry, RetryConfig
    circuit_breaker.py ← CircuitBreaker, CircuitBreakerOpen
  utils/
    model_coercer.py ← coerce_model_data, ModelCoercionResult, to_dict
    dict_compare.py ← compare_dicts, compare_values, align_types_by_expected, coerce_actual_to_expected_type, DictDiffResult, register_comparator, clear_comparators
    compat.py ← optional import helpers
Safe Editing Rules
- Add specific exception checks before generic ones (e.g. redis.TimeoutError before Python TimeoutError)
- Keep message logic in _database_message(...), status logic in _database_status(...)
- Keep JSON shape logic in build_payload(...) or create_response(...)
- If you add a new public function, also export it from __init__.py
- create_response(...) is for normal route returns; unified_exception_handler(...) is for exceptions. They are separate code paths
HTML Output References
| File | Shows |
|---|---|
| result/logging_results.html | All logs() types and options with rendered output |
| result/responses_results.html | All create_response and value_correction use cases |
| result/exceptions_results.html | All ErrorHandler / ApiError exception families and payloads |
Feature Overview
- HTTP response standardization (create_response, PaginationRes, value_correction)
- Unified exception translation (ErrorHandler, ApiError, unified_exception_handler)
- Observability utilities (logs, get_logger, configure_logging, timing helpers)
- Database batch write helpers (smart_insert, smart_update, smart_upsert)
- Data normalization and comparison (coerce_model_data, compare_dicts)
- Reliability primitives (retry, with_retry, CircuitBreaker)
Function Catalog
| Function / Class | Description | Why Use It | Input | Output | Example |
|---|---|---|---|---|---|
| create_response | Standard success/error API payload builder | Keep API responses consistent | status code + payload parts | JSONResponse or dict | create_response(200, data={"ok": True}) |
| value_correction | Recursive JSON-safe value normalization | Clean/serialize mixed Python values | any Python object | normalized value | value_correction({"amt": Decimal("1.23")}) |
| PaginationRes | Typed pagination metadata | Avoid malformed pagination blocks | page, rows, total_rows | dataclass instance | PaginationRes(page=1, rows=10, total_rows=40) |
| logs | Unified logging helper | One API for text/json/query/table logging | message + log options | log side effects | logs("started", type="info") |
| get_logger | Configured logger factory | Reuse shared logger settings | logger configuration args | logging.Logger | get_logger("service", json=True) |
| configure_logging | Root logging setup helper | Configure process-wide logging once | global logging options | logging.Logger | configure_logging(level="INFO") |
| reset_logging_config | Restore factory logging defaults | Test teardown after configure_logging | none | none | reset_logging_config() |
| log_timing | Execution-time decorator | Track function latency | function + label/options | wrapped callable | @log_timing("fetch users") |
| log_timing_block | Inline timing context manager | Track specific code blocks | label + options | context manager | with log_timing_block("query"): |
| ApiError | Controlled API error type | Raise typed user-safe errors | message, status, code, details | exception | raise ApiError("Not found", 404) |
| ErrorHandler | Exception classifier and payload builder | Convert unknown exceptions safely | exception (+ request/meta) | structured payload/response | ErrorHandler().handle_exception(exc) |
| unified_exception_handler | FastAPI-compatible exception handler | Plug into app.add_exception_handler | (request, exc) | JSONResponse | app.add_exception_handler(Exception, unified_exception_handler) |
| ExceptionHandler | Backward-compatible exception wrapper | Keep legacy call sites working | exception | raises/handles | ExceptionHandler(exc) |
| handle_exception | Convenience wrapper around handler | Translate exception without manual setup | exception (+ request/meta) | response object | handle_exception(exc) |
| explain_error | Human-readable error description | Debug and diagnostics | exception | str | explain_error(exc) |
| get_status_code | Suggested HTTP status lookup | Consistent status mapping | exception | int | get_status_code(exc) |
| smart_insert / smart_insert_sync | Batch SQLAlchemy insert helpers | High-throughput inserts with retries | session/model/records | insert metrics | smart_insert_sync(sf, User, rows) |
| smart_update / smart_update_sync | Batch SQLAlchemy update helpers | Efficient key-based updates | session/model/records/keys | update metrics | smart_update_sync(sf, User, rows, ["id"]) |
| smart_upsert / smart_upsert_sync | Batch SQLAlchemy upsert helpers | Insert-or-update at scale | session/model/records/conflict keys | upsert metrics | smart_upsert_sync(sf, User, rows, ["id"]) |
| register_serializer | Register custom serializer | Support custom domain objects | predicate fn + serializer fn | None | register_serializer(lambda d: isinstance(d, UUID), str) |
| clear_serializers | Reset serializer registry | Test isolation / config reload | none | None | clear_serializers() |
| serialize_data | Serialize nested objects/ORM rows | Convert mixed values before JSON/logging | any nested value | serialized value | serialize_data(payload) |
| coerce_model_data | Per-field coercion by model schema | Normalize external input reliably | model + raw dict/list | ModelCoercionResult | coerce_model_data(UserModel, row) |
| to_dict | Convert model/ORM instance to plain dict | Serialize structured objects for JSON | Pydantic/ORM/dict | dict | to_dict(user_instance) |
| compare_dicts | Deep dict comparison engine | Auditing/testing data parity | expected dict + actual dict | DictDiffResult | compare_dicts(expected, actual) |
| compare_values | Single-value type-aware comparison | Quick value equality check | two values + options | bool | compare_values(1, 1.0005) |
| register_comparator | Register custom type comparator | Override comparison for a type | Python type + fn | none | register_comparator(Decimal, fn) |
| clear_comparators | Reset comparator registry | Test isolation / config reload | none | none | clear_comparators() |
| align_types_by_expected | Recursive expected-type alignment | Pre-normalize before comparison | expected + actual | aligned values + conversions | align_types_by_expected(exp, act) |
| coerce_actual_to_expected_type | Single-value expected-type coercion | Lightweight type correction | expected + actual value | (value, changed, note) | coerce_actual_to_expected_type(1, "1") |
| retry / with_retry / async_with_retry | Retry decorators/helpers | Handle transient failures | callable + retry config | return value or raised error | @retry(max_retries=3) |
| RetryConfig | Retry policy model | Reusable retry settings | retry options | dataclass instance | RetryConfig(max_retries=5) |
| CircuitBreaker / CircuitBreakerOpen | Failure-threshold breaker | Stop cascading upstream failures | thresholds + protected call | protected execution / exception | breaker.call(fn) |
Module Overview
| Module | Purpose | Documentation |
|---|---|---|
| src/aniket_tools/http/responses.py | Response schema + normalization | research/http/responses/README.md |
| src/aniket_tools/exceptions/core.py | Exception abstraction + payload mapping | research/exceptions/core/README.md |
| src/aniket_tools/observability/logging.py | Structured logging helpers | research/observability/logging/README.md |
| src/aniket_tools/observability/timing.py | Timing decorators/context managers | research/observability/timing/README.md |
| src/aniket_tools/database/insertion.py | Batch insert operations | research/database/insertion/README.md |
| src/aniket_tools/database/updatetion.py | Batch update operations | research/database/updatetion/README.md |
| src/aniket_tools/database/upsert.py | Batch upsert operations | research/database/upsert/README.md |
| src/aniket_tools/database/serializer.py | Serialization registry + conversion | research/database/serializer/README.md |
| src/aniket_tools/utils/model_coercer.py | Model-driven data coercion | research/utils/model_coercer/README.md |
| src/aniket_tools/utils/dict_compare.py | Deep comparison + type alignment | research/utils/dict_compare/README.md |
| src/aniket_tools/utils/compat.py | Optional import/type compatibility helpers | research/utils/compat/README.md |
| src/aniket_tools/reliability/retry.py | Retry utilities | research/reliability/retry/README.md |
| src/aniket_tools/reliability/circuit_breaker.py | Circuit breaker state machine | research/reliability/circuit_breaker/README.md |
Return Structure Reference
- create_response(...) success payload: success, response_code, meta, data, optional pagination
- create_response(...) error payload: success, response_code, error_message, meta, error, errors
- ErrorHandler.build_payload(...): same public envelope as error response + retry hints/details
- compare_dicts(...) -> DictDiffResult: match, fields, summary
- coerce_model_data(...) -> ModelCoercionResult: success, converted, table (coerced output plus per-field conversion metadata/errors)
- smart_insert / smart_insert_sync(...) -> InsertMetrics: total, inserted_count, skipped_count, failed_count, retries, batches, total_insert_time_sec, inserted_data_size_bytes
- smart_update / smart_update_sync(...) -> UpdateMetrics: total, updated_count, failed_count, retries, batches, total_update_time_sec, updated_data_size_bytes
- smart_upsert / smart_upsert_sync(...) -> UpsertMetrics: total, upserted_count, failed_count, retries, batches, total_upsert_time_sec, upserted_data_size_bytes
- RetryConfig: dataclass with max_retries, on, base_delay, backoff_factor, max_delay, jitter, predicate, on_retry
Edge Cases
- Empty input collections ([], {}) in comparison and DB helpers
- None, NaN, and Inf handling in response normalization
- Missing keys / key-case differences in dict comparison
- Invalid input types for serializer and coercion APIs
- Nested dict/list/set/object coercion paths
- Large batch sizes and retryable database failures
FAQ
- Should I use create_response inside exception handlers? Use unified_exception_handler for exceptions and create_response for normal route returns.
- Can I return plain dictionaries instead of JSONResponse? Yes, set as_json_response=False in create_response.
- Which APIs are safe without FastAPI installed? Most utilities are framework-agnostic; FastAPI integration points are optional.
- How do I extend serialization for custom types? Register a predicate and a converter with register_serializer(lambda d: isinstance(d, MyType), fn).
- How do I reduce logging overhead? Use sample_rate, structured context, and file rotation as needed.
Version Compatibility
- Python: >=3.9 (from pyproject.toml)
- Optional dependency groups:
  - fastapi: fastapi>=0.110
  - pydantic: pydantic>=1.10
  - database: sqlalchemy>=1.4, asyncpg>=0.29, psycopg2-binary>=2.9