Skip to main content

Manage producer and consumer workers

Project description

warehut

Installation

pip install warehut

Example

import time
import datetime
import random

from warehut import Warehut
from warehut import Producer
from warehut import Consumer, listen

# There also exists a `Hybrid` worker,
# that can both listen to events and
# when responding, can forward to other consumers.
# Use this with caution and consideration.

from warehut.worker import Worker



class MyWarehut(Warehut):
    def handle_error(self, worker_type: type[Worker], exception: Exception):
        # Handle exception raised within a worker process
        # You could launch a window, write the error to a log file, etc.

        # Stopping all other workers
        self.stop()

        # And printing the error
        print(
            f'\nWorker of type {worker_type.__name__} '
            f'encountered an error.\n{exception!r}\n')



class MyProducer(Producer):

    async def __aenter__(self):
        print(f'Preparing environment for {self}')

    async def __aexit__(self, exc_type, exc_value, trace):
        print(f'Gracefully closing environment for {self}')
    
    async def update(self):
        # Put whatever you want your producer to do, here.
        # It can read from any source and `forward` data to consumer queues.
        # This method is run on repeat in-between status checks of the worker.

        # Forward the current timestamp to 'ping'
        self.forward('ping', time.time())
        print('Pinged!')
        
        # Sleep a random amount of time to create obvious offset
        time.sleep(random.random() * 2)

        # Forward a randomly generated number to 'random'
        self.forward('random', random.randint(0, 100))
        print('Randomed!')

        # Sleep again ... same reason.
        time.sleep(random.random() * 2)


class MyConsumer(Consumer):
    
    async def __aenter__(self):
        print(f'Preparing environment for {self}')

    async def __aexit__(self, exc_type, exc_value, trace):
        print(f'Gracefully closing environment for {self}')


    # `listen` defines a function to be called with data
    # that is labeled with the specified event name.
    
    @listen('ping')
    async def on_ping(self, timestamp):
        """Print the time at which a ping was sent"""        
        timestamp = datetime.datetime.fromtimestamp(int(timestamp))
        print(f'Ping at {timestamp.strftime("%Y-%m-%d %H:%M:%S")}')
    
    @listen('random')
    async def on_random(self, number):
        """Print generated random numbers"""

        if number > 80:
            raise RuntimeError('An error to show off the `Warehut` error handler')

        print(f'Random number generated: {number}')



if __name__ == '__main__':
    
    with MyWarehut([MyProducer, MyConsumer]):
        # `Warehut.start` is called upon entering the context
        input('\nPress Enter to exit\n\n')
        # `Warehut.stop` is called upon exiting the context


    # They can be called on their own with the same effect
    # 
    # Ex.
    # warehut = MyWarehut([MyProducer, MyConsumer])
    # warehut.start()
    # input('\nPress Enter to exit\n\n')
    # warehut.stop()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

warehut-3.2.1.tar.gz (17.8 kB view details)

Uploaded Source

Built Distribution

warehut-3.2.1-py3-none-any.whl (18.7 kB view details)

Uploaded Python 3

File details

Details for the file warehut-3.2.1.tar.gz.

File metadata

  • Download URL: warehut-3.2.1.tar.gz
  • Upload date:
  • Size: 17.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.11 CPython/3.9.7 Linux/5.10.79-1-MANJARO

File hashes

Hashes for warehut-3.2.1.tar.gz
Algorithm Hash digest
SHA256 3a2eb22a0811912f4352751644971d9e798451e5b412db14454faec2def51127
MD5 e57c61e778c23ccc28301e813b646e66
BLAKE2b-256 daf92b3a8662ac835402d5f94a55e716951e5e6161c41130c230d4dc363ceb90

See more details on using hashes here.

File details

Details for the file warehut-3.2.1-py3-none-any.whl.

File metadata

  • Download URL: warehut-3.2.1-py3-none-any.whl
  • Upload date:
  • Size: 18.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.11 CPython/3.9.7 Linux/5.10.79-1-MANJARO

File hashes

Hashes for warehut-3.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 9ba39d702f1c52660a2a6cb627df5cbe98d634a83a0c737b195c9edb3cccda3e
MD5 66133a27c2ea961c99c13bc3ff669dd9
BLAKE2b-256 97f06c384f9f3e55162cf21bc19ff8910a2a4c2adc2e1a3b2b73bfce17f7621a

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page