ATBU atbu.mp_pipeline package, a multiprocessing work item pipeline.
Project description
Python Multiprocessing Pipeline
Overview
The atbu.mp_pipeline package uses Python multiprocessing to build multi-stage work item pipelines, including dual-stage parallel execution of producer and consumer stages, where the pipeline automatically provides each of those stages with a pipe connection so they can share what is being produced/consumed.
The atbu.mp_pipeline package is currently used by the following project to support a backup compression pipeline stage:
- ATBU Backup & Persistent File Information utility package (atbu-pkg).
Documentation: https://atbu-mp-pipeline.readthedocs.io/en/latest/
Setup
To install atbu-mp-pipeline-pkg:
pip install atbu-mp-pipeline-pkg
See below for a few examples. For additional details and usage information, see the documentation linked above and the source code of the package mentioned earlier.
Examples
Basic/Simple Pipeline Example
import os
from multiprocessing import freeze_support
from atbu.mp_pipeline.mp_pipeline import (
    MultiprocessingPipeline,
    SubprocessPipelineStage,
    PipelineWorkItem,
)

def queue_worker_func(parm_top_secret, parent_pid):
    assert parent_pid != os.getpid()
    return (
        parent_pid,
        os.getpid(),
        parm_top_secret,
    )

def int_stage0(wi: PipelineWorkItem):
    wi.user_obj[0] = 100
    return wi

def int_stage1(wi: PipelineWorkItem):
    wi.user_obj[0] = wi.user_obj[0] * 2
    wi.user_obj[1] = "stage1"
    return wi

def int_stage2(wi: PipelineWorkItem):
    wi.user_obj[0] = wi.user_obj[0] * 2
    wi.user_obj[2] = f"stage2: got this from parent: {wi.user_obj['parent']}"
    return wi

def always_yes(wi: PipelineWorkItem):
    return True

def main():
    sp = MultiprocessingPipeline(
        name="test_mp_pipeline_basic",
        stages=[
            SubprocessPipelineStage(
                fn_determiner=always_yes,
                fn_worker=int_stage0,
            ),
            SubprocessPipelineStage(
                fn_determiner=always_yes,
                fn_worker=int_stage1,
            ),
            SubprocessPipelineStage(
                fn_determiner=always_yes,
                fn_worker=int_stage2,
            ),
        ],
    )
    d = {}
    d["parent"] = "This is from parent"
    wi = PipelineWorkItem(d.copy())
    f = sp.submit(wi)
    r_wi: PipelineWorkItem = f.result()
    if not wi.is_failed:
        assert wi == r_wi  # On success, should be equal.
        assert wi.user_obj["parent"] == d["parent"]  # Should not change.
        # Should have values as set by stages...
        assert wi.user_obj[0] == 400
        assert wi.user_obj[1] == "stage1"
        assert wi.user_obj[2] == "stage2: got this from parent: This is from parent"
        print(f"Work item completed successfully:")
        print(f" wi.user_obj[0]={wi.user_obj[0]}")
        print(f" wi.user_obj[1]={wi.user_obj[1]}")
        print(f" wi.user_obj[2]={wi.user_obj[2]}")
    else:
        print(f"Something did not go as planned:")
        for ex in r_wi.exceptions:
            print(f" ex={ex}")
    sp.shutdown()
    assert sp.was_graceful_shutdown

if __name__ == '__main__':
    freeze_support()
    main()
Output:
Work item completed successfully:
wi.user_obj[0]=400
wi.user_obj[1]=stage1
wi.user_obj[2]=stage2: got this from parent: This is from parent
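The pipeline's submit() returns a concurrent.futures-style future (the dual-stage example further below waits on several of them), so more than one work item can be in flight at a time. Below is a minimal sketch along those lines; the worker function, the dict payload, and the pipeline name are illustrative assumptions, not part of the package:

import concurrent.futures
from multiprocessing import freeze_support
from atbu.mp_pipeline.mp_pipeline import (
    MultiprocessingPipeline,
    SubprocessPipelineStage,
    PipelineWorkItem,
)

def always_yes(wi: PipelineWorkItem):
    return True

def double_value(wi: PipelineWorkItem):
    # Hypothetical worker: double the value the caller placed in user_obj.
    wi.user_obj["value"] *= 2
    return wi

def main():
    mpp = MultiprocessingPipeline(
        name="mp_pipeline_many_items",  # hypothetical name
        stages=[
            SubprocessPipelineStage(
                fn_determiner=always_yes,
                fn_worker=double_value,
            ),
        ],
    )
    # Submit several independent work items; each gets its own future.
    futs = [mpp.submit(PipelineWorkItem({"value": n})) for n in range(5)]
    done, _ = concurrent.futures.wait(
        fs=futs,
        return_when=concurrent.futures.ALL_COMPLETED,
    )
    for f in done:
        r_wi: PipelineWorkItem = f.result()
        if not r_wi.is_failed:
            print(f"doubled value: {r_wi.user_obj['value']}")
    mpp.shutdown()

if __name__ == '__main__':
    freeze_support()
    main()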
Large Pipeline Example
import os
from multiprocessing import freeze_support
from atbu.mp_pipeline.mp_pipeline import (
    MultiprocessingPipeline,
    SubprocessPipelineStage,
    PipelineWorkItem,
)

class LargePipelineWorkItem(PipelineWorkItem):
    def __init__(self) -> None:
        super().__init__(user_obj=self)
        self.is_ok = True
        self.num = 0
        self.pid = os.getpid()

class LargePipelineStage(SubprocessPipelineStage):
    def __init__(self) -> None:
        super().__init__()

    def is_for_stage(self, pwi: LargePipelineWorkItem) -> bool:
        # Example of stage rejecting its chance to run the work item.
        # Say "no" to every other request.
        pwi.is_ok = not pwi.is_ok
        return not pwi.is_ok

    def perform_stage_work(
        self,
        pwi: LargePipelineWorkItem,
        **kwargs,
    ):
        pwi.num += 1
        return pwi
def main():
    stages = 100
    mpp = MultiprocessingPipeline(
        max_simultaneous_work_items=min(os.cpu_count(), 15),
        name="test_mp_pipeline_large",
    )
    for _ in range(stages):
        mpp.add_stage(stage=LargePipelineStage())
    print(f"Pipeline has {mpp.num_stages} stages.")
    wi = LargePipelineWorkItem()
    f = mpp.submit(wi)
    r_wi: PipelineWorkItem = f.result()
    if not r_wi.is_failed:
        assert wi == r_wi
        assert r_wi.is_ok
        # The 100 stages alternate between rejecting and accepting the
        # work item, so 50 of them actually perform work.
        assert r_wi.num == 50
        print(f"Work item completed successfully:")
        print(f" r_wi.is_ok={r_wi.is_ok}")
        print(f" r_wi.pid={r_wi.pid}")
        print(f" r_wi.num={r_wi.num}")
    else:
        if r_wi.exceptions is not None:
            print(f"Something did not go as planned:")
            for ex in r_wi.exceptions:
                print(f" ex={ex}")
    mpp.shutdown()
    assert mpp.was_graceful_shutdown

if __name__ == '__main__':
    freeze_support()
    main()
Output:
Pipeline has 100 stages.
Work item completed successfully:
r_wi.is_ok=True
r_wi.pid=8204
r_wi.num=50
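The is_for_stage override above shows a stage declining work; the same hook can also route work items by content, with each stage accepting only the items it cares about. A minimal sketch of that idea follows, assuming the caller stores a dict in user_obj carrying a hypothetical compress flag (neither the flag nor the stage below is part of the package):

from atbu.mp_pipeline.mp_pipeline import (
    SubprocessPipelineStage,
    PipelineWorkItem,
)

class CompressOnlyStage(SubprocessPipelineStage):
    """Hypothetical stage that only handles work items flagged for compression."""

    def is_for_stage(self, pwi: PipelineWorkItem) -> bool:
        # Accept only work items whose user_obj carries the hypothetical
        # "compress" flag; all other work items skip this stage.
        return bool(pwi.user_obj.get("compress", False))

    def perform_stage_work(self, pwi: PipelineWorkItem, **kwargs):
        # Placeholder for the real work (e.g., compressing a payload).
        pwi.user_obj["compressed"] = True
        return pwi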
Mixed Pipeline Example
import os
from multiprocessing import freeze_support
from atbu.mp_pipeline.mp_pipeline import (
    MultiprocessingPipeline,
    PipelineStage,
    SubprocessPipelineStage,
    ThreadPipelineStage,
    PipelineWorkItem,
)

class LargePipelineWorkItem(PipelineWorkItem):
    def __init__(self) -> None:
        super().__init__(user_obj=self)
        self.is_ok = True
        self.num = 0
        self.pid = os.getpid()

class MixedPipelineSubprocessStage(PipelineStage):
    def __init__(self) -> None:
        super().__init__()

    @property
    def is_subprocess(self):
        return True

    def is_for_stage(self, pwi: LargePipelineWorkItem) -> bool:
        return True  # Yes, this stage wants all work items.

    def perform_stage_work(
        self,
        pwi: LargePipelineWorkItem,
        **kwargs,
    ):
        # Run the work item.
        assert pwi.pid != os.getpid()  # Subprocess should have different pid.
        pwi.num += 1
        return pwi

def perform_thread_stage_work(
    pwi: LargePipelineWorkItem,
    **kwargs,
):
    assert pwi.pid == os.getpid()  # Thread should have same process.
    pwi.num += 1
    return pwi
def main():
    mpp = MultiprocessingPipeline(
        name="mp_pipeline_large_mixed",
        max_simultaneous_work_items=min(os.cpu_count(), 15),
    )
    for _ in range(10):
        # Add a stage using our derived-class pipeline stage.
        mpp.add_stage(stage=MixedPipelineSubprocessStage())
        # Add a stage using the library's thread pipeline stage,
        # where we specify one callable the pipeline uses to ask
        # whether this stage wants to run a given work item, and
        # another to actually do the work.
        mpp.add_stage(
            stage=ThreadPipelineStage(
                fn_determiner=lambda pwi: True,  # Run all work items.
                fn_worker=perform_thread_stage_work,  # Call this to run them.
            )
        )
    print(f"Pipeline has {mpp.num_stages} stages.")
    wi = LargePipelineWorkItem()
    f = mpp.submit(wi)
    r_wi: PipelineWorkItem = f.result()
    if not r_wi.is_failed:
        assert wi == r_wi
        assert not r_wi.is_failed
        assert r_wi.is_ok
        assert r_wi.num == 20
        assert r_wi.pid == os.getpid()
        print(f"Work item completed successfully:")
        print(f" r_wi.num={r_wi.num}")
        print(f" r_wi.pid={r_wi.pid}")
    else:
        if r_wi.exceptions is not None:
            print(f"Something did not go as planned:")
            for ex in r_wi.exceptions:
                print(f" ex={ex}")
    mpp.shutdown()
    assert mpp.was_graceful_shutdown

if __name__ == '__main__':
    freeze_support()
    main()
Output:
Pipeline has 20 stages.
Work item completed successfully:
r_wi.num=20
r_wi.pid=18068
Dual-stage Producer/Consumer Example
import os
from multiprocessing import freeze_support
import concurrent.futures
from atbu.mp_pipeline.mp_pipeline import (
    MultiprocessingPipeline,
    SubprocessPipelineStage,
    PipelineWorkItem,
    PipeConnectionIO,
)

class ProducerConsumerWorkItem(PipelineWorkItem):
    def __init__(self, demo_pipe_io_wrapper) -> None:
        super().__init__(auto_copy_attr=False)
        self.demo_pipe_io_wrapper = demo_pipe_io_wrapper
        self.producer_data = None  # What producer sent.
        self.producer_bytes_written = None
        self.consumer_data = None  # What consumer received.

    def stage_complete(
        self,
        stage_num: int,  # pylint: disable=unused-argument
        wi: "PipelineWorkItem",  # pylint: disable=unused-argument
        ex: Exception,
    ):
        super().stage_complete(
            stage_num=stage_num,
            wi=wi,
            ex=ex,
        )
        if not wi.is_failed:
            if stage_num == 0:
                self.producer_data = wi.producer_data
                self.producer_bytes_written = wi.producer_bytes_written
            elif stage_num == 1:
                self.consumer_data = wi.consumer_data

class ProducerPipelineSubprocessStage(SubprocessPipelineStage):
    def __init__(self) -> None:
        super().__init__()

    def is_for_stage(self, pwi: ProducerConsumerWorkItem) -> bool:
        return True  # Yes, we want to see all work items in this stage.

    @property
    def is_pipe_with_next_stage(self):
        """Return True to indicate we want this stage and the next one
        to run in parallel, where this stage is the producer feeding a
        pipeline-supplied pipe, and the next stage is consuming from that
        same pipe (on the reader side).
        """
        return True

    def perform_stage_work(
        self,
        pwi: ProducerConsumerWorkItem,
        **kwargs,
    ):
        try:
            if not isinstance(pwi, ProducerConsumerWorkItem):
                raise ValueError(
                    f"Producer: Pipeline gave us unexpected work item."
                )
            pwi.producer_data = os.urandom(10)
            if pwi.demo_pipe_io_wrapper:
                # PipeConnectionIO wraps the multiprocessing Pipe, providing
                # an io.RawIOBase interface (with limitations... i.e., seek is
                # not supported).
                with PipeConnectionIO(pwi.pipe_conn, is_write=True) as pipe_io:
                    pwi.producer_bytes_written = pipe_io.write(pwi.producer_data)
            else:
                # Just use the pipe connection directly.
                pwi.pipe_conn.send_bytes(pwi.producer_data)
                pwi.producer_bytes_written = len(pwi.producer_data)
            return pwi
        except Exception:
            if not pwi.pipe_conn.closed:
                pwi.pipe_conn.close()
            raise

class ConsumerPipelineSubprocessStage(SubprocessPipelineStage):
    def __init__(self) -> None:
        super().__init__()

    def is_for_stage(self, pwi: ProducerConsumerWorkItem) -> bool:
        return True  # Yes, we want to see all work items in this stage.

    def perform_stage_work(
        self,
        pwi: ProducerConsumerWorkItem,
        **kwargs,
    ):
        try:
            if not isinstance(pwi, ProducerConsumerWorkItem):
                raise ValueError(
                    f"Consumer: Pipeline gave us unexpected work item."
                )
            if pwi.demo_pipe_io_wrapper:
                # PipeConnectionIO wraps the multiprocessing Pipe, providing
                # an io.RawIOBase interface (with limitations... i.e., seek is
                # not supported).
                with PipeConnectionIO(pwi.pipe_conn, is_write=False) as pipe_io:
                    pwi.consumer_data = pipe_io.read()
            else:
                # Just use the pipe connection directly.
                pwi.consumer_data = pwi.pipe_conn.recv_bytes()
            return pwi
        except Exception:
            if not pwi.pipe_conn.closed:
                pwi.pipe_conn.close()
            raise
def main():
    mpp = MultiprocessingPipeline(
        name="test_mp_producer_consumer",
        stages=[
            ProducerPipelineSubprocessStage(),
            ConsumerPipelineSubprocessStage(),
        ]
    )
    wil = [
        ProducerConsumerWorkItem(demo_pipe_io_wrapper=True),
        ProducerConsumerWorkItem(demo_pipe_io_wrapper=False),
    ]
    fut = [mpp.submit(wi) for wi in wil]
    done, _ = concurrent.futures.wait(
        fs=fut,
        return_when=concurrent.futures.ALL_COMPLETED,
    )
    assert len(done) == 2
    for i, f in enumerate(done):
        r_wi: ProducerConsumerWorkItem = f.result()
        if not r_wi.is_failed:
            assert r_wi.producer_bytes_written == 10
            assert r_wi.producer_data == r_wi.consumer_data
            print(f"Success for #{i}:")
            print(f" {i}: num_bytes={r_wi.producer_bytes_written}")
            print(f" {i}: p_bytes={r_wi.producer_data.hex(' ')}")
            print(f" {i}: c_bytes={r_wi.consumer_data.hex(' ')}")
        else:
            if r_wi.exceptions is not None:
                print(f"Something did not go as planned for #{i}:")
                for ex in r_wi.exceptions:
                    print(f" {i}: Bad thing happened: {ex}")
    mpp.shutdown()
    assert mpp.was_graceful_shutdown

if __name__ == '__main__':
    freeze_support()
    main()
Output:
Success for #0:
0: num_bytes=10
0: p_bytes=76 d4 32 f0 4f 3f 31 30 19 00
0: c_bytes=76 d4 32 f0 4f 3f 31 30 19 00
Success for #1:
1: num_bytes=10
1: p_bytes=34 d6 5c d0 be 82 62 c3 5d 61
1: c_bytes=34 d6 5c d0 be 82 62 c3 5d 61
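As the comments in the example note, PipeConnectionIO wraps a multiprocessing pipe connection in an io.RawIOBase-style interface. The sketch below illustrates that wrapping outside the pipeline; it assumes PipeConnectionIO accepts either end of a plain multiprocessing.Pipe() just as it accepts the pipeline-supplied pwi.pipe_conn above, which this document does not itself demonstrate:

# A minimal sketch (assumptions noted in the text above): wrap the two ends
# of a plain multiprocessing.Pipe() with PipeConnectionIO, write on one end
# from a thread, and read everything back on the other end.
import threading
import multiprocessing
from atbu.mp_pipeline.mp_pipeline import PipeConnectionIO

def writer(conn, payload: bytes):
    # Leaving the with-block closes the write end, which is what lets the
    # reader's read() see end-of-stream.
    with PipeConnectionIO(conn, is_write=True) as pipe_io:
        pipe_io.write(payload)

def main():
    # duplex=False yields (receive-only, send-only) connection ends.
    r_conn, w_conn = multiprocessing.Pipe(duplex=False)
    payload = b"hello pipeline"
    t = threading.Thread(target=writer, args=(w_conn, payload))
    t.start()
    with PipeConnectionIO(r_conn, is_write=False) as pipe_io:
        received = pipe_io.read()  # Read until the writer closes its end.
    t.join()
    assert received == payload
    print(received)

if __name__ == '__main__':
    main()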
File details
Details for the file atbu_mp_pipeline_pkg-0.0.10.tar.gz.
File metadata
- Download URL: atbu_mp_pipeline_pkg-0.0.10.tar.gz
- Upload date:
- Size: 31.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.11
File hashes
Algorithm | Hash digest
---|---
SHA256 | 619879fdcbb4ac8a50121d87dee5b179c7384a06a905cbd9af2140ffbcf52c6f
MD5 | ccc3f0197d75349b18963389f1037296
BLAKE2b-256 | 666e1425b061e320dc8a9edbd42e4ce1ed2cd631b40bb80fcb08a081a2cd0f3e
File details
Details for the file atbu_mp_pipeline_pkg-0.0.10-py3-none-any.whl.
File metadata
- Download URL: atbu_mp_pipeline_pkg-0.0.10-py3-none-any.whl
- Upload date:
- Size: 29.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.11
File hashes
Algorithm | Hash digest
---|---
SHA256 | e4421c1206c65b11cde7ffb4c4220670747f231bd251e15299f7e0a870ac7e53
MD5 | d3e7cf2fe67fbaf54c53a38dfe1aeb61
BLAKE2b-256 | 2e2215e577d4344bd64c7e4d9c6f202be20841e2468b98b978907e92e4ccecd7