Project description
Keble helpers
A collection of helper functions used by the keble project.
Shared Typings
keble-helpers owns package-neutral shared enums and value objects that need to stay stable across backend packages.
MarketplaceLanguage
CommerceEntityType
CommerceEntityType is the canonical cross-package enum for commerce entities:
- BRAND_IDENTITY
- BRAND_MENTION
- CATEGORY
- PRODUCT
- LISTING
- SKU
Display helpers on CommerceEntityType are part of the shared contract as well. upper_snake_to_title() must preserve known acronyms such as SKU instead of degrading them to title-cased words.
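A minimal sketch of that acronym-preserving conversion; the `ACRONYMS` set and implementation shown here are illustrative, not the package source:

```python
# Illustrative sketch of the acronym-preserving behavior described above;
# the real helper in keble-helpers may be implemented differently.
ACRONYMS = {"SKU"}  # assumed set of known acronyms

def upper_snake_to_title(value: str) -> str:
    """Convert UPPER_SNAKE_CASE to Title Case, keeping known acronyms intact."""
    words = value.split("_")
    return " ".join(w if w in ACRONYMS else w.capitalize() for w in words)

print(upper_snake_to_title("BRAND_IDENTITY"))  # Brand Identity
print(upper_snake_to_title("SKU"))             # SKU
```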
Aliyun
The Aliyun module provides helpers for interacting with Alibaba Cloud (Aliyun) services.
Base Classes
Aliyun
- __init__(*, access_key: str, secret: str): Initialize with Aliyun credentials
OSS (Object Storage Service)
AliyunOss
- __init__(oss_endpoint: AnyHttpUrl, bucket: str, **kwargs): Initialize OSS client
- get_bucket() -> oss2.Bucket: Get OSS bucket instance
- get_bucket_with_sts(sts_token: str): Get bucket with STS token
- get_object_meta(key: str) -> AliyunOssMeta: Get object metadata
- save_object_to_local(key: str, local_path: str, *args, **kwargs): Download file from OSS
- save_local_to_cloud(key: str, local_path: str, *args, **kwargs): Upload file to OSS
- save_snapshot_to_local(key: str, local_path: str, seconds: int): Get video snapshot
- cold_archive_object(key: str): Convert object to cold archive storage class
- get_sts_signed_url(sts_token: str, key: str, *, expire_seconds: int = 60, content_type: Optional[str] = None, oss_storage_class: Optional[str] = None) -> str: Generate signed URL with STS
STS (Security Token Service)
AliyunSts
- __init__(region, **kwargs): Initialize STS client
- get_sts(session_name: str, role_arn: str) -> AliyunStsToken: Get STS token
Schemas
AliyunOssPutObjectResponse
- status: int: Response status
- request_id: str: Request ID
- etag: str: ETag
- headers: dict: Response headers
AliyunStsToken
- access_key_secret: str: Access key secret
- security_token: str: Security token
- access_key_id: str: Access key ID
AliyunOssMeta
- etag: Optional[str]: OSS ETag
- content_length: Optional[int]: File size in bytes
- last_modified: Optional[int]: Last modified timestamp
- content_type: Optional[str]: MIME type of the file
Usage Examples
from keble_helpers import AliyunOss, AliyunSts

# Initialize Aliyun OSS
oss_client = AliyunOss(
    oss_endpoint="https://oss-cn-beijing.aliyuncs.com",
    bucket="your-bucket-name",
    access_key="your-access-key-id",
    secret="your-access-key-secret",
)

# Upload a file to OSS
response = oss_client.save_local_to_cloud(
    key="path/in/oss/file.txt",
    local_path="/local/path/to/file.txt",
)

# Get file metadata
meta = oss_client.get_object_meta("path/in/oss/file.txt")

# Download a file from OSS
oss_client.save_object_to_local(
    key="path/in/oss/file.txt",
    local_path="/local/path/to/download.txt",
)

# Get an STS token
sts_client = AliyunSts(
    region="cn-beijing",
    access_key="your-access-key-id",
    secret="your-access-key-secret",
)
sts_token = sts_client.get_sts(
    session_name="session-name",
    role_arn="acs:ram::your-account-id:role/your-role-name",
)

# Generate a signed URL with the STS token
signed_url = oss_client.get_sts_signed_url(
    sts_token=sts_token.security_token,
    key="path/in/oss/file.txt",
    expire_seconds=3600,
)
Progress
The Progress module provides a Redis-based task tracking system to monitor the progress of multi-stage operations.
Base Classes
ProgressHandler
- __init__(redis: Redis): Initialize with Redis connection
- new(*, key: str, model_key: str | None = None) -> ProgressTask: Create a new progress task
- get(*, key: str) -> ProgressReport | None: Retrieve progress report by key
ProgressTask
- __init__(redis: Optional[Redis] = None, key: Optional[str] = None, model_key: Optional[str] = None, root: Optional["ProgressTask"] = None): Initialize a progress task
- new_subtask() -> ProgressTask: Create a subtask under this task
- success(): Mark task as successful
- failure(error: Optional[str] = None): Mark task as failed
- set_message(message: Optional[str]): Set a message for the task
- get_from_redis(redis: Redis, *, key: str) -> Optional["ProgressTask"]: Class method to retrieve a task from Redis
- get_prebuilt_subtasks_model(root: "ProgressTask", redis: Redis, *, model_key: str) -> List["ProgressTask"]: Class method to get prebuilt subtasks
Schemas
ProgressTaskStage
Enum with the following values:
- PENDING: Task is in progress
- SUCCESS: Task completed successfully
- FAILURE: Task failed
ProgressReport
- progress_key: Optional[str]: Key used to store progress in Redis
- progress: float: Completion percentage (0.0 to 1.0)
- is_root_success: bool: Whether the root task is successful
- success: int: Number of successful tasks
- failure: int: Number of failed tasks
- pending: int: Number of pending tasks
- assigned: int: Number of assigned tasks
- total: int: Total number of tasks
- message: Optional[str]: Optional message
- errors: List[str]: List of error messages
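For illustration, the counters above can be turned into a one-line status string. This helper is hypothetical and only assumes the documented field names:

```python
# Hypothetical logging helper built on the ProgressReport fields listed above.
def format_report(*, progress: float, success: int, failure: int,
                  pending: int, total: int, errors: list[str]) -> str:
    """Render a compact one-line summary of a progress report."""
    line = (f"{progress * 100:.0f}% done "
            f"({success} ok, {failure} failed, {pending} pending of {total})")
    if errors:
        line += f"; first error: {errors[0]}"
    return line

print(format_report(progress=0.75, success=2, failure=1,
                    pending=1, total=4, errors=["timeout"]))
```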
Usage Examples
import uuid
from redis import Redis
from keble_helpers import ProgressHandler
# Initialize Redis connection
redis = Redis(host='localhost', port=6379, db=0)
# Create a progress handler
handler = ProgressHandler(redis=redis)
# Create a new progress task
task_key = str(uuid.uuid4())
task = handler.new(key=task_key)
# Create subtasks
subtask1 = task.new_subtask()
subtask2 = task.new_subtask()
subtask3 = task.new_subtask()
# Mark tasks as complete or failed
subtask1.success()
subtask2.failure(error="Something went wrong")
subtask3.success()
task.success()
# Get progress report
report = handler.get(key=task_key)
print(f"Progress: {report.progress * 100}%")
print(f"Success: {report.success}, Failure: {report.failure}, Pending: {report.pending}")
# Using model_key for prebuilt subtasks
model_key = str(uuid.uuid4())
root_task = handler.new(key=str(uuid.uuid4()), model_key=model_key)
# When you create a new task with the same model_key,
# it will have the same number of subtasks
new_task = handler.new(key=str(uuid.uuid4()), model_key=model_key)
Pydantic
The Pydantic module provides helpers and utilities for working with Pydantic models.
Functions
- is_http_url(url: Any) -> bool: Validates if a string is a valid HTTP or HTTPS URL by checking if it has a valid HTTP/HTTPS scheme and netloc
Base Classes
PydanticModelConfig
- default_dict(**kwargs) -> dict: Returns a dictionary with default configuration
- default(**kwargs) -> ConfigDict: Returns a ConfigDict with default configuration
CloudStorageBase
- Base model for cloud storage objects with standardized fields
Enums
CloudStorageType
- AWS_S3: Amazon S3 storage
- ALIYUN_OSS: Alibaba Cloud OSS storage
CloudStorageObjectType
- IMAGE: Image files
- VIDEO: Video files
- EXCEL: Excel spreadsheets
- CSV: CSV files
- OTHER: Other file types
- determine_type(*, mime: str) -> CloudStorageObjectType: Determine type from MIME
Usage Examples
from keble_helpers.pydantic import CloudStorageBase, CloudStorageObjectType, CloudStorageType, PydanticModelConfig
from keble_helpers.pydantic.schemas import is_http_url
from pydantic import BaseModel
# Check if a URL is valid HTTP/HTTPS
valid = is_http_url("https://example.com") # True
valid = is_http_url("ftp://example.com") # False
valid = is_http_url("example.com") # False (missing scheme)
valid = is_http_url("http://") # False (missing netloc)
# Create a custom model with Pydantic configuration
class MyModel(BaseModel):
    model_config = PydanticModelConfig.default()
    # Fields go here

# Create a cloud storage object
storage = CloudStorageBase(
    key="path/to/file.jpg",
    base_url="https://example.com/storage",
    type=CloudStorageType.AWS_S3,
    object_type=CloudStorageObjectType.IMAGE,
    original_file_name="photo.jpg",
)
# Determine object type from MIME
object_type = CloudStorageObjectType.determine_type(mime="image/jpeg")
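As a rough illustration of the MIME dispatch behind determine_type, here is a standalone re-creation with a local enum; the real mapping in keble-helpers may cover more MIME types:

```python
from enum import Enum

# Illustrative re-creation of CloudStorageObjectType.determine_type;
# the shipped enum and its MIME table may differ.
class ObjectType(Enum):
    IMAGE = "image"
    VIDEO = "video"
    EXCEL = "excel"
    CSV = "csv"
    OTHER = "other"

def determine_type(*, mime: str) -> ObjectType:
    """Map a MIME string onto the coarse object-type buckets listed above."""
    if mime.startswith("image/"):
        return ObjectType.IMAGE
    if mime.startswith("video/"):
        return ObjectType.VIDEO
    if mime in ("application/vnd.ms-excel",
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"):
        return ObjectType.EXCEL
    if mime == "text/csv":
        return ObjectType.CSV
    return ObjectType.OTHER

print(determine_type(mime="image/jpeg").name)  # IMAGE
```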
Common
The Common module provides general utility functions for common tasks.
Functions
String and ID Utilities
- id_generator() -> str: Generate a UUID4 string
- generate_random_string(length: int = 32, *, lower: bool = True, upper: bool = True, digit: bool = True) -> str: Generate a random string
- hash_string(arg: str) -> str: Generate MD5 hash of a string
- inline_string(string: str, max_len: int = 30): Format a string for inline display
Pydantic Helpers
- is_pydantic_field_empty(obj: BaseModel, field: str) -> bool: Check if a field is empty in a Pydantic model
Date and Time
- date_to_datetime(d: date) -> datetime: Convert a date to a datetime
- datetime_to_date(d: datetime) -> date: Convert a datetime to a date
List and Collection Operations
- slice_to_list(items: List[Any], slice_size: int) -> List[List[Any]]: Split a list into chunks
- get_first_match(items: list, key_fn, value): Find the first item in a list matching a criterion
File System Operations
- ensure_has_folder(path: str) -> str: Create a directory if it doesn't exist
- yield_files(folder: str) -> Iterator[str | Path]: Recursively yield files in a directory
- get_files(folder: str) -> List[str | Path]: Get a list of all files in a directory
- zip_dir(folder: Path | str, zip_filepath: Path | str): Zip a directory
- remove_dir(dir: Path | str): Remove a directory
MIME Type Checking
- is_mime_prefix_in(mime, mime_start: List[str]): Check if a MIME type has a specific prefix
- is_mime_image(mime: str): Check if a MIME type is an image
- is_mime_video(mime: str): Check if a MIME type is a video
- is_mime_audio(mime: str): Check if a MIME type is audio
- is_mime_media(mime: str): Check if a MIME type is any media (image, video, audio)
- is_mime_ms_excel(mime: str): Check if a MIME type is MS Excel
- is_mime_csv(mime: str): Check if a MIME type is CSV
Usage Examples
from keble_helpers.common import (
    id_generator, hash_string, ensure_has_folder, get_files,
    is_mime_image, slice_to_list,
)
# Generate a unique ID
unique_id = id_generator()
# Generate a hash of a string
file_hash = hash_string("content to hash")
# Ensure a directory exists
path = ensure_has_folder("/path/to/directory")
# Get all files in a directory
files = get_files("/path/to/directory")
# Check if a MIME type is an image
is_image = is_mime_image("image/jpeg") # True
# Split a list into chunks of size 3
chunks = slice_to_list([1, 2, 3, 4, 5, 6, 7], 3) # [[1, 2, 3], [4, 5, 6], [7]]
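Two of the helpers above are small enough to sketch; these are illustrative re-creations, not the shipped implementations:

```python
from typing import Any, List

# Illustrative re-creations of slice_to_list and is_mime_prefix_in;
# the versions in keble-helpers may handle extra edge cases.
def slice_to_list(items: List[Any], slice_size: int) -> List[List[Any]]:
    """Split a list into consecutive chunks of at most slice_size items."""
    return [items[i:i + slice_size] for i in range(0, len(items), slice_size)]

def is_mime_prefix_in(mime: str, mime_start: List[str]) -> bool:
    """Check whether a MIME type starts with any of the given prefixes."""
    return any(mime.startswith(prefix) for prefix in mime_start)

print(slice_to_list([1, 2, 3, 4, 5, 6, 7], 3))  # [[1, 2, 3], [4, 5, 6], [7]]
print(is_mime_prefix_in("image/png", ["image/", "video/"]))  # True
```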
DateTime
The DateTime module provides utilities for working with dates and times.
Functions
- days_in_month(year, month): Get the number of days in a specific month
Usage Examples
from keble_helpers.datetime import days_in_month
# Get days in February 2024 (leap year)
days = days_in_month(2024, 2) # 29
# Get days in February 2023 (non-leap year)
days = days_in_month(2023, 2) # 28
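The behavior can be reproduced with the standard library's calendar module; this is a sketch of equivalent logic, not necessarily the package's implementation:

```python
import calendar

# Sketch of days_in_month in terms of the standard library.
def days_in_month(year: int, month: int) -> int:
    """Number of days in the given month, accounting for leap years."""
    # monthrange returns (weekday of the 1st, number of days in the month)
    return calendar.monthrange(year, month)[1]

print(days_in_month(2024, 2))  # 29
print(days_in_month(2023, 2))  # 28
```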
Enum
The Enum module provides predefined enumerations.
Enums
Environment
- development: Development environment
- test: Test environment
- production: Production environment
Usage Examples
from keble_helpers.enum import Environment
# Use environment enum
current_env = Environment.development
# Check environment
if current_env == Environment.production:
    # Production-specific code
    pass
FastAPI
The FastAPI module provides helpers for working with FastAPI applications, focusing on JSON encoding compatible with Pydantic v2.
Functions
- jsonable_encoder(obj: Any, include: Optional[IncEx] = None, exclude: Optional[IncEx] = None, by_alias: bool = True, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, custom_encoder: Optional[Dict[Any, Callable[[Any], Any]]] = None, sqlalchemy_safe: bool = True) -> Any: Convert a Python object to a JSON-compatible object
Constants
- PYDANTIC_V2: Boolean indicating if Pydantic v2 is in use
- ENCODERS_BY_TYPE: Dictionary mapping Python types to encoder functions
Usage Examples
from keble_helpers.fastapi import jsonable_encoder
from pydantic import BaseModel
from datetime import datetime
class User(BaseModel):
    id: int
    name: str
    created_at: datetime
    updated_at: datetime | None = None
user = User(id=1, name="John Doe", created_at=datetime.now())
# Convert to JSON-compatible dict
json_data = jsonable_encoder(user)
# Convert excluding some fields
json_data = jsonable_encoder(user, exclude={"created_at"})
# Convert with custom encoders
json_data = jsonable_encoder(
    user,
    custom_encoder={datetime: lambda dt: dt.strftime("%Y-%m-%d")},
)
File
The File module provides utilities for file operations, particularly for downloading files.
Functions
- adownload_file(*, url: str, folder: Path, filename: str) -> Path: Asynchronously download a file from a URL
Usage Examples
import asyncio
from pathlib import Path
from keble_helpers.file import adownload_file
async def download_example():
    # Download a file
    file_path = await adownload_file(
        url="https://example.com/file.pdf",
        folder=Path("/path/to/downloads"),
        filename="document.pdf",
    )
    print(f"Downloaded to: {file_path}")
# Run the async function
asyncio.run(download_example())
Multithread (Deprecated)
Note: This module is deprecated. The project now uses async-based approaches instead of multithreading.
The Multithread module provides utilities for thread management and parallel execution.
Classes
ThreadController
- __init__(thread_size: int): Initialize with a maximum number of threads
- create_thread(target: Callable, *, args: Optional[tuple] = None, kwargs: Optional[Dict[str, Any]] = None, thread_owner: Optional[str | int] = None, disable_sema: Optional[bool] = False, join: Optional[bool] = False): Create and start a new thread
- acquire(*, thread_owner: Optional[str | int] = None): Acquire a semaphore
- release(*, thread_owner: Optional[str | int] = None): Release a semaphore
- wait_all_to_finish(): Wait for all threads to complete
- wait_owner_to_finish(thread_owner: str | int): Wait for all threads started by a specific owner to complete
Decorators
- threaded(*, sema: Optional[Semaphore] = None, join: Optional[bool] = False): Decorator to run a function in a separate thread
Usage Examples
from keble_helpers import ThreadController, threaded
from threading import Semaphore
# Using ThreadController
controller = ThreadController(thread_size=5)
def task(results):
    # Perform the task
    results.append("Task completed")
    controller.release()

results = []
for _ in range(10):
    controller.create_thread(target=task, args=(results,))
controller.wait_all_to_finish()
# Using threaded decorator
sema = Semaphore(3)
@threaded(sema=sema)
def background_task(results):
    results.append("Background task completed")
    sema.release()

threads = []
results = []
for _ in range(5):
    threads.append(background_task(results))
for thread in threads:
    thread.join()
NumPy Utils
The NumPy Utils module provides helper functions for working with NumPy arrays and handling numerical values.
Functions
- is_invalid_float(value: Optional[float]) -> bool: Check if a float value is NaN or infinity
- guard_invalid_float(value: float | None | np.floating) -> float | None: Replace invalid float values (NaN, inf) with None
Usage Examples
import numpy as np
from keble_helpers.np_utils import is_invalid_float, guard_invalid_float
# Check if a value is an invalid float
invalid = is_invalid_float(float('nan')) # True
invalid = is_invalid_float(float('inf')) # True
invalid = is_invalid_float(42.0) # False
# Guard against invalid floats
safe_value = guard_invalid_float(np.nan) # None
safe_value = guard_invalid_float(np.inf) # None
safe_value = guard_invalid_float(42.0) # 42.0
safe_value = guard_invalid_float(np.float32(3.14)) # 3.14
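The guard logic can be sketched with the standard library's math module; this is an equivalent-behavior sketch (the shipped versions also accept NumPy floating scalars):

```python
import math
from typing import Optional

# Standard-library sketch of the two helpers documented above.
def is_invalid_float(value: Optional[float]) -> bool:
    """True when value is NaN or +/-infinity; None is considered valid."""
    return value is not None and (math.isnan(value) or math.isinf(value))

def guard_invalid_float(value: Optional[float]) -> Optional[float]:
    """Replace NaN/inf with None so the value is safe to serialize."""
    if value is None or is_invalid_float(value):
        return None
    return float(value)

print(guard_invalid_float(float("nan")))  # None
print(guard_invalid_float(42.0))          # 42.0
```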
Pydantic AI image runtime
keble_helpers.ai.image_runtime provides reusable multimodal runtime-hardening helpers for any pydantic-ai call site that may include typed ImageUrl parts.
Key exports:
- ImagePromptChecker: bounded-concurrency URL preflight with TTL cache
- ImagePreflightDecision, ImagePreflightBatchResult, ImagePreflightReason: typed preflight decisions
- extract_image_urls(...), strip_image_url_parts(...), is_image_url_model_404_error(...)
- arun_with_image_fallback(...): preflight + image-404 text-only fallback retry (tenacity with reraise=True)
- typed_tool(...), typed_tool_plain(...): additive wrappers around agent.tool(...) / agent.tool_plain(...) that preserve the decorated callable type for downstream code
Ownership model:
- Host applications should instantiate one shared ImagePromptChecker.
- Downstream libs should accept that instance and reuse it.
- Retry policy and preflight policy live on the same checker instance.
- Libs should not construct hidden per-module checker objects with divergent settings.
Status policy (ImagePromptChecker):
- only_accepts: list[int] | None
- rejects: list[int] | None
- Provide only one of them (ValueError is raised if both are provided).
- When both are omitted, the default behavior is rejects=[404].
- Probe requests do not auto-follow redirects; 3xx status codes are surfaced to policy evaluation directly.
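The status policy can be read as a small predicate over the probed status code. The function below is illustrative only; ImagePromptChecker's internals may differ:

```python
from typing import List, Optional

# Illustrative evaluation of the only_accepts / rejects policy described above.
def is_status_ok(status: int, *,
                 only_accepts: Optional[List[int]] = None,
                 rejects: Optional[List[int]] = None) -> bool:
    if only_accepts is not None and rejects is not None:
        raise ValueError("provide only one of only_accepts / rejects")
    if only_accepts is not None:
        return status in only_accepts
    if rejects is None:
        rejects = [404]  # documented default
    return status not in rejects

print(is_status_ok(200, only_accepts=[200, 206]))  # True
print(is_status_ok(301))  # True: 3xx reaches the policy, default only rejects 404
print(is_status_ok(404))  # False
```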
Usage:
from keble_helpers import ImagePromptChecker, arun_with_image_fallback
checker = ImagePromptChecker(
    enabled=True,
    timeout_secs=2.0,
    max_concurrency=4,
    cache_ttl_secs=600,
    image_model_404_retry_attempts=1,
    only_accepts=[200, 206],  # strict image status policy
)

result = await arun_with_image_fallback(
    agent=agent,
    prompt=prompt_parts,  # Sequence[UserContent] with optional ImageUrl
    image_prompt_checker=checker,
)
output = result.output  # preserves the agent's concrete output type

# or call through the checker directly
result = await checker.arun_with_image_fallback(
    agent=agent,
    prompt=prompt_parts,
)
Typed tool registration:
from pydantic_ai import Agent
from pydantic_ai.tools import RunContext
from keble_helpers import typed_tool, typed_tool_plain
agent = Agent("test", deps_type=int, output_type=str)
@typed_tool(agent, require_parameter_descriptions=True)
async def repeat(ctx: RunContext[int], word: str) -> str:
    """Repeat a word.

    Args:
        word: Word to repeat.
    """
    return f"{ctx.deps}:{word}"

@typed_tool_plain(agent, name="slugify")
def slugify(name: str) -> str:
    return name.strip().lower().replace(" ", "-")
Contract:
- Keep domain models/validators pure (no network I/O in validators).
- Apply runtime preflight right before model invocation.
- Keep retry ownership on the injected checker instead of separate function kwargs.
- Preserve terminal exception surfaces (reraise=True) for upstream task/error layers.
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file keble_helpers-1.12.1.tar.gz.
File metadata
- Download URL: keble_helpers-1.12.1.tar.gz
- Upload date:
- Size: 244.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.9 {"installer":{"name":"uv","version":"0.10.9","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f32b8882cd0f51d80e8ab2e026e45c12fbce903806d442d673a1c92af2504045 |
| MD5 | 11acd3701f4e22e0d57e26d1fc75f72b |
| BLAKE2b-256 | 2d8c6567074fb414095c70636000ecb6ff0772528e1e707d2152cf30026ed08c |
File details
Details for the file keble_helpers-1.12.1-py3-none-any.whl.
File metadata
- Download URL: keble_helpers-1.12.1-py3-none-any.whl
- Upload date:
- Size: 48.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.9 {"installer":{"name":"uv","version":"0.10.9","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0cdd0ff8dce6e6b2ba7019348632c0c68027b71e1c3d485575357b98f4a7acf2 |
| MD5 | 2c95e12cf6a05fb42a86606802b30d94 |
| BLAKE2b-256 | 8e544a08dabd945585b2a0156cdbd3ddd90ca7b5bf487df676ff035828302ce3 |