Skip to main content

Non-blocking Python methods using decorators

Project description

MultiTasking: Non-blocking Python methods using decorators

Python version PyPi version PyPi status PyPi downloads CodeFactor Star this repo Follow me on twitter


MultiTasking is a lightweight Python library that lets you convert your Python methods into asynchronous, non-blocking methods simply by using a decorator. Perfect for I/O-bound tasks, API calls, web scraping, and any scenario where you want to run multiple operations concurrently without the complexity of manual thread or process management.

What’s New in v0.0.12

  • 🎯 Full Type Hint Support: Complete type annotations for better IDE support and code safety

  • 📚 Enhanced Documentation: Comprehensive docstrings and inline comments for better maintainability

  • 🔧 Improved Error Handling: More robust exception handling with specific error types

  • 🚀 Better Performance: Optimized task creation and management logic

  • 🛡️ Code Quality: PEP8 compliant, linter-friendly codebase

Quick Start

import multitasking
import time

@multitasking.task
def fetch_data(url_id):
    # Simulate API call or I/O operation
    time.sleep(1)
    return f"Data from {url_id}"

# These run concurrently, not sequentially!
for i in range(5):
    fetch_data(i)

# Wait for all tasks to complete
multitasking.wait_for_tasks()
print("All data fetched!")

Basic Example

# example.py
import multitasking
import time
import random
import signal

# Kill all tasks on ctrl-c (recommended for development)
signal.signal(signal.SIGINT, multitasking.killall)

# Or, wait for tasks to finish gracefully on ctrl-c:
# signal.signal(signal.SIGINT, multitasking.wait_for_tasks)

@multitasking.task  # <== this is all it takes! 🎉
def hello(count):
    sleep_time = random.randint(1, 10) / 2
    print(f"Hello {count} (sleeping for {sleep_time}s)")
    time.sleep(sleep_time)
    print(f"Goodbye {count} (slept for {sleep_time}s)")

if __name__ == "__main__":
    # Launch 10 concurrent tasks
    for i in range(10):
        hello(i + 1)

    # Wait for all tasks to complete
    multitasking.wait_for_tasks()
    print("All tasks completed!")

Output:

$ python example.py

Hello 1 (sleeping for 0.5s)
Hello 2 (sleeping for 1.0s)
Hello 3 (sleeping for 5.0s)
Hello 4 (sleeping for 0.5s)
Hello 5 (sleeping for 2.5s)
Hello 6 (sleeping for 3.0s)
Hello 7 (sleeping for 0.5s)
Hello 8 (sleeping for 4.0s)
Hello 9 (sleeping for 3.0s)
Hello 10 (sleeping for 1.0s)
Goodbye 1 (slept for 0.5s)
Goodbye 4 (slept for 0.5s)
Goodbye 7 (slept for 0.5s)
Goodbye 2 (slept for 1.0s)
Goodbye 10 (slept for 1.0s)
Goodbye 5 (slept for 2.5s)
Goodbye 6 (slept for 3.0s)
Goodbye 9 (slept for 3.0s)
Goodbye 8 (slept for 4.0s)
Goodbye 3 (slept for 5.0s)
All tasks completed!

Advanced Usage

Real-World Examples

Web Scraping with Concurrent Requests:

import multitasking
import requests
import signal

signal.signal(signal.SIGINT, multitasking.killall)

@multitasking.task
def fetch_url(url):
    try:
        response = requests.get(url, timeout=10)
        print(f"✅ {url}: {response.status_code}")
        return response.text
    except Exception as e:
        print(f"❌ {url}: {str(e)}")
        return None

# Fetch multiple URLs concurrently
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/status/200",
    "https://httpbin.org/json"
]

for url in urls:
    fetch_url(url)

multitasking.wait_for_tasks()
print(f"Processed {len(urls)} URLs concurrently!")

Database Operations:

import multitasking
import sqlite3
import time

@multitasking.task
def process_batch(batch_id, data_batch):
    # Simulate database processing
    conn = sqlite3.connect(f'batch_{batch_id}.db')
    # ... database operations ...
    conn.close()
    print(f"Processed batch {batch_id} with {len(data_batch)} records")

# Process multiple data batches concurrently
large_dataset = list(range(1000))
batch_size = 100

for i in range(0, len(large_dataset), batch_size):
    batch = large_dataset[i:i + batch_size]
    process_batch(i // batch_size, batch)

multitasking.wait_for_tasks()

Pool Management

MultiTasking uses execution pools to manage concurrent tasks. You can create and configure multiple pools for different types of operations:

import multitasking

# Create a pool for API calls (higher concurrency)
multitasking.createPool("api_pool", threads=20, engine="thread")

# Create a pool for CPU-intensive tasks (lower concurrency)
multitasking.createPool("cpu_pool", threads=4, engine="process")

# Switch between pools
multitasking.use_tag("api_pool")  # Future tasks use this pool

@multitasking.task
def api_call(endpoint):
    # This will use the api_pool
    pass

# Get pool information
pool_info = multitasking.getPool("api_pool")
print(f"Pool: {pool_info}")  # {'engine': 'thread', 'name': 'api_pool', 'threads': 20}

Task Monitoring

Monitor and control your tasks with built-in functions:

import multitasking
import time

@multitasking.task
def long_running_task(task_id):
    time.sleep(2)
    print(f"Task {task_id} completed")

# Start some tasks
for i in range(5):
    long_running_task(i)

# Monitor active tasks
while multitasking.get_active_tasks():
    active_count = len(multitasking.get_active_tasks())
    total_count = len(multitasking.get_list_of_tasks())
    print(f"Progress: {total_count - active_count}/{total_count} completed")
    time.sleep(0.5)

print("All tasks finished!")

Configuration & Settings

Thread/Process Limits

The default maximum threads equals the number of CPU cores. You can customize this:

import multitasking

# Set maximum concurrent tasks
multitasking.set_max_threads(10)

# Scale based on CPU cores (good rule of thumb for I/O-bound tasks)
multitasking.set_max_threads(multitasking.config["CPU_CORES"] * 5)

# Unlimited concurrent tasks (use carefully!)
multitasking.set_max_threads(0)

Execution Engine Selection

Choose between threading and multiprocessing based on your use case:

import multitasking

# For I/O-bound tasks (default, recommended for most cases)
multitasking.set_engine("thread")

# For CPU-bound tasks (avoids GIL limitations)
multitasking.set_engine("process")

When to use threads vs processes:

  • Threads (default): Best for I/O-bound tasks like file operations, network requests, database queries

  • Processes: Best for CPU-intensive tasks like mathematical computations, image processing, data analysis

Advanced Pool Configuration

Create specialized pools for different workloads:

import multitasking

# Fast pool for quick API calls
multitasking.createPool("fast_api", threads=50, engine="thread")

# CPU pool for heavy computation
multitasking.createPool("compute", threads=2, engine="process")

# Unlimited pool for lightweight tasks
multitasking.createPool("unlimited", threads=0, engine="thread")

# Get current pool info
current_pool = multitasking.getPool()
print(f"Using pool: {current_pool['name']}")

Best Practices

Performance Tips

  1. Choose the right engine: Use threads for I/O-bound tasks, processes for CPU-bound tasks

  2. Tune thread counts: Start with CPU cores × 2-5 for I/O tasks, CPU cores for CPU tasks

  3. Use pools wisely: Create separate pools for different types of operations

  4. Monitor memory usage: Each thread/process consumes memory

  5. Handle exceptions: Always wrap risky operations in try-catch blocks

Error Handling

import multitasking
import requests

@multitasking.task
def robust_fetch(url):
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout:
        print(f"⏰ Timeout fetching {url}")
    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching {url}: {e}")
    except Exception as e:
        print(f"💥 Unexpected error: {e}")
    return None

Resource Management

import multitasking
import signal

# Graceful shutdown on interrupt
def cleanup_handler(signum, frame):
    print("🛑 Shutting down gracefully...")
    multitasking.wait_for_tasks()
    print("✅ All tasks completed")
    exit(0)

signal.signal(signal.SIGINT, cleanup_handler)

# Your application code here...

Troubleshooting

Common Issues

Tasks not running concurrently? Check if you’re calling wait_for_tasks() inside your task loop instead of after it.

High memory usage? Reduce the number of concurrent threads or switch to a process-based engine.

Tasks hanging? Ensure your tasks can complete (avoid infinite loops) and handle exceptions properly.

Import errors? Make sure you’re using Python 3.6+ and have installed the latest version.

Debugging

import multitasking

# Enable task monitoring
active_tasks = multitasking.get_active_tasks()
all_tasks = multitasking.get_list_of_tasks()

print(f"Active: {len(active_tasks)}, Total: {len(all_tasks)}")

# Get current pool configuration
pool_info = multitasking.getPool()
print(f"Current pool: {pool_info}")

Installation

Requirements: - Python 3.6 or higher - No external dependencies!

Install via pip:

$ pip install multitasking --upgrade --no-cache-dir

Development installation:

$ git clone https://github.com/ranaroussi/multitasking.git
$ cd multitasking
$ pip install -e .

Compatibility

  • Python: 3.6+ (type hints require 3.6+)

  • Operating Systems: Windows, macOS, Linux

  • Environments: Works in Jupyter notebooks, scripts, web applications

  • Frameworks: Compatible with Flask, Django, FastAPI, and other Python frameworks

API Reference

Decorators

  • @multitasking.task - Convert function to asynchronous task

Configuration Functions

  • set_max_threads(count) - Set maximum concurrent tasks

  • set_engine(type) - Choose “thread” or “process” engine

  • createPool(name, threads, engine) - Create custom execution pool

Task Management

  • wait_for_tasks(sleep=0) - Wait for all tasks to complete

  • get_active_tasks() - Get list of running tasks

  • get_list_of_tasks() - Get list of all tasks

  • killall() - Emergency shutdown (force exit)

Pool Management

  • getPool(name=None) - Get pool information

  • createPool(name, threads=None, engine=None) - Create new pool

Performance Benchmarks

Here’s a simple benchmark comparing synchronous vs asynchronous execution:

import multitasking
import time
import requests

# Synchronous version
def sync_fetch():
    start = time.time()
    for i in range(10):
        requests.get("https://httpbin.org/delay/1")
    print(f"Synchronous: {time.time() - start:.2f}s")

# Asynchronous version
@multitasking.task
def async_fetch():
    requests.get("https://httpbin.org/delay/1")

def concurrent_fetch():
    start = time.time()
    for i in range(10):
        async_fetch()
    multitasking.wait_for_tasks()
    print(f"Concurrent: {time.time() - start:.2f}s")

# Results: Synchronous ~10s, Concurrent ~1s (10x speedup!)

Contributing

We welcome contributions! Here’s how you can help:

  1. Report bugs: Open an issue with details and reproduction steps

  2. Suggest features: Share your ideas for improvements

  3. Submit PRs: Fork, create a feature branch, and submit a pull request

  4. Improve docs: Help make the documentation even better

Development setup:

$ git clone https://github.com/ranaroussi/multitasking.git
$ cd multitasking
$ pip install -e .
$ python -m pytest  # Run tests

Support

  • 📖 Documentation: This README and inline code documentation

  • 🐛 Issues: GitHub Issues

  • 🐦 Twitter: @aroussi


Happy Multitasking! 🚀

Please drop me a note with any feedback you have.

Ran Aroussi

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

multitasking-0.0.13.tar.gz (20.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

multitasking-0.0.13-py3-none-any.whl (16.4 kB view details)

Uploaded Python 3

File details

Details for the file multitasking-0.0.13.tar.gz.

File metadata

  • Download URL: multitasking-0.0.13.tar.gz
  • Upload date:
  • Size: 20.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.4

File hashes

Hashes for multitasking-0.0.13.tar.gz
Algorithm Hash digest
SHA256 d896b5df877c9ca5eeddbf0e5994124694d6cb535aba698fb23344c7025155a1
MD5 a5e56b8779ca9d997156a652a63cbca0
BLAKE2b-256 bec3ac2cc9307fb15cc28ed6d4a9266b216c83ee7fe64299f0264047982bce88

See more details on using hashes here.

File details

Details for the file multitasking-0.0.13-py3-none-any.whl.

File metadata

  • Download URL: multitasking-0.0.13-py3-none-any.whl
  • Upload date:
  • Size: 16.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.4

File hashes

Hashes for multitasking-0.0.13-py3-none-any.whl
Algorithm Hash digest
SHA256 ec9243af140c67bfe52dc98d7173c294512735a88e8425c458b250db99dc2b48
MD5 64e95647a11100930e7f90bfbf4c5de7
BLAKE2b-256 d31c24dbf69b247f287401c904a396233a43c89fd4fb9b7cd2e50e430e9cd57c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page