Skip to main content

Production-ready toolkit for exception handling, structured logging, database ops, HTTP responses, retry, circuit breaking, and dict comparison.

Project description

aniket_tools

A reusable Python library for:

  • building consistent API responses (create_response, value_correction)
  • translating any exception into a safe, structured JSON payload (ErrorHandler, ApiError)
  • structured logging with levels, context, redaction, and formatting (logs, get_logger)
  • timing helpers for function/block duration (log_timing, log_timing_block)
  • high-throughput SQLAlchemy batch inserts with conflict handling and retries (smart_insert, smart_insert_sync)
  • high-throughput SQLAlchemy batch updates with key-based WHERE clauses and retries (smart_update, smart_update_sync)
  • per-field model-validated dict coercion for Pydantic and SQLAlchemy (coerce_model_data)
  • deep value-by-value dict comparison with type-aware comparators (compare_dicts)

Install

pip install aniket_tools

Import

from aniket_tools import (
    # HTTP responses
    ApiError,
    ErrorHandler,
    ExceptionHandler,
    PaginationRes,
    create_response,
    explain_error,
    get_status_code,
    handle_exception,
    unified_exception_handler,
    value_correction,
    # Logging & timing
    configure_logging,
    reset_logging_config,
    get_logger,
    log_timing,
    log_timing_block,
    logs,
    SUPPORTED_LOG_TYPES,
    # Database
    smart_insert,
    smart_insert_sync,
    smart_update,
    smart_update_sync,
    smart_upsert,
    smart_upsert_sync,
    InsertMetrics,
    UpdateMetrics,
    UpsertMetrics,
    # Serializer
    serialize_data,
    register_serializer,
    clear_serializers,
    # Reliability
    retry,
    with_retry,
    async_with_retry,
    RetryConfig,
    CircuitBreaker,
    CircuitBreakerOpen,
    # Model coercion
    coerce_model_data,
    ModelCoercionResult,
    to_dict,
    # Dict comparison
    compare_dicts,
    compare_values,
    align_types_by_expected,
    coerce_actual_to_expected_type,
    DictDiffResult,
    register_comparator,
    clear_comparators,
)

Quick Start — FastAPI

from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from aniket_tools import create_response, unified_exception_handler

app = FastAPI()
app.add_exception_handler(HTTPException, unified_exception_handler)
app.add_exception_handler(Exception, unified_exception_handler)
app.add_exception_handler(RequestValidationError, unified_exception_handler)

@app.get("/health")
async def health():
    return create_response(200, data={"status": "ok"})

Quick Start — Standalone (no FastAPI)

from aniket_tools import logs, get_logger

# Structured logging in any Python script or service
logs("Starting job", type="info")
logs("DB connected", type="success")
logs("Low disk space", type="warning", context={"free_gb": 1.2})

# JSON logger for production — outputs one JSON line per call
lg = get_logger("my_service", json=True)
logs("Request received", context={"user_id": 42, "path": "/api/users"}, logger=lg)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from aniket_tools import smart_insert_sync, smart_upsert_sync

engine = create_engine("postgresql+psycopg2://user:pass@localhost/mydb")
Session = sessionmaker(bind=engine)

records = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

# Batch insert — skip duplicates by id
metrics = smart_insert_sync(Session, User, records, conflict_columns=["id"])
print(metrics["inserted_count"], metrics["skipped_count"])

# Batch upsert — update name on conflict
metrics = smart_upsert_sync(Session, User, records, conflict_columns=["id"])
print(metrics["upserted_count"])
from aniket_tools import retry, CircuitBreaker, CircuitBreakerOpen

# Retry any flaky call
@retry(max_retries=3, on=(IOError,), base_delay=0.1)
def fetch_report(report_id: int):
    return external_api.get(f"/reports/{report_id}")

# Protect an upstream dependency with a circuit breaker
cb = CircuitBreaker(threshold=5, cooldown=30.0)

try:
    data = cb.call(lambda: fetch_report(report_id=42))
except CircuitBreakerOpen:
    data = {"error": "service_unavailable"}

create_response

Builds the standard success or error payload and returns a JSONResponse (or a plain dict with as_json_response=False).

Parameters:

Param Type Default Purpose
response_code int required HTTP status code
data Any None Response body data
schema Pydantic model class None Validates data before returning
pagination dict | PaginationRes None Pagination metadata
error_message str None Error description
error_code str None Machine-readable error code
details list[dict] None Field-level error details
meta dict | model None Request metadata (request_id, trace_id, …)
as_json_response bool True Return JSONResponse vs plain dict

Success responses

# Minimal
create_response(200, data={"status": "ok"})

# With meta
create_response(200, data={"name": "Aniket"}, meta={"request_id": "req-1", "trace_id": "t-2"})

# With pagination — dict form
create_response(200, data=rows, pagination={"page": 1, "rows": 25, "total_rows": 250})

# With pagination — typed model
create_response(200, data=rows, pagination=PaginationRes(page=1, rows=25, total_rows=250))

# Schema validation — Pydantic model class validates each item
create_response(200, data=raw_rows, schema=UserSchema)

# No content
create_response(204)   # returns FastAPI Response(status_code=204) — no body

# Return plain dict instead of JSONResponse (useful in tests or non-FastAPI code)
payload = create_response(200, data={"ok": True}, as_json_response=False)

Error responses

# Simple 404
create_response(404, error_message="Report not found.", error_code="report_not_found")

# 422 with field-level details
create_response(
    422,
    error_message="One or more fields are invalid.",
    error_code="validation_error",
    details=[
        {"field": "email", "message": "field required"},
        {"field": "age",   "message": "must be a positive integer"},
    ],
)

Success response shape

{
  "success": true,
  "response_code": 200,
  "meta": {"request_id": "req-1"},
  "data": {"name": "Aniket"},
  "pagination": {"page": 1, "rows": 25, "total_rows": 250}
}

Error response shape

{
  "success": false,
  "response_code": 422,
  "error_message": "One or more fields are invalid.",
  "meta": {},
  "error": {
    "code": "validation_error",
    "message": "One or more fields are invalid.",
    "details": [{"field": "email", "message": "field required"}]
  },
  "errors": [{"field": "email", "message": "field required"}]
}

error_message and error.message are aliases. errors and error.details are aliases. Both exist for backward compatibility.


PaginationRes

Typed dataclass for pagination metadata.

from aniket_tools import PaginationRes

p = PaginationRes(page=1, rows=25, total_rows=250)
create_response(200, data=rows, pagination=p)

Validation rules inside create_response:

  • page ≥ 1
  • rows ≥ 0
  • total_rows ≥ 0
  • All three are required integers
  • Extra keys on the dict form are preserved

If any rule fails, create_response returns a 422 validation error instead of a broken payload.


value_correction

Recursively normalizes Python values into JSON-safe types. Called automatically by create_response on all data.

Parameters:

Param Type Default Purpose
data Any required Value to normalize
mode str "response" "response" enables all conversions; "storage" disables most
float_precision int | None 2 in response mode Decimal places to round floats to
datetime_format str "%Y-%m-%d %H:%M:%S" Output format for datetime values
date_format str "%Y-%m-%d" Output format for date values
strip_strings bool True Strip whitespace from strings
convert_decimal bool | None None Override Decimal conversion (None follows mode)
convert_datetime bool | None None Override datetime conversion (None follows mode)
convert_date bool | None None Override date conversion (None follows mode)
convert_timedelta bool | None None Override timedelta conversion (None follows mode)
nan_to_none bool | None None Convert NaN/Inf to None (None follows mode)
convert_uuid bool | None None Override UUID conversion (None follows mode)
convert_enum bool | None None Override Enum conversion (None follows mode)
convert_bytes bool | None None Override bytes decoding (None follows mode)
convert_objects bool | None None Override dataclass/pydantic object conversion (None follows mode)

What it converts by default:

Input type Output
str stripped string
bytes UTF-8 decoded string
Decimal float (rounded to 2 dp)
datetime "2024-01-15 09:30:00"
date "2024-01-15"
timedelta "0:01:30"
float NaN / Inf None
float rounded to 2 dp
UUID "550e8400-..."
Enum enum value (recursed)
dataclass dict (recursed)
Pydantic model dict (recursed)
numpy.integer int
numpy.floating float (recursed)
numpy.ndarray list (recursed)
dict keys and values recursed
list recursed to list
tuple recursed to tuple
set / frozenset recursed to sorted list
None, bool, int unchanged
from decimal import Decimal
from datetime import datetime
from uuid import UUID
from aniket_tools import value_correction

value_correction({
    "amount":   Decimal("10.567"),
    "created":  datetime(2024, 1, 15, 9, 30),
    "name":     "  Aniket  ",
    "rate":     float("nan"),
    "id":       UUID("550e8400-e29b-41d4-a716-446655440000"),
})
# → {
#     "amount":  10.57,
#     "created": "2024-01-15 09:30:00",
#     "name":    "Aniket",
#     "rate":    None,
#     "id":      "550e8400-e29b-41d4-a716-446655440000",
# }

# Custom float precision
value_correction(3.14159, float_precision=4)  # → 3.1416

# Storage mode — most conversions disabled
value_correction(Decimal("10.5"), mode="storage")  # → Decimal("10.5") unchanged

logs

Unified logging function. Handles plain messages, structured context, redaction, SQL queries, ASCII tables, JSON pretty-print, and file output.

Parameters:

Param Type Default Purpose
msg object "" Message, data structure, or SQL statement
type str "info" Log level / mode (see table below)
file_name str | Path None Also write to this file (auto-creates dirs)
logger Logger None Use a specific logger instead of the default
dialect object None SQLAlchemy dialect for type="query"
context dict None Key-value fields appended to the log line
exc_info bool | Exception False Attach exception traceback
redact list[str] None Context keys to mask as ***
sample_rate float None 0.0-1.0 — emit about this fraction of calls
indent int 4 JSON indent size for type="json"
log_format str '%(asctime)s | %(name)s | %(levelname)s | %(message)s' Text pattern used when logs() creates the logger
date_format str | None None Timestamp pattern passed to logging.Formatter

Log types / levels

type Level Color Use for
"trace" 5 dim Very fine-grained internal tracing
"debug" 10 cyan Developer debug info
"info" 20 default General status messages
"success" 25 green Positive confirmations
"warning" 30 yellow Non-critical concerns
"error" 40 red Errors and failures
"critical" 50 bold red System-level failures
"audit" 45 magenta Compliance / security events
"exception" 40 red Same as error + auto-attaches traceback
"query" 20 default SQL statements (auto-compiles with literals)
"table" 20 default list[dict] → ASCII table
"divider" 20 default Section separator line
"json" 20 default Pretty-prints any JSON-serializable object

Examples

from aniket_tools import logs, get_logger

# Standard levels
logs("Server started")
logs("Connecting to DB", type="debug")
logs("Disk above 80%",  type="warning")
logs("Save failed",     type="error")
logs("Out of memory",   type="critical")

# Custom levels
logs("Entering resolve_user",         type="trace")
logs("Payment processed",             type="success")
logs("User admin deleted record #42", type="audit")

# Exception with traceback
try:
    raise ValueError("bad input")
except Exception as e:
    logs("Caught error", type="error", exc_info=e)

# Or the shorthand
logs("DB failed", type="exception")   # auto-attaches current exception

# Context fields
logs("User logged in", context={"user_id": 42, "ip": "10.0.0.1"})
# → ... | INFO | User logged in | user_id=42 ip=10.0.0.1

# Redaction
logs("API call", context={"api_key": "secret123", "endpoint": "/v1"}, redact=["api_key"])
# → ... | INFO | API call | api_key=*** endpoint=/v1

# SQL query (plain string)
logs("SELECT * FROM users WHERE id = 1", type="query")

# SQL query (SQLAlchemy statement with bound params)
from sqlalchemy import select
stmt = select(User).where(User.id == 7)
logs(stmt, type="query", dialect="postgresql")
logs(stmt, type="query", dialect=session)   # session / engine also accepted

# ASCII table
logs([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], type="table")

# Divider
logs("Auth Section", type="divider")   # ──── Auth Section ─────────────────────

# JSON pretty-print
logs({"user": "alice", "roles": ["admin", "editor"]}, type="json")

# Write to file (also still logs to console)
logs("Report generated", file_name="logs/app.log")

# Sampling — only ~10% of calls produce output
logs("high-frequency event", sample_rate=0.1)

# One-off custom text pattern
logs(
    "Report generated",
    file_name="logs/app.log",
    log_format="%(asctime)s | %(filename)s | %(funcName)s | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)

get_logger

Returns a configured logging.Logger.

Parameters:

Param Type Default Purpose
name str "aniket" Logger name
file_name str | Path None Log file path
json bool False Emit structured JSON lines instead of plain text
rotate bool False Rotate at 10 MB, keep 5 backups
sample_rate float None Emit about this fraction of all logs from this logger
log_format str '%(asctime)s | %(name)s | %(levelname)s | %(message)s' Custom text log pattern
date_format str | None None Custom timestamp format passed to logging.Formatter
from aniket_tools import get_logger, logs

# Plain logger
lg = get_logger("my_app")
logs("started", logger=lg)

# File + rotation
lg = get_logger("my_app", file_name="logs/app.log", rotate=True)
logs("started", logger=lg)

# JSON output — ready for Datadog, Loki, ELK
lg = get_logger(json=True)
logs("User created", context={"user_id": 99, "env": "prod"}, logger=lg)
# → {"time": "...", "name": "aniket", "level": "INFO", "msg": "User created", "user_id": 99, "env": "prod"}

# JSON + redaction
logs("Login", context={"user": "admin", "password": "hunter2"}, redact=["password"], logger=lg)
# → {"time": "...", ..., "user": "admin", "password": "***"}

# 50% sampling on the logger level
lg = get_logger(sample_rate=0.5)
logs("background event", logger=lg)

# Custom text pattern
lg = get_logger(
    "my_app",
    file_name="logs/app.log",
    log_format="%(asctime)s | %(filename)s | %(funcName)s | %(lineno)d | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)
logs("started", logger=lg)
# → 2026-04-28 16:35:00 | service.py | run | 42 | INFO | started

log_format uses standard Python logging.Formatter fields. Useful built-ins include:

Field Meaning
%(asctime)s Formatted timestamp
%(name)s Logger name
%(levelname)s Log level
%(filename)s Source file name
%(pathname)s Full source file path
%(module)s Source module
%(funcName)s Calling function name
%(lineno)d Calling line number
%(process)d Process ID
%(thread)d Thread ID
%(message)s The message passed to logs()

Formatter fields such as time, file, function, line number, level, and logger name are configured once. File, function, and line number point to the user code that called logs(...). Runtime values such as request method, URL path, status code, username, tenant, or client IP still need to be included in the log message or context from middleware.


configure_logging

Sets process-wide logging defaults applied to all subsequent get_logger() and logs() calls. Any setting not passed is left unchanged. Existing managed loggers are reconfigured immediately.

Parameters:

Param Type Default Purpose
file_name str | Path | None None Default log file path for all loggers
json bool | None None Default JSON output mode
rotate bool | None None Default rotating-file mode (10 MB, 5 backups)
sample_rate float | None None Default sampling rate (0.0–1.0)
colored bool | None None Default colorized console output
debug_log bool | None None Default enable/disable switch for logs()
log_format str | None None Default text formatter string
date_format str | None None Default datetime formatter string
from aniket_tools import configure_logging, reset_logging_config, logs, get_logger

# Disable color globally (useful in CI or when piping to files)
configure_logging(colored=False)

# Switch all loggers to JSON output (Datadog, Loki, ELK)
configure_logging(json=True)

# Write all logs to a rotating file by default
configure_logging(file_name="logs/app.log", rotate=True)

# Apply a uniform format across all loggers
configure_logging(
    log_format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)

# High-traffic service: JSON + 10% sampling
configure_logging(json=True, sample_rate=0.1)

# All logs() and get_logger() calls after this inherit the updated defaults
logs("server started")

reset_logging_config

Restores all module-wide logging defaults to their factory values. Intended for test teardown after configure_logging changes defaults — prevents settings from leaking between tests.

from aniket_tools import configure_logging, reset_logging_config

def teardown():
    reset_logging_config()

log_timing and log_timing_block

Timing is intentionally separated from logs(...).

  • log_timing(...): decorator for function execution time
  • log_timing_block(...): context manager for inline block/query timing

Parameters (both APIs):

Param Type Default Purpose
label str | None (log_timing) / str (log_timing_block) function name / required Human-readable timing label
file_name str | Path None Also write timing log to this file
colored bool | None None Override console coloring for the timing log
log_format str | None None Override message pattern for timing log
date_format str | None None Override timestamp format for timing log
debug_log bool | None None False suppresses timing output
from aniket_tools import log_timing, log_timing_block

# Decorator: function duration
@log_timing("fetch users")
def fetch_users():
    ...

# Context manager: inline block/query duration
def list_users():
    with log_timing_block("simple query"):
        ...

Output format:

  • TIMING[fetch users] took 0.0421s

ApiError

Raise a controlled API error from anywhere in your code.

Parameters:

Param Type Default Purpose
message str required User-facing error text
status_code int 400 HTTP status code
code str "api_error" Machine-readable error code
details list[dict] None Field-level details
log_message str None Extra developer context (logged, not returned)
from aniket_tools import ApiError

# Simple
raise ApiError("Report not found.", status_code=404, code="report_not_found")

# With field-level details
raise ApiError(
    "Validation failed.",
    status_code=422,
    code="validation_error",
    details=[{"field": "email", "message": "already registered"}],
)

# With a private log message (not sent to the client)
raise ApiError(
    "Something went wrong.",
    status_code=500,
    code="internal_error",
    log_message=f"DB query failed on table=billing sql={raw_sql}",
)

When unified_exception_handler catches an ApiError, the message, status_code, code, and details are returned exactly as provided. The log_message is written to the error log but never included in the response.


ErrorHandler

Core exception classifier. Understands 60+ exception types across all major Python libraries.

Parameters:

Param Type Default Purpose
logger_name str "aniket_tools.errors" Name of the logger used for log_exception
logger Logger None Preconfigured logger to use for exception logs
file_name str | Path None Error log file path when creating the logger
json bool False Emit JSON error log lines
rotate bool False Rotate error log file at 10 MB, keep 5 backups
sample_rate float None Emit about this fraction of error logs
log_format str default logger pattern Custom text pattern for exception logs
date_format str | None None Custom timestamp format for exception logs
use_default_message_for_long_errors bool True Replace long raw messages with safe defaults
from aniket_tools import ErrorHandler

handler = ErrorHandler()

# Classify any exception into a structured ErrorInfo
info = handler.describe(some_exception)
print(info.status_code)   # e.g. 422
print(info.code)          # e.g. "duplicate_resource"
print(info.message)       # e.g. "A record with this email already exists."
print(info.retryable)     # True / False / None

# Build the full JSON payload
payload = handler.build_payload(some_exception, meta={"trace_id": "t-1"})

# Log the raw exception and return JSONResponse
response = handler.handle_exception(some_exception, request=request)

# Log only (no response)
handler.log_exception(some_exception, request=request)

# Configured exception log format
handler = ErrorHandler(
    file_name="logs/errors.log",
    log_format="%(asctime)s | %(filename)s | %(funcName)s | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)

handler.log_exception(ValueError("bad input"))
# → 2026-04-28 16:35:00 | service.py | create_report | ERROR | bad input

log_exception() writes only str(exc) by default. For ApiError, it writes log_message when provided. It does not attach a traceback unless you call logs(..., type="exception") yourself.

Exception error response shape

{
  "success": false,
  "response_code": 409,
  "error_message": "A record with this email already exists.",
  "error_type": "IntegrityError",
  "meta": {"request_id": "req-1", "path": "/users"},
  "error": {
    "code": "duplicate_resource",
    "type": "IntegrityError",
    "message": "A record with this email already exists.",
    "retryable": false,
    "details": [
      {"type": "duplicate_resource", "field": "email", "value": "a@b.com", "constraint": "users_email_key"}
    ]
  },
  "errors": [
    {"type": "duplicate_resource", "field": "email", "value": "a@b.com", "constraint": "users_email_key"}
  ]
}

retryable field:

  • true — client should retry (timeouts, deadlocks, transient unavailability, cache/queue conflicts)
  • false — retrying will not help (duplicate key, bad input, auth failure, SSL error)
  • absent — not determined for this error type

Exception families covered

Family Libraries Example codes
HTTP / Validation FastAPI, Starlette, Pydantic validation_error, http_404
Database SQLAlchemy, psycopg2, psycopg3, asyncpg, MySQL Connector, PyMySQL, MySQLdb, sqlite3, PyMongo duplicate_resource, invalid_reference, database_timeout, database_unavailable, database_retryable_conflict
Upstream HTTP requests, httpx, aiohttp, urllib3 upstream_timeout, upstream_unavailable, upstream_ssl_error, upstream_bad_response
Auth PyJWT token_expired, invalid_token, invalid_token_claim
Cloud botocore / boto3 cloud_timeout, cloud_not_found, cloud_rate_limited, cloud_forbidden
Cache Redis cache_timeout, cache_conflict, cache_unavailable, cache_auth_failed
Queue / Tasks kafka-python, confluent_kafka, Celery, Kombu queue_timeout, queue_unavailable, task_timeout, invalid_queue_payload
Data tools Pandas, NumPy, PyArrow, Polars, SciPy invalid_data, data_backend_unavailable
Python builtins stdlib invalid_json, invalid_yaml, resource_not_found, bad_request, undefined_reference, internal_error

Common error codes and status codes

Code Status Retryable Cause
duplicate_resource 409 false Unique constraint violation
invalid_reference 422 false Foreign key violation
missing_required_field 422 false NOT NULL violation
constraint_violation 422 false CHECK constraint
database_retryable_conflict 409 true Deadlock / serialization failure
database_timeout 504 true Statement / network timeout
database_unavailable 503 true Cannot connect to DB server
database_programming_error 500 false Undefined table / SQL syntax bug
validation_error 422 false Request field validation
upstream_timeout 504 true HTTP client timeout
upstream_unavailable 503 true Cannot reach upstream service
upstream_ssl_error 502 false TLS / certificate failure
token_expired 401 false JWT expired
invalid_token 401 false JWT invalid signature / decode
cloud_rate_limited 429 true Cloud SDK throttle
cache_timeout 504 true Redis timeout
cache_conflict 409 true Redis WATCH / lock conflict
queue_timeout 504 true Kafka / Kombu timeout
task_timeout 504 false Celery time limit hit
invalid_json 400 false Malformed JSON body
bad_request 400 false ValueError, TypeError, etc.
undefined_reference 500 false NameError / UnboundLocalError in server code
internal_error 500 false Uncaught programming bug

unified_exception_handler

FastAPI exception handler. Logs the exception text and returns the standard error JSON.

from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from aniket_tools import unified_exception_handler

app = FastAPI()
app.add_exception_handler(HTTPException,           unified_exception_handler)
app.add_exception_handler(Exception,               unified_exception_handler)
app.add_exception_handler(RequestValidationError,  unified_exception_handler)

Use a configured ErrorHandler instance when exception logs need a custom file, rotation, date format, or log pattern:

from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from aniket_tools import ErrorHandler

app = FastAPI()
error_handler = ErrorHandler(
    file_name="logs/errors.log",
    rotate=True,
    log_format="%(asctime)s | %(filename)s | %(funcName)s | %(levelname)s | %(message)s",
    date_format="%Y-%m-%d %H:%M:%S",
)

app.add_exception_handler(HTTPException,           error_handler.unified_exception_handler)
app.add_exception_handler(Exception,               error_handler.unified_exception_handler)
app.add_exception_handler(RequestValidationError,  error_handler.unified_exception_handler)

ExceptionHandler

Route-level helper. Converts any exception into a FastAPI HTTPException so FastAPI's own handler picks it up.

from aniket_tools import ExceptionHandler

try:
    result = db.query(...)
except Exception as exc:
    ExceptionHandler(exc)

handle_exception

Returns the standard error payload directly (as JSONResponse or dict).

from aniket_tools import handle_exception

response = handle_exception(ValueError("bad id"))
payload  = handle_exception(ValueError("bad id"), as_json_response=False)  # plain dict

# With request context (extracts request_id and path automatically)
response = handle_exception(exc, request=request, meta={"trace_id": "t-1"})

explain_error / get_status_code

Quick one-liners when you only need the message or the status code.

from aniket_tools import explain_error, get_status_code

msg    = explain_error(ValueError("bad input"))    # "The request data is invalid."
status = get_status_code(ValueError("bad input"))  # 400

Standard Response Shapes

Success with pagination

{
  "success": true,
  "response_code": 200,
  "meta": {"request_id": "req-1"},
  "data": [{"id": 1, "name": "Alice"}],
  "pagination": {"page": 1, "rows": 25, "total_rows": 250}
}

Validation error (422)

{
  "success": false,
  "response_code": 422,
  "error_message": "One or more fields are invalid.",
  "error_type": "RequestValidationError",
  "meta": {"request_id": "req-1", "path": "/users"},
  "error": {
    "code": "validation_error",
    "type": "RequestValidationError",
    "message": "One or more fields are invalid.",
    "details": [
      {"type": "missing", "field": "email", "message": "field required", "source": "body"},
      {"type": "missing", "field": "page",  "message": "field required", "source": "query"}
    ]
  },
  "errors": [
    {"type": "missing", "field": "email", "message": "field required", "source": "body"}
  ]
}

Retryable error (deadlock / timeout)

{
  "success": false,
  "response_code": 409,
  "error_message": "The database could not complete the operation because of a temporary concurrency conflict.",
  "error_type": "OperationalError",
  "meta": {},
  "error": {
    "code": "database_retryable_conflict",
    "type": "OperationalError",
    "message": "The database could not complete the operation because of a temporary concurrency conflict.",
    "retryable": true,
    "details": [{"type": "database_retryable_conflict", "message": "...", "retryable": true}]
  }
}

smart_insert and smart_insert_sync

High-throughput SQLAlchemy batch inserts with dialect-aware conflict handling, retries, and circuit breaking.

  • smart_insert(...): async, uses asyncio + thread-pool workers
  • smart_insert_sync(...): synchronous, uses ThreadPoolExecutor directly

Parameters (both APIs):

Param Type Default Purpose
session_factory any required Synchronous or async sessionmaker bound to the target database
model SA mapped class required Target table model class
records list[dict] required Rows to insert; plain dicts only
batch_size int 1000 Rows per batch; auto-capped for PostgreSQL bind-parameter limits
num_workers int 1 Concurrent DB worker threads; keep <= pool_size - 2
queue_maxsize int 0 Backpressure queue limit; 0 sets it to num_workers × 3 automatically
max_retries int 2 Retry attempts for transient errors before binary-splitting the batch
conflict_columns list[str] | None None Column names for native conflict detection (ON CONFLICT DO NOTHING); None disables conflict handling
batch_timeout float 30.0 Seconds before a batch is considered hung
circuit_breaker_threshold int 10 Consecutive failures required to open the circuit breaker
circuit_breaker_cooldown float 30.0 Seconds the circuit breaker waits before probing for recovery
log_failed_records bool True Log each permanently failed record
max_logged_failures int 100 Maximum failure messages to emit; additional failures are summarised
debug_log bool True False suppresses all insert log output

Returns: InsertMetrics dict.

Key Type Meaning
total int Total records submitted
inserted_count int Rows written
skipped_count int Rows skipped due to conflict resolution
failed_count int Rows permanently rejected
retries int Total retry attempts
batches int Batches committed successfully
total_insert_time_sec float Wall-clock seconds
inserted_data_size_bytes int Estimated byte size of inserted rows
from aniket_tools import smart_insert_sync, smart_insert

# Synchronous
metrics = smart_insert_sync(
    session_factory,
    MyModel,
    records,
    batch_size=500,
    conflict_columns=["id"],
)
print(metrics["inserted_count"], metrics["skipped_count"])

# Async
metrics = await smart_insert(
    async_session_factory,
    MyModel,
    records,
    batch_size=500,
    conflict_columns=["id"],
)

smart_update and smart_update_sync

High-throughput SQLAlchemy batch updates with key-based WHERE clauses, retries, and circuit breaking.

  • smart_update(...): async
  • smart_update_sync(...): synchronous

Parameters (both APIs):

Param Type Default Purpose
session_factory any required Synchronous or async sessionmaker bound to the target database
model SA mapped class required Target table model class
records list[dict] required Rows to update; each must contain all conflict_columns plus at least one other field
conflict_columns list[str] required Column names forming the WHERE clause
batch_size int 1000 Rows per batch
num_workers int 1 Concurrent DB worker threads
queue_maxsize int 0 Backpressure queue limit; 0 sets it to num_workers × 3
max_retries int 2 Retry attempts for transient errors before binary-splitting
batch_timeout float 30.0 Seconds before a batch is considered hung
circuit_breaker_threshold int 10 Consecutive failures to open circuit breaker
circuit_breaker_cooldown float 30.0 Seconds the circuit breaker waits before recovery probe
log_failed_records bool True Log each permanently failed record
max_logged_failures int 100 Maximum failure messages to emit
debug_log bool True False suppresses all update log output

Returns: UpdateMetrics dict.

Key Type Meaning
total int Total records submitted
updated_count int Rows matched and updated
failed_count int Rows permanently rejected
retries int Total retry attempts
batches int Batches committed successfully
total_update_time_sec float Wall-clock seconds
updated_data_size_bytes int Estimated byte size sent to the database
from aniket_tools import smart_update_sync, smart_update

# Synchronous
metrics = smart_update_sync(
    session_factory,
    MyModel,
    records,
    conflict_columns=["id"],
    batch_size=500,
)
print(metrics["updated_count"], metrics["failed_count"])

# Async
metrics = await smart_update(
    async_session_factory,
    MyModel,
    records,
    conflict_columns=["id"],
)

smart_upsert and smart_upsert_sync

High-throughput SQLAlchemy batch upserts with dialect-aware conflict resolution, retries, and circuit breaking. On conflict, updates all non-conflict columns (or a custom set).

Supported dialects: PostgreSQL, MySQL, MariaDB, SQLite.

  • smart_upsert(...): async, uses asyncio + async session factory
  • smart_upsert_sync(...): synchronous, uses ThreadPoolExecutor

Parameters (both APIs):

Param Type Default Purpose
session_factory any required Sync or async sessionmaker bound to the target database
model SA mapped class required Target table model class
records list[dict] required Rows to upsert; each must contain all conflict_columns
conflict_columns list[str] required Column names identifying the unique constraint
update_columns list[str] | None None Columns to update on conflict; None = all non-conflict columns from first record
batch_size int 1000 Rows per batch; auto-capped for PostgreSQL bind-parameter limits
num_workers int 1 Concurrent DB worker threads; keep <= pool_size - 2
queue_maxsize int 0 Backpressure queue limit; 0 sets it to max(num_workers × 3, 10)
max_retries int 2 Retry attempts for transient errors before failing the batch
batch_timeout float 30.0 Seconds before a batch is considered hung
circuit_breaker_threshold int 10 Consecutive failures required to open the circuit breaker
circuit_breaker_cooldown float 30.0 Seconds the circuit breaker waits before probing for recovery
log_failed_records bool True Log each permanently failed batch
max_logged_failures int 100 Maximum failure messages to emit
debug_log bool True False suppresses all upsert log output

Returns: UpsertMetrics dict.

Key Type Meaning
total int Total records submitted
upserted_count int Rows inserted or updated
failed_count int Rows permanently rejected
retries int Total retry attempts
batches int Batches committed successfully
total_upsert_time_sec float Wall-clock seconds
upserted_data_size_bytes int Estimated byte size of rows sent to the database
from aniket_tools import smart_upsert_sync, smart_upsert

# Synchronous — insert or update on conflict
metrics = smart_upsert_sync(
    session_factory,
    Product,
    records,
    conflict_columns=["sku"],
)
print(metrics["upserted_count"], metrics["failed_count"])

# Async
metrics = await smart_upsert(
    async_session_factory,
    Product,
    records,
    conflict_columns=["sku"],
)

# Only update specific columns on conflict
metrics = smart_upsert_sync(
    session_factory,
    Product,
    records,
    conflict_columns=["sku"],
    update_columns=["price", "stock"],
)

serialize_data, register_serializer, clear_serializers

Convert SQLAlchemy query results and custom objects into plain Python dicts and lists for JSON-safe output.

serialize_data

Detects the shape of any SQLAlchemy query result and converts it to a dict or list[dict]. Passes through plain dicts unchanged.

from aniket_tools import serialize_data

# Single ORM instance
user = session.get(User, 1)
data = serialize_data(user)                   # {"id": 1, "name": "Alice", ...}

# List of ORM instances
users = session.execute(select(User)).scalars().all()
data = serialize_data(users)                  # [{"id": 1, ...}, {"id": 2, ...}]

# Raw Row from session.execute()
row = session.execute(select(User.id, User.name)).first()
data = serialize_data(row)                    # {"id": 1, "name": "Alice"}

# namedtuple
from collections import namedtuple
Point = namedtuple("Point", ["x", "y"])
serialize_data(Point(3, 4))                   # {"x": 3, "y": 4}

# None / empty — safe sentinels
serialize_data(None)                          # {}
serialize_data([])                            # []

Recognized shapes:

Input Output
None {}
[] []
Single Row dict
list[Row] list[dict]
RowMapping dict
list[RowMapping] list[dict]
ORM instance dict
list of ORM instances list[dict]
namedtuple dict
Single-element tuple scalar value
Multi-element tuple list
Unrecognized shape returned unchanged

register_serializer

Register a custom handler that runs before all built-in detection. The first matching predicate wins.

from aniket_tools import register_serializer, serialize_data
import dataclasses

@dataclasses.dataclass
class GeoPoint:
    lat: float
    lon: float

register_serializer(
    lambda d: isinstance(d, GeoPoint),
    lambda d: {"lat": d.lat, "lon": d.lon},
)

serialize_data(GeoPoint(37.7, -122.4))        # {"lat": 37.7, "lon": -122.4}

clear_serializers

Remove all registered custom serializers. Use in test teardown to prevent state leaking between tests.

from aniket_tools import clear_serializers

def teardown():
    clear_serializers()

coerce_model_data

Coerces a raw dict to match a Pydantic model's or SQLAlchemy mapped class's declared types. Returns a per-field result table describing what changed, what failed, and what was already correct.

Parameters:

Param Type Purpose
model Pydantic BaseModel subclass or SA mapped class/instance Target type contract; pass the class, not an instance
data dict[str, Any] Raw dict to coerce

Returns: ModelCoercionResult

Field Type Meaning
success bool True when all fields coerced without error
converted dict[str, Any] Coerced output dict; empty when success is False
table list[dict] One FieldRow dict per input key

FieldRow status values:

Status Meaning
"coerced" Type changed during coercion
"unchanged" Already the correct type
"failed" Coercion raised an error
"pending" Could not be re-validated after a batch ValidationError
"unknown_field" Key absent from the model
"skipped" SA column exposes no python_type
from pydantic import BaseModel
from aniket_tools import coerce_model_data

class User(BaseModel):
    id: int
    active: bool

result = coerce_model_data(User, {"id": "42", "active": "true"})
result.success       # True
result.converted     # {"id": 42, "active": True}

# Per-field breakdown
for row in result.table:
    print(row["field"], row["from_type"], "→", row["to_type"], row["status"])
# id   str → int   coerced
# active str → bool  coerced

to_dict

Converts a Pydantic model instance, SQLAlchemy ORM instance, (ModelClass, dict) tuple, or plain dict into a serialization-ready plain Python dict. Handles nested objects, Enum values, UUIDs, datetimes, Decimal, bytes, Pydantic secrets, and SQLAlchemy relationships.

Parameters (key options):

Param Type Default Purpose
data Pydantic instance, ORM instance, (cls, dict) tuple, or plain dict required Object to convert
model_class Pydantic BaseModel subclass | None None Validate data dict through this class first (alternative to tuple form)
enum_mode "value" | "name" | "keep" "value" Enum output format
uuid_mode "str" | "keep" "str" UUID output format
datetime_mode "iso" | "keep" "iso" datetime output format
decimal_mode "keep" | "str" | "float" "keep" Decimal output format
handle_secrets "redact" | "expose" | "raise" "redact" Pydantic SecretStr/SecretBytes handling
include_relationships bool False Include SQLAlchemy relationship attributes
fallback_handler Callable | None None Custom handler for unrecognized types
max_depth int 25 Maximum nesting depth
on_cycle "raise" | "skip" | "placeholder" "raise" Circular reference handling
unknown_types "raise" | "passthrough" "raise" Behaviour for unrecognized types
from pydantic import BaseModel
from aniket_tools import to_dict

class Order(BaseModel):
    id: int
    product: str

result = to_dict(Order(id=1, product="widget"))
# {"id": 1, "product": "widget"}

# Validate a raw dict through a model first
result = to_dict({"id": "42", "product": "bolt"}, model_class=Order)
# {"id": 42, "product": "bolt"}

compare_dicts

Deep value-by-value comparison of two dicts with type-aware comparators. Nested dicts recurse and emit flat dot-notation field paths. Lists of dicts use greedy unordered matching.

Parameters:

Param Type Default Purpose
main dict required Reference dict (expected baseline)
comparing dict required Dict to compare against main
numeric_tolerance float | None 0.001 Max absolute difference for numeric comparisons; None means exact
nan_equal bool False Treat two NaN values as equal
string_case_sensitive bool True False uses casefold comparison
whitespace_normalize bool False Collapse internal whitespace runs before comparing strings
datetime_date_only bool False Compare only the date portion of datetime values
datetime_precision "second" | "minute" | "hour" | "day" | None None Truncate datetimes to this precision before comparing
enum_by "value" | "name" "value" Compare Enum members by value or name
same_type bool False Require Enum members and objects to be the same class

Returns: DictDiffResult (CompareResult)

Field Type Meaning
match bool True when every field comparison matched
fields list[FieldResult] One entry per compared key; nested diffs use dot-notation paths
summary CompareSummary Aggregated field counts
from aniket_tools import compare_dicts

result = compare_dicts(
    {"id": 1, "price": 10.001, "name": "Alice"},
    {"id": 1, "price": 10.002, "name": "alice"},
    numeric_tolerance=0.01,
    string_case_sensitive=False,
)
result.match   # True — both differences fall within tolerance/case rules

# Nested dict — diff flattened to dot-notation
result = compare_dicts(
    {"user": {"id": 1, "city": "NYC"}},
    {"user": {"id": 1, "city": "LA"}},
)
result.match   # False
result.fields[0].field   # "user.city"

# Summary counts
result.summary.mismatched_fields   # 1
result.summary.matched_fields      # 1

compare_values

Single-value comparison using the same type-aware rules as compare_dicts. Accepts the same tolerance and mode parameters.

from aniket_tools import compare_values

compare_values(1, 1.0005)                          # True  (within default tolerance 0.001)
compare_values("Alice", "alice", string_case_sensitive=False)  # True
compare_values(float("nan"), float("nan"), nan_equal=True)     # True

register_comparator and clear_comparators

Register a custom comparison function for a specific Python type. The registered function runs before all built-in dispatch when both values are instances of the registered type.

from aniket_tools import register_comparator, clear_comparators, compare_dicts
from decimal import Decimal

# Custom comparator: Decimal exact match only
register_comparator(Decimal, lambda a, b: a == b)

result = compare_dicts({"price": Decimal("10.0")}, {"price": Decimal("10.01")})
result.match   # False — custom comparator bypasses tolerance

# Test teardown — prevent state leaking between tests
def teardown():
    clear_comparators()

align_types_by_expected

Recursively aligns the types of actual to match expected, logging every conversion applied. Useful before calling compare_dicts when input data may have string-encoded numerics or mixed date/datetime values.

from aniket_tools import align_types_by_expected

expected = {"id": 1, "price": Decimal("10.5")}
actual   = {"id": "1", "price": "10.5"}

_, aligned, conversions = align_types_by_expected(expected, actual)
aligned       # {"id": 1, "price": Decimal("10.5")}
conversions   # [TypeConversionResult(path="id", from_type="str", to_type="int", ...), ...]

coerce_actual_to_expected_type

Single-value coercion — converts actual to the same type as expected and returns (converted, changed_flag, note).

from aniket_tools import coerce_actual_to_expected_type

converted, changed, note = coerce_actual_to_expected_type(1, "42")
# (42, True, "Coerced actual value to int using expected type.")

retry, with_retry, async_with_retry, RetryConfig

Exponential-backoff retry with full jitter for any sync or async callable. Supports decorator, callable-wrapper, and pre-built config forms.

RetryConfig

Reusable retry policy dataclass.

Field Type Default Purpose
max_retries int 3 Max retry attempts after the first failure; 0 = call once, raise on failure
on tuple[type[Exception], ...] (Exception,) Exception types that trigger a retry
base_delay float 0.1 Starting sleep duration in seconds
backoff_factor float 2.0 Multiplier applied to delay on each attempt
max_delay float 30.0 Upper bound on computed delay before jitter
jitter bool True Full jitter: random.uniform(0, computed_cap) — prevents thundering herd
predicate Callable | None None Extra callable returning True when retryable; both type check and predicate must pass
on_retry Callable | None None Called before each sleep with (attempt, exc, delay); may raise to abort

retry — decorator

Works on both sync and async functions. Detected automatically.

from aniket_tools import retry

# Sync
@retry(max_retries=3, on=(IOError,), base_delay=0.1)
def read_file(path: str) -> str:
    return open(path).read()

# Async (same decorator — auto-detected)
@retry(max_retries=3, on=(ConnectionError,))
async def fetch(url: str) -> dict:
    return await http_client.get(url)

# No-parens form
@retry
def connect():
    ...

with_retry — sync callable wrapper

from aniket_tools import with_retry

result = with_retry(
    lambda: redis_client.get("key"),
    max_retries=3,
    on=(ConnectionError,),
    base_delay=0.05,
)

async_with_retry — async callable wrapper

from aniket_tools import async_with_retry

result = await async_with_retry(
    lambda: producer.send(msg),
    max_retries=2,
    on=(KafkaError,),
)

Pre-built RetryConfig

from aniket_tools import RetryConfig, with_retry

# Reuse the same policy across multiple call sites
db_retry = RetryConfig(
    max_retries=3,
    on=(OperationalError,),
    predicate=lambda exc: "deadlock" in str(exc).lower(),
    base_delay=0.1,
    backoff_factor=2.0,
)

result = with_retry(lambda: session.execute(stmt), config=db_retry)

# on_retry hook — log before sleeping
def log_retry(attempt: int, exc: Exception, delay: float) -> None:
    logs(f"Retry {attempt + 1}", context={"error": str(exc), "delay": delay}, type="warning")

cfg = RetryConfig(max_retries=3, on=(IOError,), on_retry=log_retry)
with_retry(lambda: risky_call(), config=cfg)

CircuitBreaker and CircuitBreakerOpen

Three-state circuit breaker that stops cascading failures by fast-failing calls when an upstream dependency is consistently failing.

States: closed (normal) → open (fast-failing) → half-open (probing recovery) → closed

Constructor parameters:

Param Type Default Purpose
threshold int 10 Consecutive failures required to open the circuit
cooldown float 30.0 Seconds to wait before allowing a probe; 0.0 = probe immediately
on_state_change Callable[[str, str], None] | None None Callback fired with (old_state, new_state) on every transition
from aniket_tools import CircuitBreaker, CircuitBreakerOpen

cb = CircuitBreaker(threshold=5, cooldown=10.0)

# Sync call
try:
    result = cb.call(lambda: requests.get("https://api.example.com/data"))
except CircuitBreakerOpen:
    result = fallback_response()       # circuit is open — skip the call entirely

# Async call
try:
    result = await cb.async_call(lambda: fetch_from_upstream())
except CircuitBreakerOpen:
    result = cached_value

# State inspection
print(cb.state)                        # "closed" | "open" | "half_open"
print(cb.consecutive_failures)         # int — resets to 0 on any success

# State-change callback (useful for alerting / metrics)
def on_change(old: str, new: str) -> None:
    logs(f"Circuit: {old}{new}", type="warning")

cb = CircuitBreaker(threshold=5, cooldown=30.0, on_state_change=on_change)

# Reset — discards all failure history (intended for test isolation)
cb.reset()

CircuitBreakerOpen is raised when a call is rejected because the circuit is open and the cooldown has not elapsed. Catch it to return a fallback or cached response.


Code Structure

src/aniket_tools/
  __init__.py                      ← public exports
  http/
    responses.py                   ← create_response, value_correction, PaginationRes
  observability/
    logging.py                     ← logs, get_logger, configure_logging, reset_logging_config, SUPPORTED_LOG_TYPES
    timing.py                      ← log_timing, log_timing_block
  exceptions/
    core.py                        ← ApiError, ErrorHandler, unified_exception_handler
    handlers/
      base.py                      ← ErrorInfo dataclass, message helpers
      api_http_validation.py       ← FastAPI/Starlette/Pydantic
      database_family.py           ← all SQL and MongoDB drivers
      http_auth_cloud_family.py    ← requests, httpx, aiohttp, urllib3, PyJWT, botocore
      cache_queue_family.py        ← Redis, Kafka, Celery, Kombu
      data_tool_family.py          ← Pandas, NumPy, PyArrow, Polars, SciPy
      python_family.py             ← stdlib builtins, asyncio, ssl, socket
  database/
    insertion.py                   ← smart_insert, smart_insert_sync, InsertMetrics
    updatetion.py                  ← smart_update, smart_update_sync, UpdateMetrics
  utils/
    model_coercer.py               ← coerce_model_data, ModelCoercionResult, to_dict
    dict_compare.py                ← compare_dicts, compare_values, align_types_by_expected, coerce_actual_to_expected_type, DictDiffResult, register_comparator, clear_comparators
    compat.py                      ← optional import helpers

Safe Editing Rules

  • Add specific exception checks before generic ones (e.g. redis.TimeoutError before Python TimeoutError)
  • Keep message logic in _database_message(...), status logic in _database_status(...)
  • Keep JSON shape logic in build_payload(...) or create_response(...)
  • If you add a new public function, also export it from __init__.py
  • create_response(...) is for normal route returns; unified_exception_handler(...) is for exceptions — they are separate code paths

HTML Output References

File Shows
result/logging_results.html All logs() types and options with rendered output
result/responses_results.html All create_response and value_correction use cases
result/exceptions_results.html All ErrorHandler / ApiError exception families and payloads

Feature Overview

  • HTTP response standardization (create_response, PaginationRes, value_correction)
  • Unified exception translation (ErrorHandler, ApiError, unified_exception_handler)
  • Observability utilities (logs, get_logger, configure_logging, timing helpers)
  • Database batch write helpers (smart_insert, smart_update, smart_upsert)
  • Data normalization and comparison (coerce_model_data, compare_dicts)
  • Reliability primitives (retry, with_retry, CircuitBreaker)

Function Catalog

Function / Class Description Why Use It Input Output Example
create_response Standard success/error API payload builder Keep API responses consistent status code + payload parts JSONResponse or dict create_response(200, data={"ok": True})
value_correction Recursive JSON-safe value normalization Clean/serialize mixed Python values any Python object normalized value value_correction({"amt": Decimal("1.23")})
PaginationRes Typed pagination metadata Avoid malformed pagination blocks page, rows, total_rows dataclass instance PaginationRes(page=1, rows=10, total_rows=40)
logs Unified logging helper One API for text/json/query/table logging message + log options log side effects logs("started", type="info")
get_logger Configured logger factory Reuse shared logger settings logger configuration args logging.Logger get_logger("service", json=True)
configure_logging Root logging setup helper Configure process-wide logging once global logging options logging.Logger configure_logging(level="INFO")
reset_logging_config Restore factory logging defaults Test teardown after configure_logging none none reset_logging_config()
log_timing Execution-time decorator Track function latency function + label/options wrapped callable @log_timing("fetch users")
log_timing_block Inline timing context manager Track specific code blocks label + options context manager with log_timing_block("query"):
ApiError Controlled API error type Raise typed user-safe errors message, status, code, details exception raise ApiError("Not found", 404)
ErrorHandler Exception classifier and payload builder Convert unknown exceptions safely exception (+ request/meta) structured payload/response ErrorHandler().handle_exception(exc)
unified_exception_handler FastAPI-compatible exception handler Plug into app.add_exception_handler (request, exc) JSONResponse app.add_exception_handler(Exception, unified_exception_handler)
ExceptionHandler Backward-compatible exception wrapper Keep legacy call sites working exception raises/handles ExceptionHandler(exc)
handle_exception Convenience wrapper around handler Translate exception without manual setup exception (+ request/meta) response object handle_exception(exc)
explain_error Human-readable error description Debug and diagnostics exception str explain_error(exc)
get_status_code Suggested HTTP status lookup Consistent status mapping exception int get_status_code(exc)
smart_insert / smart_insert_sync Batch SQLAlchemy insert helpers High-throughput inserts with retries session/model/records insert metrics smart_insert_sync(sf, User, rows)
smart_update / smart_update_sync Batch SQLAlchemy update helpers Efficient key-based updates session/model/records/keys update metrics smart_update_sync(sf, User, rows, ["id"])
smart_upsert / smart_upsert_sync Batch SQLAlchemy upsert helpers Insert-or-update at scale session/model/records/conflict keys upsert metrics smart_upsert_sync(sf, User, rows, ["id"])
register_serializer Register custom serializer Support custom domain objects Python type + serializer fn None register_serializer(UUID, str)
clear_serializers Reset serializer registry Test isolation / config reload none None clear_serializers()
serialize_data Serialize nested objects/ORM rows Convert mixed values before JSON/logging any nested value serialized value serialize_data(payload)
coerce_model_data Per-field coercion by model schema Normalize external input reliably model + raw dict/list ModelCoercionResult coerce_model_data(UserModel, row)
to_dict Convert model/ORM instance to plain dict Serialize structured objects for JSON Pydantic/ORM/dict dict to_dict(user_instance)
compare_dicts Deep dict comparison engine Auditing/testing data parity expected dict + actual dict DictDiffResult compare_dicts(expected, actual)
compare_values Single-value type-aware comparison Quick value equality check two values + options bool compare_values(1, 1.0005)
register_comparator Register custom type comparator Override comparison for a type Python type + fn none register_comparator(Decimal, fn)
clear_comparators Reset comparator registry Test isolation / config reload none none clear_comparators()
align_types_by_expected Recursive expected-type alignment Pre-normalize before comparison expected + actual aligned values + conversions align_types_by_expected(exp, act)
coerce_actual_to_expected_type Single-value expected-type coercion Lightweight type correction expected + actual value (value, changed, note) coerce_actual_to_expected_type(1, "1")
retry / with_retry / async_with_retry Retry decorators/helpers Handle transient failures callable + retry config return value or raised error @retry(max_attempts=3)
RetryConfig Retry policy model Reusable retry settings retry options dataclass instance RetryConfig(max_attempts=5)
CircuitBreaker / CircuitBreakerOpen Failure-threshold breaker Stop cascading upstream failures thresholds + protected call protected execution / exception breaker.call(fn)

Module Overview

Module Purpose Documentation
src/aniket_tools/http/responses.py Response schema + normalization research/http/responses/README.md
src/aniket_tools/exceptions/core.py Exception abstraction + payload mapping research/exceptions/core/README.md
src/aniket_tools/observability/logging.py Structured logging helpers research/observability/logging/README.md
src/aniket_tools/observability/timing.py Timing decorators/context managers research/observability/timing/README.md
src/aniket_tools/database/insertion.py Batch insert operations research/database/insertion/README.md
src/aniket_tools/database/updatetion.py Batch update operations research/database/updatetion/README.md
src/aniket_tools/database/upsert.py Batch upsert operations research/database/upsert/README.md
src/aniket_tools/database/serializer.py Serialization registry + conversion research/database/serializer/README.md
src/aniket_tools/utils/model_coercer.py Model-driven data coercion research/utils/model_coercer/README.md
src/aniket_tools/utils/dict_compare.py Deep comparison + type alignment research/utils/dict_compare/README.md
src/aniket_tools/utils/compat.py Optional import/type compatibility helpers research/utils/compat/README.md
src/aniket_tools/reliability/retry.py Retry utilities research/reliability/retry/README.md
src/aniket_tools/reliability/circuit_breaker.py Circuit breaker state machine research/reliability/circuit_breaker/README.md

Return Structure Reference

  • create_response(...) success payload: success, response_code, meta, data, optional pagination
  • create_response(...) error payload: success, response_code, error_message, meta, error, errors
  • ErrorHandler.build_payload(...): same public envelope as error response + retry hints/details
  • compare_dicts(...) -> DictDiffResult: match, fields, summary
  • coerce_model_data(...) -> ModelCoercionResult: coerced rows plus per-field conversion metadata/errors
  • smart_insert/smart_insert_sync(...) -> InsertMetrics: total, inserted_count, skipped_count, failed_count, retries, batches, total_insert_time_sec, inserted_data_size_bytes
  • smart_update/smart_update_sync(...) -> UpdateMetrics: total, updated_count, failed_count, retries, batches, total_update_time_sec, updated_data_size_bytes
  • smart_upsert/smart_upsert_sync(...) -> UpsertMetrics: total, upserted_count, failed_count, retries, batches, total_upsert_time_sec, upserted_data_size_bytes
  • RetryConfig: dataclass — max_retries, on, base_delay, backoff_factor, max_delay, jitter, predicate, on_retry

Edge Cases

  • Empty input collections ([], {}) in comparison and DB helpers
  • None, NaN, and Inf handling in response normalization
  • Missing keys / key-case differences in dict comparison
  • Invalid input types for serializer and coercion APIs
  • Nested dict/list/set/object coercion paths
  • Large batch sizes and retryable database failures

FAQ

  1. Should I use create_response inside exception handlers? Use unified_exception_handler for exceptions and create_response for normal route returns.
  2. Can I return plain dictionaries instead of JSONResponse? Yes, set as_json_response=False in create_response.
  3. Which APIs are safe without FastAPI installed? Most utilities are framework-agnostic; FastAPI integration points are optional.
  4. How do I extend serialization for custom types? Register a converter with register_serializer(MyType, fn).
  5. How do I reduce logging overhead? Use sample_rate, structured context, and file rotation as needed.

Version Compatibility

  • Python: >=3.9 (from pyproject.toml)
  • Optional dependency groups:
    • fastapi: fastapi>=0.110
    • pydantic: pydantic>=1.10
    • database: sqlalchemy>=1.4, asyncpg>=0.29, psycopg2-binary>=2.9

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aniket_tools-0.1.7.tar.gz (140.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aniket_tools-0.1.7-py3-none-any.whl (111.4 kB view details)

Uploaded Python 3

File details

Details for the file aniket_tools-0.1.7.tar.gz.

File metadata

  • Download URL: aniket_tools-0.1.7.tar.gz
  • Upload date:
  • Size: 140.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for aniket_tools-0.1.7.tar.gz
Algorithm Hash digest
SHA256 3ff33bf50a790c2f0dcc863a441b144de301cf0cedb7076cc03234b8a7f6d51a
MD5 7d27b1dc299e3f8567ec0dc5fafe1662
BLAKE2b-256 249e4ed5219da85ced38294d4d328258e0c13237d8f49f1610ef9eff55ccaca0

See more details on using hashes here.

Provenance

The following attestation bundles were made for aniket_tools-0.1.7.tar.gz:

Publisher: publish-pypi.yml on aniketmodi123/reusable_code_lib

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file aniket_tools-0.1.7-py3-none-any.whl.

File metadata

  • Download URL: aniket_tools-0.1.7-py3-none-any.whl
  • Upload date:
  • Size: 111.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for aniket_tools-0.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 252288d659566cfb657514832e7e0388cff56bb12e516a6c159c4522c3251a68
MD5 478733a6c328c9761ef24d7d5147bc2a
BLAKE2b-256 54ba488c95f68a6bdfc160d29aac6fb6c5a2e70b8a7cac606336c926344c6f9a

See more details on using hashes here.

Provenance

The following attestation bundles were made for aniket_tools-0.1.7-py3-none-any.whl:

Publisher: publish-pypi.yml on aniketmodi123/reusable_code_lib

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page