A threaded version of map reduce

Project description

threaded-map-reduce

A Python library that implements map, unordered map, and map-reduce using threads.

This library is intended to be performant for CPU-bound tasks. Its map implementation has been measured to be much more performant than the map method of the concurrent.futures.ThreadPoolExecutor class of the standard library.


Features

  • Parallel map with deterministic order: map
  • Parallel unordered map for maximum throughput: map_unordered
  • Parallel map-reduce: map_reduce
  • It groups items into chunks to reduce the parallelization overhead
  • No external dependencies

Installation

pip install threaded-map-reduce

(Or, if you use uv:)

uv pip install threaded-map-reduce

Quick Start

1. Parallel map (ordered)

from threaded_map_reduce import threaded_map

def square(x):
    return x * x

nums = range(1000)
result = list(threaded_map(square, nums, num_computing_threads=4, chunk_size=100))
print(result[-10:])

2. Parallel map (unordered)

Faster, but order is not preserved:

from threaded_map_reduce import map_unordered

def square(x):
    return x * x

nums = range(1000)
result = list(map_unordered(square, nums, num_computing_threads=4, chunk_size=100))
print(sorted(result)[-10:])

3. Parallel map-reduce

Useful for reductions such as sums, counts, or any associative operation.

from operator import add
from threaded_map_reduce import map_reduce

def square(x):
    return x * x

nums = range(0, 1000)
result = map_reduce(square, add, nums,
                    num_computing_threads=4,
                    chunk_size=100)
print(result)

API Summary

threaded_map(map_fn, items: Iterable, num_computing_threads: int, chunk_size: int = 100)

Runs map_fn over every item in parallel and yields results in input order.

map_unordered(map_fn, items: Iterable, num_computing_threads: int, chunk_size: int = 100)

Same as above, but yields results in arbitrary order.

map_reduce(map_fn, reduce_fn, iterable: Iterable, num_computing_threads: int, chunk_size: int = 100)

Maps items in parallel, reduces mapped chunks using the provided reducer function, and returns a single result.

Performance

map-reduce

The map_reduce performance was measured with the following benchmark:

  • Task: Count the number of primes up to 5,000,000
  • Workload: CPU-bound, using the same is_prime implementation for all tests (see the code at the end of this section)
  • Chunk size for all parallel map implementations: 500
  • Repetitions: 5 runs per configuration, reporting the mean runtime
  • Python: free-threaded CPython 3.14.0
  • Hardware: laptop with Intel® Core™ i7-1260P (4 performance cores, 6 efficiency cores)

Threaded map-reduce performance

map

To study the performance of the map function implemented in this library we ran a similar benchmark:

  • Task: check whether each integer from 1 to 5,000,000 is prime
  • Workload: CPU-bound, using the same is_prime implementation for all tests (see the code at the end of this section)
  • Chunk size for all parallel map implementations: 500
  • Repetitions: 5 runs per configuration, reporting the mean runtime
  • Python: free-threaded CPython 3.14.0
  • Hardware: laptop with Intel® Core™ i7-1260P (4 performance cores, 6 efficiency cores)

ideal vs this map performance

The performance of this library’s map implementation improves with the number of threads. For this task there is no appreciable difference between the ordered and unordered maps.

Compared with ideal scaling, our implementation is 1.19 and 1.54 times slower with two and four threads, respectively. In an ideal scenario, the runtime with N threads would be the runtime with a single thread (using the standard non-threaded map implementation) divided by N.

Chunk size

The chunk size is a critical parameter for performance. Both map and map_reduce process items in chunks, and the parallelization overhead depends strongly on how many items are grouped into each chunk.

Using very small chunks (e.g. one item per chunk) usually produces poor performance, because the cost of thread scheduling and queue operations dominates the useful work. Using very large chunks eventually stops giving additional speedups, and may increase memory usage.

We used the same benchmark as in the previous map section. A chunk size of 1 is 6.7× slower than a chunk size of 500, and increasing the chunk size beyond 500 does not reduce the runtime further. The optimal chunk size depends on the specific workload.

Note that each chunk is materialized as a list before being processed, so larger chunk sizes increase memory usage.
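The chunking behaviour can be illustrated with itertools.islice; this iter_chunks helper is a hypothetical sketch to show the idea, not the library's internal code:

```python
import itertools

def iter_chunks(items, chunk_size):
    """Yield successive lists of up to chunk_size items from an iterable."""
    it = iter(items)
    while True:
        # Each chunk is materialized as a list, so memory use grows with chunk_size.
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:
            return
        yield chunk

print(list(iter_chunks(range(10), 4)))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```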

Time used to carry out the task for different chunk sizes

ThreadPoolExecutor.map vs this library’s map

Using the same mapping benchmark, we compared this library's map implementation with the map method of ThreadPoolExecutor from the standard library's concurrent.futures module:

futures map vs this map performance

In this benchmark, the ThreadPoolExecutor.map implementation is 4.11 times slower than the threaded_map implementation when using a single thread, and its performance degrades further as the number of threads increases (e.g. 5.99 times slower with 4 threads).

is_prime function

The is_prime function used was:

from math import sqrt

def is_prime(n):
    if n == 1:
        return False
    elif n == 2 or n == 3:
        return True
    elif n % 2 == 0:
        return False
    elif n < 9:
        return True
    elif n % 3 == 0:
        return False
    r = int(sqrt(n))
    for f in range(5, r + 1, 6):
        if n % f == 0:
            return False
        elif n % (f + 2) == 0:
            return False
    return True

map-reduce alternative implementations

The map-reduce function was implemented using different architectures to check which one was the most performant.

ThreadPoolExecutor

Architecture:

  • A ThreadPoolExecutor is created.
  • For each item, a mapping task is submitted to the executor; the task is represented by a Future object that will eventually hold the mapped result.
  • The Future results are then consumed by the reduce function.

ThreadPoolExecutor map reduce performance

The results of this approach were pretty bad: the execution time was around 10 times larger than with a naive non-threaded approach, and it did not improve with the number of threads.

This is the code for this approach.

from concurrent.futures import ThreadPoolExecutor

def map_reduce_with_executor_naive(
    map_fn,
    reduce_fn,
    items,
    max_workers,
    initial_reduce_value=None,
):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        reduced_result = initial_reduce_value
        for item in items:
            future = executor.submit(map_fn, item)
            futures.append(future)

            if len(futures) > max_workers:
                # pop() takes the most recently submitted future, which is the
                # least likely to be finished, so result() blocks eagerly here.
                future = futures.pop()
                result = future.result()
                reduced_result = reduce_fn(result, reduced_result)

        for future in futures:
            reduced_result = reduce_fn(future.result(), reduced_result)
    return reduced_result

ThreadPoolExecutor with chunks

We made another attempt with ThreadPoolExecutor, but instead of mapping every item individually, the items were packed into chunks, and those chunks were then mapped and reduced.

ThreadPoolExecutor with chunks map reduce performance

The time in this case was similar to the non-threaded one, but it did not improve with the number of threads.
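The original code for this chunked attempt is not shown, but it might have looked roughly like the following sketch (map_reduce_with_executor_chunked and its signature are hypothetical):

```python
import itertools
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

def map_reduce_with_executor_chunked(map_fn, reduce_fn, items, max_workers,
                                     chunk_size=100):
    def process_chunk(chunk):
        # Map the chunk and reduce it locally inside the worker thread.
        return reduce(reduce_fn, map(map_fn, chunk))

    it = iter(items)
    # iter(callable, sentinel) yields chunks until an empty list is produced.
    chunks = iter(lambda: list(itertools.islice(it, chunk_size)), [])
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Reduce the per-chunk partial results into the final value.
        return reduce(reduce_fn, executor.map(process_chunk, chunks))

# Sum of squares of 0..999, computed chunk by chunk.
print(map_reduce_with_executor_chunked(lambda x: x * x, lambda a, b: a + b,
                                       range(1000), max_workers=4))
```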

Thread-safe iterator and computing threads

Iterators are not thread-safe in Python, but we can use a lock to solve the issue.

import threading
from collections.abc import Iterator

class ThreadSafeIterator(Iterator):
    def __init__(self, it):
        self._it = iter(it)
        self._lock = threading.Lock()

    def __next__(self):
        with self._lock:
            return next(self._it)

Now that we have a thread-safe iterator, we can spawn a set of computing threads that perform the map and reduction and put their results in a queue; a final step then reduces the per-thread results.

Thread safe iterator and computing threads map reduce performance

With 1 thread, this implementation performs similarly to the non-threaded map-reduce; the runtime dropped when 2 or 3 computing threads were used, but it did not improve with more threads.
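The architecture just described can be sketched as follows; map_reduce_per_item is a hypothetical name, and the iterator class is repeated so the snippet is self-contained:

```python
import queue
import threading
from collections.abc import Iterator
from functools import reduce

class ThreadSafeIterator(Iterator):
    def __init__(self, it):
        self._it = iter(it)
        self._lock = threading.Lock()

    def __next__(self):
        with self._lock:
            return next(self._it)

def map_reduce_per_item(map_fn, reduce_fn, items, num_threads):
    safe_items = ThreadSafeIterator(items)
    results = queue.Queue()

    def worker():
        # Each thread maps and reduces the items it manages to pull.
        partial = None
        got_any = False
        for item in safe_items:
            mapped = map_fn(item)
            partial = mapped if not got_any else reduce_fn(partial, mapped)
            got_any = True
        if got_any:
            results.put(partial)

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Final reduction of the per-thread partial results in the main thread.
    partials = []
    while not results.empty():
        partials.append(results.get())
    return reduce(reduce_fn, partials)

print(map_reduce_per_item(lambda x: x * x, lambda a, b: a + b, range(1000), 3))
```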

Chunks and computing threads

In the implementation based on the thread-safe iterator, the lock used to yield each item might be the bottleneck, so adding computing threads might not improve performance: they would spend their time waiting to get an item to process.

The approach used by this library is similar to the previous one, but instead of processing individual items, it processes chunks of items. The class in charge of delivering the chunks to be mapped and reduced makes sure that chunk yielding is thread-safe by using a lock.

import itertools
import threading
from collections.abc import Iterator

class _ChunkDispenser(Iterator):
    def __init__(self, it, chunk_size):
        self._it = iter(it)
        self._lock = threading.Lock()
        self._chunk_size = chunk_size

    def __next__(self):
        with self._lock:
            chunk = list(itertools.islice(self._it, self._chunk_size))
            if not chunk:
                raise StopIteration
            else:
                return chunk

The chunks are then mapped and reduced by a pool of computing threads; each thread puts its final reduced result into a queue, and the main thread reduces those partial results once the computing threads are done.

Chunk thread safe iterator and computing threads map reduce performance

This implementation gave the best result.
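Putting the pieces together, a simplified version of this winning architecture might look like the following sketch (map_reduce_chunked is a hypothetical name, and the dispenser class is repeated so the snippet runs on its own):

```python
import itertools
import queue
import threading
from collections.abc import Iterator
from functools import reduce

class _ChunkDispenser(Iterator):
    def __init__(self, it, chunk_size):
        self._it = iter(it)
        self._lock = threading.Lock()
        self._chunk_size = chunk_size

    def __next__(self):
        with self._lock:
            chunk = list(itertools.islice(self._it, self._chunk_size))
            if not chunk:
                raise StopIteration
            return chunk

def map_reduce_chunked(map_fn, reduce_fn, items, num_threads, chunk_size=100):
    dispenser = _ChunkDispenser(items, chunk_size)
    results = queue.Queue()

    def worker():
        partials = []
        for chunk in dispenser:
            # Map the whole chunk and reduce it locally, without holding the lock.
            partials.append(reduce(reduce_fn, map(map_fn, chunk)))
        if partials:
            results.put(reduce(reduce_fn, partials))

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    partials = []
    while not results.empty():
        partials.append(results.get())
    # Final reduction of the per-thread results in the main thread.
    return reduce(reduce_fn, partials)

print(map_reduce_chunked(lambda x: x * x, lambda a, b: a + b, range(1000), 4))
```

Per-item locking is replaced by one lock acquisition per chunk, which is why throughput keeps improving with more threads.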

More complex implementations

We also tried other, more complex approaches, but none worked as well as the previous one.

License

MIT License.
