Buffer items from an iterable in a separate thread.
Threaded Generator
Easily run iterators in background threads or processes to create parallel producer-consumer pipelines.
Documentation: https://kwon-young.github.io/threaded-generator/
This library provides utilities to wrap an iterable (like a generator, a slow I/O process, or a CPU-bound task) in a background thread or process. It buffers items in a queue so that the consumer and producer can work concurrently, smoothing out bursts and improving throughput.
Key Features:
- ThreadedGenerator: Run an iterator in a background thread (good for I/O bound tasks).
- ProcessGenerator: Run an iterator in a background process (good for CPU bound tasks, avoids GIL).
- ParallelGenerator: Split the work of an iterator across multiple parallel workers.
- ShutdownQueue: A queue wrapper that handles graceful shutdown signaling between producers and consumers.
- Monitor: Real-time terminal visualization of queue sizes and throughput.
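The producer-consumer buffering described above can be sketched with the standard library alone. This is only an illustration of the general pattern, not the library's implementation; the names `buffered` and `_DONE` are hypothetical, and error propagation is deliberately left out.

```python
import queue
import threading

_DONE = object()  # hypothetical sentinel marking the end of the stream


def buffered(iterable, maxsize=3):
    """Yield items from `iterable` while a background thread produces them."""
    q = queue.Queue(maxsize=maxsize)

    def produce():
        try:
            for item in iterable:
                q.put(item)  # blocks while the buffer is full
        finally:
            q.put(_DONE)  # always signal the end, even if the source fails

    threading.Thread(target=produce, daemon=True).start()
    while (item := q.get()) is not _DONE:
        yield item


print(list(buffered(range(5))))
```

The bounded queue is what lets the producer run ahead of the consumer by at most `maxsize` items, which is the role the `maxsize` argument plays in the examples that follow.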
Installation
pip install threaded-generator
Usage
1. Basic Threaded Generation (I/O Bound)
Use ThreadedGenerator when your source is slow due to I/O (network, disk).
import time
from threaded_generator import ThreadedGenerator

def slow_io_task():
    for i in range(5):
        time.sleep(0.5)  # Simulate network/disk wait
        yield i

# Buffers up to 3 items in a background thread
gen = ThreadedGenerator(slow_io_task(), maxsize=3)
for item in gen:
    print(f"Got {item}")
2. Process Generation (CPU Bound)
Use ProcessGenerator to bypass the GIL for CPU-intensive tasks.
from threaded_generator import ProcessGenerator

def heavy_computation():
    for i in range(5):
        # Simulate heavy CPU work
        yield sum(range(1_000_000 * i))

gen = ProcessGenerator(heavy_computation(), maxsize=2)
for result in gen:
    print(result)
3. Parallel Processing (Multiple Workers)
Use ParallelGenerator with num_workers > 1 to distribute work.
Important: You must decorate the generator function with @partial_generator. This allows each worker to create its own fresh instance of the iterator.
import time
from threaded_generator import ParallelGenerator, partial_generator

@partial_generator
def process_data(x):
    # Simulate work
    time.sleep(0.5)
    yield x * x

# Spawns 4 worker processes to process items in parallel
gen = ParallelGenerator(process_data(10), num_workers=4, maxsize=10)
for res in gen:
    print(res)
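One likely reason a factory is needed here is that a generator object is single-use and cannot be pickled, so it cannot simply be copied to several workers. The sketch below illustrates that general constraint with `functools.partial`; it makes no claim about how `partial_generator` is implemented, and `squares` and `make_gen` are hypothetical names.

```python
import functools
import pickle


def squares(n):
    for i in range(n):
        yield i * i


# A live generator object cannot be serialized for another process.
gen = squares(3)
try:
    pickle.dumps(gen)
    picklable = True
except TypeError:
    picklable = False

# A factory (function plus arguments) can be called once per worker,
# and each call returns a fresh, independent iterator.
make_gen = functools.partial(squares, 3)
first, second = make_gen(), make_gen()
first_items, second_items = list(first), list(second)

print(picklable, first_items, second_items)
```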
4. Monitoring
Visualize the performance of your pipeline in the terminal.
import time
from threaded_generator import ThreadedGenerator, Monitor

monitor = Monitor()

# Pass the monitor to the generator
gen = ThreadedGenerator(range(100), monitor=monitor, name="MyGen")
with monitor:
    for item in gen:
        time.sleep(0.1)  # Simulate slow consumption
Advanced Usage
Shared Consumption
You can share a single underlying producer among multiple consumers using the context manager.
gen = ThreadedGenerator(range(10), maxsize=5)
with gen:
    it1 = iter(gen)
    it2 = iter(gen)
    # Items are distributed between it1 and it2
    print(next(it1))
    print(next(it2))
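"Distributed" here means that each item reaches exactly one consumer rather than being copied to both. A minimal standard-library sketch of that behaviour, assuming a plain `queue.Queue` shared by two consumer threads (the library's internals may differ, and `consume` and `results` are hypothetical names):

```python
import queue
import threading

q = queue.Queue()
for i in range(10):
    q.put(i)

results = {"a": [], "b": []}


def consume(name):
    """Drain the shared queue; every item is taken by exactly one consumer."""
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            return
        results[name].append(item)


threads = [threading.Thread(target=consume, args=(name,)) for name in results]
for t in threads:
    t.start()
for t in threads:
    t.join()

# How the items are split between "a" and "b" depends on thread scheduling,
# but together they cover the input exactly once.
combined = sorted(results["a"] + results["b"])
print(combined)
```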
ShutdownQueue
Use ShutdownQueue directly if you need a queue that supports graceful shutdown signaling.
import multiprocessing as mp
from queue import ShutDown  # queue.ShutDown requires Python 3.13+
from threaded_generator import ShutdownQueue

# Create a shutdown-capable queue backed by a multiprocessing Queue
sq = ShutdownQueue(maxsize=10, queue_type=mp.Queue)

# Producer
sq.put(1)
sq.put(2)
sq.shutdown()  # Signal end of stream

# Consumer
try:
    while True:
        print(sq.get())
except ShutDown:
    print("Queue shut down")
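For readers unfamiliar with shutdown signaling, the idea can be sketched with a sentinel on an ordinary `queue.Queue`. This is an illustrative stand-in that runs on older Python versions, not the library's ShutdownQueue; `ClosableQueue` and `QueueClosed` are hypothetical names.

```python
import queue


class QueueClosed(Exception):
    """Hypothetical exception raised once the stream has ended."""


class ClosableQueue:
    """Illustrative wrapper that signals end-of-stream with a sentinel."""

    _SENTINEL = object()

    def __init__(self, maxsize=0):
        self._q = queue.Queue(maxsize)

    def put(self, item):
        self._q.put(item)

    def close(self):
        # Enqueue the sentinel after all pending items.
        self._q.put(self._SENTINEL)

    def get(self):
        item = self._q.get()
        if item is self._SENTINEL:
            self._q.put(item)  # re-post so later get() calls also see the end
            raise QueueClosed
        return item


cq = ClosableQueue()
cq.put(1)
cq.put(2)
cq.close()

received = []
try:
    while True:
        received.append(cq.get())
except QueueClosed:
    print("Queue closed after", received)
```

Items queued before the close are still delivered, which is what makes the shutdown graceful rather than abrupt.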
Error Handling
Exceptions raised within the source iterable are caught in the background worker and re-raised in the main thread (wrapped in a RuntimeError) when join() is called or iteration completes.
Note: When using ProcessGenerator or ParallelGenerator (multiprocessing), exception propagation is limited. Exceptions are pickled to cross the process boundary, and pickling drops the original traceback and the __cause__ attribute. As a result, when chaining multiple process-based generators, the main thread still receives a RuntimeError but may not be able to access the full chain of causes or the original stack trace from the worker process. Use ThreadedGenerator if preserving the full exception chain is required.
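The loss described in this note is a property of pickling itself and can be reproduced without the library. The sketch below chains two exceptions, round-trips the outer one through `pickle`, and compares what survives; the variable names are illustrative only.

```python
import pickle

# Build a chained exception with a live traceback.
try:
    try:
        raise ValueError("bad input")
    except ValueError as inner:
        raise RuntimeError("worker failed") from inner
except RuntimeError as exc:
    original = exc

# Round-trip through pickle, as happens when crossing a process boundary.
restored = pickle.loads(pickle.dumps(original))

# The original still carries its cause and traceback.
print(type(original.__cause__).__name__, original.__traceback__ is not None)
# The restored copy keeps only its type and arguments.
print(restored.__cause__, restored.__traceback__, restored.args)
```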
Credits
The original idea for ThreadedGenerator (combining a generator, a thread, and a queue) is attributed to everilae and their GitHub Gist.