
Buffer items from an iterable in a separate thread.


Threaded Generator

Easily run iterators in background threads or processes to create parallel producer-consumer pipelines.

Documentation: https://kwon-young.github.io/threaded-generator/

This library provides utilities to wrap an iterable (like a generator, a slow I/O process, or a CPU-bound task) in a background thread or process. It buffers items in a queue so that the consumer and producer can work concurrently, smoothing out bursts and improving throughput.
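The buffering idea described above can be sketched with only the standard library. This is a minimal illustration of the thread-plus-queue pattern, not the library's implementation; the helper name `buffered` and the sentinel `_DONE` are assumptions made for this example.

```python
import queue
import threading

_DONE = object()  # hypothetical sentinel marking the end of the stream


def buffered(iterable, maxsize=3):
    """Yield items from `iterable` while a background thread produces them."""
    q = queue.Queue(maxsize=maxsize)

    def producer():
        try:
            for item in iterable:
                q.put(item)  # blocks while the buffer is full
        finally:
            q.put(_DONE)  # always signal the end of the stream

    threading.Thread(target=producer, daemon=True).start()

    while True:
        item = q.get()  # blocks while the buffer is empty
        if item is _DONE:
            return
        yield item


result = list(buffered(range(5)))
print(result)
```

The bounded queue is what lets producer and consumer overlap: the producer runs ahead by at most `maxsize` items, and the consumer only waits when the buffer is empty. The sketch deliberately leaves out error propagation and early-exit cleanup.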

Key Features:

  • ThreadedGenerator: Run an iterator in a background thread (good for I/O-bound tasks).
  • ProcessGenerator: Run an iterator in a background process (good for CPU-bound tasks, avoids the GIL).
  • ParallelGenerator: Split the work of an iterator across multiple parallel workers.
  • ShutdownQueue: A queue wrapper that handles graceful shutdown signaling between producers and consumers.
  • Monitor: Real-time terminal visualization of queue sizes and throughput.

Installation

pip install threaded-generator

Usage

1. Basic Threaded Generation (I/O Bound)

Use ThreadedGenerator when your source is slow due to I/O (network, disk).

import time
from threaded_generator import ThreadedGenerator

def slow_io_task():
    for i in range(5):
        time.sleep(0.5)  # Simulate network/disk wait
        yield i

# Buffers up to 3 items in a background thread
gen = ThreadedGenerator(slow_io_task(), maxsize=3)

for item in gen:
    print(f"Got {item}")

2. Process Generation (CPU Bound)

Use ProcessGenerator to bypass the GIL for CPU-intensive tasks.

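from threaded_generator import ProcessGenerator

def heavy_computation():
    for i in range(5):
        # Simulate heavy CPU work
        yield sum(range(1_000_000 * i))

# Guard the entry point: multiprocessing may re-import this module in
# child processes when the "spawn" start method is used.
if __name__ == "__main__":
    gen = ProcessGenerator(heavy_computation(), maxsize=2)

    for result in gen:
        print(result)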

3. Parallel Processing (Multiple Workers)

Use ParallelGenerator with num_workers > 1 to distribute work.

Important: You must decorate the generator function with @partial_generator. This allows each worker to create its own fresh instance of the iterator.

from threaded_generator import ParallelGenerator, partial_generator
import time

@partial_generator
def process_data(x):
    # Simulate work
    time.sleep(0.5)
    yield x * x

# Spawns 4 worker processes to process items in parallel
gen = ParallelGenerator(process_data(10), num_workers=4, maxsize=10)

for res in gen:
    print(res)
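Why a decorator is needed can be illustrated with the standard library: a generator object can be consumed only once, whereas a deferred call can be invoked again to build a fresh generator. The sketch below assumes nothing about how @partial_generator is actually implemented; `deferred_generator` is a hypothetical stand-in built on functools.partial.

```python
import functools


def deferred_generator(func):
    """Hypothetical stand-in: calling the function returns a factory, not a generator."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Capture the arguments without starting the generator.
        return functools.partial(func, *args, **kwargs)
    return wrapper


@deferred_generator
def squares(n):
    for i in range(n):
        yield i * i


factory = squares(3)     # nothing has run yet
first = list(factory())  # one fresh generator
second = list(factory()) # another, independent one
print(first, second)
```

Each call to `factory()` starts from the beginning, which is the property a pool of workers needs when every worker must create its own iterator.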

4. Monitoring

Visualize the performance of your pipeline in the terminal.

from threaded_generator import ThreadedGenerator, Monitor
import time

monitor = Monitor()

# Pass the monitor to the generator
gen = ThreadedGenerator(range(100), monitor=monitor, name="MyGen")

with monitor:
    for item in gen:
        time.sleep(0.1)  # Simulate slow consumption

Advanced Usage

Shared Consumption

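You can share a single underlying producer among multiple consumers: enter the generator as a context manager, then create one iterator per consumer with iter(). Items are distributed between the iterators rather than duplicated.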

gen = ThreadedGenerator(range(10), maxsize=5)

with gen:
    it1 = iter(gen)
    it2 = iter(gen)

    # Items are distributed between it1 and it2
    print(next(it1))
    print(next(it2))

ShutdownQueue

Use ShutdownQueue directly if you need a queue that supports graceful shutdown signaling.

import multiprocessing as mp
from queue import ShutDown  # queue.ShutDown was added in Python 3.13
from threaded_generator import ShutdownQueue

# Create a shutdown-capable queue backed by a multiprocessing Queue
sq = ShutdownQueue(maxsize=10, queue_type=mp.Queue)

# Producer
sq.put(1)
sq.put(2)
sq.shutdown()  # Signal end of stream

# Consumer
try:
    while True:
        print(sq.get())
except ShutDown:
    print("Queue shut down")
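For intuition about shutdown signaling, here is a small sentinel-based sketch that runs on any Python 3 version. It is an assumption-laden illustration, not how ShutdownQueue is implemented: the names `ClosableQueue` and `QueueClosed` are hypothetical, and it uses a sentinel object in place of queue.ShutDown.

```python
import queue


class QueueClosed(Exception):
    """Raised by get() once the queue has been shut down and drained."""


class ClosableQueue:
    """Hypothetical stand-in for illustration; not the library's ShutdownQueue."""

    _SENTINEL = object()

    def __init__(self, maxsize=0):
        self._queue = queue.Queue(maxsize)

    def put(self, item):
        self._queue.put(item)

    def shutdown(self):
        # The sentinel queues up behind pending items, so they are delivered first.
        self._queue.put(self._SENTINEL)

    def get(self):
        item = self._queue.get()
        if item is self._SENTINEL:
            # Re-post the sentinel so every other consumer also sees the shutdown.
            self._queue.put(item)
            raise QueueClosed
        return item


cq = ClosableQueue(maxsize=10)
cq.put(1)
cq.put(2)
cq.shutdown()

drained = []
try:
    while True:
        drained.append(cq.get())
except QueueClosed:
    print("Queue shut down after", drained)
```

Because the sentinel is compared by identity, this sketch only works between threads of one process; signaling across processes needs a different mechanism.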

Error Handling

Exceptions raised within the source iterable are caught in the background worker and re-raised in the main thread (wrapped in a RuntimeError) when join() is called or iteration completes.
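The wrapping behaviour can be pictured with a standard-library sketch. It assumes a simple design in which the worker records the exception and the consumer re-raises it as a RuntimeError once the stream ends; the helper `buffered_with_errors` is hypothetical and is not the library's code.

```python
import queue
import threading

_DONE = object()  # hypothetical end-of-stream sentinel


def buffered_with_errors(iterable, maxsize=3):
    q = queue.Queue(maxsize=maxsize)
    errors = []

    def producer():
        try:
            for item in iterable:
                q.put(item)
        except Exception as exc:
            errors.append(exc)  # remember the failure for the consumer
        finally:
            q.put(_DONE)

    worker = threading.Thread(target=producer, daemon=True)
    worker.start()
    while True:
        item = q.get()
        if item is _DONE:
            break
        yield item
    worker.join()
    if errors:
        # Chain the original exception so it stays reachable via __cause__.
        raise RuntimeError("producer failed") from errors[0]


def failing_source():
    yield 1
    raise ValueError("boom")


received = []
cause = None
try:
    for item in buffered_with_errors(failing_source()):
        received.append(item)
except RuntimeError as err:
    cause = err.__cause__

print(received, repr(cause))
```

Items produced before the failure still reach the consumer; the error surfaces only after the buffer has been drained.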

Note: With ProcessGenerator or ParallelGenerator (multiprocessing), exception propagation is limited. Exceptions are pickled to cross the process boundary, and pickling drops the original traceback and the __cause__ attribute. When you chain several process-based generators, the main thread therefore receives a RuntimeError but may not be able to inspect the full chain of causes or the worker's original stack trace. Use ThreadedGenerator if you need the full exception chain.
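The loss described in this note is a property of pickling exceptions in general and can be reproduced without any worker process. The following sketch uses only the standard pickle module; the helper `make_chained_error` is a name chosen for this example.

```python
import pickle


def make_chained_error():
    """Build a RuntimeError whose __cause__ is a ValueError."""
    try:
        try:
            raise ValueError("original failure")
        except ValueError as inner:
            raise RuntimeError("worker failed") from inner
    except RuntimeError as outer:
        return outer


err = make_chained_error()
restored = pickle.loads(pickle.dumps(err))

print(type(err.__cause__).__name__)  # the chain is intact before pickling
print(restored.args)                 # the type and message survive
print(restored.__cause__)            # the cause does not
print(restored.__traceback__)        # neither does the traceback
```

Only the exception type and its arguments make the round trip, which is why a consumer in another process cannot walk back to the original failure.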

Credits

The original idea for ThreadedGenerator (combining a generator, a thread, and a queue) comes from a GitHub Gist by everilae.
