Compose functions and sequentially call them. Run sequence in multiple threads or processes!
Project description
Sequential Functions
Compose functions into a sequence that are called sequentially.
Build fast readable code.
Break your problem into small functional steps and let sequential functions run them sequentially.
Want to go faster? Simply increase the number of threads or processes.
Examples
Toy Example
Toy example that highlights the syntax.
import sequential_functions as sf
def main():
# Compose an easy to read list of steps
sequence = sf.Compose(
square,
plus_one,
)
# Use list to pull items through the sequence
outputs = list(sequence(range(5)))
print(outputs)
def square(x):
return x*x
def plus_one(x):
return x + 1
if __name__ == "__main__":
main()
Output
[1, 2, 5, 10, 17]
Under the Hood
Compose uses generator chaining to run items through each of the functions. Both of these methods produce the same output
import sequential_functions as sf
def main():
# Method 1
sequence = sf.Compose(
square,
plus_one,
)
outputs = list(sequence(range(5)))
print(outputs,"Method 1 - Composed Sequence")
# Method 2
generator_chain = range(5)
generator_chain = (square(x) for x in generator_chain)
generator_chain = (plus_one(x) for x in generator_chain)
output = list(generator_chain)
print(outputs,"Method 2 - Generator Chain")
def square(x):
return x*x
def plus_one(x):
return x + 1
if __name__ == "__main__":
main()
Output
[1, 2, 5, 10, 17] Method 1 - Composed Sequence
[1, 2, 5, 10, 17] Method 2 - Generator Chain
Best Practice
It's best practice to pass a dict in and out of each function. Each function can modify the dict as they complete their computation. This design seems the most readable and extensible.
import sequential_functions as sf
def main():
sequence = sf.Compose(
create_item_dict,
load_image,
preprocess_image,
detect_objects,
)
paths = ["cat.jpg","dog.jpg"]
for item in sequence(paths):
print(f"Results: {item['image_path']}")
print(item["detections"])
print()
def create_item_dict(path):
print(f"Item Dict: {path}")
item = { "image_path": path}
return item
def load_image(item):
print(f"Loading: {item['image_path']}")
item["image"] = "e.g. numpy array"
return item
def preprocess_image(item):
print(f"Preprocessing: {item['image_path']}")
item["tensor"] = "e.g. torch tensor"
return item
def detect_objects(item):
print(f"Detecting: {item['image_path']}")
item["detections"] = ["box 1", "box 2"]
return item
if __name__ == "__main__":
main()
Output
Item Dict: cat.jpg
Loading: cat.jpg
Preprocessing: cat.jpg
Detecting: cat.jpg
Results: cat.jpg
['box 1', 'box 2']
Item Dict: dog.jpg
Loading: dog.jpg
Preprocessing: dog.jpg
Detecting: dog.jpg
Results: dog.jpg
['box 1', 'box 2']
Multi Processing
It's trivial to distribute work to multiple processes by providing the num_processes argument. Order is not preserved with multiprocessing. Use multiprocessing when computation is the bottle neck.
import sequential_functions as sf
import time
import os
def main():
sequence = sf.Compose(
slow_task,
record_process_id,
num_processes=5, # Simply choose the number of processes
)
start_time = time.perf_counter()
for x in sequence(range(5)):
print(x)
end_time = time.perf_counter()
print(f"total time: {end_time-start_time}")
def slow_task(x):
time.sleep(1) # sleep 1 second
return x
def record_process_id(x):
return f"Task {x} completed by process {os.getpid()}"
if __name__ == "__main__":
main()
Output
Task 1 completed by process 1253843
Task 0 completed by process 1253837
Task 2 completed by process 1253838
Task 3 completed by process 1253846
Task 4 completed by process 1253840
total time: 1.241348128998652
Multi Threading
It's trivial to distribute work to multiple threads by providing the num_threads argument. Order is not preserved with multithreading. Use threading when IO is the bottle neck. e.g loading urls.
import sequential_functions as sf
import time
import threading
def main():
sequence = sf.Compose(
slow_task,
record_thread_name,
num_threads=5, # Simply choose the number of thread
)
start_time = time.perf_counter()
for x in sequence(range(5)):
print(x)
end_time = time.perf_counter()
print(f"total time: {end_time-start_time}")
def slow_task(x):
time.sleep(1) # sleep 1 second
return x
def record_thread_name(x):
name = threading.current_thread().name
return f"Task {x} completed by thread {name}"
if __name__ == "__main__":
main()
Output
Task 0 completed by thread ThreadPoolExecutor-0_2
Task 1 completed by thread ThreadPoolExecutor-0_3
Task 4 completed by thread ThreadPoolExecutor-0_1
Task 2 completed by thread ThreadPoolExecutor-0_0
Task 3 completed by thread ThreadPoolExecutor-0_4
total time: 1.2234888289822266
Nesting
Compose returns a callable that can be nesting inside another Compose. Each compose can use threads and processes independently.
import sequential_functions as sf
import threading
import time
import os
def main():
sequence = sf.Compose(
function_a,
sf.Compose(
function_b,
num_threads=3,
),
sf.Compose(
function_c,
num_processes=3,
),
)
outputs=list(sequence(range(3)))
print(outputs)
def function_a(x):
print(f"function_a({x}) ran in main thread")
return x
def function_b(x):
time.sleep(1) # sleep 1 second
print(f"function_b({x}) ran in thread {threading.current_thread().name}")
return x
def function_c(x):
time.sleep(1) # sleep 1 second
print(f"function_c({x}) ran in process {os.getpid()}")
return x
if __name__ == "__main__":
main()
Output
function_c(0) ran in process 1253950
function_c(1) ran in process 1253958
function_c(2) ran in process 1253948
function_a(0) ran in main thread
function_a(1) ran in main thread
function_a(2) ran in main thread
function_b(0) ran in thread ThreadPoolExecutor-0_1
function_b(2) ran in thread ThreadPoolExecutor-0_0
function_b(1) ran in thread ThreadPoolExecutor-0_2
[0, 1, 2]
Batching
Use batching to collate multiple items into a batch. A machine learning model may more efficient on batches.
import sequential_functions as sf
import time
def main():
sequence = sf.Compose(
# Build the batches in background processes.
sf.Compose(
load_image,
sf.Batch(batch_size=3),
collate_images,
num_processes=3,
),
# Detect in the main process.
detect_objects,
debatch_detections,
)
image_paths = (f"image_{i}.jpg" for i in range(10))
results = list(sequence(image_paths))
for result in results:
print(result)
def load_image(path):
return path.replace("image","tensor").replace(".jpg","")
def collate_images(x_list):
# Ideally you would stack images into a tensor
return ",".join(x_list)
def detect_objects(x_batch):
print(f"Detecting on Batch: {x_batch}")
# Ideally your detection runs faster with a batch of images.
return x_batch.replace("tensor","Detections tensor")
def debatch_detections(x_batch):
yield from x_batch.split(",")
if __name__ == "__main__":
main()
Output
Detecting on Batch: tensor_3,tensor_4,tensor_5
Detecting on Batch: tensor_0,tensor_1,tensor_2
Detecting on Batch: tensor_6,tensor_7,tensor_9
Detecting on Batch: tensor_8
Detections tensor_3
Detections tensor_4
Detections tensor_5
Detections tensor_0
Detections tensor_1
Detections tensor_2
Detections tensor_6
Detections tensor_7
Detections tensor_9
Detections tensor_8
Callables
Functions can be any type of callable. Use closures and callable objects to change the behaviour of functions
import sequential_functions as sf
def main():
sequence = sf.Compose(
to_string,
append_string(" Hello"),
append_string(" World!"),
EncloseString("**"),
EncloseString(".."),
)
for x in sequence(range(5)):
print(x)
def to_string(x):
return str(x)
def append_string(s):
# create new function on the fly
def closure(x):
return x + s
# return this new function
return closure
class EncloseString():
# Callable class
def __init__(self,s):
self.s = s
def __call__(self,x):
return self.s + x + self.s
if __name__ == "__main__":
main()
Output
..**0 Hello World!**..
..**1 Hello World!**..
..**2 Hello World!**..
..**3 Hello World!**..
..**4 Hello World!**..
Item Growth
Functions can yield out more items than they take in.
import sequential_functions as sf
def main():
sequence = sf.Compose(
yield_video_frames,
detect_objects,
)
for x in sequence(range(3)):
print(x)
def yield_video_frames(x):
num_frames = 3
for i in range(num_frames):
yield f"Video {x}, Frame {i}"
def detect_objects(x):
return f" Detecting objects in {x}"
if __name__ == "__main__":
main()
Output
Detecting objects in Video 0, Frame 0
Detecting objects in Video 0, Frame 1
Detecting objects in Video 0, Frame 2
Detecting objects in Video 1, Frame 0
Detecting objects in Video 1, Frame 1
Detecting objects in Video 1, Frame 2
Detecting objects in Video 2, Frame 0
Detecting objects in Video 2, Frame 1
Detecting objects in Video 2, Frame 2
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file sequential_functions-0.1.3.tar.gz.
File metadata
- Download URL: sequential_functions-0.1.3.tar.gz
- Upload date:
- Size: 10.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d1c767f4adc5a4d2d79334baee3decab97dd9cd6aaf41367d7a6e7d80cf89d2d
|
|
| MD5 |
fa1024e4c42bdb46b4a5e369f1ca9b93
|
|
| BLAKE2b-256 |
0d5a57c0737a2398e592a60748001865f6dadd3ec9437b6236c1db6a4f044565
|
File details
Details for the file sequential_functions-0.1.3-py3-none-any.whl.
File metadata
- Download URL: sequential_functions-0.1.3-py3-none-any.whl
- Upload date:
- Size: 8.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
de03b9d1c7dec08e2349719cd31b8b6a1821f6bb6a95c5373d43e26d0fd2cc7a
|
|
| MD5 |
d0cc34244207c64e083103f210d349f5
|
|
| BLAKE2b-256 |
92dd2961613290fbd01fb28ac051fba40840a5e70574f3d1c21354b44dfbc449
|