Pipelines for asynchronous processing
Project description
tinypl
A lightweight library for building data-processing pipelines in Python.
Install
pip install git+https://github.com/fferflo/tinypl.git
Usage
Basics
The following is an example of a single-threaded multi-stage pipeline for processing a sequence of numbers:
import tinypl as pl
# Input to the pipeline: Can be any iterable object (list, sequence, set, etc), or an iterator.
pipe = range(100)
# Apply a function to every element: [0, 1, 2, 3, ...] -> [1, 4, 7, 10, ...]
pipe = pl.map(pipe, lambda x: 3 * x + 1)
# Keep only even numbers: [1, 4, 7, 10, ...] -> [4, 10, 16, 22, ...]
pipe = pl.filter(pipe, lambda x: x % 2 == 0)
# Partition the pipe into lists with size 2: [4, 10, 16, 22, ...] -> [[4, 10], [16, 22], ...]
pipe = pl.partition(pipe, 2)
# Alternative 1: Iterate the output using a for-loop
for x in pipe:
    print(x)
# Alternative 2: Pull all items into a list
print(list(pipe))
# Alternative 3: Pull only a single item from the pipeline
print(next(pipe))
The pipeline follows a lazy-evaluation paradigm: elements are only processed when they are retrieved from the pipeline using one of the three alternatives shown above. Once an item is retrieved, it is consumed and cannot be retrieved again.
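Laziness is easy to observe by mapping a function with a visible side effect. A minimal sketch (the traced function is only for illustration):
import tinypl as pl
def traced(x):
    print(f"processing {x}")
    return x * x
pipe = pl.map(range(3), traced)  # nothing is printed yet, since no element has been pulled
print(next(pipe))  # prints "processing 0", then 0
print(next(pipe))  # prints "processing 1", then 1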
For simplicity, the first example rebinds the pipe variable at every step to point to the current stage, since the entire pipeline consists of a single track. Nevertheless, any intermediate stage remains valid and can be used to pull elements:
input = range(100)
# Apply a function to every element
pipe_mapped = pl.map(input, lambda x: x + 1)
# Construct separate stages for even and odd numbers
pipe_even_only = pl.filter(pipe_mapped, lambda x: x % 2 == 0)
pipe_odd_only = pl.filter(pipe_mapped, lambda x: x % 2 == 1)
print(next(pipe_mapped)) # Prints 1
print(next(pipe_odd_only)) # Discards 2, prints 3
print(next(pipe_even_only)) # Prints 4
print(next(pipe_even_only)) # Discards 5, prints 6
print(next(pipe_mapped)) # Prints 7
Iterators & iterables
The input to the first stage can either be
- an iterable sequence (e.g. list, set, dictionary), or
- an iterator over a sequence (an object that captures the current state of the iteration and is updated whenever an item is pulled).
When an iterable sequence is passed to the first stage, a stateful iterator is implicitly constructed for it:
numbers = [0, 1, 2, 3, 4]
pipe1 = pl.map(numbers, lambda x: x + 1) # Constructs an iterator over numbers
pipe2 = pl.map(numbers, lambda x: x + 1) # Constructs another iterator over numbers
print(next(pipe1)) # Prints 1
print(next(pipe2)) # Prints 1, since it uses a different iterator
An iterable sequence can also first be converted into an iterator using the builtin iter function:
numbers = iter([0, 1, 2, 3, 4])
pipe1 = pl.map(numbers, lambda x: x + 1)
pipe2 = pl.map(numbers, lambda x: x + 1)
print(next(pipe1)) # Prints 1
print(next(pipe2)) # Prints 2, since both pipelines were constructed from the same iterator
Any pipeline stage also implements the iterator interface and can be used as a drop-in replacement, e.g. to pull the next item using next or to pull all items into a list using list.
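Since stages are ordinary Python iterators, they also compose with the standard library. A small sketch using the builtin itertools module (not a tinypl API):
import itertools
pipe = pl.map(range(100), lambda x: x * x)
print(list(itertools.islice(pipe, 3)))  # [0, 1, 4] -- consumes only the first three items
print(next(pipe))  # 9 -- the stage continues where islice stopped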
Multi-threading & multi-processing
tinypl employs concurrent processing to speed up computation and prefetch elements. It offers two paradigms for this:
- Multi-threading launches multiple threads in a single process that concurrently run computations. Since the Global Interpreter Lock (GIL) prevents Python code itself from being executed in parallel, multi-threading is only useful when running code that releases the GIL (see the sketch after this list). This includes many libraries implemented with a C/C++ backend (e.g. Numpy, OpenCV), as well as blocking operations like reading from and writing to disk.
- Multi-processing launches multiple processes each with their own Python instance and GIL, such that Python code itself can be executed in parallel. This comes with an overhead of starting up multiple processes, (de)serializing Python objects via pickle and copying memory between processes.
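As a rough way to gauge whether multi-threading pays off for a given workload, time a stage whose work releases the GIL. A minimal sketch using pl.thread.map (introduced below); the sleep stands in for disk I/O or a GIL-releasing library call:
import time
def io_bound(x):
    time.sleep(0.01)  # blocking sleep releases the GIL, so threads can overlap
    return x
start = time.perf_counter()
list(pl.thread.map(range(100), io_bound, workers=8, maxsize=16))
print(f"{time.perf_counter() - start:.2f}s")  # roughly 100 * 0.01 / 8 seconds plus overhead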
Multi-threading in tinypl is supported by the pl.thread.queue stage, which prefetches items from the previous stage using concurrent threads and stores the results in a queue for later retrieval.
pipe = range(100)
pipe = pl.map(pipe, lambda x: 2 * x)
# Prefetches up to 8 items from the pipe using 4 concurrent threads
pipe = pl.thread.queue(pipe, workers=4, maxsize=8)
print(next(pipe)) # Pulls a single prefetched item from the queue
As soon as items are removed from the queue, the worker threads fill it up to maxsize again. Due to the concurrent evaluation, the order of items after this stage is not guaranteed to match the input order.
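For example, randomized per-item delays will usually scramble the output order. A small sketch (the jitter function is only for illustration):
import random, time
def jitter(x):
    time.sleep(random.uniform(0.0, 0.01))  # sleeping releases the GIL
    return x
pipe = pl.map(range(10), jitter)
pipe = pl.thread.queue(pipe, workers=4, maxsize=8)
print(list(pipe))  # e.g. [1, 0, 3, 2, 5, 4, 7, 6, 9, 8] -- exact order depends on thread timing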
Stacking multiple pl.thread.queue stages works out of the box, since each worker thread executes the code of the previous stages up to pulling items from the previous queue (potentially waiting if it is empty).
pipe = range(100) # ||
# ||
pipe = pl.map(pipe, lambda x: x + 1) # ||
pipe = pl.thread.queue(pipe, workers=4, maxsize=8) # || \/ Workers from this queue will execute these stages
# ||
pipe = pl.map(pipe, lambda x: x * 2) # ||
pipe = pl.thread.queue(pipe, workers=4, maxsize=8) # \/ Workers from this queue will execute these stages
The utility function pl.thread.map can be used as a concurrent alternative to pl.map, and is equivalent to pl.map followed by pl.thread.queue.
pipe = pl.thread.map(pipe, lambda x: 2 * x, workers=4, maxsize=8)
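Spelled out, the equivalence looks like this (a sketch; both pipelines yield the same items, up to order):
a = pl.thread.map(range(10), str, workers=4, maxsize=8)
b = pl.map(range(10), str)
b = pl.thread.queue(b, workers=4, maxsize=8)
assert sorted(a) == sorted(b)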
Lastly, stages of the pipeline that are not thread-safe can be prevented from being accessed concurrently using pl.thread.mutex:
def numbers(): # Raises "ValueError: generator already executing" when called concurrently
    i = 0
    while True:
        i += 2
        yield i
pipe = numbers()
pipe = pl.thread.mutex(pipe) # Enforce mutually exclusive access to previous stages
pipe = pl.thread.map(pipe, ..., workers=16)
Multi-processing in tinypl is supported by the pl.process.map stage, which applies a function to all elements using concurrent processes and stores the results in a queue for later retrieval.
pipe = pl.process.map(pipe, lambda x: 2 * x, workers=4, maxsize=8)
Any item passed to or returned from pl.process.map must be picklable to enable inter-process communication, and the mapping function cannot have side effects such as writing to global or stage variables. Unlike the multi-threading stage pl.thread.queue, multi-processing therefore only supports a map function rather than evaluating arbitrary previous stages.
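Since items travel between processes via pickle (and, depending on the transport, the mapped function may as well), module-level functions are the safest choice. A minimal sketch with a hypothetical double function:
def double(x):  # a module-level function; locally defined closures may not pickle
    return 2 * x
pipe = pl.process.map(range(100), double, workers=4, maxsize=8)
print(sorted(pipe)[:5])  # [0, 2, 4, 6, 8] -- sorted, since the output order may change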
In addition to starting up worker processes, the pl.process.map stage also creates a thread that reads elements from the input stage and writes them into a memory structure that allows inter-process communication with the worker processes.
Multi-thread and multi-process pipelines can be intermixed arbitrarily:
pipe = range(100)
pipe = pl.thread.map(pipe, lambda x: 1 + x, workers=4, maxsize=8)
pipe = pl.process.map(pipe, lambda x: 2 * x, workers=4, maxsize=8)
To reduce the overhead of copying memory between different processes, tinypl implements the pl.process.SharedMemoryRingBuffer class, which allows passing objects with a large memory footprint between processes using shared memory instead:
import numpy as np
ringbuffer = pl.process.SharedMemoryRingBuffer(4 * 1024 * 1024 * 1024) # Allocate 4 GB of shared memory
pipe = range(100)
def stage1(i):
    # Create a large numpy array in this process
    x = np.random.rand(1024, 1024, 25) # ~200 MB of float64 values
    # Serialize and move the array to shared memory. The return value is a reference to the shared memory
    # that can be passed between processes cheaply and can be deserialized later
    x = ringbuffer.write(x)
    return x
pipe = pl.process.map(pipe, stage1, workers=16)
def stage2(x):
    # Read from shared memory and deserialize in the main process
    x = ringbuffer.read(x)
    return x
pipe = pl.map(pipe, stage2) # (Reading is currently only supported from the main process)
Should I use multi-threading or multi-processing?
Several factors have to be considered here: Does the library release the GIL, or call another library that releases the GIL? How much time is spent in GIL-locked code compared to GIL-free code? What is the overhead of running additional processes and copying items between processes in pl.process?
Both pl.process.map and pl.thread.map provide the same interface and can be exchanged easily to test which one achieves better throughput and/or memory consumption.
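A rough benchmark sketch along these lines (work is a hypothetical GIL-bound workload; real numbers depend heavily on the task and machine):
import time
def work(x):
    return sum(i * i for i in range(10_000))  # pure Python, holds the GIL
for name, stage in [("threads", pl.thread.map), ("processes", pl.process.map)]:
    start = time.perf_counter()
    list(stage(range(200), work, workers=4, maxsize=8))
    print(f"{name}: {time.perf_counter() - start:.2f}s")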
Markers
Markers are virtual items that can be injected into the pipeline and raise events when they arrive at later stages, e.g. to indicate which part of the input sequence has been processed so far. Markers are forwarded through stages without applying any of the processing steps.
pipe = [range(i + 1) for i in range(50)]
# -> [[0], [0, 1], [0, 1, 2], ...]
# Inject a marker after every item. The marker variable acts as an identifier for markers inserted here
pipe, marker = pl.marker.insert_periodic(pipe, n=1, after=True)
# -> [[0], marker, [0, 1], marker, ...]
# Flatten the lists (ignores markers)
pipe = pl.flatten(pipe)
# -> [0, marker, 0, 1, marker, 0, 1, 2, marker, ...]
# Apply a function to all items (ignores markers)
pipe = pl.map(pipe, lambda x: x + 1)
# -> [1, marker, 1, 2, marker, 1, 2, 3, marker, ...]
# Print "Hello world!" whenever we see a marker
def hello_world():
    print("Hello world!")
pipe = pl.marker.on(pipe, hello_world, marker=marker)
# Create new stages that stop when marker is found, and pull into list
print(list(pl.marker.until(pipe, marker=marker))) # Prints [1]
print(list(pl.marker.until(pipe, marker=marker))) # Prints [1, 2]
print(list(pl.marker.until(pipe, marker=marker))) # Prints [1, 2, 3]
Ordering
The ordering of items at a pipeline stage can be saved via pl.order.save and restored at a later stage using pl.order.load, for example to retain the order of elements through multi-thread or multi-process stages. To do this, pl.order.save annotates each item with a sequence number, and pl.order.load pulls items and stores them until they are next in line.
pipe = range(50)
# Save the order of items
pipe = order = pl.order.save(pipe, maxsize=8) # pl.order.save stage is passed to pl.order.load constructor
# Change the order of items
import time, random
pipe = pl.thread.each(pipe, lambda x: time.sleep(random.uniform(0.01, 0.1)), workers=8)
# Restore the order
pipe = pl.order.load(pipe, order)
assert list(pipe) == list(range(50))
To prevent the temporary buffer in pl.order.load from running out of memory (e.g. when the next item in line takes very long to process while items after it finish faster), pl.order.save allows at most maxsize items to pass through and otherwise blocks until items are pulled and removed from the pl.order.load stage. The pl.order.save stage further enforces mutually exclusive access to previous stages, similar to pl.thread.mutex.
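The same pattern also applies across multi-process stages. A brief sketch:
pipe = range(50)
pipe = order = pl.order.save(pipe, maxsize=8)
pipe = pl.process.map(pipe, lambda x: x * x, workers=4, maxsize=8)
pipe = pl.order.load(pipe, order)
assert list(pipe) == [x * x for x in range(50)]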
Related libraries
- PyTorch's torch.utils.data
- TensorFlow's tf.data
- Pypeln
- mpipe
Project details
File details
Details for the file tinypl-0.1.0.tar.gz.
File metadata
- Download URL: tinypl-0.1.0.tar.gz
- Upload date:
- Size: 23.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.11.9
File hashes
Algorithm | Hash digest
---|---
SHA256 | 53a183f6c83e100adf45db9518e9c1b8f6b768c575fb2f1d684e33e2fa91e6b8
MD5 | e046b2947c90842d55d971c386fd371a
BLAKE2b-256 | 33077a332db0a43f464bb3fe2b3bf24792eceedeec0d94765174c1cbf7c645a3
File details
Details for the file tinypl-0.1.0-py3-none-any.whl.
File metadata
- Download URL: tinypl-0.1.0-py3-none-any.whl
- Upload date:
- Size: 21.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.11.9
File hashes
Algorithm | Hash digest
---|---
SHA256 | 4919a4f05c4738865d686dde45d4ff45aa45fb3fbb886237e4b5e99c7a066d34
MD5 | 330fc201e0f0dcca98f5d0802d3318ec
BLAKE2b-256 | 023d39a1cb56a43f19b889cae946885c3d69dcc42f83e8c99a1c0650e2fac91e