Skip to main content

Python package implementing a full featured producer/consumer pattern for concurrent workers

Project description

Advanced Producer-Consumer

https://www.travis-ci.com/acreegan/adv_prodcon.svg?branch=main Documentation Status Updates

Advanced Producer-Consumer is a python package implementing a full featured producer-consumer pattern for concurrent workers. This is useful for developing data acquisition programs or programs that involve real-time data processing while maintatining a responsive UI. It is compatible with PyQt5, which allows it to be used to develop graphical data acquisition and visualisation applications.

Features

  • Producer and Consumer background workers are defined as metaclasses that can be extended by the user. They implement work functions that run in separate processes.

  • Consumers are buffered and can be configured to run based on a timeout, a max buffer size, or both.

  • The queues that connect Producers to Consumers can be either non-lossy or lossy.

  • Producers and Consumers are connected by a subscription model. Producers can have multiple consumers subscribed to them. Consumers can be subscribed to multiple Producers.

  • Producers and Consumers have on_start and on_stop functions that can be defined to run code for setup and teardown.

  • Results from Consumers (and Producers) can be accessed in the main process through a user-defined callback.

  • User defined functions can be defined to communicate between the main process and the work functions.

  • Compatible with PyQt5, which allows development of graphical data acquisition and visualisation applications.

Installation

To install Advanced Producer Consumer, run this command in your terminal:

$ pip install adv_prodcon

Quick Start

The following is a quick example of how to use the adv_prodcon package

import adv_prodcon
import time
from itertools import count

Imports.

class ExampleProducer(adv_prodcon.Producer):

    @staticmethod
    def on_start(state, message_pipe, *args, **kwargs):
        return {"count": count()}

    @staticmethod
    def work(on_start_result, state, message_pipe, *args):
        return next(on_start_result["count"])

Define a Producer class. Here we are using the on_start method to establish a itertools.count iterator. This is made available in the work function through the on_start_result argument. The work function will return the next count each time it is run.

class ExampleConsumer(adv_prodcon.Consumer):

    @staticmethod
    def work(items, on_start_result, state, message_pipe, *args):
        return f"Got :{items} from producer"

    def on_result_ready(self, result):
        print(result)

Define a Consumer Class. This Consumer will just be used as a buffer, returning a string with the items received from the Producer. The on_result_ready function is called when the main process receives the result of the work function. Here we are just printing out the result.

if __name__ == "__main__":
    example_producer = ExampleProducer(work_timeout=1)
    example_consumer = ExampleConsumer(work_timeout=2,
                                       max_buffer_size=1000)

    example_producer.set_subscribers([example_consumer.get_work_queue()])
    example_producer.start_new()
    example_consumer.start_new()

    time.sleep(10)

In the main code block, we create an instance of both our ExampleProducer and our ExampleConsumer. We set the work_timeout of the ExampleProducer to 1 so that it runs once per second. We set the work_timeout of the ExampleConsumer to 2 so that every 2 seconds it performs work on all items in its queue. The max_buffer_size is set high so that the ExampleConsumer is controlled by its work_timeout.

The output of this code is shown below:

Got :[0, 1] from producer
Got :[2, 3] from producer
Got :[4, 5] from producer
Got :[6, 7] from producer

Process finished with exit code 0

Note that the output may be slightly different depending on the time taken to start the worker processes.

Credits

History

0.1.0 (2021-05-05)

  • First release on PyPI.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

adv_prodcon-0.1.14.tar.gz (54.8 kB view details)

Uploaded Source

Built Distribution

adv_prodcon-0.1.14-py2.py3-none-any.whl (8.6 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file adv_prodcon-0.1.14.tar.gz.

File metadata

  • Download URL: adv_prodcon-0.1.14.tar.gz
  • Upload date:
  • Size: 54.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.60.0 CPython/3.8.7

File hashes

Hashes for adv_prodcon-0.1.14.tar.gz
Algorithm Hash digest
SHA256 65526e0f49a7662f683e93e6a2963d1c06892c52aa4f6b135bac6594e989b1e6
MD5 8386403fbd73bdfab8419443e4f0116e
BLAKE2b-256 20621b25e2678704805246ce4aa6c364b262c54a21eb5f31d4edcd11e6ffb8ce

See more details on using hashes here.

File details

Details for the file adv_prodcon-0.1.14-py2.py3-none-any.whl.

File metadata

  • Download URL: adv_prodcon-0.1.14-py2.py3-none-any.whl
  • Upload date:
  • Size: 8.6 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.60.0 CPython/3.8.7

File hashes

Hashes for adv_prodcon-0.1.14-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 3f864ca932246d689de0db4bffb8e7aa9e49d965367ac7bb344fc83afdb2aa5d
MD5 afebf0bdeffd29683e9fb59b0676a723
BLAKE2b-256 c4d6e03cb5bba0a69e4b3f87c7fc3110b7170752f29f5e441edd4b93266d5b4a

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page