Skip to main content

ATBU atbu.mp_pipeline package, a multiprocessing work item pipeline.

Project description

Python Multiprocessing Pipeline

Overview

The atbu.mp_pipeline package uses Python multiprocessing capabilities to support multi-stage pipeline capabilities, including support for dual-stage parallel execution of a producer and consumer stages, automatically providing each of those stages with pipe connection, allowing them share what is being produced/consumed.

The atbu.mp_pipeline package is currently used by the following project for supporting a backup compression pipeline stage:

Documentation: https://atbu-mp-pipeline.readthedocs.io/en/latest/

Setup

To install atbu-common-pkg:

pip install atbu-mp-pipeline-pkg

See below for a few examples. See source code for other packages mentioned above and for additional details and usage information.

Examples

Basic/Simple Pipeline Example

import os
from multiprocessing import freeze_support
from atbu.mp_pipeline.mp_pipeline import (
    MultiprocessingPipeline,
    SubprocessPipelineStage,
    PipelineWorkItem,
)

def queue_worker_func(parm_top_secret, parent_pid):
    assert parent_pid != os.getpid()
    return (
        parent_pid,
        os.getpid(),
        parm_top_secret,
    )

def int_stage0(wi: PipelineWorkItem):
    wi.user_obj[0] = 100
    return wi


def int_stage1(wi: PipelineWorkItem):
    wi.user_obj[0] = wi.user_obj[0] * 2
    wi.user_obj[1] = "stage1"
    return wi


def int_stage2(wi: PipelineWorkItem):
    wi.user_obj[0] = wi.user_obj[0] * 2
    wi.user_obj[2] = f"stage2: got this from parent: {wi.user_obj['parent']}"
    return wi


def always_yes(wi: PipelineWorkItem):
    return True

def main():
    sp = MultiprocessingPipeline(
        name="test_mp_pipeline_basic",
        stages=[
            SubprocessPipelineStage(
                fn_determiner=always_yes,
                fn_worker=int_stage0,
            ),
            SubprocessPipelineStage(
                fn_determiner=always_yes,
                fn_worker=int_stage1,
            ),
            SubprocessPipelineStage(
                fn_determiner=always_yes,
                fn_worker=int_stage2,
            ),
        ],
    )

    d = {}
    d["parent"] = "This is from parent"
    wi = PipelineWorkItem(d.copy())
    f = sp.submit(wi)
    r_wi: PipelineWorkItem = f.result()
    if not wi.is_failed:
        assert wi == r_wi # On success, should be equal.
        assert wi.user_obj["parent"] == d["parent"] # Should not change.
        # Should have values as set by stages...
        assert wi.user_obj[0] == 400
        assert wi.user_obj[1] == "stage1"
        assert wi.user_obj[2] == "stage2: got this from parent: This is from parent"
        print(f"Work item completed successfully:")
        print(f"  wi.user_obj[0]={wi.user_obj[0]}")
        print(f"  wi.user_obj[1]={wi.user_obj[1]}")
        print(f"  wi.user_obj[2]={wi.user_obj[2]}")
    else:
        print(f"Something did not go as planned:")
        for ex in r_wi.exceptions:
            print(f" ex={ex}")
    sp.shutdown()
    assert sp.was_graceful_shutdown

if __name__ == '__main__':
    freeze_support()
    main()

Output:

Work item completed successfully:
  wi.user_obj[0]=400
  wi.user_obj[1]=stage1
  wi.user_obj[2]=stage2: got this from parent: This is from parent

Large Pipeline Example

import os
from multiprocessing import freeze_support
from atbu.mp_pipeline.mp_pipeline import (
    MultiprocessingPipeline,
    SubprocessPipelineStage,
    PipelineWorkItem,
)

class LargePipelineWorkItem(PipelineWorkItem):
    def __init__(self) -> None:
        super().__init__(user_obj=self)
        self.is_ok = True
        self.num = 0
        self.pid = os.getpid()


class LargePipelineStage(SubprocessPipelineStage):
    def __init__(self) -> None:
        super().__init__()

    def is_for_stage(self, pwi: LargePipelineWorkItem) -> bool:
        # Example of stage rejecting its chance to run the work item.
        # Say "no" to every other request.
        pwi.is_ok = not pwi.is_ok
        return not pwi.is_ok

    def perform_stage_work(
        self,
        pwi: LargePipelineWorkItem,
        **kwargs,
    ):
        pwi.num += 1
        return pwi

def main():
    stages = 100
    mpp = MultiprocessingPipeline(
        max_simultaneous_work_items=min(os.cpu_count(), 15),
        name="test_mp_pipeline_large",
    )
    for _ in range(stages):
        mpp.add_stage(stage=LargePipelineStage())
    print(f"Pipeline has {mpp.num_stages} stages.")
    wi = LargePipelineWorkItem()
    f = mpp.submit(wi)
    r_wi: PipelineWorkItem = f.result()
    if not r_wi.is_failed:
        assert wi == r_wi
        assert r_wi.is_ok
        assert r_wi.num == 50
        print(f"Work item completed successfully:")
        print(f"  r_wi.is_ok={r_wi.is_ok}")
        print(f"  r_wi.pid={r_wi.pid}")
    else:
        if r_wi.exceptions is not None:
            print(f"Something did not go as planned:")
            for ex in r_wi.exceptions:
                print(f" ex={ex}")
    mpp.shutdown()
    assert mpp.was_graceful_shutdown

if __name__ == '__main__':
    freeze_support()
    main()

Output:

Pipeline has 100 stages.
Work item completed successfully:
  r_wi.is_ok=True
  r_wi.pid=8204
  r_wi.num=50

Mixed Pipeline Example

import os
from multiprocessing import freeze_support
from atbu.mp_pipeline.mp_pipeline import (
    MultiprocessingPipeline,
    PipelineStage,
    SubprocessPipelineStage,
    ThreadPipelineStage,
    PipelineWorkItem,
)

class LargePipelineWorkItem(PipelineWorkItem):
    def __init__(self) -> None:
        super().__init__(user_obj=self)
        self.is_ok = True
        self.num = 0
        self.pid = os.getpid()


class MixedPipelineSubprocessStage(PipelineStage):
    def __init__(self) -> None:
        super().__init__()

    @property
    def is_subprocess(self):
        return True

    def is_for_stage(self, pwi: LargePipelineWorkItem) -> bool:
        return True # Yes, this stage wants all work items.

    def perform_stage_work(
        self,
        pwi: LargePipelineWorkItem,
        **kwargs,
    ):
        # Run the work item.
        assert pwi.pid != os.getpid() # Subprocess should have different pid.
        pwi.num += 1
        return pwi


def perform_thread_stage_work(
    pwi: LargePipelineWorkItem,
    **kwargs,
):
    assert pwi.pid == os.getpid() # Thread should have same process.
    pwi.num += 1
    return pwi

def main():
    mpp = MultiprocessingPipeline(
        name="mp_pipeline_large_mixed",
        max_simultaneous_work_items=min(os.cpu_count(), 15),
    )
    for _ in range(10):
        # Add a stage using our derived-class pipeline stage.
        mpp.add_stage(stage=MixedPipelineSubprocessStage())
        # Add a stage using the library's thread pipeline stage,
        # where we specify the callable to call for both asking
        # the pipeline if it wants to run a given work item, and
        # and other to actually do the work.
        mpp.add_stage(
            stage=ThreadPipelineStage(
                fn_determiner=lambda pwi: True, # Run all work items.
                fn_worker=perform_thread_stage_work, # Call this to run them.
            )
        )
    print(f"Pipeline has {mpp.num_stages} stages.")
    wi = LargePipelineWorkItem()
    f = mpp.submit(wi)
    r_wi: PipelineWorkItem = f.result()
    if not r_wi.is_failed:
        assert wi == r_wi
        assert not r_wi.is_failed
        assert r_wi.is_ok
        assert r_wi.num == 20
        assert r_wi.pid == os.getpid()
        print(f"Work item completed successfully:")
        print(f"  r_wi.num={r_wi.num}")
        print(f"  r_wi.pid={r_wi.pid}")
    else:
        if r_wi.exceptions is not None:
            print(f"Something did not go as planned:")
            for ex in r_wi.exceptions:
                print(f" ex={ex}")
    mpp.shutdown()
    assert mpp.was_graceful_shutdown

if __name__ == '__main__':
    freeze_support()
    main()

Output:

Pipeline has 20 stages.
Work item completed successfully:
  r_wi.num=20
  r_wi.pid=18068

Dual-stage Producer/Consumer Example

import os
from multiprocessing import freeze_support
import concurrent.futures
from atbu.mp_pipeline.mp_pipeline import (
    MultiprocessingPipeline,
    SubprocessPipelineStage,
    PipelineWorkItem,
    PipeConnectionIO,
)

class ProducerConsumerWorkItem(PipelineWorkItem):
    def __init__(self, demo_pipe_io_wrapper) -> None:
        super().__init__(auto_copy_attr=False)
        self.demo_pipe_io_wrapper = demo_pipe_io_wrapper
        self.producer_data = None # What producer sent.
        self.producer_bytes_written = None
        self.consumer_data = None # What consumer received

    def stage_complete(
        self,
        stage_num: int,  # pylint: disable=unused-argument
        wi: "PipelineWorkItem",  # pylint: disable=unused-argument
        ex: Exception,
    ):
        super().stage_complete(
            stage_num=stage_num,
            wi=wi,
            ex=ex,
        )
        if not wi.is_failed:
            if stage_num == 0:
                self.producer_data = wi.producer_data
                self.producer_bytes_written = wi.producer_bytes_written
            elif stage_num == 1:
                self.consumer_data = wi.consumer_data


class ProducerPipelineSubprocessStage(SubprocessPipelineStage):
    def __init__(self) -> None:
        super().__init__()

    def is_for_stage(self, pwi: ProducerConsumerWorkItem) -> bool:
        return True # Yes, we want to see all work items in this stage.

    @property
    def is_pipe_with_next_stage(self):
        """Return True to indicate we want this stage and the next one
        to run in parallel, where this stage is the producer feeding a
        pipeline-supplied pipe, and the next stage is consuming from that
        same pipe (on the reader side).
        """
        return True

    def perform_stage_work(
        self,
        pwi: ProducerConsumerWorkItem,
        **kwargs,
    ):
        try:
            if not isinstance(pwi, ProducerConsumerWorkItem):
                raise ValueError(
                    f"Producer: Pipeline gave us unexpected work item."
                )
            pwi.producer_data = os.urandom(10)
            if pwi.demo_pipe_io_wrapper:
                # PipeConnectionIO wraps the multiprocessing Pipe, providing
                # an io.RawIOBase interface (with limitations... i.e., seek is
                # not supported).
                with PipeConnectionIO(pwi.pipe_conn, is_write=True) as pipe_io:
                    pwi.producer_bytes_written = pipe_io.write(pwi.producer_data)
            else:
                # Just use the pipe connection directly.
                pwi.pipe_conn.send_bytes(pwi.producer_data)
                pwi.producer_bytes_written = len(pwi.producer_data)
            return pwi
        except Exception:
            if not pwi.pipe_conn.closed:
                pwi.pipe_conn.close()
            raise


class ConsumerPipelineSubprocessStage(SubprocessPipelineStage):
    def __init__(self) -> None:
        super().__init__()

    def is_for_stage(self, pwi: ProducerConsumerWorkItem) -> bool:
        return True # Yes, we want to see all work items in this stage.

    def perform_stage_work(
        self,
        pwi: ProducerConsumerWorkItem,
        **kwargs,
    ):
        try:
            if not isinstance(pwi, ProducerConsumerWorkItem):
                raise ValueError(
                    f"Consumer: Pipeline gave us unexpected work item."
                )
            if pwi.demo_pipe_io_wrapper:
                # PipeConnectionIO wraps the multiprocessing Pipe, providing
                # an io.RawIOBase interface (with limitations... i.e., seek is
                # not supported).
                with PipeConnectionIO(pwi.pipe_conn, is_write=False) as pipe_io:
                    pwi.consumer_data = pipe_io.read()
            else:
                # Just use the pipe connection directly.
                pwi.consumer_data = pwi.pipe_conn.recv_bytes()
            return pwi
        except Exception:
            if not pwi.pipe_conn.closed:
                pwi.pipe_conn.close()
            raise

def main():
    mpp = MultiprocessingPipeline(
        name="test_mp_producer_oncsoler",
        stages=[
            ProducerPipelineSubprocessStage(),
            ConsumerPipelineSubprocessStage()
        ]
    )
    wil = [
        ProducerConsumerWorkItem(demo_pipe_io_wrapper=True),
        ProducerConsumerWorkItem(demo_pipe_io_wrapper=False)
    ]
    fut = [mpp.submit(wi) for wi in wil]
    done, _ = concurrent.futures.wait(
        fs=fut,
        return_when=concurrent.futures.ALL_COMPLETED
    )
    assert len(done) == 2
    for i, f in enumerate(done):
        r_wi: ProducerConsumerWorkItem = f.result()
        if not r_wi.is_failed:
            assert r_wi.producer_bytes_written == 10
            assert r_wi.producer_data == r_wi.consumer_data
            print(f"Success for #{i}:")
            print(f"  {i}: num_bytes={r_wi.producer_bytes_written}")
            print(f"  {i}: p_bytes={r_wi.producer_data.hex(' ')}")
            print(f"  {i}: c_bytes={r_wi.consumer_data.hex(' ')}")
        else:
            if r_wi.exceptions is not None:
                print(f"Something did not go as planned for #{i}:")
                for ex in r_wi.exceptions:
                    print(f"  {i}: Bad thing happened: {ex}")
    mpp.shutdown()
    assert mpp.was_graceful_shutdown

if __name__ == '__main__':
    freeze_support()
    main()

Output:

Success for #0:
  0: num_bytes=10
  0: p_bytes=76 d4 32 f0 4f 3f 31 30 19 00
  0: c_bytes=76 d4 32 f0 4f 3f 31 30 19 00
Success for #1:
  1: num_bytes=10
  1: p_bytes=34 d6 5c d0 be 82 62 c3 5d 61
  1: c_bytes=34 d6 5c d0 be 82 62 c3 5d 61

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

atbu-mp-pipeline-pkg-0.0.9.tar.gz (31.9 kB view hashes)

Uploaded Source

Built Distribution

atbu_mp_pipeline_pkg-0.0.9-py3-none-any.whl (29.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page