Fast and easy-to-use package for data science

Speedy Utils

Speedy Utils is a Python utility library designed to streamline common programming tasks such as caching, parallel processing, file I/O, and data manipulation. It provides a collection of decorators, functions, and classes to enhance productivity and performance in your Python projects.

🚀 Recent Updates (January 27, 2026)

Enhanced Error Handling in Parallel Processing:

  • Rich-formatted error tracebacks with code context and syntax highlighting
  • Three error handling modes: 'raise', 'ignore', and 'log'
  • Filtered tracebacks focusing on user code (hiding infrastructure)
  • Real-time progress reporting with error/success statistics
  • Automatic error logging to timestamped files
  • Caller frame information showing where parallel functions were invoked

Quick Start

Parallel Processing with Error Handling

from speedy_utils import multi_thread, multi_process

# Simple parallel processing
results = multi_thread(lambda x: x * 2, [1, 2, 3, 4, 5])
# Results: [2, 4, 6, 8, 10]

# Robust processing with error handling
def process_item(item):
    if item == 3:
        raise ValueError(f"Cannot process item {item}")
    return item * 2

# Continue processing despite errors
results = multi_thread(process_item, [1, 2, 3, 4, 5], error_handler='log')
# Results: [2, 4, None, 8, 10] - errors logged automatically

Features

  • Caching Mechanisms: Disk-based and in-memory caching to optimize function calls.
  • Parallel Processing: Multi-threading, multi-processing, and asynchronous multi-threading utilities with enhanced error handling.
  • File I/O: Simplified JSON, JSONL, and pickle file handling with support for various file extensions.
  • Data Manipulation: Utilities for flattening lists and dictionaries, converting data types, and more.
  • Timing Utilities: Tools to measure and log execution time of functions and processes.
  • Pretty Printing: Enhanced printing functions for structured data, including HTML tables for Jupyter notebooks.
  • Enhanced Error Handling: Rich error tracebacks with code context, configurable error handling modes ('raise', 'ignore', 'log'), and detailed progress reporting.

Installation

You can install Speedy Utils via PyPI:

pip install speedy-utils
# or
uv pip install speedy-utils

Alternatively, install directly from the repository:

pip install git+https://github.com/anhvth/speedy
# or
uv pip install git+https://github.com/anhvth/speedy

For local development:

git clone https://github.com/anhvth/speedy
cd speedy
uv sync

Updating from previous versions

To update from previous versions or switch to v1.x, first uninstall any old packages, then install the latest version:

pip uninstall speedy_llm_utils speedy_utils
pip install -e ./  # for local development
# or
pip install speedy-utils -U  # for PyPI upgrade

Usage

Below are examples demonstrating how to utilize various features of Speedy Utils.

Caching

Memoize Decorator

Cache the results of function calls to disk to avoid redundant computations.

from speedy_utils import memoize

@memoize
def expensive_function(x):
    # Simulate an expensive computation
    import time
    time.sleep(2)
    return x * x

result = expensive_function(4)  # Takes ~2 seconds
result = expensive_function(4)  # Retrieved from cache instantly
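
To make the idea of disk caching concrete, here is a minimal standard-library sketch. It is not the speedy_utils implementation: the names disk_memoize and CACHE_DIR, the hashing scheme, and the temporary cache directory are all assumptions chosen for illustration.

```python
import functools
import hashlib
import pickle
import tempfile
from pathlib import Path

# Hypothetical cache location used only by this sketch.
CACHE_DIR = Path(tempfile.mkdtemp(prefix="memo_sketch_"))


def disk_memoize(func):
    """Cache return values on disk, keyed by a hash of the call arguments."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Build a stable key from the function name and the pickled arguments.
        raw = pickle.dumps((func.__qualname__, args, sorted(kwargs.items())))
        path = CACHE_DIR / (hashlib.sha256(raw).hexdigest() + ".pkl")
        if path.exists():
            # Cache hit: load the stored result instead of recomputing.
            return pickle.loads(path.read_bytes())
        result = func(*args, **kwargs)
        path.write_bytes(pickle.dumps(result))
        return result

    return wrapper


calls = []


@disk_memoize
def square(x):
    calls.append(x)  # record each real computation
    return x * x


first = square(4)   # computed, then written to disk
second = square(4)  # read back from disk; `calls` does not grow
```

Because the result lives in a file, it can also survive across interpreter runs when the cache directory is persistent, which is what distinguishes disk caching from the in-memory variant.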

In-Memory Memoization

Cache function results in memory for faster access within the same runtime.

from speedy_utils import imemoize

@imemoize
def compute_sum(a, b):
    return a + b

result = compute_sum(5, 7)  # Computed and cached
result = compute_sum(5, 7)  # Retrieved from in-memory cache
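
For comparison, an in-memory cache can be sketched with a plain dictionary. This is an illustrative stand-in rather than the library's code; in_memory_memoize and add_calls are hypothetical names, and the sketch assumes all arguments are hashable.

```python
import functools


def in_memory_memoize(func):
    """Cache results in a dict that lives as long as the process."""
    cache = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Arguments must be hashable to serve as a dictionary key.
        key = (args, tuple(sorted(kwargs.items())))
        if key not in cache:
            cache[key] = func(*args, **kwargs)
        return cache[key]

    return wrapper


add_calls = []


@in_memory_memoize
def add(a, b):
    add_calls.append((a, b))  # record each real computation
    return a + b


total_first = add(5, 7)   # computed and cached
total_second = add(5, 7)  # served from the dict
```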

Parallel Processing

Multi-threading with Enhanced Error Handling

Run a function concurrently across multiple threads. The error_handler argument selects one of three modes: 'raise' (default), 'ignore', or 'log'. When an error occurs, a rich-formatted traceback shows the code context and where the parallel call was made.

from speedy_utils import multi_thread

def process_item(item):
    # Simulate processing that might fail
    if item == 3:
        raise ValueError(f"Invalid item: {item}")
    return item * 2

items = [1, 2, 3, 4, 5]

# Default behavior: raise on first error with rich traceback
try:
    results = multi_thread(process_item, items, workers=3)
except SystemExit:
    print("Error occurred and was displayed with rich formatting")

# Continue processing on errors, return None for failed items
results = multi_thread(process_item, items, workers=3, error_handler='ignore')
print(results)  # [2, 4, None, 8, 10]

# Log errors to files and continue processing
results = multi_thread(process_item, items, workers=3, error_handler='log', max_error_files=10)
print(results)  # [2, 4, None, 8, 10] - errors logged to .cache/speedy_utils/error_logs/

Multi-processing with Error Handling

Process items across multiple processes with the same enhanced error handling capabilities.

from speedy_utils import multi_process

def risky_computation(x):
    """Computation that might fail for certain inputs."""
    if x % 5 == 0:
        raise RuntimeError(f"Cannot process multiples of 5: {x}")
    return x ** 2

data = list(range(12))

# Process with error logging (continues on errors)
results = multi_process(
    risky_computation, 
    data, 
    backend='mp',
    error_handler='log',
    max_error_files=5
)
print(results)  # [None, 1, 4, 9, 16, None, 36, 49, 64, 81, None, 121] (0 is also a multiple of 5)

Multi-Process with Inner Thread Pools

multi_process supports nested parallelism: each process runs its own thread pool. This suits workloads that mix CPU-bound work with I/O.

import time

from speedy_utils import multi_process

def process_data(item):
    """Process a data item with potential I/O operations."""
    # Simulate CPU work
    result = item ** 2
    # Simulate I/O (each thread can do I/O in parallel)
    time.sleep(0.01)
    return result

data = list(range(100))

# 4 processes, each with 4 threads = 16 concurrent workers
results = multi_process(
    process_data,
    data,
    num_procs=4,      # Number of processes
    num_threads=4,    # Threads per process
    backend='mp',
    progress=True,
)

Backend Options:

Backend | Description                                  | Use Case
'mp'    | Multi-processing with optional inner threads | CPU-bound work, bypasses GIL
'safe'  | In-process thread pool (for testing)         | Debugging, unit tests
'seq'   | Sequential execution                         | Debugging, reproducibility

When to use num_procs vs num_threads:

  • CPU-bound tasks: Use num_procs > 1, num_threads = 1 (processes bypass GIL)
  • I/O-bound tasks: Use num_procs = 1, num_threads > 1 (threads are lighter)
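  • Mixed workloads: Use both (e.g., num_procs=4, num_threads=4)

# Example: Web scraping with multi-process + multi-thread
import requests
from speedy_utils import multi_process

# `urls` (a list of URL strings) and `parse_content` (your parsing function)
# are placeholders that you define yourself.
def fetch_and_parse(url):
    response = requests.get(url)  # I/O bound
    return parse_content(response.text)  # CPU bound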

# 4 processes for parsing, 8 threads per process for fetching
results = multi_process(
    fetch_and_parse,
    urls,
    num_procs=4,
    num_threads=8,
    backend='mp',
    error_handler='log',  # Continue on failed URLs
)

mpython (CLI Tool)

mpython is a CLI tool for running Python scripts in multiple tmux windows with automatic GPU/CPU allocation for parallel processing.

Basic Usage:

# Run script.py with 16 parallel processes across GPUs 0-7
mpython script.py

# Run with 8 processes
mpython -t 8 script.py

# Run on specific GPUs only
mpython --gpus 0,1,2 script.py

Multi-Process Script Setup:

Your script must use MP_ID and MP_TOTAL environment variables for sharding:

import os

MP_ID = int(os.getenv("MP_ID", "0"))
MP_TOTAL = int(os.getenv("MP_TOTAL", "1"))

# Shard your data - each process gets its slice
inputs = list(range(1000))
my_inputs = inputs[MP_ID::MP_TOTAL]

# process() stands in for your own per-item function
for item in my_inputs:
    process(item)
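
The strided slice used for sharding can be checked in isolation. The short sketch below uses a hypothetical helper named shard and a small input list to show that every item lands in exactly one shard.

```python
def shard(items, shard_id, total_shards):
    """Return the strided slice owned by one worker."""
    return items[shard_id::total_shards]


inputs = list(range(10))
total = 3
shards = [shard(inputs, i, total) for i in range(total)]
# shards == [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]

# Merging all shards gives back the original items, with no duplicates.
covered = sorted(x for s in shards for x in s)
```

Striding keeps shard sizes within one item of each other, even when the input length is not a multiple of MP_TOTAL.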

Managing Sessions:

  • Sessions are named incrementally: mpython, mpython-1, mpython-2, etc.
  • Kill all sessions: kill-mpython
  • Attach to session: tmux attach -t mpython

Enhanced Error Handling

Speedy Utils now provides comprehensive error handling for parallel processing with rich formatting and detailed diagnostics.

Rich Error Tracebacks

When errors occur, you'll see beautifully formatted tracebacks with:

  • Code context: Lines of code around the error location
  • Caller information: Shows where the parallel function was invoked
  • Filtered frames: Focuses on user code, hiding infrastructure details
  • Color coding: Easy-to-read formatting with syntax highlighting

Error Handling Modes

Choose how to handle errors in parallel processing:

  • 'raise' (default): Stop on first error with detailed traceback
  • 'ignore': Continue processing, return None for failed items
  • 'log': Log errors to files and continue processing
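
The three modes can be approximated with the standard library alone. The sketch below is an illustration of the behavior described in the list, not the speedy_utils implementation: run_parallel and safe_call are hypothetical names, 'log' writes to a standard logger rather than to files, and 'raise' re-raises the original exception, which may differ from what the library does (the multi-threading example earlier catches SystemExit).

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger("parallel_sketch")


def run_parallel(func, items, workers=4, error_handler="raise"):
    """Map func over items in threads, handling failures per error_handler."""
    if error_handler not in ("raise", "ignore", "log"):
        raise ValueError(f"Unknown error_handler: {error_handler!r}")

    def safe_call(item):
        try:
            return func(item)
        except Exception as exc:
            if error_handler == "raise":
                raise
            if error_handler == "log":
                logger.warning("Item %r failed: %s", item, exc)
            # 'ignore' and 'log' both substitute None for the failed item.
            return None

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map yields results in input order; a stored exception is
        # re-raised when its result is reached.
        return list(pool.map(safe_call, items))


def process_item(item):
    if item == 3:
        raise ValueError(f"Invalid item: {item}")
    return item * 2


ignored = run_parallel(process_item, [1, 2, 3, 4, 5], error_handler="ignore")
logged = run_parallel(process_item, [1, 2, 3, 4, 5], error_handler="log")
# Both lists are [2, 4, None, 8, 10]
```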

Error Logging

When using error_handler='log', errors are automatically saved to timestamped files in .cache/speedy_utils/error_logs/ with full context and stack traces.
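
As a rough picture of what timestamped error logging involves, the sketch below writes a traceback to a file named after the current time. The function log_error_to_file, the file name pattern, and the temporary directory are assumptions made for illustration; the library's actual file layout may differ, and the max_error_files option is not modeled.

```python
import tempfile
import traceback
from datetime import datetime
from pathlib import Path


def log_error_to_file(exc, log_dir):
    """Write the traceback of exc to a timestamped file and return its path."""
    log_dir = Path(log_dir)
    log_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    path = log_dir / f"error_{stamp}.log"  # hypothetical naming scheme
    text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    path.write_text(text, encoding="utf-8")
    return path


# Demo in a temporary directory instead of .cache/speedy_utils/error_logs/.
demo_dir = tempfile.mkdtemp(prefix="error_logs_sketch_")
try:
    1 / 0
except ZeroDivisionError as exc:
    log_path = log_error_to_file(exc, demo_dir)

logged_text = log_path.read_text(encoding="utf-8")
```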

Progress Reporting with Error Statistics

Progress bars now show real-time error and success counts:

Multi-thread [8/10] [00:02<00:00, 3.45it/s, success=8, errors=2, pending=0]

This makes it easy to monitor processing health at a glance.
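
The counters behind such a progress line can be sketched with concurrent.futures. This is an assumption about how the statistics could be gathered, not the library's code; run_with_stats and flaky are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_with_stats(func, items, workers=4):
    """Run func over items in threads and count successes and errors."""
    stats = {"success": 0, "errors": 0, "pending": len(items)}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(func, item) for item in items]
        for future in as_completed(futures):
            stats["pending"] -= 1
            if future.exception() is None:
                stats["success"] += 1
            else:
                stats["errors"] += 1
            # A real progress bar would refresh its postfix here.
    return stats


def flaky(x):
    if x % 5 == 0:
        raise RuntimeError(f"Cannot process multiples of 5: {x}")
    return x ** 2


stats = run_with_stats(flaky, range(10))
summary = ", ".join(f"{name}={count}" for name, count in stats.items())
# summary == "success=8, errors=2, pending=0"
```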

Example: Robust Data Processing

from speedy_utils import multi_thread

def process_data_record(record):
    """Process a data record that might have issues."""
    try:
        # Your processing logic here
        value = record['value'] / record['divisor']
        return {'result': value, 'status': 'success'}
    except KeyError as e:
        raise ValueError(f"Missing required field in record: {e}")
    except ZeroDivisionError:
        raise ValueError("Division by zero in record")

# Sample data with some problematic records
data = [
    {'value': 10, 'divisor': 2},     # OK
    {'value': 15, 'divisor': 0},     # Will error
    {'value': 20, 'divisor': 4},     # OK
    {'value': 25},                   # Missing divisor - will error
]

# Process with error logging - continues despite errors
results = multi_thread(
    process_data_record, 
    data, 
    workers=4,
    error_handler='log',
    max_error_files=10
)

print("Results:", results)
# Output: Results: [{'result': 5.0, 'status': 'success'}, None, {'result': 5.0, 'status': 'success'}, None]
# Errors are logged to files for later analysis

File I/O

Dumping Data

Save data in JSON, JSONL, or pickle formats.

from speedy_utils import dump_json_or_pickle, dump_jsonl

data = {"name": "Alice", "age": 30}

# Save as JSON
dump_json_or_pickle(data, "data.json")

# Save as JSONL
dump_jsonl([data, {"name": "Bob", "age": 25}], "data.jsonl")

# Save as Pickle
dump_json_or_pickle(data, "data.pkl")

Loading Data

Load data based on file extensions.

from speedy_utils import load_json_or_pickle, load_by_ext

# Load JSON
data = load_json_or_pickle("data.json")

# Load JSONL
data_list = load_json_or_pickle("data.jsonl")

# Load Pickle
data = load_json_or_pickle("data.pkl")

# Load based on extension with parallel processing
loaded_data = load_by_ext(["data.json", "data.pkl"])
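
Extension-based loading boils down to dispatching on the file suffix. The sketch below shows one way to do that with only the standard library; load_by_extension is a hypothetical helper, and the set of recognized suffixes is an assumption rather than the library's actual list.

```python
import json
import pickle
import tempfile
from pathlib import Path


def load_by_extension(path):
    """Pick a loader from the file suffix."""
    path = Path(path)
    suffix = path.suffix.lower()
    if suffix == ".json":
        return json.loads(path.read_text(encoding="utf-8"))
    if suffix == ".jsonl":
        # One JSON document per non-empty line.
        with path.open(encoding="utf-8") as fh:
            return [json.loads(line) for line in fh if line.strip()]
    if suffix in (".pkl", ".pickle"):
        with path.open("rb") as fh:
            return pickle.load(fh)
    raise ValueError(f"Unsupported extension: {suffix!r}")


# Round-trip demo in a temporary directory.
tmp = Path(tempfile.mkdtemp(prefix="io_sketch_"))
alice = {"name": "Alice", "age": 30}
bob = {"name": "Bob", "age": 25}
(tmp / "data.json").write_text(json.dumps(alice), encoding="utf-8")
(tmp / "data.jsonl").write_text(
    json.dumps(alice) + "\n" + json.dumps(bob) + "\n", encoding="utf-8"
)
(tmp / "data.pkl").write_bytes(pickle.dumps(alice))

loaded = {
    name: load_by_extension(tmp / name)
    for name in ("data.json", "data.jsonl", "data.pkl")
}
```

Only unpickle files you trust, since loading a pickle can execute arbitrary code.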

Data Manipulation

Flattening Lists and Dictionaries

from speedy_utils import flatten_list, flatten_dict

nested_list = [[1, 2], [3, 4], [5]]
flat_list = flatten_list(nested_list)
print(flat_list)  # [1, 2, 3, 4, 5]

nested_dict = {"a": {"b": 1, "c": 2}, "d": 3}
flat_dict = flatten_dict(nested_dict)
print(flat_dict)  # {'a.b': 1, 'a.c': 2, 'd': 3}
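
The flattening shown above can be reproduced with a few lines of plain Python. The helpers below, flatten_one_level and flatten_nested_dict, are hypothetical stand-ins written for this sketch; the "." separator is inferred from the printed output and may be configurable in the library.

```python
def flatten_one_level(nested):
    """Concatenate a list of lists into a single list."""
    return [item for sub in nested for item in sub]


def flatten_nested_dict(nested, parent_key="", sep="."):
    """Join nested keys with sep until only non-dict values remain."""
    flat = {}
    for key, value in nested.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            # Recurse into sub-dictionaries, carrying the key prefix along.
            flat.update(flatten_nested_dict(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


flat_list = flatten_one_level([[1, 2], [3, 4], [5]])
# flat_list == [1, 2, 3, 4, 5]

flat_dict = flatten_nested_dict({"a": {"b": 1, "c": 2}, "d": 3})
# flat_dict == {'a.b': 1, 'a.c': 2, 'd': 3}
```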

Converting to Built-in Python Types

from speedy_utils import convert_to_builtin_python
from pydantic import BaseModel

class User(BaseModel):
    name: str
    age: int

user = User(name="Charlie", age=28)
builtin_user = convert_to_builtin_python(user)
print(builtin_user)  # {'name': 'Charlie', 'age': 28}

Utility Functions

Pretty Printing

from speedy_utils import fprint, print_table

data = {"name": "Dana", "age": 22, "city": "New York"}

# Pretty print as table
fprint(data)

# Print as table using tabulate
print_table(data)

Timing Utilities

from speedy_utils import timef, Clock

@timef
def slow_function():
    import time
    time.sleep(3)
    return "Done"

result = slow_function()  # Prints execution time

# Using Clock
clock = Clock()
# ... your code ...
clock.log()
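
A timing decorator of this kind can be sketched with time.perf_counter. The version below is an illustration only: the name timed and the last_elapsed attribute are assumptions, and the output format of the library's timef may differ.

```python
import functools
import time


def timed(func):
    """Print how long each call takes and remember the last duration."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs on both normal return and exception.
            wrapper.last_elapsed = time.perf_counter() - start
            print(f"{func.__name__} took {wrapper.last_elapsed:.4f}s")

    wrapper.last_elapsed = None
    return wrapper


@timed
def short_sleep():
    time.sleep(0.05)
    return "Done"


outcome = short_sleep()  # prints the elapsed time, returns "Done"
```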

LLM

The LLM class provides a unified interface for language model interactions with structured input/output handling. It supports text completion, structured outputs, caching, streaming, and OpenAI-compatible client integration.

Basic Text Completion

from llm_utils import LLM

llm = LLM(
    instruction="You are a helpful assistant.",
    model="gpt-4o-mini"
)

# Simple text completion
message = llm.chat_completion("What is Python?")
print(message.content)  # The text response

Structured Output with Pydantic

from pydantic import BaseModel
from llm_utils import LLM

class Sentiment(BaseModel):
    sentiment: str
    confidence: float

llm = LLM(
    instruction="Analyze the sentiment of the input.",
    model="gpt-4o-mini"
)

parsed: Sentiment = llm.pydantic_parse(
    "I love this product!",
    response_model=Sentiment,
)
print(f"Sentiment: {parsed.sentiment}, Confidence: {parsed.confidence}")

Streaming Responses

from llm_utils import LLM

llm = LLM(model="gpt-4o-mini")

# Stream text responses
stream = llm("Tell me a story", stream=True)
for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)

Client Configuration

The LLM class accepts various client configurations:

from llm_utils import LLM
from openai import OpenAI

# Using a custom OpenAI client
custom_client = OpenAI(base_url="http://localhost:8000/v1", api_key="your-key")
llm = LLM(client=custom_client, model="llama-2-7b")

# Using a port number
llm = LLM(client=8000, model="llama-2-7b")

# Using a base URL string
llm = LLM(client="http://localhost:8000/v1", model="llama-2-7b")

Caching

Enable response caching to avoid redundant API calls:

from llm_utils import LLM

# Enable caching (default: True)
llm = LLM(model="gpt-4o-mini", cache=True)

# First call hits the API
result1 = llm("What is 2+2?")

# Second call returns cached result
result2 = llm("What is 2+2?")  # Instant response from cache

# Disable caching for a specific call
result3 = llm("What is 2+2?", cache=False)

Reasoning Models

Handle reasoning models that provide thinking traces:

from llm_utils import LLM

# For models like DeepSeek-R1 that output reasoning
llm = LLM(model="deepseek-reasoner")

message = llm.chat_completion("Solve this math problem: 15 * 23")

# Access the final answer
answer = message.content

# Access reasoning content when the response includes it
reasoning = getattr(message, "reasoning_content", None)

For custom staged generation with explicit tags, Qwen3LLM also exposes complete_until():

from llm_utils import Qwen3LLM

llm = Qwen3LLM(model="Qwen/Qwen3-0.6B")

memory_state = llm.complete_until(
    [{"role": "user", "content": "Solve this in stages"}],
    "<memory>",
    stop="</memory>",
    max_tokens=256,
)

think_state = llm.complete_until(
    [{"role": "user", "content": "Solve this in stages"}],
    memory_state.assistant_prompt_prefix + "\n<think_efficient>",
    stop="</think_efficient>",
    max_tokens=512,
)

final_state = llm.complete_until(
    [{"role": "user", "content": "Solve this in stages"}],
    think_state.assistant_prompt_prefix,
    stop="<|im_end|>",
    max_tokens=512,
)

Conversation History

Inspect previous conversations:

from llm_utils import LLM

llm = LLM(model="gpt-4o-mini")

# Make some calls
llm("Hello")
llm("How are you?")

# Inspect the last conversation
history = llm.inspect_history(idx=-1)
print(history)

# Get last 3 messages from the conversation
history = llm.inspect_history(idx=-1, k_last_messages=3)

Testing

The test suite uses pytest:

uv sync
uv run pytest

Project details



This version

2.0.2
