Skip to main content

Parallel processing utilities using Pathos mpprocessing library

Project description

twat-mp

Parallel processing utilities using Pathos and aiomultiprocess libraries. This package provides convenient context managers and decorators for parallel processing, with process-based, thread-based, and async-based pools.

Features

  • Multiple parallel processing options:
    • ProcessPool: For CPU-intensive parallel processing using Pathos
    • ThreadPool: For I/O-bound parallel processing using Pathos
    • AsyncMultiPool: For combined async/await with multiprocessing using aiomultiprocess
  • Decorators for common parallel mapping operations:
    • pmap: Standard parallel map (eager evaluation)
    • imap: Lazy parallel map returning an iterator
    • amap: Asynchronous map with automatic result retrieval
    • apmap: Async parallel map for use with async/await functions
  • Automatic CPU core detection for optimal pool sizing
  • Clean resource management with context managers
  • Full type hints and modern Python features
  • Flexible pool configuration with customizable worker count
  • Graceful error handling and resource cleanup
  • Optional dependencies to reduce installation footprint
  • Version control system (VCS) based versioning using hatch-vcs

Recent Updates

  • Fixed build system configuration with proper version handling
  • Enhanced error handling and resource cleanup
  • Improved compatibility with Python 3.12+ async features
  • Added comprehensive API reference documentation
  • Added real-world examples for various use cases

Installation

Basic installation:

pip install twat-mp

With async support:

pip install 'twat-mp[aio]'

With all extras and development tools:

pip install 'twat-mp[all,dev]'

Usage

Basic Usage

from twat_mp import ProcessPool, pmap

# Using the pool directly
with ProcessPool() as pool:
    results = pool.map(lambda x: x * 2, range(10))

# Using the decorator
@pmap
def double(x):
    return x * 2

results = double(range(10))

Async Support

The package provides async support through aiomultiprocess, allowing you to combine the benefits of async/await with multiprocessing:

import asyncio
from twat_mp import AsyncMultiPool, apmap

# Using the pool directly
async def process_items():
    async with AsyncMultiPool() as pool:
        async def work(x):
            await asyncio.sleep(0.1)  # Some async work
            return x * 2

        results = await pool.map(work, range(10))
        return results

# Using the decorator
@apmap
async def double(x):
    await asyncio.sleep(0.1)  # Some async work
    return x * 2

async def main():
    results = await double(range(10))
    print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

asyncio.run(main())

The async support is particularly useful when you need to:

  • Perform CPU-intensive tasks in parallel
  • Handle many concurrent I/O operations
  • Combine async/await with true multiprocessing
  • Process results from async APIs in parallel

Advanced Async Features

The AsyncMultiPool provides additional methods for different mapping strategies:

import asyncio
from twat_mp import AsyncMultiPool

async def main():
    # Using starmap for unpacking arguments
    async def sum_values(a, b):
        await asyncio.sleep(0.01)
        return a + b

    async with AsyncMultiPool() as pool:
        # Regular map
        double_results = await pool.map(
            lambda x: x * 2,
            range(5)
        )
        print(double_results)  # [0, 2, 4, 6, 8]

        # Starmap unpacks arguments
        sum_results = await pool.starmap(
            sum_values,
            [(1, 2), (3, 4), (5, 6)]
        )
        print(sum_results)  # [3, 7, 11]

        # imap returns an async iterator
        async for result in pool.imap(sum_values, [(1, 1), (2, 2), (3, 3)]):
            print(result)  # Prints 2, 4, 6 as they complete

asyncio.run(main())

Using Process and Thread Pools

The package provides dedicated context managers for both process and thread pools:

from twat_mp import ProcessPool, ThreadPool

# For CPU-intensive operations
with ProcessPool() as pool:
    results = pool.map(lambda x: x * x, range(10))
    print(list(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# For I/O-bound operations
with ThreadPool() as pool:
    results = pool.map(lambda x: x * 2, range(10))
    print(list(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# Custom number of workers
with ProcessPool(nodes=4) as pool:
    results = pool.map(lambda x: x * x, range(10))

Using Map Decorators

The package provides three decorators for different mapping strategies:

from twat_mp import amap, imap, pmap

# Standard parallel map (eager evaluation)
@pmap
def square(x: int) -> int:
    return x * x

results = list(square(range(10)))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Lazy parallel map (returns iterator)
@imap
def cube(x: int) -> int:
    return x * x * x

for result in cube(range(5)):
    print(result)  # Prints results as they become available

# Asynchronous parallel map with automatic result retrieval
@amap
def double(x: int) -> int:
    return x * 2

results = list(double(range(10)))
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Function Composition

Decorators can be composed for complex parallel operations:

from twat_mp import amap

@amap
def compute_intensive(x: int) -> int:
    result = x
    for _ in range(1000):  # Simulate CPU-intensive work
        result = (result * x + x) % 10000
    return result

@amap
def io_intensive(x: int) -> int:
    import time
    time.sleep(0.001)  # Simulate I/O wait
    return x * 2

# Chain parallel operations
results = list(io_intensive(compute_intensive(range(100))))

Real-World Examples

Image Processing

Processing images in parallel can significantly speed up operations like resizing, filtering, or format conversion:

from twat_mp import ProcessPool
from PIL import Image
import os

def resize_image(file_path):
    """Resize an image to 50% of its original size."""
    try:
        with Image.open(file_path) as img:
            # Get the original size
            width, height = img.size
            # Resize to 50%
            resized = img.resize((width // 2, height // 2))
            # Save with '_resized' suffix
            output_path = os.path.splitext(file_path)[0] + '_resized' + os.path.splitext(file_path)[1]
            resized.save(output_path)
            return output_path
    except Exception as e:
        return f"Error processing {file_path}: {e}"

# Get all image files in a directory
image_files = [f for f in os.listdir('images') if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
image_paths = [os.path.join('images', f) for f in image_files]

# Process images in parallel
with ProcessPool() as pool:
    results = list(pool.map(resize_image, image_paths))

print(f"Processed {len(results)} images")

Data Processing with Pandas

Splitting a large DataFrame into chunks and processing them in parallel:

from twat_mp import ProcessPool
import pandas as pd
import numpy as np

def process_chunk(chunk):
    """Apply complex transformations to a DataFrame chunk."""
    # Simulate CPU-intensive calculations
    chunk['calculated'] = np.sqrt(chunk['value'] ** 2 + chunk['other_value'] ** 2)
    chunk['category'] = chunk['calculated'].apply(lambda x: 'high' if x > 50 else 'medium' if x > 20 else 'low')
    return chunk

# Create a large DataFrame
df = pd.DataFrame({
    'value': np.random.randint(1, 100, 100000),
    'other_value': np.random.randint(1, 100, 100000)
})

# Split into chunks
chunk_size = 10000
chunks = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]

# Process chunks in parallel
with ProcessPool() as pool:
    processed_chunks = list(pool.map(process_chunk, chunks))

# Combine results
result_df = pd.concat(processed_chunks)
print(f"Processed DataFrame with {len(result_df)} rows")

Web Scraping with Async Support

Using the async capabilities to scrape multiple web pages concurrently:

import asyncio
import aiohttp
from bs4 import BeautifulSoup
from twat_mp import AsyncMultiPool, apmap

async def fetch_page(url):
    """Fetch a web page and extract its title."""
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=10) as response:
                if response.status == 200:
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    title = soup.title.string if soup.title else "No title found"
                    return {'url': url, 'title': title, 'status': response.status}
                else:
                    return {'url': url, 'error': f'Status code: {response.status}', 'status': response.status}
        except Exception as e:
            return {'url': url, 'error': str(e), 'status': None}

# Use the decorator for parallel processing
@apmap
async def fetch_all_pages(url):
    return await fetch_page(url)

async def main():
    # List of URLs to scrape
    urls = [
        'https://python.org',
        'https://github.com',
        'https://stackoverflow.com',
        'https://news.ycombinator.com',
        'https://reddit.com'
    ]

    # Fetch all pages in parallel
    results = await fetch_all_pages(urls)

    # Print results
    for result in results:
        if 'error' in result:
            print(f"Error fetching {result['url']}: {result['error']}")
        else:
            print(f"Title of {result['url']}: {result['title']}")

if __name__ == "__main__":
    asyncio.run(main())

File System Operations

Processing files in a directory structure:

from twat_mp import ThreadPool
import os
import hashlib

def calculate_file_hash(file_path):
    """Calculate SHA-256 hash of a file."""
    if not os.path.isfile(file_path):
        return (file_path, None, "Not a file")

    try:
        hasher = hashlib.sha256()
        with open(file_path, 'rb') as f:
            # Read in chunks to handle large files
            for chunk in iter(lambda: f.read(4096), b''):
                hasher.update(chunk)
        return (file_path, hasher.hexdigest(), None)
    except Exception as e:
        return (file_path, None, str(e))

def find_files(directory):
    """Recursively find all files in a directory."""
    file_paths = []
    for root, _, files in os.walk(directory):
        for file in files:
            file_paths.append(os.path.join(root, file))
    return file_paths

# Get all files in a directory
files = find_files('/path/to/directory')

# Use ThreadPool for I/O-bound operations
with ThreadPool() as pool:
    results = list(pool.map(calculate_file_hash, files))

# Process results
for file_path, file_hash, error in results:
    if error:
        print(f"Error processing {file_path}: {error}")
    else:
        print(f"{file_path}: {file_hash}")

## Dependencies

* `pathos`: For process and thread-based parallel processing
* `aiomultiprocess` (optional): For async-based parallel processing

## Development

To set up the development environment:

```bash
# Install in development mode with test dependencies
uv pip install -e ".[test]"

# Install with async support for testing all features
uv pip install -e ".[aio,test]"

# Run tests
python -m pytest tests/

# Run benchmarks
python -m pytest tests/test_benchmark.py

License

MIT License .

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

twat_mp-2.5.5.tar.gz (148.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

twat_mp-2.5.5-py3-none-any.whl (12.6 kB view details)

Uploaded Python 3

File details

Details for the file twat_mp-2.5.5.tar.gz.

File metadata

  • Download URL: twat_mp-2.5.5.tar.gz
  • Upload date:
  • Size: 148.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for twat_mp-2.5.5.tar.gz
Algorithm Hash digest
SHA256 58e77e137ebc4932fec94e14af52af78409b610ea757cd53e2472a061a3e735e
MD5 e9cb2a81f167c547727ed9b829f07472
BLAKE2b-256 7c6604b11aa7f5438fe91e0546ac33f12660214516fdc3e6146a5cd9a028fbbc

See more details on using hashes here.

File details

Details for the file twat_mp-2.5.5-py3-none-any.whl.

File metadata

  • Download URL: twat_mp-2.5.5-py3-none-any.whl
  • Upload date:
  • Size: 12.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for twat_mp-2.5.5-py3-none-any.whl
Algorithm Hash digest
SHA256 c65bf590e5bb52d7942782873a4508edae4c8c6481d9b1d16353fe5ebe1c0351
MD5 8920cd023a053e939b153871c732d02e
BLAKE2b-256 b78840b9283db4733233ea9ae66610ba6ee6a2200d82e7296fd169578b12ab4e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page